# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing [Gymnasium](https://gymnasium.farama.org/index.html)

In [1]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install "gymnasium[box2d]"

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting shimmy<1.0,>=0.1.0 (from shimmy[atari]<1.0,>=0.1.0; extra == "atari"->gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl.metadata (2.3 kB)
Collecting AutoROM.accep

### Importing the libraries

In [2]:
import os
import random
import numpy as np
import torch # To build and train the AI
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [3]:
class Network(nn.Module):
  def __init__(self, state_size, action_size, seed = 42):
    super(Network, self).__init__() # initialize the parent nn.Module
    self.seed = torch.manual_seed(seed) # seed PyTorch's random number generator for reproducible weights
    self.fc1 = nn.Linear(state_size, 64) # 64 hidden units, a size chosen empirically
    self.fc2 = nn.Linear(64, 64)
    self.fc3 = nn.Linear(64, action_size)

  def forward(self, state):
    x = self.fc1(state) # pass the current state through the first fully connected layer
    x = F.relu(x) # ReLU activation on the output of fc1
    x = self.fc2(x)
    x = F.relu(x) # ReLU activation on the output of fc2
    return self.fc3(x)
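
As a quick sanity check of this architecture, the sketch below rebuilds a network with the same layer sizes and passes a random batch through it. The class name `QNetworkSketch` and the batch size of 5 are illustrative assumptions; the state size of 8 and action size of 4 are the LunarLander values printed in Part 2.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class QNetworkSketch(nn.Module):
    """Hypothetical stand-in with the same 64-64 layout as Network."""
    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return self.fc3(x)  # raw Q-values, one per action (no activation)

net = QNetworkSketch(state_size=8, action_size=4)
batch = torch.randn(5, 8)   # 5 random states with 8 features each
q_values = net(batch)       # one row of 4 Q-values per state
n_params = sum(p.numel() for p in net.parameters())
print(q_values.shape)
print(n_params)             # 4996 trainable parameters
```

The output layer has no activation because Q-values are unbounded real numbers, not probabilities.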


## Part 2 - Training the AI

### Setting up the environment

In [4]:
import gymnasium as gym
env = gym.make('LunarLander-v2')
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [5]:
learning_rate = 5e-4 # found through experimentation; there is no rule of thumb
minibatch_size = 100 # number of experiences used in one training step to update the model parameters; chosen based on practice
gamma = 0.99 # discount factor; values closer to 1 give more weight to future rewards
replay_buffer_size: int = int(1e5) # size of the replay buffer (the memory of the AI)
interpolation_parameter = 1e-3 # interpolation parameter (tau) used for the soft update of the target network
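
To get a feel for what these values imply, the following sketch does two back-of-the-envelope calculations. The names `effective_horizon` and `half_life_updates` are introduced here for illustration only, and both quantities are rough rules of thumb rather than anything the notebook itself computes.

```python
import math

gamma = 0.99                    # discount factor from the cell above
interpolation_parameter = 1e-3  # soft-update rate (tau) from the cell above

# Rough "effective horizon": the sum of the weights 1 + gamma + gamma^2 + ...
effective_horizon = 1.0 / (1.0 - gamma)

# Number of soft updates after which the old target weights keep only
# half of their influence: (1 - tau)^n = 0.5
half_life_updates = math.log(0.5) / math.log(1.0 - interpolation_parameter)

print(round(effective_horizon))   # about 100 steps
print(round(half_life_updates))   # about 693 updates
```

With a discount factor of 0.99 the agent effectively looks about 100 steps ahead, and with tau = 1e-3 the target network trails the local network by several hundred updates.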

### Implementing Experience Replay

In [6]:
# Replay memory class
class ReplayMemory(object):
  def __init__(self, capacity):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity = capacity # maximum number of experiences kept in the buffer
    self.memory = [] # stores experiences as (state, action, reward, next_state, done)

  # Add an experience (event) to the replay memory buffer
  def push(self, event):
    self.memory.append(event)
    if len(self.memory) > self.capacity:
      # remove the oldest event
      del self.memory[0]

  # Randomly select a batch of experiences (events) from the memory
  def sample(self, batch_size):
    experiences = random.sample(self.memory, batch_size)
    # Stack the states, convert them to a float tensor and move it to the device (CPU or GPU)
    states = np.vstack([e[0] for e in experiences if e is not None])
    states = torch.from_numpy(states).float().to(self.device)

    # Stack the actions as a long (integer) tensor, which gather() needs in learn(), and move it to the device
    actions = np.vstack([e[1] for e in experiences if e is not None])
    actions = torch.from_numpy(actions).long().to(self.device)

    # Stack the rewards and move them to the device
    rewards = np.vstack([e[2] for e in experiences if e is not None])
    rewards = torch.from_numpy(rewards).float().to(self.device)

    # Stack the next states and move them to the device
    next_states = np.vstack([e[3] for e in experiences if e is not None])
    next_states = torch.from_numpy(next_states).float().to(self.device)

    # Stack the done flags and move them to the device
    dones = np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8) # uint8 turns the booleans into 0 and 1
    dones = torch.from_numpy(dones).float().to(self.device)

    return states, next_states, actions, rewards, dones

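
The buffer above evicts its oldest event with `del self.memory[0]`, which shifts every remaining list element. One possible alternative, shown here only as a sketch, is to let a `collections.deque` with `maxlen` do the eviction. The class name `DequeReplayMemory` is hypothetical, and the tensor conversion done by `sample` above is left out to keep the example short.

```python
import random
from collections import deque

class DequeReplayMemory:
    """Hypothetical variant of ReplayMemory that stores events in a deque."""
    def __init__(self, capacity):
        # A deque with maxlen drops its oldest item automatically when full
        self.memory = deque(maxlen=capacity)

    def push(self, event):
        self.memory.append(event)

    def sample(self, batch_size):
        # Returns raw events; converting them to tensors is omitted here
        return random.sample(self.memory, batch_size)

memory = DequeReplayMemory(capacity=3)
for i in range(5):
    memory.push((i, 0, 0.0, i + 1, False))  # (state, action, reward, next_state, done)

batch = memory.sample(2)
print(len(memory.memory))    # 3: the buffer never grows past its capacity
print(memory.memory[0][0])   # 2: events 0 and 1 were evicted
```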


### Implementing the DQN class

The Agent class defines the behaviour of an agent that interacts with our space environment using a deep Q-network. While interacting, it maintains two networks: a local Q-network and a target Q-network. The local Q-network selects the actions, and the target Q-network computes the target Q-values used to train the local Q-network.
This two-network setup stabilizes the learning process. The soft_update method updates the target Q-network's parameters by blending them with those of the local Q-network, which prevents abrupt changes that might destabilize training. The act method picks an action based on the agent's current estimate of the optimal policy: it forward-propagates the state through the local Q-network to get the action values, then follows an epsilon-greedy policy to return the final action (the exploration mechanism, which improves learning).
The learn method uses sampled experiences to move the local Q-network's Q-values towards the target Q-values.
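
To make the target computation concrete, here is a small sketch on a hand-made batch of two transitions. Every number in it is made up for illustration, and the tensors `next_q_target` and `q_local` stand in for the outputs of the target and local networks.

```python
import torch

gamma = 0.99
# Toy batch of 2 transitions with 4 actions (all values are made up)
rewards = torch.tensor([[1.0], [-100.0]])
dones = torch.tensor([[0.0], [1.0]])   # the second transition ends the episode
actions = torch.tensor([[2], [0]])     # actions that were actually taken
next_q_target = torch.tensor([[0.5, 2.0, 1.0, -1.0],   # target network on next_states
                              [3.0, 0.0, 0.0, 0.0]])
q_local = torch.tensor([[0.1, 0.2, 0.3, 0.4],          # local network on states
                        [5.0, 6.0, 7.0, 8.0]])

# Best next-state value according to the target network, shape (2, 1)
next_q_max = next_q_target.max(1)[0].unsqueeze(1)
# TD target: the bootstrap term is zeroed when the episode ended
q_targets = rewards + gamma * next_q_max * (1 - dones)
# Q-value of the action actually taken, picked per row with gather
q_expected = q_local.gather(1, actions)

print(q_targets)   # rows: 1 + 0.99 * 2.0 = 2.98, and -100
print(q_expected)  # rows: 0.3 and 5.0
```

The mean squared error between `q_expected` and `q_targets` is the loss that the learn method minimizes.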

In [7]:
# Agent with state_size (8) and action_size (4). It defines the behaviour of an agent that interacts with our space environment using a deep Q-network
class Agent():
  def __init__(self, state_size, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    self.local_qnetwork = Network(state_size, action_size).to(self.device) # initialize and move to the device
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate) # only the local Q-network's weights are optimized
    # Memory of the AI initialized here
    self.memory = ReplayMemory(replay_buffer_size)
    # Time step counter used to decide when to learn and update the parameters
    self.t_step = 0

  # The step method stores an experience and decides when to learn from the memory. It takes the experience split into its components
  def step(self, state, action, reward, next_state, done):
    # Save experience in replay memory
    self.memory.push((state, action, reward, next_state, done))
    # Increment the time step counter and reset it to 0 every four steps
    self.t_step = (self.t_step + 1) % 4
    # Learn once every four steps, when the counter has been reset to 0
    if self.t_step == 0:
      # If enough samples are available in memory, get a random subset and learn
      # We learn on a minibatch of observations instead of a single one
      if len(self.memory.memory) > minibatch_size:
        # sample minibatch_size (100) experiences from the memory
        experiences = self.memory.sample(minibatch_size)
        self.learn(experiences, gamma)

  # Select an action for a given state (epsilon-greedy action policy)
  def act(self, state, epsilon = 0.):
    # Convert the state to a PyTorch tensor
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device) # unsqueeze(0) adds a batch dimension of size 1
    self.local_qnetwork.eval() # eval() comes from nn.Module and puts the Q-network in evaluation mode

    # Forward pass of the state through the local Q-network to get the action values
    with torch.no_grad(): # disable gradient computation
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train() # train() comes from nn.Module and puts the Q-network back in training mode
    # Epsilon-greedy selection: if a random number is larger than epsilon, pick the action with the highest Q-value
    if random.random() > epsilon:
      return np.argmax(action_values.cpu().data.numpy()) # move the values to the CPU and convert them to NumPy for np.argmax
    else:
      return random.choice(np.arange(self.action_size)) # otherwise pick a random action

  # Learning method: update the local Q-network from a minibatch of sampled experiences
  def learn(self, experiences, gamma):
    states, next_states, actions, rewards, dones = experiences
    # Get the max predicted Q-values for the next states from the target network
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    q_targets = rewards + (gamma * next_q_targets * (1 - dones)) # Q-targets for the current states; the future term is dropped when the episode is done
    # Get the expected Q-values from the local network
    q_expected = self.local_qnetwork(states).gather(1, actions)
    # Compute the loss
    loss = F.mse_loss(q_expected, q_targets) # mean squared error loss
    # Minimize the loss by backpropagating it to update the model parameters, leading to a better action selection policy
    self.optimizer.zero_grad() # reset the gradients to zero
    loss.backward()
    # Update the parameters with a single optimization step
    self.optimizer.step()

    # Softly update the target network parameters towards those of the local network
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()): # zip() pairs each target parameter with its local counterpart
      # Soft update: target = tau * local + (1 - tau) * target
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)
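
The selection rule used in `act` can be isolated in a few lines of plain Python. The sketch below is not the agent's code: the function `epsilon_greedy`, the made-up `q_values` and the seeded generator are assumptions chosen only to show the two extremes of the policy.

```python
import random

def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick the greedy action with probability 1 - epsilon, else a random one."""
    if rng.random() > epsilon:
        # index of the largest Q-value
        return max(range(len(q_values)), key=lambda a: q_values[a])
    return rng.randrange(len(q_values))

rng = random.Random(0)             # seeded only to make the sketch repeatable
q_values = [0.1, 1.5, -0.3, 0.7]   # made-up action values for 4 actions

greedy = epsilon_greedy(q_values, epsilon=0.0, rng=rng)
explore = [epsilon_greedy(q_values, epsilon=1.0, rng=rng) for _ in range(1000)]
print(greedy)                      # 1: the action with the largest Q-value
print(sorted(set(explore)))        # the distinct actions seen while exploring
```

With `epsilon=0.0`, which is the default of `act`, the choice is purely greedy; with `epsilon=1.0` every action is drawn at random.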

### Initializing the DQN agent

In [8]:
# Finally, create the AI agent
agent = Agent(state_size, number_actions)

### Training the DQN agent
The agent is created with an untrained network (an empty brain), so we have to train it.

In [9]:
number_episodes = 2000 # the maximum number of episodes over which we train our agent
maximum_number_timesteps_per_episode = 1000 # the maximum number of time steps per episode, i.e. per landing attempt
epsilon_starting_value = 1.0 # initial epsilon (fully random actions), decayed after every episode
epsilon_ending_value = 0.01 # lower bound for epsilon
epsilon_decay_rate = 0.995 # epsilon goes 1.0, 0.995, 0.995 * 0.995, ...
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen = 100) # stores the last 100 scores

for episode in range(1, number_episodes + 1):
  state, _ = env.reset() # reset the environment to an initial state at the beginning of every episode; the info dict is discarded with _
  score = 0 # initialize the score
  for timestep in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon) # select an action with the epsilon-greedy policy
    next_state, reward, terminated, truncated, _ = env.step(action) # send the action to the environment, which returns the new state
    done = terminated or truncated # the episode ends on a landing or crash (terminated) or at the time limit (truncated)
    agent.step(state, action, reward, next_state, terminated) # only a real termination removes the future term from the Q-target
    state = next_state
    score += reward
    if done:
      break
  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_rate * epsilon) # decay epsilon for the epsilon-greedy policy without going below the ending value
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)), end="") # \r is a carriage return, so the line is overwritten in place
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)))
  if np.mean(scores_on_100_episodes) >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, np.mean(scores_on_100_episodes)))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break

Episode 100	Average Score: -173.30
Episode 200	Average Score: -99.93
Episode 300	Average Score: -33.09
Episode 400	Average Score: 103.15
Episode 500	Average Score: 143.10
Episode 600	Average Score: 197.55
Episode 602	Average Score: 201.38
Environment solved in 502 episodes!	Average Score: 201.38
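
The exploration schedule behind this run can be worked out directly from the three epsilon settings. The sketch below replays only the decay rule from the training loop; the helper `epsilon_after` is a hypothetical name introduced for this example.

```python
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_rate = 0.995

def epsilon_after(episodes):
    """Epsilon left after `episodes` completed episodes."""
    epsilon = epsilon_starting_value
    for _ in range(episodes):
        epsilon = max(epsilon_ending_value, epsilon_decay_rate * epsilon)
    return epsilon

# First number of completed episodes at which epsilon sits on its floor
episodes_to_floor = next(n for n in range(1, 2001)
                         if epsilon_after(n) == epsilon_ending_value)

print(episodes_to_floor)              # 919
print(round(epsilon_after(602), 3))   # 0.049
```

So when the run above stopped at episode 602, roughly 5% of the actions were still random, and epsilon would only have reached its floor of 0.01 after 919 episodes.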


## Part 3 - Visualizing the results

In [10]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
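        action = agent.act(state) # only act is used: greedy actions (epsilon defaults to 0) and no learning
        state, reward, terminated, truncated, _ = env.step(action.item()) # get the next state; agent.step is not called because the agent is already trained
        done = terminated or truncated # also stop at the time limit so the loop cannot run forever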
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()



## CHATGPT:
Can you please build a Deep Q-Learning model for Lunar Landing environment? Please implement it in PyTorch and please also include the code that generates the video of the trained agent at the end.