# DoubleDunk - REINFORCE


Resumen:

- Algoritmo: REINFORCE con baseline (Actor-Crítico), entropía y gradient clipping.
- Ambiente: `ALE/DoubleDunk-v5`
  - Espacio de Acciones Discreto: `18` acciones
  - Espacio de Observaciones: `Box(0, 255, (210, 160, 3), uint8)`
- Objetivo: Anotar en el juego de baloncesto, defender y superar al oponente.
- Puntaje baseline: `-14.0` (resultado típico)
- Hardware: Compatible con Google Colab GPU, detecta automáticamente CUDA/MPS/CPU
- Tiempo de entrenamiento estimado: ~6-8 horas (8000 episodios)
- Características: Sistema de checkpoints, evaluación periódica, generación de videos


## Instalación e importación de librerías


In [1]:
# Instalación de dependencias para Google Colab
!pip install gymnasium[atari] ale-py torch torchvision imageio -q
!pip install "gymnasium[atari,accept-rom-license]" -q
!AutoROM --accept-license

# Librerías básicas
import sys, platform
print({'python': sys.version.split()[0], 'platform': platform.platform()})

#Limpia los registros generados
from IPython.display import clear_output
clear_output()
print("Todas las librerías han sido instaladas correctamente.")


Todas las librerías han sido instaladas correctamente.


## Detección de Hardware


In [2]:
# ========================================
# IMPORTACIONES Y DETECCIÓN DE HARDWARE
# ========================================

import gymnasium as gym
import gymnasium
import ale_py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.optim import Adam
from dataclasses import dataclass
from typing import List
from collections import deque
import random
import os
import cv2
import imageio
import warnings
import platform

warnings.filterwarnings('ignore')

# Registrar entornos ALE
gymnasium.register_envs(ale_py)

SEED = 123
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

def detect_hardware():
    """Detecta y configura el hardware disponible (Google Colab compatible)"""
    print("=" * 60)
    print("🖥️  DETECCIÓN DE HARDWARE")
    print("=" * 60)
    
    # Información del sistema
    print(f"Sistema: {platform.system()} {platform.release()}")
    print(f"PyTorch: {torch.__version__}")
    
    # Verificar CUDA (GPU)
    if torch.cuda.is_available():
        device = torch.device('cuda')
        gpu_name = torch.cuda.get_device_name(0)
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
        
        print(f"✅ GPU DETECTADA: {gpu_name}")
        print(f"   💾 Memoria GPU: {gpu_memory:.1f} GB")
        print(f"   🔧 CUDA Version: {torch.version.cuda}")
        
        # Configuraciones de PyTorch para GPU
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False
        torch.cuda.empty_cache()
        
    # Verificar MPS (Apple Silicon)
    elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        device = torch.device('mps')
        print(f"✅ MPS DETECTADO (Apple Silicon)")
        print(f"   🍎 Aceleración optimizada para Apple Silicon")
        
    else:
        # Fallback a CPU
        device = torch.device('cpu')
        print(f"⚠️  SOLO CPU DISPONIBLE")
        print(f"   ❌ No se detectó GPU - El entrenamiento será más lento")
        print(f"   💡 En Colab: Runtime > Change runtime type > GPU")
        torch.set_num_threads(4)  # Limitar threads en CPU
    
    print("-" * 60)
    print(f"DISPOSITIVO FINAL: {device}")
    print("=" * 60)
    
    return device

# Detectar hardware
device = detect_hardware()

# Configuraciones determinísticas
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

print(f"\n🎯 Sistema configurado para: {device}")


🖥️  DETECCIÓN DE HARDWARE
Sistema: Darwin 24.5.0
PyTorch: 2.8.0
✅ MPS DETECTADO (Apple Silicon)
   🍎 Aceleración optimizada para Apple Silicon
------------------------------------------------------------
DISPOSITIVO FINAL: mps

🎯 Sistema configurado para: mps


Utilizaremos el algoritmo REINFORCE con baseline para el entrenamiento

## Definición del Algoritmo REINFORCE

REINFORCE (REward Increment = Nonnegative Factor × Offset Reinforcement × Characteristic Eligibility) es un algoritmo de gradiente de política que optimiza directamente la política para maximizar la recompensa esperada.

