Workflow:


1.   Read in the data from the mdf file, take a look at channel names/some channels for sanity checking
2.   Convert the data to an evenly-spaced (by time) data frame using interpolation
3. Transform the data into the form we need to test the model (e.g. from lat/long to an inertial frame). This transformed data will be the base for model tuning/validation, so save it to a csv.
4. Choose one of the following ways:

*   Given some initial state and control action sequence, use the ModelExplorer to compare simulated vs real data interactively
*   Choose a stride (either 1 or 16 recommended). Then create the L2 loss report or run the optimizer.





In [1]:
%%capture
%pip install matplotlib
%pip install scipy
%pip install scikit-learn

%pip install asammdf
%pip install ipympl
%pip install mpl_interactions
%pip install mpl_interactions[jupyter]

To-Do List:

- Make the plotting able to take in the data frame
- Split into training and validation sets

Completed


---
- Verify the report generation works
- Implement strided passes in the report (since the model has to be accurate only for 16 timesteps)
- Provide a way to interactively tune parameters to see if
- Try running mpl_interactions locally (not on jupyter) to see if it will be less laggy
- Create transform functions to get signal data to be model-ready (e.g. transform lat/long to x and y, swap axes)



In [2]:
%matplotlib ipympl

In [3]:
from asammdf import MDF, Signal
import pickle
import pandas as pd
import numpy as np
import math
from scipy.interpolate import interp1d
from scipy.optimize import minimize
from scipy.spatial.transform import Rotation
from scipy.spatial.transform import Slerp
import matplotlib.pyplot as plt
import mpl_interactions.ipyplot as iplt
import random
from tire_model import get_forces

ldf is not supported
xls is not supported
xlsx is not supported
yaml is not supported


In [4]:
# Parameters
timestep = 0.02 # seconds
data_filename = "CAN_RUN1.mdf"
gps_filename = "MT_RUN1-MASTER-NOGNSS.txt"

torque_directions = ['FR', 'FL', 'RR', 'BL']

# Columns for GPS data

gps_latitude_channel = "Latitude"
gps_longitude_channel = "Longitude"
gps_velocity_x_channel = "Vel_E"
gps_velocity_y_channel = "Vel_N"
gps_rate_of_turn_channel = "Gyr_Z"
# TODO: quaternions, acceleration, etc. Add to as needed
quaternion_channels = ['Quat_q0', 'Quat_q1', 'Quat_q2', 'Quat_q3']

# Columns for CAN data

can_swangle_channel = "FSM_steeringWheelAngleADC"
can_manual_brake_channel = 'FSM_brakePressureFront'

can_front_torque_channels = ["AMK_FR_TorqueLimitPositiv", "AMK_FL_TorqueLimitPositiv"]
can_rear_torque_channels = ["AMK_RR_TorqueLimitPositiv", "AMK_BL_TorqueLimitPositiv"]
can_front_regen_channels = ["AMK_FR_TorqueLimitNegativ", "AMK_FL_TorqueLimitNegativ"]
can_rear_regen_channels = ["AMK_RR_TorqueLimitNegativ", "AMK_BL_TorqueLimitNegativ"]
can_torque_channels = can_front_torque_channels + can_rear_torque_channels
can_regen_channels = can_front_regen_channels + can_rear_regen_channels

can_front_wheel_speed_channels = ["AMK_FR_ActualVelocity", "AMK_FL_ActualVelocity"]
can_rear_wheel_speed_channels = ["AMK_BR_ActualVelocity", "AMK_BL_ActualVelocity"]
can_wheel_speed_channels = can_front_wheel_speed_channels + can_rear_wheel_speed_channels


can_all_channels = [can_swangle_channel] + can_torque_channels + can_wheel_speed_channels + can_regen_channels + [can_manual_brake_channel]

# Columns for composite data (what we create)

slipless_state_channels = ['x', 'y', 'yaw', 'v']
slipless_controls_channels = ['swangle', 'torque']

bicycle_state_channels = ['x', 'y', 'yaw', 'vlong', 'vlat', 'yawdot', 'front_wheel_speed', 'rear_wheel_speed']
bicycle_controls_channels = ['swangle', 'front_torque', 'rear_torque']

# 883.23 is the start of the car moving (according to gps)
# 221.28971 is the start of the car moving (according to can)
# difference is 661.94, round down to 661.90
can_correction = 661.90

manual_brake_threshold = 40
min_speed_threshold = 0.3
cg_to_movella = 0.5
can_swangle_adc_min_threshold = 500


In [5]:
# Utilities for Exploratory Data Analysis on a MDF file
def print_all_channels():
  with MDF(data_filename) as mdf_file:
    all_channels = list(mdf_file.channels_db.keys())
    print("\n".join(all_channels))
  return all_channels


def graph_channels(channel_names):
  with MDF(data_filename) as mdf_file:
    for channel_name in channel_names:
      if channel_name in mdf_file.channels_db.keys():
        signal = mdf_file.get(channel_name)
        debug_str = f"Time ranges from {min(signal.timestamps)} to {max(signal.timestamps)}.\n" + \
        f"Samples range from {min(signal.samples)} to {max(signal.samples)}.\n"
        print(debug_str)

        plt.plot(signal.timestamps, signal.samples, '-', label=channel_name, color="red")
        plt.legend()
        plt.xlabel('x')
        plt.ylabel('y')
        plt.show()
      else:
        print(f"error, channel {channel_name} not in file\n")

