<a href="https://colab.research.google.com/github/RaulShahi/Lunar-Lander/blob/main/Deep_Q_Learning_for_Lunar_Landing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
# Core Gymnasium package (imported later as `gym`).
!pip install gymnasium
# Atari extras and ROM licence; nothing in the visible cells uses an Atari environment.
!pip install "gymnasium[atari, accept-rom-license]"
# swig is installed before the box2d extra -- presumably required to build the Box2D bindings; confirm.
!apt-get install -y swig
# Box2D extra; assumed to be what provides the LunarLander environment created below.
!pip install gymnasium[box2d]

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Fully connected Q-value estimator.

  Maps a batch of state vectors of width `state_size` to one value per
  action (`action_size` outputs) through two hidden layers of 64 units with
  ReLU activations. The output layer has no activation.
  """

  def __init__(self, state_size, action_size, seed=42) -> None:
    """Seed the global torch RNG and create the three linear layers.

    Args:
      state_size: number of input features per state.
      action_size: number of output values (one per action).
      seed: value passed to `torch.manual_seed` before the layers are built,
        so two networks created with the same seed start with equal weights.
    """
    super().__init__()
    # torch.manual_seed reseeds the global generator and returns it.
    self.seed = torch.manual_seed(seed)
    self.state_size, self.action_size = state_size, action_size
    hidden_units = 64
    self.fc1 = nn.Linear(state_size, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc3 = nn.Linear(hidden_units, action_size)

  def forward(self, state):
    """Return the action values for `state` (last dimension `state_size`)."""
    hidden = F.relu(self.fc1(state))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
import gymnasium as gym

# The Lunar Lander environment was upgraded to v3, hence the explicit version suffix.
env = gym.make('LunarLander-v3')

# Derive the network input/output sizes from the environment's spaces.
state_shape = env.observation_space.shape
state_size = state_shape[0]
number_actions = env.action_space.n

for _label, _value in (
    ('State shape: ', state_shape),
    ('State size: ', state_size),
    ('Number of actions: ', number_actions),
):
  print(_label, _value)

### Initializing the hyperparameters

In [None]:
# Step size handed to the Adam optimizer in Agent.
learning_rate = 0.0005
# Number of stored experiences that must be exceeded before the agent learns.
minibatch_size = 100
# Gamma: weight applied to the bootstrapped next-state value.
discount_factor = 0.99
# Maximum number of experiences kept in the replay memory.
replay_buffer_size = 100_000
# Fraction of the local network blended into the target network per soft update.
interpolation_parameter = 1e-3

### Implementing Experience Replay

In [None]:
class ReplayMemory(object):
  """Bounded FIFO store of experience tuples with uniform random sampling.

  Each stored event is indexed as
  (state, action, reward, next_state, done) when a batch is sampled.
  """

  def __init__(self, capacity) -> None:
    """Create an empty memory holding at most `capacity` events."""
    use_gpu = torch.cuda.is_available()
    self.device = torch.device("cuda:0" if use_gpu else "cpu")
    self.capacity = capacity
    self.memory = []

  def push(self, event):
    """Append `event`; once over capacity, drop the oldest entry."""
    self.memory.append(event)
    if len(self.memory) <= self.capacity:
      return
    del self.memory[0]

  def sample(self, batch_size):
    """Draw `batch_size` distinct events and return them as stacked tensors.

    Returns:
      A tuple in the order (states, next_states, actions, rewards, dones).
      Note that next_states comes second. All tensors are moved to
      `self.device`; actions are int64, the others float32.
    """
    drawn = random.sample(self.memory, k=batch_size)
    batch = [entry for entry in drawn if entry is not None]

    def stack_field(position):
      # One row per experience, taken from the given tuple position.
      return np.vstack([entry[position] for entry in batch])

    def as_float_tensor(array):
      return torch.from_numpy(array).float().to(self.device)

    states = as_float_tensor(stack_field(0))
    actions = torch.from_numpy(stack_field(1)).long().to(self.device)
    rewards = as_float_tensor(stack_field(2))
    next_states = as_float_tensor(stack_field(3))
    # Booleans are turned into 0/1 bytes first, then into floats.
    dones = as_float_tensor(stack_field(4).astype(np.uint8))
    return states, next_states, actions, rewards, dones


### Implementing the DQN class

In [None]:
class Agent():
  """Deep Q-Learning agent with a local network and a soft-updated target network.

  Reads the module-level hyperparameters `learning_rate`, `minibatch_size`,
  `discount_factor`, `replay_buffer_size` and `interpolation_parameter`.
  """

  def __init__(self, state_size, action_size):
    """Build both Q-networks, the optimizer and the replay memory.

    Args:
      state_size: number of features in one observation.
      action_size: number of discrete actions.
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    # Both networks use Network's default seed, so they start with equal weights.
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    # Only the local network is optimized; the target follows via soft_update.
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, on every 4th call, learn from a minibatch.

    Learning is skipped until the memory holds more than `minibatch_size`
    experiences.
    """
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % 4
    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      # Sample exactly the configured batch size (previously a hard-coded 100).
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, epsilon = 0.):
    """Pick an action for `state` with an epsilon-greedy policy.

    Args:
      state: observation as a numpy array.
      epsilon: a uniform draw from [0, 1) that is not greater than this value
        yields a random action; otherwise the greedy action is returned.

    Returns:
      The chosen action index as a numpy integer.
    """
    if random.random() <= epsilon:
      # Exploring: no need to evaluate the network at all.
      return random.choice(np.arange(self.action_size))
    # unsqueeze(0) adds the batch dimension the network is called with.
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    self.local_qnetwork.eval()
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()
    return np.argmax(action_values.cpu().data.numpy())

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target.

    Args:
      experiences: tuple (states, next_states, actions, rewards, dones) in the
        order produced by ReplayMemory.sample.
      discount_factor: weight of the bootstrapped next-state value.
    """
    states, next_states, actions, rewards, dones = experiences
    # Best next-state value per sample from the target network, detached so no gradient flows into it.
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    # (1 - dones) removes the bootstrap term for transitions flagged done.
    q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
    # Value the local network assigns to the action that was actually taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend local weights into the target: target = tau * local + (1 - tau) * target."""
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)

