 # Import Python dependencies

In [None]:
# @markdown Run this cell to install dependencies. The whole process will take about 30 seconds.
%%capture

% cd /content
! git clone https://github.com/TUMFTM/racetrack-database
! cp -r racetrack-database/tracks racetrack
! rm -rf racetrack-database
! git clone https://github.com/mdolab/pyspline.git 
% cd pyspline 
! cp config/defaults/config.LINUX_GFORTRAN.mk config/config.mk 
! make 
! pip install .
% cd .. 
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1hxZ8z5TGmPFZROMJCEg1_0OOUupWME--' -O unit_test_sol.p

In [None]:
# @markdown Run this cell to define the helper functions for the simulation

import numpy as np
from matplotlib import pyplot as plt
from matplotlib import cm
from pyspline.pyCurve import Curve
import csv
import pickle
from matplotlib import animation
from IPython.display import HTML
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']

class Track:
    def __init__(self, center_line=None, width_left=None, width_right=None, loop = True):
        '''
        Consider a track with fixed width
            center_line: 2D numpy array containing samples of track center line 
                        [[x1,x2,...], [y1,y2,...]]
            width_left: float, width of the track on the left side
            width_right: float, width of the track on the right side
            loop: Boolean. If the track has loop
        '''
        self.width_left = width_left
        self.width_right = width_right
        self.loop = loop

                
        if center_line is not None:
            self.center_line = Curve(x = center_line[0,:], y = center_line[1,:], k = 3)
            self.length = self.center_line.getLength()            
        else:
            self.length = None
            self.center_line = None

        # variables for plotting
        self.track_bound = None
        self.track_center = None
  

    def _interp_s(self, s):
        '''
        Given a list of s (normalized progress since start), return corresponing (x,y) points  
        on the track. In addition, return slope of trangent line on those points
        '''
        n = len(s)
        
        interp_pt =  self.center_line.getValue(s)
        slope = np.zeros(n)

        for i in range(n):
            deri = self.center_line.getDerivative(s[i])
            slope[i] = np.arctan2(deri[1], deri[0])
        return interp_pt.T, slope

    def interp(self, theta_list):
        '''
        Given a list of theta (progress since start), return corresponing (x,y) points  
        on the track. In addition, return slope of trangent line on those points
        '''
        if self.loop:
            s = np.remainder(theta_list, self.length)/self.length
        else:
            s = np.array(theta_list)/self.length
            s[s>1] = 1
        return self._interp_s(s)
    
    def get_closest_pts(self, points):
        '''
        Given a list of  points [[x1,x2,...], [y1,y2,...]], return the closest points 
        on the track, slopes of track at cloest points, and corresponding theta (progress).
        '''
        s, _ = self.center_line.projectPoint(points.T, eps=1e-3)

        closest_pt, slope = self._interp_s(s)
        return closest_pt, slope, s*self.length
        
    def plot_track(self, centerline = False):
        N = 50000
        if self.track_bound is None:
            theta_sample = np.linspace(0,1,N, endpoint=False)*self.length
            interp_pt, slope = self.interp(theta_sample)
            
            if self.loop:
                self.track_bound = np.zeros((6,N+1))
            else:
                self.track_bound = np.zeros((6,N))

            self.track_bound[0,:N] = interp_pt[0,:] - np.sin(slope)*self.width_left
            self.track_bound[1,:N] = interp_pt[1,:] + np.cos(slope)*self.width_left

            self.track_bound[2,:N] = interp_pt[0,:] + np.sin(slope)*self.width_right
            self.track_bound[3,:N] = interp_pt[1,:] - np.cos(slope)*self.width_right

            self.track_bound[4:,:N] = interp_pt
            
            if self.loop:
                self.track_bound[:,-1] = self.track_bound[:,0]           

        plt.plot(self.track_bound[0,:], self.track_bound[1,:], 'k-')
        plt.plot(self.track_bound[2,:], self.track_bound[3,:], 'k-', label = 'Track Bound')
        if centerline:
          plt.plot(self.track_bound[4,:], self.track_bound[5,:], 'r--', label = 'Center Line')

