# Please Install these Dependencies

In [1]:
!pip install tensorflow==2.5.0
!pip install gym
!pip install keras
!pip install keras-rl2



#  Build Environment

In [13]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

In [14]:
random.seed(22)

### The Vehicle Class contains all the information for the DQN Agent (step, reset, rewards, etc.)

In [15]:
class VehEnv(Env):
    def __init__(self,visualize,traffic_lights,mass,frontal_area,air_density,drag_cof,Crr):
        self.traffic_lights = traffic_lights
        self.mass = mass
        self.frontal_area = frontal_area
        self.air_density = air_density
        self.drag_cof = drag_cof
        self.Crr = Crr
   
        
        # number of actions (accelerations)- Discrete is discrete set of values
        self.action_space = Discrete(11)                         
        # upper and lower bounds for speed- Box is continuous set of values- normalize?
        self.observation_space = Box(low=np.array([0.0, 0.0, 0.0,0]), high = np.array([40.0/40.0, 1000/1000,40/40,1])) 
        
        # parameters for normilization
        self.speed_max = 40
        self.dist_max = 1000
        self.time_max = 40
        
        
        # start parameters
        self.power = 0
        self.energy = 0
        self.traffic_lights.reset()
        next_pos = self.traffic_lights.get_info()[0][0]
        self.prev_light_pos = self.traffic_lights.get_info()[0][0]
        next_phase = self.traffic_lights.get_info()[0][3]
        if next_phase == 0:
             next_time = self.traffic_lights.get_info()[0][2]
        else:
            next_time = self.traffic_lights.get_info()[0][1]
        self.x = 0 
        self.state = [self.normalize(self.speed_max,random.randint(0,35)), self.normalize(self.dist_max,next_pos),self.normalize(self.time_max,next_time), next_phase]

        # set simulation length
        self.sim_length = 1000
        
        self.vis = visualize
        if self.vis:
            global speeds 
            global accel
            global light_vis_info
            global total_energy
            speeds = []
            accels = []
            position = []
            light_vis_info = []
            total_energy = []
            
        else:
            pass

        
        
        
    def step(self,action):
        reward = 0
        
        # map discrete values to discrete acceleration values  between -3 and 2 (y=0.5x-2)
        accel = 0.5*action-3
        # get previous state
        prev_speed = (self.state[0]*self.speed_max/2)+self.speed_max/2
        prev_x = self.x
        # get new speed and position and SPaT data
        speed = prev_speed+ accel
        self.x = prev_x + prev_speed+(speed-prev_speed)/2
        
        # collect data for ran red function
        prev_phase = self.state[3] # previous phase 

        # change traffic lights
        self.traffic_lights.step()
        
        # calculate energy
        [energy,power] = self.power_calcs(accel,speed)
        self.energy += energy
        self.power = power
        
        # drop simulation time every time step
        self.sim_length -= 1
        
        # check next light info
        y = self.check_next_light(self.x)
        next_light_pos = self.traffic_lights.get_info()[y][0]
        next_pos = next_light_pos-self.x
        next_phase = self.traffic_lights.get_info()[y][3]
        if next_phase == 0:
             next_time = self.traffic_lights.get_info()[y][2]
        else:
            next_time = self.traffic_lights.get_info()[y][1]
        
        # check if light was ran
        if self.prev_light_pos !=  next_light_pos:
            reward += self.ran_red(prev_phase)
    
        # only do this if visualization is necessary 
        if self.vis:
            speeds.append(speed)
            accels.append(accel)
            positions.append(self.x)
            total_energy.append(self.energy)
            info = self.traffic_lights.get_info()
            lights_at_time_step = []
            for i in range(0,len(info)):   
                lights_at_time_step.append([info[i][0],info[i][3]])
            light_vis_info.append(lights_at_time_step)
        else:
            pass     
        
        # reward function
