Implement DQN for LunarLander

This lab is a modified version of a notebook from the Deep RL Course on HuggingFace.

In this notebook, we'll train a **Deep Q-Network (DQN) agent** on LunarLander, a Box2D environment from Gymnasium (not an Atari game). The agent controls a spaceship, the Lunar Lander, and learns how to **land correctly on the Moon**.


### The environment

We will use the [LunarLander-v2](https://gymnasium.farama.org/environments/box2d/lunar_lander/) environment from Gymnasium. This environment is a classic rocket trajectory optimization problem. According to Pontryagin’s maximum principle, it is optimal to fire the engine at full throttle or turn it off. This is the reason why this environment has discrete actions: engine on or off.

In [None]:
%%html
<video controls autoplay><source src="https://huggingface.co/sb3/ppo-LunarLander-v2/resolve/main/replay.mp4" type="video/mp4"></video>

## Install dependencies and create a virtual screen 🔽

The first step is to install the dependencies. We'll install several:

- `gymnasium[box2d]`: Contains the LunarLander-v2 environment
- `stable-baselines3[extra]`: The deep reinforcement learning library.


In [None]:
!apt install swig cmake

In [None]:
!pip install gymnasium[box2d]

In [None]:
!pip install stable-baselines3==2.0.0a5

Later in the notebook, we'll need to generate a replay video. To do so on Colab, **we need a virtual screen to be able to render the environment** (and thus record the frames).

The following cell installs the virtual screen libraries. The virtual screen itself is created and started a few cells further down, after the runtime restart.

In [None]:
!sudo apt-get update
!sudo apt-get install -y python3-opengl
!apt install ffmpeg
!apt install xvfb
!pip3 install pyvirtualdisplay

To make sure the newly installed libraries are used, **the notebook runtime sometimes needs to be restarted**. The next cell forces the **runtime to crash, so you'll need to reconnect and run the code starting from the cell after it**. Thanks to this trick, **we will be able to run our virtual screen.**

In [None]:
import os
os.kill(os.getpid(), 9)

In [None]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

## Import the packages

In [None]:
import gymnasium as gym

from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.monitor import Monitor

## Create the LunarLander environment and understand how it works

### [The environment](https://gymnasium.farama.org/environments/box2d/lunar_lander/)

The goal is to train our agent, a [Lunar Lander](https://gymnasium.farama.org/environments/box2d/lunar_lander/), **to land correctly on the moon**. To do that, the agent needs to learn **to adapt its speed and position (horizontal, vertical, and angular) to land correctly.**

In [None]:
# We create our environment with gym.make("<name_of_the_environment>")
env = gym.make("LunarLander-v2")
env.reset()
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space Shape", env.observation_space.shape)
print("Sample observation", env.observation_space.sample()) # Get a random observation

We see with `Observation Space Shape (8,)` that the observation is a vector of size 8, where each value contains different information about the lander:
- Horizontal pad coordinate (x)
- Vertical pad coordinate (y)
- Horizontal speed (x)
- Vertical speed (y)
- Angle
- Angular speed
- If the left leg contact point has touched the land (boolean)
- If the right leg contact point has touched the land (boolean)
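
To make the layout of the observation vector concrete, here is a small sketch that unpacks the 8 values into named fields, in the order listed above. The `LanderObservation` type, the `unpack_observation` helper, and the sample numbers are made up for illustration and are not part of Gymnasium.

```python
from typing import NamedTuple, Sequence


class LanderObservation(NamedTuple):
    """Hypothetical named view of the 8-value observation (not a Gymnasium type)."""
    x: float
    y: float
    vx: float
    vy: float
    angle: float
    angular_velocity: float
    left_leg_contact: bool
    right_leg_contact: bool


def unpack_observation(obs: Sequence[float]) -> LanderObservation:
    """Map a length-8 observation to named fields, following the order in the list above."""
    if len(obs) != 8:
        raise ValueError(f"expected 8 values, got {len(obs)}")
    x, y, vx, vy, angle, angular_velocity, left, right = obs
    return LanderObservation(
        float(x), float(y), float(vx), float(vy),
        float(angle), float(angular_velocity),
        bool(left), bool(right),
    )


# Made-up observation: lander slightly above the pad, descending, right leg touching
example = unpack_observation([0.0, 1.4, 0.1, -0.5, 0.02, 0.0, 0.0, 1.0])
print(example.vy, example.left_leg_contact, example.right_leg_contact)
```

The same helper should also accept the NumPy array returned by the environment, since it only relies on length and iteration.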


In [None]:
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample()) # Take a random action

The action space (the set of possible actions the agent can take) is discrete with 4 actions available:

- Action 0: Do nothing,
- Action 1: Fire left orientation engine,
- Action 2: Fire the main engine,
- Action 3: Fire right orientation engine.
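
When reading logs or debugging a policy, it can help to give these integer actions readable names. The sketch below does this with an `IntEnum`. The class and member names are assumptions chosen for this example; the environment itself only sees the integers 0 to 3.

```python
from enum import IntEnum


class LanderAction(IntEnum):
    """Hypothetical readable names for the four discrete actions listed above."""
    NOOP = 0        # Do nothing
    FIRE_LEFT = 1   # Fire left orientation engine
    FIRE_MAIN = 2   # Fire the main engine
    FIRE_RIGHT = 3  # Fire right orientation engine


# An IntEnum member behaves like a plain int, so it can be passed where an action index is expected
chosen = LanderAction.FIRE_MAIN
print(chosen.name, int(chosen))
print([action.name for action in LanderAction])
```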

Reward function (the function that gives a reward at each timestep):

After every step a reward is granted. The total reward of an episode is the **sum of the rewards for all the steps within that episode**.

For each step, the reward:

- Is increased/decreased the closer/further the lander is to the landing pad.
- Is increased/decreased the slower/faster the lander is moving.
- Is decreased the more the lander is tilted (angle not horizontal).
- Is increased by 10 points for each leg that is in contact with the ground.
- Is decreased by 0.03 points each frame a side engine is firing.
- Is decreased by 0.3 points each frame the main engine is firing.

The episode receives an **additional reward of -100 or +100 points for crashing or landing safely respectively.**

An episode is **considered a solution if it scores at least 200 points.**
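
As a quick illustration of how the episode score is formed, the sketch below sums per-step rewards and compares the total against the 200-point threshold. The helper names and the reward sequences are invented for this example and do not come from a real run.

```python
SOLVED_THRESHOLD = 200.0  # score at which an episode counts as a solution


def episode_return(step_rewards):
    """Total reward of an episode: the sum of its per-step rewards."""
    return float(sum(step_rewards))


def is_solved(step_rewards, threshold=SOLVED_THRESHOLD):
    """True if the episode's total reward reaches the threshold."""
    return episode_return(step_rewards) >= threshold


# Made-up episode: 100 frames of main-engine cost, two leg contacts, safe landing bonus
wasteful_landing = [-0.3] * 100 + [10.0, 10.0] + [100.0]

# Made-up episode: 80 steps of positive shaping reward, two leg contacts, safe landing bonus
efficient_landing = [1.5] * 80 + [10.0, 10.0] + [100.0]

print(is_solved(wasteful_landing), is_solved(efficient_landing))
```

The first episode lands safely but still falls short of the threshold because of the fuel penalty, which is why a safe landing alone does not guarantee a solved episode.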

#### Vectorized Environment

- We create a vectorized environment (a way of stacking multiple independent environments into a single environment) made of 16 environments, so that **we'll have more diverse experiences during training.**

In [None]:
# Create the environment
env = make_vec_env('LunarLander-v2', n_envs=16)

## Create the Model

Remember the goal: **land the Lunar Lander on the landing pad correctly by controlling the left orientation engine, the right orientation engine, and the main engine**. Based on this, let's build the algorithm we're going to use to solve this problem.

To solve this problem, we're going to implement DQN from scratch.
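
As a rough illustration of the update DQN is built around, the sketch below computes the temporal-difference target $y = r + \gamma \max_{a'} Q_{\text{target}}(s', a')$ for a single transition, dropping the bootstrap term when the episode has ended. The function name `td_target` and the sample numbers are made up for illustration; the agent's `learn` method in the next cell applies the same formula to batched tensors.

```python
def td_target(reward, next_q_values, done, gamma=0.99):
    """One-step TD target for a single transition.

    next_q_values: Q-value estimates of the target network for every action
    in the next state. When done is True there is no next state to bootstrap from.
    """
    bootstrap = 0.0 if done else gamma * max(next_q_values)
    return reward + bootstrap


# Made-up numbers: a small fuel penalty, four action values for the next state
ongoing = td_target(reward=-0.3, next_q_values=[1.0, 2.0, 0.5, 1.5], done=False)

# Terminal transition: only the immediate reward counts
terminal = td_target(reward=100.0, next_q_values=[1.0, 2.0, 0.5, 1.5], done=True)

print(ongoing, terminal)
```

The network is then trained to move its prediction for the action actually taken towards this target.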

In [None]:
import random
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple

# Neural network model for Q-Learning
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Replay Buffer to store experience tuples
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        experience = self.experience(state, action, reward, next_state, done)
        self.memory.append(experience)

    def sample(self):
        experiences = random.sample(self.memory, k=self.batch_size)

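        # Stack into NumPy arrays first: building a tensor from a list of arrays is slow.
        # `device` is the global torch.device defined in the training cell further below.
        states = torch.tensor(np.array([e.state for e in experiences]), dtype=torch.float32, device=device)
        actions = torch.tensor(np.array([e.action for e in experiences]), dtype=torch.int64, device=device)
        rewards = torch.tensor(np.array([e.reward for e in experiences]), dtype=torch.float32, device=device)
        next_states = torch.tensor(np.array([e.next_state for e in experiences]), dtype=torch.float32, device=device)
        dones = torch.tensor(np.array([e.done for e in experiences]), dtype=torch.float32, device=device)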

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.memory)

# DQN Agent
class DQNAgent:
    def __init__(self, state_size, action_size, seed):
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None, so store the seed separately
        self.seed = seed
        random.seed(seed)

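        self.qnetwork_local = QNetwork(state_size, action_size).to(device)
        self.qnetwork_target = QNetwork(state_size, action_size).to(device)
        # Start the target network with the same weights as the local network
        self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=1e-4)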

        # Replay memory
        self.memory = ReplayBuffer(buffer_size=200000, batch_size=128)

        # Hyperparameters
        self.gamma = 0.99  # discount factor
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.05  # minimum exploration rate
        self.epsilon_decay = 0.999  # decay rate for epsilon
        self.target_update_frequency = 1000  # how often to update the target network
        self.update_count = 0

    def step(self, state, action, reward, next_state, done):
        """Store experience in the replay buffer, learn on every step, and periodically sync the target network"""
        self.memory.add(state, action, reward, next_state, done)

        # Learn every time step if we have enough samples in the memory
        if len(self.memory) > self.memory.batch_size:
            self.learn()

        # Update target network periodically
        self.update_count += 1
        if self.update_count % self.target_update_frequency == 0:
            self.update_target_network()

    def select_action(self, state):
        """Select action using epsilon-greedy policy"""
        if random.random() < self.epsilon:
            return random.choice(np.arange(self.action_size))
        else:
            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                action_values = self.qnetwork_local(state)
            return np.argmax(action_values.cpu().data.numpy())

    def learn(self):
        """Sample a batch from the replay buffer and update the Q-Network"""
        states, actions, rewards, next_states, dones = self.memory.sample()

        # Get max predicted Q values (for next states) from target model
        next_q_values = self.qnetwork_target(next_states).detach().max(1)[0]
        target_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        # Get the Q values for the actions that were actually taken
        current_q_values = self.qnetwork_local(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Compute the loss
        loss = nn.SmoothL1Loss()(current_q_values, target_q_values)

        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Update the target network to match the local Q-network"""
        self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())

    def decay_epsilon(self):
        """Decay the exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


## Train the DQN agent
Let's train our agent for 2,000 episodes of at most 500 timesteps each, i.e. up to 1,000,000 timesteps in total. Don't forget to use a GPU (on your local installation, Google Colab or similar). You will notice that experiments take considerably longer than in previous labs.
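
Before starting a long run, it is worth checking how far the exploration rate will actually decay. The sketch below evaluates the multiplicative schedule in closed form, using the values from `DQNAgent` (start 1.0, decay 0.999 per episode, floor 0.05). The helper names are made up for this example, and the closed form matches the agent's per-episode update only up to floating-point rounding.

```python
import math


def epsilon_after(episodes, start=1.0, decay=0.999, minimum=0.05):
    """Exploration rate after a number of per-episode decay steps."""
    return max(minimum, start * decay ** episodes)


def episodes_until_minimum(start=1.0, decay=0.999, minimum=0.05):
    """Smallest number of episodes after which epsilon sits at its floor."""
    return math.ceil(math.log(minimum / start) / math.log(decay))


print(f"epsilon after 2000 episodes: {epsilon_after(2000):.3f}")
print(f"episodes needed to reach the floor: {episodes_until_minimum()}")
```

With these settings, epsilon is still roughly 0.135 after 2,000 episodes, and the 0.05 floor would only be reached after about 3,000 episodes, so the agent keeps exploring noticeably for the whole run. A faster decay or more episodes are options if that turns out to be too much.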

In [None]:
#check for available GPU
import torch
print("GPU Available:", torch.cuda.is_available())
print("Device Name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU")


In [None]:
# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

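# Use Gymnasium explicitly: reset() returns (observation, info) and
# step() returns (observation, reward, terminated, truncated, info)
import gymnasium as gym

# Training the DQN Agent
env = gym.make("LunarLander-v2")
agent = DQNAgent(state_size=env.observation_space.shape[0], action_size=env.action_space.n, seed=0)

num_episodes = 2000
max_timesteps = 500

rewards_list = []


for episode in range(1, num_episodes + 1):
    state, _ = env.reset()
    total_reward = 0
    for t in range(max_timesteps):
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # The episode is over if the lander landed/crashed or the time limit was hit
        done = terminated or truncated
        agent.step(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        if done:
            break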
    rewards_list.append(total_reward)


    # Decay epsilon after each episode
    agent.decay_epsilon()

    print(f"Episode {episode}/{num_episodes}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

    # Save the model every 100 episodes
    if episode % 100 == 0:
        torch.save(agent.qnetwork_local.state_dict(), f"dqn_lunarlander_{episode}.pth")

# Close the environment
env.close()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import linregress

#rewards_list = np.random.normal(loc=0, scale=1, size=500).cumsum()
#episodes = np.arange(len(rewards_list))
#slope, intercept, r_value, p_value, std_err = linregress(episodes, rewards_list)


plt.figure(figsize=(12, 6))
plt.plot(rewards_list, label='Total Reward per Episode', color='blue')
# Adjust the plot margins to make room for the text below the plot
plt.subplots_adjust(bottom=0.3)  # Enlarges the bottom margin of the plot area

# Place the text below the plot
plt.figtext(0.5, 0.01,
            f'Gamma = {agent.gamma}\n'
            f'Epsilon = {agent.epsilon}\n'
            f'Epsilon_min = {agent.epsilon_min}\n'
            f'Epsilon_decay = {agent.epsilon_decay}\n'
            f'Target Update Frequency = {agent.target_update_frequency}\n'
            f'Update Count = {agent.update_count}',
            horizontalalignment='center', verticalalignment='bottom', fontsize=10)

print(agent.gamma)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Learning Curve of the DQN-Agent')
plt.legend()
plt.grid(True)
plt.show()


## Evaluate the agent
- Now that our Lunar Lander agent is trained, we need to **check its performance**.

**Note**: When you evaluate your agent, you should not use your training environment but create an evaluation environment.
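
A minimal sketch of such an evaluation loop, assuming the Gymnasium-style API in which `reset()` returns `(observation, info)` and `step()` returns five values. The `evaluate` helper and the toy `CountdownEnv` are made up for this example; in the notebook you would pass a fresh LunarLander environment and a greedy policy built from the trained Q-network instead.

```python
import statistics


def evaluate(env, policy, n_episodes=10):
    """Run n_episodes with the given policy and return (mean, std) of episode returns."""
    returns = []
    for _ in range(n_episodes):
        obs, _info = env.reset()
        total, done = 0.0, False
        while not done:
            obs, reward, terminated, truncated, _info = env.step(policy(obs))
            total += reward
            done = terminated or truncated
        returns.append(total)
    # Population standard deviation, which is also what np.std computes by default
    return statistics.mean(returns), statistics.pstdev(returns)


class CountdownEnv:
    """Toy stand-in for an environment: every episode lasts `length` steps with reward 1."""

    def __init__(self, length=3):
        self.length = length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t, {}

    def step(self, action):
        self.t += 1
        terminated = self.t >= self.length
        return self.t, 1.0, terminated, False, {}


mean_return, std_return = evaluate(CountdownEnv(), policy=lambda obs: 0, n_episodes=5)
print(f"{mean_return:.2f} +/- {std_return:.2f}")
```

Keeping the loop in one function makes it easy to reuse the same code for plain evaluation and for evaluation with video recording.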

In [None]:
# Evaluate the agent
# Use Gymnasium explicitly: reset() returns (observation, info) and
# step() returns (observation, reward, terminated, truncated, info)
import gymnasium as gym

eval_env = gym.make("LunarLander-v2")

num_episodes = 10
rewards = []
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
agent.qnetwork_local.load_state_dict(torch.load('dqn_lunarlander_2000.pth', map_location=device))

# Evaluate the model with 10 evaluation episodes
for _ in range(num_episodes):
    state, _info = eval_env.reset()
    episode_reward = 0
    while True:
        # Greedy action selection for evaluation
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            action_values = agent.qnetwork_local(state_tensor)
        action = np.argmax(action_values.cpu().data.numpy())

        # Step the environment
        state, reward, terminated, truncated, _info = eval_env.step(action)
        episode_reward += reward
        if terminated or truncated:
            break
    rewards.append(episode_reward)

# Calculate mean and standard deviation of rewards
mean_reward = np.mean(rewards)
std_reward = np.std(rewards)

# Print the results
print(f"{mean_reward:.2f} +/- {std_reward:.2f}")

# Close the evaluation environment
eval_env.close()


In [None]:
# Evaluation with video creating and download

# Import necessary libraries
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import numpy as np
import torch

# Create a new evaluation environment with video recording
video_path = "./videos"  # Directory to store the video
# RecordVideo needs frames, so the environment must render to RGB arrays
eval_env = gym.make("LunarLander-v2", render_mode="rgb_array")
eval_env = RecordVideo(eval_env, video_path, episode_trigger=lambda episode_id: episode_id == 0)  # Record only the first episode

num_episodes = 10
rewards = []
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
agent.qnetwork_local.load_state_dict(torch.load('dqn_lunarlander_2000.pth', map_location=device))

# Evaluate the model with 10 evaluation episodes
for episode in range(num_episodes):
    state, _info = eval_env.reset()
    episode_reward = 0
    while True:
        # Greedy action selection for evaluation
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            action_values = agent.qnetwork_local(state_tensor)
        action = np.argmax(action_values.cpu().data.numpy())

        # Step the environment
        state, reward, terminated, truncated, _info = eval_env.step(action)
        episode_reward += reward
        if terminated or truncated:
            break
    rewards.append(episode_reward)
    print(f"Episode {episode + 1} - Reward: {episode_reward:.2f}")

# Calculate mean and standard deviation of rewards
mean_reward = np.mean(rewards)
std_reward = np.std(rewards)

# Print the results
print(f"Mean Reward: {mean_reward:.2f} +/- {std_reward:.2f}")

# Close the evaluation environment
eval_env.close()

# Notify where the video is saved
print(f"Video saved at: {video_path}")

# Download the video file (assuming Colab)
from google.colab import files
files.download(f"{video_path}/rl-video-episode-0.mp4")

