# Laboratorio 7 CartPole
## Deep Learning

- Fabiola Contreras, 22787
- Diego Duarte, 22075
- José Marchena, 22398
- Sofía Velásquez, 22049
- María José Villafuerte, 22129

### Preparar ambiente

In [14]:
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import pygame

In [2]:
# Create the CartPole-v1 environment with render_mode="rgb_array".
env = gym.make("CartPole-v1", render_mode="rgb_array")
# Bare expression: the notebook displays the wrapped environment as the cell output.
env

<TimeLimit<OrderEnforcing<PassiveEnvChecker<CartPoleEnv<CartPole-v1>>>>>

### Definición de Redes

In [3]:
class QNetwork(nn.Module):
    """Fully connected network that maps an observation to one Q-value per action.

    Args:
        obs_size: Number of features in one observation vector.
        n_actions: Number of discrete actions (size of the output layer).
        hidden_size: Width of both hidden layers. Defaults to 128, the value
            that was previously hard-coded.
    """

    def __init__(self, obs_size, n_actions, hidden_size=128):
        super().__init__()
        # Layer order is unchanged so state_dict keys (net.0, net.2, net.4)
        # stay compatible with previously saved or copied weights.
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        )

    def forward(self, x):
        """Return the Q-values for `x`; the last dimension of `x` must be `obs_size`."""
        return self.net(x)

In [4]:
obs_size = env.observation_space.shape[0]   # 4 observation features for CartPole
n_actions = env.action_space.n              # 2 discrete actions for CartPole

# Online network: the one updated by the optimizer
online_net = QNetwork(obs_size, n_actions)

# Target network: starts as an exact copy of the online network
target_net = QNetwork(obs_size, n_actions)
target_net.load_state_dict(online_net.state_dict())  # identical weights at start
# eval() only switches layer modes (dropout / batch-norm); it does NOT stop
# autograd. Freeze the parameters explicitly so the target network never
# accumulates gradients. load_state_dict() still works on frozen parameters.
target_net.requires_grad_(False)
target_net.eval()  # kept as the last expression so the notebook displays the module


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.3.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "C:\Users\diego\AppData\Roaming\Python\Python311\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "C:\Users\diego\AppData\Roaming\Python\Python311\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "C:\Users\diego\AppData\Roaming\Python\Python311\site-packages\ipykernel\kernelapp.py", line 739, in start
    self.io_lo

QNetwork(
  (net): Sequential(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=2, bias=True)
  )
)

### Establecer hiperparámetros

In [5]:
import random
from collections import deque
import torch.optim as optim

# Device: use the GPU when CUDA is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
online_net.to(device)
target_net.to(device)

# Main hyperparameters
GAMMA = 0.99                 # discount factor
LR = 1e-3                    # learning rate
BATCH_SIZE = 64              # minibatch size
REPLAY_CAPACITY = 50_000     # replay buffer capacity
TARGET_UPDATE_FREQ = 1000    # number of global steps between hard copies into the target network
TRAIN_START_SIZE = 1000      # minimum number of stored transitions before training starts
MAX_EPISODES = 500           # number of episodes run by the training loop
MAX_STEPS_PER_EP = 500       # per-episode step cap used by the training loop

# ε-greedy exploration
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY_STEPS = 50_000     # global steps over which epsilon decays linearly from EPS_START to EPS_END

# Optimizer and loss (only the online network's parameters are optimized)
optimizer = optim.Adam(online_net.parameters(), lr=LR)
criterion = nn.MSELoss()

# Linear epsilon schedule, driven by the global step counter
global_step = 0
def get_epsilon(step: int) -> float:
    """Return the exploration rate for `step`.

    Epsilon moves linearly from EPS_START to EPS_END over EPS_DECAY_STEPS
    steps and stays at EPS_END afterwards.
    """
    progress = step / EPS_DECAY_STEPS
    if progress > 1.0:
        # Past the decay horizon: hold the final value
        progress = 1.0
    span = EPS_END - EPS_START
    return EPS_START + span * progress

### Selección de acciones épsilon-greedy

In [6]:
def select_action(state_np, epsilon: float):
    """Pick an action with an ε-greedy policy.

    Args:
        state_np: Current observation as an np.ndarray of shape (obs_size,).
        epsilon: Current exploration probability.

    Returns:
        The chosen action: a random sample from the environment's action
        space with probability `epsilon`, otherwise the argmax of the online
        network's Q-values.
    """
    explore = random.random() < epsilon
    if explore:
        return env.action_space.sample()

    # Exploit: evaluate the online network on a batch of one, shape [1, obs_size]
    obs_batch = torch.as_tensor(state_np, dtype=torch.float32, device=device)
    obs_batch = obs_batch.unsqueeze(0)
    with torch.no_grad():
        greedy_idx = torch.argmax(online_net(obs_batch), dim=1)  # [1]
    return int(greedy_idx.item())


### Experience replay

In [10]:
# Replay buffer used by the DQN optimization step (with target network)
class ReplayBuffer:
    """Bounded FIFO store of (s, a, r, s_next, done) transitions."""

    def __init__(self, capacity: int):
        # A deque with maxlen discards the oldest transition once it is full
        self.buf = deque(maxlen=capacity)

    def push(self, s, a, r, s_next, done):
        """Append one transition, stored as a plain tuple."""
        transition = (s, a, r, s_next, done)
        self.buf.append(transition)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buf)

    def sample(self, batch_size: int):
        """Draw `batch_size` transitions without replacement.

        Returns:
            Tuple (s, a, r, s_next, d) of tensors on `device`: the states are
            stacked row-wise as float32, `a` is int64 and `r` / `d` are
            float32, each with a trailing dimension of size 1.
        """
        picked = random.sample(self.buf, batch_size)
        states, actions, rewards, next_states, dones = zip(*picked)

        def as_column(values, dtype):
            # One scalar per transition -> column tensor
            return torch.as_tensor(values, dtype=dtype, device=device).unsqueeze(1)

        def as_matrix(rows):
            # One observation per row
            return torch.as_tensor(np.vstack(rows), dtype=torch.float32, device=device)

        return (
            as_matrix(states),
            as_column(actions, torch.int64),
            as_column(rewards, torch.float32),
            as_matrix(next_states),
            as_column(dones, torch.float32),
        )