#         error = (speed-25)**2
#         reward = reward + -2/(1+2.7**(-0.05*error))+2
        reward = reward + 1/(2.7**(0.5*accel**2))
        
    
        # check if simulation is done
        last_light = self.traffic_lights.get_info()[len(self.traffic_lights.get_info())-1][0]
        if self.sim_length <= 0 or speed < 0 or self.x >= last_light or reward < 0:
            done = True
        else:
            done = False
            
    
        # return step information
        self.prev_light_pos = next_light_pos
        self.state = [self.normalize(self.speed_max,speed), self.normalize(self.dist_max,next_pos),self.normalize(self.time_max,next_time), next_phase]
        info = {}
        return self.state, reward, done, info
     
        
        
        
    def ran_red(self,prev_phase):
        # the light could turn green or red at an interval we don't check... this will be fixed later
        # the light will only check at the last phase before vehicle passes through
        if prev_phase == 1:
            # did not ran red
            return 10
        elif prev_phase == 0:
            # ran red
            return -10
            
            
            
            
    def power_calcs(self,accel,speed):
        drag = 0.5*self.drag_cof*self.frontal_area*self.air_density*speed**2
        rolling_res = mass*9.81*Crr ###Fix
        force = self.mass*accel + drag + rolling_res
        power = force*speed/1000 # convert to kW
        if power < 0:
            power = 0
        
        energy = self.energy+self.power+(power-self.power)/2
        energy = energy*0.000277778 # convert from kJ to kWh
        return energy, power
        
        
        
    
    def check_next_light(self,x):
        c = self.traffic_lights.get_info()
        if x < c[0][0]:
            y = 0
        if x > c[len(c)-1][0]:
            y = len(c)-1
        else:
            for i in range(0,len(c)-1):
                if x >= c[i][0] and x <= c[i+1][0]:
                    y = i + 1
        return y
        
        
        
        
    def normalize(self,max,x):
        normed = (x-max/2)/(max/2)
        return normed
        
        
        
        
    def render(self):
        pass
    
    
    
    
    def reset(self):
        
#         reset initial conditions
        self.power = 0
        self.energy = 0

        self.traffic_lights.reset()
        next_pos = self.traffic_lights.get_info()[0][0]
        next_phase = self.traffic_lights.get_info()[0][3]
        if next_phase == 0:
             next_time = self.traffic_lights.get_info()[0][2]
        else:
            next_time = self.traffic_lights.get_info()[0][1]
        self.x = 0
        self.state = [self.normalize(self.speed_max,random.randint(0,35)), self.normalize(self.dist_max,next_pos),self.normalize(self.time_max,next_time), next_phase]
        
        # reset time
        self.sim_length = 1000
        if self.vis:
            speeds = []
            accels = []
            positions = []
            light_vis_info = []
            total_energy = []
        else:
            pass
        
        return self.state
        

### This Traffic Light Class creates a single Traffic Light with Signal Phasing and Timing Data 

In [16]:
class TrafficLight():
    def __init__(self,position,green_time,red_time):
        # green is 1, red is 0
        self.initial_green_time = green_time
        self.initial_red_time = red_time
        self.initial_position = position
        self.position = position
        self.green_time = green_time
        self.red_time = red_time
        self.phase =random.randint(0,1)
    
    def reset(self):
        self.position = self.initial_position
        self.green_time = self.initial_green_time
        self.red_time = self.initial_red_time
        self.phase = random.randint(0,1)

    def step(self):
        if self.phase == 1:
            self.green_time -= 1
        elif self.phase == 0:
            self.red_time -= 1

        if self.green_time <= 0:
            self.phase = 0
            self.green_time = self.initial_green_time
        elif self.red_time <= 0:
            self.phase = 1
            self.red_time = self.initial_red_time
        return None
    
    def get_info(self): 
        return [self.position, self.green_time, self.red_time,self.phase]

### This Corridor Class builds an entire Street of Lights using the previous Traffic Light Class

In [17]:
class Corridor():
    def __init__(self,*args):
        self.y = [x for x in args]
        
    def step(self):
        for x in self.y:
            x.step()
            
    def reset(self):
        for x in self.y:
            x.reset()
        
    def get_info(self):
        return [x.get_info() for x in self.y]

# Vehicle Parameters

In [18]:
## user input
# mass = input('Vehicle Mass in kg: ')
# frontal_area = input('Frontal area of vehicle in m^2: ')
# air_density =  input('Density of air in kg/m^3: ')
# drag_cof = input('Drag Coefficient: ')

# pre-defined
mass = 2050 
frontal_area = 2
air_density = 1.2
drag_cof = .38
Crr = .01


# Test that Traffic Light and Corridor Classes are functioning properly

In [19]:
### # Test light class
# TL = TrafficLight(20,30,25)    # position, green_time, red_time
# for i in range(1,920):
#     TL.step()
# print(TL.get_info())
# TL1 = TrafficLight(20,3,25) 
# TL2 = TrafficLight(20,30,25) 
# TL3 = TrafficLight(20,40,25) 