class UnitTest:
    def __init__(self):
        self.params = {'L': 0.257, 'm': 2.99, 'track_width': 0.4,
                'delta_min': -0.35, 'delta_max': 0.35,
                'v_max': 4, 'v_min': 0, 'a_min': -3.5,
                'a_max': 3.5, 'alat_max': 5, 'w_vel': 2,
                'w_contour': 100, 'w_theta': 1, 'w_accel': 1,
                'w_delta': 1, 'q1_v': 0.4, 'q2_v': 2,
                'q1_road': 0.4, 'q2_road': 2, 'q1_lat': 0.4,
                'q2_lat': 2, 'T': 1, 'N': 6, 'max_itr': 50}

        # define a test track
        x = []
        y = []

        with open('racetrack/IMS.csv', newline='') as f:
            spamreader = csv.reader(f, delimiter=',')
            for i, row in enumerate(spamreader):
                if i>0:
                    x.append(float(row[0]))
                    y.append(float(row[1]))

        x = np.array(x)/25.0
        y = np.array(y)/25.0
        center_line = np.array([x,y])
        
        
        self.ref_path = Track(center_line = center_line, width_left = 0.4, width_right = 0.4)
        
        solutions = pickle.load(open("unit_test_sol.p","rb"))
        self.input = solutions[0]

        self.step = self.input.shape[-1]
        
        self.forward_step_sol = solutions[1]
        self.get_AB_matrix_sol = solutions[2]
        self.get_cost_sol = solutions[3]
        self.cost_state_deriv_sol = solutions[4]
        self.cost_control_deriv_sol = solutions[5]
        

    def choose_test_case(self, i):
        if i>=0 and i<self.step:
            print("Using testcase ",i)
            return i
        else:
            i_new = np.random.randint(0, self.step)
            print("Testcase ", i, " not found. Use randomly selected testcase ", i_new)
            return i_new      
    
    def check_solution(self, name, test, solution):
        print("Expected solution for ", name, " is:")
        print(solution)
        print("Your function output is:")
        print(test)
        print("with max error ", np.max(np.abs(test-solution)))     
          
    def test_forward_step(self, dyn_class, i):
        dynamics = dyn_class(self.params)
        i = self.choose_test_case(i)
        input = self.input[:,0,i]
        solution = self.forward_step_sol[:,i]
        test, _ = dynamics.forward_step(input[:4], input[4:])
        self.check_solution('test_forward', test, solution)
            
    def test_AB_matrix(self, dyn_class, i):
        dynamics = dyn_class(self.params)
        i = self.choose_test_case(i)
        input = self.input[:,:,i]
        solution_A, solution_B = self.get_AB_matrix_sol[i]
        A, B = dynamics.get_AB_matrix(input[:4,:], input[4:,:])
        self.check_solution('A_k', A, solution_A)
        self.check_solution('B_k', B, solution_B)
            
    def test_get_cost(self, cost_class, i):
        cost = cost_class(self.params)
        i = self.choose_test_case(i)
        input = self.input[:,:,i]
        solution = self.get_cost_sol[i]
        
        closest_pt, slope, theta = self.ref_path.get_closest_pts(input[:2,:])
        J = cost.get_cost(input[:4,:], input[4:,:], closest_pt, slope, theta)
        self.check_solution('J', J, solution)

    def test_cost_state_deriv(self, cost_class, i):
        cost = cost_class(self.params)
        i = self.choose_test_case(i)
        input = self.input[:,:,i]
        solution_c_x, solution_c_xx = self.cost_state_deriv_sol[i]
        closest_pt, slope, theta = self.ref_path.get_closest_pts(input[:2,:])
        c_x, c_xx = cost._get_cost_state_derivative(input[:4,:], closest_pt, slope)
        
        self.check_solution('c_x', c_x, solution_c_x)
        self.check_solution('c_xx', c_xx, solution_c_xx)
    
    def test_cost_control_deriv(self, cost_class, i):
        cost = cost_class(self.params)
        i = self.choose_test_case(i)
        input = self.input[:,:,i]
        solution_c_u, solution_c_uu = self.cost_control_deriv_sol[i]
        c_u, c_uu = cost._get_cost_control_derivative(input[4:,:])
        self.check_solution('c_u', c_u, solution_c_u)
        self.check_solution('c_uu', c_uu, solution_c_uu)


unit_test = UnitTest()


# Task 1: Define System Dynamics

* Implement ```Dynamics.forward_step()} ``` function following the discrete dynamics shown in Equation 2 of pdf file.
* Derive closed-form expressions for $A_k$ and $B_k$ matrices, and use your derivations to fill in the missing code of ```Dynamics.get_AB_matrix()```function in the colab
* Verify your functions using provided test cases in the Colab.

\begin{align*}
A_k &:= \frac{\partial f}{\partial x}(\bar{x}_k, \bar{u}_k) = \begin{pmatrix}
1 & 0 & \Delta t\cos(\bar{\psi}_k) & -(\bar{v}_k\Delta t + \frac{1}{2}\bar{a}_k\Delta t^2)\sin(\bar{\psi}_k)\\
0 & 1 & \Delta t\sin(\bar{\psi}_k) & (\bar{v}_k\Delta t + \frac{1}{2}\bar{a}_k\Delta t^2)\cos(\bar{\psi}_k)\\
0 & 0 & 1 & 0\\
0 & 0 & \frac{\Delta t}{L}\tan(\bar{\delta}_k) & 1
\end{pmatrix}\\
\\
B_k &:= \frac{\partial f}{\partial u}(\bar{x}_k, \bar{u}_k) = \begin{pmatrix}
\frac{1}{2}\Delta t^2 \cos(\bar{\psi}_k) & 0\\
\frac{1}{2}\Delta t^2 \sin(\bar{\psi}_k) & 0\\
\Delta t & 0\\
\frac{\Delta t^2}{2L} \tan(\bar{\delta}_k) & \frac{\bar{v}_k\Delta t + \frac{1}{2}\bar{a}_k\Delta t^2}{L} \frac{1}{\cos(\bar{\delta}_k)^2}\\
\end{pmatrix}
\end{align*}

In [None]:
class Dynamics:
    def __init__(self, params):
        self.dim_x = 4
        self.dim_u = 2
        
        # load parameters
        self.T = params['T'] # Planning Time Horizon
        self.N = params['N'] # number of planning steps
        self.dt = self.T/(self.N-1) # time step for each planning step
        
        self.L = params['L'] # vehicle chassis length
        self.delta_min = params['delta_min'] # min steering angle rad
        self.delta_max = params['delta_max'] # max steering angle rad
        self.a_min = params['a_min'] # min longitudial accel
        self.a_max = params['a_max'] # max longitudial accel
        self.v_min = params['v_min'] # min velocity
        self.v_max = params['v_max'] # max velocity

        # useful constants
        self.zeros = np.zeros((self.N))
        self.ones = np.ones((self.N))
        
    def forward_step(self, state, control):
        """
        Find the next state of the vehicle given the current state and 
        control input state

            state: 4x1 array [X, Y, V, psi]
            control: 2x1 array [a, delta]
        """
        # Clips the controller values between min and max accel and steer values
        accel = np.clip(control[0], self.a_min, self.a_max)
        delta = np.clip(control[1], self.delta_min, self.delta_max)
        control_clip = np.array([accel, delta])       

        '''
        TODO:
          Use the Equation 2 in the lab pdf, calculate the next_state of the 
          robot
        '''
        x, y, v, psi = state
        a, delta = control_clip
        c = v * self.dt + 0.5 * a * self.dt * self.dt

        next_state = state + np.array([
          c * np.cos(psi),
          c * np.sin(psi),
          a * self.dt,
          c / self.L * np.tan(delta),
        ])
        '''
        END of TODO
        '''
        # Clip the velocity
        next_state[2] = min(max(self.v_min, next_state[2]), self.v_max)
        
        return next_state, control_clip

    def get_AB_matrix(self, nominal_states, nominal_controls):
        """
        Returns the linearized 'A' and 'B' matrix of the ego vehicle around 
        nominal states and controls

          nominal_states: 4xN array
          nominal_controls: 2xN array
        """  
        
        '''
        TODO:
          Calculate A_k and B_k matrix around nominal trajectory and control
        '''
        x, y, v, psi = nominal_states
        a, delta = nominal_controls
        dt2 = self.dt * self.dt
        c = v * self.dt + 0.5 * a * dt2
        
        O = np.zeros(self.N)
        I = np.ones(self.N)

        A = np.array([
          [I, O, self.dt * np.cos(psi), -c * np.sin(psi)],
          [O, I, self.dt * np.sin(psi), c * np.cos(psi)],
          [O, O, I, O],
          [O, O, self.dt / self.L * np.tan(delta), I],
        ])
        B = np.array([
          [0.5 * dt2 * np.cos(psi), O],
          [0.5 * dt2 * np.sin(psi), O],
          [self.dt * I, O],
          [0.5 * dt2 / self.L * np.tan(delta), c / (self.L * np.cos(delta) * np.cos(delta))],
        ])
        
        '''
        END of TODO
        '''
        
        return A, B

