<a href="https://colab.research.google.com/github/LimaCondas/eco-driving-speed-rl/blob/main/My_Gym01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **0. Install Dependencies**

---



In [None]:
!pip install gym
!pip install numpy==1.20
!pip install tensorflow==2.5.0
!pip install keras
!pip install keras-rl2
!pip install torch


# **1. Test a Simple Eco-Driving Scenario**


---




> **import packages**

In [108]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

> **This block is EcoDrivingEnv**

In [217]:
import gym
from gym import spaces
import numpy as np
import random

class EcoDrivingEnv(gym.Env):
    """Single-vehicle, straight-road eco-driving environment.

    Observation: ``[position (m), speed (m/s)]`` as a float32 array.
    Actions:     ``0`` decelerate, ``1`` no action, ``2`` accelerate
                 (at ``-/+ max_acceleration``).
    Reward:      weighted, normalized penalty on the accumulated work term and
                 on the deviation of the speed from ``safe_speed_limit``.
    Episode end: the vehicle has covered ``distance`` metres or ``max_steps``
                 steps have elapsed.
    """

    def __init__(self):
        super().__init__()
        self.safe_speed_limit = 20 # m/s, equal 72kph
        self.time_step = 0.1 # seconds
        self.mass = 1000 # kg
        self.gravity = 9.81 # m/s^2
        self.MAX_SPEED = 40 # m/s, equal 144kph
        self.friction_coefficient = 0.8
        self.position = 0
        self.speed = 0
        self.work = 0
        self.steps = 0
        self.total_reward = 0
        self.distance = 1000 # m
        self.max_steps = 5000 # 5000 steps, 500s
        self.max_acceleration = 1 # m/s^2
        # Kept in sync by reset() and step() so it is always defined
        self.state = np.array([self.position, self.speed], dtype=np.float32)

        # Action Space deaccelerate, no action, accelerate
        self.action_space = spaces.Discrete(3)

        # Observation space [position, speed]; bounds are built as float32 so
        # they match the declared dtype without an implicit cast
        self.observation_space = spaces.Box(
            low=np.array([0, 0], dtype=np.float32),
            high=np.array([np.inf, np.inf], dtype=np.float32),
            dtype=np.float32,
        )

        print("Successfully Initialize EcoDrivingEnv.......")

    def reset(self):
        """Reset the vehicle to the start of the road and return the initial observation."""
        self.position = 0
        self.speed = 0
        self.work = 0
        self.steps = 0
        self.total_reward = 0
        self.state = np.array([self.position, self.speed], dtype=np.float32)
        return self.state

    def step(self, action):
        """Advance the simulation by one ``time_step``.

        Args:
            action: 0 (decelerate), 1 (no action) or 2 (accelerate). Any other
                value is treated as "no action".

        Returns:
            ``(state, reward, done, info)`` where ``info`` holds the accumulated
            ``friction_work`` and the current ``speed_deviation``.
        """
        # Apply action
        if action == 0:
            acceleration = -self.max_acceleration # -1
        elif action == 2:
            acceleration = self.max_acceleration # +1
        else:
            acceleration = 0

        dt = self.time_step

        # Calculate new speed: vt = v0 + a * dt, limited to [0, MAX_SPEED]
        target_speed = self.speed + acceleration * dt
        new_speed = float(np.clip(target_speed, 0, self.MAX_SPEED))
        if new_speed != target_speed:
            # The speed limit was hit: continue with the acceleration that was
            # actually realized, otherwise the position would be integrated with
            # a speed change that never happened (e.g. rolling backwards when
            # braking at standstill, leaving the observation space).
            acceleration = (new_speed - self.speed) / dt

        # xt = x0 + v0 * dt + 0.5 * a * dt^2
        new_position = self.position + self.speed * dt + 0.5 * acceleration * dt ** 2

        # Calculate work of friction force
        friction_force = self.friction_coefficient * self.mass * acceleration # F = 0.8 * m * a
        self.work += friction_force * (new_position - self.position) # W = F * dx

        # Calculate deviation from safe speed limit
        speed_deviation = abs(new_speed - self.safe_speed_limit)

        # Calculate reward with normalization
        # NOTE(review): self.work turns negative while braking, which makes this
        # normalization ill-conditioned; only the exact zero denominator is
        # guarded here so the reward definition itself stays unchanged.
        denominator = self.work + speed_deviation
        if denominator == 0:
            reward = 0.0
        else:
            reward = - 0.0015 * self.work / denominator - 0.9985 * speed_deviation / denominator

        # Update state
        self.position = new_position
        self.speed = new_speed
        self.steps += 1

        # Check if episode is done, distance or steps satisfied
        done = bool(self.position >= self.distance or self.steps >= self.max_steps)

        # Update info dictionary with additional information
        self.state = np.array([self.position, self.speed], dtype=np.float32)
        info = {'friction_work': self.work, 'speed_deviation': speed_deviation}

        # Return step information
        return self.state, reward, done, info

    def render(self, mode='human'):
        """Print the current position, speed, accumulated work and step count.

        ``mode`` is accepted so callers using gym's ``render(mode=...)``
        convention work; it is not used.
        """
        print(f"Position: {self.position:.2f}m, Speed: {self.speed:.2f}m/s, Work: {self.work:.2f}J, Steps: {self.steps}")

