<a href="https://colab.research.google.com/github/ThierryBeaulieu/BasicCMakeArchitecture/blob/master/TP4/TP4/INF8225_TP4_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TP4, INF8225 2025, Projet


## Imports

In [1]:
from IPython.display import clear_output

%pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121
%pip install numpy
%pip install swig
%pip install box2d
%pip install pygame
%pip install gymnasium
%pip install "gymnasium[box2d]"
%pip install matplotlib
%pip install wandb

clear_output()

In [2]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from IPython.display import HTML
import matplotlib.animation as animation
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
import random
import os
import wandb
from IPython.display import clear_output
import math
from collections import namedtuple, deque

### Initialisation

In [3]:
# Select the compute device once: use the GPU when CUDA is usable, otherwise the CPU.
cuda_available = torch.cuda.is_available()
print(cuda_available)
device = torch.device("cuda") if cuda_available else torch.device("cpu")

# CUDA version this PyTorch build reports.
print(torch.version.cuda)

if cuda_available:
	# Name of the first visible CUDA device.
	print(torch.cuda.get_device_name(0))

True
12.1
NVIDIA L4


## Data Declaration

In [19]:
# Inspired by : https://github.com/pangyyen/carRacing-DeepRL/blob/main/ppo/ppo.ipynb

# Create the CarRacing environment with discrete actions (continuous=False) and
# without domain randomization; render_mode="rgb_array" is requested so that
# env.render() can be used to obtain image data for plotting.
env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False, continuous=False)
print("Observation space: ", env.observation_space) # (low, high, shape, dtype)
print("Action space: ", env.action_space)

# Seed handed to env.reset(); reused by the training and evaluation cells.
SEED = 42

# Initial reset so that `observation` / `info` exist before any helper is called.
observation, info = env.reset(seed=SEED)
def show_animation():
	"""Display what the global `env` currently renders as an animation.

	`env.render()` returns a single image array in "rgb_array" mode and a list
	of image arrays in "rgb_array_list" mode. A single array is wrapped in a
	list so that `show_animation_frames` always receives a sequence of frames
	(otherwise `frames[0]` would be the first pixel row of the image).
	"""
	rendered = env.render()
	if isinstance(rendered, np.ndarray):
		# One frame only: turn it into a one-frame animation.
		rendered = [rendered]
	show_animation_frames(rendered)

def show_animation_frames(frames, interval=200):
	"""Render a sequence of image frames as an inline HTML/JS animation.

	Args:
		frames: Non-empty sequence of images accepted by `plt.imshow`.
		interval: Delay between frames in milliseconds. Defaults to 200, which
			is matplotlib's own `FuncAnimation` default, so existing calls
			behave as before.

	Raises:
		ValueError: If `frames` is empty (previously a bare IndexError).
	"""
	if len(frames) == 0:
		raise ValueError("show_animation_frames() needs at least one frame")

	fig = plt.figure(figsize=(7, 5))
	plt.axis('off')
	im = plt.imshow(frames[0])

	def animate(i):
		# Swap the pixel data of the existing image artist instead of re-plotting.
		im.set_data(frames[i])
		return im,

	anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval, repeat=False)
	# Close the figure so only the HTML animation is shown, not a static plot.
	plt.close(fig)
	display(HTML(anim.to_jshtml()))

def show_current_frame(env, data):
	"""Show the environment's current rendered frame with a row of text labels.

	Args:
		env: Object whose `render()` result can be passed to `plt.imshow`.
		data: Mapping; one label of the form "key: value" is drawn per key.

	The previous cell output is cleared before the new figure is shown, so
	calling this repeatedly updates the picture in place.
	"""
	frame = env.render()
	fig, _ = plt.subplots()
	# Renderer is needed to measure the drawn width of each text label.
	r = fig.canvas.get_renderer()
	plt.imshow(frame)
	plt.axis('off')
	texts = []
	size_used = 0
	# First pass: create every label at a dummy position and sum their widths.
	for i, key in enumerate(data):
		text = plt.text(0, 0, f'{key}: {data[key]}', fontsize=12, color='black', backgroundcolor='white', ha="center")
		size_used += text.get_window_extent(renderer=r).width
		texts.append(text)
	# Equal gap placed before, between and after the labels.
	# NOTE(review): 700 is presumably the assumed total horizontal extent -- confirm.
	split = (700 - size_used) / (len(data) + 1)
	next_position = split
	# Second pass: move each label to its final x position.
	# NOTE(review): widths come from window extents while positions are given to
	# set_position -- confirm both are in the same units.
	for t in texts:
		t.set_position((next_position, 0))
		next_position = next_position + t.get_window_extent(renderer=r).width + split
	clear_output(wait=True)
	plt.show()

def skip_zooming(env, n_steps=50):
	"""Advance `env` with a no-op action for up to `n_steps` steps.

	Used right after `env.reset()` to skip the opening frames of an episode
	(the name suggests the initial camera zoom-in of CarRacing).

	Args:
		env: Gymnasium-style environment exposing `action_space`, `step`, `reset`.
		n_steps: Maximum number of no-op steps to take (default 50, the value
			that used to be hard-coded).

	Returns:
		(observation, info) from the last step, or from a fresh `env.reset()`
		if the episode ended while skipping.

	Raises:
		ValueError: If `n_steps` is smaller than 1, since no observation would
			be available to return.
	"""
	if n_steps < 1:
		raise ValueError("n_steps must be at least 1")

	# isinstance (rather than an exact type comparison) also covers subclasses
	# of Discrete. Action 0 is the discrete no-op; otherwise use an all-zero vector.
	if isinstance(env.action_space, gym.spaces.Discrete):
		no_action = 0
	else:
		no_action = np.zeros((env.action_space.shape[0]))

	for _ in range(n_steps):
		observation, _, terminated, truncated, info = env.step(no_action)

		if terminated or truncated:
			# Episode ended while skipping: start a new one and stop.
			observation, info = env.reset()
			break
	return observation, info

Observation space:  Box(0, 255, (96, 96, 3), uint8)
Action space:  Discrete(5)


### Helper Function

In [20]:
# Preprocessing applied to every raw observation before it reaches the network:
# the frame is converted to a PIL image, reduced to one grayscale channel,
# resized to 84x84, converted back to a float tensor and finally normalized
# with mean 0.5 / std 0.5 (i.e. (x - 0.5) / 0.5 per pixel).
transform = T.Compose([
	T.ToPILImage(),
	T.Grayscale(num_output_channels=1),
	T.Resize((84, 84)),
	T.ToTensor(),
	T.Normalize((0.5,), (0.5,))
])


## Implementation

### DQN

#### Algorithms

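DQN is, at its heart, Q-learning in which a deep neural network approximates the action-value function $Q(s, a)$: given a state, the network estimates the expected return of each possible action, and the agent picks the action with the highest estimate.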

Our goal when implementing DQN is to learn the optimal policy $\pi^{*}$, i.e. the policy that maximizes the expected cumulative reward. It is defined as follows, where $Q^{*}$ is the optimal action-value function.

$$
\pi^{*}(s) = \underset{a}{\arg\max} \; Q^{*}(s,a)
$$



The definition of $Q^{*}$ follows the Bellman Optimality Equation:

$$
Q^{*}(s,a) = \mathbb{E} \left[ r + \gamma \underset{a'}{\max} Q^{*}(s', a') \; | \; s, a \right]
$$

The equation means that the value of an action is the immediate reward plus the discounted value of the best action available in the next state. The discount factor $\gamma$ (between 0 and 1) reduces the importance of rewards that lie further in the future.

Our goal is to maximize the reward collected in the long term, i.e. the discounted return, which is defined as:

$$
G_t = r_t + \gamma r_{t+1} + \gamma^{2} r_{t+2} + \gamma^{3} r_{t+3} + ...  
$$

Based on Bellman's Optimality Equation, we are able to use the following update equation:

$$
Q(s,a) \leftarrow Q(s,a) + \alpha \left[r + \gamma \underset{a'}{\max}Q(s', a') - Q(s,a) \right]
$$


#### Implementation

In [26]:
# Implementation based on : https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
# One step of experience stored in the replay memory. The training loop fills
# the fields positionally as (state, action, next_state, reward, done).
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward', 'done'))


class ReplayMemory(object):
    """Bounded buffer of `Transition` tuples with uniform random sampling.

    Once `capacity` entries are stored, appending a new one drops the oldest.
    """

    def __init__(self, capacity):
        # A deque with maxlen evicts from the left automatically when full.
        self.memory = deque(maxlen=capacity)

    def append(self, *args):
        """Build a Transition from the positional fields and store it."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored entries chosen at random."""
        drawn = random.sample(self.memory, batch_size)
        return drawn

    def __len__(self):
        """Number of entries currently stored."""
        return len(self.memory)

class DQN(nn.Module):
  def __init__(self, n_actions):
    """
    Q-network: a convolutional feature extractor followed by two linear layers.

    n_actions: number of discrete actions; the network outputs one Q-value
    per action.
    """
    super(DQN, self).__init__()
    # TODO: Adjust the depth/width of the model and evaluate the impact of
    # changing those values
    self.net = nn.Sequential(
      # Expects single-channel 84x84 input (what `transform` produces);
      # the shapes below and the 64 * 7 * 7 flatten size assume that resolution.
      nn.Conv2d(1, 32, kernel_size=8, stride=4),    # Output: 32x20x20
      nn.ReLU(),
      nn.Conv2d(32, 64, kernel_size=4, stride=2),   # Output: 64x9x9
      nn.ReLU(),
      nn.Conv2d(64, 64, kernel_size=3, stride=1),    # Output: 64x7x7
      nn.ReLU(),
      nn.Flatten(),
      nn.Linear(64 * 7 * 7, 512),
      nn.ReLU(),
      nn.Linear(512, n_actions)
    )

  def forward(self, x):
    """Return the Q-values for a batch `x` (one row of n_actions values per sample)."""
    return self.net(x)

class DQNAgent():
  def __init__(self, env):
    """
    DQN agent: a policy network, a periodically synchronized target network,
    an AdamW optimizer and a replay memory.

    env: kept for interface compatibility; it is not read here because the
    action set is declared explicitly below.
    """
    # TODO : make it so that it's possible to verify which hyperparameter was the best
    # TODO : we will handle the images as greyscale because we don't need
    # to handle the colors, it doesn't add that much information more than greyscale
    # TODO : include the mathematical formulas

    # Hyperparameters
    self.GAMMA = 0.99               # discount factor
    self.LR = 3e-4                  # optimizer learning rate
    self.BATCH_SIZE = 64            # transitions per optimization step
    self.MEMORY_SIZE = 10000        # replay memory capacity
    self.EPSILON_START = 1.0        # initial exploration rate
    self.EPSILON_END = 0.01         # asymptotic exploration rate
    self.EPSILON_DECAY = 1000       # decay time constant, in action selections
    self.TARGET_UPDATE_FREQ = 10    # read by the training loop (in episodes)

    # Possible actions
    self.discrete_actions = [
      0,	# Do nothing
      1,	# Steer left
      2,	# Steer right
      3,	# Accelerate
      4,	# Brake
    ]
    n_actions = len(self.discrete_actions)

    # Policy network (trained) and target network (frozen copy used for targets)
    self.policy_net = DQN(n_actions).to(device)
    self.target_net = DQN(n_actions).to(device)
    self.target_net.load_state_dict(self.policy_net.state_dict())

    self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=self.LR, amsgrad=True)
    # Capacity now follows the MEMORY_SIZE hyperparameter instead of a second literal.
    self.memory = ReplayMemory(self.MEMORY_SIZE)

    self.steps_done = 0
    self.episode_durations = []

  def select_action(self, state):
    """
    Epsilon-greedy action selection.

    state: preprocessed observation tensor WITHOUT batch dimension (the
    training loop passes the output of `transform`); it is only read when the
    greedy branch is taken.

    Returns the index of the chosen action. Every call increments
    `steps_done`, which drives the exponential decay of epsilon from
    EPSILON_START towards EPSILON_END.
    """
    sample = random.random()
    eps_threshold = self.EPSILON_END + (self.EPSILON_START - self.EPSILON_END) * \
      math.exp(-1. * self.steps_done / self.EPSILON_DECAY)
    self.steps_done += 1

    if sample > eps_threshold:
      # Exploit: pick the action with the highest predicted Q-value.
      with torch.no_grad():
        # Add batch dimension and convert to float
        state_tensor = state.unsqueeze(0).float().to(device)
        q_values = self.policy_net(state_tensor)
        action_idx = q_values.argmax().item()
    else:
      # Explore: uniformly random action.
      action_idx = random.randint(0, len(self.discrete_actions)-1)

    return action_idx

  def preprocess_state(self, state):
    """
    Convert an HxWxC uint8 numpy image into a CxHxW float tensor in [0, 1].

    NOTE(review): for a raw (96, 96, 3) observation this yields a 3-channel
    96x96 tensor, whereas DQN's first layer is Conv2d(1, ...) and its linear
    layer is sized for the 84x84 grayscale output of `transform`. Do not feed
    this output to `select_action`; use `transform` for network input.
    """
    # Input shape: (96, 96, 3) -> Output shape: (3, 96, 96)
    return torch.from_numpy(state).permute(2, 0, 1).float() / 255.0

  def optimize_model(self):
    """
    Run one gradient step on policy_net from a random replay batch.

    Does nothing until the memory holds at least BATCH_SIZE transitions.
    The target network is only evaluated here; it is never updated by this
    method (the training loop copies the policy weights into it).
    """
    if len(self.memory) < self.BATCH_SIZE:
      return

    transitions = self.memory.sample(self.BATCH_SIZE)
    # Turn a list of Transitions into one Transition of per-field tuples.
    batch = Transition(*zip(*transitions))

    states = torch.stack([s for s in batch.state]).to(device)
    actions = torch.tensor(batch.action, dtype=torch.long, device=device)
    rewards = torch.tensor(batch.reward, dtype=torch.float32, device=device).unsqueeze(1)
    dones = torch.tensor(batch.done, dtype=torch.bool, device=device).unsqueeze(1)
    next_states = torch.stack([s for s in batch.next_state]).to(device)

    # Q(s, a) of the actions that were actually taken.
    current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))
    # The bootstrap target is a constant w.r.t. the optimization: evaluate the
    # target network without autograd so no graph is built and no gradients
    # accumulate on target_net's parameters.
    with torch.no_grad():
      next_q = self.target_net(next_states).max(1)[0].unsqueeze(1)
    # r + gamma * max_a' Q_target(s', a'), with the bootstrap term zeroed
    # for transitions flagged as done.
    expected_q = rewards + (self.GAMMA * next_q * ~dones)

    # Huber loss: less sensitive to outlier TD errors than MSE.
    loss = nn.SmoothL1Loss()(current_q, expected_q)

    self.optimizer.zero_grad()
    loss.backward()
    # Clip each gradient element to [-10, 10] before the update.
    torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 10)
    self.optimizer.step()


#### Training Agent

In [None]:
# Training loop
agent = DQNAgent(env)

# Real progress starts at 100
n_episode = 700
for episode in range(n_episode):
	# Reset, skip the opening frames with no-op actions, then preprocess.
	state, _ = env.reset(seed=SEED)
	state, _ = skip_zooming(env)
	state = transform(state)
	total_reward = 0
	rewards = []
	done = False
	t = 0

	while not done:
		action_idx = agent.select_action(state)
		next_state, reward, terminated, truncated, _ = env.step(action_idx)
		# The episode loop stops on either signal ...
		done = terminated or truncated

		# Clip rewards to [-1, 1] to keep TD targets on a bounded scale.
		reward = np.clip(reward, -1, 1)
		next_state = transform(next_state)

		# ... but only a true termination is stored as `done`: a time-limit
		# truncation is not a terminal state, so the target must still
		# bootstrap from next_state in that case.
		agent.memory.append(state, action_idx, next_state, reward, terminated)
		rewards.append(reward)
		state = next_state
		total_reward += reward

		# if t % 20 == 0:
		# 	show_current_frame(env, {"Episode": episode, "Timestep": t})

		agent.optimize_model()

		t += 1

	# Periodically copy the policy weights into the target network.
	if episode % agent.TARGET_UPDATE_FREQ == 0:
		agent.target_net.load_state_dict(agent.policy_net.state_dict())

	print(f"Episode {episode+1}, Total Reward: {total_reward}")


Episode 1, Total Reward: -88.39999999999918
Episode 2, Total Reward: -90.59999999999911
Episode 3, Total Reward: -27.099999999999874
Episode 4, Total Reward: -77.40000000000015
Episode 5, Total Reward: -92.79999999999902


#### Car Racing Animation

In [None]:
def run_agent_and_collect_frames(agent, env, seed=42, preprocess=None):
  """
  Play one full episode with `agent` and return the rendered frames.

  agent: object exposing `select_action(state)`.
  env: Gymnasium-style environment (`reset`, `step`, `render`).
  seed: seed passed to `env.reset`.
  preprocess: callable turning a raw observation into the network input.
    Defaults to the notebook's `transform`, i.e. the same preprocessing the
    training loop applies, so the input matches what the network expects.

  Returns a list with one rendered frame per step taken, captured before
  the action of that step is applied.
  """
  if preprocess is None:
    preprocess = transform

  state, _ = env.reset(seed=seed)
  done = False
  frames = []

  while not done:
    frames.append(env.render())

    action = agent.select_action(preprocess(state))
    state, _, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

  # Return only after the episode has finished (previously this sat inside
  # the loop and returned after the very first step).
  return frames

# Function to display frames as an animation using matplotlib
def show_animation_frames(frames, interval=50):
  """
  Render a sequence of image frames as an inline HTML/JS animation.

  This redefinition replaces the earlier `show_animation_frames` of the
  notebook once this cell has run.

  frames: non-empty sequence of images accepted by `plt.imshow`.
  interval: delay between frames in milliseconds (default 50, the value that
    used to be hard-coded here).

  Raises ValueError if `frames` is empty (previously a bare IndexError).
  """
  if len(frames) == 0:
    raise ValueError("show_animation_frames() needs at least one frame")

  fig = plt.figure(figsize=(7, 5))
  plt.axis('off')
  im = plt.imshow(frames[0])

  def animate(i):
    # Swap the pixel data of the existing image artist instead of re-plotting.
    im.set_data(frames[i])
    return im,

  anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval, repeat=False)
  # Close the figure so only the HTML animation is shown, not a static plot.
  plt.close(fig)
  display(HTML(anim.to_jshtml()))

# Play one episode with `agent` (created and trained in the training cell)
# and collect the frames returned by the rollout helper.
frames = run_agent_and_collect_frames(agent, env)

# Display the collected frames as an inline animation.
show_animation_frames(frames)
