# Hybrid Control Car

This project will demonstrated a hybrid machine learning and optimal control theory approach to controlling a simulated racecar. A neural network - which will henceforce be referred to as the pathfinding network - will be trained using reinforcement learning to pathfind, selecting the position of and the desired velocity of the car at the next path node based on sensor readings and the current state of the car. An inner control loop will use model predictive control with LQR feedback to drive the car to the next node.

The underlying physics simulation is written in the Rust programming language using [Rapier](https://rapier.rs/) as the physics engine with Python bindings generated by [PyO3](https://pyo3.rs). Its source code is avaiable at [src/lib.rs](src/lib.rs).

We will first start by importing the necessary libraries and establishing a few parameters.

In [87]:
from hybrid_control_car import CarSimulation # The simulation developed for this project
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import pysindy

nstates = 7 # The number of state variables (x, z, v, n_x, n_z)
ninputs = 2 # The number of inputs (F_e, phi)

# 1 Deriving a Model of the Car

While an accurate model of the car itself could be derived mathematically, for the purposes of this project we will not attempt to do so, treating the car as a standing for a far more complex system. To that end, we will use Sparse Indentification of Nonlinear Dynamics (SINDy) to create a model of the system computationally using the pySINDy library. This algorithm was chosen for several reasons:

1. It may be difficult to linearize about an equilibrium point while still maintaining a high level of accuracy.
2. pySINDy allows for finding the nonlinear dynamics using multiple trajectories of test data.
3. []

A model with at least 75% accuracy for the first few seconds is desired. Later on, feedback control will be used to compensate for any inaccuracies in the model, so a near-perfect model is not necessary, but we will aim to create one anyways.

## 1.1 Simulation and Analysis

Selecting the state and input vectors requires balancing adequet representation of the system and control in the inner MPC controller loop with the complexity of the model. A model with more states specifically will require the pathfinding network to determine more states for the next node and will make it more difficult to fit the SINDy model with accuracy. With these considerations, the following state and input vectors have been selected.

$$
\vec{x} = \begin{bmatrix} x \\ z \\ v \\ n_x \\ n_z \end{bmatrix}, \vec{u} = \begin{bmatrix} \theta \\ F_e \end{bmatrix}
$$

Where $x$ and $z$ are the position of the car along those axis, $v$ is the forward velocity scalar of the car, $n_x$ and $n_z$ are the unit vector components in the forward direction of the car in the $xz$ plane, $\theta$ is the steering angle of the car, and $F_e$ is the force exerted by the engine. While the angular velocity about the y-axis could be considered, it seems like it would add more complexity to the model for very little gain.

In terms of units, we will define one second as equivalent to 60 simulation timesteps. Since $x$ and $z$ are returned in units and $v$ is returned in units per timestep, we will define one meter as equivalent to 60 units, otherwise we will get very large velocities and distances.

Training data will be generated by holding both inputs steady at multiple different values, holding one steady while one is changes, and by having both change; Testing data will be generated in the same way.

In [88]:
def smooth_curve(start: float, end: float, steps: int):
    diff = end - start
    out = np.zeros(steps)

    for i in range(steps):
        x = i - int(steps/2)
        out[i] = diff/(1 + np.exp((-12/steps)*x)) + start

    return out

In [89]:
# Set up the simulation
sim = CarSimulation()
sim.create_floor()

hz = 100 # Timesteps per second
upm = 100 # Units per meter

def simulate(sim: CarSimulation, engine_force: float, steering_angle: float):
    # Convert force to kg*units/timestep^2
    engine_force = engine_force*1000*(hz/upm**2)

    state, colliding, checkpoint, finish = sim.step(engine_force, steering_angle)
    state = np.array(state)
    # Convert units
    state[0] = state[0]/upm # x
    state[1] = state[1]/upm # z
    state[2] = state[2]*(hz/upm) # v

    return state, colliding, checkpoint, finish

def generate_training_data(sim: CarSimulation, rotation: float, nsamples, nstates, inputs):
    sim.reset_car(rotation)

    data = np.zeros((nsamples, nstates))
    # x, z, v, n_x, n_z
    x0 = np.array([0.0, 0.0, 0.0, np.cos(rotation), np.sin(rotation), 0.0, 0.0])

    for n in range(nsamples):
        engine_force = inputs[n, 0]
        steering_angle = inputs[n, 1]
        state, _, _, _ = simulate(sim, engine_force, steering_angle)
        data[n, :] = x0
        x0 = state

    return data

nsteps_samples = hz*5 # 10 seconds worth of sample data
all_sample_states = []
all_sample_inputs = []

rotations = np.linspace(0, 2*np.pi, 4)
Fe_samples = [
    0.3 * np.ones(nsteps_samples),
    0.6 * np.ones(nsteps_samples),
    0.9 * np.ones(nsteps_samples),
    1.2 * np.ones(nsteps_samples),
    1.5 * np.ones(nsteps_samples),
    1.8 * np.ones(nsteps_samples),
    smooth_curve(0, 0.6, nsteps_samples),
    smooth_curve(0.6, 0, nsteps_samples),
    smooth_curve(-0.6, 0.6, nsteps_samples),
    smooth_curve(0.6, -0.6, nsteps_samples),
    smooth_curve(0.9, -0.3, nsteps_samples),
    smooth_curve(-0.9, 0.3, nsteps_samples),
    smooth_curve(0, 1.8, nsteps_samples),
    smooth_curve(1.8, 0, nsteps_samples),
    smooth_curve(0, -1.8, nsteps_samples),
    smooth_curve(-1.8, 0, nsteps_samples),
    smooth_curve(0.6, -1.8, nsteps_samples),
    smooth_curve(-1.8, 0.6, nsteps_samples),
    smooth_curve(0.6, -0.3, nsteps_samples),
    smooth_curve(-0.3, -0.3, nsteps_samples),
]
Theta_samples = [
    np.zeros(nsteps_samples),
    0.1*np.ones(nsteps_samples),
    -0.1*np.ones(nsteps_samples),
    0.35*np.ones(nsteps_samples),
    -0.35*np.ones(nsteps_samples),
    0.4*np.ones(nsteps_samples),
    -0.4*np.ones(nsteps_samples),
    0.5*np.ones(nsteps_samples),
    -0.5*np.ones(nsteps_samples),
    0.7*np.ones(nsteps_samples),
    -0.7*np.ones(nsteps_samples),
    smooth_curve(0, 0.35, nsteps_samples),
    smooth_curve(0, -0.35, nsteps_samples),
    smooth_curve(0, 0.7, nsteps_samples),
    smooth_curve(0, -0.7, nsteps_samples),
    smooth_curve(-0.7, 0.7, nsteps_samples),
    smooth_curve(0.7, -0.7, nsteps_samples),
    smooth_curve(-0.45, 0.7, nsteps_samples),
    smooth_curve(0.45, -0.7, nsteps_samples),
    smooth_curve(-0.35, 0.35, nsteps_samples),
    smooth_curve(0.35, -0.35, nsteps_samples),
]

for i in range(len(rotations)):
    for j in range(len(Fe_samples)):
        for k in range(len(Theta_samples)):
            rotation = rotations[i]
            Fe = Fe_samples[j]
            theta = Theta_samples[k]
            inputs = np.array([Fe, theta]).T
            state = generate_training_data(sim, rotation, nsteps_samples, nstates, inputs)

            all_sample_states.append(state)
            all_sample_inputs.append(inputs)

print(f"Generated a total of {len(all_sample_states)} sample trajectories.")


Generated a total of 1680 sample trajectories.


To visualize the system dynamics, lets plot some of the test data for analysis, starting with the very first trajectory.

In [90]:
def plot_data(t, state, input):
    x = state[:, 0]
    z = state[:, 1]
    v = state[:, 2]
    phi = np.arctan2(state[:, 4], state[:, 3])
    Fe = input[:, 0]
    # Fe = Fe/1000 # Convert to kN
    theta = input[:, 1]

    fig, axs = plt.subplots(2, 2, sharex=True, figsize = (12, 8))
    axs = axs.flatten()

    axs[0].plot(t, x, label='$x$')
    axs[0].plot(t, z, label='$z$')
    axs[0].set_title("Position vs. Time")
    axs[0].set_ylabel("position (m)")
    axs[0].legend()

    axs[1].plot(t, v)
    axs[1].set_title("Linear Velocity vs. Time")
    axs[1].set_ylabel("velocity (m/s)")

    axs[2].plot(t, phi, label=r'$\phi$')
    axs[2].plot(t, theta, label=r"$\theta$")
    axs[2].set_title("Angles vs. Time")
    axs[2].set_xlabel("time (s)")
    axs[2].set_ylabel("angle (rad)")
    axs[2].legend()

    axs[3].plot(t, Fe)
    axs[3].set_title("Engine force vs. Time")
    axs[3].set_xlabel("time (s)")
    axs[3].set_ylabel("force (kN)")

tseconds = np.linspace(0, nsteps_samples - 1, nsteps_samples)/hz

#plot_data(tseconds, all_sample_states[0], all_sample_inputs[0])
# plt.show()

As expected, holding the engine force steady results in an increasing forward velocity and position, with the position increasing exponentially. What's not expected is the car being rotated slightly, likely a floating point error. I attempted to fix this by adjusting the height the car is created at in the simulation code to no avail.

Let's plot a later data set and see if there are more interesting results.

In [91]:
# plot_data(tseconds, all_sample_states[19], all_sample_inputs[19])

A higher engine force to start with naturally leads to faster oscillations in the position and velocity responses. Now, let's make one final plot.

In [92]:
# plot_data(tseconds, all_sample_states[102], all_sample_inputs[102])

It is interesting how holding the engine force steady while changing the steering angle leads to oscillations in the state responses becoming a higher frequency.

## 1.2 Modeling with pySINDy

This data will now be used to fit a SINDy model; 

Fortunately, the pySINDy Github repo provides a handy flowchart for choosing the right function library/libraries and optimizer for fitting the SINDy model to the measurement/test data.

![foo](https://raw.githubusercontent.com/dynamicslab/pysindy/master/docs/JOSS2/Fig3.png)

Since we are not solving for an implicit ODE (as far as I'm aware), the data has control inputs, is not spatially dependent and does not have significant noise, the flowchart says to use any pySINDy library. Since there are no inequality constraints in the system (as far as I'm aware), we can also use any pySINDy optimizer.

The default library is the `PolynomialLibrary`, which contains a variety of polynomial functions (to the second degree by default). PySINDy also provides a `CustomLibrary`, which will allow us to insert our own functions - we will use this to create a feature library with `sin` and `cos`, then combine with a `PolynomialLibrary`.

In [110]:
from pysindy.feature_library import *

input_features = ["x", "z", "v", "n_x", "n_z", "Fe", "theta"]
input_features_expanded = ["x", "z", "v", "n_x", "n_z", "omega_c", "omega_w", "Fe", "theta"]

poly_library = PolynomialLibrary(degree=3)
phi_library = CustomLibrary(
    [
        lambda y, x: np.sin(np.arctan2(y, x)),
        lambda y, x: np.cos(np.arctan2(y, x))
    ], 
    [
        lambda y, x: f"sin(arctan2({y},{x}))",
        lambda y, x: f"cos(arctan2({y},{x}))"
    ]
    )

trig_library_functions = [
    lambda a: np.sin(a),
    lambda a: np.cos(a),
]
trig_library_function_names = [
    lambda a: f"sin({a})",
    lambda a: f"cos({a})",
]
trig_library = CustomLibrary(trig_library_functions, trig_library_function_names)

# x, z, v, n_x, n_z, omega_c, omega_w, Fe, theta
inputs_per_library_expanded = np.array([
    [0, 1, 2, 5, 6, 7, 7],
    [8, 8, 8, 8, 8, 8, 8],
    [3, 4, 4, 4, 4, 4, 4]
])

# x, z, v, n_x, n_z, Fe, theta
inputs_per_library = np.array([
    [0, 1, 2, 5],
    [6, 6, 6, 6],
    [3, 4, 4, 4]
])

tensor_array = [
    [1, 1, 0],
    [1, 0, 1],
    [0, 1, 1]
]

generalized_library = GeneralizedLibrary(
    libraries=[poly_library, trig_library, phi_library],
    tensor_array=tensor_array,
    inputs_per_library=inputs_per_library
)

x_full = np.concatenate(
    (
        all_sample_states[0][:, :5],
        all_sample_inputs[0]
    ),
    axis=1
)

print(x_full.shape)

# We have to fit the library before we can print out the features
generalized_library.fit(
    x_full
)

print(np.array(all_sample_states[0]).shape)

feature_names = generalized_library.get_feature_names(input_features=input_features)

print(f"Library features: {feature_names}")

(500, 7)
(500, 7)
Library features: ['1', 'x', 'z', 'v', 'Fe', 'x^2', 'x z', 'x v', 'x Fe', 'z^2', 'z v', 'z Fe', 'v^2', 'v Fe', 'Fe^2', 'x^3', 'x^2 z', 'x^2 v', 'x^2 Fe', 'x z^2', 'x z v', 'x z Fe', 'x v^2', 'x v Fe', 'x Fe^2', 'z^3', 'z^2 v', 'z^2 Fe', 'z v^2', 'z v Fe', 'z Fe^2', 'v^3', 'v^2 Fe', 'v Fe^2', 'Fe^3', 'sin(theta)', 'cos(theta)', 'sin(arctan2(n_x,n_z))', 'cos(arctan2(n_x,n_z))', '1 sin(theta)', '1 cos(theta)', 'x sin(theta)', 'x cos(theta)', 'z sin(theta)', 'z cos(theta)', 'v sin(theta)', 'v cos(theta)', 'Fe sin(theta)', 'Fe cos(theta)', 'x^2 sin(theta)', 'x^2 cos(theta)', 'x z sin(theta)', 'x z cos(theta)', 'x v sin(theta)', 'x v cos(theta)', 'x Fe sin(theta)', 'x Fe cos(theta)', 'z^2 sin(theta)', 'z^2 cos(theta)', 'z v sin(theta)', 'z v cos(theta)', 'z Fe sin(theta)', 'z Fe cos(theta)', 'v^2 sin(theta)', 'v^2 cos(theta)', 'v Fe sin(theta)', 'v Fe cos(theta)', 'Fe^2 sin(theta)', 'Fe^2 cos(theta)', 'x^3 sin(theta)', 'x^3 cos(theta)', 'x^2 z sin(theta)', 'x^2 z cos(theta)

In [111]:
import cvxpy

n_targets = 5 # states
n_targets_expanded = 7 # states
n_features = generalized_library.n_output_features_
n_constraints = 2
threshold = 0.01

constraint_lhs = np.zeros((n_constraints, n_targets*n_features))
constraint_rhs = np.asarray([0, 0])
print(constraint_lhs.shape)

# Ensure the x and z components of the velocity appear in the x prime and z prime equations
vx_index = feature_names.index('v cos(arctan2(n_x,n_z))')
vz_index = feature_names.index('v sin(arctan2(n_x,n_z))')

print(feature_names[vx_index])
print(feature_names[vz_index])

constraint_lhs[0, vx_index] = -1
constraint_lhs[1, vz_index] = -1

optimizer = pysindy.optimizers.ConstrainedSR3(
    constraint_rhs=constraint_rhs,
    constraint_lhs=constraint_lhs,
    inequality_constraints=True,
    threshold=threshold,
    thresholder="l1",
    tol=1e-7,
    max_iter=100
)

(2, 915)
v cos(arctan2(n_x,n_z))
v sin(arctan2(n_x,n_z))


In [112]:
model = pysindy.SINDy(
    feature_names=input_features_expanded,
    feature_library=generalized_library,
    optimizer=optimizer
    )

# Attempt to compensate for the jolting seen at the begining of the data
X = [x[60:, :] for x in all_sample_states]
U = [u[60:, :] for u in all_sample_inputs]
T = tseconds[60:]

model.fit(
    x=X,
    t=T,
    u=U,
    multiple_trajectories=True,
    quiet=True,
    library_ensemble=True
    )
print("pySINDy indentified the following dynamics:")
model.print()

ValueError: cannot reshape array of size 915 into shape (5,182)

Unfortunately, pySINDy's method of simulating SINDy models is rather slow. In an effort to remedy this, we will use the following function written by Dr. Rico Picone ([dynamicslab/pysindy #358](https://github.com/dynamicslab/pysindy/issues/538)) to extract the dynamics into a callable function that runs much quicker.

In [8]:
def extract_sindy_dynamics(sindy_model, eps=1e-12):
    """Extract SINDy dynamics"""
    variables = sindy_model.feature_names  # e.g., ["x", "y", "z", "u"]
    coefficients = sindy_model.coefficients()
    features = sindy_model.get_feature_names()  
        # e.g., ["1", "x", "y", "z", "u", "x * y", "x * z", "x * u", "y * z", ...]
    features = [f.replace("^", "**") for f in features]
    features = [f.replace(" ", " * ") for f in features]
    def rhs(coefficients, features):
        rhs = []
        for row in range(coefficients.shape[0]):
            rhs_row = ""
            for col in range(coefficients.shape[1]):
                if np.abs(coefficients[row, col]) > eps:
                    if rhs_row:
                        rhs_row += " + "
                    rhs_row += f"{coefficients[row, col]} * {features[col]}"
            rhs.append(rhs_row)
        return rhs
    rhs_str = rhs(coefficients, features)  # Eager evaluation
    n_equations = len(rhs_str)
    def sindy_dynamics(t, x_, u_, params={}):
        states_inputs = x_.tolist() + np.atleast_1d(u_).tolist()
        variables_dict = dict(zip(variables, states_inputs))
        # This modification is needed to work with our model as it uses trig functions
        variables_dict["sin"] = lambda a: np.sin(a)
        variables_dict["cos"] = lambda a: np.cos(a)
        variables_dict["arctan2"] = lambda y, x: np.arctan2(y, x)

        return [eval(rhs_str[i], variables_dict) for i in range(n_equations)]
    return sindy_dynamics

In [9]:
from scipy.integrate import solve_ivp
# import control as ctrl

nsteps_test = hz*4
t_test = np.linspace(0, nsteps_test, nsteps_test)
test_inputs = np.array([
    smooth_curve(1.5, 1.8, nsteps_test),
    np.concatenate((
        0.45*np.ones(int(nsteps_test/4)),
        smooth_curve(0.45, -0.1, 3*int(nsteps_test/4))
    ))
]).T
test_states = generate_training_data(sim, 0.0, nsteps_test, nstates, test_inputs)

# Now to generate a comparison trajectory
x0 = np.array([0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0])
dynamics = extract_sindy_dynamics(model)
# dynamics_sys = ctrl.NonlinearIOSystem(
#     dynamics, None, inputs=["Fe", "theta"], states=["x", "z", "v", "n_x", "n_z"],
#     name="dynamics_sys"
# )
# predicted_states = ctrl.input_output_response(
#     dynamics_sys, T=t_test/hz, U=test_inputs.T, X0=x0
# ).states

predicted_states = np.zeros((nsteps_test, nstates))

for t in range(nsteps_test):
    u = test_inputs[t, :]
    predicted_states[t, :] = x0
    x = dynamics(t, x0, u)
    x0 = np.array(x)

t_test_seconds = t_test/hz

plot_data(t_test_seconds, test_states, test_inputs)
plot_data(t_test_seconds, predicted_states, test_inputs)

OverflowError: (34, 'Result too large')