In [20]:
# # Test corridor class
# c = Corridor(TL1,TL2,TL3)
# print(c.get_info())
# for i in range(1,20):
#     c.step()
# print(c.get_info())

# Build Traffic Light Corridor

In [21]:
# build traffic light corridor
TL1 = TrafficLight(1000,25,40) 
TL2 = TrafficLight(2200,35,30) 
TL3 = TrafficLight(2600,40,30) 
TL4 = TrafficLight(3000,35,40)
TL5 = TrafficLight(5000,35,40)
TL6 = TrafficLight(6500,35,40)
TL7 = TrafficLight(7000,35,40)

TL8 = TrafficLight(7900,25,40) 
TL9 = TrafficLight(8200,35,30) 
TL10 = TrafficLight(9000,40,30) 
TL11= TrafficLight(9900,35,40)
TL12= TrafficLight(10200,35,40)
TL13= TrafficLight(11000,35,40)
TL14= TrafficLight(14100,35,40)
TL15= TrafficLight(16000,35,40)
TL16= TrafficLight(17200,35,40)

corridor = Corridor(TL1,TL2,TL3,TL4,TL5,TL6,TL7,TL8,TL9,TL10,TL11,TL12,TL13,TL14,TL15,TL16)
# build environment
env = VehEnv(False, corridor,mass,frontal_area,air_density,drag_cof,Crr)
corridor.get_info()



[[1000, 25, 40, 1],
 [2200, 35, 30, 1],
 [2600, 40, 30, 1],
 [3000, 35, 40, 0],
 [5000, 35, 40, 0],
 [6500, 35, 40, 0],
 [7000, 35, 40, 0],
 [7900, 25, 40, 1],
 [8200, 35, 30, 1],
 [9000, 40, 30, 1],
 [9900, 35, 40, 0],
 [10200, 35, 40, 1],
 [11000, 35, 40, 1],
 [14100, 35, 40, 0],
 [16000, 35, 40, 1],
 [17200, 35, 40, 1]]

## Verify environment is returning appropriate results

In [22]:
env.observation_space.sample()

array([0.5258327 , 0.5750535 , 0.486482  , 0.57344747], dtype=float32)

In [23]:
## Testing car matches with matlab and other Python model
# state = env.reset()
# done = False
# score = 0
# for i in range(1,21):
#     action = env.step(10)
    
    

In [24]:
episodes = 1
for episode in range(1,episodes+1):
    state = env.reset()
    done = False
    score = 0
    
    while not done:
        env.render()
        action = env.action_space.sample()  
        n_state, reward, done, info = env.step(action)
        score+= reward
        print(n_state)
    print('Episode:{} Score:{}'.format(episode,score))
    


[0.65, 0.934, 0.2, 1]
[0.75, 0.866, 0.15, 1]
[0.775, 0.7955, 0.1, 1]
[0.7, 0.726, 0.05, 1]
[0.675, 0.6585, 0.0, 1]
[0.7, 0.591, -0.05, 1]
[0.65, 0.524, -0.1, 1]
[0.625, 0.4585, -0.15, 1]
[0.575, 0.3945, -0.2, 1]
[0.525, 0.3325, -0.25, 1]
[0.525, 0.2715, -0.3, 1]
[0.55, 0.21, -0.35, 1]
[0.425, 0.1505, -0.4, 1]
[0.4, 0.094, -0.45, 1]
[0.3, 0.04, -0.5, 1]
[0.375, -0.0135, -0.55, 1]
[0.375, -0.0685, -0.6, 1]
[0.45, -0.125, -0.65, 1]
[0.45, -0.183, -0.7, 1]
[0.425, -0.2405, -0.75, 1]
[0.3, -0.295, -0.8, 1]
[0.325, -0.3475, -0.85, 1]
[0.375, -0.4015, -0.9, 1]
[0.325, -0.4555, -0.95, 1]
[0.325, -0.5085, 1.0, 0]
[0.4, -0.563, 0.95, 0]
[0.45, -0.62, 0.9, 0]
[0.4, -0.677, 0.85, 0]
[0.4, -0.733, 0.8, 0]
[0.3, -0.787, 0.75, 0]
[0.25, -0.838, 0.7, 0]
[0.275, -0.8885, 0.65, 0]
[0.35, -0.941, 0.6, 0]
[0.275, -0.9935, 0.55, 0]
[0.15, 1.358, 0.5, 1]
Episode:1 Score:11.326721383321917


