# DQN para Cart Pole

**Tarea**

El agente debe decidir entre dos acciones: mover el carro a la izquierda o a la derecha, de modo que el palo unido a él se mantenga en equilibrio. Para entender bien el funcionamiento del ambiente, es importante que **lean la siguiente página**: [sitio web de Gymnasium](https://gymnasium.farama.org/environments/classic_control/cart_pole/).

![CartPole](https://pytorch.org/tutorials/_static/img/cartpole.gif)

A medida que el agente observa el estado actual del ambiente y elige una acción, éste transiciona a un nuevo estado y también devuelve una recompensa que indica las consecuencias de la acción. En esta tarea, las recompensas son +1 por cada paso de tiempo incremental y el ambiente termina si el palo se inclina demasiado o si el carro se mueve más de 2.4 unidades del centro. Esto significa que los escenarios con mejor rendimiento durarán más tiempo, acumulando un retorno mayor.

La tarea CartPole está diseñada para que las entradas al agente sean 4 valores de tipo **float** que representan el estado del ambiente (posición, velocidad, etc.). Tomamos estas 4 entradas sin ningún escalado y las pasamos por una pequeña red con 2 salidas, una para cada acción. La red se entrena para predecir el valor esperado para cada acción, dado el estado de entrada. Luego se elige la acción con el mayor valor esperado.


In [None]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


En el ambiente `CartPole-v1`, el **estado** está compuesto por 4 valores continuos:

1. **Cart Position (posición del carro)**
2. **Cart Velocity (velocidad del carro)**  
3. **Pole Angle (ángulo del palo)**
4. **Pole Angular Velocity (velocidad angular del palo):** rapidez con la que gira el palo.

Es decir, el estado se representa como:

state = [posición, velocidad, ángulo, velocidad\_angular]


In [None]:
env = gym.make("CartPole-v1")


In [None]:
state = env.reset()

print("Estado inicial (vector):", state)
print(f"Cart Position        : {state[0][0]}")
print(f"Cart Velocity        : {state[0][1]}")
print(f"Pole Angle           : {state[0][2]}")
print(f"Pole Angular Velocity: {state[0][3]}")

In [None]:
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

seed = 42
random.seed(seed)
torch.manual_seed(seed)
env.reset(seed=seed)
env.action_space.seed(seed)
env.observation_space.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)


# Componentes

- Ambiente
- Memoria
- Red de política (Policy network)
- Red objetivo (Target network)


Memoria
=====================

Vamos a usar una memoria de experiencias para entrenar nuestra DQN. En ésta vamos a almacenar las transiciones que el agente observa, permitiéndonos reutilizar estos datos más adelante. Al muestrear aleatoriamente, las transiciones que conforman un batch no van a estar correlacionadas entre sí. Se ha demostrado que esto estabiliza y mejora en gran medida el procedimiento de entrenamiento de DQN.

Para ello, vamos a necesitar dos clases:

- `Transition` - una tupla que representa una única transición en nuestro ambiente. Esencialmente mapea pares (state, action) a su resultado (next_state, reward).
- `ReplayMemory` - un búfer cíclico de tamaño acotado que contiene las transiciones observadas recientemente. También implementa un método `.sample()` para seleccionar un batch aleatorio de transiciones para el entrenamiento.


In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity): # TODO
        """ Inicia la memoria como una lista vacía de capacidad maxima = capacity """
        pass

    def push(self, *args):  # TODO
        """ Guardar una transición """
        pass

    def sample(self, batch_size): # TODO
        """ Tomar una muestra aleatoria de la memoria de tamaño batch_size """
        pass

    def __len__(self): # TODO
        pass


Ahora vamos a definir nuestro modelo. Pero antes, hagamos un repaso rápido de DQN.

Algoritmo DQN
=============

Nuestro objetivo va a ser entrenar una política que trate de maximizar la recompensa acumulada y descontada
$R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t$.

