# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 38 not upgraded.


### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Fully connected Q-network: a state vector goes in, one value per action comes out.

  Two hidden layers of 64 units, each followed by a ReLU, feed a final linear
  layer with ``action_size`` outputs.
  """

  def __init__(self, state_size, action_size, seed=42):
    """Seed torch's global RNG, then create the three linear layers.

    Args:
      state_size: Number of input features.
      action_size: Number of outputs (one per action).
      seed: Value handed to ``torch.manual_seed`` before the layers are
        created; the call's return value is kept on ``self.seed``.
    """
    super().__init__()
    self.seed = torch.manual_seed(seed)
    hidden_units = 64
    # Layers are created in fc1 -> fc2 -> fc3 order right after seeding, so a
    # given seed always yields the same initial weights.
    self.fc1 = nn.Linear(state_size, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc3 = nn.Linear(hidden_units, action_size)

  def forward(self, state):
    """Return the network output for ``state`` (no activation on the last layer)."""
    hidden = F.relu(self.fc1(state))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)

## Part 2 - Training the AI

Observation Space:

The state is an 8-dimensional vector: the coordinates of the lander in x & y, its linear velocities in x & y, its angle, its angular velocity, and two booleans that represent whether each leg is in contact with the ground or not.

Action Space:

0: do nothing

1: fire left orientation engine

2: fire main engine

3: fire right orientation engine

### Setting up the environment

In [None]:
import gymnasium as gym

# Create the environment once; the problem dimensions are read off its
# observation and action spaces rather than being hard-coded.
env = gym.make('LunarLander-v3')
state_shape = env.observation_space.shape
state_size = state_shape[0]  # length of the state vector
number_actions = env.action_space.n  # size of the discrete action set

print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

Q(s,a) ← r + γ * max a′ Q(s′,a′)

replay_buffer_size = 100,000

The replay buffer stores past experiences (s, a, r, s').

Keeps the last 100k experiences in memory.

Breaks correlations in the training data (random sampling).

Reuses past experiences for efficiency.

If too small → you lose diversity. If too big → higher memory usage, but more variety.

θtarget ← τ * θlocal + (1−τ) * θtarget

In [None]:
# Step size passed to the Adam optimizer of the local Q-network.
learning_rate = 0.0005
# Number of experiences drawn from the replay buffer for each learning step.
minibatch_size = 100
# γ: discount applied to the bootstrapped next-state value.
discount_factor = 0.99
# Maximum number of experiences the replay buffer holds.
replay_buffer_size = 100_000
# τ: weight of the local network in each soft update of the target network.
interpolation_parameter = 0.001

### Implementing Experience Replay

In [None]:
class ReplayMemory(object):
  """Fixed-capacity store of experience tuples with uniform random sampling.

  Every stored event is indexed as ``(state, action, reward, next_state, done)``.
  Once ``capacity`` events are held, each new event overwrites the oldest one
  in place (ring buffer), so a push never has to shift the whole list. As a
  consequence ``self.memory`` is no longer in chronological order after the
  buffer has wrapped around; sampling is uniform, so this does not matter to
  ``sample``.
  """

  def __init__(self, capacity):
    """Create an empty buffer that retains at most ``capacity`` events."""
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity = capacity
    self.memory = []
    # Slot holding the oldest event once the buffer is full.
    self._position = 0

  def push(self, event):
    """Store ``event``, replacing the oldest stored event when the buffer is full."""
    if self.capacity <= 0:
      return  # a buffer without capacity retains nothing
    if len(self.memory) < self.capacity:
      self.memory.append(event)
      return
    self.memory[self._position] = event
    self._position = (self._position + 1) % self.capacity

  def sample(self, batch_size):
    """Draw ``batch_size`` distinct stored events and return them as tensors.

    Returns:
      ``(states, next_states, actions, rewards, dones)`` -- note the order.
      Each field is stacked row-wise with ``np.vstack`` and moved to
      ``self.device``; actions are cast to long, everything else to float
      (done flags go through uint8 first).

    Raises:
      ValueError: from ``random.sample`` when ``batch_size`` exceeds the
        number of stored events.
    """
    drawn = random.sample(self.memory, k = batch_size)
    batch = [e for e in drawn if e is not None]

    states = self._stack(batch, 0).float().to(self.device)
    # Actions are discrete indices, so they are kept as integers.
    actions = self._stack(batch, 1).long().to(self.device)
    rewards = self._stack(batch, 2).float().to(self.device)
    next_states = self._stack(batch, 3).float().to(self.device)
    dones = self._stack(batch, 4, dtype = np.uint8).float().to(self.device)

    return states, next_states, actions, rewards, dones

  def _stack(self, batch, index, dtype = None):
    """Stack field ``index`` of every event in ``batch`` into one CPU tensor."""
    column = np.vstack([e[index] for e in batch])
    if dtype is not None:
      column = column.astype(dtype)
    return torch.from_numpy(column)

### Implementing the DQN class

In [None]:
class Agent():
  """DQN agent with a trained local Q-network and a soft-updated target Q-network.

  The local network picks actions and receives gradient updates; the target
  network supplies the bootstrapped values used as training targets and is
  moved toward the local network after every learning step.
  """

  def __init__(self, state_size, action_size):
    """Build both networks, the optimizer and the replay buffer.

    Reads the module-level ``learning_rate`` and ``replay_buffer_size``.
    """
    self.state_size = state_size
    self.action_size = action_size
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Action selection / training network, then the target-value network.
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    # Only the local network's parameters are optimized.
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    # Counts environment steps modulo 4 to schedule learning.
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Record one transition and, on every 4th call, learn from a sampled minibatch.

    Learning is skipped until the replay buffer holds more than
    ``minibatch_size`` transitions.
    """
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % 4
    if self.t_step != 0:
      return
    # self.memory is the ReplayMemory instance; self.memory.memory is its list.
    if len(self.memory.memory) > minibatch_size:
      minibatch = self.memory.sample(minibatch_size)
      self.learn(minibatch, discount_factor)

  def act(self, state, epsilon = 0.):
    """Pick an action for ``state`` with an epsilon-greedy rule.

    Args:
      state: NumPy array holding one state (a batch axis is added here).
      epsilon: When ``random.random() > epsilon`` the greedy action is
        returned, otherwise a uniformly random one.

    Returns:
      The chosen action index as a NumPy integer.
    """
    # Add a leading batch axis so the network sees a batch of one state.
    batch = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    self.local_qnetwork.eval()
    # No gradients are needed for action selection.
    with torch.no_grad():
      action_values = self.local_qnetwork(batch)
    # Switch back to training mode for subsequent learning steps.
    self.local_qnetwork.train()
    if random.random() > epsilon:
      # NumPy only handles CPU tensors, hence the .cpu() before conversion.
      greedy_values = action_values.cpu().data.numpy()
      return np.argmax(greedy_values)
    return random.choice(np.arange(self.action_size))

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target.

    Args:
      experiences: Tuple ``(states, next_states, actions, rewards, dones)``
        as returned by ``ReplayMemory.sample``.
      discount_factor: Multiplier on the next-state value in the target.
    """
    states, next_states, actions, rewards, dones = experiences
    # Highest target-network value per next state, detached so no gradient
    # flows into the target network; kept as a column.
    target_values = self.target_qnetwork(next_states).detach()
    next_q_targets = target_values.max(dim = 1, keepdim = True)[0]
    # The (1 - dones) factor removes the bootstrap term for finished episodes.
    q_targets = rewards + (discount_factor * next_q_targets * (1 - dones))
    # Local-network value of the action that was actually taken.
    q_expected = self.local_qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend ``local_model``'s parameters into ``target_model`` in place.

    Each target parameter becomes
    ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
    """
    parameter_pairs = zip(target_model.parameters(), local_model.parameters())
    for target_param, local_param in parameter_pairs:
      blended = interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data
      target_param.data.copy_(blended)



#### Explanations

**Classic Q-Learning**

In traditional Q-learning, we keep a Q-table: a big table with all possible (state, action) pairs.

We update it with the Bellman equation:

Q(s,a) ← Q(s,a) + α * [ r + γ * max a′ Q(s′,a′) − Q(s,a) ]

Problem: if state is continuous or very large, we can’t store every state in a table.

**Enter Deep Q-Networks (DQN)**

Instead of a table, we use a neural network to approximate Q(s, a).

Input = state (vector of features).

Output = estimated Q-values for each possible action.

This is where your two Q-networks come in.

```
self.local_qnetwork = Network(state_size, action_size).to(self.device)

```
This is the main network.

It learns by minimizing the difference between its predictions and the target Q-values.

Used to choose actions during training (e.g., ε-greedy policy).

Think of it as the "student" that is constantly learning.

```
self.target_qnetwork = Network(state_size, action_size).to(self.device)
```

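This is a slowly updated copy of the local network. It is never trained by gradient descent; instead, after every learning step its weights are nudged toward the local network's weights by the soft update θtarget ← τ * θlocal + (1−τ) * θtarget (with τ = 1e-3 here).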

It provides the “stable” target in the Bellman update.

Example target for Q-learning with function approximation:

y = r + γ * max a′ Q_target(s′,a′)

Notice: the target Q-value comes from target_qnetwork, not the local one.

Helps avoid instability (if you updated and evaluated with the same network, it would “chase itself” and diverge).

Think of it as the "teacher" that provides stable target values and changes its mind only slowly, while the student (local network) learns from those targets.

### Initializing the DQN agent

In [None]:
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [None]:
# Training schedule: epsilon starts at 1.0 and is multiplied by the decay
# value after every episode, never dropping below the ending value.
number_episodes = 2000
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
# Sliding window holding the scores of the most recent 100 episodes.
scores_on_100_episodes = deque(maxlen = 100)

for episode in range(1, number_episodes + 1):
  # reset() also returns an info dict, which is not needed here.
  state, _ = env.reset()
  score = 0  # reward accumulated over this episode
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon)
    # Gymnasium reports two separate end-of-episode flags: `terminated`
    # (a terminal state was reached) and `truncated` (the episode was cut
    # short, e.g. by a time limit).
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only a true terminal state is stored as "done": after a truncation the
    # next state still has value, so its bootstrap term must not be zeroed.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    done = terminated or truncated
    if done:
      # Either flag means the environment must be reset before stepping again.
      break
  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)

  # The window does not change again this episode, so average it once.
  average_score = np.mean(scores_on_100_episodes)
  # '\r' with end="" keeps overwriting the same progress line; every 100th
  # episode the line is printed with a newline so it stays in the output.
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end = "")
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))
  if average_score >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
    # Save the weights of the local network and stop training.
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break

Episode 100	Average Score: -161.08
Episode 200	Average Score: -113.11
Episode 300	Average Score: -44.81
Episode 400	Average Score: -20.83
Episode 500	Average Score: 45.19
Episode 600	Average Score: 162.13
Episode 700	Average Score: 193.99
Episode 714	Average Score: 201.12
Environment solved in 614 episodes!	Average Score: 201.12


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
    """Run one greedy episode of ``agent`` and save its frames to ``video.mp4``.

    Args:
        agent: Object whose ``act(state)`` returns an action exposing
            ``.item()``.
        env_name: Environment id passed to ``gym.make``.

    The episode ends when the environment reports either ``terminated`` or
    ``truncated``; stopping on ``terminated`` alone could keep stepping an
    environment that has already been cut short and might never exit the
    loop. The environment is closed even if the rollout raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Roll out the trained agent once on LunarLander-v3 and write the frames to video.mp4.
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Embed the first ``*.mp4`` file of the working directory in the notebook.

    The file is read in binary mode, base64-encoded and displayed as an
    inline HTML ``<video>`` element. Prints a message when no ``.mp4`` file
    is found.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return
    mp4 = mp4list[0]
    # Read-only access is enough, and the context manager closes the handle.
    with open(mp4, 'rb') as video_file:
        video = video_file.read()
    encoded = base64.b64encode(video)
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display the first *.mp4 found in the working directory inline in the notebook.
show_video()

