Deep Q-Learning for Lunar Landing

Part 0 - Installing the required packages and importing the Libraries

In [5]:
# Installing gymnasium
!pip install gymnasium
!pip install gymnasium[box2d]
!pip "gymnasium[atari,accept-rom-license]"
!apt-get install -y swig

Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting swig==4.* (from gymnasium[box2d])
  Using cached swig-4.3.1.post0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Using cached swig-4.3.1.post0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp312-cp312-linux_x86_64.whl size=2381983 sha256=ea455d05e90cc5650d4585628014f6366a1ae308562b45bf10d02e9e53ea7de6
  Stored in directory: /root/.cache/pip/wheels/2a/e9/60/774da0bcd07f7dc7761a8590fa2d065e4069568e78dcdc3318
Successfully built box2d-py
Installing collected packages: swig, box2d-py
Successfully installed box2d-py-2.3.5 swig-4.3.1.post0
ERROR: unknown command "gymnasium[atari,accept-rom-license]"
Reading package lists... Don

In [2]:
# Importing the libraries
import os
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
from torch.autograd import variable
from collections import deque, namedtuple

**Part 1 - Building the AI**

- Creating the Architecture of the Neural Network

In [3]:
class Network(nn.Module):
  """Three-layer fully connected network producing one value per action.

  The input is passed through two hidden layers of 64 units, each followed by
  a ReLU, and then through a linear output layer of width ``action_size``.
  """

  def __init__(self, state_size, action_size, seed = 42):
    """Create the three linear layers.

    Args:
      state_size: number of input features.
      action_size: number of output values.
      seed: value handed to ``torch.manual_seed`` before the layers are built.
    """
    super().__init__()
    # Seed first so that the layer weights created below are reproducible;
    # the return value of torch.manual_seed is kept on the instance.
    self.seed = torch.manual_seed(seed)
    hidden_units = 64
    self.fc1 = nn.Linear(state_size, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc3 = nn.Linear(hidden_units, action_size)

  def forward(self, state):
    """Return the output of the final linear layer for ``state``."""
    hidden = F.relu(self.fc1(state))
    hidden = F.relu(self.fc2(hidden))
    # No activation on the output layer: the raw linear values are returned
    return self.fc3(hidden)

Part 2 - Training the AI

Setting up the environment

In [6]:
import gymnasium as gym
# Create the Lunar Lander environment
env = gym.make("LunarLander-v3")
# Number of entries in the action space
action_size = env.action_space.n
# Shape tuple of the observation space and its first dimension
state_size = env.observation_space.shape[0]
state_shape = env.observation_space.shape
# Report the three values in the same order and with the same labels as before
print("State shape: ", state_shape)
print("Action size: ", action_size)
print("State size: ", state_size)

State shape:  (8,)
Action size:  4
State size:  8


Initializing the hyperparameters

In [7]:
# Learning rate handed to the Adam optimizer
Learning_rate = 0.0005
# Number of transitions drawn from the replay buffer for each learning update
minibatch_size = 100
# Weight applied to the next-state value in the Q-learning target
discount_factor = 0.99
# Maximum number of transitions kept in the replay buffer (10e5 == 1,000,000)
replay_buffer_size = 1_000_000
# Mixing factor of the soft target-network update (10e-3 == 0.01)
# NOTE(review): DQN write-ups often use 1e5 and 1e-3 for the two values above;
# confirm that 10e5 / 10e-3 were intended rather than 1e5 / 1e-3.
interpolation_parameter = 0.01

Implementing Experience Replay

In [8]:
class ReplayMemory(object):
  """Fixed-capacity store of experience tuples with uniform random sampling.

  Each stored event is indexed as ``(state, action, reward, next_state, done)``.
  Once ``capacity`` events are held, every new event overwrites the oldest one
  in place, so ``push`` never has to shift the whole list.
  """

  def __init__(self, capacity):
    """Create an empty buffer that holds at most ``capacity`` events."""
    # Tensors returned by sample() are moved to the GPU when one is available
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Maximum number of experiences the buffer can hold
    self.capacity = capacity
    # Stored experiences; stays a plain list so len() and indexing are O(1)
    self.memory = []
    # Slot that will be overwritten next once the buffer is full
    self._write_index = 0

  def push(self, event):
    """Store ``event``, evicting the oldest stored event when the buffer is full."""
    if self.capacity <= 0:
      # A buffer without room never retains anything
      return
    if len(self.memory) < self.capacity:
      self.memory.append(event)
      return
    # Full: slot _write_index always holds the oldest event, so overwrite it
    # and advance, wrapping around at the end of the list.
    self.memory[self._write_index] = event
    self._write_index = (self._write_index + 1) % self.capacity

  def sample(self, batch_size):
    """Draw ``batch_size`` distinct stored events and return them as tensors.

    Returns:
      Tuple ``(states, next_state, actions, rewards, dones)`` of tensors on
      ``self.device``; note that ``next_state`` comes second. ``actions`` is
      an integer (long) tensor, the others are float tensors.

    Raises:
      ValueError: raised by ``random.sample`` when ``batch_size`` exceeds the
        number of stored events.
    """
    # Entries that are None are skipped when the batch is assembled
    experiences = [e for e in random.sample(self.memory, batch_size) if e is not None]
    states = self._stack(experiences, 0).float().to(self.device)
    actions = self._stack(experiences, 1).long().to(self.device)
    rewards = self._stack(experiences, 2).float().to(self.device)
    next_state = self._stack(experiences, 3).float().to(self.device)
    # Done flags go through uint8 before becoming floats (True -> 1.0)
    dones = self._stack(experiences, 4, dtype=np.uint8).float().to(self.device)
    return states, next_state, actions, rewards, dones

  @staticmethod
  def _stack(experiences, index, dtype=None):
    """Stack field ``index`` of every experience row-wise into a tensor."""
    stacked = np.vstack([e[index] for e in experiences])
    if dtype is not None:
      stacked = stacked.astype(dtype)
    return torch.from_numpy(stacked)

Implementing the DQN Class

In [10]:
class Agent():
  """Deep Q-Network agent with a local and a target Q-network.

  The local network selects actions and is trained by gradient descent; the
  target network supplies the next-state values of the learning target and is
  nudged towards the local network after every learning update.
  """

  # Number of calls to step() between two learning updates
  update_every = 4

  def __init__(self, state_size, action_size):
    """Build both networks, the optimizer and the replay buffer.

    Args:
      state_size: number of features in a state.
      action_size: number of available actions.
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Dimensions of the state and action spaces
    self.state_size = state_size
    self.action_size = action_size
    # Two networks: one for selecting actions, one for stable Q-value targets
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    # Adam only updates the weights of the local Q-network
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=Learning_rate)
    # Replay buffer holding past experiences for training
    self.memory = ReplayMemory(replay_buffer_size)
    # Step counter used to learn only every `update_every` steps
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, every ``update_every`` calls, learn from a minibatch.

    Learning only happens once the replay buffer holds more than
    ``minibatch_size`` experiences.
    """
    # The transition is stored exactly as passed in; `next_state` is the
    # argument of this call, not a notebook-level global.
    self.memory.push((state, action, reward, next_state, done))
    # The counter wraps to 0 on every `update_every`-th call
    self.t_step = (self.t_step + 1) % self.update_every
    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, discount_factor)

  def act(self, state, eps=0.):
    """Choose an action for ``state`` with an epsilon-greedy policy.

    Args:
      state: NumPy array holding a single state.
      eps: probability of picking a uniformly random action instead of the
        action with the highest predicted value.

    Returns:
      The chosen action index as a NumPy integer.
    """
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    # Evaluate in eval mode and without gradient tracking, then switch the
    # network back to training mode.
    self.local_qnetwork.eval()
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()
    # Greedy with probability (1 - eps), otherwise uniformly random
    if random.random() > eps:
      return np.argmax(action_values.cpu().numpy())
    else:
      return random.choice(np.arange(self.action_size))

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target.

    Args:
      experiences: tuple ``(states, next_state, actions, rewards, dones)`` of
        tensors, in the order returned by ``ReplayMemory.sample``.
      discount_factor: weight applied to the next-state value in the target.
    """
    states, next_state, actions, rewards, dones = experiences
    # Highest target-network value of each next state, detached from the graph
    next_q_target = self.target_qnetwork(next_state).detach().max(1)[0].unsqueeze(1)
    # Bellman target; (1 - dones) removes the bootstrap term for finished episodes
    q_targets = rewards + (discount_factor * next_q_target * (1 - dones))
    # Local-network values of the actions that were actually taken
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    # Backpropagate and update the local Q-network weights
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    # Let the target network slowly track the local network
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend ``local_model`` into ``target_model`` in place.

    Every target parameter becomes
    ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
    """
    # In-place parameter writes must not be recorded by autograd
    with torch.no_grad():
      for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
        target_param.copy_(interpolation_parameter * local_param + (1.0 - interpolation_parameter) * target_param)