> **Instantiate an Eco-Driving Env**

In [112]:
# Create the environment instance used by the random-rollout test and the Keras / keras-rl cells
env = EcoDrivingEnv()

Successfully Initialize EcoDrivingEnv.......


> **Test Environment with Random Action**

In [113]:
# env.action_space.sample()
# env.observation_space.sample()

In [114]:
# Number of random-policy rollouts. Kept separate from the loop variable so the
# count is not overwritten by the loop itself.
n_episodes = 10

# Trajectory of the first episode only (read by the commented-out plotting cell)
position = []
speed = []

for episode in range(1, n_episodes + 1):
  state = env.reset()
  done = False
  score = 0

  while not done:
    action = random.choice([0, 1, 2])
    n_state, reward, done, info = env.step(action)
    score += reward

    # Record the profile of the first episode only
    if episode == 1:
      position.append(n_state[0])
      speed.append(n_state[1])

  print('===== Episode:{} Score:{} ====='.format(episode, int(score)))


===== Episode:1 Score:-206 =====
===== Episode:2 Score:-234 =====
===== Episode:3 Score:-203 =====
===== Episode:4 Score:-216 =====
===== Episode:5 Score:-214 =====
===== Episode:6 Score:-175 =====
===== Episode:7 Score:-183 =====
===== Episode:8 Score:-229 =====
===== Episode:9 Score:-173 =====
===== Episode:10 Score:-195 =====


> **Sample: plot the position and speed profiles of one episode**

In [115]:
# import matplotlib.pyplot as plt
# import numpy as np

# position_data = position
# speed_data = speed

# # Create a figure with 2 subplots for position and speed
# fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 3))
# fig.subplots_adjust(hspace=0.4)

# # Plot position data
# ax1.plot(position_data, '-')
# ax1.set_title('Profile of Position')
# ax1.set_xlabel('Time (step)')
# ax1.set_ylabel('Position')

# # Plot speed data
# ax2.plot(speed_data, '-')
# ax2.set_title('Profile of Speed')
# ax2.set_xlabel('Time (step)')
# ax2.set_ylabel('Speed')

# # Display the plot
# plt.show()


# **2. Create a Deep Learning Model with Keras**


---



In [116]:
import numpy as np
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [117]:
# Observation shape and number of discrete actions of the environment;
# passed to build_model / build_agent below
states = env.observation_space.shape
actions = env.action_space.n

In [118]:
states

(2,)

In [119]:
def build_model(states, actions):
  """Build the Q-network: a flattened observation window mapped to one Q-value per action.

  Args:
    states: observation shape tuple, e.g. ``(2,)``.
    actions: number of discrete actions (size of the linear output layer).

  Returns:
    An uncompiled ``Sequential`` model.
  """
  return Sequential([
      # The leading 1 is the observation window length fed by the agent's memory
      Flatten(input_shape=(1,) + states),
      # Hidden layers take their input size from the previous layer; only the
      # first layer of a Sequential model declares an input shape
      Dense(24, activation='relu'),
      Dense(24, activation='relu'),
      Dense(actions, activation='linear'),
  ])

In [120]:
# Drop a model left over from a previous run, if any. A bare `del model` raises
# NameError on a fresh kernel, which breaks Restart & Run All.
globals().pop('model', None)

