# Interaction-aware IMM-KF-MPC for trajectory uncertainty and maneuver uncertainty
## 2021.05.25, Jian Zhou, Start
## 2021.06.30，Jian Zhou, The Code Framework is bascially done
## 2021.07.28, Jian Zhou, The Interaction-Aware IMM-KF MPC is done
## 2021.10.10, Jian Zhou, The Maneuver uncertainty version with HDdata starts 

### 1. Import

In [1]:
import numpy as np
import math
import time
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import casadi
import scipy.linalg as sl
from numpy.linalg import matrix_power
from scipy.io import loadmat
from scipy.io import savemat
from Initialization_SV import Initialization_SV
from Initialization_EV import Initialization_EV
from IAIMM_KF import IAIMM_KF
from IAIMM_KF_MPC import IAIMM_KF_MPC
from CAM import CAM

import pdb

%matplotlib inline

### 2. Priority list calculation, and compute vertex of car 

In [2]:
class Priority_Sort( ):
    
    def __init__(self, infinity, L_Bound, N_Car, N, Ts):
        self.N_Car = N_Car
        self.N = N
        self.Ts = Ts
        self.L_Bound = L_Bound
        self.infinity = infinity
    
    def Define_lane_location(self, Lane_position):
        L_Bound = self.L_Bound
        
        if (L_Bound[0] <= Lane_position< L_Bound[1]):
            Lane_position = 1
        elif (L_Bound[1] <= Lane_position < L_Bound[2]):
            Lane_position = 2
        elif  (L_Bound[2] <= Lane_position):
            Lane_position = 3
        
        return Lane_position
    
    def Sort(self, Y_k):
        N_Car = self.N_Car
        N = self.N
        Ts = self.Ts
        infinity = self.infinity
        
        list_priority = np.zeros((2, N_Car))
        list_priority[0, :] = range(0, N_Car)
        x_initial_1 = list([infinity]*N_Car)     # for lane 1
        x_terminal_1 = list([infinity]*N_Car) # for lane 1
        x_initial_2 = list([infinity]*N_Car)     # for lane 2
        x_terminal_2 = list([infinity]*N_Car) # for lane 2
        x_initial_3 = list([infinity]*N_Car)     # for lane 3
        x_terminal_3 = list([infinity]*N_Car) # for lane 3
        
        for i in range(N_Car):
            Lane_location = self.Define_lane_location(Y_k[i][2])
            if Lane_location == 1: # in lane 1
                x_initial_1[i] = Y_k[i][0]
                x_terminal_1[i] = Y_k[i][0] + Y_k[i][1]*N*Ts
            elif Lane_location == 2: # in lane  2
                x_initial_2[i] = Y_k[i][0]
                x_terminal_2[i] = Y_k[i][0] + Y_k[i][1]*N*Ts
            elif Lane_location == 3: # in lane 3
                x_initial_3[i] = Y_k[i][0]
                x_terminal_3[i] = Y_k[i][0] + Y_k[i][1]*N*Ts

        for i in range(N_Car):
            x_initial_index_min_1 = np.argmin(x_initial_1) # index of leading vehicle in lane 1
            x_terminal_min_1 = x_terminal_1[x_initial_index_min_1] # value of terminal position of this car

            x_initial_index_min_2 = np.argmin(x_initial_2) # index of ;eading vehicle in lane 2
            x_terminal_min_2 = x_terminal_2[x_initial_index_min_2] # value of terminal position of this car
            
            x_initial_index_min_3 = np.argmin(x_initial_3) # index of leading vehicle in lane 3
            x_terminal_min_3 = x_terminal_3[x_initial_index_min_3] # value of terminal position of this car
            
            index_terminal = np.argmin([x_terminal_min_1, x_terminal_min_2, x_terminal_min_3])
            
            if index_terminal == 0:
                index = x_initial_index_min_1
                x_initial_1[index] = infinity
                x_terminal_1[index] = infinity
            elif index_terminal == 1:
                index = x_initial_index_min_2
                x_initial_2[index] = infinity
                x_terminal_2[index] = infinity
            elif index_terminal == 2:
                index = x_initial_index_min_3
                x_initial_3[index] = infinity
                x_terminal_3[index] = infinity
            
            list_priority[1, index] = i
        
        list_use = list_priority[1, :] 
        return list_use