In [None]:
# @markdown ##Test ```Dynamics.forward_step()``` function
test_case =  20#@param {type:"integer"}
unit_test.test_forward_step(Dynamics, test_case)

In [None]:
# @markdown ##Test ```Dynamics.get_AB_matrix()``` function
test_case = 0 #@param {type:"integer"}
unit_test.test_AB_matrix(Dynamics, test_case)

# Define the Optimization Problem

## Soft Constraints

In [None]:
# @markdown Run this cell to initialize the Constraint class.
class Constraints:
    def __init__(self, params):
        # load parameters
        self.T = params['T'] # Planning Time Horizon
        self.N = params['N'] # number of planning steps
        self.dt = self.T/(self.N-1) # time step for each planning step
        
        self.L = params['L'] # vehicle chassis length
        self.delta_min = params['delta_min'] # min steering angle rad
        self.delta_max = params['delta_max'] # max steering angle rad
        self.a_min = params['a_min'] # min longitudial accel
        self.a_max = params['a_max'] # max longitudial accel
        self.v_min = params['v_min'] # min velocity
        self.v_max = params['v_max'] # max velocity
        self.alat_max = params['alat_max'] # max lateral accel
        self.alat_min = -params['alat_max'] # min lateral accel
        self.track_width = params['track_width']

        # parameter for barrier functions
        self.q1_v = params['q1_v']
        self.q2_v = params['q2_v']

        self.q1_road = params['q1_road']
        self.q2_road = params['q2_road']

        self.q1_lat = params['q1_lat']
        self.q2_lat = params['q2_lat']
    
        # useful constants
        self.zeros = np.zeros((self.N))
        self.ones = np.ones((self.N))


    def get_cost(self, states, controls, closest_pt, slope):
        '''Road Boundary cost'''
        dx = states[0,:] - closest_pt[0,:]
        dy = states[1,:] - closest_pt[1,:]
        d = np.sin(slope)*dx - np.cos(slope)*dy

        c_boundary = self.q1_road*np.exp(self.q2_road*(d-self.track_width)) \
                        + self.q1_road*np.exp(self.q2_road*(self.track_width-d))
        
        c_vel = self.q1_v*np.exp(self.q2_v*(states[2,:] - self.v_max)) \
                        + self.q1_v*np.exp(-states[2,:]*self.q2_v)

        # calculate the acceleration
        accel = states[2,:]**2*np.tan(controls[1,:])/self.L
        error_ub = accel - self.alat_max
        error_lb = self.alat_min - accel

        b_ub = self.q1_lat*np.exp(self.q2_lat*error_ub)
        b_lb = self.q1_lat*np.exp(self.q2_lat*error_lb)
        c_lat = b_lb+b_ub
                
        return c_vel+c_boundary+c_lat 

    def get_derivatives(self, states, controls, closest_pt, slope):
        ''' 
        Calculate the Jacobian and Hessian of soft constraint cost
            states: 4xN array of planned trajectory
            controls: 2xN array of planned control
            closest_pt: 2xN array of each state's closest point [x,y] on the track
            slope: 1xN array of track's slopes (rad) at closest points
        '''

        c_x_lat, c_xx_lat, c_u_lat, c_uu_lat, c_ux_lat = \
            self._lat_accec_bound_derivative(states, controls)
        c_x_rd, c_xx_rd = self._road_boundary_derivate(states, closest_pt, slope)  
        c_x_vel, c_xx_vel = self._velocity_bound_derivate(states)
        c_x_cons = c_x_rd+c_x_vel+ c_x_lat
        c_xx_cons = c_xx_rd+c_xx_vel+ c_xx_lat
        c_u_cons = c_u_lat
        c_uu_cons = c_uu_lat
        c_ux_cons = c_ux_lat

        return c_x_cons, c_xx_cons, c_u_cons, c_uu_cons, c_ux_cons

    def _road_boundary_derivate(self, states, closest_pt, slope):
        ''' 
        Calculate the Jacobian and Hessian of road boundary soft constraint cost
            states: 4xN array of planned trajectory
            closest_pt: 2xN array of each state's closest point [x,y] on the track
            slope: 1xN array of track's slopes (rad) at closest points
        '''
        # constraint due to right road boundary. smaller than right_width             
        transform = np.array([np.sin(slope), -np.cos(slope), self.zeros, self.zeros])
        
        ref_states = np.zeros_like(states)
        ref_states[:2, :] = closest_pt
        
        error = states - ref_states

        c = np.einsum('an,an->n', transform, error) - self.track_width
        c_x_u, c_xx_u = self.barrier_function(self.q1_road, self.q2_road, c, transform)

        # constraint due to left road boundary. larger than -left_width
        c = - self.track_width - np.einsum('an,an->n', transform, error) 
        c_x_l, c_xx_l = self.barrier_function(self.q1_road, self.q2_road, c, -transform)

        return c_x_u+c_x_l, c_xx_u+c_xx_l
        
    def _velocity_bound_derivate(self, states):
        ''' 
        Calculate the Jacobian and Hessian of velocity soft constraint cost
            states: 4xN array of planned trajectory
        '''
        # less than V_max
        transform = np.array([self.zeros, self.zeros, self.ones, self.zeros])
        c = states[2,:] - self.v_max
        c_x_u, c_xx_u = self.barrier_function(self.q1_v, self.q2_v, c, transform)

        # larger than 0
        c = -states[2,:]
        c_x_l, c_xx_l = self.barrier_function(self.q1_v, self.q2_v, c, -transform)

        return c_x_u+c_x_l, c_xx_u+c_xx_l
        
    def _lat_accec_bound_derivative(self, states, controls):
        ''' 
        Calculate the Jacobian and Hessian of Lateral Acceleration soft constraint cost
            states: 4xN array of planned trajectory
            controls: 2xN array of planned control
        '''
        c_x = np.zeros((4,self.N))
        c_xx = np.zeros((4,4,self.N))
        c_u = np.zeros((2,self.N))
        c_uu =np.zeros((2,2,self.N))
        c_ux = np.zeros((2,4,self.N))

        # calculate the acceleration
        accel = states[2,:]**2*np.tan(controls[1,:])/self.L
        
        error_ub = accel - self.alat_max
        error_lb = self.alat_min - accel

        b_ub = self.q1_lat*np.exp(self.q2_lat*error_ub)
        b_lb = self.q1_lat*np.exp(self.q2_lat*error_lb)

        da_dx = 2*states[2,:]*np.tan(controls[1,:])/self.L
        da_dxx = 2*np.tan(controls[1,:])/self.L

        da_du = states[2,:]**2/(np.cos(controls[1,:])**2*self.L)
        da_duu = states[2,:]**2*np.sin(controls[1,:])/(np.cos(controls[1,:])**3*self.L)

        da_dux = 2*states[2,:]/(np.cos(controls[1,:])**2*self.L)

        c_x[2,:] = self.q2_lat*(b_ub-b_lb)*da_dx
        c_u[1,:] = self.q2_lat*(b_ub-b_lb)*da_du

        c_xx[2,2,:] = self.q2_lat**2*(b_ub+b_lb)*da_dx**2 + self.q2_lat*(b_ub-b_lb)*da_dxx
        c_uu[1,1,:] = self.q2_lat**2*(b_ub+b_lb)*da_du**2 + self.q2_lat*(b_ub-b_lb)*da_duu

        c_ux[1,2,:] = self.q2_lat**2*(b_ub+b_lb)*da_dx*da_du + self.q2_lat*(b_ub-b_lb)*da_dux
        return c_x, c_xx, c_u, c_uu, c_ux

    def barrier_function(self, q1, q2, c, c_dot):
        '''
        c = [n] array
        c_dot = [dxn] array
        '''
        b = q1*np.exp(q2*c)
        b_dot = np.einsum('n,an->an', q2*b, c_dot)
        b_ddot = np.einsum('n,abn->abn', (q2**2)*b, np.einsum('an,bn->abn',c_dot, c_dot))
        return b_dot, b_ddot

