# Graded lab: Implement DQN for LunarLander

This lab is a modified version of a notebook from the Deep RL Course on Hugging Face.

In this notebook, you'll train your **Deep Q-Network (DQN) agent** to play LunarLander, a Box2D control task. Your agent controls a spaceship, the Lunar Lander, and learns how to **land correctly on the Moon**.

*All your answers should be written in this notebook. You shouldn’t need to write or modify any other files. The parts of the code that need to be changed are labelled as TODOs in the comments. Execute every code block so that you don't miss any dependency.*

### The environment

We will use the [LunarLander-v2](https://gymnasium.farama.org/environments/box2d/lunar_lander/) environment from Gymnasium. This environment is a classic rocket trajectory optimization problem. According to Pontryagin’s maximum principle, it is optimal to fire the engine at full throttle or turn it off. This is the reason why this environment has discrete actions: engine on or off.

In [2]:
%%html
<video controls autoplay><source src="https://huggingface.co/sb3/ppo-LunarLander-v2/resolve/main/replay.mp4" type="video/mp4"></video>

### Note on HuggingFace

The original Hugging Face notebook, which is easy to find online, uses [Stable-Baselines3](https://stable-baselines3.readthedocs.io/en/master/), a library that provides a set of reliable implementations of reinforcement learning algorithms in PyTorch.

The Hugging Face Hub 🤗 works as a central place where anyone can share and explore models and datasets. It has versioning, metrics, visualizations and other features that will allow you to easily collaborate with others.

You can browse all the deep reinforcement learning models available on the Hub here: https://huggingface.co/models?pipeline_tag=reinforcement-learning&sort=downloads

## Install dependencies and create a virtual screen 🔽

The first step is to install the dependencies. We’ll install several:

- `gymnasium[box2d]`: Contains the LunarLander-v2 environment
- `stable-baselines3[extra]`: The deep reinforcement learning library.


In [3]:
!apt install swig cmake



In [4]:
!pip install "gymnasium[box2d]"


In [5]:
!pip install stable-baselines3==2.0.0a5



During the notebook, we'll need to generate a replay video. To do so in Colab, **we need a virtual screen to render the environment** (and thus record the frames).

The following cells install the virtual screen libraries, then create and start a virtual screen.

In [None]:
!sudo apt-get update
!sudo apt-get install -y python3-opengl
!apt install ffmpeg
!apt install xvfb
!pip3 install pyvirtualdisplay


To make sure the newly installed libraries are used, **it's sometimes necessary to restart the notebook runtime**. The next cell forces the **runtime to crash, so you'll need to reconnect and run the code starting from here**. Thanks to this trick, **we will be able to run our virtual screen.**

In [None]:
import os
os.kill(os.getpid(), 9)

In [None]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

## Import the packages

In [None]:
import gymnasium as gym

from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.monitor import Monitor

## Create the LunarLander environment and understand how it works

### [The environment](https://gymnasium.farama.org/environments/box2d/lunar_lander/)

The goal is to train our agent, a [Lunar Lander](https://gymnasium.farama.org/environments/box2d/lunar_lander/), **to land correctly on the moon**. To do that, the agent needs to learn **to adapt its speed and position (horizontal, vertical, and angular) to land correctly.**

In [None]:
# We create our environment with gym.make("<name_of_the_environment>")
env = gym.make("LunarLander-v2")
env.reset()
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space Shape", env.observation_space.shape)
print("Sample observation", env.observation_space.sample()) # Get a random observation

We see with `Observation Space Shape (8,)` that the observation is a vector of size 8, where each value contains different information about the lander:
- Horizontal pad coordinate (x)
- Vertical pad coordinate (y)
- Horizontal speed (x)
- Vertical speed (y)
- Angle
- Angular speed
- Whether the left leg contact point has touched the ground (boolean)
- Whether the right leg contact point has touched the ground (boolean)
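
To make the indices concrete, the sketch below unpacks an observation into named fields. The `LanderObservation` class, its field names, and `unpack_observation` are illustrative assumptions, not part of Gymnasium; the order simply follows the list above, and the example values are made up.

```python
from typing import NamedTuple, Sequence


class LanderObservation(NamedTuple):
    """Hypothetical named view of the 8-value observation vector."""
    x: float
    y: float
    vx: float
    vy: float
    angle: float
    angular_velocity: float
    left_leg_contact: bool
    right_leg_contact: bool


def unpack_observation(obs: Sequence[float]) -> LanderObservation:
    """Map the raw vector onto named fields, in the order listed above."""
    if len(obs) != 8:
        raise ValueError(f"expected 8 values, got {len(obs)}")
    x, y, vx, vy, angle, angular_velocity, left, right = obs
    return LanderObservation(
        float(x), float(y), float(vx), float(vy),
        float(angle), float(angular_velocity),
        bool(left), bool(right),
    )


# Made-up numbers, not a real environment sample.
example = unpack_observation([0.1, 1.4, -0.2, -0.5, 0.05, 0.0, 0.0, 1.0])
print(example.vy, example.right_leg_contact)
```

Named access such as `example.angle` is easier to read than `obs[4]` when writing reward shaping or debugging code.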


In [None]:
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample()) # Take a random action

The action space (the set of possible actions the agent can take) is discrete with 4 actions available:

- Action 0: Do nothing,
- Action 1: Fire left orientation engine,
- Action 2: Fire the main engine,
- Action 3: Fire right orientation engine.
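
If you prefer names over bare integers when inspecting the agent's behavior, a small enum can mirror the list above. The `LanderAction` class and its member names are assumptions made for this sketch; the environment itself only sees the integers 0 to 3.

```python
from enum import IntEnum


class LanderAction(IntEnum):
    """Hypothetical labels for the four discrete actions listed above."""
    NOOP = 0        # Do nothing
    FIRE_LEFT = 1   # Fire left orientation engine
    FIRE_MAIN = 2   # Fire the main engine
    FIRE_RIGHT = 3  # Fire right orientation engine


# An IntEnum member behaves like its integer value.
print(int(LanderAction.FIRE_MAIN), LanderAction(3).name)
```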

Reward function (the function that gives a reward at each timestep):

After every step a reward is granted. The total reward of an episode is the **sum of the rewards for all the steps within that episode**.

For each step, the reward:

- Is increased/decreased the closer/further the lander is to the landing pad.
- Is increased/decreased the slower/faster the lander is moving.
- Is decreased the more the lander is tilted (angle not horizontal).
- Is increased by 10 points for each leg that is in contact with the ground.
- Is decreased by 0.03 points each frame a side engine is firing.
- Is decreased by 0.3 points each frame the main engine is firing.

The episode receives an **additional reward of -100 or +100 points for crashing or landing safely, respectively.**

An episode is **considered a solution if it scores at least 200 points.**
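
The bookkeeping described above can be sketched in a few lines. The helper names `episode_return` and `is_solved` are hypothetical, and the reward values are invented for illustration; only the summation and the 200-point threshold come from the text.

```python
from typing import Iterable

SOLVED_THRESHOLD = 200.0  # "at least 200 points", as stated above


def episode_return(step_rewards: Iterable[float]) -> float:
    """Total reward of an episode: the sum of its per-step rewards."""
    return float(sum(step_rewards))


def is_solved(step_rewards: Iterable[float],
              threshold: float = SOLVED_THRESHOLD) -> bool:
    """True when the episode return reaches the threshold."""
    return episode_return(step_rewards) >= threshold


# Invented per-step rewards, ending with the +100 landing bonus.
rewards = [1.5, -0.3, 2.0, 10.0, 10.0, 100.0]
print(episode_return(rewards), is_solved(rewards))
```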

#### Vectorized Environment

- We create a vectorized environment (a method for stacking multiple independent environments into a single environment) made of 16 environments. This way, **we'll have more diverse experiences during training.**

In [None]:
# Create the environment
env = make_vec_env('LunarLander-v2', n_envs=16)
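
As a rough illustration of what "stacking" means, here is a toy vectorized wrapper in plain Python. `CountdownEnv` and `ToyVecEnv` are hypothetical classes written for this sketch, not the Stable-Baselines3 implementation. The one behavior they imitate is that a sub-environment is reset automatically as soon as its episode ends, so the observation returned on a `done` step already belongs to the next episode.

```python
class CountdownEnv:
    """Toy single environment: an episode lasts `length` steps."""

    def __init__(self, length):
        self.length = length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        self.t += 1
        done = self.t >= self.length
        return self.t, 1.0, done


class ToyVecEnv:
    """Toy stand-in for a vectorized environment (not the SB3 class)."""

    def __init__(self, envs):
        self.envs = envs
        self.num_envs = len(envs)

    def reset(self):
        return [env.reset() for env in self.envs]

    def step(self, actions):
        observations, rewards, dones = [], [], []
        for env, action in zip(self.envs, actions):
            obs, reward, done = env.step(action)
            if done:
                # Start the next episode immediately and return its first observation.
                obs = env.reset()
            observations.append(obs)
            rewards.append(reward)
            dones.append(done)
        return observations, rewards, dones


vec_env = ToyVecEnv([CountdownEnv(length=2), CountdownEnv(length=3)])
first_obs = vec_env.reset()
history = [vec_env.step([0, 0]) for _ in range(3)]
print(history)
```

Because the sub-environments finish at different times, each call to `step` returns a mix of episode stages, which is where the extra diversity comes from.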

## Create the Model

Remember the goal: **land the Lunar Lander on the landing pad correctly by controlling the left, right, and main engines**. Based on this, let's build the algorithm we're going to use to solve this problem.

To solve this problem, you're going to implement DQN from scratch.

In [None]:
#### TODO: Define your DQN agent from scratch!

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

#####################################
# Q-Network Definition
#####################################
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

#####################################
# Replay Buffer
#####################################
class ReplayBuffer:
    def __init__(self, buffer_size=100000):
        self.buffer = deque(maxlen=buffer_size)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size=64):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        return (np.array(states, dtype=np.float32),
                np.array(actions, dtype=np.int64),
                np.array(rewards, dtype=np.float32),
                np.array(next_states, dtype=np.float32),
                np.array(dones, dtype=np.float32))

    def __len__(self):
        return len(self.buffer)

#####################################
# DQN Agent
#####################################
class DQNAgent:
    def __init__(
        self,
        state_dim,
        action_dim,
        gamma=0.99,
        lr=1e-3,
        batch_size=64,
        buffer_size=100000,
        min_buffer_size=1000,
        epsilon_start=1.0,
        epsilon_end=0.01,
        epsilon_decay=50000,  # Steps over which epsilon decays
        tau=1e-3,  # For soft update of target network
        device='cpu'
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.lr = lr
        self.batch_size = batch_size
        self.min_buffer_size = min_buffer_size
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.tau = tau
        self.device = device

        self.q_network = QNetwork(state_dim, action_dim).to(device)
        self.target_network = QNetwork(state_dim, action_dim).to(device)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)
        self.replay_buffer = ReplayBuffer(buffer_size)

        self.epsilon = epsilon_start
        self.global_step = 0

    def select_action(self, state):
        # Epsilon-greedy policy
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        else:
            state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            with torch.no_grad():
                q_values = self.q_network(state_t)
            return q_values.argmax(dim=1).item()

    def store_transition(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)

    def update_epsilon(self):
        # Linear decay of epsilon
        self.epsilon = max(self.epsilon_end, self.epsilon - (self.epsilon_start - self.epsilon_end) / self.epsilon_decay)

    def soft_update(self):
        # Soft update target network parameters
        for target_param, local_param in zip(self.target_network.parameters(), self.q_network.parameters()):
            target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)

    def train_step(self):
        if len(self.replay_buffer) < self.min_buffer_size:
            return

        # Sample from replay buffer
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        states_t = torch.FloatTensor(states).to(self.device)
        actions_t = torch.LongTensor(actions).to(self.device).unsqueeze(1)
        rewards_t = torch.FloatTensor(rewards).to(self.device).unsqueeze(1)
        next_states_t = torch.FloatTensor(next_states).to(self.device)
        dones_t = torch.FloatTensor(dones).to(self.device).unsqueeze(1)

        # Compute current Q values
        q_values = self.q_network(states_t).gather(1, actions_t)

        # Compute next Q values from target network
        with torch.no_grad():
            next_q_values = self.target_network(next_states_t).max(dim=1, keepdim=True)[0]

        # Compute the target Q values
        target_q_values = rewards_t + (self.gamma * next_q_values * (1 - dones_t))

        # Compute loss
        loss = nn.MSELoss()(q_values, target_q_values)

        # Gradient descent
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update target network
        self.soft_update()

        # Update epsilon
        self.update_epsilon()

        self.global_step += 1
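
To see what `train_step` and `soft_update` compute without any tensors, the sketch below redoes the arithmetic for a single transition in plain Python. The helpers `td_target` and `soft_update_weights` are hypothetical names for this illustration, and the numbers are made up.

```python
def td_target(reward, next_q_values, done, gamma=0.99):
    """One-step target: r + gamma * max_a' Q_target(s', a') * (1 - done)."""
    best_next = max(next_q_values)
    return reward + gamma * best_next * (1.0 - float(done))


def soft_update_weights(target_weights, online_weights, tau=1e-3):
    """Move each target weight a fraction tau toward the online weight."""
    return [tau * w + (1.0 - tau) * t
            for t, w in zip(target_weights, online_weights)]


next_q = [0.5, 2.0, -1.0, 0.0]

# Non-terminal step: the bootstrap term is kept.
non_terminal = td_target(reward=1.0, next_q_values=next_q, done=False)

# Terminal step (for example a crash): the bootstrap term is masked out.
terminal = td_target(reward=-100.0, next_q_values=next_q, done=True)

blended = soft_update_weights([0.0, 1.0], [1.0, 1.0], tau=0.1)
print(non_terminal, terminal, blended)
```

With a small `tau` such as `1e-3`, the target network trails the online network slowly, which keeps the regression targets from shifting too quickly.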

## Optimization of DQN Agent
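
The dueling architecture used in this section splits the estimate into a state value and per-action advantages, then recombines them by subtracting the mean advantage over the actions of that same state. The plain-Python sketch below shows the recombination for one state; `dueling_q_values` is a hypothetical helper and the numbers are made up.

```python
def dueling_q_values(value, advantages):
    """Q(s, a) = V(s) + (A(s, a) - mean over a' of A(s, a'))."""
    mean_advantage = sum(advantages) / len(advantages)
    return [value + (a - mean_advantage) for a in advantages]


# One state with 4 actions: V(s) = 2.0 and made-up advantages.
q = dueling_q_values(2.0, [1.0, 0.0, -1.0, 4.0])
print(q)
```

After centering, the Q-values of a state average to its value estimate, which is why the mean has to be taken per state rather than across a whole batch.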


In [1]:
# Reward shaping, Double DQN, and Dueling DQN Optimizations

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

#####################################
# Reward Shaping for Smooth Landings
#####################################
def reward_shaping(reward, state, next_state):
    """
    Add custom penalties to encourage smooth landings:
    - Penalize large tilt angles (next_state[4]).
    - Penalize horizontal drift (horizontal speed, next_state[2]).
    """
    angle_penalty = -abs(next_state[4]) * 0.5  # Penalize large angles
    horizontal_speed_penalty = -abs(next_state[2]) * 0.3  # Penalize horizontal drift
    reward += angle_penalty + horizontal_speed_penalty
    return reward

#####################################
# Dueling Q-Network
#####################################
class DuelingQNetwork(nn.Module):
    """
    Dueling architecture to separate state value and action advantage.
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DuelingQNetwork, self).__init__()
        self.feature_layer = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU()
        )
        
        # State value stream
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
        # Advantage stream
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        features = self.feature_layer(x)
        values = self.value_stream(features)
        advantages = self.advantage_stream(features)
        
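        # Combine value and advantage streams; the advantage mean is taken
        # per sample (over actions), not over the whole batch
        q_values = values + (advantages - advantages.mean(dim=1, keepdim=True))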
        return q_values

#####################################
# Update DQN Agent for Double DQN
#####################################
class DQNAgentImproved(DQNAgent):
    """
    Updated DQN agent to use Double DQN and Dueling Q-Network.
    """
    def __init__(self, *args, dueling=True, **kwargs):
        super(DQNAgentImproved, self).__init__(*args, **kwargs)
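        # Replace Q-Network with Dueling Q-Network if specified
        if dueling:
            self.q_network = DuelingQNetwork(self.state_dim, self.action_dim).to(self.device)
            self.target_network = DuelingQNetwork(self.state_dim, self.action_dim).to(self.device)
            self.target_network.load_state_dict(self.q_network.state_dict())
            # The parent class built its optimizer for the old network, so rebuild
            # it for the parameters of the new online network
            self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.lr)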
    
    def train_step(self):
        """
        Update using Double DQN to reduce overestimation bias.
        """
        if len(self.replay_buffer) < self.min_buffer_size:
            return

        # Sample from replay buffer
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        states_t = torch.FloatTensor(states).to(self.device)
        actions_t = torch.LongTensor(actions).to(self.device).unsqueeze(1)
        rewards_t = torch.FloatTensor(rewards).to(self.device).unsqueeze(1)
        next_states_t = torch.FloatTensor(next_states).to(self.device)
        dones_t = torch.FloatTensor(dones).to(self.device).unsqueeze(1)

        # Current Q-values
        q_values = self.q_network(states_t).gather(1, actions_t)

        # Double DQN Target Q-values
        with torch.no_grad():
            next_actions = self.q_network(next_states_t).argmax(dim=1, keepdim=True)
            next_q_values = self.target_network(next_states_t).gather(1, next_actions)

        target_q_values = rewards_t + self.gamma * next_q_values * (1 - dones_t)

        # Loss calculation
        loss = nn.MSELoss()(q_values, target_q_values)

        # Gradient descent
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update target network
        self.soft_update()

        # Update epsilon
        self.update_epsilon()

        self.global_step += 1
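
The difference between the standard DQN target and the Double DQN target fits in a few lines of plain Python. Both helper names are hypothetical and the Q-values are invented; the point is only that Double DQN lets the online network choose the action and the target network score it.

```python
def dqn_target_value(target_q_next):
    """Standard DQN: the target network both selects and evaluates."""
    return max(target_q_next)


def double_dqn_target_value(online_q_next, target_q_next):
    """Double DQN: the online network selects, the target network evaluates."""
    best_action = max(range(len(online_q_next)), key=lambda a: online_q_next[a])
    return target_q_next[best_action]


# Invented next-state Q-values for 4 actions.
online_q_next = [1.0, 3.0, 2.0, 0.5]
target_q_next = [1.2, 2.5, 4.0, 0.4]

print(dqn_target_value(target_q_next))                        # uses action 2
print(double_dqn_target_value(online_q_next, target_q_next))  # uses action 1
```

When the target network overestimates one action (here action 2), the Double DQN value is lower because the two networks have to agree before a high value is used.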



## Train the DQN agent
- Let's train our agent for 1,000,000 timesteps. Don't forget to use a GPU (on your local installation, Google Colab, or similar). You will notice that experiments take considerably longer than in previous labs.

#### Solution

In [None]:
import torch
import numpy as np
from tqdm import trange

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Initialize the agent with parameters (these are examples, feel free to adjust)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgentImproved(
    state_dim=state_dim,
    action_dim=action_dim,
    gamma=0.99,
    lr=1e-3,
    batch_size=64,
    buffer_size=100000,
    min_buffer_size=100,  # Replay buffer warm-up (short test value; e.g. 10000 for a full run)
    epsilon_start=1.0,
    epsilon_end=0.01,
    epsilon_decay=500000,      # Decay epsilon over a large number of steps
    tau=1e-3,
    device=device,
    dueling=True # activate dueling dqn
)

num_timesteps = 10000  # Short test run; use 1_000_000 for full training
obs = env.reset()

# Variables to track rewards and performance
episode_rewards = []
current_rewards = np.zeros(env.num_envs, dtype=np.float32)

# We'll use a tqdm progress bar for convenience
for timestep in trange(num_timesteps, desc="Training steps"):
    # Select actions for each environment in the vectorized set
    actions = []
    for i in range(env.num_envs):
        actions.append(agent.select_action(obs[i]))
    actions = np.array(actions)

    # Step through the vectorized environment
    next_obs, rewards, dones, infos = env.step(actions)

    shaped_rewards = []
    for i in range(env.num_envs):
        shaped_reward = reward_shaping(rewards[i], obs[i], next_obs[i])
        shaped_rewards.append(shaped_reward)
        agent.store_transition(obs[i], actions[i], shaped_reward, next_obs[i], dones[i])

    # Perform a training step of the agent
    agent.train_step()

    # Update observation
    obs = next_obs

    # Accumulate rewards for each environment
    current_rewards += shaped_rewards

    # When an episode finishes in any environment, record its return
    # (the vectorized environment resets that environment automatically)
    for i, done in enumerate(dones):
        if done:
            episode_rewards.append(current_rewards[i])
            current_rewards[i] = 0.0

print("\nTraining finished!")

# After training, you could save the model if desired
torch.save(agent.q_network.state_dict(), "dqn_lunarlander.pth")
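
It is worth checking how far epsilon actually decays for a given run length. `update_epsilon` subtracts a fixed amount on every training step, so the value after `n` updates has a simple closed form. The sketch below assumes roughly one update per loop iteration; `epsilon_after` is a hypothetical helper, and its defaults mirror the arguments passed to the agent above.

```python
def epsilon_after(n_updates, start=1.0, end=0.01, decay_steps=500_000):
    """Epsilon after n linear-decay updates, clipped at `end`."""
    step = (start - end) / decay_steps
    return max(end, start - n_updates * step)


for n in (10_000, 250_000, 500_000, 1_000_000):
    print(n, round(epsilon_after(n), 4))
```

With `epsilon_decay=500000`, a 10,000-step run ends with epsilon still around 0.98, so the agent acts almost entirely at random throughout such a short run; the full decay needs about 500,000 updates.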


## Evaluate the agent
- Now that our Lunar Lander agent is trained, we need to **check its performance**.

**Note**: When you evaluate your agent, you should not use your training environment but create an evaluation environment.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import gymnasium

# Create a new environment for evaluation
eval_env = gymnasium.make("LunarLander-v2")
num_eval_episodes = 10
all_episode_rewards = []

# select_action is epsilon-greedy, so disable exploration for evaluation
agent.epsilon = 0.0

for _ in range(num_eval_episodes):
    obs, info = eval_env.reset()
    done = False
    truncated = False
    episode_reward = 0.0

    while not (done or truncated):
        action = agent.select_action(obs)
        obs, reward, done, truncated, info = eval_env.step(action)
        episode_reward += reward

    all_episode_rewards.append(episode_reward)

mean_reward = np.mean(all_episode_rewards)
std_reward = np.std(all_episode_rewards)

print(f"Mean Reward over {num_eval_episodes} episodes: {mean_reward:.2f} +/- {std_reward:.2f}")

# ---------------------------
# Plotting
# ---------------------------

# Plot the training episode rewards
plt.figure(figsize=(12, 6))
plt.plot(episode_rewards, label='Episode Rewards')
plt.title('Learning Curve (Episode Rewards Over Time)')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.legend()
plt.show()

# Plot a smoothed version of the training curve using a moving average
window_size = 50  # Adjust this window as needed
if len(episode_rewards) > window_size:
    smooth_rewards = np.convolve(episode_rewards, np.ones(window_size)/window_size, mode='valid')

    plt.figure(figsize=(12, 6))
    plt.plot(smooth_rewards, label=f'Moving Average (window={window_size})', color='orange')
    plt.title('Smoothed Learning Curve')
    plt.xlabel('Episode')
    plt.ylabel('Smoothed Total Reward')
    plt.grid(True)
    plt.legend()
    plt.show()

# Plot the distribution of evaluation episode rewards
plt.figure(figsize=(12,6))
plt.hist(all_episode_rewards, bins=10, alpha=0.7, label='Evaluation Rewards')
plt.axvline(x=mean_reward, color='r', linestyle='dashed', linewidth=2, label=f'Mean Reward = {mean_reward:.2f}')
plt.title('Distribution of Evaluation Episode Rewards')
plt.xlabel('Total Reward')
plt.ylabel('Count')
plt.legend()
plt.grid(True)
plt.show()
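
The smoothed curve above relies on `np.convolve(..., mode='valid')` with a uniform window, which is a plain moving average that only keeps positions where the window fits entirely inside the data. A dependency-free sketch of the same computation, with the hypothetical helper name `moving_average`, looks like this:

```python
def moving_average(values, window_size):
    """Average of each full window; returns len(values) - window_size + 1 points."""
    if window_size <= 0 or window_size > len(values):
        return []
    return [
        sum(values[i:i + window_size]) / window_size
        for i in range(len(values) - window_size + 1)
    ]


# Made-up episode returns.
smoothed = moving_average([1.0, 2.0, 3.0, 4.0, 5.0], window_size=2)
print(smoothed)
```

The smoothed series is shorter than the raw one by `window_size - 1` points, so its x-axis is not aligned one-to-one with episode indices.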


## Create a video of the result



In [None]:
import os
from gymnasium.wrappers import RecordVideo

# Create a directory for the videos if it doesn't exist
if not os.path.exists('./videos'):
    os.makedirs('./videos')

# Create a new evaluation environment with video recording
eval_env = gym.make("LunarLander-v2", render_mode="rgb_array")
eval_env = RecordVideo(eval_env, video_folder='./videos', episode_trigger=lambda episode_id: True)

# select_action is epsilon-greedy, so set epsilon to 0 to act greedily
agent.epsilon = 0.0

obs, info = eval_env.reset()
done = False
truncated = False

while not (done or truncated):
    # Select the greedy action using the trained agent
    action = agent.select_action(obs)
    obs, reward, done, truncated, info = eval_env.step(action)

eval_env.close()
print("Video recorded! Check the './videos' folder for the output.")
