# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

Gymnasium provides a standard API for single-agent reinforcement learning environments and includes implementations of common ones: CartPole, Pendulum, MountainCar, MuJoCo, Atari, and more.


In [7]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
# SWIG is a software development tool that connects programs written in C and C++ with a variety of high-level programming languages.
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom[accept-rom-license]~=0.4.2 (from gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting shimmy[atari]<1.0,>=0.1.0 (from gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl (25 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2->gymnasium[accept-rom-license,atari])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)

### Importing the libraries

In [8]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

First, we create a class that acts as the brain of our AI. Through trial and error, we found that two hidden layers work well for the lunar landing task.

In [9]:
class Network(nn.Module):
    def __init__(self, state_size,action_size,seed=42):
        super(Network,self).__init__()
        self.seed=torch.manual_seed(seed)
        # Build the architecture: state_size inputs -> 64 -> 64 -> action_size outputs
        self.fc1=nn.Linear(state_size,64) # 64 hidden units, chosen by trial and error for the Lunar Lander problem
        self.fc2=nn.Linear(64,64)
        self.fc3=nn.Linear(64,action_size)

    def forward(self,state):
        x = self.fc1(state) #taking state as input for first layer
        x = F.relu(x) # Relu activation function

        x = self.fc2(x)
        x = F.relu(x)
        return self.fc3(x)
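
To get a feel for how small this network is, the following sketch counts its trainable parameters by hand. It assumes `state_size = 8` and `action_size = 4`, the values LunarLander-v2 reports in Part 2; the helper `count_linear_params` is a hypothetical name used only for illustration.

```python
def count_linear_params(in_features, out_features):
    """Parameters of one fully connected layer: weight matrix plus bias vector."""
    return in_features * out_features + out_features

# Assumed sizes: 8 state features and 4 actions (LunarLander-v2), 64 hidden units.
layer_sizes = [(8, 64), (64, 64), (64, 4)]
per_layer = [count_linear_params(i, o) for i, o in layer_sizes]
total_params = sum(per_layer)
print(per_layer, total_params)
```

In PyTorch, the same total should be obtainable with `sum(p.numel() for p in Network(8, 4).parameters())`.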

## Part 2 - Training the AI

### Setting up the environment

Now that the model architecture is defined, we set up the environment in which the AI will be trained. We use the LunarLander environment from Gymnasium.

In [10]:
import gymnasium as gym
env = gym.make('LunarLander-v2')
state_space = env.observation_space.shape
state_size = env.observation_space.shape[0]
action_size= env.action_space.n
print('State_space:',state_space)
print('State_size:',state_size)
print('action_space:',action_size)

State_space: (8,)
State_size: 8
action_space: 4
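
The eight state values and four actions are not labelled in the printed output. As a reading aid, the sketch below names them following the Gymnasium documentation for LunarLander-v2. The names `OBSERVATION_FIELDS`, `ACTION_NAMES` and `describe` are illustrative and not part of the Gymnasium API, and the sample observation is invented.

```python
# Meaning of each index in the 8-dimensional observation vector (per the Gymnasium docs).
OBSERVATION_FIELDS = [
    "x position", "y position",
    "x velocity", "y velocity",
    "angle", "angular velocity",
    "left leg contact", "right leg contact",
]

# Meaning of each of the 4 discrete actions.
ACTION_NAMES = {
    0: "do nothing",
    1: "fire left orientation engine",
    2: "fire main engine",
    3: "fire right orientation engine",
}

def describe(observation):
    """Pair each observation value with its label for easier debugging."""
    return dict(zip(OBSERVATION_FIELDS, observation))

# Invented observation, used only to show the labelling.
sample = describe([0.0, 1.4, 0.1, -0.5, 0.0, 0.0, 0.0, 0.0])
print(sample["y position"], ACTION_NAMES[2])
```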


### Initializing the hyperparameters

In [26]:
learning_rate = 5e-4 # experimental
minibatch_size = 100
discount_factor = 0.99
replay_buffer_size=int(1e5)
interpolation_parameter=1e-3
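
To see what `interpolation_parameter = 1e-3` implies, consider the soft update rule used later in `soft_update`: target = tau * local + (1 - tau) * target. The sketch below applies that rule to a single scalar weight and counts how many updates the target needs to close half of the initial gap. It makes the simplifying assumption that the local weight stays fixed; the function name is hypothetical.

```python
import math

def updates_to_close_half_gap(tau, local=1.0, target=0.0):
    """Count soft updates until the target has covered half the initial gap."""
    initial_gap = abs(local - target)
    steps = 0
    while abs(local - target) > 0.5 * initial_gap:
        target = tau * local + (1.0 - tau) * target  # same rule as soft_update
        steps += 1
    return steps

tau = 1e-3  # interpolation_parameter from the cell above
simulated = updates_to_close_half_gap(tau)
closed_form = math.log(0.5) / math.log(1.0 - tau)  # gap shrinks by (1 - tau) per update
print(simulated, round(closed_form, 1))
```

Because the agent learns once every 4 environment steps, each of these updates corresponds to roughly 4 steps in the environment, so the target network trails the local network by design.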

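Consecutive transitions from the same episode are strongly correlated, which makes training on them directly unstable. A replay buffer addresses this: the agent stores each experience (state, action, reward, next state, done) and later learns from random minibatches drawn from that memory. This breaks the correlation and lets each experience be reused several times. The 8-dimensional state already contains the lander's position, velocity, angle and leg contacts, so the buffer is not there to compensate for missing observations.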

### Implementing Experience Replay

In [27]:
class ReplayMemory(object):
    def __init__(self,capacity):
        self.device=torch.device("cpu")
        self.capacity=capacity
        self.memory = [] #stores the experiences as (state, action, reward, next_state, done) tuples

    def push(self,event): #event=experience
        self.memory.append(event)
        if(len(self.memory)> self.capacity):
            del self.memory[0] #delete the oldest experience

    #We will now have to sample the experiences randomly to select the batch for training
    def sample(self,batch_size):
        experiences = random.sample(self.memory,k=batch_size)
        #now extract the different values from experiences by making a stack of all experiences
        states = torch.from_numpy(np.vstack([e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device) #actions can be 0,1,2,3 so long is used
        rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float().to(self.device)
        next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float().to(self.device)
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float().to(self.device) #for boolean we define it as uint8
        return states, next_states, actions,rewards,dones
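
`del self.memory[0]` shifts every remaining element, so once the buffer is full each eviction costs time proportional to its size. A possible alternative, sketched below under the hypothetical class name `DequeReplayMemory`, uses `collections.deque` with `maxlen`, which drops the oldest item automatically. Tensor conversion is left out to keep the sketch dependency-free.

```python
import random
from collections import deque

class DequeReplayMemory:
    """Fixed-size buffer; the oldest experience is evicted automatically."""

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, event):
        # event = (state, action, reward, next_state, done)
        self.memory.append(event)

    def sample(self, batch_size):
        # Uniform sampling without replacement, as in ReplayMemory.sample
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        return len(self.memory)

# Push 5 dummy experiences into a buffer that holds only 3.
buffer = DequeReplayMemory(capacity=3)
for step in range(5):
    buffer.push((step, 0, 0.0, step + 1, False))
kept_states = [e[0] for e in buffer.memory]
print(len(buffer), kept_states)
```

Only the three most recent experiences remain after the loop, which matches the eviction behaviour of the list-based class.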


### Implementing the DQN class

In [37]:
class Agent():
    def __init__(self,state_size,action_size):
        self.device=torch.device("cpu")
        self.state_size=state_size
        self.action_size=action_size
        self.local_qnetwork = Network(state_size,action_size).to(self.device)
        self.target_qnetwork = Network(state_size,action_size).to(self.device)
        self.optim = optim.Adam(self.local_qnetwork.parameters(),lr=learning_rate) #update the parameters
        self.memory=ReplayMemory(replay_buffer_size)
        self.t_step=0 #step counter, used to trigger a learning update every 4 steps

    #this method will store experiences and decide when to learn from them
    def step(self,states,actions,rewards,next_states,done):
        self.memory.push((states,actions,rewards,next_states,done))
        self.t_step = (self.t_step+1)%4 #we want to learn every 4 steps
        if self.t_step == 0:
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(minibatch_size)
                self.learn(experiences,discount_factor)

    #this method will select an action for the given state using e-greedy policy
    def act(self,state,epsilon=0.):
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device) #add a batch dimension at index 0, so the shape becomes (1, state_size)
        #forward the state through local_qnetwork to get the Q-value of every action
        #switch local_qnetwork to evaluation (inference) mode while predicting
        self.local_qnetwork.eval()
        with torch.no_grad(): #disable gradient tracking, since no backward pass is needed here
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train() #back to training mode

        #draw a random number: if it is greater than epsilon, pick the action with the highest Q-value, otherwise pick a random action
        choice = random.random()
        if choice>epsilon:
            action = np.argmax(action_values.cpu().data.numpy())
            return action
        else:
            action = random.randrange(self.action_size) #uniform over all valid actions
            return action

    #implement the learn method which will update q values from sampled experiences
    def learn(self,experiences,discount_factor):
        states,next_states,actions,rewards,dones = experiences
        #get the maximum predicted Q-value of each next state from the target network
        next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
        #detach() removes the result from the computation graph, so no gradient flows into the target network
        #max(1) takes the maximum over the action dimension and returns (values, indices); [0] keeps the values
        #unsqueeze(1) restores a column dimension, giving shape (batch_size, 1) to match rewards and dones
        q_targets = rewards + (discount_factor*next_q_targets*(1-dones))
        q_expected = self.local_qnetwork(states).gather(1,actions) #Q-value of the action actually taken in each state
        loss = F.mse_loss(q_expected,q_targets)
        self.optim.zero_grad() #resetting the gradients
        loss.backward()
        #single optimisation step to update the parameters of the model
        self.optim.step()
        #update target network param with that of local param
        self.soft_update(self.local_qnetwork,self.target_qnetwork,interpolation_parameter)

    def soft_update(self,local_model,target_model,interpolation_parameter):
        for local_params, target_params in zip(local_model.parameters(), target_model.parameters()):
            #here we will use the copy func to update it
            target_params.data.copy_(interpolation_parameter*local_params.data + (1-interpolation_parameter)*target_params.data)
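
The target computed in `learn` is reward + discount_factor * max Q_target(next_state) * (1 - done). As a plain-Python sketch, the following evaluates it for three transitions, one of which is terminal. The rewards and Q-values are invented for illustration and do not come from a real run.

```python
discount_factor = 0.99

# Invented mini-batch: reward, target-network Q-values for the next state, done flag.
batch = [
    (1.0,    [0.5, 2.0, -1.0, 0.0], False),
    (-0.3,   [4.0, 3.0, 1.0, 2.0],  False),
    (-100.0, [9.0, 9.0, 9.0, 9.0],  True),   # terminal: no future value is added
]

q_targets = []
for reward, next_q_values, done in batch:
    max_next_q = max(next_q_values)  # corresponds to max(1)[0] in the tensor version
    target = reward + discount_factor * max_next_q * (1 - int(done))
    q_targets.append(target)

rounded_targets = [round(t, 2) for t in q_targets]
print(rounded_targets)
```

For the terminal transition the `(1 - done)` factor zeroes the bootstrap term, so the target equals the reward alone.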


### Initializing the DQN agent

In [38]:
agent = Agent(state_size,action_size)

### Training the DQN agent

In [39]:
#First we initialize certain parameters needed for training our AI model
number_episodes = 2000
max_timestep_per_episode = 1000
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995
epsilon = epsilon_start
scores_on_100_values = deque(maxlen = 100)

for episodes in range(1,number_episodes+1):
    state, _ = env.reset() #reset() returns the initial observation and an info dict
    score = 0
    for t in range(max_timestep_per_episode):
        action = agent.act(state,epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.step(state,action,reward, next_state,terminated) #store the transition and learn every 4 steps
        state = next_state
        score+=reward
        if terminated or truncated:
            break

    scores_on_100_values.append(score)
    epsilon = max(epsilon_end,epsilon_decay*epsilon)
    print("\rEpisode:{}\tAverage Score:{:.2f}".format(episodes,np.mean(scores_on_100_values)),end="")   #\r = for dynamic printing
    if episodes % 100 ==0:
        print("\rEpisode:{}\tAverage Score:{:.2f}".format(episodes,np.mean(scores_on_100_values)))
    if np.mean(scores_on_100_values)>=200.0:
        print("\nEnvironment solved in {} episodes!\tAverage Score:{:.2f}".format(episodes-100,np.mean(scores_on_100_values)))
        torch.save(agent.local_qnetwork.state_dict(),"checkpoint.pth")
        break

Episode:100	Average Score:-167.16
Episode:200	Average Score:-127.29
Episode:300	Average Score:-41.14
Episode:400	Average Score:6.12
Episode:500	Average Score:102.94
Episode:600	Average Score:141.63
Episode:700	Average Score:149.69
Episode:800	Average Score:197.38
Episode:806	Average Score:201.95
Environment solved in 706 episodes!	Average Score:201.95
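
The exploration schedule multiplies epsilon by `epsilon_decay` after every episode and clamps it at `epsilon_end`. The sketch below replays that schedule on its own to find the first episode at which the floor is reached, and to estimate epsilon at the episode where the run above stopped. `episodes_until_floor` is a hypothetical helper name.

```python
def episodes_until_floor(start=1.0, end=0.01, decay=0.995):
    """Number of episodes before epsilon is clamped at its minimum."""
    epsilon, episodes = start, 0
    while epsilon > end:
        epsilon = max(end, decay * epsilon)  # same update as the training loop
        episodes += 1
    return episodes

floor_episode = episodes_until_floor()
epsilon_at_806 = max(0.01, 0.995 ** 806)  # value after the update at episode 806
print(floor_episode, round(epsilon_at_806, 4))
```

With these defaults, epsilon had not yet reached its floor when the run above stopped, so the agent was still taking slightly more random actions than the minimum.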


## Part 3 - Visualizing the results

In [48]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, terminated, truncated, _ = env.step(int(action))
        done = terminated or truncated #stop on landing/crash or when the time limit is hit
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'rb').read() #read-only binary mode is enough here
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