**Características principales:**
- **Algoritmo de política**: Optimiza directamente la función de política π(a|s)
- **Baseline con Critic**: Reduce la varianza usando un estimador de valor V(s)
- **Monte Carlo**: Usa retornos completos de episodios para actualizar
- **Gradient ascent**: Maximiza la recompensa esperada usando gradientes


In [3]:
# Wrappers: preprocesamiento y frame stacking
class SimpleFrameStack(gym.Wrapper):
    def __init__(self, env, k: int = 4):
        super().__init__(env)
        self.k = k
        self.frames = deque(maxlen=k)
        obs_space = env.observation_space
        h, w = obs_space.shape[0], obs_space.shape[1]
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(h, w, k), dtype=np.uint8)

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.frames.clear()
        for _ in range(self.k):
            self.frames.append(obs)
        return self._get_ob(), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_ob(), reward, terminated, truncated, info

    def _get_ob(self):
        return np.stack(list(self.frames), axis=-1)


def make_env(seed: int = SEED, render_mode=None):
    env = gym.make('ALE/DoubleDunk-v5', render_mode=render_mode)
    class GrayResizeWrapper(gym.ObservationWrapper):
        def __init__(self, env):
            super().__init__(env)
            h, w = 84, 84
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(h, w), dtype=np.uint8)
        def observation(self, obs):
            gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
            resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
            return resized
    env = GrayResizeWrapper(env)
    env = SimpleFrameStack(env, 4)
    env.reset(seed=seed)
    env.action_space.seed(seed)
    env.observation_space.seed(seed)
    return env

# to tensor CHW [0,1]
def obs_to_tensor(obs) -> torch.Tensor:
    arr = obs if isinstance(obs, np.ndarray) else np.array(obs)
    if arr.ndim == 3 and arr.shape[-1] == 4:
        arr = np.transpose(arr, (2, 0, 1))
    elif arr.ndim == 2:
        arr = np.stack([arr]*4, axis=0)
    tensor = torch.from_numpy(arr).float() / 255.0
    return tensor.unsqueeze(0).to(device)


## Entrenamiento

Configuración de hiperparámetros y ejecución del entrenamiento REINFORCE


In [4]:
# Modelo CNN actor-crítico
class AtariActorCritic(nn.Module):
    def __init__(self, in_channels: int, n_actions: int):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU(),
        )
        self.flatten = nn.Flatten()
        self.fc = nn.Sequential(
            nn.Linear(64 * 7 * 7, 512), nn.ReLU(),
        )
        self.policy_head = nn.Linear(512, n_actions)
        self.value_head = nn.Linear(512, 1)

        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
                nn.init.zeros_(m.bias)

    def forward(self, x: torch.Tensor):
        z = self.features(x)
        z = self.flatten(z)
        z = self.fc(z)
        logits = self.policy_head(z)
        value = self.value_head(z).squeeze(-1)
        return logits, value

    def act(self, x: torch.Tensor):
        logits, value = self.forward(x)
        dist = Categorical(logits=logits)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        entropy = dist.entropy()
        return action.item(), log_prob, entropy, value


In [5]:
@dataclass
class Config:
    total_episodes: int = 8000
    max_steps_per_episode: int = 8000
    gamma: float = 0.99
    learning_rate: float = 3e-4
    entropy_coef: float = 0.01
    value_coef: float = 0.5
    grad_clip_norm: float = 0.5
    checkpoint_dir: str = 'checkpoints_doubledunk'
    checkpoint_every_episodes: int = 50
    eval_every_episodes: int = 100
    eval_episodes: int = 10

cfg = Config()
os.makedirs(cfg.checkpoint_dir, exist_ok=True)
print(cfg)

# Utils

def compute_returns(rewards: List[float], gamma: float) -> torch.Tensor:
    G = 0.0
    returns = []
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    ret = torch.tensor(returns, dtype=torch.float32, device=device)
    return ret.view(-1)


def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, episode: int, path: str):
    torch.save({'episode': episode,
                'model': model.state_dict(),
                'optimizer': optimizer.state_dict()}, path)