In [121]:
# Build the Q-network for this environment's observation shape / action count
# and print its layer summary
model = build_model(states, actions)
model.summary()

Model: "sequential_7"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_5 (Flatten)          (None, 2)                 0         
_________________________________________________________________
dense_21 (Dense)             (None, 24)                72        
_________________________________________________________________
dense_22 (Dense)             (None, 24)                600       
_________________________________________________________________
dense_23 (Dense)             (None, 3)                 75        
Total params: 747
Trainable params: 747
Non-trainable params: 0
_________________________________________________________________


# **3. Build Agent with keras-RL**


---





In [95]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [96]:
def build_agent(model, actions):
  """Wrap ``model`` in a keras-rl ``DQNAgent``.

  The agent uses a Boltzmann Q-policy, a sequential replay memory of 50000
  entries with window length 1, 10 warm-up steps and a target-model update
  value of 1e-2.

  Args:
    model: Keras model producing one output per action.
    actions: number of discrete actions.

  Returns:
    The (not yet compiled) ``DQNAgent``.
  """
  agent_kwargs = dict(
      model=model,
      memory=SequentialMemory(limit=50000, window_length=1),
      policy=BoltzmannQPolicy(),
      nb_actions=actions,
      nb_steps_warmup=10,
      target_model_update=1e-2,
  )
  return DQNAgent(**agent_kwargs)

In [None]:
# Build the keras-rl agent around the Q-network and train it on the environment
dqn = build_agent(model, actions)
# `learning_rate` is the supported keyword; `lr` is a deprecated alias
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=50000, visualize=False, verbose=1)

# **4. Detailed Code of the DQN Model**

---





> **This code block is the DQN model from https://github.com/indigoLovee/DQN/blob/main/DQN.py**



In [123]:
import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np



---
> **Buffer.py**


In [127]:
class ReplayBuffer:
    """Fixed-size ring buffer of ``(state, action, reward, next_state, done)`` transitions.

    Args:
        state_dim: length of a state vector.
        action_dim: unused; kept so existing callers keep working.
        max_size: capacity; once full, the oldest entries are overwritten.
        batch_size: number of transitions returned by ``sample_buffer``.
    """

    def __init__(self, state_dim, action_dim, max_size, batch_size):
        self.mem_size = max_size
        self.batch_size = batch_size
        self.mem_cnt = 0  # total number of transitions ever stored

        self.state_memory = np.zeros((self.mem_size, state_dim))
        # Actions are discrete indices, so store them as integers; that way a
        # sampled batch can be used directly for indexing
        self.action_memory = np.zeros((self.mem_size, ), dtype=np.int64)
        self.reward_memory = np.zeros((self.mem_size, ))
        self.next_state_memory = np.zeros((self.mem_size, state_dim))
        # Builtin bool: the `np.bool` alias no longer exists in current NumPy
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=bool)

    def store_transition(self, state, action, reward, state_, done):
        """Store one transition, overwriting the oldest slot once the buffer is full."""
        mem_idx = self.mem_cnt % self.mem_size

        self.state_memory[mem_idx] = state
        self.action_memory[mem_idx] = action
        self.reward_memory[mem_idx] = reward
        self.next_state_memory[mem_idx] = state_
        self.terminal_memory[mem_idx] = done

        self.mem_cnt += 1

    def sample_buffer(self):
        """Return ``batch_size`` transitions drawn uniformly, with replacement, from the filled slots.

        Returns:
            ``(states, actions, rewards, next_states, terminals)`` numpy arrays
            whose first dimension is ``batch_size``.
        """
        mem_len = min(self.mem_size, self.mem_cnt)

        batch = np.random.choice(mem_len, self.batch_size, replace=True)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.next_state_memory[batch]
        terminals = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminals

    def ready(self):
        """Return True once more than ``batch_size`` transitions have been stored."""
        return self.mem_cnt > self.batch_size



---
> **DQN.py**


In [128]:
# from buffer import ReplayBuffer

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU
device = T.device("cuda:0" if T.cuda.is_available() else "cpu")


