<a href="https://colab.research.google.com/github/gomezc08/AI_Udemy/blob/main/Deep_Q_Learning_Lunar_Landing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 29 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 2s (516 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 124947 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Unpacking swig4.0 (4.0.2-1ubuntu1) ...
Selecting previously unselected package swig.
Preparing to unpack .../swig_4.0.2-1ubunt

### Importing the libraries

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  """Fully connected Q-network: one state vector in, one value per action out.

  Two hidden layers of 64 units, each followed by a ReLU, feed a final linear
  layer with `action_size` outputs.
  """

  def __init__(self, state_size, action_size, seed=42):
    """Seed torch's global RNG and create the three linear layers.

    state_size  - number of input features (length of the state vector).
    action_size - number of outputs (one per action).
    seed        - value given to torch.manual_seed; the generator it returns is
                  kept on `self.seed`.
    """
    super().__init__()
    hidden_units = 64
    # Seeding happens before the layers are created so their initial weights
    # are reproducible for a given seed.
    self.seed = torch.manual_seed(seed)
    self.fc1 = nn.Linear(state_size, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc3 = nn.Linear(hidden_units, action_size)

  def forward(self, initial_state):
    """Return the output of the last linear layer for `initial_state`."""
    hidden = F.relu(self.fc1(initial_state))
    hidden = F.relu(self.fc2(hidden))
    return self.fc3(hidden)

## Part 2 - Training the AI

### Setting up the environment

$TD_t(s, a) = R(s, a) + \gamma \cdot \max_{a'} Q(s', a') - Q_{t-1}(s, a)$

$Q_t(s, a) = Q_{t-1}(s, a) + \alpha \cdot TD_t(s, a)$

In [None]:
import gymnasium as gym

# Build the Lunar Lander environment the agent will be trained in.
env = gym.make('LunarLander-v3')

# Keep the two sizes the Q-network needs: how many discrete actions exist and
# how long the observation vector is (first entry of the observation shape).
number_actions = env.action_space.n
state_size = env.observation_space.shape[0]

print('State size: ', state_size)
print('Number of actions: ', number_actions)

State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [None]:
# --- Optimisation ---
alpha = 0.0005                # learning rate given to the Adam optimizer of the local Q-network
batch_size = 100              # intended mini-batch size; the agent only learns once the buffer holds more transitions than this

# --- Q-learning target ---
gamma = 0.99                  # discount factor on the next-state value: 0 = only the immediate reward counts, near 1 = future rewards count almost fully

# --- Experience replay ---
replay_buffer_size = 100_000  # capacity handed to the replay memory

# --- Target network ---
tau = 0.001                   # soft-update rate: after each learning step the target weights become tau * local + (1 - tau) * target

### Implementing Experience Replay

In [None]:
import random

# ReplayMemory gathers the agent's experiences into a single data structure.
# push   - adds one experience.
# sample - draws a random batch of stored experiences to learn from (a whole batch, not a single experience); this is important.

class ReplayMemory(object):
  """Bounded first-in-first-out store of experiences with random sampling.

  Attributes:
    device        - torch device sampled tensors are moved to (CUDA when
                    available, otherwise CPU).
    capacity      - maximum number of experiences kept.
    memory_buffer - list of experiences, oldest first. Each experience is a
                    (state, action, reward, next_state, done) sequence.
  """

  def __init__(self, capacity):
    """Create an empty buffer that holds at most `capacity` experiences."""
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity = capacity
    self.memory_buffer = []

  def push(self, event):
    """Append `event` and drop the oldest experience if the buffer overflows.

    event - (state, action, reward, next_state, done).
    """
    self.memory_buffer.append(event)
    # Compare against this instance's own capacity rather than a module-level
    # constant, so the constructor argument is what actually bounds the buffer.
    if len(self.memory_buffer) > self.capacity:
      del self.memory_buffer[0]

  def sample(self, batch_size):
    """Draw `batch_size` distinct experiences at random and stack them by field.

    Returns a tuple in the order (states, next_states, actions, rewards,
    dones). Every element is a tensor on `self.device` with one row per
    sampled experience; actions are int64, all other fields are float32.

    Raises ValueError (from random.sample) when `batch_size` exceeds the
    number of stored experiences.
    """
    experiences = [e for e in random.sample(self.memory_buffer, k=batch_size) if e is not None]

    def stack(field):
      # np.vstack yields one row per experience, so scalar fields such as the
      # action or the reward come out as a single column.
      return torch.from_numpy(np.vstack([e[field] for e in experiences]))

    states = stack(0).float().to(self.device)
    actions = stack(1).long().to(self.device)
    rewards = stack(2).float().to(self.device)
    next_states = stack(3).float().to(self.device)
    # Done flags go through uint8 first so they end up as 0.0 / 1.0 floats.
    dones = stack(4).to(torch.uint8).float().to(self.device)
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [None]:
import random

# Agent - defines the behaviour of our agent as it interacts with the environment.
# act - selects an action from the network's q-values for the current state, taking epsilon into account (random action with probability epsilon, otherwise the highest q-value).
# learn - calculates the target and expected q-values (using the equation) and updates the network so that the loss between them is minimized.
# step - stores experiences and decides when to learn from them (every 4 steps).

class Agent():
  """Deep Q-Network agent: picks actions, stores experiences and trains.

  Two networks with the same architecture are kept:
    local_qnetwork  - the network being optimised; also used to pick actions.
    target_qnetwork - supplies the next-state values for the training target
                      and is blended slowly toward the local network.

  The class reads the module-level hyperparameters `alpha`, `batch_size`,
  `gamma`, `tau` and `replay_buffer_size`.
  """

  def __init__(self, state_size, action_size):
    """Create both networks, the Adam optimizer and an empty replay memory.

    state_size  - length of the state vector fed to the networks.
    action_size - number of actions the agent can choose from.
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    # Only the local network is optimised directly; the target network is
    # changed exclusively through soft_update.
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = alpha)
    self.memory = ReplayMemory(replay_buffer_size)
    self.timestep = 0   # counts calls to step(), modulo 4

  def act(self, state, epsilon = 0.):
    """Choose an action for `state` with an epsilon-greedy policy.

    state   - numpy array holding a single state.
    epsilon - probability of returning a random action instead of the one
              with the highest q-value.

    Returns the chosen action index as a numpy integer.
    """
    # unsqueeze(0) adds a leading batch dimension of size 1.
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

    # Evaluate without tracking gradients, then put the network back in
    # training mode for the next call to learn().
    self.local_qnetwork.eval()
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()

    if random.random() > epsilon:
      return np.argmax(action_values.cpu().data.numpy())

    return random.choice(np.arange(self.action_size))

  def learn(self, experiences, discount_factor):
    """Run one gradient step on the local network, then soft-update the target.

    experiences     - (states, next_states, actions, rewards, dones) tensors,
                      in that order.
    discount_factor - weight applied to the next-state value in the target.
    """
    states, next_states, actions, rewards, dones = experiences

    # Highest target-network value over dim 1 for each next state, detached so
    # no gradient reaches the target network; unsqueeze(1) makes it a column.
    Q_target_next = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    # (1 - dones) removes the next-state term for experiences flagged done.
    Q_target = rewards + (discount_factor * Q_target_next * (1 - dones))
    # Local-network value of the action that was actually taken.
    Q_expected = self.local_qnetwork(states).gather(1, actions)

    loss = F.mse_loss(Q_expected, Q_target)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()                                             # updates the local model's parameters.
    self.soft_update(self.local_qnetwork, self.target_qnetwork, tau)  # updates the target model's parameters.

  def step(self, state, action, reward, next_state, done_status):
    """Store one experience and, on every 4th call, learn from a random batch.

    Learning is skipped until the memory holds more than `batch_size`
    experiences.
    """
    self.memory.push((state, action, reward, next_state, done_status))
    self.timestep = (self.timestep + 1) % 4
    if self.timestep == 0:
      if len(self.memory.memory_buffer) > batch_size:
        # Sample exactly `batch_size` experiences so the mini-batch follows the
        # hyperparameter instead of a separate hard-coded number.
        experience_batch = self.memory.sample(batch_size)
        self.learn(experience_batch, gamma)

  def soft_update(self, local_model, target_model, interpolation_parameter):
    """Blend the local parameters into the target parameters in place.

    Each target parameter becomes
    interpolation_parameter * local + (1 - interpolation_parameter) * target.
    """
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)

### Initializing the DQN agent

In [None]:
# Build the agent from the state size and action count read from the environment.
agent = Agent(state_size=state_size, action_size=number_actions)

### Initialize hyperparameters for the DQN agent

In [None]:
# --- Training length ---
epochs = 2_000                                  # upper bound on the number of training episodes
maximum_number_timesteps_per_episode = 1_000    # hard cap on environment steps within a single episode

# --- Epsilon-greedy schedule ---
# epsilon is the probability of taking a random action (1 = pure exploration,
# 0 = pure exploitation). It starts at the starting value, is multiplied by the
# decay value after every episode and never drops below the ending value.
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value

# --- Progress tracking ---
scores_100_episodes = deque(maxlen=100)   # sliding window with the scores of the most recent 100 episodes

### Training the DQN agent

In [None]:
# Train for at most `epochs` episodes; stop early once the average score over
# the last 100 episodes reaches 200 (checked every 100 epochs).
for epoch in range(1, epochs + 1):
  # Every epoch is one episode, so it starts from a freshly reset environment.
  # The second value returned by reset() (the info dict) is not needed.
  state, _ = env.reset()
  score = 0   # cumulative reward for the entire episode (the quantity to maximize).

  for t in range(1, maximum_number_timesteps_per_episode + 1):
    action = agent.act(state, epsilon=epsilon)
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only genuine termination is stored as the done flag, so a transition
    # from an episode that was merely cut short keeps its next-state value.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward

    # The episode is over on termination or truncation; the environment must
    # not be stepped again before the next reset.
    if terminated or truncated:
      break

  scores_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon * epsilon_decay_value)
  average_score = np.mean(scores_100_episodes)

  # Live progress: '\r' with no newline overwrites the same output line.
  print('\rEpoch {}\tAverage Score: {:.2f}'.format(epoch, average_score), end = "")
  if epoch % 100 == 0:
    # Ending this line with a newline keeps one progress line per 100 epochs
    # instead of letting the next epoch overwrite it.
    print('\rEpoch {}\tAverage Score: {:.2f}'.format(epoch, average_score))
    if average_score >= 200.0:
      print('\nEnviornment solved in {:d}\tAverage Score: {:.2f}'.format(epoch, average_score), end = "")
      torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
      break

Epoch 1000	Average Score: 206.58
Enviornment solved in 1000	Average Score: 206.58

## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    """Run one episode with `agent` in `env_name` and save it as 'video.mp4'.

    agent    - object whose act(state) returns an action exposing .item().
    env_name - environment id passed to gym.make.

    One frame is rendered before every step; the frames are written at 30 fps
    once the episode has ended.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            state, _, terminated, truncated, _ = env.step(action.item())
            # Stop on truncation as well as termination; otherwise an episode
            # that only ever hits the time limit would never leave this loop.
            done = terminated or truncated
    finally:
        # Release the environment even if acting or rendering raised.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent in the Lunar Lander environment.
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Display the first '*.mp4' file of the working directory inline.

    The file is base64-encoded and embedded in an HTML <video> element. When
    no mp4 file exists, "Could not find video" is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode inside a context manager: the file only needs to
    # be read, and the handle is closed as soon as its bytes are in memory.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Embed the saved recording in the notebook output.
show_video()

