# In[2]:
import numpy as np
import gym
import scipy.io as sio
from gym import spaces
from numpy import float32
import random
import pandas as pd

from math import sqrt
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# In[3]:
class CarFollowingEnv_test(gym.Env):
    """Gym environment that replays a recorded car-following event.

    Each episode picks one random event from the test set.  The leader speed
    is replayed from the recording while the follower speed is driven by the
    agent's acceleration.  Observations are ``[Space, Speed, Relative Speed]``
    as ``float32``; the action is a single continuous acceleration.

    The reward is the sum of a jerk term, a time-to-collision (TTC) term, a
    headway term, a negative normalised spacing-error term, and a collision
    penalty.  The follower is integrated with a fixed step of ``_DT``.
    """

    metadata = {'render.modes': ['human']}

    # Integration step (the jerk comment in step() refers to 0.1 s).
    _DT = 0.1
    # Small positive stand-in used wherever a value must stay strictly > 0.
    _MIN_POSITIVE = 0.00001
    # Parameters of the log-normal density used as the headway reward.
    _HDW_MU = 0.422618
    _HDW_SIGMA = 0.43659
    # Jerk normaliser: action range -3..3 within one step gives |jerk| <= 60,
    # and 60 ** 2 == 3600.
    _JERK_NORM = 3600

    def __init__(self, TTC_threshold=4.001, test_set_path='testSet.mat'):
        """Create the environment.

        Args:
            TTC_threshold: TTC values in ``[0, TTC_threshold]`` are penalised
                by ``log(TTC / TTC_threshold)``; other values give no TTC
                reward.
            test_set_path: MATLAB file holding the ``'validationData'`` array
                of recorded events.  It is read lazily on the first
                ``reset()`` and cached afterwards.
        """
        super(CarFollowingEnv_test, self).__init__()

        self.timeWindow = 1

        self.n_features = 3 * self.timeWindow
        self.penalty = 100  # penalty for collisions
        self.TTC_threshold = TTC_threshold
        self.test_set_path = test_set_path
        self._test_set = None  # filled by _load_test_set()

        # The action space is the continuous acceleration taken.
        self.action_space = spaces.Box(
            low=-3.0, high=3.0, shape=(1,), dtype=np.float32
        )

        # Observation layout: [Space, Speed, Relative_Speed].
        # The bounds are built as float32 so they match the declared dtype.
        self.observation_space = spaces.Box(
            low=np.array([0.0, 5.0, -2.5], dtype=np.float32),
            high=np.array([90.0, 25.0, 2.5], dtype=np.float32),
            shape=(3,), dtype=np.float32
        )

    def _load_test_set(self):
        """Return the test events, reading the .mat file only once."""
        if getattr(self, '_test_set', None) is None:
            path = getattr(self, 'test_set_path', 'testSet.mat')
            self._test_set = sio.loadmat(path)['validationData']
        return self._test_set

    @staticmethod
    def _time_to_collision(space, rel_speed):
        """Return ``-space / rel_speed``, or ``inf`` when ``rel_speed`` is 0.

        The sign is negated because relative speed is leader minus follower,
        so a closing gap has a negative relative speed.  A zero relative
        speed means the gap is not changing, hence an infinite TTC instead of
        a division by zero.
        """
        if rel_speed == 0:
            return np.inf
        return -space / rel_speed

    @staticmethod
    def _spacing_errors(true_space, sim_space):
        """Return ``(rmse, nrmse)`` between recorded and simulated spacing.

        Non-positive spacings are replaced by a tiny positive value first, so
        the normalisation by the recorded spacing never divides by zero.
        """
        floor = CarFollowingEnv_test._MIN_POSITIVE
        if true_space <= 0:
            true_space = floor
        if sim_space <= 0:
            sim_space = floor

        rmse = float(abs(true_space - sim_space))
        nrmse = rmse / float(true_space)
        return rmse, nrmse

    def reset(self):
        """Start a new episode on a randomly chosen test event.

        Important: the observation must be a numpy array.

        Returns:
            np.ndarray: the initial ``float32`` observation, i.e. the first
            ``timeWindow`` rows of ``[Space, Speed, Relative Speed]``
            flattened.
        """
        test = self._load_test_set()

        car_fol_id = random.randint(0, test.shape[0] - 1)
        self.car_fol_id = car_fol_id
        # Columns: 0 Space, 1 Speed, 2 Relative Speed, 3 Leader Speed.
        data = np.asarray(test[car_fol_id, 0], dtype=np.float64)

        self.time_step = self.timeWindow  # starting from 1 to n
        self.TimeLen = data.shape[0]
        self.LVSpdData = data[:, 3]
        self.TrueSpaceData = data[:, 0]

        # Per-step histories of the simulated trajectory.
        self.SimSpaceData = np.zeros(data[:, 0].shape)
        self.SimSpeedData = np.zeros(data[:, 1].shape)
        self.SimPosData = np.zeros(data[:, 0].shape)
        self.LVPosData = np.zeros(data[:, 0].shape)
        self.SimTTCData = np.zeros(data[:, 1].shape)

        self.SimSpaceData[0] = data[0, 0]  # initialize with initial spacing
        self.SimSpeedData[0] = data[0, 1]  # initialize with initial speed
        self.SimPosData[0] = np.float32(0)
        self.LVPosData[0] = data[0, 0]

        # Flatten the first timeWindow rows into a single feature vector.
        window = data[:self.timeWindow, :3].reshape(-1)
        self.currentState = window[-3:]

        self.isCollision = 0
        self.isStall = 0
        self.lastAction = 0

        self.TTC = self._time_to_collision(
            self.currentState[0], self.currentState[2]
        )
        self.SimTTCData[0] = self.TTC

        self.state = np.array(window, dtype=np.float32)

        return self.state

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: acceleration, either a scalar or an array-like whose
                first element is used.

        Returns:
            tuple: ``(observation, reward, done, info)`` where ``info`` holds
            the individual reward terms and diagnostics.
        """
        # Accept shape-(1,) arrays, 0-d arrays and plain Python numbers.
        action = float(np.asarray(action, dtype=np.float64).reshape(-1)[0])

        # update state
        self.time_step += 1
        idx = self.time_step - 1  # row of the recording for this step

        LVSpd = self.LVSpdData[idx]
        svSpd = self.currentState[1] + action * self._DT

        if svSpd <= 0:
            # Keep the speed strictly positive so the headway stays finite.
            svSpd = self._MIN_POSITIVE
            self.isStall = 1
        else:
            self.isStall = 0

        relSpd = LVSpd - svSpd
        space = self.currentState[0] + relSpd * self._DT
        self.currentState = [space, svSpd, relSpd]

        # Is there a collision?
        if space < 0:
            self.isCollision = 1

        # Store the space and speed history.
        self.SimSpaceData[idx] = space
        self.SimSpeedData[idx] = svSpd

        if idx >= 1:
            self.SimPosData[idx] = self.SimPosData[idx - 1] + svSpd * self._DT
            self.LVPosData[idx] = self.SimPosData[idx] + space

        # calculate the features for the reward.
        jerk = (action - self.lastAction) / self._DT
        hdw = space / svSpd
        self.TTC = self._time_to_collision(space, relSpd)
        self.SimTTCData[idx] = self.TTC

        fJerk = -(jerk ** 2) / self._JERK_NORM
        self.lastAction = action

        if 0 <= self.TTC <= self.TTC_threshold:
            # Floor the TTC so a gap of exactly zero gives a large finite
            # penalty rather than log(0) == -inf.
            fTTC = np.log(
                max(self.TTC, self._MIN_POSITIVE) / self.TTC_threshold
            )
        else:
            fTTC = 0

        if hdw <= 0:
            fHdw = -1
        else:
            # Log-normal probability density evaluated at the headway.
            mu = self._HDW_MU
            sigma = self._HDW_SIGMA
            fHdw = (np.exp(-(np.log(hdw) - mu) ** 2 / (2 * sigma ** 2))
                    / (hdw * sigma * np.sqrt(2 * np.pi)))

        # Compare against the recorded spacing of the same row that the
        # simulated spacing was just written to; using time_step here would
        # compare against the recording one step ahead.
        self.RMSE, nrmse = self._spacing_errors(self.TrueSpaceData[idx], space)
        fNRMSE = -nrmse

        # calculate the total reward
        reward = fJerk + fTTC + fHdw + fNRMSE - self.penalty * self.isCollision

        # record reward info
        rewardInfo = {"fNRMSE": fNRMSE, "fJerk": fJerk, "fTTC": fTTC,
                      "fHdw": fHdw, "Jerk": jerk, "TTC": self.TTC,
                      "Accel": action, "RMSE": self.RMSE}

        # judge the termination of the episode; >= keeps reporting done if
        # step() is called again after the last row.
        done = bool(
            self.time_step >= (self.TimeLen - 1) or self.isCollision == 1
        )

        self.state = np.array(self.currentState, dtype=np.float32)

        return self.state, float(reward), done, rewardInfo

    def close(self):
        """Release resources; this environment holds none."""
        pass