# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting shimmy<1.0,>=0.1.0 (from shimmy[atari]<1.0,>=0.1.0; extra == "atari"->gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl.metadata (2.3 kB)
Collecting AutoROM.acce

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:

class Network(nn.Module):
    """
    Fully connected Q-value approximator: a state vector goes in, one Q-value per action comes out.

    Attributes:
    ----------
    seed : torch.Generator
        The generator object returned by ``torch.manual_seed(seed)``.
    fc1 : nn.Linear
        Input layer mapping ``state_size`` features to 64 hidden units.
    fc2 : nn.Linear
        Hidden layer mapping 64 units to 64 units.
    fc3 : nn.Linear
        Output layer mapping 64 units to ``action_size`` Q-values.
    """

    def __init__(self, state_size, action_size, seed=42):
        """
        Build the three linear layers after seeding PyTorch's global RNG.

        Parameters:
        ----------
        state_size : int
            Number of features in one state vector (input width).
        action_size : int
            Number of discrete actions (output width).
        seed : int, optional
            Value passed to ``torch.manual_seed`` (default is 42).
        """
        super().__init__()
        # Seeding comes first so that the layer weights created below are reproducible.
        self.seed = torch.manual_seed(seed)
        hidden_units = 64
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """
        Compute Q-values for the given state(s).

        Parameters:
        ----------
        state : torch.Tensor
            Input tensor whose last dimension is ``state_size``.

        Returns:
        -------
        torch.Tensor
            Tensor whose last dimension is ``action_size`` (one Q-value per action).
        """
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        # No activation on the output: Q-values are unbounded real numbers.
        return self.fc3(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
import gymnasium as gym

# Instantiate the Lunar Lander task; the training loop further down steps this `env`.
env = gym.make('LunarLander-v2')

# Observation space: keep the full shape tuple and its first dimension,
# which is the input width handed to the agent's Q-network.
state_shape = env.observation_space.shape
state_size = state_shape[0]

# Action space: `n` is the number of actions the agent can choose from.
number_actions = env.action_space.n

# Report the dimensions that size the network.
print('State shape:', state_shape)
print('State size:', state_size)
print('Number of actions:', number_actions)

State shape: (8,)
State size: 8
Number of actions: 4


### Initializing the hyperparameters

In [None]:
# Hyperparameters read as module-level globals by the Agent class.
learning_rate = 0.0005           # step size given to the Adam optimizer
minibatch_size = 100             # experiences sampled from replay memory per learning step
discount_factor = 0.99           # gamma applied to the bootstrapped next-state value
replay_buffer_size = 100_000     # capacity of the replay memory
interpolation_parameter = 0.001  # tau used by the soft target-network update

  and should_run_async(code)


### Implementing Experience Replay

In [None]:

class ReplayMemory(object):
    """
    Fixed-capacity experience replay buffer for deep reinforcement learning.

    Attributes:
    ----------
    device : torch.device
        The device (GPU if available, otherwise CPU) that sampled tensors are moved to.
    capacity : int
        The maximum number of experiences the replay memory can store.
    memory : collections.deque
        Bounded FIFO of experiences, each a tuple (state, action, reward, next_state, done).
        Supports ``len()``, iteration and integer indexing.
    """

    def __init__(self, capacity):
        """
        Initializes the replay memory with a given capacity.

        Parameters:
        ----------
        capacity : int
            The maximum number of experiences the replay memory can hold.
        """
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity
        # A bounded deque drops its oldest entry in O(1) when full, whereas
        # `del some_list[0]` shifts every remaining element on each push.
        # The capacity is coerced to a non-negative int because deque requires one.
        self.memory = deque(maxlen=max(int(capacity), 0))

    def push(self, event):
        """
        Adds a new experience to the replay memory. Once the memory is at capacity,
        the oldest experience is discarded automatically.

        Parameters:
        ----------
        event : tuple
            A tuple representing an experience (state, action, reward, next_state, done).
        """
        self.memory.append(event)

    def sample(self, batch_size):
        """
        Randomly samples a batch of experiences (without replacement) from the replay memory.

        Parameters:
        ----------
        batch_size : int
            The number of experiences to sample.

        Returns:
        -------
        tuple
            A tuple of tensors on ``self.device``, in the order
            (states, next_states, actions, rewards, dones). Note that this order
            differs from the order of the fields inside a stored experience.
            States, rewards and dones are float tensors; actions is a long tensor.

        Raises:
        ------
        ValueError
            If ``batch_size`` is larger than the number of stored experiences.
        """
        # Defensive filter: None placeholders, should any be stored, are skipped.
        experiences = [e for e in random.sample(self.memory, k=batch_size) if e is not None]

        def stack(field):
            # Stack one field of every sampled experience into a 2-D array (one row per experience).
            return np.vstack([e[field] for e in experiences])

        states = torch.from_numpy(stack(0)).float().to(self.device)
        actions = torch.from_numpy(stack(1)).long().to(self.device)
        rewards = torch.from_numpy(stack(2)).float().to(self.device)
        next_states = torch.from_numpy(stack(3)).float().to(self.device)
        # Booleans go through uint8 so they become 0.0 / 1.0 floats usable as a mask.
        dones = torch.from_numpy(stack(4).astype(np.uint8)).float().to(self.device)

        return states, next_states, actions, rewards, dones


### Implementing the DQN class

In [None]:
class Agent():
    """
    Deep Q-Learning agent: acts epsilon-greedily, stores transitions in replay
    memory, and trains a local Q-network against a slowly tracking target network.

    Attributes:
    ----------
    device : torch.device
        GPU if available, otherwise CPU.
    state_size : int
        Dimension of the state space.
    action_size : int
        Number of possible actions.
    local_qnetwork : Network
        Q-network that is optimized and used to pick actions.
    target_qnetwork : Network
        Q-network used to compute the learning targets.
    optimizer : torch.optim.Optimizer
        Adam optimizer over the local Q-network's parameters.
    memory : ReplayMemory
        Buffer of past experiences.
    t_step : int
        Step counter modulo 4 that schedules learning.
    """

    def __init__(self, state_size, action_size):
        """
        Create both Q-networks, the optimizer and the replay memory.

        Parameters:
        ----------
        state_size : int
            Dimension of the state space.
        action_size : int
            Number of possible actions.
        """
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size

        # Two networks of identical architecture: one trained, one providing targets.
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)

        # Only the local network is optimized directly.
        self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """
        Record one transition and, on every fourth call, learn from a sampled minibatch.

        Parameters:
        ----------
        state : np.array
            State in which the action was taken.
        action : int
            Action taken by the agent.
        reward : float
            Reward received for the action.
        next_state : np.array
            State reached after the action.
        done : bool
            Whether the episode ended with this transition.
        """
        transition = (state, action, reward, next_state, done)
        self.memory.push(transition)

        # The counter wraps at 4, so it is zero on every fourth call.
        self.t_step = (self.t_step + 1) % 4
        if self.t_step != 0:
            return

        # Learning waits until the buffer holds more than one minibatch.
        if len(self.memory.memory) <= minibatch_size:
            return

        batch = self.memory.sample(minibatch_size)
        self.learn(batch, discount_factor)

    def act(self, state, epsilon=0.):
        """
        Choose an action for `state` with an epsilon-greedy policy.

        Parameters:
        ----------
        state : np.array
            Current state of the environment.
        epsilon : float, optional
            Probability of picking a random action instead of the greedy one. Default is 0.

        Returns:
        -------
        int
            Index of the selected action (a NumPy integer).
        """
        # Add a leading batch dimension before feeding the network.
        state_batch = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

        # Evaluate without gradient tracking, then restore training mode.
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state_batch)
        self.local_qnetwork.train()

        greedy = random.random() > epsilon
        if not greedy:
            return random.choice(np.arange(self.action_size))
        return np.argmax(action_values.cpu().numpy())

    def learn(self, experiences, discount_factor):
        """
        Run one gradient step on the local Q-network, then soft-update the target network.

        Parameters:
        ----------
        experiences : tuple
            Tensors (states, next_states, actions, rewards, dones) sampled from memory.
        discount_factor : float
            Discount factor for future rewards (gamma).
        """
        states, next_states, actions, rewards, dones = experiences

        # Highest target-network Q-value of each next state, detached from the graph.
        best_next_q = self.target_qnetwork(next_states).detach().max(dim=1, keepdim=True)[0]

        # TD target; the (1 - dones) factor removes the bootstrap term for finished episodes.
        q_targets = rewards + (discount_factor * best_next_q * (1 - dones))

        # Q-values the local network assigns to the actions that were actually taken.
        q_expected = self.local_qnetwork(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Move the target network a small step towards the freshly updated local network.
        self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """
        Blend the local model's parameters into the target model, in place.

        θ_target = τ*θ_local + (1 - τ)*θ_target

        Parameters:
        ----------
        local_model : torch.nn.Module
            Model whose parameters are read.
        target_model : torch.nn.Module
            Model whose parameters are overwritten with the blend.
        interpolation_parameter : float
            The interpolation weight τ (tau) given to the local parameters.
        """
        tau = interpolation_parameter
        parameter_pairs = zip(target_model.parameters(), local_model.parameters())
        for target_param, local_param in parameter_pairs:
            blended = tau * local_param.data + (1.0 - tau) * target_param.data
            target_param.data.copy_(blended)


### Initializing the DQN agent

In [None]:
# Build the agent with dimensions taken from the environment's observation and action spaces.
agent = Agent(state_size=state_size, action_size=number_actions)

### Training the DQN agent

In [None]:
# Training schedule and epsilon-greedy exploration settings
number_episodes = 2000
max_steps_per_episode = 1000
epsilon_st_values = 1.0
epsilon_end_values = 0.01
epsilon_decay_values = 0.995
epsilon = epsilon_st_values
scores_on_100_episodes = deque(maxlen=100)  # rolling window of the most recent episode scores

# Train for at most `number_episodes` episodes, stopping early once the task is solved.
for episode in range(1, number_episodes + 1):
    state, _ = env.reset()  # Reset the environment to start a new episode
    score = 0  # Accumulated reward of this episode

    # Interact with the environment for at most `max_steps_per_episode` steps.
    for t in range(0, max_steps_per_episode):
        action = agent.act(state, epsilon)  # Epsilon-greedy action selection
        # Gymnasium reports natural termination and truncation (e.g. a time limit) separately.
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only `terminated` is stored as the done flag, so a transition cut short by
        # truncation still bootstraps from the value of its next state.
        agent.step(state, action, reward, next_state, terminated)
        state = next_state  # Move to the next state
        score += reward  # Accumulate the received reward

        # Either signal ends the episode; the environment must be reset before stepping again.
        done = terminated or truncated
        if done:
            break

    scores_on_100_episodes.append(score)  # Record the score for this episode
    epsilon = max(epsilon_end_values, epsilon_decay_values * epsilon)  # Decay epsilon, bounded below

    # Print progress: overwrite the same line each episode, keep a permanent line every 100 episodes
    average_score = np.mean(scores_on_100_episodes)
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end="")
    if episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))

    # The task counts as solved only when a full 100-episode window averages at least 200;
    # requiring a full window also keeps `episode - 100` from going negative.
    window_is_full = len(scores_on_100_episodes) == scores_on_100_episodes.maxlen
    if window_is_full and average_score >= 200.0:
        print('\rEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
        torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')  # Save the trained model
        break


  and should_run_async(code)


Episode 100	Average Score: -158.99
Episode 200	Average Score: -89.75
Episode 300	Average Score: -35.42
Episode 400	Average Score: 18.46
Episode 500	Average Score: 86.03
Episode 600	Average Score: 145.69
Episode 700	Average Score: 197.30
Environment solved in 603 episodes!	Average Score: 200.69


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, video_path='video.mp4', fps=30):
    """
    Run one greedy episode of `agent` in a fresh environment and save the rendered frames as a video.

    Parameters:
    ----------
    agent : Agent
        Object whose ``act(state)`` method returns the action to take.
    env_name : str
        Environment id passed to ``gym.make``.
    video_path : str, optional
        Output file for the video (default is 'video.mp4').
    fps : int, optional
        Frame rate of the saved video (default is 30).
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            # int() accepts both NumPy integers and plain Python ints.
            state, reward, terminated, truncated, _ = env.step(int(action))
            # Stop on truncation as well as termination; otherwise an episode that
            # only hits the time limit would keep the loop running and the frame
            # list growing indefinitely.
            done = terminated or truncated
    finally:
        # Release the environment even if acting or rendering raised.
        env.close()
    imageio.mimsave(video_path, frames, fps=fps)

# Record one episode of the agent acting greedily and write it to 'video.mp4'.
show_video_of_model(agent=agent, env_name='LunarLander-v2')

def show_video():
    """
    Embed the first '*.mp4' file found in the current working directory as an
    inline, looping HTML5 video in the notebook output.

    Prints "Could not find video" when the directory contains no mp4 file.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary access inside a context manager: the handle is closed as
    # soon as the bytes are read, and the file is never opened for writing.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display the first mp4 found in the working directory inline in the notebook output.
show_video()

