# TP4, INF8225 2025, Projet


## Imports

In [3]:
from IPython.display import clear_output

%pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121
%pip install numpy
%pip install swig
%pip install box2d
%pip install pygame
%pip install gymnasium
%pip install "gymnasium[box2d]"
%pip install matplotlib

clear_output()

In [19]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from IPython.display import HTML
import matplotlib.animation as animation
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import torchvision.transforms as T
import random
import os.path
import pygame

import math
from collections import namedtuple, deque

### Initialisation

In [20]:
print(torch.cuda.is_available())
print(torch.version.cuda)
print(torch.cuda.get_device_name(0))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

True
12.1
Tesla T4


## Data Declaration

In [21]:
# Inspired by : https://github.com/pangyyen/carRacing-DeepRL/blob/main/ppo/ppo.ipynb

env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False)
print("Observation space: ", env.observation_space) # (low, high, shape, dtype)
print("Action space: ", env.action_space)

SEED = 42

go_up = np.array([0.0, 1.0, 0.0])
no_action = np.array([0.0, 0.0, 0.0])

observation, info = env.reset(seed=SEED)

def show_animation():
	show_animation_frames(env.render())

def show_animation_frames(frames):
	fig = plt.figure(figsize=(7, 5))
	plt.axis('off')
	im = plt.imshow(frames[0])

	def animate(i):
		im.set_data(frames[i])
		return im,

	anim = animation.FuncAnimation(fig, animate, frames=len(frames), repeat=False)
	plt.close(fig)
	display(HTML(anim.to_jshtml()))

def show_current_frame(env, epoch, timestep):
	frame = env.render()
	plt.imshow(frame)
	plt.axis('off')
	plt.text(170, 0, f'Epoch: {epoch}', fontsize=12, color='black', backgroundcolor='white')
	plt.text(320, 0, f'Timestep: {timestep}', fontsize=12, color='black', backgroundcolor='white')
	clear_output(wait=True)
	plt.show()

def skip_zooming(env):
	for i in range(50):
		observation, _, terminated, truncated, info = env.step(no_action)

		if terminated or truncated:
			observation, info = env.reset()
			break
	return observation, info

Observation space:  Box(0, 255, (96, 96, 3), uint8)
Action space:  Box([-1.  0.  0.], 1.0, (3,), float32)


## Helper functions

In [22]:
transform = T.Compose([
	T.ToPILImage(),
	T.Grayscale(num_output_channels=1),
	T.Resize((84, 84)),
	T.ToTensor(),
	T.Normalize((0.5,), (0.5,))
])


## DQN


### DQN Algorithms

DQN is at its heart Q-Learning using Deep Neural Networks to predict the behavior of its environment and to predict which action is the best.

Our goal, when implementing DQN is to maximize the rewards of our policy $\pi^{*}$ described as followed, where $Q^{*}$ is defined as the optimal action-value function.

$$
\pi^{*}(s) = \underset{a}{\arg\max} \; Q^{*}(s,a)
$$



The definition of $Q^{*}$ follows the Bellman Optimality Equation:

$$
Q^{*}(s,a) = \mathbb{E} \left[ r + \gamma \underset{a'}{\max} Q^{*}(s', a') \; | \; s, a \right]
$$

The equation means that the value of an action is dictated by the current reward + the best reward we can get from the best next action. The $\gamma$ symbol is used only so that we can diminishes the importance of futur action on the long run.

Our goal is to maximize the rewards we will have on the long term, which can be defined as:

$$
G_t = r_t + \gamma r_{t+1} + \gamma^{2} r_{t+2} + \gamma^{3} r_{t+3} + ...  
$$

Based on Bellman's Optimality Equation, we are able to use the following update equation:

$$
Q(s,a) \leftarrow Q(s,a) + \alpha \left[r + \gamma \underset{a'}{\max}Q(s', a') - Q(s,a) \right]
$$


### DQN Implementation

In [40]:
# Implementation based on : https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward', 'done'))


class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def append(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

class DQN(nn.Module):
  def __init__(self, n_observations, n_actions):
    """
    Q-Network made of a Deep neural network
    """
    super(DQN, self).__init__()
    # TODO: Ajust the depth of the model so that we don't need to use 128 each time,
    # and evaluate the impact of changing those values
    self.layers = nn.Sequential(
        nn.Linear(n_observations, 128),
        nn.ReLU(),
        nn.Linear(128, 128),
        nn.ReLU(),
        nn.Linear(128, n_actions)
    )

  def forward(self, x):
    return self.layers(x)

class DQNAgent():
  def __init__(self, env):
    """
    Agent made of DQNs used for learning how to use the sim racer.
    """
    # TODO : make it so that it's possible to verify which hyperparameter was the best
    # TODO : we will handle the images as greyscale because we don't need
    # to handle the colors, it doesn't add that much information more than greyscale
    # TODO : inclure les formules mathématiques

    # Hyperparameters
    self.GAMMA = 0.99
    self.LR = 1e-3
    self.BATCH_SIZE = 64
    self.MEMORY_SIZE = 10000
    self.EPSILON_START = 1.0
    self.EPSILON_END = 0.01
    self.EPSILON_DECAY = 500
    self.TARGET_UPDATE_FREQ = 10

    # TODO : Fix this
    n_actions = env.action_space.shape[0]
    frame, info = env.reset(seed=SEED)
    n_observations = 96 * 96

    # Neural Network Declarations Here
    self.policy_net = DQN(n_observations, n_actions).to(device)
    self.target_net = DQN(n_observations, n_actions).to(device)
    self.target_net.load_state_dict(self.policy_net.state_dict())

    self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=self.LR, amsgrad=True)
    self.memory = ReplayMemory(10000)

    self.steps_done = 0
    self.episode_durations = []

  def select_action(self, state):
    """
    Epsilon-greedy strategy
    """
    sample = random.random()
    eps_threshold = self.EPSILON_END + (self.EPSILON_START - self.EPSILON_END) * \
      math.exp(-1. * self.steps_done / self.EPSILON_DECAY)

    self.steps_done += 1
    if sample > eps_threshold:
      with torch.no_grad():
        self.policy_net(state).argmax().item()
    else:
      return env.action_space.sample()


  def optimize_model(self):
    """
    Apply the backward propagation to the policy_net and the target_net.
    """
    if len(self.memory) < self.BATCH_SIZE:
      return

    batch = random.sample(memory, self.BATCH_SIZE)
    states, actions, rewards, next_states, done = zip(*batch)

    # TODO Adapt this so that it works with the actions of this specific challenge
    states = torch.FloatTensor(states)
    actions = torch.LongTensor(actions).unsqueeze(1)
    rewards = torch.FloatTensor(rewards).unsqueeze(1)
    next_states = torch.FloatTensor(next_states)
    dones = torch.BoolTensor(dones).unsqueeze(1)

    current_q = self.policy_net(states).gather(1, actions)
    next_q = self.target_net(next_states).max(1)[0].detach().unsqueeze(1)
    expected_q = rewards + self.GAMMA * next_q * (~dones)

    loss = nn.MSELoss()(current_q, expected_q)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()


### Training DQN Agent

In [39]:
agent = DQNAgent(env)

n_episode = 500
for episode in range(n_episode):
  state, _ = env.reset()
  total_reward = 0
  done = False
  while not done:
    print(state)
    action = agent.select_action(state)
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    agent.memory.append(state, action, reward, next_state, done)
    state = next_state
    total_reward += reward

    agent.optimize_model()

  if episode % agent.TARGET_UPDATE_FREQ == 0:
    agent.target_net.load_state_dict(agent.policy_net.state_dict())

  print(f"Episode {episode}, Total Reward: {total_reward}")


[[[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 ...

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]]
[[[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 ...

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]]
[[[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...


TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.ndarray

## PPO

### Testing


In [19]:
# TODO - Remove this cell when finish implementation

observation, info = env.reset(seed=SEED)
for i in range(100):
	#action = env.action_space.sample()
	observation, reward, terminated, truncated, info = env.step(go_up)
	#print(f"Step: {i}, Reward: {reward}, Terminated: {terminated}, Truncated: {truncated}, Info: {info}")

	if terminated or truncated:
		observation, info = env.reset()
env.close()

#show_animation()

### PPO Implementation

In [21]:
class Actor(nn.Module):
	def __init__(self, action_dim, actor_std, hidden_sizes=[1024, 512], is_log_std = False):
		"""
    The Actor decides which action to take given a state.
    It outputs a mean and standard deviation to define a probability distribution
    over actions, enabling exploration.
    """
		super().__init__()

		self.is_log_std = is_log_std
		self.actor_std = actor_std

	 	# CNN encoder to extract features from image input (1 channel grayscale)
		self.cnn = nn.Sequential(
			nn.Conv2d(1, 32, 8, stride=4), nn.ReLU(),
			nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
			nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
			nn.Flatten()
		)

		# Compute flattened CNN output size dynamically
		with torch.no_grad():
			dummy = torch.zeros(1, 1, 84, 84)
			n_flatten = self.cnn(dummy).shape[1]

		# Fully connected layers to produce the mean of the action distribution
		layers = []
		in_features = n_flatten
		for hidden in hidden_sizes:
			layers.append(nn.Linear(in_features, hidden))
			layers.append(nn.ReLU())
			in_features = hidden
		layers.append(nn.Linear(in_features, action_dim))
		self.fc = nn.Sequential(*layers)

		# Either use a fixed std or learnable log std
		if is_log_std:
			self.std = nn.Parameter(torch.full((action_dim,), torch.log(torch.tensor(self.actor_std))))
		else:
			self.std = torch.full((action_dim,), self.actor_std).to(device)

	def forward(self, x):
		"""
		Takes input state `x` and returns (mean, std) of action distribution.
		"""
		x = self.cnn(x)
		mean = self.fc(x)
		if self.is_log_std:
			std = self.std.exp()
			return mean, std
		else:
			return mean, self.std


class Critic(nn.Module):
	def __init__(self, hidden_sizes=[1024, 512]):
		"""
    The Critic estimates the value function V(s),
    which is the expected return from a given state.
    This helps the PPO algorithm evaluate how good an action is.
    """
		super().__init__()

		# CNN encoder to extract features from the input image (1 grayscale channel)
		self.cnn = nn.Sequential(
			nn.Conv2d(1, 32, 8, stride=4), nn.ReLU(),
			nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
			nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
			nn.Flatten()
		)

		# Compute the flattened feature size dynamically using a dummy input
		with torch.no_grad():
			dummy = torch.zeros(1, 1, 84, 84)
			n_flatten = self.cnn(dummy).shape[1]

		# Fully connected layers to map CNN features to a scalar value (V(s))
		layers = []
		in_features = n_flatten
		for hidden in hidden_sizes:
			layers.append(nn.Linear(in_features, hidden))
			layers.append(nn.ReLU())
			in_features = hidden
		layers.append(nn.Linear(in_features, 1))
		self.fc = nn.Sequential(*layers)

	def forward(self, x):
		"""
		Takes an input state `x`, passes it through the CNN and FC layers,
		and returns a scalar value estimating V(s).
		"""
		x = self.cnn(x)
		return self.fc(x)

class PPO:
	def __init__(self, env, save_path="ppo.pth", lr_actor=3e-4, lr_critic=3e-4, actor_std=0.4, gamma=0.99, eps_clip=0.2, K_epochs=4, is_log_std = False):
		"""
    Proximal Policy Optimization (PPO) agent implementation.
    This class coordinates the actor and critic networks, manages training, and handles interaction with the environment.
    """
		self.env = env
		self.save_path = save_path
		self.action_dim = env.action_space.shape[0]

		# Create the actor and critic networks
		self.actor = Actor(self.action_dim, actor_std, [1024, 512], is_log_std).to(device)
		self.critic = Critic([1024, 512]).to(device)

		# Optimizer for both actor and critic
		self.optimizer = torch.optim.Adam([
                        {'params': self.actor.parameters(), 'lr': lr_actor},
                        {'params': self.critic.parameters(), 'lr': lr_critic}
                    ])

		# Constants used for PPO's equation
		self.gamma = gamma
		self.eps_clip = eps_clip
		self.K_epochs = K_epochs

	def save(self):
		"""Saves actor and critic model parameters to disk."""
		torch.save({ 'actor_state_dict': self.actor.state_dict(), 'critic_state_dict': self.critic.state_dict() }, self.save_path)

	def load(self):
		"""Loads actor and critic model parameters from disk if available."""
		if os.path.isfile(self.save_path):
			checkpoint = torch.load(self.save_path, map_location=device)
			self.actor.load_state_dict(checkpoint['actor_state_dict'])
			self.critic.load_state_dict(checkpoint['critic_state_dict'])

	def select_action(self, state):
		"""
		Given a state, selects an action using the actor network by sampling from a Normal distribution.
		Returns the action and its log probability.
		"""
		state = transform(state).unsqueeze(0).to(device)
		mean, std = self.actor(state)
		dist = Normal(mean, std)
		action = dist.sample()
		log_prob = dist.log_prob(action).sum(-1)
		return action.clamp(-1, 1).cpu().numpy()[0], log_prob.item()

	def evaluate(self, states, actions):
		"""
		Used during training to evaluate the log probabilities, value estimates,
		and entropy of the current policy given a batch of states and actions.
		"""
		mean, std = self.actor(states)
		dist = Normal(mean, std)
		log_probs = dist.log_prob(actions).sum(-1)
		entropy = dist.entropy().sum(-1)
		values = self.critic(states).squeeze(-1)
		return log_probs, values, entropy

	def compute_returns(self, rewards, dones, next_value):
		"""
		Computes discounted returns (targets for the critic) using the Bellman equation.
		next_value is the estimated value of the last state in the trajectory.
		"""
		returns = []
		R = next_value
		for step in reversed(range(len(rewards))):
			R = rewards[step] + self.gamma * R * (1 - dones[step])
			returns.insert(0, R)
		return returns

	def update(self, memory):
		"""
		Main PPO update logic:
		- Computes advantages
		- Evaluates current policy
		- Applies clipped PPO loss
		- Optimizes the networks
		"""
		states = torch.stack(memory['states']).to(device)
		actions = torch.stack(memory['actions']).to(device)
		old_log_probs = torch.tensor(memory['log_probs']).to(device)
		returns = torch.tensor(memory['returns']).to(device)

		for _ in range(self.K_epochs):
			log_probs, values, entropy = self.evaluate(states, actions)
			advantages = returns - values.detach()

			ratio = (log_probs - old_log_probs).exp()
			surr1 = ratio * advantages
			surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
			actor_loss = -torch.min(surr1, surr2).mean()
			critic_loss = nn.MSELoss()(values, returns)
			loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy.mean()
			print(f"Loss: {loss}")

			self.optimizer.zero_grad()
			loss.backward()
			self.optimizer.step()

		self.save()

### PPO Training

In [22]:
ppo = PPO(env, is_log_std=False, actor_std=0.4)

In [25]:
max_episodes = 300000
max_timesteps = 10000

ppo.actor.train()
ppo.critic.train()
for ep in range(max_episodes):
	state, _ = env.reset(seed=SEED)
	state, _ = skip_zooming(env)

	memory = {'states': [], 'actions': [], 'log_probs': [], 'rewards': [], 'dones': []}
	total_reward = 0
	t = 0

	while t < max_timesteps:
		state_t = transform(state).to(device)
		memory['states'].append(state_t)

		action, log_prob = ppo.select_action(state)
		next_state, reward, done, trunc, _ = env.step(action)

		memory['actions'].append(torch.FloatTensor(action).to(device))
		memory['log_probs'].append(log_prob)
		memory['rewards'].append(reward)
		memory['dones'].append(done)

		state = next_state
		total_reward += reward

		if t % 5 == 0:
			show_current_frame(env, ep, t)
			print("action: ", action)
			if ppo.actor.is_log_std:
				print("std: ", ppo.actor.std.data.cpu().numpy())

		if done or trunc:
			break

		t += 1

	last_score = total_reward

	with torch.no_grad():
		next_state_t = transform(state).unsqueeze(0).to(device)
		next_value = ppo.critic(next_state_t).item()
	memory['returns'] = ppo.compute_returns(memory['rewards'], memory['dones'], next_value)

	ppo.update(memory)
	print(f"Episode {ep+1} - Reward: {total_reward:.1f}")

env.close()

KeyboardInterrupt: 

In [24]:
state, _ = env.reset(seed=SEED)
total_reward = 0

ppo.actor.eval()
ppo.critic.eval()
#while True:
for i in range(200):
	state_tensor = transform(state).unsqueeze(0).to(device)

	with torch.no_grad():
		mean, _ = ppo.actor(state_tensor)
		action = mean.squeeze(0).cpu().numpy()

	action = np.clip(action, -1, 1)
	state, reward, done, trunc, _ = env.step(action)
	total_reward += reward

	if done or trunc:
		break

env.close()
show_animation()
print(f"Test reward: {total_reward:.2f}")

Test reward: -12.93


## Human Testing

In [12]:
env = gym.make("CarRacing-v3", render_mode="human")
obs = env.reset()[0]

pygame.init()
win = pygame.display.set_mode((1000, 1000))
pygame.display.set_caption("Keyboard Controller")

clock = pygame.time.Clock()

running = True
action = np.array([0.0, 0.0, 0.0])  # [steer, gas, brake]

def process_keys():
    keys = pygame.key.get_pressed()
    steer = 0.0
    gas = 0.0
    brake = 0.0

    if keys[pygame.K_LEFT]:
        steer = -1.0
    elif keys[pygame.K_RIGHT]:
        steer = 1.0

    if keys[pygame.K_UP]:
        gas = 1.0
    if keys[pygame.K_DOWN]:
        brake = 1.0

    return np.array([steer, gas, brake])

while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    action = process_keys()
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

    if done:
        obs = env.reset()[0]

    clock.tick(60)  # Limit to 60 FPS

env.close()
pygame.quit()

KeyboardInterrupt: 