## PPO for Car Racing from Scratch

This notebook implements the Proximal Policy Optimization (PPO) algorithm to train an agent for the `CarRacing-v3` environment in Gymnasium. The code is split into logical sections that mirror the structure of a typical machine learning project in a Colab environment.

### 1. Installation

First, we need to install the required libraries. `box2d-py`, which the Box2D environments depend on, needs the system-level `swig` tool to build correctly. We install `swig` with `apt-get` first and then proceed with the `pip` installations.

In [None]:
# Make sure to run this cell in your environment (e.g., Google Colab)
# Install swig first; without it, building box2d-py can fail.
!apt-get update > /dev/null 2>&1
!apt-get install -y swig > /dev/null 2>&1

# Now, install the Python packages
!pip install gymnasium[box2d]
!pip install imageio imageio-ffmpeg  # imageio-ffmpeg is needed to write the .mp4 demo video
!pip install opencv-python
!pip install torch
!pip install matplotlib



### 2. Imports and Environment Wrapper

Here, we import the necessary libraries and define the `Env` class, a wrapper around the standard Gymnasium environment. The wrapper converts each frame to grayscale, resizes it to 84x84, and stacks the last four frames to give the agent a sense of motion. It also repeats each chosen action for four environment steps and maps five discrete actions onto the environment's continuous controls.

In [None]:
import torch
import torch.nn as nn
from torch.distributions import Categorical  # categorical policy over the discrete actions
import torch.optim as optim
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import gymnasium as gym
import numpy as np
import cv2
from collections import deque, namedtuple
import os
import time
import imageio
import matplotlib.pyplot as plt


class Env:
    """
    Environment wrapper for CarRacing-v3.
    Includes preprocessing, frame stacking, and action space discretization.
    """
    def __init__(self, frame_stack=4, action_repeat=4):
        self.env = gym.make('CarRacing-v3', continuous=True)
        self.action_repeat = action_repeat
        self.frame_stack = frame_stack

        # Discrete action set; each entry is a continuous [steering, gas, brake] command
        self.action_space = [
            [-1.0, 0.0, 0.0],   # 0: Turn Left
            [1.0, 0.0, 0.0],    # 1: Turn Right
            [0.0, 0.0, 0.8],    # 2: Brake
            [0.0, 1.0, 0.0],    # 3: Accelerate
            [0.0, 0.0, 0.0],    # 4: Do-Nothing
        ]
        self.num_actions = len(self.action_space)

    def reset(self):
        """ Resets the environment and returns the initial stacked state. """
        self.counter = 0
        self.die = False
        img_rgb, _ = self.env.reset()
        img_gray = self.rgb2gray(img_rgb)
        self.stack = deque([img_gray] * self.frame_stack, maxlen=self.frame_stack)
        return np.array(self.stack)

    def step(self, action_index):
        """
        Takes an action index, maps it to a continuous action, and steps the environment.
        """
        # Get the continuous action from our discrete list and convert to numpy array
        continuous_action = np.array(self.action_space[action_index])

        total_reward = 0
        for i in range(self.action_repeat):
            img_rgb, reward, terminated, truncated, info = self.env.step(continuous_action)

            # Use the game's default reward (positive for visiting new tiles)
            total_reward += reward

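            # Penalty for being on the grass, detected via the mean green channel.
            # Caveat: rows 84+ of the 96x96 frame appear to hold the bottom indicator
            # bar, so this crop may need adjusting to cover the area around the car.
            is_on_grass = np.mean(img_rgb[84:, 12:84, 1]) > 180
            if is_on_grass:
                total_reward -= 0.2

            if terminated:
                self.die = True
                # Large penalty on termination. `terminated` may also be set when
                # the lap is completed, not only when the car leaves the playfield.
                total_reward -= 100
                break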

        img_gray = self.rgb2gray(img_rgb)
        self.stack.append(img_gray)

        done = self.die or self.counter > 1000 or truncated
        self.counter += 1
        return np.array(self.stack), total_reward, done, {}

    def render(self, *args, **kwargs):
        """Renders the environment."""
        return self.env.render(*args, **kwargs)

    @staticmethod
    def rgb2gray(rgb, norm=True):
        """ Converts an RGB image to grayscale (84x84) and normalizes it. """
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        gray = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
        if norm:
            gray = gray.astype(np.float32) / 255.
        return gray
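
To see the frame-stacking step in isolation, here is a minimal sketch that mirrors what `reset` and `step` do with the `deque`. The `make_frame` helper is hypothetical: it stands in for a preprocessed 84x84 grayscale frame so the example runs without the environment or OpenCV.

```python
from collections import deque

import numpy as np

FRAME_STACK = 4  # same default as Env(frame_stack=4)


def make_frame(value):
    """Hypothetical stand-in for one preprocessed 84x84 grayscale frame."""
    return np.full((84, 84), value, dtype=np.float32)


# On reset, the first frame is repeated to fill the whole stack.
stack = deque([make_frame(0.0)] * FRAME_STACK, maxlen=FRAME_STACK)

# Each step appends the newest frame; maxlen silently drops the oldest one.
for step_value in (0.1, 0.2):
    stack.append(make_frame(step_value))

# The stacked state has one channel per frame, ordered oldest to newest.
state = np.array(stack)
print(state.shape)  # (4, 84, 84)
```

The leading dimension of 4 is what the first `nn.Conv2d(4, ...)` layer of the networks expects as input channels.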


### 3. PPO Agent Implementation

This section contains the core of the PPO algorithm. We define the `Actor` and `Critic` neural network architectures and the `PPOAgent` class that brings them together. The agent class handles action selection, storing experiences, and updating the networks based on the PPO loss function.
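
The PPO loss mentioned above is built around a clipped surrogate objective. The following is a small, framework-free sketch of that objective for a single sample; the function name `clipped_surrogate` and the example probabilities are illustrative assumptions, while the default `clip_param=0.2` matches the agent's setting.

```python
import math


def clipped_surrogate(new_log_prob, old_log_prob, advantage, clip_param=0.2):
    """Per-sample PPO clipped objective (the quantity to maximize)."""
    # Probability ratio between the updated policy and the data-collecting policy.
    ratio = math.exp(new_log_prob - old_log_prob)
    # Restrict the ratio to [1 - clip_param, 1 + clip_param].
    clipped_ratio = max(1.0 - clip_param, min(ratio, 1.0 + clip_param))
    # Take the more pessimistic of the unclipped and clipped terms.
    return min(ratio * advantage, clipped_ratio * advantage)


# Ratio 1.5 with a positive advantage: the gain is capped at 1.2 * advantage.
capped_gain = clipped_surrogate(math.log(0.6), math.log(0.4), advantage=2.0)

# Ratio 0.5 with a negative advantage: the clipped term (0.8 * advantage) is lower.
capped_loss = clipped_surrogate(math.log(0.2), math.log(0.4), advantage=-2.0)

# Ratio 1.5 with a negative advantage: clipping does not soften the penalty.
full_penalty = clipped_surrogate(math.log(0.6), math.log(0.4), advantage=-2.0)

print(capped_gain, capped_loss, full_penalty)
```

The actor loss is the negative mean of this quantity over a mini-batch, so minimizing the loss maximizes the objective.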

In [None]:


Transition = namedtuple('Transition', ['state', 'action', 'a_log_p', 'reward', 'next_state'])

class Actor(nn.Module):
    """ Actor Network: maps stacked frames to logits over the discrete actions """
    def __init__(self, num_actions):
        super(Actor, self).__init__()
        self.cnn_base = nn.Sequential(
            nn.Conv2d(4, 8, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Flatten()
        )
        self.v = nn.Sequential(
            nn.Linear(1024, 100),
            nn.ReLU(),
            # Output layer: one logit per discrete action
            nn.Linear(100, num_actions)
        )

    def forward(self, state):
        x = self.cnn_base(state)
        # Return raw logits; the caller builds the Categorical distribution
        action_logits = self.v(x)
        return action_logits

class Critic(nn.Module):
    """ Critic Network (Value Function): maps stacked frames to a scalar state value """
    def __init__(self):
        super(Critic, self).__init__()
        self.cnn_base = nn.Sequential(
            nn.Conv2d(4, 8, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Flatten()
        )
        self.v = nn.Sequential(nn.Linear(1024, 100), nn.ReLU(), nn.Linear(100, 1))

    def forward(self, state):
        x = self.cnn_base(state)
        value = self.v(x)
        return value

class PPOAgent:
    """ PPO Agent with a categorical policy over discrete actions """
    def __init__(self, device, num_actions):
        self.device = device
        self.actor_net = Actor(num_actions).float().to(self.device)
        self.critic_net = Critic().float().to(self.device)
        self.buffer = []
        self.batch_size = 64
        self.ppo_update_time = 10
        self.max_grad_norm = 0.5
        self.clip_param = 0.2
        self.gamma = 0.99
        self.lr = 0.0001

        self.optimizer_actor = optim.Adam(self.actor_net.parameters(), lr=self.lr)
        self.optimizer_critic = optim.Adam(self.critic_net.parameters(), lr=self.lr)

    def select_action(self, state):
        """ Selects a discrete action index using the actor network. """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_logits = self.actor_net(state)

        # Sample an action index from the categorical distribution over the logits
        dist = Categorical(logits=action_logits)
        action = dist.sample()
        action_log_prob = dist.log_prob(action)

        return action.item(), action_log_prob.item()

    def get_value(self, state):
        """ Gets the value of a state from the critic network. """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        with torch.no_grad():
            value = self.critic_net(state)
        return value.item()

    def store_transition(self, transition):
        """ Stores a transition in the buffer. """
        self.buffer.append(transition)

    def update(self):
        """ Updates the actor and critic networks using the PPO algorithm. """
        state = torch.tensor(np.array([t.state for t in self.buffer]), dtype=torch.float).to(self.device)
        action = torch.tensor([t.action for t in self.buffer], dtype=torch.int64).view(-1, 1).to(self.device)
        reward = [t.reward for t in self.buffer]
        old_action_log_prob = torch.tensor([t.a_log_p for t in self.buffer], dtype=torch.float).view(-1, 1).to(self.device)

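        # Discounted returns, accumulated backwards over the whole buffer.
        # No episode-end flag is stored, so returns carry across episode boundaries.
        R = 0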
        Gt = []
        for r in reward[::-1]:
            R = r + self.gamma * R
            Gt.insert(0, R)
        Gt = torch.tensor(Gt, dtype=torch.float).to(self.device)

        for i in range(self.ppo_update_time):
            for index in BatchSampler(SubsetRandomSampler(range(len(self.buffer))), self.batch_size, False):
                Gt_index = Gt[index].view(-1, 1)
                V = self.critic_net(state[index])
                delta = Gt_index - V
                advantage = delta.detach()

                # Log probabilities of the stored actions under the current policy
                action_logits = self.actor_net(state[index])
                dist = Categorical(logits=action_logits)
                # squeeze(-1) keeps the batch dimension even for a batch of size 1
                action_log_prob = dist.log_prob(action[index].squeeze(-1)).view(-1, 1)

                ratio = torch.exp(action_log_prob - old_action_log_prob[index])

                surr1 = ratio * advantage
                surr2 = torch.clamp(ratio, 1 - self.clip_param, 1 + self.clip_param) * advantage

                loss_actor = -torch.min(surr1, surr2).mean()
                loss_critic = nn.functional.mse_loss(Gt_index, V)
                entropy = dist.entropy().mean()
                loss_total = loss_actor + 0.5 * loss_critic - 0.05 * entropy

                self.optimizer_actor.zero_grad()
                self.optimizer_critic.zero_grad()
                loss_total.backward()
                nn.utils.clip_grad_norm_(self.actor_net.parameters(), self.max_grad_norm)
                nn.utils.clip_grad_norm_(self.critic_net.parameters(), self.max_grad_norm)
                self.optimizer_actor.step()
                self.optimizer_critic.step()

        del self.buffer[:]

    def save_param(self, directory='./ppo_params'):
        """Saves the model parameters."""
        os.makedirs(directory, exist_ok=True)
        torch.save(self.actor_net.state_dict(), os.path.join(directory, 'actor.pth'))
        torch.save(self.critic_net.state_dict(), os.path.join(directory, 'critic.pth'))
        print("--- Parameters Saved ---")

    def load_param(self, directory='./ppo_params'):
        """Loads the model parameters."""
        actor_path = os.path.join(directory, 'actor.pth')
        critic_path = os.path.join(directory, 'critic.pth')
        if os.path.exists(actor_path) and os.path.exists(critic_path):
            self.actor_net.load_state_dict(torch.load(actor_path, map_location=self.device))
            self.critic_net.load_state_dict(torch.load(critic_path, map_location=self.device))
            print("--- Parameters Loaded ---")
        else:
            print("--- No Parameters Found, Starting from Scratch ---")
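
Both networks pass the flattened CNN output into `nn.Linear(1024, 100)`. The 1024 can be checked by hand with the standard output-size formula for a convolution; this sketch assumes the `nn.Conv2d` defaults used above (no padding, dilation 1) and an 84x84 input. The helper name `conv_out` is illustrative.

```python
def conv_out(size, kernel, stride):
    """Output width/height of a convolution with no padding and dilation 1."""
    return (size - kernel) // stride + 1


# (kernel_size, stride) of the four Conv2d layers in cnn_base.
layers = [(4, 2), (3, 2), (3, 2), (3, 2)]

size = 84  # input frames are 84x84
sizes = []
for kernel, stride in layers:
    size = conv_out(size, kernel, stride)
    sizes.append(size)

# The last layer has 64 output channels, so the flattened length is 64 * H * W.
flat_features = 64 * size * size
print(sizes, flat_features)  # [41, 20, 9, 4] 1024
```

If the input resolution or any kernel or stride changes, the first `nn.Linear` layer of both heads has to be resized to match.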


### 4. Training the Agent

This is the main execution block. It initializes the environment and the agent, then enters the training loop. The agent collects experience, and once the buffer holds `update_timestep` (2048) transitions, it calls the `update` method to improve its policy. Every 10 episodes, the loop prints the average episode length and reward and saves a checkpoint.

In [None]:
# --- Main Training Loop ---

# Global list to store rewards for plotting
reward_history = []

def main():
    # Configuration
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    env = Env()
    agent = PPOAgent(device, num_actions=env.num_actions)

    # Load pre-trained models if they exist
    agent.load_param()

    # Training parameters
    max_episodes = 2000
    update_timestep = 2048 # Update policy every N timesteps
    log_interval = 10 # Print avg reward in the interval

    running_reward = 0
    avg_length = 0
    timestep = 0

    # Training loop
    for i_episode in range(1, max_episodes + 1):
        state = env.reset()
        ep_reward = 0

        for t in range(1001): # Max timesteps per episode
            timestep += 1

            # Select action and step
            action_index, action_log_prob = agent.select_action(state)
            next_state, reward, done, _ = env.step(action_index)

            # Store transition
            trans = Transition(state, action_index, action_log_prob, reward, next_state)
            agent.store_transition(trans)

            state = next_state
            ep_reward += reward

            # Update if buffer is full
            if len(agent.buffer) >= update_timestep:
                agent.update()

            if done:
                break

        running_reward += ep_reward
        avg_length += t

        # Logging
        if i_episode % log_interval == 0:
            avg_length = int(avg_length / log_interval)
            running_reward = int((running_reward / log_interval))

            # Store reward for plotting
            reward_history.append(running_reward)

            print(f'Episode {i_episode}\tAvg length: {avg_length}\tAvg reward: {running_reward}')

            # Save a checkpoint of the model
            print("--- Saving checkpoint --- ")
            agent.save_param()

            running_reward = 0
            avg_length = 0

# Start training
main()

Using device: cuda
--- Parameters Loaded ---
Episode 10	Avg length: 124	Avg reward: 67
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 20	Avg length: 124	Avg reward: 68
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 30	Avg length: 124	Avg reward: 77
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 40	Avg length: 124	Avg reward: 71
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 50	Avg length: 124	Avg reward: 89
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 60	Avg length: 124	Avg reward: 75
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 70	Avg length: 124	Avg reward: 68
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 80	Avg length: 124	Avg reward: 76
--- Saving checkpoint --- 
--- Parameters Saved ---
Episode 90	Avg length: 124	Avg reward: 73
--- Saving checkpoint --- 
--- Parameters Saved ---
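
Because updates are triggered by buffer size rather than by episode end, a single buffer usually spans several episodes. The `update` method discounts rewards across the whole buffer without regard to where episodes end. One possible refinement, sketched here under the assumption that a per-transition `done` flag is stored (the `Transition` tuple above does not have one), resets the running return at each episode boundary.

```python
def discounted_returns(rewards, dones, gamma=0.99):
    """Discounted returns that restart at every episode boundary.

    `dones[i]` is assumed to be True when step i was the last step of an episode.
    """
    returns = []
    running = 0.0
    for reward, done in zip(reversed(rewards), reversed(dones)):
        if done:
            running = 0.0  # nothing from the next episode leaks backwards
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


# Two short episodes stored back to back in one buffer (made-up rewards).
rewards = [1.0, 1.0, 1.0, 2.0]
dones = [False, True, False, True]

with_boundaries = discounted_returns(rewards, dones, gamma=0.5)
without_boundaries = discounted_returns(rewards, [False] * 4, gamma=0.5)

print(with_boundaries)     # [1.5, 1.0, 2.0, 2.0]
print(without_boundaries)  # [2.0, 2.0, 2.0, 2.0]
```

This sketch does not handle an episode that is cut off by the end of the buffer; a fuller treatment would typically bootstrap that tail from the critic's value estimate.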


### 5. Evaluate and Record a Demo

After training, this final block plots the reward history, loads the saved model parameters, and runs the agent for 10 episodes to evaluate its performance on unseen tracks. It also records a video of the first evaluation episode, which can be used as the demo deliverable.

In [None]:
# --- Plotting, Evaluation and Recording ---

def plot_rewards():
    """Plots the training reward history."""
    plt.figure(figsize=(10, 5))
    # One point per 10-episode log interval; the first point is logged at episode 10
    plt.plot((np.arange(len(reward_history)) + 1) * 10, reward_history)
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.title('Training Reward Curve')
    plt.grid(True)
    plt.show()

def evaluate_and_record():
    """
    Evaluates the trained agent and records a video of its performance.
    """
    # To render in Colab, we need a virtual display
    !pip install pyvirtualdisplay > /dev/null 2>&1
    # python3-opengl is the package name on current Ubuntu releases
    !apt-get install -y xvfb python3-opengl ffmpeg > /dev/null 2>&1
    from pyvirtualdisplay import Display
    display = Display(visible=0, size=(1400, 900))
    display.start()

    device = torch.device("cpu") # Use CPU for evaluation
    env = Env()
    # IMPORTANT: The evaluation environment needs rendering enabled, so close the
    # default one created by Env() and swap in an 'rgb_array' environment.
    env.env.close()
    env.env = gym.make('CarRacing-v3', continuous=True, render_mode='rgb_array')
    agent = PPOAgent(device, num_actions=env.num_actions)
    agent.load_param()

    video_filename = "ppo_carracing_evaluation.mp4"
    frames = []

    print("--- Plotting Reward History ---")
    plot_rewards()

    print("\n--- Starting Evaluation ---")
    total_reward = 0
    for i in range(10): # Evaluate over 10 episodes
        state = env.reset()
        episode_reward = 0
        for t in range(1001):
            action_index, _ = agent.select_action(state)
            state, reward, done, _ = env.step(action_index)

            # For recording, render the environment
            if i == 0: # Record the first episode
                try:
                    frame = env.render()
                    frames.append(frame)
                except Exception as e:
                    print(f"Rendering failed: {e}. Skipping frame.")

            episode_reward += reward
            if done:
                break
        print(f"Episode {i+1}: Reward = {episode_reward:.2f}")
        total_reward += episode_reward

    avg_reward = total_reward / 10
    print(f"\n--- Average Reward over 10 episodes: {avg_reward:.2f} ---")

    # Save the video
    if frames:
        print(f"--- Saving video to {video_filename} ---")
        imageio.mimsave(video_filename, frames, fps=30)

# Run evaluation after training
# evaluate_and_record()
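
Note that the evaluation loop calls `select_action`, which samples from the policy just as during training. If a deterministic evaluation is preferred, one option is to pick the highest-logit action instead. The sketch below contrasts the two choices in plain Python; the logits are made-up values and the helper names are illustrative, not part of the agent above.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of logits."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sample_action(logits, rng):
    """Stochastic choice: draw an index in proportion to its probability."""
    probs = softmax(logits)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]


def greedy_action(logits):
    """Deterministic choice: index of the largest logit."""
    return max(range(len(logits)), key=lambda i: logits[i])


# Made-up logits for the five actions (left, right, brake, accelerate, do-nothing).
logits = [0.1, 0.3, -1.0, 2.0, 0.0]
rng = random.Random(0)

probs = softmax(logits)
sampled = sample_action(logits, rng)
greedy = greedy_action(logits)
print(greedy)  # 3
```

With the PyTorch agent, the greedy variant would correspond to taking the arg-max over the actor's logits rather than calling `dist.sample()`.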