def show_table(df):
  print(df.columns)
  print(df.head())
  print(df.tail())
  print(df.shape)
  print(df.describe())


def graph_df_columns(df, column_names, bounds=None):
  plt.clf()
  for column in column_names:
    if column not in df.columns:
      print(df.columns)
      print("Column name not found in df")
      return
    plt.plot(df['time'], df[column], label=column)
  if bounds:
    plt.xlim(bounds[0], bounds[1])
  plt.legend()
  plt.show()


def graph_xy(df, bounds=None):
  df_copy = df.copy()
  if bounds:
    df_copy = df_copy.loc[bounds[0]:bounds[1], :].reset_index(drop=True)
  plt.clf()
  fig, ax = plt.subplots(figsize=(10, 10))
  if 'x' in df_copy.columns and 'y' in df_copy.columns:
    # reference trajectory exists
    def f_x(y, t):
      return df_copy.loc[t, 'x']

    def f_y(t):
      return df_copy.loc[t, 'y']

    iplt.scatter(f_y, f_x, t=np.arange(len(df_copy)), color='black')
    iplt.plot(df_copy['y'], df_copy['x'], color='black', label='trajectory')

    # def xlabel_func(t):
    #   return f"EFOIFEOIEF {t}"
    # iplt.xlabel(f_y)

  ax.set_aspect('equal')
  plt.legend()
  plt.show()
  




In [6]:
def gps_to_dataframe():
  with open(gps_filename, 'r') as file:
    lines = file.readlines()

  # Filter out lines starting with "//"
  filtered_lines = [line for line in lines if not line.strip().startswith("//")]

  # Extract column names from the first line
  column_names = filtered_lines[0].strip().split()
  column_names = [name for name in column_names if not name.startswith("UTC") and not name.startswith("Alt")]

  # Figure out the appropriate types
  dtypes = {name : float for name in column_names}
  dtypes['PacketCounter'] = int
  dtypes['StatusWord'] = int

  # Extract data from the remaining lines
  data = [line.strip().split() for line in filtered_lines[1:]]

  # Create the DataFrame
  df = pd.DataFrame(data, columns=column_names)
  df = df.astype(dtypes)
  # convert to seconds starting from 0
  # TODO: ALANA visualize NaNs


  length_with_nans = len(df)
  df.dropna(inplace=True)
  length_without_nans = len(df)
  print(f"Dropped {length_with_nans - length_without_nans} nan rows, remaining: {length_without_nans}")
  # 10000 is the number of ticks in one second (0.1ms between ticks)
  start_time = df.loc[0, 'SampleTimeFine']
  df.loc[:, 'SampleTimeFine'] = (df.loc[:, 'SampleTimeFine'] - start_time) / 10000.0
  return df

def interpolate_df(df, time_column, target_timestamps):
  """
  Interpolates a DataFrame to target timestamps.

  Args:
    df: The input DataFrame.
    time_column: The name of the time column in the DataFrame.
    target_timestamps: A NumPy array of target timestamps.

  Returns:
    A dictionary containing the interpolated data. Column names to np arrays
  """

  # 1. Extract time values and data columns
  time_values = df[time_column].values
  data_columns = [col for col in df.columns if col != time_column]

  # 2. Create interpolation functions for each data column
  interpolated_data = {'time': target_timestamps}
  for col in data_columns:
    if col not in quaternion_channels:
      interp_func = interp1d(time_values, df[col].values, kind='linear', fill_value="extrapolate")
      interpolated_data[col] = interp_func(target_timestamps)
    else:
      # quaternion interpolation
      rot_obj = Rotation.from_quat(df.loc[:, quaternion_channels], scalar_first=True)
      slerp = Slerp (time_values, rot_obj)
      euler_angles = slerp(target_timestamps).as_euler("ZYX", degrees=False)
      interpolated_data['yaw'] = euler_angles[:, 0]
      interpolated_data['pitch'] = euler_angles[:, 1]
      interpolated_data['roll'] = euler_angles[:, 2]
    

  # 3. Interpolate data to target timestamps    
  return interpolated_data

def create_evenly_spaced_table_gps(save=False):
  df = gps_to_dataframe()
  last_time = df.loc[len(df) - 1, 'SampleTimeFine']
  target_times = np.arange(0, last_time, timestep)
  interpolated_df = pd.DataFrame(interpolate_df(df, 'SampleTimeFine', target_times))
  if save:
    interpolated_df.to_csv('gps_output.csv', index=False)
  return interpolated_df



In [7]:
# Utilities to go from a mdf file to a evenly spaced dataframe

def get_time_bounds(mdf_file):
  time_start = -1
  time_stop = float('inf')
  for channel in can_all_channels:
    if channel in mdf_file.channels_db.keys():
      signal = mdf_file.get(channel)
      timestamps = signal.timestamps
      timestamps += can_correction
      if min(timestamps) > time_start:
        time_start = min(timestamps)
      if max(timestamps) < time_stop:
        time_stop = max(timestamps)
  assert time_start != -1 and time_stop != float('inf') and time_stop > time_start
  return (time_start, time_stop)