# Module-level replay buffer holding at most REPLAY_CAPACITY transitions
replay = ReplayBuffer(REPLAY_CAPACITY)

def hard_update_target():
    """Hard update: overwrite target_net's weights with online_net's current ones."""
    online_weights = online_net.state_dict()
    target_net.load_state_dict(online_weights)

def optimize_model():
    """Run one DQN gradient step on a replay minibatch.

    Returns:
        The scalar loss as a float, or None when the replay buffer holds
        fewer than max(BATCH_SIZE, TRAIN_START_SIZE) transitions.
    """
    min_transitions = max(BATCH_SIZE, TRAIN_START_SIZE)
    if len(replay) < min_transitions:
        return None  # not enough experience collected yet

    obs, acts, rews, next_obs, terminal = replay.sample(BATCH_SIZE)

    # Bootstrapped target from the target network:
    #   y = r + (1 - done) * gamma * max_a' Q_target(s', a')
    with torch.no_grad():
        next_q = target_net(next_obs)                            # [B, n_actions]
        best_next_q = next_q.max(dim=1, keepdim=True).values     # [B, 1]
        td_target = rews + (1.0 - terminal) * GAMMA * best_next_q  # [B, 1]

    # Q_online(s, a) restricted to the action that was actually taken
    chosen_q = online_net(obs).gather(1, acts)                   # [B, 1]

    loss = criterion(chosen_q, td_target)

    optimizer.zero_grad()
    loss.backward()
    # Clip the gradient norm for stability before stepping
    torch.nn.utils.clip_grad_norm_(online_net.parameters(), max_norm=10.0)
    optimizer.step()

    return loss.item()


### Ciclo de entrenamiento y visualización del entorno

In [16]:
# Create the environment with on-screen rendering
env = gym.make("CartPole-v1", render_mode="human")

reward_history = []  # total reward of every finished episode
loss_history = []    # loss of every optimization step that actually ran

# try/finally guarantees the render window is closed even when training is
# interrupted (e.g. KeyboardInterrupt from the notebook's stop button).
try:
    for episode in range(MAX_EPISODES):
        state, _ = env.reset()
        total_reward = 0

        for t in range(MAX_STEPS_PER_EP):
            global_step += 1
            epsilon = get_epsilon(global_step)

            # Explicit render call on every 50th episode (kept from the original).
            # NOTE(review): with render_mode="human" Gymnasium is documented to
            # draw on each step() by itself, so this call probably does not
            # limit rendering to these episodes — confirm and drop if redundant.
            if episode % 50 == 0:
                env.render()

            # ε-greedy action selection
            action = select_action(state, epsilon)

            # Step the environment
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

            # Store `terminated`, not `done`, as the terminal flag: a time-limit
            # truncation is not a real terminal state, so the TD target must
            # still bootstrap from next_state. Storing `done` would zero the
            # bootstrap term on every episode that reaches the step limit.
            replay.push(
                np.array(state, dtype=np.float32),
                action,
                reward,
                np.array(next_state, dtype=np.float32),
                terminated,
            )

            # One optimization step on a replay minibatch (None while warming up)
            loss = optimize_model()
            if loss is not None:
                loss_history.append(loss)

            # Periodic hard copy of the online weights into the target network
            if global_step % TARGET_UPDATE_FREQ == 0:
                hard_update_target()

            state = next_state

            # The episode itself ends on either termination or truncation
            if done:
                break

        reward_history.append(total_reward)

        # Progress log every 10 episodes
        if (episode + 1) % 10 == 0:
            avg_r = np.mean(reward_history[-10:])
            print(f"Ep {episode+1}/{MAX_EPISODES} | Recompensa media ult.10: {avg_r:.2f} | ε={epsilon:.3f}")
finally:
    # Always release the environment and its window
    env.close()


Ep 10/500 | Recompensa media ult.10: 24.00 | ε=0.724
Ep 20/500 | Recompensa media ult.10: 23.60 | ε=0.719
Ep 30/500 | Recompensa media ult.10: 27.80 | ε=0.714
Ep 40/500 | Recompensa media ult.10: 34.30 | ε=0.708
Ep 50/500 | Recompensa media ult.10: 34.10 | ε=0.701
Ep 60/500 | Recompensa media ult.10: 30.70 | ε=0.695
Ep 70/500 | Recompensa media ult.10: 31.70 | ε=0.689
Ep 80/500 | Recompensa media ult.10: 31.00 | ε=0.683
Ep 90/500 | Recompensa media ult.10: 20.70 | ε=0.679
Ep 100/500 | Recompensa media ult.10: 29.90 | ε=0.674
Ep 110/500 | Recompensa media ult.10: 30.70 | ε=0.668
Ep 120/500 | Recompensa media ult.10: 22.10 | ε=0.664
Ep 130/500 | Recompensa media ult.10: 46.10 | ε=0.655
Ep 140/500 | Recompensa media ult.10: 23.40 | ε=0.651
Ep 150/500 | Recompensa media ult.10: 45.70 | ε=0.642
Ep 160/500 | Recompensa media ult.10: 45.80 | ε=0.633


KeyboardInterrupt: 