## Task 2: Cost Functions

* Verify the Jacobian and Hessian for $c^x$ and $c^\theta$. 
* Derive the  Jacobian and Hessian terms for  $c^u$ in the cost function
* Follow the hint in Colab and implement ```Cost.get_cost()```,  ```Cost._get_cost_state_derivative()```, and ```Cost._get_cost_control_derivative()``` functions. 
* Verify your functions using provided test cases in the Colab.

In [None]:
class Cost:
    def __init__(self, params):
        self.soft_constraints = Constraints(params)
                      
        # load parameters
        self.T = params['T'] # Planning Time Horizon
        self.N = params['N'] # number of planning steps
        self.dt = self.T/(self.N-1) # time step for each planning step
        self.v_max = params['v_max'] # max velocity

        # cost 
        self.w_vel = params['w_vel'] 
        self.w_contour = params['w_contour']
        self.w_theta = params['w_theta']
        self.w_accel = params['w_accel']
        self.w_delta = params['w_delta']
        
        self.W_state = np.array([[self.w_contour, 0],
                            [0, self.w_vel]])
        self.W_control = np.array([[self.w_accel, 0], [0, self.w_delta]])
        
        # useful constants
        self.zeros = np.zeros((self.N))
        self.ones = np.ones((self.N))

    def get_cost(self, states, controls, closest_pt, slope, theta):
        '''
        Given planned states and controls, calculate the cost
            states: 4xN array of planned trajectory
            controls: 2xN array of planned control
            closest_pt: 2xN array of each state's closest point [x,y] on the track
            slope: 1xN array of track's slopes (rad) at closest points
            theta: 1xN array of the progress at each state
        '''
        # This is the c^g term in Equation 5 in the lab pdf.
        # we have implemented for you.
        c_g = np.sum(self.soft_constraints.get_cost(states, controls, closest_pt, slope))
        
        '''
        TODO: 
          calculate c^x, c^theta, c^u terms in the Equation 5
        '''
        error = np.array([
          (states[0] - closest_pt[0]) * np.sin(slope) - (states[1] - closest_pt[1]) * np.cos(slope), # error in position
          states[2] - self.v_max, # error in velocity
        ])

        c_x = np.sum(np.einsum("ik,ij,jk->k", error, self.W_state, error))
        c_theta = np.sum(self.w_theta * theta)
        c_u = np.sum(np.einsum("ik,ij,jk->k", controls, self.W_control, controls))
        '''
        END OF TODO
        '''
        c  = c_x+c_u-c_theta+c_g
        return c
        
    def get_derivatives(self, states, controls, closest_pt, slope):
        '''
        Calculate Jacobian and Hessian of the cost function
            states: 4xN array of planned trajectory
            controls: 2xN array of planned control
            closest_pt: 2xN array of each state's closest point [x,y] on the track
            slope: 1xN array of track's slopes (rad) at closest points
        '''
        # This is calculate Jacobian and Hessian of c^g term in Equation 5 in the lab pdf.
        # we have implemented for you.
        c_x_cons, c_xx_cons, c_u_cons, c_uu_cons, c_ux_cons = \
            self.soft_constraints.get_derivatives( states, controls, closest_pt, slope)
        
        c_x_cost, c_xx_cost = self._get_cost_state_derivative(states, closest_pt, slope)
        
        c_u_cost, c_uu_cost = self._get_cost_control_derivative(controls)

        # calculate the jacobian and hessian of entire cost function
        # See equation 10 and 11 in lab pdf for details
        
        q = c_x_cons+c_x_cost
        Q = c_xx_cons+c_xx_cost
        
        r = c_u_cost+ c_u_cons
        R = c_uu_cost+c_uu_cons

        S = c_ux_cons
        
        return q, Q, r, R, S
    
    def _get_cost_state_derivative(self, states, closest_pt, slope):
        '''
        Calculate Jacobian and Hessian of the cost function with respect to state
            states: 4xN array of planned trajectory
            closest_pt: 2xN array of each state's closest point [x,y] on the track
            slope: 1xN array of track's slopes (rad) at closest points
        '''

        '''TODO 1: 
          Calculate the Jacobian and Hessian of c^x term in Equation 5.
          You can use the expression in Equation 6.
          Derivations can be found in Appendix.
        '''
        error = np.array([
          (states[0] - closest_pt[0]) * np.sin(slope) - (states[1] - closest_pt[1]) * np.cos(slope), # error in position
          states[2] - self.v_max, # error in velocity
        ])

        O = np.zeros(self.N)
        I = np.ones(self.N)
        T = np.array([
          [np.sin(slope), -np.cos(slope), O, O],
          [            O,              O, I, O],
        ])

        c_x_state = 2 * np.einsum("ipk,ij,jk->pk", T, self.W_state, error)
        c_xx = 2 * np.einsum("ipk,ij,jqk->pqk", T, self.W_state, T)
        '''TODO 2: 
          Calculate the Jacobian and Hessian of c^theta term in Equation 5.
          You can use the expression in Equation 9.
          Derivations can be found in Appendix.
        '''
        c_x_progress = self.w_theta * np.array([np.cos(slope), np.sin(slope), O, O])
        '''
        END OF TODO
        '''
        c_x = c_x_state - c_x_progress
        return c_x, c_xx
    
    def _get_cost_control_derivative(self, controls):
        '''
        Calculate Jacobian and Hessian of the cost function w.r.t the control
            controls: 2xN array of planned control
        '''
        '''TODO
          Calculate the Jacobian and Hessian of c^u term in Equation 5.
        '''
        WpWT = self.W_control + self.W_control.T
        c_u = np.einsum("ik,ij->jk", controls, WpWT)
        c_uu = np.repeat(WpWT[:,:,None], self.N, axis=-1)
        '''
        END OF TODO
        ''' 
        return c_u, c_uu