# Build DQN Model

In [2]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow import keras

In [3]:
states = env.observation_space.shape
actions = env.action_space.n

NameError: name 'env' is not defined

In [None]:
def build_model(states, actions):
    model = Sequential()
    model.add(Flatten(input_shape=(1,states[0])))
    model.add(Dense(128, activation='relu', input_shape = (1,states[0])))
    model.add(Dense(128, activation='relu'))
    model.add(Dense(actions, activation ='linear'))
    return model

In [None]:
try:
    del model
except:
    pass

In [None]:
model = build_model(states, actions)

In [None]:
model.summary()

# Build Agent 

In [None]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

### Load a Saved Model (you can use 'model-acceleration_based' from Github or your own trained model)

In [4]:
model = keras.models.load_model('model-acceleration_based')

### Build and compile agent

In [21]:
def build_agent(model, acitons):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=100000, window_length =1)
    dqn = DQNAgent(model=model, memory=memory, policy= policy, nb_actions=actions, nb_steps_warmup=10, target_model_update = 1e-2)
    return dqn

In [57]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics = ['mae'])



### Create a New Model (do not run if simply using the loaded saved model)

In [58]:
history = dqn.fit(env, nb_steps =1000000, visualize=False, verbose =1)

Training for 1000000 steps ...
Interval 1 (0 steps performed)
    1/10000 [..............................] - ETA: 8:40 - reward: 10.8832



110 episodes - episode_reward: 51.293 [-9.989, 338.662] - loss: 9.549 - mae: 37.454 - mean_q: 43.568

Interval 2 (10000 steps performed)
59 episodes - episode_reward: 108.590 [-9.863, 509.140] - loss: 23.654 - mae: 82.264 - mean_q: 93.743

Interval 3 (20000 steps performed)
21 episodes - episode_reward: 337.529 [-9.863, 757.477] - loss: 27.500 - mae: 106.177 - mean_q: 120.325

Interval 4 (30000 steps performed)
19 episodes - episode_reward: 391.986 [-9.117, 754.296] - loss: 25.463 - mae: 125.024 - mean_q: 141.351

Interval 5 (40000 steps performed)
13 episodes - episode_reward: 548.149 [-9.391, 746.021] - loss: 27.575 - mae: 138.839 - mean_q: 156.525

Interval 6 (50000 steps performed)
15 episodes - episode_reward: 484.351 [-9.117, 793.770] - loss: 27.177 - mae: 143.673 - mean_q: 161.835

Interval 7 (60000 steps performed)
17 episodes - episode_reward: 430.823 [-9.863, 790.303] - loss: 24.757 - mae: 142.932 - mean_q: 160.664

Interval 8 (70000 steps performed)
14 episodes - episode_rew

18 episodes - episode_reward: 370.067 [-9.391, 747.156] - loss: 15.291 - mae: 121.223 - mean_q: 135.255

Interval 40 (390000 steps performed)
18 episodes - episode_reward: 391.295 [-9.391, 769.175] - loss: 14.766 - mae: 120.566 - mean_q: 134.554

Interval 41 (400000 steps performed)
18 episodes - episode_reward: 401.651 [-9.955, 745.074] - loss: 13.973 - mae: 119.985 - mean_q: 133.888

Interval 42 (410000 steps performed)
20 episodes - episode_reward: 377.622 [-9.955, 751.240] - loss: 14.124 - mae: 118.537 - mean_q: 132.284

Interval 43 (420000 steps performed)
21 episodes - episode_reward: 349.770 [-9.989, 734.457] - loss: 13.893 - mae: 118.685 - mean_q: 132.491

Interval 44 (430000 steps performed)
22 episodes - episode_reward: 307.313 [-9.391, 731.612] - loss: 13.965 - mae: 118.125 - mean_q: 131.849

Interval 45 (440000 steps performed)
21 episodes - episode_reward: 355.733 [-9.673, 732.417] - loss: 13.356 - mae: 118.926 - mean_q: 132.610

Interval 46 (450000 steps performed)
18 epi

17 episodes - episode_reward: 391.846 [-9.989, 726.269] - loss: 17.310 - mae: 125.718 - mean_q: 140.260

Interval 78 (770000 steps performed)
18 episodes - episode_reward: 416.836 [-9.117, 732.238] - loss: 17.780 - mae: 126.309 - mean_q: 140.992

