# Reinforcement learning project

Contact:
Elie KADOCHE,
eliekadoche78@gmail.com.

This is a reinforcement learning project on deep Q-learning and policy gradient methods.
More explanations will be given on the board during the practical sessions.
Do not hesitate to ask for help if you need it.

In `dummy_deep.py`, you will find a minimal example of building and training a neural network with PyTorch.
The model learns to approximate the square root of a number.
This example is designed to help you get familiar with PyTorch syntax and workflow, which you'll use throughout the lab.

You will write your answers, thoughts and approaches for each question.
Even if you make some mistakes, you can describe them and explain how you corrected them.
Each time there is a `# ---> TODO:` comment, you need to add some code.
Be mindful of the code quality and explain your code with specific comments.

In [4]:
!pip install gymnasium
!pip install numpy
!pip install torch
!pip install swig
!pip install gymnasium[box2d]



In [6]:
import gymnasium as gym
import numpy as np
import torch
from torch import nn
from torch import optim
from torch.distributions import Categorical
from torch.nn import functional as F

## 1) Lunar Lander environment

Your objective is to understand how the Lunar Lander environment works and what problem we want to solve.
You need to describe the states, the rewards and the actions.
Write down the Markov decision process associated with the problem.



### Objective

The objective in the Lunar Lander environment is to train an agent to safely land a lunar module on a designated landing pad. The landing pad is marked by two flags. The agent controls the lander's thrusters to navigate through a 2D space, combat gravity, and achieve a soft landing.

A successful landing requires the lander to come to a complete stop on the landing pad. Crashing the lander or landing outside the designated area results in failure. The agent must learn to manage its fuel consumption, as firing thrusters incurs a cost.

### Markov Decision Process (MDP)

The Lunar Lander problem can be formally described as a Markov Decision Process (MDP).

*   **S**: A set of states.
*   **A**: A set of actions.
*   **P**: The state transition probability function (transition kernel), `P(s' | s, a)`.
*   **R**: The reward function, `R(s,a)`.
*   **γ**: The discount factor.

#### **State Space (S)**

The state is a continuous **8-dimensional vector** that provides all the necessary information about the lander's current situation. The components of the state vector are:

1.  **Horizontal coordinate (x)**: The lander's position on the x-axis.
2.  **Vertical coordinate (y)**: The lander's position on the y-axis.
3.  **Horizontal velocity (vx)**: The lander's speed along the x-axis.
4.  **Vertical velocity (vy)**: The lander's speed along the y-axis.
5.  **Angle (θ)**: The lander's angle in radians.
6.  **Angular velocity (vθ)**: The rate at which the lander's angle is changing.
7.  **Left leg contact (boolean)**: `1` if the left leg is in contact with the ground, `0` otherwise.
8.  **Right leg contact (boolean)**: `1` if the right leg is in contact with the ground, `0` otherwise.
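
To make the layout of the state vector concrete, here is a minimal sketch that gives each of the eight components a name. The `LanderState` type and the `unpack_state` helper are hypothetical (they are not part of Gymnasium), and the example values are made up.

```python
from typing import NamedTuple, Sequence


class LanderState(NamedTuple):
    """Hypothetical named view of the 8-dimensional observation."""
    x: float
    y: float
    vx: float
    vy: float
    angle: float
    angular_velocity: float
    left_leg_contact: bool
    right_leg_contact: bool


def unpack_state(observation: Sequence[float]) -> LanderState:
    """Convert a raw 8-element observation into a LanderState."""
    if len(observation) != 8:
        raise ValueError(
            "expected 8 components, got {}".format(len(observation)))
    x, y, vx, vy, angle, angular_velocity, left, right = observation
    return LanderState(float(x), float(y), float(vx), float(vy),
                       float(angle), float(angular_velocity),
                       bool(left), bool(right))


# Made-up observation values, only to illustrate the layout
example = unpack_state([0.0, 1.4, 0.1, -0.5, 0.02, 0.0, 0.0, 0.0])
print(example.vy, example.left_leg_contact)
```

The neural networks used later consume the raw vector directly; a named view like this is mainly useful for debugging and logging.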

#### **Action Space (A)**

The code specifies `continuous=False`, so we are using the **discrete action space**. There are 4 possible discrete actions the agent can take at each time step:

*   **0**: `do nothing` - The lander drifts according to gravity and its current momentum.
*   **1**: `fire left orientation engine` - Applies a force to rotate the lander counter-clockwise.
*   **2**: `fire main engine` - Applies an upward force to counteract gravity and slow the lander's descent.
*   **3**: `fire right orientation engine` - Applies a force to rotate the lander clockwise.
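
The four action indices can be given readable names. The `LanderAction` enum below is a hypothetical convenience, not something provided by Gymnasium; the integer values are the ones listed above.

```python
from enum import IntEnum


class LanderAction(IntEnum):
    """Hypothetical names for the four discrete action indices."""
    DO_NOTHING = 0
    FIRE_LEFT_ENGINE = 1
    FIRE_MAIN_ENGINE = 2
    FIRE_RIGHT_ENGINE = 3


action = LanderAction.FIRE_MAIN_ENGINE
# The plain integer is what would be passed to env.step
print(int(action), action.name)
```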

#### **Reward Function (R)**

The reward function is shaped to guide the agent towards a successful landing. The total reward for an episode is the sum of rewards received at each step.

*   **Moving towards the pad**: A positive reward is given for moving from a point far from the landing pad to a point closer to it. Conversely, moving away results in a penalty.
*   **Successful Landing**: If the lander comes to rest on the landing pad, it receives a large reward of **+100 points**.
*   **Crashing**: If the lander crashes, it receives a large penalty of **-100 points**.
*   **Leg Contact**: Each leg that touches the ground gives a reward of **+10 points**.
*   **Fuel Cost**: Firing the main engine costs a small amount of fuel, resulting in a small negative reward (e.g., -0.3 points per frame). Firing the side engines also has a smaller cost (e.g., -0.03 points per frame).

The problem is considered "solved" when the agent consistently achieves an average total reward of 200 points over 100 consecutive episodes.
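
The "solved" criterion can be written as a small check over the history of episode rewards. This is a sketch under the assumption that the rewards are stored in a plain list; the function name `is_solved` and its parameters are hypothetical.

```python
from typing import Sequence


def is_solved(episode_rewards: Sequence[float],
              window: int = 100,
              threshold: float = 200.0) -> bool:
    """Return True if the mean of the last `window` rewards reaches `threshold`."""
    # Not enough episodes yet to fill the window
    if len(episode_rewards) < window:
        return False
    recent = episode_rewards[-window:]
    return sum(recent) / window >= threshold


print(is_solved([250.0] * 100))
print(is_solved([250.0] * 99))
```

The training scripts in this notebook use an exponential moving average of the episode reward instead of a fixed 100-episode window, which is a cheaper but noisier stopping signal.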

#### **Transition Kernel (P)**

The state transition function `P(s' | s, a)` defines the probability of moving to state `s'` after taking action `a` in state `s`. In this environment, transitions are computed by the **Box2D physics engine**: given a state and an action, it calculates the next state from forces such as gravity, thruster power, and ground contact dynamics. The dynamics are **almost deterministic**. The randomness comes mainly from the random initial force applied to the lander at reset and, to my understanding of the implementation, from a small amount of noise in the engine thrust. The underlying physics equations are complex and are handled internally by the environment.

#### **Discount Factor (γ)**

The discount factor, represented as `discount_factor` in the code, is a value between 0 and 1 that determines the importance of future rewards. A reward received `k` steps in the future is weighted by `γ^k`, so whenever `γ < 1` an immediate reward counts more than the same reward received later.
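
The effect of the discount factor can be seen on a tiny example. The sketch below computes the discounted return of a made-up reward sequence; the function name `discounted_return` is hypothetical.

```python
def discounted_return(rewards, discount_factor):
    """Sum of rewards where the k-th future reward is weighted by gamma**k."""
    g = 0.0
    # Walk backwards so that g = r + gamma * (return of the remaining steps)
    for r in reversed(rewards):
        g = r + discount_factor * g
    return g


rewards = [1.0, 1.0, 1.0]  # made-up rewards
print(discounted_return(rewards, 1.0))  # 3.0: every reward counts fully
print(discounted_return(rewards, 0.5))  # 1.75 = 1 + 0.5 + 0.25
```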

#### **Episode Termination**

An episode ends (`terminated`) under one of the following conditions:

1.  The lander crashes, i.e., its body (rather than its legs) touches the ground.
2.  The lander comes to a complete rest on the landing pad.
3.  The lander flies out of the screen boundaries.

Additionally, an episode can be `truncated` if it exceeds a maximum number of steps (typically 1000 steps), preventing it from running indefinitely.

Based on the provided code, a single step within the `while` loop represents one full cycle of the Markov Decision Process. Here is a detailed breakdown of this dynamic process:

1.  **Observe the State (`S_t`)**: At the beginning of each loop iteration, the agent is in a specific state, `S_t`. In the code, this is represented by the `observation` variable.

2.  **Select an Action (`A_t`)**: The agent must now choose an action to take. In this specific "random policy" example, the agent completely ignores the current state `S_t`. Instead, it uses `env.action_space.sample()` to select an action `A_t` uniformly at random from the four possible discrete actions. This is the simplest possible policy, where every action is equally likely, regardless of the situation.

3.  **Interact with the Environment (`env.step(A_t)`)**: The chosen action `A_t` is sent to the environment. The `env.step(action)` function is the core of the interaction. The environment takes the current state `S_t` and the agent's action `A_t` and performs two critical calculations based on its internal physics engine (the transition dynamics):
    *   **State Transition (`P(S_{t+1} | S_t, A_t)`)**: It computes the next state of the lander, `S_{t+1}`. For example, if the action was "fire main engine," the upward thrust counteracts gravity, so the lander's descent slows down (its vertical velocity becomes less negative).
    *   **Calculate Reward (`R_{t+1}`)**: It calculates the immediate reward, `R_{t+1}`, that results from this transition from `S_t` to `S_{t+1}` by taking action `A_t`.

4.  **Receive New State and Reward**: The `env.step()` function returns the results to the agent. The `observation` variable is updated with the new state `S_{t+1}`. The agent also receives the scalar `reward` value (`R_{t+1}`) and boolean flags (`terminated`, `truncated`) that indicate if the episode has ended.

5.  **Update and Repeat**: The agent updates its `total_reward` by adding the received reward. The `finished` flag is updated. The loop then repeats, with the new `observation` (`S_{t+1}`) now serving as the current state for the next decision-making cycle. 

In [7]:
"""Run a random policy."""

# Create and reset environment
env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")
# The observation is the current state of the environment
observation, info = env.reset(seed=None)
total_reward = 0.0

# While the episode is not finished
finished = False
while not finished:

    # Select a random action
    action = env.action_space.sample()

    # One step forward: apply the action and observe the next state
    observation, reward, terminated, truncated, info = env.step(action)
    finished = terminated or truncated

    # Accumulate the reward
    total_reward += reward

    # Optionally render the environment (render mode should be "human")
    env.render()

# Print reward
print("total_reward = {}".format(total_reward))
env.close()

total_reward = -100.72087786830204


## 2) Deep neural Q-network

We aim to build a deep Q-network $Q_\theta(s, a)$, which estimates the Q-value for each action $a$ given a state $s$.
This network is parameterized by weights $\theta$ and replaces the classical Q-table used in tabular methods.

During learning, the network is updated to minimize the temporal difference (TD) error: $\delta = r + \gamma \max_{a'} Q_\theta(s', a') - Q_\theta(s, a)$.
This leads to the Q-learning update rule: $Q_\theta(s, a) \leftarrow Q_\theta(s, a) + \alpha \, \delta$.
In practice, we minimize the squared TD error using gradient descent.
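
A worked numeric example of the TD target, the TD error and the squared TD error, in plain Python. All numbers are made up, and the helper names `td_target` and `td_error` are hypothetical. The sketch assumes that a terminal transition does not bootstrap from the next state.

```python
def td_target(reward, q_next, discount_factor, terminated):
    """TD target: r + gamma * max_a' Q(s', a'), or just r at a terminal state."""
    if terminated:
        return reward
    return reward + discount_factor * max(q_next)


def td_error(q_value, target):
    """TD error: delta = target - Q(s, a)."""
    return target - q_value


# Made-up Q-values of the next state, one per action
q_next = [0.5, 2.0, -1.0, 1.0]
target = td_target(reward=1.0, q_next=q_next,
                   discount_factor=0.99, terminated=False)
delta = td_error(q_value=2.5, target=target)
loss = delta ** 2  # squared TD error, the quantity minimized by gradient descent

print(target, delta, loss)
```

Here the target is `1.0 + 0.99 * 2.0 = 2.98`, so the current estimate `2.5` is too low by about `0.48`, and the gradient step pushes $Q_\theta(s, a)$ upward.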

Below are 3 code samples.
- A class implementing the Q-network. You must specify `input_size` and `nb_actions`.
- A script to train the Q-network. Complete the missing parts of the code.
- A script to test the Q-network. Determine how to select an action from the Q-values.

In [11]:
class QNetwork(nn.Module):
    """Deep neural Q-network."""

    def __init__(self):
        """Initialize."""
        super(QNetwork, self).__init__()
        
        # The input is the state of the environment. The Lunar Lander state
        # is an 8-dimensional vector (x, y, vx, vy, angle, angular_velocity, leg1_contact, leg2_contact).
        input_size = 8

        # The output is a Q-value for each possible action. The Lunar Lander
        # has 4 discrete actions (do nothing, fire left, fire main, fire right).
        nb_actions = 4

        # Layers
        self.layer_a = nn.Linear(input_size, 128)
        self.layer_b = nn.Linear(128, 128)
        self.layer_c = nn.Linear(128, nb_actions)

    def forward(self, x):
        """Forward."""
        x = F.relu(self.layer_a(x))
        x = F.relu(self.layer_b(x))
        q_values = self.layer_c(x)
        return q_values

In [25]:
"""Run deep Q-learning."""

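# ---> TODO: find good hyperparameters
discount_factor = 0.99  # gamma: weight of future rewards
learning_rate = 1e-4  # alpha: step size of the optimizer
epsilon_start = 1.0  # initial exploration rate
epsilon_end = 0.01  # minimum exploration rate
epsilon_decay = 0.999  # multiplicative decay applied after each episode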

# Create environment and reset it
env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")
observation, info = env.reset(seed=None)

# Create Q-network and enable train mode
device = torch.device("cpu")
q_network = QNetwork().to(device)
q_network.train()

# Create optimizer
optimizer = optim.Adam(q_network.parameters(), lr=learning_rate)

# Launch training
running_reward = 0.0
training_iteration = 0
epsilon = epsilon_start
while True:

    # Reset the environment
    observation, info = env.reset()
    episode_total_reward = 0.0

    # Sample a trajectory
    while True:

        # Epsilon-greedy action selection (random)
        if np.random.rand() < epsilon:
            action = env.action_space.sample()

        # Epsilon-greedy action selection (best action)
        else:
            with torch.no_grad():

                # Add batch dimension and transform to tensor
                x = np.expand_dims(observation, 0)
                x = torch.from_numpy(x).float().to(device)
                q_values = q_network(x)

                # ---> TODO: how to compute action (take the action with the max Q-value)
                # .argmax() returns the index of the maximum value, which is the greedy action.
                # .item() converts the resulting tensor to a Python integer.
                action = q_values.argmax().item()

        # Take the action
        observation_next, reward, terminated, truncated, info = env.step(
            action)

        # Check if episode is done and save reward
        done = terminated or truncated
        episode_total_reward += reward

        # Compute the TD target
        with torch.no_grad():
            x_next = np.expand_dims(observation_next, 0)
            x_next = torch.from_numpy(x_next).float().to(device)
            q_next = q_network(x_next)
            q_next_max = q_next.max(dim=1).values.item()

            # ---> TODO: compute the TD target
            # If the episode is done, there is no next state, so the target is just the immediate reward.
            # Otherwise, the target is the reward plus the discounted value of the best action in the next state.
            if done:
                target = reward
            else:
                target = reward + discount_factor * q_next_max

        # TD prediction
        x = np.expand_dims(observation, 0)
        x = torch.from_numpy(x).float().to(device)
        q_pred = q_network(x)[0, action]

        # ---> TODO: compute loss and update
        # The loss measures the gap between the target (what we want) and the
        # prediction (what we have). We use the mean squared error, i.e., the
        # squared TD error.
        loss = F.mse_loss(
            q_pred, torch.tensor(target, dtype=torch.float32, device=device))

        # Reset gradients to 0.0
        optimizer.zero_grad()

        # Compute the gradients of the loss (backpropagation)
        loss.backward()

        # Update the Q-network parameters (gradient descent)
        optimizer.step()

        # Transition
        observation = observation_next

        # End episode
        if done:
            break

    # Update the running reward and decay epsilon once per episode
    running_reward = 0.1 * episode_total_reward + 0.9 * running_reward
    epsilon = max(epsilon_end, epsilon * epsilon_decay)

    # Log results
    log_frequency = 5
    training_iteration += 1
    if training_iteration % log_frequency == 0:

        # Save neural network
        torch.save(q_network.state_dict(), "q_network.pt")

        # Print results
        print("iteration {} - last reward: {:.2f}".format(
            training_iteration, episode_total_reward))

        # Exit condition
        if running_reward >= 200:
            break

# Close environment
env.close()

iteration 5 - last reward: -291.76
iteration 10 - last reward: -97.84
iteration 15 - last reward: -82.56
iteration 20 - last reward: -195.42
iteration 25 - last reward: -426.02
iteration 30 - last reward: -79.98
iteration 35 - last reward: -226.26
iteration 40 - last reward: -43.78
iteration 45 - last reward: -381.63
iteration 50 - last reward: -232.14
iteration 55 - last reward: -269.84
iteration 60 - last reward: 6.27
iteration 65 - last reward: -110.84
iteration 70 - last reward: -157.07
iteration 75 - last reward: -31.79
iteration 80 - last reward: -107.06
iteration 85 - last reward: -242.10
iteration 90 - last reward: -367.54
iteration 95 - last reward: -98.51
iteration 100 - last reward: -53.09
iteration 105 - last reward: -138.26
iteration 110 - last reward: -338.06
iteration 115 - last reward: -122.79
iteration 120 - last reward: -129.79
iteration 125 - last reward: -151.89
iteration 130 - last reward: -99.41
iteration 135 - last reward: -206.43
iteration 140 - last reward: -97

In [37]:
"""Test Q-network."""

# Create environment and reset it
env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")
observation, info = env.reset(seed=None)
total_reward = 0.0

# Load trained Q-network and enable test mode
device = torch.device("cpu")
q_network = QNetwork().to(device)
q_network.load_state_dict(torch.load("q_network.pt", weights_only=True))
q_network.eval()

# While the episode is not finished
finished = False
while not finished:

    # Add batch dimension and transform to tensor
    x = np.expand_dims(observation, 0)
    x = torch.from_numpy(x).float().to(device)

    # Compute the Q-values of the current state with the Q-network
    with torch.no_grad():
        q_values = q_network(x)

    # ---> TODO: how to select an action
    # In evaluation, we always act greedily by choosing the action with the highest Q-value.
    # No exploration (no epsilon-greedy) is needed here.
    action = q_values.argmax().item()

    # One step forward
    observation, reward, terminated, truncated, info = env.step(action)
    finished = terminated or truncated

    # Accumulate the reward
    total_reward += reward

    # Optionally render the environment (render mode should be "human")
    env.render()

# Print reward
print("total_reward = {}".format(total_reward))
env.close()

total_reward = 222.86713600510137


Through hyperparameter tuning, I settled on the following settings: a **discount factor** of 0.99, a **learning rate** of 1e-4, an **epsilon_start** of 1.0, an **epsilon_end** of 0.01, and an **epsilon_decay** of 0.999. In the test script, the action is selected greedily as the argmax of the Q-values. After approximately 1900 training iterations, testing the final Q-network consistently yielded scores above 200 across multiple runs.
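
The exploration schedule implied by these settings can be checked with a few lines of arithmetic. The sketch below assumes epsilon is multiplied by `epsilon_decay` once per episode, as in the training script; the helper names `epsilon_after` and `episodes_to_floor` are hypothetical.

```python
import math


def epsilon_after(episodes, epsilon_start=1.0, epsilon_end=0.01,
                  epsilon_decay=0.999):
    """Exploration rate after a given number of episodes."""
    return max(epsilon_end, epsilon_start * epsilon_decay ** episodes)


def episodes_to_floor(epsilon_start=1.0, epsilon_end=0.01,
                      epsilon_decay=0.999):
    """Smallest number of episodes after which epsilon reaches epsilon_end."""
    return math.ceil(
        math.log(epsilon_end / epsilon_start) / math.log(epsilon_decay))


print(round(epsilon_after(1900), 3))
print(episodes_to_floor())
```

Under this schedule epsilon is still roughly 0.15 after 1900 episodes, well above its floor, so a noticeable share of the training actions remains random at that point. The greedy test policy does not have this noise, which is one reason test scores can exceed the rewards seen during training.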

**Significance of the Result**: A consistent score above 200 means that the agent meets the usual criterion for "solving" the Lunar Lander environment. It indicates that the agent has learned an effective policy that goes beyond simply avoiding a crash: it uses its thrusters to slow its descent gently, stay stable, and steer to the landing pad, which is what earns the large landing bonus.

## 3) REINFORCE algorithm

We want to build a policy $\pi_\theta(a | s) = P(a | s, \theta)$ that gives the probability of choosing an action $a$ in state $s$.
The policy is a deep neural network parameterized by some weights $\theta$.
The policy is also referred to as "actor".
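
What "a probability for each action" means can be illustrated without PyTorch. The sketch below turns made-up network outputs (logits) into probabilities with a softmax and samples an action from them; the helper names `softmax` and `sample_action` are hypothetical.

```python
import math
import random


def softmax(logits):
    """Map arbitrary real scores to probabilities that sum to 1."""
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sample_action(probabilities, rng):
    """Draw an action index according to the given probabilities."""
    return rng.choices(range(len(probabilities)),
                       weights=probabilities, k=1)[0]


logits = [0.0, 1.0, 2.0, 0.5]  # made-up network outputs for 4 actions
probabilities = softmax(logits)
action = sample_action(probabilities, random.Random(0))
print(probabilities, action)
```

Sampling, rather than always taking the most probable action, is what gives a stochastic policy its built-in exploration during training.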

We want to find the parameters $\theta$ that maximize the performance measure $J(\theta) = \mathbb{E}_{\pi_\theta}[ G_0 ]$ with $G_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$ and $\gamma \in [0, 1]$ being a discount factor.
To do so, we use the gradient ascent method: $\theta_{k+1} = \theta_{k} + \alpha \nabla_{\theta_k} J(\theta_k)$ with $\alpha$ being the learning rate.
The performance measure depends on both the action selection and the distribution of states.
Both are affected by the policy parameters, which makes the computation of the gradient challenging.

The policy gradient theorem gives an expression for $\nabla_\theta J(\theta)$ that does not involve the derivative of the state distribution.
The expectation is over all possible state-action trajectories generated by the policy $\pi_\theta$:
$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}[ \sum_{t=0}^{\infty} G_t \nabla_\theta \ln \pi_\theta(a_t | s_t) ]$.
In the REINFORCE algorithm, we use a Monte-Carlo estimate over one episode, i.e., one trajectory:
$\nabla_\theta J(\theta) \approx \sum_{t=0}^{\infty} G_t \nabla_\theta \ln \pi_\theta(a_t | s_t)$.
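
With an automatic differentiation library, this estimate is usually obtained by differentiating a surrogate loss. The following is a sketch for one episode of finite length $T$ (the symbol $T$ and the name $L$ are introduced here for illustration), where the returns $G_t$ are treated as constants with respect to $\theta$:

```latex
L(\theta) = -\sum_{t=0}^{T-1} G_t \ln \pi_\theta(a_t | s_t),
\qquad
\nabla_\theta L(\theta) = -\sum_{t=0}^{T-1} G_t \nabla_\theta \ln \pi_\theta(a_t | s_t)
```

Since $\nabla_\theta L(\theta)$ is the negative of the single-trajectory estimate of $\nabla_\theta J(\theta)$, minimizing $L$ by gradient descent amounts to gradient ascent on $J$.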

Your objective is to complete the REINFORCE algorithm to train the policy until convergence. To solve the problem, you need to achieve a cumulative reward of at least 200 when training the policy.

Below are 3 code samples.
- A class implementing the policy. You must specify `input_size` and `nb_actions`.
- A script to train the policy using REINFORCE. Complete the missing parts of the code.
- A script to test the policy. Determine how to select an action from the output.

In [None]:
class ActorModel(nn.Module):
    """Deep neural network policy."""

    def __init__(self):
        """Initialize."""
        super(ActorModel, self).__init__()
        # --> TODO: specify the correct input and output sizes
        # The state has 8 components and there are 4 discrete actions
        input_size = 8
        nb_actions = 4

        # Layers
        self.layer_a = nn.Linear(input_size, 128)
        self.layer_b = nn.Linear(128, 128)
        self.policy = nn.Linear(128, nb_actions)

    def forward(self, x):
        """Forward."""
        x = F.relu(self.layer_a(x))
        x = F.relu(self.layer_b(x))
        action_prob = F.softmax(self.policy(x), dim=-1)
        return action_prob

In [None]:
"""Run REINFORCE."""

# ---> TODO: find good hyperparameters
discount_factor = 1.0
learning_rate = 0.1

# Create environment and reset it
env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")
observation, info = env.reset(seed=None)

# Create policy and enable train mode
device = torch.device("cpu")
policy = ActorModel().to(device)
policy.train()

# Create optimizer
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)

# Launch training
running_reward = 0.0
training_iteration = 0
while True:

    # Experience
    # ------------------------------------------

    # Reset the environment
    observation, info = env.reset()

    # During experience, we will save:
    # - the probability of the chosen action at each time step pi(at|st)
    # - the rewards received at each time step ri
    saved_probabilities = list()
    saved_rewards = list()

    # Sample a trajectory
    while True:

        # Add batch dimension and transform to tensor
        x = torch.from_numpy(np.expand_dims(observation, 0)).float()

        # Create a categorical distribution over the list of probabilities
        # of actions (given by the policy) and sample an action from it
        probabilities = policy(x.to(device))
        distribution = Categorical(probabilities)
        action = distribution.sample()

        # Take the action
        observation, reward, terminated, truncated, info = env.step(
            action.item())

        # Save the probability of the chosen action and the reward
        saved_probabilities.append(probabilities[0][action])
        saved_rewards.append(reward)

        # End episode
        if terminated or truncated:
            break

    # Compute discounted sum of rewards
    # ------------------------------------------

    # Current discounted reward
    discounted_reward = 0.0

    # List of all the discounted rewards, for each time step
    discounted_rewards = list()

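    # ---> TODO: compute discounted rewards
    # Walk the rewards backwards, using G_t = r + gamma * G_{t+1}
    for r in saved_rewards[::-1]:
        discounted_reward = r + discount_factor * discounted_reward
        discounted_rewards.insert(0, discounted_reward)

    # Optionally normalize for stability purposes
    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32)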
    mean, std = discounted_rewards.mean(), discounted_rewards.std()
    discounted_rewards = (discounted_rewards - mean) / (std + 1e-7)

    # Update policy parameters
    # ------------------------------------------

    # For each time step
    actor_loss = list()
    for p, g in zip(saved_probabilities, discounted_rewards):

        # ---> TODO: compute policy loss
        # Minimizing -G_t * ln pi(a_t|s_t) performs gradient ascent on J
        time_step_actor_loss = -torch.log(p) * g

        # Save it
        actor_loss.append(time_step_actor_loss)

    # Sum all the time step losses
    actor_loss = torch.cat(actor_loss).sum()

    # Reset gradients to 0.0
    optimizer.zero_grad()

    # Compute the gradients of the loss (backpropagation)
    actor_loss.backward()

    # Update the policy parameters (gradient ascent)
    optimizer.step()

    # Logging
    # ------------------------------------------

    # Episode total reward
    episode_total_reward = sum(saved_rewards)
    running_reward = 0.1 * episode_total_reward + 0.9 * running_reward

    # Log results
    log_frequency = 5
    training_iteration += 1
    if training_iteration % log_frequency == 0:

        # Save neural network
        torch.save(policy.state_dict(), "policy.pt")

        # Print results
        print("iteration {} - last reward: {:.2f}".format(
            training_iteration, episode_total_reward))

        # Exit condition
        if running_reward >= 200:
            break

# Close environment
env.close()

In [None]:
"""Test policy."""
# Create environment and reset it
env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")
observation, info = env.reset(seed=None)
total_reward = 0.0

# Load trained policy and enable test mode
device = torch.device("cpu")
policy = ActorModel().to(device)
policy.load_state_dict(torch.load("policy.pt", weights_only=True))
policy.eval()

# While the episode is not finished
finished = False
while not finished:

    # Add batch dimension and transform to tensor
    x = torch.from_numpy(np.expand_dims(observation, 0)).float()

    # Compute the action probabilities from the policy
    with torch.no_grad():
        probabilities = policy(x.to(device))

    # ---> TODO: how to select an action
    # One possible choice: act greedily by taking the most probable action
    action = probabilities.argmax().item()

    # One step forward
    observation, reward, terminated, truncated, info = env.step(action)
    finished = terminated or truncated

    # Accumulate the reward
    total_reward += reward

    # Optionally render the environment (render mode should be "human")
    env.render()

# Print reward
print("total_reward = {}".format(total_reward))
env.close()