# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [28]:
# Base Gymnasium package, imported later in the notebook as `gym`.
!pip install gymnasium
# Atari extras. NOTE(review): nothing visible in this notebook uses an Atari environment — confirm whether this line is still needed.
!pip install "gymnasium[atari, accept-rom-license]"
# swig is installed before the box2d extra below — presumably a build prerequisite for it; TODO confirm.
!apt-get install -y swig
# Box2D extra — presumably what provides the LunarLander environment created later; TODO confirm.
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


### Importing the libraries

In [29]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [30]:
# Network is the "brain" of the AI: it maps a state vector to one Q-value per action
class Network(nn.Module):
  """Fully connected Q-network mapping a state vector to one value per action.

  Layout: state_size -> hidden_size -> hidden_size -> action_size, with a ReLU
  after each of the two hidden layers and no activation on the output layer.
  """

  def __init__(self, state_size, action_size, seed = 42, hidden_size = 64):
    """Create the three linear layers.

    Args:
      state_size: number of input features (length of one state vector).
      action_size: number of outputs, one per action.
      seed: passed to torch.manual_seed right before the layers are created,
        so two networks built with the same seed and sizes start from
        identical weights.
      hidden_size: width of both hidden layers. Defaults to 64, the value that
        used to be hard-coded.
    """
    super(Network, self).__init__()
    # torch.manual_seed seeds the global generator and returns it; the returned
    # generator is kept on the instance.
    self.seed = torch.manual_seed(seed)
    self.fc1 = nn.Linear(state_size, hidden_size)
    self.fc2 = nn.Linear(hidden_size, hidden_size)
    self.fc3 = nn.Linear(hidden_size, action_size)

  def forward(self, state):
    """Propagate `state` through both hidden layers and return the raw outputs of fc3."""
    x = F.relu(self.fc1(state))
    x = F.relu(self.fc2(x))
    return self.fc3(x)


## Part 2 - Training the AI

### Setting up the environment

In [31]:
import gymnasium as gym

# Create the environment (the Lunar Lander environment was upgraded to v3) and
# read from its spaces the two sizes the Q-network needs.
env = gym.make('LunarLander-v3')
state_shape = env.observation_space.shape  # one-dimensional shape tuple, e.g. (8,)
state_size = state_shape[0]                # length of a single state vector
number_actions = env.action_space.n

# Same three report lines as before, driven by a table of (label, value) pairs.
for label, value in (('State shape: ', state_shape),
                     ('State size: ', state_size),
                     ('Number of actions: ', number_actions)):
  print(label, value)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [32]:
# Optimizer step size; this value was settled on after experimentation.
learning_rate = 0.0005

# Number of stored experiences used in one training step that updates the model parameters.
minibatch_size = 100

# Discount factor: the present value of future rewards. Close to 0 the agent is
# short-sighted and only considers current rewards; close to 1 future rewards
# weigh heavily in the accumulated total reward.
discount_factor = 0.99

# Capacity of the agent's memory: how many experiences (state, action, reward,
# next state, done flag) are kept. Training samples from this memory to break
# correlations between consecutive observations, which stabilizes learning.
replay_buffer_size = 100_000

# Interpolation parameter used in the soft update of the target network.
interpolation_parameter = 0.001

### Implementing Experience Replay