Interval 79 (780000 steps performed)
22 episodes - episode_reward: 322.952 [-9.117, 729.073] - loss: 17.005 - mae: 125.956 - mean_q: 140.544

Interval 80 (790000 steps performed)
15 episodes - episode_reward: 483.665 [-9.955, 741.438] - loss: 16.976 - mae: 124.438 - mean_q: 138.767

Interval 81 (800000 steps performed)
18 episodes - episode_reward: 406.287 [-9.117, 724.808] - loss: 16.165 - mae: 122.501 - mean_q: 136.799

Interval 82 (810000 steps performed)
17 episodes - episode_reward: 418.859 [-9.989, 747.349] - loss: 15.932 - mae: 120.639 - mean_q: 134.677

Interval 83 (820000 steps performed)
24 episodes - episode_reward: 296.818 [-9.117, 762.796] - loss: 14.716 - mae: 120.812 - mean_q: 135.007

Interval 84 (830000 steps performed)
18 epi

### Save Model to current directory

In [11]:
model.save('model-acceleration_based_fuel_efficient_keras.h5')

### Show training stats and neural network weights/biases

In [59]:
import matplotlib.pyplot as plt

In [60]:
# summarize history for accuracy
%matplotlib qt 
plt.plot(history.history['episode_reward'])
plt.ylabel('reward')
plt.xlabel('epoch')
plt.show()
# model.get_weights()

## Simulate 10 Episodes

In [61]:
scores = dqn.test(env, nb_episodes = 10, visualize=False)
print(np.mean(scores.history['episode_reward']))

Testing for 10 episodes ...
Episode 1: reward: -9.117, steps: 1
Episode 2: reward: 661.960, steps: 801
Episode 3: reward: 741.877, steps: 923
Episode 4: reward: -9.117, steps: 1
Episode 5: reward: 595.467, steps: 742
Episode 6: reward: 688.663, steps: 816
Episode 7: reward: 645.046, steps: 817
Episode 8: reward: 726.512, steps: 928
Episode 9: reward: -9.117, steps: 1
Episode 10: reward: 588.660, steps: 740
462.08354696411


# Visualize Data

### Build Env for Visualization

In [12]:
env_vis = VehEnv(True,corridor,mass,frontal_area,air_density,drag_cof,Crr)

NameError: name 'VehEnv' is not defined

### Simulate 1 Run with Visualizations

In [90]:
speeds = []
accels = []
positions = []
test1 = dqn.test(env_vis, nb_episodes = 1, visualize=False)

Testing for 1 episodes ...
Episode 1: reward: 715.814, steps: 853


In [91]:
y=np.array(list(range(len(speeds))))

In [92]:
plt.figure(1,figsize=(20, 30), dpi=80)

plt.subplot(4,1,1)
plt.plot(y,accels)
plt.xlabel("Time Step")
plt.ylabel("Acceleration")

plt.subplot(4,1,2)
plt.plot(y,speeds)
plt.xlabel("Time Step")
plt.ylabel("Velocity")

plt.subplot(4,1,3)
plt.plot(y,positions)
for i in range(0,len(corridor.get_info())):
    plt.axhline(corridor.get_info()[i][0],color='purple')
for i in range(0,len(positions),10):
    for j in range(0,len(light_vis_info[i])):
        if light_vis_info[i][j][1] == 1:
            plt.scatter(i,light_vis_info[i][j][0],color="green" )
        if light_vis_info[i][j][1] == 0:
            plt.scatter(i,light_vis_info[i][j][0],color="red" )   
plt.xlabel("Time Step")
plt.ylabel("Position")

plt.subplot(4,1,4)
plt.plot(y,total_energy)
plt.xlabel("Time Step")
plt.ylabel("Energy")

plt.show

<function matplotlib.pyplot.show(*, block=None)>

In [47]:
plt.plot(y,positions)
for i in range(0,len(corridor.get_info())):
    plt.axhline(corridor.get_info()[i][0],color='purple')
for i in range(0,len(positions),10):
    for j in range(0,len(light_vis_info[i])):
        if light_vis_info[i][j][1] == 1:
            plt.scatter(i,light_vis_info[i][j][0],color="green" )
        if light_vis_info[i][j][1] == 0:
            plt.scatter(i,light_vis_info[i][j][0],color="red" )   
plt.xlabel("Time Step")
plt.ylabel("Position")

Text(0, 0.5, 'Position')