In [None]:
# @markdown ##Test ```Cost.get_cost()``` function
test_case = 0 #@param {type:"integer"}
unit_test.test_get_cost(Cost, test_case)

In [None]:
# @markdown ##Test ```Cost._get_cost_state_derivative()``` function
test_case = 0 #@param {type:"integer"}
unit_test.test_cost_state_deriv(Cost, test_case)

In [None]:
# @markdown ##Test ```Cost._get_cost_control_derivative()``` function
test_case = 0 #@param {type:"integer"}
unit_test.test_cost_control_deriv(Cost, test_case)

# Task 3: Iterative LQR

Following the precept handout and hints in the Colab notebook, implement the ```iLQR.forward_pass()``` and  ```iLQR.backward_pass()``` functions.

In [None]:
import time

class iLQR():
    def __init__(self, ref_path, params):
        
        self.T = params['T']
        self.N = params['N']
        
        self.ref_path = ref_path

        self.steps = params['max_itr']

        self.tol = 1e-4
        self.lambad = 100
        self.lambad_max = 1000
        self.lambad_min = 1e-3

        self.dynamics = Dynamics(params)
        self.alphas = 1.1**(-np.arange(10)**2)

        self.dim_x = self.dynamics.dim_x
        self.dim_u = self.dynamics.dim_u

        self.cost = Cost(params)
        
    def forward_pass(self, nominal_states, nominal_controls, K_closed_loop, k_open_loop, alpha):
        '''
        forward pass of ilqr
            nominal_states: 4xN array of nominal trajectory
            nominal_controls: 2xN array of nominal control
            K_closed_loop: [2x4xN] array of closed loop gain
            k_open_loop: [2xN] array of open loop gain
            alpha: float, line search parameters
        '''

        ''' 
        TODO:
          implement the forward pass following Algorithm 2 in the percept pdf
        '''
        X = np.zeros_like(nominal_states)
        U = np.zeros_like(nominal_controls)

        X[:,0] = nominal_states[:,0]
        for t in range(self.N-1):
          U[:,t] = nominal_controls[:,t] + K_closed_loop[:,:,t] @ (X[:,t] - nominal_states[:,t]) + alpha * k_open_loop[:,t]
          X[:,t+1], _ = self.dynamics.forward_step(X[:,t], U[:,t])
        '''
        END OF TODO
        '''
        # find the progress, closest points and their slope of updated trajectory
        closest_pt, slope, theta = self.ref_path.get_closest_pts(X[:2,:])
        # calculate the cost of new trajectory
        J = self.cost.get_cost(X, U, closest_pt, slope, theta)
        
        return X, U, J, closest_pt, slope, theta
 
 
        
    def backward_pass(self, nominal_states, nominal_controls, closest_pt, slope):
        '''
        forward pass of ilqr
            nominal_states: 4xN array of nominal trajectory
            nominal_controls: 2xN array of nominal control
            closest_pt: [2xN] array of the closed points for each waypoints on the reference path
            slope: N-D array of the reference path slope (rad) at closest points
        '''
        L_x, L_xx, L_u, L_uu, L_ux = self.cost.get_derivatives(nominal_states, nominal_controls, closest_pt, slope)
        
        A, B = self.dynamics.get_AB_matrix(nominal_states, nominal_controls)

        reg_mat = self.lambad*np.eye(self.dim_u)

        ''' TODO: 
          implement the forward pass following Algorithm 1 in the pdf
        '''

        ''' Hint: 
        When you want to invert Q_uu matrix, do 
            Q_uu_inv = np.linalg.inv(Q_uu+reg_mat)
        to improve numerical stability
        '''

        k_open_loop = np.zeros((self.dim_u, self.N-1))
        K_closed_loop = np.zeros((self.dim_u, self.dim_x, self.N-1))
        
        p = L_x[:,self.N-1]
        P = L_xx[:,:,self.N-1]
        for t in range(self.N-2, -1, -1):
          # compute quality function and its derivatives
          Q_x = L_x[:,t] + A[:,:,t].T @ p
          Q_u = L_u[:,t] + B[:,:,t].T @ p
          Q_xx = L_xx[:,:,t] + A[:,:,t].T @ P @ A[:,:,t]
          Q_uu = L_uu[:,:,t] + B[:,:,t].T @ P @ B[:,:,t]
          Q_ux = B[:,:,t].T @ P @ A[:,:,t] + L_ux[:,:,t]
          
          Q_uu_inv = np.linalg.inv(Q_uu + reg_mat)

          # update optimal control
          k_open_loop[:,t] = -Q_uu_inv @ Q_u
          K_closed_loop[:,:,t] = -Q_uu_inv @ Q_ux

          # update value function and its derivatives
          p = Q_x - Q_ux.T @ Q_uu_inv @ Q_u
          P = Q_xx - Q_ux.T @ Q_uu_inv @ Q_ux       
        '''
        END OF TODO
        '''     
        return K_closed_loop, k_open_loop



    def solve(self, cur_state, controls = None):
        status = 0
        self.lambad = 100

        time0 = time.time()

        if controls is None:
            controls = np.zeros((self.dim_u, self.N))            
        states = np.zeros((self.dim_x, self.N))
        states[:,0] = cur_state

        for i in range(1,self.N):
            states[:,i],_ = self.dynamics.forward_step(states[:,i-1], controls[:,i-1])
        closest_pt, slope, theta = self.ref_path.get_closest_pts(states[:2,:])

        J = self.cost.get_cost(states, controls,  closest_pt, slope, theta)
        
        converged = False
        
        for i in range(self.steps):
            K_closed_loop, k_open_loop = self.backward_pass(states, controls, closest_pt, slope)

            updated = False
            for alpha in self.alphas :
                X_new, U_new, J_new, closest_pt_new, slope_new, theta_new = self.forward_pass(states, controls, K_closed_loop, k_open_loop, alpha)
                if J_new<=J:
                    if np.abs((J - J_new) / J) < self.tol:
                        converged = True   
                    J = J_new
                    states = X_new
                    controls = U_new
                    closest_pt = closest_pt_new
                    slope = slope_new
                    theta = theta_new
                    updated = True
                    break
            if updated:
                self.lambad *= 0.7
            else:
                self.lambad *= 2
            self.lambad = min(max(self.lambad_min, self.lambad), self.lambad_max)
            
            if converged:
                status = 1
                break
        t_process = time.time()-time0
        return states, controls, t_process, status, theta