# Returns a numpy array
def interpolate(signal, x_new, verbose=False):
  x = signal.timestamps
  x += can_correction
  y = signal.samples

  interp_func = interp1d(x, y, kind='linear')
  assert min(x) <= x_new[0] and x_new[-1] <= max(x)
  y_new = interp_func(x_new)

  if verbose:

    print("Total length: ", len(signal.samples))

    print("Original x values:", x)
    print("Original y values:", y)
    print("Interpolated x values:", x_new)
    print("Interpolated y values:", y_new)

    # plt.plot(x, y, 'o', label='Original data')
    plt.plot(x_new, y_new, '-', label='Spline interpolation')
    plt.legend()
    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

  return y_new


def create_evenly_spaced_table_can(save=False):
    with MDF(data_filename) as mdf_file:
      time_start, time_end = get_time_bounds(mdf_file)
      x_new = np.arange(time_start, time_end, timestep)

      channel_data = {'time': x_new}
      for channel_name in can_all_channels:
        if channel_name in mdf_file.channels_db.keys():
          signal = mdf_file.get(channel_name)
          channel_data[channel_name] = interpolate(signal, x_new)

      df = pd.DataFrame(channel_data)
      if save:
        df.to_csv('output.csv', index=False)
      return df

In [8]:
def create_evenly_spaced_table_unified(save=False):
    with MDF(data_filename) as mdf_file:
        gps_df = gps_to_dataframe()
        gps_time_start = 0
        gps_time_end= gps_df.loc[len(gps_df) - 1, 'SampleTimeFine']
        can_time_start, can_time_end = get_time_bounds(mdf_file)
        x_new = np.arange(max(can_time_start, gps_time_start), min(can_time_end, gps_time_end), timestep)

        all_data = interpolate_df(gps_df, 'SampleTimeFine', x_new)
        for channel_name in can_all_channels:
            if channel_name in mdf_file.channels_db.keys():
                signal = mdf_file.get(channel_name)
                if channel_name == can_swangle_channel:
                    print("Correcting for warp-around swangle values...")
                    max_voltage = max(signal.samples)
                    signal.samples[signal.samples < can_swangle_adc_min_threshold] = max_voltage
                all_data[channel_name] = interpolate(signal, x_new)
            else:
                print(f"Channel {channel_name} not found in MDF file")

        unified_df = pd.DataFrame(all_data)
        # normalize time to start from 0
        unified_df['time'] = unified_df['time'] - unified_df['time'][0]
        if save:
            unified_df.to_csv('unified_output.csv', index=False)

    return unified_df

In [9]:
# Utilities to transform data such that it fits our model
# lateral_velocity_transform = lambda x: x # + r * alpha

def transform_controls(df):
  swangle_bias = df[can_swangle_channel].median()
  max_swangle_radians = 19 * np.pi / 180
  df['swangle'] = df[can_swangle_channel] - swangle_bias
  range = max(max(df['swangle']), -min(df['swangle']))
  df['swangle'] = -df['swangle'] * max_swangle_radians / range # convention: counter-clockwise is positive
  
  throttle_transform = lambda x: x * 0.1
  df['torque'] = np.zeros(len(df))
  for channel in can_torque_channels:
    df['torque'] += df[channel].apply(throttle_transform)
  for channel in can_regen_channels:
    df['torque'] += df[channel].apply(throttle_transform)
  df['front_torque'] = np.zeros(len(df))
  df['rear_torque'] = np.zeros(len(df))
  for channel in can_front_torque_channels: 
    df['front_torque'] += df[channel].apply(throttle_transform)
  for channel in can_rear_torque_channels:
    df['rear_torque'] += df[channel].apply(throttle_transform)
  for channel in can_front_regen_channels:
    df['front_torque'] += df[channel].apply(throttle_transform)
  for channel in can_rear_regen_channels:
    df['rear_torque'] += df[channel].apply(throttle_transform)
  

  

def transform_xy(df):
  origin_lat = df.loc[df.index[0], gps_latitude_channel]
  origin_long = df.loc[df.index[0], gps_longitude_channel]
  origin_lat_rad = origin_lat * math.pi / 180
  origin_long_rad = origin_long * math.pi / 180
  earth_radius = 6378137

  def latlon_to_xy(lat, long, yaw):
    lat_rad = lat * math.pi / 180
    long_rad = long * math.pi / 180
    dlat = lat_rad - origin_lat_rad
    dlong = long_rad - origin_long_rad

    # verify that this approximation is good enough
    # North
    y_movella = earth_radius * dlat
    # East
    x_movella = earth_radius * dlong * math.cos((lat_rad + origin_lat_rad) / 2)

    # Convert from movella x and y to x and y of the center of mass
    x = x_movella - cg_to_movella * math.cos(yaw)
    y = y_movella - cg_to_movella * math.sin(yaw)

    return x_movella, y_movella, x, y

  df[['x_movella', 'y_movella', 'x', 'y']] = df.apply(lambda row: latlon_to_xy(row[gps_latitude_channel], row[gps_longitude_channel], row['yaw']), axis=1, result_type='expand')
  