Initializing the DQN Agent

In [11]:
agent = Agent(state_size, action_size)

Training the DQN Agent

In [13]:
# Initialize the training parameters
number_episodes = 2000
# Maximum number of time steps per episode
max_t = 1000
# Epsilon-greedy schedule: start fully random, decay multiplicatively per
# episode, never go below eps_end
eps_start = 1.0
eps_decay = 0.995
eps_end = 0.01
eps = eps_start
# Scores of the last 100 episodes
scores = deque(maxlen=100)

for episode in range(1, number_episodes + 1):
  # Reset the environment to its initial state
  state, _ = env.reset()
  # Sum of rewards collected in this episode
  score = 0
  for t in range(max_t):
    action = agent.act(state, eps)
    # env.step returns two separate end-of-episode flags
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only `terminated` is stored as the done flag, so the learning target
    # still bootstraps from next_state when an episode is merely cut short.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    # The episode is over on either flag
    done = terminated or truncated
    if done:
      break
  scores.append(score)
  eps = max(eps_end, eps_decay * eps)
  # Mean over the stored scores, computed once per episode
  average_score = np.mean(scores)
  print("\rEpisode {}\tAverage Score: {:.2f}".format(episode, average_score), end = "")
  if episode % 100 == 0:
    print("\rEpisode{}\tAverage Score: {:.2f}".format(episode, average_score))
  if average_score >= 200.0:
    print("\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}".format(episode, average_score))
    # Save the parameters of our model
    torch.save(agent.local_qnetwork.state_dict(), "model.pth")
    break

