# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
# Install Gymnasium and the extras used by this notebook (shell escapes, run once per runtime).
!pip install gymnasium
# NOTE(review): no cell visible in this notebook uses an Atari environment — confirm whether these extras are still needed.
!pip install "gymnasium[atari, accept-rom-license]"
# swig is installed before the Box2D extra; presumably it is required to build the Box2D bindings — TODO confirm.
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom[accept-rom-license]~=0.4.2 (from gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting shimmy[atari]<1.0,>=0.1.0 (from gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl (25 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2->gymnasium[accept-rom-license,atari])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m11.7

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Fully connected Q-network: a state vector goes in, one value per action comes out.

  Two hidden layers of 64 units with ReLU activations feed a linear output
  layer whose width is ``action_size``.
  """

  def __init__(self, state_size , action_size, seed = 42):
    """Create the three linear layers.

    Args:
      state_size: Number of input features (width of one state vector).
      action_size: Number of outputs (one per action).
      seed: Passed to ``torch.manual_seed`` before the layers are created,
        so two networks built with the same seed start with equal weights.
    """
    super().__init__()
    # torch.manual_seed reseeds the global generator; its return value is kept on the instance.
    self.seed = torch.manual_seed(seed)
    hidden_units = 64
    # Layers are created in fc1 -> fc2 -> fc3 order so seeded initialisation is reproducible.
    self.fc1 = nn.Linear(state_size, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc3 = nn.Linear(hidden_units, action_size)

  def forward(self, state):
    """Return the raw (un-activated) output of the last layer for ``state``."""
    hidden = F.relu(self.fc1(state))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)

## Part 2 - Training the AI

### Setting up the environment

In [None]:
import gymnasium as gym

# Create the Lunar Lander environment.
# Reference: https://gymnasium.farama.org/environments/box2d/lunar_lander/
env = gym.make('LunarLander-v2')

# The observation space shape gives the network's input width;
# the discrete action space gives its output width.
state_shape = env.observation_space.shape
state_size = state_shape[0]
number_actions = env.action_space.n

print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [None]:
learning_rate = 0.0005           # step size handed to the Adam optimizer (5e-4)
minibatch_size = 100             # batch size for a parameter update
discount_factor = 0.99           # gamma: closer to 1 weighs future rewards more, smaller values favour short-term reward
replay_buffer_size = 100_000     # maximum number of experiences kept in replay memory
interpolation_parameter = 0.001  # tau: fraction of the local weights blended into the target network per soft update
#all parameters are taken through experimentation for optimal result.

### Implementing Experience Replay

In [None]:
class ReplayMemory(object):
  """Bounded first-in-first-out store of experience tuples with random sampling.

  Each event is indexed positionally as
  ``(state, action, reward, next_state, done)``.
  """

  def __init__(self, capacity):
    """Create an empty buffer that keeps at most ``capacity`` events."""
    self.memory = []
    self.capacity = capacity
    # Sampled batches are moved to the GPU when one is available, otherwise they stay on the CPU.
    use_gpu = torch.cuda.is_available()
    self.device = torch.device("cuda:0" if use_gpu else "cpu")

  def push(self, event):
    """Append ``event``; once the buffer exceeds capacity, drop the oldest entry."""
    self.memory.append(event)
    if len(self.memory) <= self.capacity:
      return
    del self.memory[0]

  def sample(self, batch_size):
    """Draw ``batch_size`` stored events at random and return them as tensors.

    Returns:
      ``(states, next_states, actions, rewards, dones)`` — note that
      ``next_states`` comes second. Each field is stacked row-wise with
      ``np.vstack`` and moved to ``self.device``; ``actions`` is converted with
      ``.long()``, every other field with ``.float()``.
    """
    batch = [event for event in random.sample(self.memory, k = batch_size) if event is not None]

    def stacked(field_index):
      # One row per sampled event for the requested tuple position.
      return np.vstack([event[field_index] for event in batch])

    def as_float_tensor(array):
      return torch.from_numpy(array).float().to(self.device)

    states = as_float_tensor(stacked(0))
    actions = torch.from_numpy(stacked(1)).long().to(self.device)
    rewards = as_float_tensor(stacked(2))
    next_states = as_float_tensor(stacked(3))
    # Done flags go through uint8 before becoming floats.
    dones = as_float_tensor(stacked(4).astype(np.uint8))
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [None]:
class Agent():
  """DQN agent: epsilon-greedy acting, experience replay and a soft-updated target network.

  Relies on names defined in other cells of this notebook: ``Network``,
  ``ReplayMemory``, ``learning_rate``, ``replay_buffer_size``,
  ``minibatch_size``, ``discount_factor`` and ``interpolation_parameter``.
  """

  def __init__(self, state_size, action_size):
    """Build the local and target Q-networks, the optimizer and the replay memory.

    Args:
      state_size: Width of one state vector (network input size).
      action_size: Number of discrete actions (network output size).
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    # Local network is trained; target network only receives soft updates from it.
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)

    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    self.t_step = 0  # counts calls to step() modulo 4

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, on every 4th call, learn from a sampled minibatch.

    Learning is skipped until the replay memory holds more than
    ``minibatch_size`` events.
    """
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % 4
    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      # Sample exactly minibatch_size events so the hyperparameter controls the
      # batch size as well as the warm-up threshold (was a hard-coded 100).
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, epsilon = 0.):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    Args:
      state: NumPy array holding a single state (converted with ``torch.from_numpy``).
      epsilon: Exploration threshold; the greedy action is taken when a fresh
        ``random.random()`` draw is greater than it.

    Returns:
      The index of the largest local-network output, or a random index in
      ``[0, action_size)`` when exploring.
    """
    # unsqueeze(0) adds the leading batch dimension the network call is given.
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    self.local_qnetwork.eval()
    with torch.no_grad():  # inference only: no gradient tracking needed
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()  # always switch back to training mode afterwards
    if random.random() > epsilon:
      return np.argmax(action_values.cpu().numpy())
    else:
      return random.choice(np.arange(self.action_size))

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target network.

    Args:
      experiences: Tuple ``(states, next_states, actions, rewards, dones)`` in
        the order returned by ``ReplayMemory.sample``.
        # assumes actions is an integer column tensor usable as a gather index — TODO confirm
      discount_factor: Multiplier applied to the target network's best next-state value.
    """
    states, next_states, actions, rewards, dones = experiences
    # Target values must not contribute gradients, so compute them without autograd.
    with torch.no_grad():
      next_q_targets = self.target_qnetwork(next_states).max(1)[0].unsqueeze(1)
    # (1 - dones) zeroes the bootstrap term for transitions flagged as done.
    q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
    # Select, per row, the local network's output for the action actually taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend local weights into the target model in place.

    Each target parameter becomes
    ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
    """
    with torch.no_grad():
      for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
        target_param.copy_(interpolation_parameter * local_param + (1.0 - interpolation_parameter) * target_param)

### Initializing the DQN agent

In [None]:
agent = Agent(state_size, number_actions) # create the DQN agent, sized by the environment's state size and number of actions

### Training the DQN agent

In [None]:
number_episodes = 2000 # upper bound on training episodes
maximum_number_timesteps_per_episode = 1000 # cap on steps per episode so one episode cannot run forever
epsilon_starting_value  = 1.0 # initial exploration rate
epsilon_ending_value  = 0.01 # floor for the exploration rate
epsilon_decay_value  = 0.995 # multiplicative decay applied after every episode (1, 0.995, 0.995**2, ...)
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen = 100) # rolling window holding the most recent 100 episode scores