# xdot refers to rate of change of x (in global NorthEast frame), whilst vlong is velocity along the main axis of the car
def transform_xydot(df):
  def movella_to_latlong(xdot_movella, ydot_movella, yaw, yawdot):
    xdot = xdot_movella + cg_to_movella * yawdot * math.sin(yaw)
    ydot = ydot_movella - cg_to_movella * yawdot * math.cos(yaw)
    vlong = xdot * math.cos(yaw) + ydot * math.sin(yaw)
    vlat = -xdot * math.sin(yaw) + ydot * math.cos(yaw)
    return xdot, ydot, vlong, vlat
  df[['xdot', 'ydot', 'vlong', 'vlat']] = df.apply(lambda row: movella_to_latlong(row[gps_velocity_x_channel], row[gps_velocity_y_channel], row['yaw'], row['yawdot']), axis=1, result_type='expand')

# Take the average then convert RPM to rad/s
def transform_wheel_speeds(df):
  df['front_wheel_speed'] = (2 * math.pi) * (df[can_front_wheel_speed_channels[0]] + df[can_front_wheel_speed_channels[1]]) / (2 * 60)
  df['rear_wheel_speed'] = (2 * math.pi) * (df[can_rear_wheel_speed_channels[0]] + df[can_rear_wheel_speed_channels[1]]) / (2 * 60)

def transform(df):
  transform_xy(df)
  df['yawdot'] = df[gps_rate_of_turn_channel]
  transform_xydot(df)
  df['v'] = np.sqrt(df[gps_velocity_x_channel]**2 + df[gps_velocity_y_channel]**2) # Is this alright?
  transform_controls(df)
  transform_wheel_speeds(df)


In [10]:
# returns (start_index, start_time, end_index, end_time)
def get_usable_intervals(df, min_size=16):
    intervals = []
    in_interval = False
    for index, row in df.iterrows():
        if row[can_manual_brake_channel] < manual_brake_threshold and row['v'] > min_speed_threshold and not in_interval:
            in_interval = True
            start = index
        elif (row[can_manual_brake_channel] >= manual_brake_threshold or row['v'] <= min_speed_threshold) and in_interval:
            in_interval = False
            if df.loc[index, 'time'] - df.loc[start, 'time'] >= min_size:
                intervals.append((start, df.loc[start, 'time'], index, df.loc[index, 'time']))
    # Handle last interval
    if in_interval:
        if df.loc[index, 'time'] - df.loc[len(df) - 1, 'time'] >= min_size:
            intervals.append((start, df.loc[start, 'time'], len(df) - 1, df.loc[len(df) - 1, 'time']))
    return intervals


In [11]:
def get_random_color():
  return (random.random(), random.random(), random.random())


class ModelExplorer:
  def __init__(self, df, state_channels, controls_channels):
    self.df = df
    fig, ax = plt.subplots(figsize=(10, 10))
    dummy_x, dummy_y = np.array([]), np.array([])
    self.controls = iplt.scatter(dummy_x, dummy_y, t=np.arange(len(self.df)))
    if 'x' in self.df.columns and 'y' in self.df.columns:
      # reference trajectory exists
      def f_x(y, t):
        return self.df.loc[t, 'x']

      def f_y(t):
        return self.df.loc[t, 'y']

      iplt.scatter(f_y, f_x, controls=self.controls, color='black')
      iplt.plot(self.df['y'], self.df['x'], color='black', label='ref')

    self.initial_state = self.df.loc[0, state_channels].to_numpy().flatten()
    self.controls_channels = controls_channels
    self.model_instances = []
    ax.set_aspect('equal')

  def add_model_instance(self, dynamics_class):
    self.model_instances.append(dynamics_class)

  # Stateful function that updates the active plot
  def plot_trajectory(self, prefix):
    x_name = prefix + "_x"
    y_name = prefix + "_y"
    color = get_random_color()

    def f_x(y, t):
      return self.df.loc[t, x_name]

    def f_y(t):
      return self.df.loc[t, y_name]

    iplt.scatter(f_y, f_x, controls=self.controls, color=color)
    iplt.plot(self.df[y_name], self.df[x_name], color=color, label=prefix)


  def simulate_and_plot(self):
    for dynamics_class in self.model_instances:
      dynamics_function = dynamics_class.dynamics
      num_timesteps = len(self.df)
      prefix = dynamics_class.name
      x_name = prefix + "_x"
      y_name = prefix + "_y"
      self.df[x_name] = np.zeros(num_timesteps)
      self.df[y_name] = np.zeros(num_timesteps)
      last_state = self.initial_state

      for i in range(num_timesteps):
        self.df.loc[i, x_name] = last_state[0]
        self.df.loc[i, y_name] = last_state[1]
        control_action = self.df.loc[i, self.controls_channels].to_numpy().flatten()
        last_state = dynamics_function(last_state, control_action)

      self.plot_trajectory(prefix)
    plt.legend()


In [12]:
def checkpoint_optimization(xk, f, accept):
   with open('optimization_checkpoint.pkl', 'wb') as file:
      pickle.dump([xk, f, accept], file)

"""
params: [rolling_drag_constant, rolling_drag_coefficient, understeer_slope, car_mass, swangle_scale, swangle_bias, torque_scale, torque_bias]
stride represents the number of times the dynamics function is called in succession
approximate_range = [100, 1, 0.01, 100, 1, 0.01, 1, 0.1]

"""