def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, path: str):
    ckpt = torch.load(path, map_location=device)
    model.load_state_dict(ckpt['model'])
    optimizer.load_state_dict(ckpt['optimizer'])
    return ckpt.get('episode', 0)


def evaluate(agent: nn.Module, episodes: int = 10, render: bool = False) -> float:
    env = make_env(seed=SEED + 999, render_mode='human' if render else None)
    agent.eval()
    rewards = []
    with torch.no_grad():
        for ep in range(episodes):
            obs, info = env.reset(seed=SEED + 999 + ep)
            total_r = 0.0
            for t in range(cfg.max_steps_per_episode):
                x = obs_to_tensor(obs)
                logits, _ = agent(x)
                action = torch.argmax(F.softmax(logits, dim=-1), dim=-1).item()
                obs, r, terminated, truncated, info = env.step(action)
                total_r += float(r)
                if terminated or truncated:
                    break
            rewards.append(total_r)
    env.close()
    agent.train()
    return float(np.mean(rewards))


Config(total_episodes=8000, max_steps_per_episode=8000, gamma=0.99, learning_rate=0.0003, entropy_coef=0.01, value_coef=0.5, grad_clip_norm=0.5, checkpoint_dir='checkpoints_doubledunk', checkpoint_every_episodes=50, eval_every_episodes=100, eval_episodes=10)


In [6]:
# ========================================
# GRÁFICOS DE PROGRESO DEL ENTRENAMIENTO
# ========================================

import matplotlib.pyplot as plt
import pandas as pd