El descuento, $\gamma$, debe ser una constante entre $0$ y $1$ que asegure la convergencia de la suma. Un $\gamma$ menor hace que las recompensas del futuro lejano incierto sean menos importantes para nuestro agente que las del futuro cercano, sobre las que puede estar más seguro. También incentiva a los agentes a recolectar recompensas más cercanas en el tiempo que recompensas equivalentes pero más lejanas.

La idea principal detrás de Q-learning es que si tuviéramos una función
$Q^*: State \times Action \rightarrow \mathbb{R}$, que pudiera decirnos cuál sería nuestro retorno si tomáramos una acción en un estado dado, entonces podríamos construir fácilmente una política que maximice nuestras recompensas:

$$\pi^*(s) = \arg\!\max_a \ Q^*(s, a)$$

Sin embargo, no sabemos todo sobre el ambiente, así que no tenemos acceso a $Q^*$. Pero, dado que las redes neuronales son aproximadores universales de funciones, podemos simplemente crear una y entrenarla para que se parezca a $Q^*$.

Para nuestra regla de actualización de entrenamiento, usaremos el hecho de que toda función $Q$ para alguna política obedece la ecuación de Bellman:

$$Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s'))$$

La diferencia entre ambos lados de la igualdad se conoce como el error de diferencia temporal, $\delta$:

$$\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a))$$

Para minimizar este error, vamos a usar la [pérdida de Huber](https://en.wikipedia.org/wiki/Huber_loss). La pérdida de Huber actúa como el error cuadrático medio cuando el error es pequeño, pero como el error absoluto medio cuando el error es grande; esto la hace más robusta a valores atípicos cuando las estimaciones de $Q$ son muy ruidosas. Calculamos esto sobre un batch de transiciones, $B$, muestreado de la memoria de repetición:

$$\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)$$

$$\begin{aligned}
\text{donde} \quad \mathcal{L}(\delta) = \begin{cases}
  \frac{1}{2}{\delta^2}  & \text{si } |\delta| \le 1, \\
  |\delta| - \frac{1}{2} & \text{en otro caso.}
\end{cases}
\end{aligned}$$


Q-network
-----------------

Nuestro modelo será una red neuronal feed-forward que recibe un vector que representa a un estado. Tiene dos salidas, que representan $Q(s, \mathrm{left})$ y $Q(s, \mathrm{right})$ (donde $s$ es la entrada de la red). En efecto, la red intenta predecir el retorno esperado de tomar cada acción dada la entrada actual.


In [None]:
class DQN(nn.Module):

    def __init__(self, n_observations, n_actions): # TODO
        """ Definir la arquitectura de la red FF """
        super(DQN, self).__init__()
        pass

    def forward(self, x): # TODO
        """ Definir las funciones de activación de la red """
        pass


Entrenamiento
=============

Hiperparámetros y utilidades
----------------------------

Vamos a instanciar nuestro modelo y su optimizador, y definir algunas utilidades:

- `select_action` - seleccionará una acción de acuerdo con una política e-greedy. En pocas palabras, a veces usaremos nuestro modelo para elegir la acción, y otras veces simplemente muestrearemos una uniformemente. La probabilidad de elegir una acción aleatoria comenzará en `EPS_START` y decaerá exponencialmente hacia `EPS_END`. `EPS_DECAY` controla la tasa de decaimiento.
- `plot_durations` - una función auxiliar para graficar la duración de los episodios, junto con un promedio sobre los últimos 100 episodios (la medida usada en las evaluaciones oficiales). La gráfica estará debajo de la celda que contiene el bucle principal de entrenamiento y se actualizará después de cada episodio.


In [None]:
# BATCH_SIZE es el número de transiciones muestreadas de la memoria
# GAMMA es el factor de descuento mencionado en la sección anterior
# EPS_START es el valor inicial de epsilon
# EPS_END es el valor final de epsilon
# EPS_DECAY controla la tasa de decaimiento exponencial de epsilon; valores altos implican un decaimiento más lento
# TAU es la tasa de actualización de la red objetivo
# LR es la tasa de aprendizaje del optimizador AdamW

BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.01
EPS_DECAY = 2500
TAU = 0.005
LR = 3e-4

# Obtenemos el número de acciones del espacio de acciones
n_actions = env.action_space.n
# Obtenemos el número de observaciones de estado
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
# Copiamos los pesos de las redes 
target_net.load_state_dict(policy_net.state_dict()) 

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

steps_done = 0


In [None]:
def select_action(state): 
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


In [None]:
episode_durations = []

def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Resultado')
    else:
        plt.clf()
        plt.title('Entrenando...')
    plt.xlabel('Episodio')
    plt.ylabel('Duración')
    plt.plot(durations_t.numpy())
    # Tomar promedios de 100 episodios y graficarlos también
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

Bucle de entrenamiento
======================

Finalmente, el código para entrenar nuestro modelo.

La función `optimize_model` realiza un único paso de optimización. Primero muestrea un batch, concatena todos los tensores en uno solo, calcula $Q(s_t, a_t)$ y $V(s_{t+1}) = \max_a Q(s_{t+1}, a)$, y los combina en nuestra pérdida. Por definición fijamos $V(s) = 0$ si $s$ es un estado terminal. También usamos una red objetivo para calcular $V(s_{t+1})$ para mayor estabilidad. La red objetivo se actualiza en cada paso con una [actualización suave](https://arxiv.org/pdf/1509.02971.pdf) controlada por el hiperparámetro `TAU`, que definimos previamente.


In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transponer el lote (ver https://stackoverflow.com/a/19343/3343043 para
    # una explicación detallada). Esto convierte un arreglo de Transitions por lote
    # a un Transition de arreglos por lote.
    batch = Transition(*zip(*transitions))

    # Calcular una máscara de estados no finales y concatenar los elementos del lote
    # (un estado final sería aquel después del cual la simulación terminó)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Calcular Q(s_t, a) - el modelo calcula Q(s_t), luego seleccionamos las
    # columnas de las acciones tomadas. Estas son las acciones que se habrían tomado
    # para cada estado del lote según policy_net
    state_action_values = # TODO

    # Calcular V(s_{t+1}) para todos los estados siguientes.
    # Los valores esperados de las acciones para non_final_next_states se calculan
    # con la target_net "más antigua"; seleccionando su mejor recompensa
    # Esto se combina según la máscara, de modo que tengamos el valor de estado esperado
    # o 0 en caso de que el estado sea final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = # TODO
    # Calcular los valores Q esperados
    expected_state_action_values = # TODO

    # Calcular pérdida de Huber
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimizar el modelo
    optimizer.zero_grad()
    loss.backward()
    # Recorte de gradientes in-place
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()


A continuación, vemos como quedaría bucle principal de entrenamiento. Al principio reiniciamos el ambiente y obtenemos el `state` inicial como un Tensor. Luego, muestreamos una acción, la ejecutamos, observamos el siguiente estado y la recompensa (siempre 1), y optimizamos nuestro modelo una vez. Cuando termina el episodio (nuestro modelo falla), reiniciamos el bucle.


In [None]:
if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Inicializar el ambiente y obtener su estado
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = # TODO
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # TODO: Guardar la transición en la memoria

        # TODO: Moverse al siguiente estado
    
        # Realizar un paso de optimización (en la red de política)
        optimize_model()

        # Actualización suave de los pesos de la red objetivo
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Completado')
plot_durations(show_result=True)
plt.ioff()
plt.show()


Diagrama explicativo del flujo de datos

![](https://pytorch.org/tutorials/_static/img/reinforcement_learning_diagram.jpg)

Las acciones se eligen aleatoriamente o en base a una política, obteniendo la siguiente muestra del ambiente de gym. Registramos los resultados en la memoria de repetición y también ejecutamos un paso de optimización en cada iteración. La optimización elige un batch aleatorio de la memoria de repetición para entrenar la nueva política. La `target_net` "más antigua" también se usa en la optimización para calcular los valores Q esperados. Se realiza una actualización de los pesos en cada paso del entrenamiento.