def simulate_and_compute_cost(params, df, state_channels, controls_channels, params_names, cost_function, stride=1, usable_intervals=None, output_filename=None):
    dynamics_class = Bicycle()
    assert len(params) == len(params_names)
    for param, param_name in zip(params, params_names):
      setattr(dynamics_class, param_name, param)
    
    if output_filename:
      with open(output_filename, 'w') as f:
        f.write("")
    
    dynamics_function = dynamics_class.dynamics
    cost = 0
    num_rows = len(df)
    num_iterations = 0
    if usable_intervals is None:
       usable_intervals = [(0, 0, num_rows, num_rows)]
    for interval in usable_intervals:
      start_index, _, end_index, _ = interval
      for t in range(start_index, end_index - stride, stride):
          # (1, n) to (n, )
          initial_state = df.loc[t, state_channels].to_numpy().flatten()
          state = initial_state
          control_action_list = []
          for offset in range(stride):
            control_action = df.loc[t + offset, controls_channels].to_numpy().flatten()
            if output_filename is not None:
              control_action_list.append(control_action)
            state = dynamics_function(state, control_action)
          actual_state = df.loc[t + stride, state_channels].to_numpy().flatten()

          error_vector = state - actual_state
          individual_cost = cost_function(error_vector)

          cost += individual_cost
          num_iterations += 1


          if output_filename is not None:
            with open(output_filename, 'a') as file:
              file.write("----------------------------------\n")
              file.write(f"Time {t} to {t + stride}\n")
              file.write(f"Initial state: {initial_state}\n")
              file.write(f"Control actions: {control_action_list}\n")
              file.write(f"Predicted state: {state}\n")
              file.write(f"Actual state: {actual_state}\n")
              file.write(f"Prediction error: {individual_cost}")

              # file.write(f"Squared distance error: {cost_vector}\n")
              file.write("\n")

    if output_filename is not None:
        with open(output_filename, 'a') as file:
            file.write(f"Params: {params}\n")
            file.write(f"Stride: {stride}\n")
            file.write(f"Average cost: {cost / num_iterations}\n")

    return cost / num_iterations

In [None]:
# START OF CALLS

df = create_evenly_spaced_table_unified()
transform(df)

show_table(df.loc[(5 * 10830):(5 *10880), ['vlong', 'vlat', 'swangle', 'torque', 'time', 'yaw']])
show_table(df)
# graph_df_columns(df.loc[10830:10880, :], ['vlong', 'vlat', 'torque'])
# graph_xy(df, (5 * 10830, 5 *10880))


In [13]:
from enum import Enum

class TorqueMode(Enum):
    AWD = 0
    FWD = 1
    RWD = 2

# Slipless model constants

# tune: rolling drag, car inertia, tire model constants

timestep = 0.02
# TODO: make this correct as of the load on each tire

wheel_base = 1.55 # Verified: Gotten from Alex
# TODO: account for load transfer (mainly aero effects) and asymmetry
cg_to_front = 0.775
cg_to_rear = 0.775

whl_radius = 0.215 # Verified: Gotten from Samuel
gear_ratio = 13.93 # Verified: Gotten from Samuel
car_mass_default = 325 # Verified: Gotten from Josh
gravity = 9.81  # in m/s^2 | Verified: Gotten from Isaac 
# From video posted by Thomas, rolling resistance at base load ~ 300N
rolling_drag_default = 300.0 / (car_mass_default * gravity) # dimensionless, from video
# long_tractive_capability_default = 10.0  # m/s^2
# lat_tractive_capability = 6.0  # m/s^2
# understeer_slope_default = 0.0
# brake_enable_speed = 1.0
# saturating_motor_torque = (long_tractive_capability + rolling_drag / car_mass) * car_mass * whl_radius / gear_ratio
torque_mode = TorqueMode.AWD
slipless_state_dim = 4

# Bicycle model constants

wheel_rotational_inertia = 0.05199  # in kg*m^2 TAKEN ALONG AXLE AXIS | Verified: Gotten from Josh from CAD
car_rotational_inertia = 105.72  # in kg*m^2. TAKEN ALONG Z AXIS THROUGH CG MUST TUNE

# # Saturating Linear Bicycle Tire model constants
# slip_ratio_saturation = 0.1
# slip_ratio_max_x = 0.1
# slip_angle_max_y = 0.1
# max_force_y_at_1N = 1.0
# max_force_x_at_1N = 0.8
# post_saturation_force_x = 0.6
# post_saturation_force_y = 0.8
# rolling_resistance_tire_torque = 10.0

def sign(x):
    if x > 0:
        return 1
    elif x < 0:
        return -1
    else:
        return 0