def plot_training_progress():
    """Genera gráficos del progreso del entrenamiento REINFORCE"""
    
    # Verificar si hay datos de entrenamiento
    if not hasattr(agent, 'episode_rewards') or len(episode_rewards) == 0:
        print("⚠️  No hay datos de entrenamiento para graficar")
        print("💡 Ejecuta primero la celda de entrenamiento")
        return
    
    print("📈 GENERANDO GRÁFICOS DE PROGRESO DEL ENTRENAMIENTO")
    print("=" * 55)
    
    # Crear figura con subplots
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('REINFORCE DoubleDunk - Progreso del Entrenamiento', fontsize=16, fontweight='bold')
    
    # 1. Recompensas por episodio (raw + media móvil)
    ax1 = axes[0, 0]
    episodes_range = range(1, len(episode_rewards) + 1)
    ax1.plot(episodes_range, episode_rewards, alpha=0.6, color='blue', linewidth=0.8, label='Recompensa por episodio')
    
    # Media móvil
    if len(episode_rewards) > 50:
        window = 50
        moving_avg = pd.Series(episode_rewards).rolling(window=window, center=True).mean()
        ax1.plot(episodes_range, moving_avg, color='red', linewidth=2, label=f'Media móvil ({window})')
    
    # Línea de baseline (-14.0)
    ax1.axhline(y=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline REINFORCE (-14.0)')
    ax1.set_xlabel('Episodios')
    ax1.set_ylabel('Recompensa')
    ax1.set_title('Evolución de Recompensas por Episodio')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # 2. Recompensas de evaluación
    ax2 = axes[0, 1]
    if len(eval_rewards) > 0:
        ax2.plot(eval_episodes_list, eval_rewards, 'o-', color='green', linewidth=2, markersize=6, label='Evaluación (10 eps)')
        ax2.axhline(y=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline (-14.0)')
        ax2.set_xlabel('Episodios')
        ax2.set_ylabel('Recompensa Media')
        ax2.set_title('Progreso de Evaluaciones Periódicas')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
    else:
        ax2.text(0.5, 0.5, 'No hay datos de evaluación\ndisponibles', 
                ha='center', va='center', transform=ax2.transAxes, fontsize=12)
        ax2.set_title('Evaluaciones Periódicas')
    
    # 3. Histograma de recompensas
    ax3 = axes[1, 0]
    ax3.hist(episode_rewards, bins=30, alpha=0.7, color='purple', edgecolor='black')
    ax3.axvline(x=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline (-14.0)')
    ax3.axvline(x=np.mean(episode_rewards), color='red', linestyle='-', linewidth=2, label=f'Media: {np.mean(episode_rewards):.1f}')
    ax3.set_xlabel('Recompensa')
    ax3.set_ylabel('Frecuencia')
    ax3.set_title('Distribución de Recompensas')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    # 4. Estadísticas del entrenamiento
    ax4 = axes[1, 1]
    ax4.axis('off')
    
    # Calcular estadísticas
    total_episodes = len(episode_rewards)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    min_reward = np.min(episode_rewards)
    max_reward = np.max(episode_rewards)
    
    # Últimos 100 episodios
    last_100 = episode_rewards[-100:] if len(episode_rewards) >= 100 else episode_rewards
    mean_last_100 = np.mean(last_100)
    
    # Mejora vs baseline
    improvement = mean_reward - (-14.0)
    improvement_pct = (improvement / abs(-14.0)) * 100
    
    stats_text = f"""
📊 ESTADÍSTICAS DEL ENTRENAMIENTO

🎯 Episodios totales: {total_episodes:,}
📈 Recompensa media: {mean_reward:.2f} ± {std_reward:.2f}
🏆 Mejor episodio: {max_reward:.2f}
📉 Peor episodio: {min_reward:.2f}

📊 Últimos 100 episodios: {mean_last_100:.2f}
📶 Mejora vs baseline: {improvement:+.2f} ({improvement_pct:+.1f}%)

🎲 Baseline REINFORCE: -14.0
🔥 Estado: {"✅ Mejorando" if improvement > 0 else "⚠️ Por debajo del baseline"}
"""
    
    ax4.text(0.05, 0.95, stats_text, transform=ax4.transAxes, fontsize=11,
             verticalalignment='top', fontfamily='monospace',
             bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgray', alpha=0.8))
    
    plt.tight_layout()
    plt.show()
    
    # Imprimir resumen
    print(f"\n📋 RESUMEN DEL PROGRESO:")
    print(f"   🎯 Episodios entrenados: {total_episodes:,}")
    print(f"   📊 Recompensa promedio: {mean_reward:.2f}")
    print(f"   🏆 Mejor resultado: {max_reward:.2f}")
    print(f"   📈 Últimos 100 eps: {mean_last_100:.2f}")
    print(f"   {'✅' if improvement > 0 else '❌'} Mejora vs baseline: {improvement:+.2f}")

# Ejecutar visualización si hay datos
try:
    if 'episode_rewards' in locals() and len(episode_rewards) > 10:
        plot_training_progress()
    else:
        print("📊 Gráficos disponibles después del entrenamiento")
        print("💡 Los gráficos se generarán automáticamente al completar el entrenamiento")
except Exception as e:
    print(f"⚠️  Error generando gráficos: {e}")
    print("💡 Asegúrate de ejecutar primero la celda de entrenamiento")


📊 Gráficos disponibles después del entrenamiento
💡 Los gráficos se generarán automáticamente al completar el entrenamiento


In [7]:
# Entrenamiento REINFORCE con baseline

env = make_env(seed=SEED, render_mode=None)
n_actions = env.action_space.n
in_channels = 4

agent = AtariActorCritic(in_channels=in_channels, n_actions=n_actions).to(device)
optimizer = Adam(agent.parameters(), lr=cfg.learning_rate)

start_episode = 0
ckpt_path = os.path.join(cfg.checkpoint_dir, 'reinforce_doubledunk.pt')
if os.path.exists(ckpt_path):
    print('Cargando checkpoint desde', ckpt_path)
    start_episode = load_checkpoint(agent, optimizer, ckpt_path)

best_eval = -float('inf')

# Listas para almacenar progreso del entrenamiento
episode_rewards = []
eval_rewards = []
eval_episodes_list = []

for ep in range(start_episode, cfg.total_episodes):
    obs, info = env.reset(seed=SEED + ep)
    log_probs, entropies, values, rewards = [], [], [], []
    total_reward = 0.0

    for t in range(cfg.max_steps_per_episode):
        x = obs_to_tensor(obs)
        action, log_prob, entropy, value = agent.act(x)
        obs, reward, terminated, truncated, info = env.step(action)
        log_probs.append(log_prob)
        entropies.append(entropy)
        values.append(value)
        rewards.append(float(reward))
        total_reward += float(reward)
        if terminated or truncated:
            break

    # Almacenar reward del episodio para gráficos
    episode_rewards.append(total_reward)

    returns = compute_returns(rewards, cfg.gamma)
    values_t = torch.stack(values)
    log_probs_t = torch.stack(log_probs)
    entropies_t = torch.stack(entropies)

    # Asegurar que values_t y returns tengan la misma forma
    if values_t.dim() > 1:
        values_t = values_t.squeeze(-1)
    
    advantages = returns - values_t
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    # Entropy decay: más exploración al inicio
    entropy_coef = cfg.entropy_coef * (0.5 + 0.5 * (1 - (ep / max(1, cfg.total_episodes))))

    policy_loss = -(log_probs_t * advantages.detach()).mean()
    value_loss = F.mse_loss(values_t, returns)
    entropy_bonus = entropies_t.mean()

    loss = policy_loss + cfg.value_coef * value_loss - entropy_coef * entropy_bonus

    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    nn.utils.clip_grad_norm_(agent.parameters(), cfg.grad_clip_norm)
    optimizer.step()

    if (ep + 1) % cfg.checkpoint_every_episodes == 0:
        save_checkpoint(agent, optimizer, ep + 1, ckpt_path)

    if (ep + 1) % cfg.eval_every_episodes == 0:
        avg_eval = evaluate(agent, episodes=cfg.eval_episodes, render=False)
        print(f'Ep {ep+1} | Reward entrenamiento: {total_reward:.1f} | Eval-10 media: {avg_eval:.1f}')
        
        # Almacenar progreso de evaluación para gráficos
        eval_rewards.append(avg_eval)
        eval_episodes_list.append(ep + 1)
        
        if avg_eval > best_eval:
            best_eval = avg_eval
            torch.save({'model': agent.state_dict(), 'avg_eval': best_eval}, os.path.join(cfg.checkpoint_dir, 'best.pt'))

env.close()
print('Entrenamiento finalizado.')

# Generar gráficos automáticamente al completar el entrenamiento
print("\n" + "="*60)
print("📈 GENERANDO GRÁFICOS DEL PROGRESO DE ENTRENAMIENTO")
print("="*60)

try:
    # Ejecutar función de gráficos directamente aquí
    if len(episode_rewards) > 10:
        import matplotlib.pyplot as plt
        import pandas as pd
        
        # Crear figura con subplots
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('REINFORCE DoubleDunk - Progreso del Entrenamiento', fontsize=16, fontweight='bold')
        
        # 1. Recompensas por episodio (raw + media móvil)
        ax1 = axes[0, 0]
        episodes_range = range(1, len(episode_rewards) + 1)
        ax1.plot(episodes_range, episode_rewards, alpha=0.6, color='blue', linewidth=0.8, label='Recompensa por episodio')
        
        # Media móvil
        if len(episode_rewards) > 50:
            window = 50
            moving_avg = pd.Series(episode_rewards).rolling(window=window, center=True).mean()
            ax1.plot(episodes_range, moving_avg, color='red', linewidth=2, label=f'Media móvil ({window})')
        
        # Línea de baseline (-14.0)
        ax1.axhline(y=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline REINFORCE (-14.0)')
        ax1.set_xlabel('Episodios')
        ax1.set_ylabel('Recompensa')
        ax1.set_title('Evolución de Recompensas por Episodio')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. Recompensas de evaluación
        ax2 = axes[0, 1]
        if len(eval_rewards) > 0:
            ax2.plot(eval_episodes_list, eval_rewards, 'o-', color='green', linewidth=2, markersize=6, label='Evaluación (10 eps)')
            ax2.axhline(y=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline (-14.0)')
            ax2.set_xlabel('Episodios')
            ax2.set_ylabel('Recompensa Media')
            ax2.set_title('Progreso de Evaluaciones Periódicas')
            ax2.legend()
            ax2.grid(True, alpha=0.3)
        else:
            ax2.text(0.5, 0.5, 'No hay datos de evaluación\ndisponibles', 
                    ha='center', va='center', transform=ax2.transAxes, fontsize=12)
            ax2.set_title('Evaluaciones Periódicas')
        
        # 3. Histograma de recompensas
        ax3 = axes[1, 0]
        ax3.hist(episode_rewards, bins=30, alpha=0.7, color='purple', edgecolor='black')
        ax3.axvline(x=-14.0, color='orange', linestyle='--', linewidth=2, label='Baseline (-14.0)')
        ax3.axvline(x=np.mean(episode_rewards), color='red', linestyle='-', linewidth=2, label=f'Media: {np.mean(episode_rewards):.1f}')
        ax3.set_xlabel('Recompensa')
        ax3.set_ylabel('Frecuencia')
        ax3.set_title('Distribución de Recompensas')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        
        # 4. Estadísticas del entrenamiento
        ax4 = axes[1, 1]
        ax4.axis('off')
        
        # Calcular estadísticas
        total_episodes = len(episode_rewards)
        mean_reward = np.mean(episode_rewards)
        std_reward = np.std(episode_rewards)
        min_reward = np.min(episode_rewards)
        max_reward = np.max(episode_rewards)
        
        # Últimos 100 episodios
        last_100 = episode_rewards[-100:] if len(episode_rewards) >= 100 else episode_rewards
        mean_last_100 = np.mean(last_100)
        
        # Mejora vs baseline
        improvement = mean_reward - (-14.0)
        improvement_pct = (improvement / abs(-14.0)) * 100
        
        stats_text = f"""
📊 ESTADÍSTICAS DEL ENTRENAMIENTO

🎯 Episodios totales: {total_episodes:,}
📈 Recompensa media: {mean_reward:.2f} ± {std_reward:.2f}
🏆 Mejor episodio: {max_reward:.2f}
📉 Peor episodio: {min_reward:.2f}

📊 Últimos 100 episodios: {mean_last_100:.2f}
📶 Mejora vs baseline: {improvement:+.2f} ({improvement_pct:+.1f}%)

🎲 Baseline REINFORCE: -14.0
🔥 Estado: {"✅ Mejorando" if improvement > 0 else "⚠️ Por debajo del baseline"}
"""
        
        ax4.text(0.05, 0.95, stats_text, transform=ax4.transAxes, fontsize=11,
                 verticalalignment='top', fontfamily='monospace',
                 bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgray', alpha=0.8))
        
        plt.tight_layout()
        plt.show()
        
        # Imprimir resumen
        print(f"\n📋 RESUMEN DEL PROGRESO:")
        print(f"   🎯 Episodios entrenados: {total_episodes:,}")
        print(f"   📊 Recompensa promedio: {mean_reward:.2f}")
        print(f"   🏆 Mejor resultado: {max_reward:.2f}")
        print(f"   📈 Últimos 100 eps: {mean_last_100:.2f}")
        print(f"   {'✅' if improvement > 0 else '❌'} Mejora vs baseline: {improvement:+.2f}")
        
    else:
        print("⚠️  Entrenamiento muy corto para generar gráficos útiles")
        
except Exception as e:
    print(f"⚠️  Error generando gráficos: {e}")
    print("💡 Los gráficos se pueden generar manualmente en la siguiente celda")


A.L.E: Arcade Learning Environment (version 0.11.2+ecc1138)
[Powered by Stella]


Cargando checkpoint desde checkpoints_doubledunk/reinforce_doubledunk.pt


KeyboardInterrupt: 

## Evaluación del modelo


In [9]:
# ========================================
# EVALUACIÓN COMPLETA 
# ========================================

def comprehensive_evaluation_reinforce():
    """Evaluación completa del modelo REINFORCE entrenado comparable con DDQN"""
    
    print("📊 INICIANDO EVALUACIÓN COMPLETA REINFORCE")
    print("=" * 55)
    eval_start = time.time()
    eval_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Crear entorno de evaluación
    eval_env = make_env(seed=SEED + 999, render_mode=None)
    
    # Cargar mejor modelo si existe
    try:
        best_model_path = os.path.join(cfg.checkpoint_dir, 'best.pt')
        if os.path.exists(best_model_path):
            print("🏆 Cargando mejor modelo entrenado...")
            best_agent = AtariActorCritic(in_channels=4, n_actions=eval_env.action_space.n).to(device)
            ckpt_best = torch.load(best_model_path, map_location=device)
            best_agent.load_state_dict(ckpt_best['model'])
            eval_agent = best_agent
            print("✅ Mejor modelo cargado exitosamente")
        else:
            print("⚠️  Usando modelo actual (no se encontró best.pt)")
            eval_agent = agent
    except Exception as e:
        print(f"⚠️  Error cargando mejor modelo: {e}")
        print("⚠️  Usando modelo actual")
        eval_agent = agent
    
    # EVALUACIÓN PRINCIPAL (10 episodios para comparar con DDQN)
    print("\n🎯 Evaluación oficial (10 episodios)...")
    eval_agent.eval()
    episode_rewards = []
    episode_lengths = []
    
    with torch.no_grad():
        for ep in range(10):
            obs, info = eval_env.reset(seed=SEED + 999 + ep)
            total_reward = 0.0
            episode_length = 0
            
            for t in range(cfg.max_steps_per_episode):
                x = obs_to_tensor(obs)
                logits, _ = eval_agent(x)
                action = torch.argmax(F.softmax(logits, dim=-1), dim=-1).item()
                obs, reward, terminated, truncated, info = eval_env.step(action)
                total_reward += float(reward)
                episode_length += 1
                
                if terminated or truncated:
                    break
            
            episode_rewards.append(total_reward)
            episode_lengths.append(episode_length)
            print(f"   Episodio {ep+1}: Reward={total_reward:.2f}, Steps={episode_length}")
    
    eval_agent.train()
    eval_env.close()
    
    eval_end = time.time()
    eval_duration = eval_end - eval_start
    
    # Calcular estadísticas
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    min_reward = np.min(episode_rewards)
    max_reward = np.max(episode_rewards)
    mean_length = np.mean(episode_lengths)
    std_length = np.std(episode_lengths)
    
    # Mejora vs baseline REINFORCE típico
    baseline_reinforce = -14.0
    improvement = mean_reward - baseline_reinforce
    improvement_pct = (improvement / abs(baseline_reinforce)) * 100 if baseline_reinforce != 0 else 0
    
    # REPORTE OFICIAL COMPARABLE CON DDQN
    print(f"\n{'='*70}")
    print(f"📋 REPORTE OFICIAL - REINFORCE DOUBLEDUNK")
    print(f"{'='*70}")
    print(f"📅 Fecha evaluación: {eval_timestamp}")
    print(f"⏱️  Tiempo evaluación: {eval_duration:.2f}s")
    print(f"🖥️  Dispositivo: {device}")
    print(f"🎯 Algoritmo: REINFORCE con baseline (Actor-Crítico)")
    
    print(f"\n🎯 RESULTADOS PRINCIPALES:")
    print(f"📊 REINFORCE (10 episodios):    {mean_reward:.2f} ± {std_reward:.2f}")
    print(f"📈 Rango de recompensas:        [{min_reward:.2f}, {max_reward:.2f}]")
    print(f"📏 Duración promedio:           {mean_length:.1f} ± {std_length:.1f} pasos")
    
    print(f"\n📊 ANÁLISIS DE RENDIMIENTO:")
    print(f"├─ 🎲 Baseline REINFORCE:       {baseline_reinforce:.1f}")
    print(f"├─ 📶 Mejora absoluta:          {improvement:+.2f} puntos")
    print(f"├─ 📈 Mejora porcentual:        {improvement_pct:+.1f}%")
    print(f"└─ 🔥 Estado: {'✅ Superando baseline' if improvement > 0 else '⚠️ Por debajo del baseline'}")
    
    # Guardar resultados para comparación
    results = {
        'experiment_info': {
            'timestamp': eval_timestamp,
            'algorithm': 'REINFORCE',
            'device': str(device),
            'evaluation_episodes': 10
        },
        'evaluation_results': {
            'reinforce_baseline': baseline_reinforce,
            'reinforce_10_episodes': {
                'mean': float(mean_reward),
                'std': float(std_reward),
                'min': float(min_reward),
                'max': float(max_reward)
            },
            'episode_lengths': {
                'mean': float(mean_length),
                'std': float(std_length)
            },
            'improvement_absolute': float(improvement),
            'improvement_percentage': float(improvement_pct)
        },
        'individual_episodes': [float(r) for r in episode_rewards]
    }
    
    # Exportar resultados
    with open('reinforce_doubledunk_results.json', 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"\n💾 ARCHIVOS GENERADOS:")
    print(f"└─ reinforce_doubledunk_results.json")
    print(f"{'='*70}")
    
    return results

# Ejecutar evaluación comparable
import time
from datetime import datetime
import json

evaluation_results = comprehensive_evaluation_reinforce()

# Función de grabación de video

def record_video(model: nn.Module, filename: str = 'videos/doubledunk_reinforce.mp4', fps: int = 30, seed: int = SEED+2024):
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    env = make_env(seed=seed, render_mode='rgb_array')
    frames = []
    obs, info = env.reset(seed=seed)
    with torch.no_grad():
        for t in range(cfg.max_steps_per_episode):
            x = obs_to_tensor(obs)
            logits, _ = model(x)
            action = torch.argmax(F.softmax(logits, dim=-1), dim=-1).item()
            frame = env.render()
            frames.append(frame)
            obs, r, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                frame = env.render()
                frames.append(frame)
                break
    env.close()
    imageio.mimwrite(filename, frames, fps=fps, quality=8)
    print('Video guardado en:', filename)

# Grabar con agente actual
record_video(agent, filename='videos/doubledunk_reinforce.mp4', fps=30)

# Grabar con mejor modelo si existe
best_path = os.path.join(cfg.checkpoint_dir, 'best.pt')
if os.path.exists(best_path):
    print('Cargando mejor modelo desde', best_path)
    env_tmp = make_env(seed=SEED+3030)
    n_actions_best = env_tmp.action_space.n
    env_tmp.close()
    best_agent = AtariActorCritic(in_channels=4, n_actions=n_actions_best).to(device)
    ckpt_best = torch.load(best_path, map_location=device)
    best_agent.load_state_dict(ckpt_best['model'])
    record_video(best_agent, filename='videos/doubledunk_best.mp4', fps=30, seed=SEED+3030)
else:
    print('No se encontró best.pt; aún no hay mejor modelo guardado')


📊 INICIANDO EVALUACIÓN COMPLETA REINFORCE
🏆 Cargando mejor modelo entrenado...
✅ Mejor modelo cargado exitosamente

🎯 Evaluación oficial (10 episodios)...
   Episodio 1: Reward=-4.00, Steps=8000
   Episodio 2: Reward=0.00, Steps=8000
   Episodio 3: Reward=0.00, Steps=8000
   Episodio 4: Reward=2.00, Steps=8000
   Episodio 5: Reward=0.00, Steps=8000
   Episodio 6: Reward=-2.00, Steps=8000
   Episodio 7: Reward=0.00, Steps=8000
   Episodio 8: Reward=-2.00, Steps=8000
   Episodio 9: Reward=-6.00, Steps=8000
   Episodio 10: Reward=0.00, Steps=8000

📋 REPORTE OFICIAL - REINFORCE DOUBLEDUNK
📅 Fecha evaluación: 2025-09-09 00:54:57
⏱️  Tiempo evaluación: 106.06s
🖥️  Dispositivo: mps
🎯 Algoritmo: REINFORCE con baseline (Actor-Crítico)

🎯 RESULTADOS PRINCIPALES:
📊 REINFORCE (10 episodios):    -1.20 ± 2.23
📈 Rango de recompensas:        [-6.00, 2.00]
📏 Duración promedio:           8000.0 ± 0.0 pasos

📊 ANÁLISIS DE RENDIMIENTO:
├─ 🎲 Baseline REINFORCE:       -14.0
├─ 📶 Mejora absoluta:          +1



Video guardado en: videos/doubledunk_reinforce.mp4
Cargando mejor modelo desde checkpoints_doubledunk/best.pt




Video guardado en: videos/doubledunk_best.mp4


# Referencias

[1] Gym Docs DoubleDunk https://ale.farama.org/environments/double_dunk/

[2] Williams, R. J. (1992). Simple statistical gradient-following algorithms for connectionist reinforcement learning. Machine learning, 8(3-4), 229-256.

[3] Sutton, R. S., & Barto, A. G. (2018). Reinforcement learning: An introduction. MIT press.

[4] PyTorch Documentation https://pytorch.org/docs/stable/index.html