### 3. Driving Model Parameters

In [3]:
Model_Parameters = loadmat(r'Model_Parameters.mat')
Model_Parameters = Model_Parameters['Model_Parameters']
Model_Parameters = Model_Parameters[0, 0]

m0 =   [Model_Parameters['m0']['Lon'][0][0][0], Model_Parameters['m0']['Lat'][0][0][0]] 
m1 =   [Model_Parameters['m1']['Lon'][0][0][0], Model_Parameters['m1']['Lat'][0][0][0]] 
m2 =   [Model_Parameters['m2']['Lon'][0][0][0], Model_Parameters['m2']['Lat'][0][0][0]] 
m3 =   [Model_Parameters['m3']['Lon'][0][0][0], Model_Parameters['m3']['Lat'][0][0][0]] 
m4 =   [Model_Parameters['m4']['Lon'][0][0][0], Model_Parameters['m4']['Lat'][0][0][0]] 
m5 =   [Model_Parameters['m5']['Lon'][0][0][0], Model_Parameters['m5']['Lat'][0][0][0]] 
m6 =   [Model_Parameters['m6']['Lon'][0][0][0], Model_Parameters['m6']['Lat'][0][0][0]] 
std_m0 = [Model_Parameters['m0']['K_set_lon'][0][0][0], Model_Parameters['m0']['std_y'][0][0][0]]
std_m1 = [Model_Parameters['m1']['K_set_lon'][0][0][0], Model_Parameters['m1']['K_set_lat'][0][0][0]]
std_m2 = [Model_Parameters['m2']['K_set_lon'][0][0][0], Model_Parameters['m2']['K_set_lat'][0][0][0]]
std_m3 = [Model_Parameters['m3']['K_set_lon'][0][0][0], Model_Parameters['m3']['std_y'][0][0][0]]
std_m4 = [Model_Parameters['m4']['K_set_lon'][0][0][0], Model_Parameters['m4']['K_set_lat'][0][0][0]]
std_m5 = [Model_Parameters['m5']['K_set_lon'][0][0][0], Model_Parameters['m5']['K_set_lat'][0][0][0]]
std_m6 = [Model_Parameters['m6']['K_set_lon'][0][0][0], Model_Parameters['m6']['std_y'][0][0][0]]

Models = [m0, m1, m2, m3, m4, m5, m6] # submodels
std_parameters = [std_m0, std_m1, std_m2, std_m3, std_m4, std_m5, std_m6] # parameters of standard deviation

### 4. Define Basic parameters

In [4]:
# basic parameters
Ts = 0.32         # time interval
N = 25            # prediction horizon
N_Lane = 3     # number of lanes
N_M = 7         # number of sub-models of SVs
N_M_EV = 3   # number of sub-models of EV
N_Car = 6      # number of cars involving EV and SVs
index_EV  = 5   # index of EV
L_Width = [3.75, 3.75, 3.75]             # width of each lane
L_Bound = [0, 3.75, 3.75*2, 3.75*3] # land boundaries
L_Center = [3.75/2, 3.75 + 3.75/2, 3.75*2 + 3.75/2] # lane center positions
l_veh = 4.3    # vehicle length 
w_veh = 1.8  # vehicle width
Th_long = 2      # long time headway
Th_short = 1.5  # short time headway
DSV = 6            # dimension of SVs
DEV = 8            # dimension of EV
Dev =  np.array([0.015, 1.8])                 #  threshold of phi and y
K_Lon_EV = np.array([0.1029, 0.3423]) # long. controller gains of EV
K_Lat_EV = np.array([0.0984, 0.4656, 0.5417])   # lateral controller gains of EV
SpeedLim = np.array([65/3.6, 90/3.6, 90/3.6])  # speed limit of each lane
Weight =  np.array([0.1, 0.3, 0.1, 0.5])               # wight for ax ay vx y
H = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]]) # selection matrix for observations from the state
l_f = 1.446   # vehicle parameter
l_r =1.477    # vehicle parameter
acc = -1.2    # acceleration of SV4
infinity = 100000 # represents infinity
epsilon = 0.2       # the safety parameter of MPC
Q = np.diag([1, 0.5, 0.25, 0.1, 0.1, 0]) # Q matrix in Kalman filter -- Tunable
R = np.diag([1, 1, 1])*1e-5                  # R matrix in Kalman filter -- Tunable
K_sampling = 30 # the sampling size for estimating the variance of  x and y