for episode in range(1, number_episodes + 1):
  state, _ = env.reset() # reset returns (observation, info); info is not used
  score = 0 # cumulative reward over this episode
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon) # epsilon-greedy action
    # env.step returns (observation, reward, terminated, truncated, info).
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only `terminated` is stored as the done flag, as before; a truncated
    # episode is cut short by the environment, so it is not marked as done for learning.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    if terminated or truncated: # stop on either signal so a finished episode is never stepped again
      break
  scores_on_100_episodes.append(score)
  # Decay epsilon but never let it fall below the configured floor.
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
  average_score = np.mean(scores_on_100_episodes) # computed once per episode and reused below
  # '\r' returns to the start of the line so the progress text is overwritten in place; {:.2f} prints two decimals.
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end = "")
  if episode % 100 == 0: # every 100 episodes, reprint with a newline so the line is kept in the log
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))
  # NOTE(review): early in training this average covers fewer than 100 episodes — confirm that is intended.
  if average_score >= 200.0: # solved criterion
    # {:d} formats an integer; the message reports episode - 100.
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth') # persist the local network's weights
    break

  and should_run_async(code)


Episode 100	Average Score: -171.44
Episode 200	Average Score: -107.84
Episode 300	Average Score: -23.57
Episode 400	Average Score: 70.52
Episode 500	Average Score: 167.70
Episode 600	Average Score: 196.73
Episode 618	Average Score: 201.83
Environment solved in 518 episodes!	Average Score: 201.83


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, output_path='video.mp4', fps=30):
    """Run one episode with ``agent`` and save the rendered frames as a video.

    Args:
        agent: Object whose ``act(state)`` returns an action exposing ``.item()``.
        env_name: Environment id passed to ``gym.make``.
        output_path: File the frames are written to (default ``'video.mp4'``).
        fps: Frame rate passed to ``imageio.mimsave`` (default 30).

    The episode ends when the environment reports either ``terminated`` or
    ``truncated``; checking only ``terminated`` would loop forever on an
    episode that ends by truncation.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        episode_over = False
        while not episode_over:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action.item())
            episode_over = terminated or truncated
    finally:
        # Release the environment even if rendering or stepping raises.
        env.close()
    imageio.mimsave(output_path, frames, fps=fps)

# Play one episode with the agent (act() is called with its default epsilon) and write the frames to video.mp4.
show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    """Embed the first ``*.mp4`` file matched in the current directory in the notebook output.

    The file is base64-encoded into an HTML ``<video>`` tag. When no file
    matches, ``"Could not find video"`` is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return
    # Read-only binary mode inside a context manager: the handle is always
    # closed, and a read-only file can still be displayed.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display the first .mp4 file found in the current directory inline in the notebook.
show_video()