Episode100	Average Score: -151.68
Episode200	Average Score: -63.36
Episode300	Average Score: 2.27
Episode400	Average Score: 87.57
Episode500	Average Score: 171.84
Episode 555	Average Score: 200.66
Environment solved in 555 episodes!	Average Score: 200.66


Visualizing the results

In [15]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, video_path="video.mp4", fps=30):
  """Run one greedy episode in ``env_name`` and save the rendered frames as a video.

  Args:
    agent: object whose ``act(state)`` returns an action exposing ``.item()``.
    env_name: environment id passed to ``gym.make``.
    video_path: file the frames are written to.
    fps: frame rate passed to ``imageio.mimsave``.
  """
  env = gym.make(env_name, render_mode="rgb_array")
  frames = []
  try:
    state, _ = env.reset()
    done = False
    while not done:
      # Capture the frame for the current state before acting on it
      frames.append(env.render())
      action = agent.act(state)
      state, reward, terminated, truncated, _ = env.step(action.item())
      # Stop on either flag so the loop ends even if the episode is only
      # cut short and never terminates on its own.
      done = terminated or truncated
  finally:
    # Close the environment even when the rollout raises
    env.close()
  imageio.mimsave(video_path, frames, fps=fps)

# Record one episode of the agent in LunarLander-v3 and write it to video.mp4
show_video_of_model(agent, "LunarLander-v3")

def show_video():
  """Embed the first ``*.mp4`` file of the working directory in the cell output.

  The file is read as bytes, base64-encoded and displayed as an inline HTML
  ``<video>`` element. A message is printed when no ``.mp4`` file exists.
  """
  mp4list = glob.glob("*.mp4")
  if len(mp4list) == 0:
    print("could'nt find video")
    return
  mp4 = mp4list[0]
  # Read-only binary mode is enough, and the context manager closes the file
  with open(mp4, "rb") as video_file:
    video = video_file.read()
  encoded = base64.b64encode(video)
  display(HTML(
      data='''<video alt="test" autoplay
        loop controls style="height: 400px;">
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
      </video>'''.format(encoded.decode("ascii"))
  ))


# Display the first *.mp4 file found in the working directory inline
show_video()