opts_SV = { # parameters of SV
    'Ts': Ts,      
    'N': N,   
    'N_Lane': N_Lane, 
    'N_M': N_M,         
    'N_Car': N_Car,     
    'L_Width': L_Width,     
    'w_veh': w_veh, 
    'l_veh': l_veh,  
    'L_Bound': L_Bound, 
    'L_Center': L_Center, 
    'DSV': DSV,  
    'infinity': infinity,
    'SpeedLim': SpeedLim, 
    'Q': Q, 
    'R': R, 
    'Weight': Weight,
    'H': H, 
    'Models': Models,
    'std_parameters': std_parameters,
    'K_sampling': K_sampling,
}
opts_EV = { # parameters of EV
    'index_EV': index_EV,
    'Ts': Ts,      
    'N': N,     
    'N_Lane': N_Lane, 
    'N_M': N_M, 
    'N_M_EV': N_M_EV, 
    'N_Car': N_Car,   
    'L_Center': L_Center, 
    'L_Bound': L_Bound, 
    'w_veh': w_veh, 
    'l_veh': l_veh,  
    'zeta_l': 0.5,
    'zeta_w': 0.5,
    'zeta_EV': 0.5,
    'Th_MPC': Th_short, 
    'Th_QP': Th_long, 
    'DSV': DSV,       
    'DEV': DEV,       
    'Dev': Dev, 
    'K_Lon_EV': K_Lon_EV,
    'K_Lat_EV': K_Lat_EV,
    'l_f': l_f,  
    'l_r': l_r,  
    'SpeedLim': SpeedLim, 
    'Weight': Weight,
    'H': H, 
    'infinity': infinity,
    'Models': Models,
    'Q_Initial': [0.5, 0.1, 0.5, 0.1, 0.05, 1, 0.055], # weight for MPC of initialization
    'epsilon': epsilon,
    'Q1': 0.5,     # longitudianl snap
    'Q2': 0.1,     # front tire angle acceleration
    'Q3': 0.5,     # longitudinal acceleration
    'Q4': 0.1,     # front tire wheel angle 
    'Q5': 0.05,   #  terminal lateral error
    'Q6': 1,        #  terminal longitudinal position error
    'Q7': 0.055, # slack variable
}
opts_CA = { # parameters of the constant acceleration vehicle -- SV4
    'Ts': Ts,     
    'DSV': DSV,      
    'N': N,     
    'N_M': N_M,     
    'L_Center': L_Center,     
    'Models': Models,
    'acc': acc, 
    'H': H, 
}

# Lateral position of lanes
L_No1 = L_Center[0]
L_No2 = L_Center[1]
L_No3 = L_Center[2]

### 4. Initialization

