# Deep Q-Learning Implementation for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [2]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 21 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 1s (1,689 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 124926 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Unpacking swig4.0 (4.0.2-1ubuntu1) ...
Selecting previously unselected package swig.
Preparing to unpack .../swig_4.0.2-1ubu

### Importing the libraries

In [3]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [4]:
class Network(nn.Module):
  """Three-layer fully connected network mapping a state vector to one value per action.

  Layout: state_size -> 64 -> 64 -> action_size, with ReLU after the two hidden layers
  and no activation on the output layer.
  """

  def __init__(self, state_size, action_size, seed = 42):
    """Create the layers.

    Args:
      state_size: number of input features.
      action_size: number of outputs.
      seed: value passed to ``torch.manual_seed`` before the layers are created, so the
        initial weights are the same for every instance built with the same seed.
    """
    super().__init__()
    # torch.manual_seed reseeds the global torch RNG and returns the generator, which is kept on the instance.
    self.seed = torch.manual_seed(seed)
    self.fc1 = nn.Linear(in_features=state_size, out_features=64)
    self.fc2 = nn.Linear(in_features=64, out_features=64)
    self.fc3 = nn.Linear(in_features=64, out_features=action_size)

  def forward(self, state):
    """Run the forward pass and return the raw output of the last linear layer."""
    hidden = F.relu(self.fc1(state))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)

## Part 2 - Training the AI

### Setting up the environment

In [5]:
import gymnasium as gym

# Environment reference: https://gymnasium.farama.org/environments/box2d/lunar_lander/
# 'LunarLander-v3' is the upgraded revision of the Lunar Lander environment.
env = gym.make('LunarLander-v3')

# Observation vector (8 values in this case, per the Gymnasium docs):
#   x, y, xdot, ydot, theta, thetadot, left_leg_on_ground_yes/no, right_leg_on_ground_yes/no
state_shape = env.observation_space.shape
state_size = state_shape[0]

# Discrete actions (4 in this case, per the Gymnasium docs):
#   0: do nothing
#   1: fire left orientation engine
#   2: fire main engine
#   3: fire right orientation engine
number_actions = env.action_space.n

print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [6]:
# Step size handed to the Adam optimizer of the local Q-network.
learning_rate_alpha = 5e-4
# Minimum number of stored transitions required before the agent starts learning.
minibatch_size = 100
# Discount applied to the bootstrapped next-state value in the Q-target.
discount_factor_gamma = 0.99
# Maximum number of transitions kept in the replay memory.
experience_replay_buffer_size = int(1e5)
# Weight of the local network in each soft update of the target network.
interpolation_parameter_tau = 1e-3

### Implementing Experience Replay

In [7]:
class ReplayMemory(object):
  """Bounded first-in-first-out store of transitions with uniform random sampling.

  Every event is indexed as ``(state, action, reward, next_state, done)``:
  position 0 is the state, 1 the action, 2 the reward, 3 the next state and 4 the done flag.
  """

  def __init__(self, capacity_of_memory):
    """Create an empty memory that holds at most ``capacity_of_memory`` events."""
    # Sampled batches are moved to the GPU when CUDA is available, otherwise they stay on the CPU.
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity_of_memory = capacity_of_memory
    self.memory = []

  def push(self, event):
    """Append ``event`` and drop the oldest stored event once the capacity is exceeded."""
    self.memory.append(event)
    over_capacity = len(self.memory) > self.capacity_of_memory
    if over_capacity:
      del self.memory[0]

  def sample(self, batch_size):
    """Draw ``batch_size`` distinct events at random and return them as batched tensors.

    Returns:
      A tuple ``(states, next_states, actions, rewards, dones)``. Note that this order
      differs from the order of the fields inside a stored event. Each tensor has one row
      per sampled event; actions are ``long``, every other tensor is ``float``.
    """
    batch = [event for event in random.sample(self.memory, k = batch_size) if event is not None]

    def column(index):
      # Stack field `index` of every sampled event into one row per event.
      return np.vstack([event[index] for event in batch])

    def as_float_tensor(array):
      return torch.from_numpy(array).float().to(self.device)

    states = as_float_tensor(column(0))
    actions = torch.from_numpy(column(1)).long().to(self.device)
    rewards = as_float_tensor(column(2))
    next_states = as_float_tensor(column(3))
    # Cast the flags to uint8 (0/1) first so they can be turned into a float mask.
    dones = as_float_tensor(column(4).astype(np.uint8))

    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [8]:
class Agent():
  """Deep Q-learning agent with experience replay and a softly updated target network.

  The agent owns two ``Network`` instances: ``local_qnetwork`` is trained by gradient
  descent, ``target_qnetwork`` supplies the bootstrapped targets and is moved a small
  step toward the local network after every learning update.
  """

  # Number of calls to step() between two learning updates.
  update_every = 4

  def __init__(self, state_size, action_size):
    """Build the networks, the optimizer and the replay memory.

    Args:
      state_size: number of components of the observation vector.
      action_size: number of discrete actions the agent can choose from.
    """
    # Use the GPU when CUDA is available, otherwise fall back to the CPU.
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size

    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    # Both networks are built with the same default seed, so they already hold identical
    # weights; copying explicitly keeps that guarantee if the seeding ever changes.
    self.target_qnetwork.load_state_dict(self.local_qnetwork.state_dict())
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate_alpha)

    self.memory = ReplayMemory(experience_replay_buffer_size)
    # Step counter modulo `update_every`; learning is attempted whenever it wraps to 0.
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, every ``update_every`` calls, learn from a sampled minibatch."""
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % self.update_every

    # Learn only once the memory holds more transitions than one minibatch.
    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, discount_factor_gamma)

  def act(self, state, epsilon = 0.):
    """Choose an action for ``state`` with an epsilon-greedy policy.

    Args:
      state: numpy array holding a single observation.
      epsilon: probability threshold for picking a uniformly random action.

    Returns:
      The index of the chosen action as a numpy integer.
    """
    # Exploration branch: the network output is not needed, so the forward pass is skipped.
    if random.random() <= epsilon:
      return random.choice(np.arange(self.action_size))

    # unsqueeze(0) adds a leading batch dimension, e.g. (8,) -> (1, 8).
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    self.local_qnetwork.eval()
    with torch.no_grad():  # no gradients are needed for action selection
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()
    return np.argmax(action_values.cpu().numpy())

  def learn(self, experiences, discount_factor_gamma):
    """Run one gradient step on the local network, then soft-update the target network.

    Args:
      experiences: tuple ``(states, next_states, actions, rewards, dones)`` of batched
        tensors, in the order returned by ``ReplayMemory.sample``.
      discount_factor_gamma: discount applied to the next-state value.
    """
    states, next_states, actions, rewards, dones = experiences

    # The targets are treated as constants, so they are computed without tracking gradients.
    with torch.no_grad():
      next_q_targets = self.target_qnetwork(next_states).max(1)[0].unsqueeze(1)
    # Bellman target; (1 - dones) removes the bootstrap term for transitions flagged as done.
    q_targets = rewards + (discount_factor_gamma * next_q_targets * (1 - dones))
    # Q-value predicted by the local network for the action that was actually taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)

    loss = F.mse_loss(q_expected, q_targets)

    # Clear gradients left over from the previous update so they do not accumulate.
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter_tau)

  def soft_update(self, local_model, target_model, interpolation_parameter_tau):
    """Blend the local weights into the target: target = tau * local + (1 - tau) * target."""
    with torch.no_grad():
      for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
        target_param.copy_(interpolation_parameter_tau * local_param + (1.0 - interpolation_parameter_tau) * target_param)


'''
    ## Local Q-network

    Think of the local Q-network as an eager student who's always learning and updating their knowledge.

    It's constantly changing and improving

    It makes quick guesses about what actions are best

    It learns from every experience immediately

    ### Example: Imagine you're learning to play chess.
    After each move, you immediately think about whether it was good or bad and try to adjust your strategy for the next game.
    This is like the local Q-network, always updating and changing its understanding.

    ## Target Q-network

    The target Q-network is like a tutor who updates their knowledge less often, providing a stable reference point.

    It changes more slowly

    It gives a steady, consistent opinion about what actions are best

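    It is only nudged a small step toward the local network at each learning update (a "soft update"), instead of being fully overwritten after every experience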

    ### Example: Now imagine you have a chess coach.
    The coach doesn't change their advice after every move you make.
    Instead, they watch you play for a while, then give you updated strategies every few games.
    This steady approach helps you learn more consistently.

    ## Why use both?

    Using both networks is like having a balance between quick learning and stable guidance:

    The local network (eager student) learns quickly but might make erratic changes.

    The target network (steady tutor) provides a stable goal to aim for.

    Together, they help the AI learn more effectively and avoid wild swings in performance.
    It's like learning from your immediate experiences while also having a steady guide to keep you on track.
'''

### Initializing the DQN agent

In [9]:
# Create the agent for the observation size and action count read from the environment.
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [10]:
number_episodes = 2000  # upper bound; training stops early once the solved criterion is met
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
# Sliding window that always holds the scores of the 100 most recent episodes.
scores_on_100_episodes = deque(maxlen = 100)

# NOTE: an episode is one complete run of the agent interacting with the environment,
# whereas an epoch is one complete pass of a training phase.

for episode in range(1, number_episodes+1):
  state, _ = env.reset()
  score = 0  # episodes are independent, so the score restarts at zero
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon)
    # Gymnasium's step() returns (observation, reward, terminated, truncated, info).
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

    # Only `terminated` is stored: a time-limit truncation is not a terminal state,
    # so the bootstrapped next-state value must not be masked out for it.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    if done:
      break

  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value*epsilon)
  average_score = np.mean(scores_on_100_episodes)

  # "\r" rewrites the same console line; every 100th episode the line is kept by ending it with a newline.
  print("\rEpisode: {}\t Average Score: {:.2f}".format(episode, average_score), end="")
  if episode%100 == 0:
    print("\rEpisode: {}\t Average Score: {:.2f}".format(episode, average_score))
  # Require a full 100-episode window so a short lucky streak cannot trigger the solved criterion.
  window_is_full = len(scores_on_100_episodes) == scores_on_100_episodes.maxlen
  if window_is_full and average_score >= 200.0:
    print("\nEnvironment solved in {:d} episodes!\t Average Score: {:.2f}".format(episode-100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break

Episode: 100	 Average Score: -160.00
Episode: 200	 Average Score: -111.25
Episode: 300	 Average Score: -46.93
Episode: 400	 Average Score: -5.01
Episode: 500	 Average Score: 114.49
Episode: 592	 Average Score: 200.11\Envoronment Solved in 492 episodes!	 Average Score: 200.11


## Part 3 - Visualizing the results

In [11]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    """Play one greedy episode with ``agent`` and save the rendered frames to 'video.mp4'.

    Args:
        agent: object whose ``act(state)`` returns the action to take for an observation.
        env_name: Gymnasium environment id passed to ``gym.make``.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action.item())
            # Stop on truncation too; otherwise the loop keeps stepping an environment
            # that has hit its time limit and may never end.
            done = terminated or truncated
    finally:
        # Release the environment even if acting or rendering raises.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the agent in LunarLander-v3 and write it to 'video.mp4'.
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Embed the first '*.mp4' file found in the working directory as an inline HTML video.

    Prints "Could not find video" when no mp4 file is present.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: works on read-only files
        # and closes the handle as soon as the bytes are loaded.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        # The video is inlined as a base64 data URI so no separate file has to be served.
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display an mp4 from the working directory inline in the notebook.
show_video()