### Initializing the DQN agent

In [None]:
# Build the agent from the state and action sizes read from the environment above.
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [None]:
# Training schedule.
number_of_episodes = 2000
maximum_number_timesteps_per_episode = 1000
# Epsilon-greedy exploration: start fully random and decay once per episode.
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
# Rolling window of the most recent episode scores.
scores_on_100_episodes = deque(maxlen= 100)

for episode in range(1, number_of_episodes + 1):
  state, _ = env.reset()
  score = 0

  for t in range(0, maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon)
    # env.step returns (observation, reward, terminated, truncated, info).
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only real termination is stored as `done`, so a time-limit cut-off
    # does not zero the bootstrapped next-state value during learning.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    # Stop on either signal; the environment must be reset after truncation too.
    if terminated or truncated:
      break

  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value*epsilon)
  # Compute the rolling mean once per episode and reuse it below.
  average_score = np.mean(scores_on_100_episodes)
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end = "")
  if episode % 100 == 0:
    # Same text without end="" so the line is kept instead of overwritten.
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))
  # Require a full window: the message reports `episode - 100`, which would
  # be negative if the threshold were hit before 100 episodes had finished.
  window_is_full = len(scores_on_100_episodes) == scores_on_100_episodes.maxlen
  if window_is_full and average_score >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    """Run one greedy episode of `agent` in `env_name` and save it as 'video.mp4'.

    Args:
        agent: object whose `act(state)` returns an integer-like action index.
        env_name: Gymnasium environment id, created with rgb_array rendering.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            # Capture the frame before acting, as the original loop did.
            frames.append(env.render())
            action = agent.act(state)
            # int() accepts both numpy integers and plain Python ints.
            state, _, terminated, truncated, _ = env.step(int(action))
            # Also stop on truncation; otherwise an episode that is cut off
            # by a step limit without terminating would loop forever.
            done = terminated or truncated
    finally:
        # Release the environment even if rendering or stepping raised.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the agent built above; the function writes 'video.mp4'.
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Embed the first '*.mp4' file found in the working directory in the notebook.

    Prints "Could not find video" when no mp4 file exists. `glob.glob` gives
    no ordering guarantee, so with several mp4 files the choice is arbitrary.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return
    # Read-only binary mode inside `with`, so the handle is always closed.
    with open(mp4list[0], 'rb') as video_file:
        video = video_file.read()
    encoded = base64.b64encode(video)
    # The video is inlined as a base64 data URI, so no separate file is served.
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display an mp4 from the working directory inline (prints a message if none exists).
show_video()

class Agent():
  """Annotated Deep Q-Learning agent: local Q-network plus soft-updated target network.

  Reads the module-level hyperparameters `learning_rate`, `minibatch_size`,
  `discount_factor`, `replay_buffer_size` and `interpolation_parameter`.
  """

  #1. Initialization
  def __init__(self, state_size, action_size):
    """Create both Q-networks, the optimizer and the replay memory.

    Args:
      state_size: number of features in one observation.
      action_size: number of discrete actions.
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    self.local_qnetwork = Network(state_size, action_size).to(self.device) #move to designated device
    self.target_qnetwork = Network(state_size, action_size).to(self.device) #move to designated device
    # Only the local network is optimized; the target follows via soft_update.
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr= learning_rate)
    self.replay_memory = ReplayMemory(capacity=replay_buffer_size)
    self.t_step = 0

  #2. Store the experiences and learn from them
  def step(self, state, action, rewards, next_state, done):
    """Store one transition and, on every 4th call, learn from a minibatch."""
    # a. Store the experience. The tuple must begin with the state:
    #    ReplayMemory.sample reads position 0 as the state.
    self.replay_memory.push((state, action, rewards, next_state, done))

    # b. When to learn? Increment the step counter, wrapping to 0 every 4 steps.
    self.t_step = (self.t_step + 1) % 4
    if self.t_step == 0:
      # We do not learn from a single experience but from a minibatch, so
      # first check that the memory holds more experiences than one batch.
      if len(self.replay_memory.memory) > minibatch_size:
        experiences = self.replay_memory.sample(minibatch_size)
        self.learn(experiences, discount_factor)

  #3. Select an action based on a given state and a certain epsilon value
  def act(self, state, epsilon=0.):
    """Pick an action for `state` with an epsilon-greedy policy.

    Args:
      state: observation as a numpy array.
      epsilon: exploration threshold compared against a uniform draw.

    Returns:
      The chosen action index as a numpy integer.
    """
    # a. The state arrives as a numpy array and must become a torch tensor.
    #    unsqueeze(0) adds a batch dimension at index 0, because the network
    #    is called with batched inputs even when the batch size is one.
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

    # b. Switch the local Q-network to evaluation mode before inference.
    #    eval() changes the behaviour of layers such as dropout and batch
    #    normalization so that inference is consistent.
    self.local_qnetwork.eval()

    # c. Forward pass without gradient tracking: torch.no_grad() disables
    #    autograd while the action values (Q-values) are computed.
    with torch.no_grad():
      action_values = self.local_qnetwork(state) #forward pass

    # d. Return to training mode so later learning steps see the usual
    #    training behaviour of those layers.
    self.local_qnetwork.train()

    # e. Epsilon-greedy selection: draw a random number in [0, 1). If it is
    #    greater than epsilon, exploit (highest Q-value); otherwise explore
    #    (random action). This balances exploitation and exploration.
    if random.random() > epsilon:
      # .cpu() moves the tensor to host memory; np.argmax needs a numpy array.
      return np.argmax(action_values.cpu().data.numpy())
    else:
      return random.choice(np.arange(self.action_size))

  #4. Update the agent's Q values based on sampled experiences
  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target.

    Args:
      experiences: tuple (states, next_states, actions, rewards, dones), in
        the order returned by ReplayMemory.sample.
      discount_factor: gamma, the weight of the bootstrapped next-state value.
    """
    # a. Unpack the sampled experiences into their components.
    states, next_states, actions, rewards, dones = experiences

    # b. Maximum predicted Q-values for the next states, from the target network.
    #    - forward the next states through the target network,
    #    - detach() the result so no gradient is tracked through it,
    #    - max(1) reduces over dimension one (the actions) and returns
    #      (values, indices); [0] keeps only the values,
    #    - unsqueeze(1) restores a column so the shape is (batch, 1).
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)

    # c. Q targets for the current states:
    #      q_targets = rewards + gamma * next_q_targets * (1 - dones)
    #    where gamma is the discount factor and dones flags finished
    #    episodes, which removes the bootstrap term for those transitions.
    q_targets = rewards + (discount_factor * next_q_targets * (1-dones))

    # d. Expected Q-values from the local network: forward the current states
    #    and gather, along dimension one, the value of the action taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)

    # e. Mean squared error between expected and target Q-values.
    loss = F.mse_loss(q_expected, q_targets)

    # f. Backpropagation: clear old gradients, compute new ones, then take a
    #    single optimizer step to update the local network's parameters.
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    # g. Move the target network slightly toward the local network.
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  #5. Update the target network's parameters
  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend local weights into the target: target = tau * local + (1 - tau) * target."""
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter)* target_param.data)