In [5]:
# global initial state
index_Bro = 3   # index of the breaking SV -- SV3
ispeed_EV = 105 # initial speed of EV
ixposition_EV = 0 # initial position of EV
x_0_0 = np.array([200, 60/3.6, 0, L_No1, 0, 0]) # initial state of SV0
x_0_1 = np.array([150, 60/3.6, 0, L_No1, 0, 0]) # initial state of SV1
x_0_2 = np.array([100, 108/3.6, 0, L_No2, 0, 0]) # initial state of SV2
x_0_3 = np.array([85, 95/3.6, 0, L_No3, 0, 0])     # initial state of SV3
x_0_4 =  np.array([35, 95/3.6, 0, L_No3, 0, 0])     # initial state of SV4
x_0_5 = np.array([0, ispeed_EV/3.6, 0, L_No1, 0, 0]) # initial state of EV
x_0_EV_glo = np.array([ixposition_EV, ispeed_EV/3.6, 0, L_No1, 0, 0]) # initial state of EV in global frame
x_0_EV_loc = np.array([ixposition_EV, L_No1, 0, ispeed_EV/3.6, 0, 0, 0, 0]) # initial state of EV in vehicle frame

# saving room
X_Hat = list( ) # the estimation of state of each mode
MU = list( )    # the probability
P = list( )        # the covariance matrix in IMM-KF
Y = list( )        # the measurement
X_State = list( )                  # the state of each car at every step
X_State_EV_LOC = list( )    # the state of EV in the local frame at every step
X_State_EV_GLO = list( )    # the  state of EV in the global frame at every step

X_State_0 = [x_0_0, x_0_1, x_0_2, x_0_3, x_0_4, x_0_5] # collect the initial states of all vehicles
X_State.append(X_State_0) 

# initialization of EV and SV
Initial_SV =Initialization_SV(Params = opts_SV)
Initial_EV = Initialization_EV(Params = opts_EV, state_0_glo = x_0_EV_glo, state_0_loc = x_0_EV_loc)
MU_0, M_0,  Y_0, Y_1, X_Hat_0, P_0, X_Pre_0, X_Po_All_0, X_Var_0, Y_Var_0, REF_Speed_0, REF_Lane_0, REF_Speed_All_0 = Initial_SV.Initialize_MU_M_P(X_State, index_EV)
mu_0, m_0,  x_hat_0, x_pre_0, Traj_0, U_0, state_1_loc, state_1_glo, y_0, y_1, GMM_SV_0, RefSpeed_EV_0, REF_EV_0  = Initial_EV.Initialization_MPC(x_0_EV_glo, x_0_EV_loc, X_State_0, index_EV, X_Pre_0)

# initial storage
MU_0[index_EV] = mu_0
Y_0[index_EV]    = y_0
Y_1[index_EV]    = y_1
X_Hat_0[index_EV]  = x_hat_0

MU.append(MU_0)
Y.append(Y_0)
Y.append(Y_1)
X_Hat.append(X_Hat_0)
P.append(P_0)

X_State_EV_LOC.append(x_0_EV_loc)   # step 0
X_State_EV_GLO.append(x_0_EV_glo)  # step 0
X_State_EV_LOC.append(state_1_loc)  # step 1
X_State_EV_GLO.append(state_1_glo) # step 1

# initialize priority
Sorting = Priority_Sort(infinity = infinity, L_Bound = L_Bound, N_Car = N_Car, N = N, Ts = Ts)


******************************************************************************
This program contains Ipopt, a library for large-scale nonlinear optimization.
 Ipopt is released as open source code under the Eclipse Public License (EPL).
         For more information visit https://github.com/coin-or/Ipopt
******************************************************************************



### 5. Simulation

In [6]:
K_N = 100 # the number of all simulation steps
kb   = 3     # the time point when the SV3 is starting breaking
IMM_KF =  IAIMM_KF(Params = opts_SV)
MPC      =  IAIMM_KF_MPC(Params = opts_EV)  
CA         = CAM(Params = opts_CA)

Run_Time = 20
Time_Record_KF = list( )
Time_Record_MPC = list( )
Time_Record_OCP = list( )

