In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
from pathlib import Path
import os
import sys
import numpy as np
from geneticalgorithm import geneticalgorithm as ga
import matplotlib.pyplot as plt
%matplotlib inline
sys.path.append(str(Path(os.getcwd()).parent.parent.absolute()))
from CF_Env import CF_Env
env = CF_Env()

# Load data

# Extract states

In [None]:
training = np.load(str(Path(os.getcwd()).parent.absolute()) + "/train_cf_2.npy", allow_pickle=True)
training = training.item()['states']
training = np.array([np.array(x) for x in training])
print('Number of training samples:', len(training))

In [None]:
testing = np.load(str(Path(os.getcwd()).parent.absolute()) + '/test_cf_2.npy', allow_pickle=True)
testing = testing.item()['states']
testing = np.array([np.array(x) for x in testing])
print('Number of testing samples:', len(testing))

# IDM Hyper-paramters include
# aMax: maximum acceleration/deceleration of the following vehicle
# aCom: comfortable deceleration
# V0: the desired speed
# S0: minimum spacing at standstill
# T0: desired time headway

# Define the objective function

In [None]:
def cal_rmspe(y_true, y_pred):
    # Compute Root Mean Square Percentage Error between two arrays
    loss = np.sqrt(np.sum(np.square(y_true - y_pred))/np.sum(np.square(y_true)))
    return loss

In [None]:
def fitness_func(solution): # solution is 6D vector: [aMax, aCom, V0, S0, T0]
    # store simulated and real data
    # spacing
    SimSpaceData = []
    RealSpaceData = []
    # following speed
    SimSpeedData = []
    RealSpeedData = []

    
    for i in range(len(training)):
        # Initialize the environment.
        init_data = training[i]
        state = env.reset(init_data)
        
        # store simulated spacing and following speed
        SimSpaceData.append(state[0])
        SimSpeedData.append(state[1])
        # store real spacing and following speed
        RealSpaceData.append(env.RealSpaceData[0])
        RealSpeedData.append(env.RealSpeedData[0])
    
        while True:
            # update the desired spacing
            S = solution[3] + env.CurrentState[1]*solution[4]+(env.CurrentState[1]*env.CurrentState[2])/(2*np.sqrt(solution[0]*solution[1]))
            # update the action
            action = solution[0]*(1-(env.CurrentState[1]/solution[2])**4-(S/env.CurrentState[0])**2)
            # update the state
            next_state, reward, done, _ = env.step(action)

            # next_states
            a = []
            for item in next_state:
                if isinstance(item, np.ndarray):
                    a.append(item[0])
                else:
                    a.append(item)
            next_state = np.array(a)
            state = next_state

            if done:
                break

            # store simulated spacing and following speed
            SimSpaceData.append(state[0])
            SimSpeedData.append(state[1])
            # store real spacing and following speed
            RealSpaceData.append(env.RealSpaceData[env.timeStep - 1])
            RealSpeedData.append(env.RealSpeedData[env.timeStep - 1])

    # spacing
    SimSpaceData = np.array(SimSpaceData)
    RealSpaceData = np.array(RealSpaceData)
    spacing_rmspe = cal_rmspe(y_true=RealSpaceData, y_pred=SimSpaceData)
    # following speed
    SimSpeedData = np.array(SimSpeedData)
    RealSpeedData = np.array(RealSpeedData)
    speed_rmspe = cal_rmspe(y_true=RealSpeedData, y_pred=SimSpeedData)
    
    return spacing_rmspe

# Training part

In [None]:
# # set the bounds pf hyper-parameters
# # [aMax, aCom, V0, S0, T0]
# varbound=np.array([[0.1, 3], 
#                    [0.1, 5], 
#                    [1, 30], 
#                    [0.1, 5], 
#                    [0.1, 3]])

# algorithm_param = {'max_num_iteration': 300,\
#                    'population_size': 300,\
#                    'mutation_probability': 0.1,\
#                    'elit_ratio': 0.01,\
#                    'crossover_probability': 0.5,\
#                    'parents_portion': 0.3,\
#                    'crossover_type': 'uniform',\
#                    'max_iteration_without_improv': 100}

# model=ga(function=fitness_func,
#          dimension=5,
#          variable_type='real',
#          variable_boundaries=varbound,
#          algorithm_parameters=algorithm_param)

# model.run()

# Testing part

In [None]:
# solution = model.best_variable
solution = [1.08200092, 0.37833643, 29.72641382, 4.98069566, 1.4551363]

In [None]:
# store simulated trajectories
idm_test = []

# store simulated and real data
# spacing
SimSpaceData = []
RealSpaceData = []
# following speed
SimSpeedData = []
RealSpeedData = []

for i in range(len(testing)):
    init_data = testing[i]
    state = env.reset(init_data)
    traj = [state]

    # store simulated spacing and following speed
    SimSpaceData.append(state[0])
    SimSpeedData.append(state[1])
    # store real spacing and following speed
    RealSpaceData.append(env.RealSpaceData[0])
    RealSpeedData.append(env.RealSpeedData[0])

    while True:
        # update the desired spacing
        S = solution[3] + env.CurrentState[1]*solution[4]+(env.CurrentState[1]*env.CurrentState[2])/(2*np.sqrt(solution[0]*solution[1]))
        # update the action
        action = solution[0]*(1-(env.CurrentState[1]/solution[2])**4-(S/env.CurrentState[0])**2)
        # update the state
        next_state, reward, done, _ = env.step(action)

        # next_states
        a = []
        for item in next_state:
            if isinstance(item, np.ndarray):
                a.append(item[0])
            else:
                a.append(item)
        next_state = np.array(a)
        state = next_state
        
        traj.append(state)

        if done:
            break

        # store simulated spacing and following speed
        SimSpaceData.append(state[0])
        SimSpeedData.append(state[1])
        # store real spacing and following speed
        RealSpaceData.append(env.RealSpaceData[env.timeStep - 1])
        RealSpeedData.append(env.RealSpeedData[env.timeStep - 1])

    traj = np.array(traj)
    idm_test.append(traj)

# spacing
SimSpaceData = np.array(SimSpaceData)
RealSpaceData = np.array(RealSpaceData)
spacing_rmspe = cal_rmspe(y_true=RealSpaceData, y_pred=SimSpaceData)
# following speed
SimSpeedData = np.array(SimSpeedData)
RealSpeedData = np.array(RealSpeedData)
speed_rmspe = cal_rmspe(y_true=RealSpeedData, y_pred=SimSpeedData)

print(
    'TEST\tSpacing RMSPE: {:.6f}\tSpeed RMSPE: {:.6f}'.format(
        spacing_rmspe, speed_rmspe))

np.save('test_IDM_traj.npy', idm_test)

# Visualize the simulated results

In [None]:
test_traj = np.load('test_IDM_traj.npy', allow_pickle=True) # spacing, relative speed and following vehicle speed

In [None]:
# spacing
plt.plot(test_traj[4][:, 0], label ='Sim Spacing')
plt.plot(testing[4][:, 0], label ='Real Spacing')
plt.legend()

In [None]:
# following speed
plt.plot(test_traj[4][:, 1], label ='Sim FV Speed')
plt.plot(testing[4][:, 1], label ='Real FV Speed')
plt.legend()