class Bicycle:
    def __init__(self):
        self.car_mass = car_mass_default
        self.name = "bicycle"

    def calculate_slip_ratio(self, wheel_speed, velocity):
        # Ranges from 0 to infinity
        velocity = abs(velocity)
        if velocity == 0:
            # If wheel speed is 0, there is no motion so no slip
            # Else, we are completely spinning out which shouldn't happen (negative slip ratio)
            assert wheel_speed == 0
            # Either way, return 0
            return 0

        tangential_velocity = wheel_speed * whl_radius
        slip_ratio = tangential_velocity / velocity - 1
        assert slip_ratio >= 0
        return slip_ratio
    
    # def calculate_slip_ratio_linear(self, wheel_speed, velocity):
    #     velocity = abs(velocity)
    #     if velocity == 0:
    #         return 0

    #     if velocity == 0:
    #         return np.sign(wheel_speed) * slip_ratio_saturation

    #     tangential_velo = wheel_speed * whl_radius
    #     return np.clip(
    #         (tangential_velo - velocity) / velocity,
    #         -slip_ratio_saturation, # TODO: clip to -1 and 1
    #         slip_ratio_saturation
    #     )

    def tire_model(self, slip_ratio, slip_angle, load, forces):
        # if abs(slip_ratio) < abs(slip_ratio_max_x):
        #     numerator = load * slip_ratio * max_force_y_at_1N
        #     within_sqrt = np.square(np.tan(slip_angle)) + \
        #         np.square(max_force_y_at_1N / max_force_x_at_1N)
        #     denominator = slip_ratio_max_x * np.sqrt(within_sqrt)
        #     forces[0] = numerator / denominator
        # else:
        #     numerator = load * post_saturation_force_x * max_force_y_at_1N
        #     within_sqrt = np.square(np.tan(slip_angle)) + \
        #         np.square(max_force_y_at_1N / max_force_x_at_1N)
        #     denominator = max_force_x_at_1N * np.sqrt(within_sqrt)
        #     forces[0] = np.sign(slip_ratio) * numerator / denominator

        # if abs(slip_angle) < abs(slip_angle_max_y):
        #     forces[1] = load * max_force_y_at_1N / slip_angle_max_y * slip_angle
        # else:
        #     forces[1] = load * post_saturation_force_y * np.sign(slip_angle)
        pressure = 68.95
        forces[0], forces[1] = get_forces(slip_ratio, slip_angle, load, pressure, 0)

    """
    Axes: Cartesian coordinates, positive Yaw/Yaw_Rate/Swangle is counter clockwise           
    [0] X_world m
    [1] Y_world m
    [2] Yaw_World rad
    [3] V_long_Car m/s
    [4] V_lat_Car m/s
    [5] Yaw_Rate rad/s
    [6] Front Wheel Speed rad/s
    [7] rear Wheel Speed rad/s
    """
    def dynamics(self, state, action):
        # Get states
        yaw_world = state[2]
        v_long_car = state[3]
        v_lat_car = state[4]
        yaw_rate = state[5]
        front_wheel_speed = state[6]
        rear_wheel_speed = state[7]

        # Get control actions
        steering_angle = action[0]
        torque_front, torque_rear = 0, 0
        if torque_mode == TorqueMode.AWD:
            torque_front = action[1] * 0.5
            torque_rear = action[1] * 0.5
        elif torque_mode == TorqueMode.FWD:
            torque_front = action[1]
            torque_rear = 0
        elif torque_mode == TorqueMode.RWD:
            torque_front = 0
            torque_rear = action[1]
        torque_front *= gear_ratio
        torque_rear *= gear_ratio

        # Calculate inputs to tire model

        # Slip angles
        v_lat_at_front_tire = v_lat_car + yaw_rate * cg_to_front
        front_tire_velocity_angle = np.arctan(v_lat_at_front_tire / v_long_car) if v_long_car > 0 else steering_angle
        front_slip_angle = front_tire_velocity_angle - steering_angle
        v_lat_at_rear_tire = v_lat_car - yaw_rate * cg_to_rear
        rear_tire_speed = np.sqrt(v_long_car * v_long_car + v_lat_at_rear_tire * v_lat_at_rear_tire)
        rear_slip_angle = 0 if rear_tire_speed == 0 else np.arcsin(np.clip(v_lat_at_rear_tire / rear_tire_speed, -1.0, 1.0))
        alternate_slip_angle = np.arctan(v_lat_at_rear_tire / v_long_car) if v_long_car > 0 else 0
        
        assert abs(rear_slip_angle - alternate_slip_angle) < 1e-5

        # Slip ratios
        velocity_along_front_tire = v_long_car * np.cos(steering_angle) + v_lat_at_front_tire * np.sin(steering_angle)
        front_slip_ratio = self.calculate_slip_ratio(front_wheel_speed, velocity_along_front_tire)
        rear_slip_ratio = self.calculate_slip_ratio(rear_wheel_speed, v_long_car)

        front_load = (self.car_mass * gravity) * cg_to_rear / wheel_base
        rear_load = (self.car_mass * gravity) * cg_to_front / wheel_base

        front_forces_tire = np.zeros(2)
        rear_forces_tire = np.zeros(2)

        self.tire_model(front_slip_ratio, front_slip_angle, front_load, front_forces_tire)
        self.tire_model(rear_slip_ratio, rear_slip_angle, rear_load, rear_forces_tire)

        front_wheel_speed_next = front_wheel_speed + (torque_front - whl_radius * front_forces_tire[0] - np.sign(front_wheel_speed) * rolling_resistance_tire_torque) / wheel_rotational_inertia * timestep
        rear_wheel_speed_next = rear_wheel_speed + (torque_rear - whl_radius * rear_forces_tire[0] - np.sign(rear_wheel_speed) * rolling_resistance_tire_torque) / wheel_rotational_inertia * timestep
        front_slip_ratio_next = self.calculate_slip_ratio(front_wheel_speed_next, velocity_along_front_tire)
        rear_slip_ratio_next = self.calculate_slip_ratio(rear_wheel_speed_next, v_long_car)

        self.tire_model(front_slip_ratio_next, front_slip_angle, front_load, front_forces_tire)
        self.tire_model(rear_slip_ratio_next, rear_slip_angle, rear_load, rear_forces_tire)

        front_force_x_car = front_forces_tire[0] * np.cos(steering_angle) - front_forces_tire[1] * np.sin(steering_angle)
        front_force_y_car = front_forces_tire[0] * np.sin(steering_angle) + front_forces_tire[1] * np.cos(steering_angle)

        rear_force_x_car = rear_forces_tire[0]
        rear_force_y_car = rear_forces_tire[1]

        next_state = np.zeros(8)
        next_state[0] = state[0] + (v_long_car * np.cos(yaw_world) - v_lat_car * np.sin(yaw_world)) * timestep
        next_state[1] = state[1] + (v_long_car * np.sin(yaw_world) + v_lat_car * np.cos(yaw_world)) * timestep
        next_state[2] = state[2] + (yaw_rate * timestep) 
        next_state[3] = state[3] + ((front_force_x_car + rear_force_x_car) / self.car_mass + v_lat_car * yaw_rate) * timestep
        next_state[4] = state[4] + ((front_force_y_car + rear_force_y_car) / self.car_mass - v_long_car * yaw_rate) * timestep
        next_state[5] = state[5] + ((cg_to_front * front_force_y_car - cg_to_rear * rear_force_y_car) / car_rotational_inertia) * timestep
        next_state[6] = state[6] + ((torque_front - whl_radius * front_forces_tire[0]) / wheel_rotational_inertia) * timestep
        next_state[7] = state[7] + ((torque_rear - whl_radius * rear_forces_tire[0]) / wheel_rotational_inertia) * timestep
        return next_state