# Task 4: Simulation on Race Tracks

In [None]:
#@title { run: "auto" , form-width: "30%"}
#@markdown # Set parameters for the planner
#@markdown The default parameter can be used a baseline. Please play around with them to see how they will affect your planned trajectory

# fixed parameters
params = {}
params['L'] = 0.257 # wheelbase [m]
params['m'] = 2.99 # weight [kg]
params['track_width'] = 0.4 # [m]
params['delta_min']= -0.35  # minimum steering angle [rad]
params['delta_max']= 0.35  # maximum steering angle [rad]
params['track_width'] = 0.4 #[m ]

#@markdown ## velocity
v_max = 4 #@param {type:"slider", min:0, max:5, step:0.1}
params['v_max']=v_max # max long vel [m/s]
params['v_min']=0 # min long vel [m/s]


#@markdown ## longitudinal acceleration
a_min = -3.5 #@param {type:"slider", min:-10, max:0, step:0.1}
a_max = 3.5 #@param {type:"slider", min:0, max:10, step:0.1}
params['a_min']= a_min
params['a_max']= a_max

#@markdown ## lateral acceleration
alat_max = 5 #@param {type:"slider", min:0, max:10, step:0.1}
params['alat_max']= alat_max

#@markdown ## weights in cost

w_vel = 2 #@param {type:"number"}
w_contour = 100 #@param {type:"number"}
w_theta =  2#@param {type:"number"}
w_accel =  1#@param {type:"number"}
w_delta = 1 #@param {type:"number"}

params['w_vel'] = w_vel
params['w_contour'] = w_contour
params['w_theta'] = w_theta
params['w_accel'] = w_accel
params['w_delta'] = w_delta

#@markdown ## weights in soft constraints

# parameter for soft constraints
q1_v =  0.4#@param {type:"number"}
q2_v =  2#@param {type:"number"}

q1_road = 0.4#@param {type:"number"}
q2_road = 2#@param {type:"number"}

q1_lat =  0.4#@param {type:"number"}
q2_lat =  2#@param {type:"number"}

params['q1_v'] = q1_v
params['q2_v'] = q2_v
params['q1_road'] = q1_road
params['q2_road'] = q2_road
params['q1_lat'] = q1_lat
params['q2_lat'] = q2_lat

#@markdown ## Disturbance in simulation
sigma_x = 0 #@param {type:"slider", min:0, max:5, step:0.1}
sigma_y = 0 #@param {type:"slider", min:0, max:5, step:0.1}
sigma_v = 0 #@param {type:"slider", min:0, max:5, step:0.1}
sigma_theta = 0 #@param {type:"slider", min:0, max:5, step:0.1}
sigma = np.array([sigma_x, sigma_y, sigma_v, sigma_theta])

