# 📘 Deep Q-Network (DQN) Algorithm

## 🧠 Introduction

The **Deep Q-Network (DQN)** algorithm was introduced by DeepMind between 2013 and 2015 and marked a milestone by playing Atari video games at human level. It combines traditional **Q-Learning** with deep neural networks that approximate the action-value function, which makes it possible to handle continuous, high-dimensional state spaces.

**Key publication:**  
- *Playing Atari with Deep Reinforcement Learning* (Mnih et al., 2013)  
  [ArXiv paper](https://arxiv.org/abs/1312.5602)

![Red](https://miro.medium.com/v2/resize:fit:1153/1*emv9eFMbGODD4gnITjfwcQ.png)

![Red2](https://www.researchgate.net/publication/358148766/figure/fig1/AS:1116926097014784@1643307341035/DQN-training-schematic-showing-policy-network-target-network-and-experience-replay.png)

---

## ⚙️ Main features of DQN

- Uses a **neural network** to approximate the Q-function:  
  $
  Q(s, a; \theta) \approx Q^*(s, a)
  $
- Uses a **replay buffer** to break the temporal correlation between experiences.
- Uses a **target network** to stabilize training.
- **Off-policy** algorithm, based on Q-Learning.
- Exploration strategy: **ε-greedy**.
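
The ε-greedy rule in the last bullet can be sketched in a few lines of plain Python. This is a minimal illustration, not the notebook's own code: the function name `epsilon_greedy` and the sample Q-values are hypothetical.

```python
import random

def epsilon_greedy(q_values, epsilon, rng=random):
    """Return a random action index with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        # Explore: pick any action uniformly at random.
        return rng.randrange(len(q_values))
    # Exploit: index of the largest Q-value (the first one wins on ties).
    return max(range(len(q_values)), key=lambda a: q_values[a])

q_values = [0.14, 0.01]  # hypothetical Q-values for actions 0 and 1
greedy_action = epsilon_greedy(q_values, epsilon=0.0)    # never explores
explored_action = epsilon_greedy(q_values, epsilon=1.0)  # always explores
```

With `epsilon=0.0` the function always returns the greedy action; with `epsilon=1.0` it ignores the Q-values entirely.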

---

## 🧮 Key equations

### 1. **Q-Learning update**

The target for the Q-value comes from the Bellman equation, and the network is trained to minimize the squared error against that target:

$
Q_{\text{target}} = r + \gamma \max_{a'} Q(s', a'; \theta^-)
$

$
\mathcal{L}(\theta) = \mathbb{E}_{(s,a,r,s') \sim \mathcal{D}} \left[ \left(Q(s,a;\theta) - Q_{\text{target}}\right)^2 \right]
$

Where:
- $ \theta $: parameters of the main network.
- $ \theta^- $: parameters of the target network (updated periodically).
- $ \gamma $: discount factor.
- $ \mathcal{D} $: experience replay buffer.
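
As a worked example of the two equations, the sketch below computes the target and the mean squared loss for a hypothetical batch of two transitions using plain Python lists. The helper names and the numbers are illustrative assumptions; the terminal-state case (target equal to the reward) follows the usual convention and is not part of the equations above.

```python
def td_target(reward, next_q_values, gamma, done):
    """r + gamma * max_a' Q(s', a'; theta^-), or just r when s' is terminal."""
    if done:
        return reward
    return reward + gamma * max(next_q_values)

def mse_loss(predictions, targets):
    """Mean of the squared differences between Q(s, a; theta) and the targets."""
    return sum((p - t) ** 2 for p, t in zip(predictions, targets)) / len(targets)

gamma = 0.99
# Hypothetical mini-batch: q_sa is Q(s, a; theta), next_q is Q(s', .; theta^-).
batch = [
    {"q_sa": 0.5, "reward": 0.1, "next_q": [0.2, 0.4], "done": False},
    {"q_sa": 0.3, "reward": -1.0, "next_q": [0.0, 0.0], "done": True},
]
targets = [td_target(t["reward"], t["next_q"], gamma, t["done"]) for t in batch]
loss = mse_loss([t["q_sa"] for t in batch], targets)
```

Here the first target is `0.1 + 0.99 * 0.4` and the second is simply the reward, because that transition ends the episode.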

---

## 🖼️ Architecture (visual representation)

A typical DQN architecture for Atari includes:

- Image preprocessing.
- A CNN that extracts features from the state.
- Dense layers that output the Q-values.

**Representative image of the architecture:**

![DQN Architecture](https://www.researchgate.net/publication/318184943/figure/fig1/AS:812127505879042@1570637694827/DQN-architecture-for-end-to-end-learning-of-Atari-2600-game-plays.png)  
Credit: [Denny Britz - RL GitHub](https://github.com/dennybritz/reinforcement-learning)
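
To see how the CNN feeds the dense layers, the arithmetic below traces the spatial size of the feature maps. The layer configuration is an assumption taken from the 2015 Nature DQN paper (84×84 input, three unpadded convolutions); this notebook does not specify it, and its Flappy Bird agent uses a 12-value state vector instead of images.

```python
def conv_out(size, kernel, stride):
    """Output width/height of a convolution without padding."""
    return (size - kernel) // stride + 1

# (output channels, kernel size, stride) for each convolutional layer (assumed values)
layers = [(32, 8, 4), (64, 4, 2), (64, 3, 1)]

size = 84  # assumed side length of the preprocessed frames
for channels, kernel, stride in layers:
    size = conv_out(size, kernel, stride)

# Number of features handed to the first dense layer after flattening.
flat_features = channels * size * size
```

Under these assumptions the feature map shrinks from 84 to 20, 9 and finally 7 pixels per side, so the dense layers receive 64 × 7 × 7 = 3136 features.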

---

## 📦 Experience Replay Buffer

**What is it?**  
A data structure (FIFO or circular) that stores the transitions $(s, a, r, s')$ observed while the agent interacts with the environment.

### Advantages:
- Breaks the correlation between consecutive experiences.
- Allows data to be reused, which improves sample efficiency.
- Improves training stability.

### Basic implementation:
- Fixed size.
- Random mini-batch sampling.
- The oldest entries are overwritten once the buffer is full.
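
The three implementation points can be sketched as a circular buffer over a plain list with a write index. This is an illustrative alternative to the `deque`-based `ReplayMemory` used later in the notebook; the class name `RingReplayBuffer` and the dummy transitions are hypothetical.

```python
import random

class RingReplayBuffer:
    """Fixed-size buffer that overwrites its oldest transition once full."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.storage = []
        self.write_index = 0  # position of the next write

    def append(self, transition):
        if len(self.storage) < self.capacity:
            self.storage.append(transition)
        else:
            # Buffer is full: replace the oldest entry.
            self.storage[self.write_index] = transition
        self.write_index = (self.write_index + 1) % self.capacity

    def sample(self, batch_size):
        # Uniform random mini-batch without replacement.
        return random.sample(self.storage, batch_size)

    def __len__(self):
        return len(self.storage)

buffer = RingReplayBuffer(capacity=3)
for step in range(5):
    buffer.append((step, 0, 0.1, step + 1))  # hypothetical (s, a, r, s')
mini_batch = buffer.sample(2)
```

After five appends into a buffer of capacity three, only the transitions from steps 2, 3 and 4 remain.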

---

## 📜 Pseudocode: $Q$-Learning training algorithm

We will use the following algorithm to train the network.

* Initialize the memory $D$
* Initialize the action-value network $Q$ with random weights
* **For** episode $\leftarrow 1$ **to** $M$ **do**
  * Observe $s_0$
  * **For** $t \leftarrow 0$ **to** $T-1$ **do**
     * SAMPLING
     * With probability $\epsilon$ select a random action $a_t$, otherwise select $a_t = \mathrm{argmax}_a Q(s_t,a)$
     * Execute action $a_t$ in the simulator and observe the reward $r_{t+1}$ and the new state $s_{t+1}$
     * Store the transition $\langle s_t, a_t, r_{t+1}, s_{t+1} \rangle$ in the memory $D$
     * TRAINING
     * Sample a random mini-batch from $D$: $\langle s_j, a_j, r_j, s'_j \rangle$
     * Set $\hat{Q}_j = r_j$ if the episode ends at $j+1$, otherwise set $\hat{Q}_j = r_j + \gamma \max_{a'}{Q(s'_j, a')}$
     * Perform a gradient descent step on the loss $(\hat{Q}_j - Q(s_j, a_j))^2$
  * **end for**
* **end for**

You are encouraged to take the time to extend this code and implement some of the improvements discussed in the lesson, including fixed $Q$ targets, Double DQN, prioritized replay, and/or dueling networks.
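
To make the control flow of the pseudocode concrete, here is a minimal sketch that runs the same loop on a toy problem. Everything in it is an assumption made for illustration: a hypothetical five-cell corridor replaces the simulator, a table replaces the network, and the hyperparameters are arbitrary. It is not the Flappy Bird agent built in this notebook.

```python
import random

random.seed(0)

N_STATES, N_ACTIONS = 5, 2        # corridor cells 0..4; action 0 = left, 1 = right
GAMMA, EPSILON, ALPHA = 0.9, 0.2, 0.5
CAPACITY, BATCH_SIZE = 100, 8

def step(state, action):
    """Toy simulator: reaching the last cell gives reward 1 and ends the episode."""
    next_state = max(0, state - 1) if action == 0 else state + 1
    done = next_state == N_STATES - 1
    return (1.0 if done else 0.0), next_state, done

Q = [[0.0] * N_ACTIONS for _ in range(N_STATES)]  # table standing in for the network
D = []                                            # replay memory

for episode in range(200):
    s = 0
    for t in range(50):
        # SAMPLING: epsilon-greedy action (ties broken at random), then store the transition
        if random.random() < EPSILON:
            a = random.randrange(N_ACTIONS)
        else:
            best = max(Q[s])
            a = random.choice([k for k in range(N_ACTIONS) if Q[s][k] == best])
        r, s_next, done = step(s, a)
        D.append((s, a, r, s_next, done))
        if len(D) > CAPACITY:
            D.pop(0)  # drop the oldest transition
        # TRAINING: move each sampled Q(s_j, a_j) toward its target
        for sj, aj, rj, sj_next, dj in random.sample(D, min(BATCH_SIZE, len(D))):
            target = rj if dj else rj + GAMMA * max(Q[sj_next])
            Q[sj][aj] += ALPHA * (target - Q[sj][aj])  # gradient step on the squared error
        s = s_next
        if done:
            break
```

For a table, a gradient step on the squared error reduces to moving the stored value a fraction `ALPHA` of the way toward the target, which is why no optimizer appears here.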


In [17]:
try:
    from google.colab import drive
    drive.mount('/content/drive/')
    COLAB = True
    print("Note: using Google Colab")
except ImportError:
    # google.colab can only be imported inside Colab
    print("Note: using Jupyter Notebook")
    COLAB = False

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
Note: using Google Colab


In [18]:
if COLAB:
  !apt-get update && apt-get install -y xvfb
  !pip install gym-notebook-wrapper
  !pip install flappy-bird-gymnasium
  !pip install renderlab
  !pip install opencv-python
  !pip install torch

from IPython.display import clear_output
import warnings

warnings.filterwarnings('ignore')
clear_output()

## Environment without learning

In [19]:
import matplotlib
import matplotlib.pyplot as plt

import random
import torch
from torch import nn
import yaml

from datetime import datetime, timedelta
import argparse
import itertools

import flappy_bird_gymnasium
import os
import gymnasium
import renderlab as rl

# Documentation: https://github.com/markub3327/flappy-bird-gymnasium

env = gymnasium.make("FlappyBird-v0", render_mode="rgb_array", use_lidar=False)
env = rl.RenderFrame(env, "./output") # Initialize the folder for the rendered output

obs, _ = env.reset()
while True:
    # Next action:
    # (feed the observation to your agent here)
    action = env.action_space.sample()

    # Processing:
    obs, reward, terminated, truncated, info = env.step(action)

    # Stop when the player dies (terminated) or the episode is cut short (truncated)
    if terminated or truncated:
        break
env.play()
env.close()

Moviepy - Building video temp-{start}.mp4.
Moviepy - Writing video temp-{start}.mp4





Moviepy - Done !
Moviepy - video ready temp-{start}.mp4


## Memory

In [20]:
###
from collections import deque  # double-ended queue; with maxlen set, the oldest item is dropped when full
import random

# Thin wrapper around a bounded deque that serves as the experience replay buffer
class ReplayMemory():
    def __init__(self, maxlen):
        self.memory = deque([], maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size):
        return random.sample(self.memory, sample_size)

    def __len__(self):
        return len(self.memory)

In [27]:
####
import torch.nn.functional as F
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)
class DQN(nn.Module):

    def __init__(self, state_dim, action_dim, hidden_dim=256, enable_dueling_dqn=True):
        super(DQN, self).__init__()

        self.enable_dueling_dqn=enable_dueling_dqn
        self.fc1 = nn.Linear(state_dim, hidden_dim)

        if self.enable_dueling_dqn:

            #value stream
            self.fc_value = nn.Linear(hidden_dim, 256)
            self.value = nn.Linear(256, 1)

            #advantages stream
            self.fc_advantages = nn.Linear(hidden_dim, 256)
            self.advantages = nn.Linear(256, action_dim)

        else:
            # vanilla dqn
            self.output = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        latent_features = F.relu(self.fc1(x))

        if self.enable_dueling_dqn:
            #value stream calculation
            v = F.relu(self.fc_value(latent_features))
            V = self.value(v)

            #advantages stream calculation
            a = F.relu(self.fc_advantages(latent_features))
            A = self.advantages(a)

            #Q-value calculation:
            """
              -> V: [batch_size, 1]
              -> A: [batch_size, action_dim]
              -> V + A: V is broadcast along dim 1, giving [batch_size, action_dim]
              the mean advantage is subtracted so that A averages to zero over the actions
            """
            Q = V + A - torch.mean(A, dim=1, keepdim=True)

        else:
            Q = self.output(latent_features)

        return Q

if __name__ == '__main__':
    state_dim = 12
    action_dim = 2
    net = DQN(state_dim, action_dim)
    state = torch.randn(10, state_dim)  # batch of 10 states, each with 12 features ("our bird sensors")
    output = net(state)
    print("One row per state (10) and one Q-value per action (2):")
    print(output.size())
    print(output)

cpu
One row per state (10) and one Q-value per action (2):
torch.Size([10, 2])
tensor([[ 0.1399,  0.0127],
        [ 0.1460,  0.2408],
        [-0.0134,  0.1870],
        [ 0.0216,  0.2159],
        [ 0.0203,  0.1664],
        [ 0.0727,  0.2818],
        [-0.0918,  0.1337],
        [-0.0525,  0.2067],
        [ 0.0684,  0.1558],
        [ 0.0398,  0.2800]], grad_fn=<SubBackward0>)
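
The aggregation `Q = V + A - mean(A)` used in the dueling branch can be checked by hand on a single state. The sketch below uses plain Python with hypothetical numbers; `dueling_q` is an illustrative helper, not part of the notebook's `DQN` class.

```python
def dueling_q(value, advantages):
    """Combine a state value and per-action advantages into Q-values."""
    mean_advantage = sum(advantages) / len(advantages)
    return [value + a - mean_advantage for a in advantages]

q = dueling_q(value=0.5, advantages=[1.0, 3.0])

# Adding a constant to every advantage leaves the Q-values unchanged.
q_shifted = dueling_q(value=0.5, advantages=[11.0, 13.0])

# The Q-values average back to the state value.
mean_q = sum(q) / len(q)
```

Subtracting the mean removes the ambiguity between the two streams: a constant moved from `V` into every entry of `A` would otherwise produce the same Q-values.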


In [28]:
###
def optimize(mini_batch, policy_dqn, target_dqn):
        # Relies on the globals `device`, `loss_fn` and `optimizer` defined in the main cell.

        # unpack the transitions stored in the mini_batch
        states, actions, new_states, rewards, terminations = zip(*mini_batch)

        states = torch.stack(states)
        actions = torch.stack(actions)
        new_states = torch.stack(new_states)
        rewards = torch.stack(rewards)
        terminations = torch.tensor(terminations).float().to(device)

        discount_factor_g = 0.99
        with torch.no_grad():
            enable_double_dqn = True  # set to False to use the vanilla DQN target
            if enable_double_dqn:
                # Double DQN: the policy network picks the action, the target network evaluates it
                best_actions_from_policy = policy_dqn(new_states).argmax(dim=1)

                target_q = rewards + (1-terminations) * discount_factor_g * \
                                target_dqn(new_states).gather(dim=1, index=best_actions_from_policy.unsqueeze(dim=1)).squeeze(dim=1)
            else:
                # calculate target Q values (expected returns)
                target_q = rewards + (1-terminations) * discount_factor_g * target_dqn(new_states).max(dim=1)[0]
                '''
                    target_dqn(new_states)  ==> tensor([[1,2,3],[4,5,6]])
                        .max(dim=1)         ==> torch.return_types.max(values=tensor([3,6]), indices=tensor([2, 2]))
                            [0]             ==> tensor([3,6])
                '''

        # calculate Q values from the current policy for the actions that were taken
        current_q = policy_dqn(states).gather(dim=1, index=actions.unsqueeze(dim=1)).squeeze(dim=1)
        '''
            Illustrative example with actions = tensor([2, 0]):
            policy_dqn(states)                        ==> tensor([[1,2,3],[4,5,6]])
                actions.unsqueeze(dim=1)              ==> tensor([[2],[0]])
                .gather(1, actions.unsqueeze(dim=1))  ==> tensor([[3],[4]])
                    .squeeze(dim=1)                   ==> tensor([3,4])
        '''

        loss = loss_fn(current_q, target_q)

        # backprop step
        optimizer.zero_grad()  # clear gradients from the previous step
        loss.backward()        # compute gradients
        optimizer.step()       # update the policy network's weights
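
The difference between the two branches of `optimize` shows up clearly on a single transition. The sketch below uses plain Python lists with hypothetical Q-values; the function names are illustrative and are not part of the notebook.

```python
def vanilla_target(reward, done, gamma, target_q_next):
    """Vanilla DQN: the target network both selects and evaluates the next action."""
    return reward + (1 - done) * gamma * max(target_q_next)

def double_dqn_target(reward, done, gamma, policy_q_next, target_q_next):
    """Double DQN: the policy network selects the action, the target network evaluates it."""
    best_action = max(range(len(policy_q_next)), key=lambda a: policy_q_next[a])
    return reward + (1 - done) * gamma * target_q_next[best_action]

policy_q_next = [0.2, 0.8]  # hypothetical policy_dqn(new_state)
target_q_next = [0.5, 0.3]  # hypothetical target_dqn(new_state)

vanilla = vanilla_target(0.1, 0, 0.99, target_q_next)
double = double_dqn_target(0.1, 0, 0.99, policy_q_next, target_q_next)
terminal = double_dqn_target(-1.0, 1, 0.99, policy_q_next, target_q_next)
```

In this example the vanilla target uses the larger target-network value (0.5), while Double DQN uses the value of the action preferred by the policy network (0.3), which is how it reduces the overestimation introduced by the `max`. For a terminal transition both reduce to the reward.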

### Main

In [29]:
###
env = gymnasium.make("FlappyBird-v0", render_mode='human', use_lidar=False) ## to get the 12 state observations
num_actions = env.action_space.n
print(f"number of possible actions (flap or nothing): {num_actions}")

observation_space = env.observation_space
num_states = observation_space.shape[0] # Box(low, high, (shape0,), float64)
print(f"observation space: {observation_space}")
print(f"observations: {num_states}")

rewards_per_episode = []
epsilon = 1
epsilon_decay = 0.99995
epsilon_min = 0.00001
mini_batch_size = 32
network_sync_rate = 10
epsilon_history = []
memory = ReplayMemory(maxlen = 100000)

#instantiating networks
policy_dqn = DQN(num_states, num_actions, hidden_dim=256, enable_dueling_dqn=True).to(device)
target_dqn = DQN(num_states, num_actions, hidden_dim=256, enable_dueling_dqn=True).to(device)
target_dqn.load_state_dict(policy_dqn.state_dict()) # start the target network with the same weights as the policy network

optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=0.0001)
loss_fn = nn.MSELoss()

# track number of steps taken. used for syncing policy => target network.
step_count=0

best_reward = -float("inf") # any first episode reward counts as a new best


number of possible actions (flap or nothing): 2
observation space: Box(-1.0, 1.0, (12,), float64)
observations: 12


## Training

In [None]:
###
state, _ = env.reset()
print(state)
print(_)

#for episode in itertools.count(): #if you want to run indefinitely -> stop whenever you want
for episode in range(1000):

    state, _ = env.reset()  # initialize environment. reset returns (state,info).
    state = torch.tensor(state, dtype=torch.float, device=device)

    # per-episode bookkeeping
    stop_on_reward = 999
    terminated = False      # set to True by env.step when the bird dies
    episode_reward = 0.0    # used to accumulate rewards per episode

    while(not terminated and episode_reward < stop_on_reward): # stop when the bird dies or the episode reward reaches stop_on_reward

        # epsilon-greedy action selection (epsilon starts at 1 and is decayed after each optimization step)
        if random.random() < epsilon: # random float in [0, 1) compared against the current exploration rate
            # select random action
            action = env.action_space.sample() # the action space is Discrete(2): 0 or 1
            action = torch.tensor(action, dtype=torch.int64, device=device)
        else:
            # select best action
            with torch.no_grad():
                state_pytorch = state.unsqueeze(dim=0) # state.shape -> [12]; unsqueeze adds the batch dimension PyTorch expects -> [1, 12]
                action = policy_dqn(state_pytorch).squeeze().argmax() # output shape [1, 2] -> squeeze -> [2]; argmax returns the index of the larger Q-value

        #execute action letsgoo
        new_state,reward,terminated,truncated,info = env.step(action.item()) #this env.step executes the action on the env
        episode_reward += reward

        new_state = torch.tensor(new_state, dtype=torch.float, device=device)
        reward = torch.tensor(reward, dtype=torch.float, device=device)

        #adding into the replay buffer
        memory.append((state, action, new_state, reward, terminated))
        step_count+=1

        state = new_state
    rewards_per_episode.append(episode_reward)

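    if episode_reward > best_reward:
        # relative improvement; undefined (nan) when the previous best is zero or still -inf
        pct = (episode_reward-best_reward)/abs(best_reward)*100 if best_reward not in (0.0, -float("inf")) else float("nan")
        print(f"{datetime.now().strftime('%m-%d %H:%M:%S')}: New best reward {episode_reward:0.1f} ({pct:+.1f}%) at episode {episode}, saving model...")
        torch.save(policy_dqn.state_dict(), "best_model.pt")
        best_reward = episode_reward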

    # model optimization (runs once per episode, after the step loop): sample the replay buffer, update the policy network, decay epsilon and sync the target network
    if len(memory) > mini_batch_size:
        mini_batch = memory.sample(mini_batch_size)
        optimize(mini_batch, policy_dqn, target_dqn) #optimization step

        # epsilon decay: shift gradually from exploration to exploitation
        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        epsilon_history.append(epsilon)

        # target-network sync: copy the policy network into the target network after a certain number of steps
        if step_count > network_sync_rate:
            target_dqn.load_state_dict(policy_dqn.state_dict())
            step_count=0
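
The schedule `epsilon = max(epsilon * epsilon_decay, epsilon_min)` is geometric, so its effect over a run can be estimated in closed form. The sketch below reuses the hyperparameter values from the main cell; the helper names are hypothetical, and it assumes one decay per call, which in the loop above happens at most once per episode.

```python
import math

epsilon_start, epsilon_decay, epsilon_min = 1.0, 0.99995, 0.00001

def epsilon_after(decay_steps):
    """Exploration rate after a given number of decay steps."""
    return max(epsilon_start * epsilon_decay ** decay_steps, epsilon_min)

def decay_steps_to_reach(target):
    """Smallest number of decay steps after which epsilon is at or below target."""
    return math.ceil(math.log(target / epsilon_start) / math.log(epsilon_decay))

eps_after_1000 = epsilon_after(1000)
steps_for_10_percent = decay_steps_to_reach(0.1)
```

With these values epsilon is still about 0.95 after 1,000 decay steps, and it takes on the order of 46,000 decay steps to fall to 0.1, so a 1,000-episode run stays almost entirely in the exploration regime unless the decay rate is lowered or the run is made much longer.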

In [None]:
###
# switch model to evaluation mode
policy_dqn.eval()

rewards_per_episode = []

#for episode in itertools.count(): #if you want to run indefinitely -> stop whenever you want
for episode in range(5):

    state, _ = env.reset()  # initialize environment. reset returns (state,info).
    state = torch.tensor(state, dtype=torch.float, device=device)

    #evaluation loop for the episode
    terminated = False      # set to True by env.step when the bird dies
    episode_reward = 0.0    #used to accumulate rewards per episode

    while not terminated: #stop if bird is dead

        # select best action
        with torch.no_grad():
            state_pytorch = state.unsqueeze(dim=0) # state.shape -> [12]; unsqueeze adds the batch dimension PyTorch expects -> [1, 12]
            action = policy_dqn(state_pytorch).squeeze().argmax() # output shape [1, 2] -> squeeze -> [2]; argmax returns the index of the larger Q-value

        #execute action letsgoo
        new_state,reward,terminated,truncated,info = env.step(action.item()) #this env.step executes the action on the env
        episode_reward += reward

        new_state = torch.tensor(new_state, dtype=torch.float, device=device)
        reward = torch.tensor(reward, dtype=torch.float, device=device)

        state = new_state
    rewards_per_episode.append(episode_reward)

print(rewards_per_episode)

In [None]:
env = gymnasium.make("FlappyBird-v0", render_mode="rgb_array", use_lidar=True)
env = rl.RenderFrame(env, "./output") # Initialize the folder for the rendered output

obs, _ = env.reset()
while True:
    # Next action:
    # (feed the observation to your agent here)
    action = env.action_space.sample()

    # Processing:
    obs, reward, terminated, truncated, info = env.step(action)

    # Stop when the player dies (terminated) or the episode is cut short (truncated)
    if terminated or truncated:
        break
env.play()
env.close()