# torque is proportional to current
# velocity is proportinal to voltage


# model max torque and max regen as body acceleration and linear velocity

# max torque >>> max regen


# ttc with pacejka with scaling factor

"""
cdc code:

down force


"""




'\ncdc code:\n\ndown force\n\n\n'

In [None]:
# bicycle_channels = ['time'] + bicycle_state_channels + bicycle_controls_channels
# print(bicycle_channels)
# truncated_df = df.loc[10820:10900, bicycle_channels].reset_index(drop=True)
# show_table(truncated_df)


# Run the solver
print(get_usable_intervals(df))
# Initial guesses for model parameters
# Generate a report
simulate_and_compute_cost(np.array([]), df, bicycle_state_channels, bicycle_controls_channels, stride=1, output_filename='bicycle_initial_report.txt', usable_intervals=get_usable_intervals(df))

# 0.1 for vlong, 0.2 for vlat, 0.06 for yawdot


In [None]:
"""
params: [rolling_drag_constant, rolling_drag_coefficient, understeer_slope, car_mass, swangle_scale, swangle_bias, torque_scale, torque_bias]
stride represents the number of times the dynamics function is called in succession
"""

# Run the solver
import slipless_python as sp
from scipy.optimize import basinhopping
usable_intervals = get_usable_intervals(df)
# usable_intervals = [(5000, 500, 5800, 580), (10620, 1062, 11020, 1102)]
# Initial guesses for model parameters
initial_params = np.array([1.5, 1, 0, 3.3, 1, 0, 1, 0])
bounds = [(0, None), (0, None), (-50, 50), (2, 5), (0, None), (-100 * math.pi / 2, 100 * math.pi / 2), (0, None), (None, None)] # IOFEOIFNEOIFNOI
"""
params: [rolling_drag_constant, rolling_drag_coefficient, understeer_slope, car_mass, swangle_scale, swangle_bias, torque_scale, torque_bias]

approximate_range = [100, 1, 0.01, 100, 1, 0.01, 1, 0.1]

"""

# Generate a report
simulate_and_compute_cost(initial_params, df, slipless_state_channels, slipless_controls_channels, stride=16, output_filename='report.txt', usable_intervals=usable_intervals)

# Minimize cost function
# Each function invocation takes 150s

# minimizer_kwargs = {'args': (df, slipless_state_channels, slipless_controls_channels, 16, usable_intervals), 'method': 'L-BFGS-B', 'bounds': bounds}
result = basinhopping(simulate_and_compute_cost, initial_params, niter=200, T=1, stepsize=5, callback=checkpoint_optimization, minimizer_kwargs=minimizer_kwargs)

# # result = minimize(simulate_and_compute_cost, initial_params, args=(df, 16, usable_intervals), callback=checkpoint_optimization, method='L-BFGS-B', bounds=bounds, tol=1e-8, options={'maxfun': 140})
# optimized_params = result.x
# print("Optimized parameters:", optimized_params)
simulate_and_compute_cost(optimized_params, df, slipless_state_channels, slipless_controls_channels, stride=16, output_filename='optimized_report.txt', usable_intervals=usable_intervals)

# Optimized parameters: [  1.07411973  10.33957978  19.7785237    2.07410995   0.
#  -27.66939102   0.46838888 -40.61127863]