for run_time in range(Run_Time):
    for k in range(1, K_N):
        list_k = Sorting.Sort(Y[k])
        MU_k = [None]*N_Car 
        X_Hat_k = [None]*N_Car 
        X_State_k = [None]*N_Car 
        P_k = [None]*N_Car     
        Y_k_plus_1 = [None]*N_Car 
        Obst_k = [None]*N_Car 
        X_Po_All_k = [None]*N_Car 
        X_Var_k = [None]*N_Car 
        Y_Var_k = [None]*N_Car 
       # print('The step is', k)
        for i in range(N_Car):
                car_index = np.argwhere(list_k == np.max(list_k)) # fetch the car with highest priority
                car_index = car_index[0][0]                                      # index of highest-prioritized car
                list_k[car_index] = -1                                                 # when the car is fecthed out, fill the position by -1
                if car_index != index_EV:
                    if (car_index == index_Bro ) and (k >= kb):          #  initiate SV3 which statrs to break
                        start_CAM = time.perf_counter( )
                        Ref_speed, Ref_lane, REF_Speed_All, mu_k, m_k, x_hat_k, p_k, x_state_k, x_pre_k, y_k_plus_1, x_po_all_k, x_var_k, y_var_k = CA.Final_Return(k, X_Hat, Y, car_index)
                        end_CAM = time.perf_counter( )      
                        Time_Record_KF.append(end_CAM - start_CAM)
                    else:
                        start_KF = time.perf_counter( )
                        Ref_speed, Ref_lane, mu_k, m_k, x_hat_k, p_k, x_state_k, x_pre_k, y_k_plus_1, REF_Speed_All, x_po_all_k, x_var_k, y_var_k = IMM_KF.Final_Return(k, MU, X_Hat, P, Y, Obst_k, car_index)
                        end_KF = time.perf_counter( )
                        Time_Record_KF.append(end_KF - start_KF)
                    X_State_k[car_index] = x_state_k
                    P_k[car_index] = p_k
                    X_Po_All_k[car_index] = x_po_all_k    
                    Obst_k[car_index] = x_pre_k
                    X_Var_k[car_index] = x_var_k
                    Y_Var_k[car_index] = y_var_k
                else:
                    start_MPC = time.perf_counter( )
                    Ref_speed, Ref_lane, mu_k, m_k, x_hat_k, x_pre_k, Traj_k, u_k, state_k_plus_1_loc, state_k_plus_1_glo, y_k_plus_1, GMM_SV_k, REF_Speed_All, ocp_time = MPC.Final_Return_Com_Time(k, X_State_EV_LOC[k], X_State_EV_GLO[k], Obst_k, Y[k][car_index], X_Po_All_k, MU_k, X_Var_k, Y_Var_k)
                    end_MPC = time.perf_counter( )
                    Time_Record_MPC.append(end_MPC - start_MPC)
                    Time_Record_OCP.append(ocp_time)
                    X_State_k[car_index] = X_State_EV_GLO[k]
                    X_State_EV_LOC.append(state_k_plus_1_loc)
                    X_State_EV_GLO.append(state_k_plus_1_glo)
                    Obst_k[car_index] = x_pre_k

                MU_k[car_index] = mu_k
                X_Hat_k[car_index] = x_hat_k
                Y_k_plus_1[car_index] = y_k_plus_1

        MU.append(MU_k)
        X_Hat.append(X_Hat_k)
        X_State.append(X_State_k)
        P.append(P_k)
        Y.append(Y_k_plus_1)

In [7]:
print(len(Time_Record_KF))
print(len(Time_Record_MPC))
print(len(Time_Record_OCP))
print((np.sum(Time_Record_KF) + np.sum(Time_Record_MPC))/((K_N - 1)*Run_Time))
print((np.sum(Time_Record_OCP))/((K_N - 1)*Run_Time))

9900
1980
1980
0.17418668381066682
0.031514375347467996


In [8]:
np.save('SIM1_Time_Record_KF_02.npy', np.array(Time_Record_KF))
np.save('SIM1_Time_Record_MPC_02.npy', np.array(Time_Record_MPC))
np.save('SIM1_Time_Record_OCP_02.npy', np.array(Time_Record_OCP))