# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [1]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting shimmy<1.0,>=0.1.0 (from shimmy[atari]<1.0,>=0.1.0; extra == "atari"->gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl.metadata (2.3 kB)
Collecting AutoROM.accep

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
# Subclass of nn.Module so PyTorch registers the layers and their parameters
class Network(nn.Module):
  """Fully connected Q-network: state vector in, one Q-value per action out."""

  def __init__(self, state_size, action_size, seed=42):
    """
    Build the three linear layers of the network.

    Parameters:
    - state_size (int): Number of features in the input state.
    - action_size (int): Number of actions, i.e. the width of the output layer.
    - seed (int): Value passed to torch.manual_seed before the layers are
      created, so that their initial weights are reproducible.
    """
    super().__init__()

    # torch.manual_seed returns the generator it seeded; keep it on the instance
    self.seed = torch.manual_seed(seed)

    hidden_units = 64
    self.fc_1 = nn.Linear(state_size, hidden_units)
    self.fc_2 = nn.Linear(hidden_units, hidden_units)
    self.fc_3 = nn.Linear(hidden_units, action_size)

  def forward(self, state):
    """
    Run a forward pass.

    Parameters:
    - state (torch.Tensor): The input state tensor.

    Returns:
    - torch.Tensor: The Q-values produced by the output layer.
    """
    # Two hidden layers with ReLU, then a linear output layer (no activation)
    hidden = F.relu(self.fc_1(state))
    hidden = F.relu(self.fc_2(hidden))
    return self.fc_3(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
'''
  Download the challenge that the AI will train and solve [landing a lunar vehicle]
'''

import gymnasium as gym

# Create the environment and read the sizes the agent needs from its spaces
env = gym.make('LunarLander-v2')
state_shape = env.observation_space.shape
state_size = state_shape[0]  # length of the (1-D) observation vector
number_actions = env.action_space.n

print(
    f"State shape: {state_shape}\n"
    f"State size: {state_size}\n"
    f"Number of actions: {number_actions}"
)

State shape: (8,)
State size: 8
Number of actions: 4


### Initializing the hyperparameters

In [None]:
# Step size handed to the optimizer
learning_rate = 0.0005
# Number of experiences drawn from the replay memory per learning step
mini_batch_size = 100
# Gamma: weight of the next-state value in the Q-learning target
discount_factor = 0.99
# Maximum number of experiences kept in the replay memory
replay_buffer_size = 100_000
# Tau: fraction of the local network blended into the target network per update
interpolation_param = 0.001

  and should_run_async(code)


### Implementing Experience Replay

# Lunar Lander V2 Details
## Actions:
  There are four discrete actions available:
  - 0: do nothing
  - 1: fire left orientation engine
  - 2: fire main engine
  - 3: fire right orientation engine

## States:
  The state is an 8-dimensional vector:
  - the coordinates of the lander in x & y
  - its linear velocities in x & y
  - its angle
  - its angular velocity
  - two booleans that represent whether each leg is in contact with the ground or not.

## Rewards:
After every step a reward is granted. The total reward of an episode is the sum of the rewards for all the steps within that episode.

  For each step, the reward:
  - is increased/decreased the closer/further the lander is to the landing pad.
  - is increased/decreased the slower/faster the lander is moving.
  - is decreased the more the lander is tilted (angle not horizontal).
  - is increased by 10 points for each leg that is in contact with the ground.
  - is decreased by 0.03 points each frame a side engine is firing.
  - is decreased by 0.3 points each frame the main engine is firing.

The episode receives an additional reward of -100 or +100 points for crashing or landing safely, respectively.

An episode is considered a solution if it scores at least 200 points.

In [None]:
# Plain class: `object` is the only base
class ReplayMemory(object):
  """Bounded, first-in-first-out store of experiences with random sampling."""

  def __init__(self, capacity):
    """
    Parameters:
    - capacity (int): Maximum number of experiences kept at any time.
    """
    # Sampled batches are moved to the GPU when one is available, else the CPU
    self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    self.capacity = capacity
    self.memory = []

  def push(self, event):
    """Append an experience, dropping the oldest one once capacity is exceeded."""
    self.memory.append(event)
    if len(self.memory) > self.capacity:
      del self.memory[0]

  def sample(self, batch_size):
    """
    Draw `batch_size` experiences at random (without replacement).

    Each experience is indexed as (state, action, reward, next_state, done).

    Returns:
    - tuple of tensors on self.device, in the order
      (states, next_states, actions, rewards, dones).
      Note that next_states comes second, not fourth.
    """
    drawn = random.sample(self.memory, k=batch_size)
    batch = [experience for experience in drawn if experience is not None]

    def column(index):
      # Stack field `index` of every experience into one row per experience
      return np.vstack([experience[index] for experience in batch])

    states = torch.from_numpy(column(0)).float().to(self.device)
    actions = torch.from_numpy(column(1)).long().to(self.device)
    rewards = torch.from_numpy(column(2)).float().to(self.device)
    next_states = torch.from_numpy(column(3)).float().to(self.device)
    # Done flags go through uint8 first, then become 0.0 / 1.0 floats
    dones = torch.from_numpy(column(4).astype(np.uint8)).float().to(self.device)

    return states, next_states, actions, rewards, dones


  and should_run_async(code)


### Implementing the DQN class

In [None]:
class Agent(object):
  """
  DQN agent holding a local Q-network (trained by the optimizer), a target
  Q-network (updated by soft interpolation) and a replay memory.
  """

  def __init__(self, state_size, action_size):
    """
    Parameters:
    - state_size (int): Size of the state vector fed to the networks.
    - action_size (int): Number of actions the agent can choose from.
    """
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    self.device = device
    self.state_size = state_size
    self.action_size = action_size

    self.local_qnetwork = Network(state_size, action_size).to(device)
    self.target_qnetwork = Network(state_size, action_size).to(device)

    # Only the local network's parameters are optimized directly
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
    self.replay_memory = ReplayMemory(replay_buffer_size)

    # Counts steps modulo 4 to schedule learning
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, every fourth call, learn from a sampled batch."""
    transition = (state, action, reward, next_state, done)
    self.replay_memory.push(transition)

    self.t_step = (self.t_step + 1) % 4
    if self.t_step != 0:
      return

    # Learn only once the memory holds more than one mini-batch of experiences
    if len(self.replay_memory.memory) <= mini_batch_size:
      return

    batch = self.replay_memory.sample(mini_batch_size)
    self.learn(batch, discount_factor)

  def act(self, state, epsilon=0.0):
    """
    Choose an action for `state` with an epsilon-greedy policy.

    Parameters:
    - state (numpy.ndarray): The current state.
    - epsilon (float): Exploration threshold compared against random.random().

    Returns:
    - The index of the chosen action.
    """
    # Convert to a float tensor with a leading batch dimension on the agent's device
    state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

    # Evaluate without tracking gradients, then switch back to training mode
    self.local_qnetwork.eval()
    with torch.no_grad():
      q_values = self.local_qnetwork(state_tensor)
    self.local_qnetwork.train()

    greedy = random.random() > epsilon
    if not greedy:
      return random.choice(np.arange(self.action_size))

    return np.argmax(q_values.cpu().data.numpy())

  def learn(self, experiences, discount_factor):
    """
    Perform one gradient step on the local network from a batch of experiences.

    Parameters:
    - experiences (tuple): (states, next_states, actions, rewards, dones).
    - discount_factor (float): Multiplier applied to the next-state value.
    """
    states, next_states, actions, rewards, dones = experiences

    # Highest target-network Q-value for each next state, kept out of the graph
    target_outputs = self.target_qnetwork(next_states).detach()
    next_q_targets = target_outputs.max(1)[0].unsqueeze(1)

    # The (1 - dones) factor removes the next-state term where done is 1
    q_targets = rewards + (discount_factor * next_q_targets * (1 - dones))

    # Local-network Q-value of the action that was actually taken
    q_expected = self.local_qnetwork(states).gather(1, actions)

    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_param)

  def soft_update(self, local_model, target_model, interpolation_param):
    """
    Move each target parameter towards its local counterpart:
    target <- interpolation_param * local + (1 - interpolation_param) * target
    """
    parameter_pairs = zip(target_model.parameters(), local_model.parameters())
    for target_param, local_param in parameter_pairs:
      blended = interpolation_param * local_param.data + (1.0 - interpolation_param) * target_param.data
      target_param.data.copy_(blended)


### Initializing the DQN agent

In [None]:
# Create the agent for this environment's state size and number of actions
agent = Agent(state_size=state_size, action_size=number_actions)

### Training the DQN agent

In [None]:
# Training schedule
number_episodes = 2000
max_time_steps_per_episode = 1000

# Epsilon-greedy exploration: start fully random, decay once per episode
epsilon_start_value = 1.0
epsilon_end_value = 0.01
epsilon_decay_rate = 0.995
epsilon = epsilon_start_value

# Sliding window holding the scores of the most recent 100 episodes
scores_on_100_episodes = deque(maxlen = 100)

for episode in range(1, number_episodes + 1):
  # Return the lander to its initial position
  state, _ = env.reset()

  # Initialize the score [cumulative reward]
  score = 0

  for t in range(max_time_steps_per_episode):
    action = agent.act(state, epsilon)

    # env.step returns (observation, reward, terminated, truncated, info).
    # `done` is the terminated flag; `truncated` reports that the environment
    # cut the episode short (for example because of a time limit).
    next_state, reward, done, truncated, _ = env.step(action)

    # Only the terminated flag is stored with the transition, as before
    agent.step(state, action, reward, next_state, done)
    state = next_state
    score += reward

    # Stop on either signal so a truncated episode is never stepped further
    if done or truncated:
      break

  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_end_value, epsilon_decay_rate * epsilon)

  average_score = np.mean(scores_on_100_episodes)

  # '\r' rewrites the same output line; every 100th episode the line is kept
  print(f'\rEpisode: {episode}\tAverage Score:{average_score:.2f}', end = "")

  if episode % 100 == 0:
    print(f'\rEpisode: {episode}\tAverage Score:{average_score:.2f}')

  if average_score >= 200.0:
    print(f'\n Environment solved in {episode - 100} episodes!\tAverage Score:{average_score:.2f}')

    # [OPTIONAL] save the winning model
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')

    # exit training since we found a winning model
    break


Episode: 100	Average Score:-109.26
Episode: 200	Average Score:-7.47
Episode: 300	Average Score:93.17
Episode: 400	Average Score:181.07
Episode: 445	Average Score:200.55
 Environment solved in 345 episodes!	Average Score:200.55


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
    """
    Run one episode with the agent acting greedily and save it as 'video.mp4'.

    Parameters:
    - agent: Object whose act(state) returns an action exposing .item().
    - env_name (str): Environment id passed to gym.make.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action.item())
            # End on truncation as well; otherwise an episode that never
            # terminates would keep looping and collecting frames forever.
            done = terminated or truncated
    finally:
        # Release the environment even if rendering or stepping fails
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Run the agent in 'LunarLander-v2' and write the rendered frames to video.mp4
show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    """
    Embed the first '*.mp4' file found in the working directory in the
    notebook output as a base64-encoded HTML <video> element.

    Prints "Could not find video" when no .mp4 file is present.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode; the context manager closes the file handle
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display the first .mp4 found in the working directory inline in the notebook
show_video()