#@markdown ## iLQR Parameters
T = 2 #@param {type:"number"}
N = 10 #@param {type:"integer"}
max_itr = 50  #@param {type:"integer"}
params['T'] = T
params['N'] = N+1
params['max_itr'] = max_itr

track_name = "Austin" #@param ["Austin", "BrandsHatch", "Budapest", "Catalunya", "Hockenheim", "IMS", "MexicoCity", "Montreal", "Monza", "MoscowRaceway", "Norisring", "Nuerburgring", "Oschersleben", "Sakhir", "SaoPaulo", "Sepang", "Shanghai", "Silverstone", "Sochi", "Spa", "Spielberg", "Suzuka", "YasMarina", "Zandvoort"]

## Simulation results

### Tuning of iLQR parameters
v_max|a_min|a_max|alat_max| \| |w_vel|w_contour|w_theta|w_accel|w_delta| \| |sigma_x|sigma_y|sigma_v|sigma_theta| \| |T|N|max_itr| \| |time|comp_time|remark
-|-|-|-| - |-|-|-|-|-| - |-|-|-|-| - |-|-|-| - |-|-|-
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |2|10|25| \| |**55.2**|60|*baseline*
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |2|10|**50**| \| |**55.2**|116|*initial configuration*
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |**1**|**5**|25| \| |**59.8**|24|
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |2|10| **5**| \| |**54.8**|15|worse on track

### Tuning of weights in cost
v_max|a_min|a_max|alat_max| \| |w_vel|w_contour|w_theta|w_accel|w_delta| \| |sigma_x|sigma_y|sigma_v|sigma_theta| \| |T|N|max_itr| \| |time|comp_time|remark
-|-|-|-| - |-|-|-|-|-| - |-|-|-|-| - |-|-|-| - |-|-|-
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |2|10|25| \| |**55.2**|60|*baseline*
4|-3.5|3.5|5| \| |**4**|100|2|1|1| \| |0|0|0|0| \| |2|10|25| \| |**54.2**|59|less braking before curves
4|-3.5|3.5|5| \| |2|**200**|2|1|1| \| |0|0|0|0| \| |2|10|25| \| |**56.2**|54|better on track
4|-3.5|3.5|5| \| |2|100|**3**|1|1| \| |0|0|0|0| \| |2|10|25| \| |**52.4**|66|stronger acceleration
4|-3.5|3.5|5| \| |2|100|2|**0.5**|1| \| |0|0|0|0| \| |2|10|25| \| |**54.8**|60|stronger acceleration/breakes in curves
4|-3.5|3.5|5| \| |2|100|2|**2**|1| \| |0|0|0|0| \| |2|10|25| \| |**56.0**|63|weaker acceleration
4|-3.5|3.5|5| \| |2|100|2|1|**0**| \| |0|0|0|0| \| |2|10|25| \| |**55.2**|59|no significant change in trajectory
4|-3.5|3.5|5| \| |2|100|2|1|**4**| \| |0|0|0|0| \| |2|10|25| \| |**55.2**|62|no change in trajectory
4|-3.5|3.5|5| \| |2|100|2|1|**8**| \| |0|0|0|0| \| |2|10|25| \| |**55.0**|58|worse on track (since less curvature allowed)
4|-3.5|3.5|5| \| |**4**|100|**3**|1|1| \| |0|0|0|0| \| |2|10|25| \| |**51.8**|61|very bad trajectory
4|-3.5|3.5|5| \| |**3**|**125**|**3**|1|1| \| |0|0|0|0| \| |2|10|25| \| |**52.2**|58|acceptable trajectory

### Impact of noise
v_max|a_min|a_max|alat_max| \| |w_vel|w_contour|w_theta|w_accel|w_delta| \| |sigma_x|sigma_y|sigma_v|sigma_theta| \| |T|N|max_itr| \| |time|comp_time|remark
-|-|-|-| - |-|-|-|-|-| - |-|-|-|-| - |-|-|-| - |-|-|-
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|0| \| |2|10|25| \| |**55.2**|60|*baseline*
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |**0.5**|**0.5**|0|0| \| |2|10|25| \| |**54.6**|51|more noisy trajectory
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |**1.5**|**1.5**|0|0| \| |2|10|25| \| |**53.8**|42|very noisy trajectory
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|**0.5**|0| \| |2|10|25| \| |**55.0**|55|?
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|**1.5**|0| \| |2|10|25| \| |**53.2**|59|more noisy trajectory
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|**0.5**| \| |2|10|25| \| |**54.4**|57|?
4|-3.5|3.5|5| \| |2|100|2|1|1| \| |0|0|0|**1.5**| \| |2|10|25| \| |**-**|-|very noisy trajectory + overflow

*Note: Noise is especially visible in lateral acceleration.*

### Notes
- overflow in exponential function (of soft constraint?) often a problem
- planning more steps with larger horizon often with artefacts (maybe weights on immediate next states too small? exponential decay into the future?)
- planning with higher velocity more coarse (maybe adapt time step size to current velocity?)

### Questions
- lap time getting better with more noise?

In [None]:
#@markdown # Run Simulation

dt = params['T'] / (params['N']-1)

def simulate_kin(state, control, step = 10):

    accel = np.clip(control[0], params['a_min'], params['a_max'])
    delta = np.clip(control[1], params['delta_min'], params['delta_max'])
    
    next_state = state
    dt_step = dt/step
    for _ in range(step):
        # State: [x, y, psi, v]
        d_x = (next_state[2]*dt_step+0.5*accel*dt_step**2)*np.cos(next_state[3])
        d_y = (next_state[2]*dt_step+0.5*accel*dt_step**2)*np.sin(next_state[3])
        d_v = accel*dt_step
        d_psi = (next_state[2]*dt_step+0.5*accel*dt_step**2)*np.tan(delta)/params['L']
        next_state = next_state + np.array([d_x, d_y, d_v, d_psi])+sigma*np.random.normal(size=(4))*dt_step
        next_state[2] = max(0, next_state[2])
    return next_state