class DeepQNetwork(nn.Module):
    """Fully connected Q-network: state -> fc1 -> ReLU -> fc2 -> ReLU -> one Q-value per action.

    Args:
        alpha: learning rate of the network's Adam optimizer.
        state_dim: size of the input state vector.
        action_dim: number of actions (size of the output layer).
        fc1_dim: width of the first hidden layer.
        fc2_dim: width of the second hidden layer.
    """

    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
        super().__init__()

        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.q = nn.Linear(fc2_dim, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.to(device)

    def forward(self, state):
        """Return the Q-values for ``state`` (last dimension of size ``state_dim``)."""
        x = T.relu(self.fc1(state))
        x = T.relu(self.fc2(x))

        q = self.q(x)

        return q

    def save_checkpoint(self, checkpoint_file):
        """Write the network weights to ``checkpoint_file`` (legacy, non-zipfile format)."""
        T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)

    def load_checkpoint(self, checkpoint_file):
        """Load network weights from ``checkpoint_file``."""
        # map_location remaps the stored tensors onto the current device, so a
        # checkpoint written on a GPU machine can be restored on a CPU-only one
        self.load_state_dict(T.load(checkpoint_file, map_location=device))


class DQN:
    """DQN agent: online network, soft-updated target network and a replay buffer.

    Args:
        alpha: learning rate of the networks' optimizers.
        state_dim: size of the state vector.
        action_dim: number of discrete actions.
        fc1_dim, fc2_dim: hidden layer widths of both networks.
        ckpt_dir: directory prefix used by ``save_models`` / ``load_models``
            (joined by string concatenation, so it should end with a separator).
        gamma: discount factor.
        tau: soft-update rate of the target network.
        epsilon: initial exploration rate.
        eps_end: lower bound of the exploration rate.
        eps_dec: amount subtracted from epsilon after every learning step.
        max_size: replay buffer capacity.
        batch_size: number of transitions per learning step.
    """

    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
                 gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
                 max_size=1000000, batch_size=256):
        self.tau = tau
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.batch_size = batch_size
        self.action_space = list(range(action_dim))
        self.checkpoint_dir = ckpt_dir

        self.q_eval = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                   fc1_dim=fc1_dim, fc2_dim=fc2_dim)
        self.q_target = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                     fc1_dim=fc1_dim, fc2_dim=fc2_dim)

        self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
                                   max_size=max_size, batch_size=batch_size)

        # tau=1.0 copies the online weights into the target network
        self.update_network_parameters(tau=1.0)

    def update_network_parameters(self, tau=None):
        """Blend the online weights into the target network: ``target = tau*eval + (1-tau)*target``."""
        if tau is None:
            tau = self.tau

        # The blend is pure bookkeeping, so keep it out of the autograd graph
        with T.no_grad():
            for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
                q_target_params.copy_(tau * q_eval_params + (1 - tau) * q_target_params)

    def remember(self, state, action, reward, state_, done):
        """Store one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, state_, done)

    def choose_action(self, observation, isTrain=True):
        """Return an epsilon-greedy action when ``isTrain`` is true, else the greedy action."""
        if (np.random.random() < self.epsilon) and isTrain:
            # Exploring: no forward pass is needed
            return np.random.choice(self.action_space)

        # Build the (1, state_dim) batch from one array instead of a list of
        # arrays, which is a slow path for tensor construction
        state = T.as_tensor(np.asarray(observation), dtype=T.float, device=device).unsqueeze(0)
        with T.no_grad():
            actions = self.q_eval.forward(state)

        return T.argmax(actions).item()

    def learn(self):
        """Run one gradient step on a sampled batch; does nothing until the buffer is ready."""
        if not self.memory.ready():
            return

        states, actions, rewards, next_states, terminals = self.memory.sample_buffer()

        states_tensor = T.tensor(states, dtype=T.float).to(device)
        # Actions select Q-value columns, so they must be an integer tensor
        # regardless of the dtype the buffer stores them in
        actions_tensor = T.as_tensor(actions, dtype=T.long, device=device)
        rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
        next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
        terminals_tensor = T.tensor(terminals, dtype=T.bool).to(device)

        with T.no_grad():
            q_ = self.q_target.forward(next_states_tensor)
            # No future value after a terminal transition
            q_[terminals_tensor] = 0.0
            target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
        # Q-value of the action that was actually taken, one per batch row
        q = self.q_eval.forward(states_tensor).gather(1, actions_tensor.unsqueeze(1)).squeeze(1)

        loss = F.mse_loss(q, target.detach())
        self.q_eval.optimizer.zero_grad()
        loss.backward()
        self.q_eval.optimizer.step()

        self.update_network_parameters()
        # Linear decay, clamped so epsilon never drops below eps_min
        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

    def save_models(self, episode):
        """Save both networks under ``checkpoint_dir``, tagged with ``episode``."""
        self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DQN_q_eval_{}.pth'.format(episode))
        print('Saving Q_eval network successfully!')
        self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DQN_Q_target_{}.pth'.format(episode))
        print('Saving Q_target network successfully!')

    def load_models(self, episode):
        """Load both networks from the checkpoints tagged with ``episode``."""
        self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DQN_q_eval_{}.pth'.format(episode))
        print('Loading Q_eval network successfully!')
        self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DQN_Q_target_{}.pth'.format(episode))
        print('Loading Q_target network successfully!')



---
> **utils.py**


In [202]:
import os
import matplotlib.pyplot as plt


def plot_learning_curve(episodes, records, title, ylabel, figure_file):
    """Plot ``records`` against ``episodes``, save the figure to ``figure_file`` and show it.

    Args:
        episodes: x values (episode indices).
        records: y values, one per episode.
        title: figure title.
        ylabel: label of the y axis.
        figure_file: output image path; its directory is created if missing.
    """
    fig = plt.figure()
    plt.plot(episodes, records, linestyle='-', color='r')
    plt.title(title)
    plt.xlabel('episode')
    plt.ylabel(ylabel)

    out_dir = os.path.dirname(figure_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Save before showing: once show() has released the figure, a later
    # plt.savefig() would write a new, empty one
    fig.savefig(figure_file)
    plt.show()
    plt.close(fig)


def create_directory(path: str, sub_dirs: list):
    """Create ``path + sub_dir`` for every entry of ``sub_dirs`` and report the outcome.

    The two parts are joined by plain string concatenation, so ``path`` is
    expected to end with a separator.
    """
    for name in sub_dirs:
        target = path + name
        if not os.path.exists(target):
            os.makedirs(target, exist_ok=True)
            print(target + ' create successfully!')
            continue
        print(target + ' is already exist!')


def save_best_profile(actions, states, episode):
    """Save the argmax indices of ``actions`` and ``states`` to ``best_profile_episode_<episode>.npz``.

    NOTE(review): ``np.argmax`` yields the index of the largest element in the
    flattened input, not an action or state value - confirm this is intended.
    """
    out_file = f'best_profile_episode_{episode}.npz'
    payload = {
        'best_action': np.argmax(actions),
        'best_state': np.argmax(states),
    }
    np.savez(out_file, **payload)


def visualize_profile(rewards, states):
  """Plot the per-step reward, position and speed of one episode side by side.

  Args:
    rewards: sequence of rewards, one per step.
    states: array-like of shape (n_steps, 2) holding [position, speed] per step.
  """
  states = np.asarray(states)

  # x-axis values: 1-based step index within the episode
  x = np.arange(1, len(rewards) + 1)

  fig, (ax1, ax2, ax3) = plt.subplots(1, 3, sharex=False, figsize=(15, 4))

  panels = (
      (ax1, rewards, 'r', 'Rewards'),
      (ax2, states[:, 0], 'g', 'Position'),
      (ax3, states[:, 1], 'b', 'Speed'),
  )
  for ax, values, color, name in panels:
    ax.plot(x, values, color=color, label=name)
    ax.set_ylabel(name)
    # The data are steps of a single episode, so label every panel accordingly
    ax.set_xlabel('Step')
    ax.legend()

  # show the chart, then release it: this function is called once per episode
  # and unclosed figures accumulate in memory
  plt.show()
  plt.close(fig)

In [None]:
import argparse
# from DQN import DQN
# from utils import plot_learning_curve, create_directory

# Command-line options of the training run: (flag, type, default)
_CLI_OPTIONS = (
    ('--max_episodes', int, 500),
    ('--ckpt_dir', str, './checkpoints/DQN/'),
    ('--reward_path', str, './output_images/avg_reward.png'),
    ('--epsilon_path', str, './output_images/epsilon.png'),
)

parser = argparse.ArgumentParser()
for _flag, _type, _default in _CLI_OPTIONS:
    parser.add_argument(_flag, type=_type, default=_default)

# parse_known_args returns unrecognized arguments in `unknown` instead of
# treating them as an error
args, unknown = parser.parse_known_args()


def main():
    """Train the PyTorch DQN agent on EcoDrivingEnv.

    For every episode the per-step profile is plotted; both networks are
    checkpointed every 100 episodes; after the last episode the average-reward
    and epsilon curves are plotted and saved to the paths given in ``args``.
    """
    import os  # local import so this cell does not rely on an import made in another cell

    env = EcoDrivingEnv()

    agent = DQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
                fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
                eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)

    create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])

    # Create the figure folders up front: a missing directory would otherwise
    # only surface when the curves are saved, after all training is done
    for figure_path in (args.reward_path, args.epsilon_path):
        figure_dir = os.path.dirname(figure_path)
        if figure_dir:
            os.makedirs(figure_dir, exist_ok=True)

    total_rewards, avg_rewards, eps_history = [], [], []

    for episode in range(args.max_episodes):
        total_reward = 0
        done = False
        observation = env.reset()

        # Per-step record of this episode
        states = []
        rewards = []

        while not done:
            action = agent.choose_action(observation, isTrain=True)
            observation_, reward, done, info = env.step(action)
            agent.remember(observation, action, reward, observation_, done)
            agent.learn()
            total_reward += reward
            observation = observation_

            states.append(observation)
            rewards.append(reward)

        total_rewards.append(total_reward)
        # Moving average over the last (up to) 100 episodes
        avg_reward = np.mean(total_rewards[-100:])
        avg_rewards.append(avg_reward)
        eps_history.append(agent.epsilon)

        print('EP: {}\t reward: {:.4}\t avg_reward: {:.4}\t epsilon: {:.4}'.
              format(episode + 1, total_reward, avg_reward, agent.epsilon))

        states = np.array(states)
        visualize_profile(rewards, states)

        if (episode + 1) % 100 == 0:
            agent.save_models(episode + 1)

    episodes = list(range(args.max_episodes))
    plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
    plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)


if __name__ == '__main__':
    main()

> **Code using pygame to display a straight road with single vehicle on it.**

> **WARNING: Cannot run in Colab**

In [None]:
import pygame

# Initialize Pygame
pygame.init()

# Set up the display
screen_width = 800
screen_height = 600
screen = pygame.display.set_mode((screen_width, screen_height))
pygame.display.set_caption("Simple Car Game")

# Colors
black = (0, 0, 0)
white = (255, 255, 255)
gray = (128, 128, 128)
green = (0, 200, 0)

# Set up the road
road_width = 600
road_height = 100
road_x = (screen_width - road_width) / 2
road_y = screen_height - road_height - 200
road_rect = pygame.Rect(road_x, road_y, road_width, road_height)

# Set up the car
car_width = 50
car_height = 30
car_x = (screen_width - car_width) / 2 - 250
car_y = screen_height - car_height - 210
car_rect = pygame.Rect(car_x, car_y, car_width, car_height)

# Set up the car shade once: it is identical in every frame, so there is no
# need to rebuild the surface inside the game loop
car_shade_surf = pygame.Surface((car_width, car_height))
car_shade_surf.set_alpha(100)  # Set the alpha value for transparency
car_shade_surf.fill((0, 0, 0))  # Fill the surface with black

# Frame limiter so the loop does not spin as fast as the CPU allows
clock = pygame.time.Clock()

# Game loop
running = True
try:
    while running:
        # Handle events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        # Clear the screen
        screen.fill(white)

        # Draw the road
        pygame.draw.rect(screen, gray, road_rect)
        pygame.draw.line(screen, white, (road_x, road_y + road_height/2), (road_x + road_width, road_y + road_height/2), 5)

        # Draw the car shade
        screen.blit(car_shade_surf, (car_x-3, car_y-3))  # Blit the surface onto the screen

        # Draw the car
        pygame.draw.rect(screen, green, car_rect)

        # Update the display
        pygame.display.update()

        # Cap the loop at 60 frames per second
        clock.tick(60)
finally:
    # Quit Pygame even if the loop exits through an exception
    pygame.quit()