In [None]:
print(simulate_and_compute_cost(initial_params, df, slipless_state_channels stride=16, output_filename='initial_report.txt', usable_intervals=usable_intervals))
print(simulate_and_compute_cost(optimized_params, df, stride=16, output_filename='optimized_report.txt', usable_intervals=usable_intervals))

In [None]:
import slipless_python as sp
# 13459
# 0 to 1500, 4600 to 8000 are all the turns
# 8000 to 13000 are the all straights

# straight interval: 10830 - 10880 / 10430 - 11280
# circles interval: 5000 - 5800

slipless_channels = ['time', 'x', 'y', 'yaw', 'v', 'swangle', 'torque']

truncated_df = df.loc[10430:11280, slipless_channels].reset_index(drop=True)

model_explorer = ModelExplorer(truncated_df, slipless_state_channels, slipless_controls_channels)
sp2 = sp.Slipless(car_mass=500, name="heavy")
# sp3 = sp.Slipless(understeer_slope=-0.05, name="understeer")
model_explorer.add_model_instance(sp.Slipless(name="base"))
model_explorer.add_model_instance(sp2)
# model_explorer.add_model_instance(sp3)
model_explorer.simulate_and_plot()


In [None]:
# Code to find the sync point

# #graph of GPS x,y,v against time from 600 to like 650

# window = (1678.75, 1679.79)

# start_index = window[0] * 100
# end_index = window[1] * 100
# change_time = 661.9

# can_start_index = 0
# for i in range(len(can_df)):
#     if (can_df.loc[i, 'time'] > window[0] - change_time):
#         can_start_index = i
#         break
  
# can_end_index = 0
# for i in range(len(can_df)):
#     if (can_df.loc[i, 'time'] > window[1] - change_time):
#         can_end_index = i
#         break

# print(can_start_index, can_end_index)

# #actual plotting
# plt.clf()
# # plt.plot(gps_df.loc[start_index:end_index ,['time']], gps_df.loc[start_index:end_index ,['x']], color = 'red', label = 'x GPS')
# # plt.plot(gps_df.loc[start_index:end_index ,['time']], gps_df.loc[start_index:end_index ,['y']], color = 'blue', label = 'y GPS')
# plt.plot(gps_df.loc[start_index:end_index ,['time']], gps_df.loc[start_index:end_index ,['v']], color = 'purple', label = 'v GPS')
# # plt.plot((can_df.loc[can_start_index:can_end_index ,['time']] + change_time), (can_df.loc[can_start_index:can_end_index ,['AMK_FR_ActualVelocity']] / 500), color = 'green', label = 'v CAN')

# for wheel_channel in can_wheel_speed_channels:
#     plt.plot((can_df.loc[can_start_index:can_end_index ,['time']] + change_time), (can_df.loc[can_start_index:can_end_index ,[wheel_channel]] / 500), label = wheel_channel)


# # time_stamps=[]
# # horizontal_errors=[]
# # vertical_errors=[]
# # error_30_line = 30

# # with open("precisions.txt") as infile:
# #   for line in infile:
# #     line_list = line.split()
# #     time_stamps.append(((int(line_list[0]) - 31798799) / 10000))
# #     horizontal_errors.append(int(line_list[1]))
# #     vertical_errors.append(int(line_list[2]))


# # plt.xticks(np.arange(32e6,60e6,1e6))
# # plt.yticks(np.arange(0, 250,25))
# # plt.plot(time_stamps, horizontal_errors, color = 'yellow', label = 'horizontal error')
# # plt.plot(time_stamps, vertical_errors, color = 'cyan', label = 'vertical error')
# # plt.axhline(y=error_30_line, color='black', linestyle='--', label='30mm error')


# plt.title("Time values vs GPS x (red), GPS y (blue), GPS v (purple)")
# plt.legend()
# plt.xlabel("Time values")
# plt.ylabel("Values form GPS")
# plt.show()


# # print(gps_df.loc[start_index:end_index:1 ,
# # ['time', 'x', 'y', 'v']])
# # print(can_df.loc[can_start_index:can_end_index:1 ,['time'] + can_wheel_speed_channels])


# # 883.23 is the start of the car moving (according to gps)
# # 221.28971 is the start of the car moving (according to can)
# # difference is 661.94, round down to 661.90


In [None]:
# USE IF YOU NEED TO SANIY CHECK LOW ORDER EFFECTS
# 
# dummy_state_channels = ['x', 'y', 'yaw', 'xdot', 'ydot', gps_rate_of_turn_channel]
# dummy_controls_channels = ['swangle']

# class Dummy:

#     def dynamics(self, state, controls):
#         x = state[0]
#         y = state[1]
#         yaw = state[2]
#         x_dot = state[3]
#         y_dot = state[4]
#         yawdot = state[5]
#         swangle = controls[0]

#         x_next = x + x_dot * timestep 
#         y_next = y + y_dot * timestep
#         return np.array([x_next, y_next, 0, 0, 0, 0])


# simulate_and_compute_cost(np.array([]), df, dummy_state_channels, dummy_controls_channels, stride=1, output_filename='dummy_initial_report.txt', usable_intervals=get_usable_intervals(df))
# # 0.008 is the error with gps velocity -> gps position
# # 0.012 is the error with gps velocity -> cg position
# # 0.008 is the error with cg velocity -> cg position