In [33]:
class ReplayMemory(object):
  """Bounded store of experience tuples with uniform random sampling.

  Every stored event is indexed as (state, action, reward, next_state, done).
  When the store is full, pushing a new event discards the oldest one.
  """

  def __init__(self, capacity):
    """Create an empty memory that keeps at most `capacity` events."""
    # Tensors returned by sample() are moved to the GPU when one is available.
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity = capacity
    # A bounded deque drops its oldest entry in O(1) once full. The previous
    # list + `del self.memory[0]` shifted every remaining element on each push.
    # deque requires an integer maxlen, hence the int().
    self.memory = deque(maxlen = int(capacity))

  def __len__(self):
    """Return the number of events currently stored."""
    return len(self.memory)

  def push(self, event):
    """Store `event`; the deque evicts the oldest event when capacity is exceeded."""
    self.memory.append(event)

  def sample(self, batch_size):
    """Draw `batch_size` distinct stored events and return them as stacked tensors.

    Returns:
      A tuple in the order (states, next_states, actions, rewards, dones).
      Note that next_states comes second. actions is an int64 tensor; the
      other four are float tensors; all live on self.device.

    Raises:
      ValueError: from random.sample when batch_size exceeds the number of
        stored events.
    """
    # None entries are skipped, as they were before (once here instead of once per field).
    experiences = [e for e in random.sample(self.memory, k = batch_size) if e is not None]

    def column(index):
      # Stack field `index` of every sampled event into one row per event.
      return np.vstack([e[index] for e in experiences])

    states = torch.from_numpy(column(0)).float().to(self.device)
    actions = torch.from_numpy(column(1)).long().to(self.device)
    rewards = torch.from_numpy(column(2)).float().to(self.device)
    next_states = torch.from_numpy(column(3)).float().to(self.device)
    # Booleans go through uint8 before the conversion to float (0.0 / 1.0).
    dones = torch.from_numpy(column(4).astype(np.uint8)).float().to(self.device)
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [34]:
class Agent():
  """DQN agent: a local Q-network that is trained and a slowly tracking target network.

  Relies on the notebook-level hyperparameters learning_rate, replay_buffer_size,
  minibatch_size, discount_factor and interpolation_parameter.
  """

  # A learning update is attempted once every `update_every` calls to step().
  update_every = 4

  def __init__(self, state_size, action_size):
    """Build both networks, the Adam optimizer for the local network and the replay memory."""
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    self.t_step = 0  # counts calls to step(), modulo update_every

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, every `update_every` calls, learn from a sampled minibatch."""
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % self.update_every
    # Learn only once the memory holds more than one minibatch of experiences.
    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      # The batch size now follows the hyperparameter instead of a literal 100,
      # so the gate above and the sample size cannot drift apart.
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, epsilon = 0.):
    """Choose an action epsilon-greedily.

    Args:
      state: NumPy array holding one state (converted with torch.from_numpy).
      epsilon: probability of picking a uniformly random action.

    Returns:
      A NumPy integer: a random action index with probability epsilon,
      otherwise the argmax of the local network's outputs for `state`.
    """
    # Exploration needs no network output, so the forward pass is skipped for it.
    if random.random() <= epsilon:
      return random.choice(np.arange(self.action_size))
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    self.local_qnetwork.eval()
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()
    return np.argmax(action_values.cpu().data.numpy())

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target network.

    Args:
      experiences: tuple (states, next_states, actions, rewards, dones) in the
        order returned by ReplayMemory.sample.
      discount_factor: weight applied to the next state's best target value.
    """
    states, next_states, actions, rewards, dones = experiences
    # Best next-state value according to the target network; detached so no
    # gradient flows into the target network.
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    # (1 - dones) removes the bootstrap term for transitions flagged as done.
    q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
    # Local network's value for the action that was actually taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend weights in place: target <- tau * local + (1 - tau) * target, with tau = interpolation_parameter."""
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)

### Initializing the DQN agent

In [35]:
# Build the agent with the state size and number of actions read from the environment above.
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [36]:
# Training schedule and epsilon-greedy exploration settings.
number_episodes = 2000
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value  = 1.0
epsilon_ending_value  = 0.01
epsilon_decay_value  = 0.995
epsilon = epsilon_starting_value
# Rolling window holding the scores of the most recent 100 episodes.
scores_on_100_episodes = deque(maxlen = 100)

for episode in range(1, number_episodes + 1):
  state, _ = env.reset()
  score = 0
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state=state, epsilon=epsilon)
    # Gymnasium's step() returns (observation, reward, terminated, truncated, info).
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only a real termination is stored as the done flag, so the agent keeps
    # bootstrapping from next_state when the episode was merely cut short.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    # The episode is over in either case; previously `truncated` was discarded.
    done = terminated or truncated
    if done:
      break
  scores_on_100_episodes.append(score)
  # Multiplicative epsilon decay, floored at epsilon_ending_value.
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
  average_score = np.mean(scores_on_100_episodes)
  # '\r' rewrites the same output line every episode; every 100th episode the line is kept.
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end = "")
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))
  # Stop and save the local network once the rolling average reaches 200.
  if average_score >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break

Episode 100	Average Score: -179.67
Episode 200	Average Score: -83.06
Episode 300	Average Score: 26.65
Episode 400	Average Score: 69.34
Episode 500	Average Score: 92.63
Episode 600	Average Score: 171.48
Episode 668	Average Score: 200.48
Environment solved in 568 episodes!	Average Score: 200.48


## Part 3 - Visualizing the results

In [37]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    """Run one episode with `agent` acting greedily and save the frames to 'video.mp4'.

    Args:
        agent: object whose act(state) returns a value exposing .item().
        env_name: environment id passed to gym.make.

    The episode ends on termination or truncation. Previously only termination
    was checked, so an agent that never terminated would keep stepping past the
    time limit and accumulate frames without bound.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)  # default epsilon: no exploration
            state, _, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated
    finally:
        # Release the environment even if rendering or stepping raised.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the agent in LunarLander-v3 to video.mp4.
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Embed the first '*.mp4' file found in the working directory in the cell output.

    The file's bytes are base64-encoded into an HTML <video> element and shown
    with display(). Prints "Could not find video" when no file matches.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: the handle is closed
        # as soon as the bytes are read. It used to be opened 'r+b' (which needs
        # write permission) and was never closed.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display the recorded video inline in the notebook output.
show_video()