def load_track(track_name):
  x = []
  y = []
  width_r = []
  width_l = []
  filename = 'racetrack/'+track_name+'.csv'
  with open(filename, newline='') as f:
      spamreader = csv.reader(f, delimiter=',')
      for i, row in enumerate(spamreader):
          if i>0:
              x.append(float(row[0]))
              y.append(float(row[1]))
              width_r.append(float(row[2]))
              width_l.append(float(row[3]))

  x = np.array(x)/25.0
  y = np.array(y)/25.0

  x_range = max(x) - min(x)
  y_range = max(y) - min(y)
  ratio = x_range/y_range
  if ratio < 1:
    center_line = np.array([y,x])
    ratio = 1/ratio
  else:
    center_line = np.array([x,y])
  track = Track(center_line = center_line, width_left = 0.4, width_right = 0.4)

  
  return track, ratio

# initialize the ilqr
track, ratio = load_track(track_name)
solver = iLQR(track, params)

itr_max = 500
state_hist = np.zeros((4,itr_max+1))
control_hist = np.zeros((2,itr_max))
plan_hist = np.zeros((4, params['N'], itr_max))

pos0, psi0 = track.interp([0])
x_cur =np.array([pos0[0], pos0[1], 0, psi0[0]])
init_control = np.zeros((2,params['N']))
 
t_total = 0

# solve
for i in range(itr_max):
    
    states, controls, t_process, status, theta \
            = solver.solve(x_cur, controls = init_control) 
    
    plan_hist[:,:, i] = states
    
    state_hist[:,i] = states[:,0]
    control_hist[:,i] = controls[:,0]
    init_control[:,:-1] = controls[:,1:]
    
    x_cur = simulate_kin(x_cur, controls[:,0])
    t_total += t_process
   
    
    if theta[1]<theta[0]:
        break

state_hist = state_hist[:, :i+2]
control_hist = control_hist[:,:i+1]
plan_hist = plan_hist[:,:, :i+1]
print("Complete a lap on ", track_name, " in ", (i+1)*dt, 'secs')

plot_zoom = 1 

# plot parameter
fig_x_max = 20*plot_zoom
fig_y_max = 10*plot_zoom
cut_off_ratio = fig_x_max/fig_y_max
if ratio>=cut_off_ratio: #
  fig_x = fig_x_max
  fig_y = fig_x_max/ratio
else:
  fig_y = fig_y_max
  fig_x = fig_y_max*ratio

In [None]:
#@title { run: "auto" }

#@markdown ## Scale the plot to fit your screen
plot_zoom = 0.6 #@param {type:"slider", min:0.1, max:2, step:0.1}

# plot parameter
fig_x_max = 30*plot_zoom
fig_y_max = 15*plot_zoom
cut_off_ratio = fig_x_max/fig_y_max
if ratio>=cut_off_ratio: #
  fig_x = fig_x_max
  fig_y = fig_x_max/ratio
else:
  fig_y = fig_y_max
  fig_x = fig_y_max*ratio

In [None]:
#@markdown # Visualize simulation (This takes a while to generate)
sim = plt.figure(figsize=(fig_x, fig_y))
N = plan_hist.shape[-1]
track.plot_track()
plan_plot = plt.plot([], [], linewidth= 4)[0]
traj_plot = plt.scatter([], [], s = 80, c=[], cmap=cm.jet, vmin=0, vmax=params['v_max'], edgecolor='none', marker='o')
cbar = plt.colorbar(traj_plot)
cbar.set_label(r"velocity [$m/s$]", size=20)
plt.axis('equal')
plt.close()

def drawframe(n):
    traj_plot.set_offsets(state_hist[:2,:n+1].T)
    traj_plot.set_array(state_hist[2,:n+1])
    plan_plot.set_data(plan_hist[0,:, n], plan_hist[1,:, n])
    
    return (traj_plot,plan_plot)


# blit=True re-draws only the parts that have changed.
anim = animation.FuncAnimation(sim, drawframe, frames=N, interval=dt*1000, blit=True)
HTML(anim.to_html5_video())

In [None]:
#@markdown  ## Plot raceline
plt.figure(figsize=(fig_x, fig_y))
plt.plot(state_hist[0,:], state_hist[1,:], 'b-', linewidth= 5, alpha =0.6, label = 'Trajectory')
track.plot_track(centerline = True)
plt.legend(prop={'size': 20})
#plt.title("Trajectory", size = 20)
plt.axis('equal')
plt.axis('off')
plt.show()





In [None]:
#@markdown  ## Visualize velocity along the track
plt.figure(figsize=(fig_x, fig_y))
track.plot_track()
sc = plt.scatter(state_hist[0, :-1], state_hist[1,:-1], s = 80, 
                c=state_hist[2,:-1], cmap=cm.jet, 
                vmin=params['v_min'], vmax=params['v_max'], edgecolor='none', marker='o')
cbar = plt.colorbar(sc)
cbar.set_label(r"Velocity [$m/s$]", size=20)
plt.axis('equal')
plt.show()

In [None]:
#@markdown  ## Visualize longitudinal acceleration along the track
plt.figure(figsize=(fig_x, fig_y))
track.plot_track()
sc = plt.scatter(state_hist[0, :-1], state_hist[1,:-1], s = 80, 
                c=control_hist[0,:], cmap=cm.jet, 
                vmin=params['a_min'], vmax=params['a_max'], edgecolor='none', marker='o')
cbar = plt.colorbar(sc)
cbar.set_label(r"Longitudinal Accel [$m/s^2$]", size=20)
plt.axis('equal')
plt.show()

In [None]:
#@markdown  ## Visualize lateral acceleration along the track
plt.figure(figsize=(fig_x, fig_y))
track.plot_track()
alat = state_hist[2,:-1]**2*np.tan(control_hist[1,:])/0.257
sc = plt.scatter(state_hist[0, :-1], state_hist[1,:-1], s = 80, 
                c=alat, cmap=cm.jet, 
                vmin = 0, vmax=params['alat_max'], edgecolor='none', marker='o')
cbar = plt.colorbar(sc)
cbar.set_label(r"Lateral Accel [$m/s^2$]", size=20)
plt.axis('equal')
plt.show()