In [7]:
!pip install gymnasium
!pip install pygame
!pip install torch torchvision

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\dario\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable


[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\dario\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip



Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\dario\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


# Deep Q-Learning con MountainCar

En este notebook implementaremos Deep Q-Learning (DQN) para resolver el entorno MountainCar. DQN utiliza una red neuronal para aproximar la función Q y emplea técnicas como Experience Replay y Target Networks para estabilizar el entrenamiento.

In [8]:
import gymnasium as gym
import numpy as np
import time
from IPython.display import clear_output, display, HTML
import matplotlib.pyplot as plt
from tqdm import tqdm
import pandas as pd
import random
from itertools import product
from concurrent.futures import ProcessPoolExecutor, as_completed
import imageio
from IPython.display import HTML
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque, namedtuple
import copy

SEED = 42

# Configurar dispositivo (GPU si está disponible)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")

# -------------------- VISUALIZACIÓN DE RESULTADOS --------------------

def graficar_recompensas(agente):
    """Grafica la recompensa media acumulada por episodio."""
    plt.figure(figsize=(6, 3))
    plt.plot(agente.stats)
    plt.title('Recompensa media acumulada')
    plt.xlabel('Episodio')
    plt.ylabel('Recompensa media')
    plt.grid(True)
    plt.show()

def graficar_longitud_episodios(agente):
    """Grafica la longitud de cada episodio."""
    plt.figure(figsize=(6, 3))
    plt.plot(agente.episode_lengths)
    plt.title("Longitud de episodios")
    plt.xlabel("Episodio")
    plt.ylabel("Pasos")
    plt.grid(True)
    plt.show()

def graficar_loss(agente):
    """Grafica la evolución de la pérdida durante el entrenamiento."""
    if hasattr(agente, 'losses') and agente.losses:
        plt.figure(figsize=(6, 3))
        plt.plot(agente.losses)
        plt.title("Evolución de la pérdida")
        plt.xlabel("Actualización")
        plt.ylabel("Loss")
        plt.grid(True)
        plt.show()

def mostrar_resultados_agente_continuo(agente):
    """Muestra gráficos de rendimiento en entornos continuos."""
    graficar_recompensas(agente)
    graficar_longitud_episodios(agente)
    graficar_loss(agente)

# -------------------- EJECUCIÓN DE UN EPISODIO --------------------

def ejecutar_episodio_y_mostrar(agente, render=False):
    """Ejecuta un episodio con la política aprendida y muestra la evolución de la posición."""
    env = agente.env
    state, _ = env.reset()
    done = False
    total_reward = 0
    posiciones = []

    while not done:
        if render:
            env.render()
        posiciones.append(state[0])  # Guardamos la posición del coche
        action = agente._seleccionar_accion_greedy(state)
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

    env.close()
    
    # Mostrar gráfico de posiciones
    plt.figure(figsize=(6, 3))
    plt.plot(posiciones)
    plt.title("Evolución de la posición del coche")
    plt.xlabel("Paso del episodio")
    plt.ylabel("Posición")
    plt.grid(True)
    plt.show()
    
    print(f"Recompensa total obtenida: {total_reward:.2f}")

Usando dispositivo: cuda


# Red Neuronal para Deep Q-Learning

In [9]:
class DQNNetwork(nn.Module):
    """Red neuronal para aproximar la función Q."""
    
    def __init__(self, input_size, hidden_sizes, output_size, dropout_rate=0.1):
        super(DQNNetwork, self).__init__()
        
        layers = []
        prev_size = input_size
        
        # Capas ocultas
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            prev_size = hidden_size
        
        # Capa de salida
        layers.append(nn.Linear(prev_size, output_size))
        
        self.network = nn.Sequential(*layers)
        
        # Inicialización de pesos
        self._init_weights()
    
    def _init_weights(self):
        """Inicialización Xavier para mejor convergencia."""
        for layer in self.network:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                nn.init.zeros_(layer.bias)
    
    def forward(self, x):
        return self.network(x)

# Experience Replay Buffer
Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

class ReplayBuffer:
    """Buffer de experiencias para Experience Replay."""
    
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        """Añade una experiencia al buffer."""
        experience = Experience(state, action, reward, next_state, done)
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        """Muestrea un batch aleatorio de experiencias."""
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

# Agente Deep Q-Learning

In [10]:
class AgenteDQN:
    def __init__(self, env, hidden_sizes=[128, 64], lr=1e-3, gamma=0.99, epsilon=1.0, 
                 epsilon_min=0.01, epsilon_decay=0.995, buffer_size=10000, batch_size=64,
                 target_update_freq=100, dropout_rate=0.1):
        
        self.env = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        
        # Optimización: entrenar menos frecuentemente
        self.train_freq = 4  # Entrenar cada 4 pasos
        self.step_count = 0
        
        # Redes neuronales
        self.q_network = DQNNetwork(self.state_size, hidden_sizes, self.action_size, dropout_rate).to(device)
        self.target_network = DQNNetwork(self.state_size, hidden_sizes, self.action_size, dropout_rate).to(device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        
        # Experience replay
        self.memory = ReplayBuffer(buffer_size)
        
        # Estadísticas
        self.stats = []
        self.episode_lengths = []
        self.losses = []
        self.update_count = 0
        
        # Inicializar target network
        self._update_target_network()
    
    def _update_target_network(self):
        """Copia los pesos de la red principal a la red objetivo."""
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def _seleccionar_accion(self, state):
        """Selecciona una acción usando epsilon-greedy."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)
        else:
            return self._seleccionar_accion_greedy(state)
    
    def _seleccionar_accion_greedy(self, state):
        """Selecciona la mejor acción según la red neuronal."""
        with torch.no_grad():  # Optimización: no calcular gradientes
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()
    
    def _almacenar_experiencia(self, state, action, reward, next_state, done):
        """Almacena una experiencia en el buffer."""
        self.memory.push(state, action, reward, next_state, done)
    
    def _entrenar_red(self):
        """Entrena la red neuronal con un batch de experiencias."""
        if len(self.memory) < self.batch_size:
            return
        
        # Muestrear batch de experiencias
        experiences = self.memory.sample(self.batch_size)
        
        # Convertir a tensores de forma más eficiente
        states = np.array([e.state for e in experiences])
        actions = np.array([e.action for e in experiences])
        rewards = np.array([e.reward for e in experiences])
        next_states = np.array([e.next_state for e in experiences])
        dones = np.array([e.done for e in experiences])
        
        # Convertir numpy arrays a tensores PyTorch
        states = torch.FloatTensor(states).to(device)
        actions = torch.LongTensor(actions).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.BoolTensor(dones).to(device)
        
        # Valores Q actuales
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))
        
        # Valores Q objetivo
        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1)[0]
            target_q_values = rewards + (self.gamma * next_q_values * ~dones)
        
        # Calcular pérdida
        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)
        
        # Optimización
        self.optimizer.zero_grad()
        loss.backward()
        # Gradient clipping para estabilidad
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 1.0)
        self.optimizer.step()
        
        # Guardar pérdida (solo cada 10 actualizaciones para ahorrar memoria)
        if self.update_count % 10 == 0:
            self.losses.append(loss.item())
        
        # Actualizar epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        
        # Actualizar red objetivo
        self.update_count += 1
        if self.update_count % self.target_update_freq == 0:
            self._update_target_network()
    
    def entrenar(self, num_episodes=2000, mostrar_barra=True):
        """Entrena el agente DQN."""
        random.seed(SEED)
        np.random.seed(SEED)
        torch.manual_seed(SEED)
        
        acumulador_recompensas = 0.0
        
        for episode in tqdm(range(num_episodes), disable=not mostrar_barra):
            state, _ = self.env.reset()
            total_reward = 0
            pasos = 0
            done = False
            
            while not done:
                # Seleccionar acción
                action = self._seleccionar_accion(state)
                
                # Ejecutar acción
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                
                # Almacenar experiencia
                self._almacenar_experiencia(state, action, reward, next_state, done)
                
                # Entrenar la red menos frecuentemente
                self.step_count += 1
                if self.step_count % self.train_freq == 0:
                    self._entrenar_red()
                
                state = next_state
                total_reward += reward
                pasos += 1
            
            # Estadísticas
            self.episode_lengths.append(pasos)
            acumulador_recompensas += total_reward
            self.stats.append(acumulador_recompensas / (episode + 1))
            
            # Mostrar progreso cada 200 episodios para reducir prints
            if (episode + 1) % 200 == 0 and mostrar_barra:
                print(f"Episodio {episode + 1}, Recompensa media: {self.stats[-1]:.2f}, Epsilon: {self.epsilon:.3f}")
        
        return self.q_network

# Búsqueda de Hiperparámetros

In [11]:
def evaluar_configuracion_dqn(params, env_name="MountainCar-v0"):
    """Evalúa una configuración de hiperparámetros para DQN."""
    try:
        lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate = params
        
        env = gym.make(env_name)
        env.reset(seed=SEED)
        
        # Reducir el tamaño de las redes para búsqueda más rápida
        hidden_sizes = [hidden_size, hidden_size // 2] if hidden_size > 64 else [hidden_size]
        
        agente = AgenteDQN(
            env, 
            hidden_sizes=hidden_sizes,
            lr=lr, 
            gamma=gamma,
            buffer_size=int(buffer_size),
            target_update_freq=int(target_update_freq),
            dropout_rate=dropout_rate
        )
        
        # Reducir aún más episodios para búsqueda ultra rápida
        agente.entrenar(num_episodes=500, mostrar_barra=False)
        recompensa_final = np.mean(agente.stats[-25:]) if len(agente.stats) >= 25 else np.mean(agente.stats)
        
        env.close()
        return (lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate, recompensa_final)
        
    except Exception as e:
        print(f"Error en configuración {params}: {e}")
        return (lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate, -float('inf'))

def random_search_dqn(env_name="MountainCar-v0", n_trials=20):
    """Búsqueda aleatoria de hiperparámetros para DQN."""
    
    # Espacio de búsqueda más reducido y enfocado
    lrs = [5e-4, 1e-3, 5e-3]  # Reducir opciones
    gammas = [0.99, 1.0]      # Solo los mejores valores
    hidden_sizes = [64, 128]  # Redes más pequeñas
    buffer_sizes = [5000, 10000]  # Buffers más pequeños
    target_update_freqs = [50, 100]  # Solo opciones rápidas
    dropout_rates = [0.0, 0.1]  # Menos opciones
    
    combinaciones = []
    random.seed(SEED)
    
    for _ in range(n_trials):
        lr = random.choice(lrs)
        gamma = random.choice(gammas)
        hidden_size = random.choice(hidden_sizes)
        buffer_size = random.choice(buffer_sizes)
        target_update_freq = random.choice(target_update_freqs)
        dropout_rate = random.choice(dropout_rates)
        
        combinaciones.append((lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate))
    
    mejor_config = None
    mejor_recompensa = -float('inf')
    resultados = []
    
    print(f"🔍 Random Search DQN (modo rápido): ejecutando {n_trials} configuraciones...\n")
    
    for i, combo in enumerate(tqdm(combinaciones, desc="Progreso")):
        print(f"Config {i+1}/{n_trials}: lr={combo[0]}, γ={combo[1]}, h={combo[2]}")
        
        resultado = evaluar_configuracion_dqn(combo, env_name)
        lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate, recompensa = resultado
        resultados.append(resultado)
        
        if recompensa > mejor_recompensa:
            mejor_recompensa = recompensa
            mejor_config = (lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate)
            print(f"  ✅ Nueva mejor! Recompensa: {mejor_recompensa:.4f}")
    
    print("\n" + "="*60)
    print("✅ MEJOR CONFIGURACIÓN ENCONTRADA:")
    print(f" learning_rate = {mejor_config[0]}")
    print(f" γ = {mejor_config[1]}")
    print(f" hidden_size = {mejor_config[2]}")
    print(f" buffer_size = {mejor_config[3]}")
    print(f" target_update_freq = {mejor_config[4]}")
    print(f" dropout_rate = {mejor_config[5]}")
    print(f"  → Recompensa media final: {mejor_recompensa:.4f}")
    print("="*60)
    
    return mejor_config, mejor_recompensa, resultados

In [12]:
# Ejecutar búsqueda de hiperparámetros ultra optimizada
print("🚀 Iniciando búsqueda rápida de hiperparámetros para DQN...")
mejor_config, mejor_recompensa, resultados = random_search_dqn(n_trials=6)  # Solo 6 trials para máxima velocidad

print(f"\n📊 Mejores hiperparámetros encontrados:")
print(f"   learning_rate = {mejor_config[0]}")
print(f"   γ = {mejor_config[1]}")  
print(f"   hidden_size = {mejor_config[2]}")
print(f"   buffer_size = {mejor_config[3]}")
print(f"   target_update_freq = {mejor_config[4]}")
print(f"   dropout_rate = {mejor_config[5]}")
print(f"   Recompensa final = {mejor_recompensa:.4f}")

🚀 Iniciando búsqueda rápida de hiperparámetros para DQN...
🔍 Random Search DQN (modo rápido): ejecutando 6 configuraciones...



Progreso:   0%|          | 0/6 [00:00<?, ?it/s]

Config 1/6: lr=0.005, γ=0.99, h=64


Progreso:   0%|          | 0/6 [00:47<?, ?it/s]



KeyboardInterrupt: 

In [None]:
# Entrenamiento con los mejores hiperparámetros encontrados
env = gym.make("MountainCar-v0")
env.reset(seed=SEED)

# Usar los mejores hiperparámetros encontrados o valores por defecto optimizados
if 'mejor_config' in locals():
    lr, gamma, hidden_size, buffer_size, target_update_freq, dropout_rate = mejor_config
    hidden_sizes = [hidden_size, hidden_size // 2] if hidden_size > 64 else [hidden_size]
else:
    # Configuración rápida por defecto
    lr, gamma, hidden_sizes, buffer_size, target_update_freq, dropout_rate = 1e-3, 0.99, [128, 64], 10000, 100, 0.1

agente_dqn = AgenteDQN(
    env,
    hidden_sizes=hidden_sizes,
    lr=lr,
    gamma=gamma,
    epsilon=1.0,
    epsilon_min=0.01,
    epsilon_decay=0.996,  # Decay más rápido
    buffer_size=buffer_size,
    batch_size=64,
    target_update_freq=target_update_freq,
    dropout_rate=dropout_rate
)

print("🎯 Entrenando agente DQN con configuración optimizada...")
agente_dqn.entrenar(num_episodes=2000)  # Reducir episodios

🎯 Entrenando agente DQN con configuración optimizada...


  4%|▎         | 74/2000 [00:55<50:06,  1.56s/it]

# Resultados del Agente DQN

In [None]:
mostrar_resultados_agente_continuo(agente_dqn)

In [None]:
ejecutar_episodio_y_mostrar(agente_dqn)

In [None]:
def grabar_video_agente_dqn(agente, nombre_archivo="video_mountaincar_dqn.gif", fps=30):
    """
    Ejecuta un episodio con la política aprendida y guarda un video del entorno.
    """
    # Crear entorno con renderizado de imágenes
    env = gym.make("MountainCar-v0", render_mode="rgb_array")
    env.reset(seed=SEED)
    state, _ = env.reset()
    done = False
    total_reward = 0
    frames = []

    while not done:
        frame = env.render()
        frames.append(frame)

        # Acción greedy usando la red neuronal
        action = agente._seleccionar_accion_greedy(state)

        next_state, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        done = terminated or truncated
        state = next_state

        if done:
            break

    env.close()

    print(f"Número total de frames: {len(frames)}")
    # Guardar el video como GIF
    imageio.mimsave(nombre_archivo, frames, fps=fps, loop=0)
    print(f"🎥 Vídeo guardado en: {nombre_archivo}")
    print(f"🏁 Recompensa total obtenida: {total_reward:.2f}")

In [None]:
nombre_archivo = "video_mountaincar_dqn.gif"

grabar_video_agente_dqn(agente_dqn, nombre_archivo=nombre_archivo)

HTML(f"""
<img src="{nombre_archivo}" style="width: 600px;" loop>
""")

## Análisis de Resultados

### Ventajas de Deep Q-Learning:

1. **Aproximación universal**: Las redes neuronales pueden aproximar funciones complejas mejor que métodos lineales
2. **Experience Replay**: Mejora la eficiencia del aprendizaje al reutilizar experiencias pasadas
3. **Target Network**: Estabiliza el entrenamiento al reducir la correlación temporal
4. **Escalabilidad**: Puede manejar espacios de estados de alta dimensionalidad

### Comparación con otros métodos:

- **vs Q-Learning tabular**: DQN puede manejar espacios continuos sin discretización
- **vs SARSA con funciones base**: Mayor flexibilidad en la representación de características
- **vs métodos lineales**: Capacidad de aprender relaciones no lineales complejas

### Hiperparámetros importantes:

- **Learning rate**: Controla la velocidad de aprendizaje de la red
- **Buffer size**: Tamaño del buffer de experience replay
- **Target update frequency**: Frecuencia de actualización de la red objetivo
- **Epsilon decay**: Estrategia de exploración vs explotación

In [None]:
# Análisis adicional: Evolución de epsilon y comparación de redes
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

# Gráfico 1: Recompensas
axes[0,0].plot(agente_dqn.stats)
axes[0,0].set_title('Recompensa media acumulada')
axes[0,0].set_xlabel('Episodio')
axes[0,0].set_ylabel('Recompensa media')
axes[0,0].grid(True)

# Gráfico 2: Longitud de episodios
axes[0,1].plot(agente_dqn.episode_lengths)
axes[0,1].set_title('Longitud de episodios')
axes[0,1].set_xlabel('Episodio')
axes[0,1].set_ylabel('Pasos')
axes[0,1].grid(True)

# Gráfico 3: Pérdida
if agente_dqn.losses:
    axes[1,0].plot(agente_dqn.losses)
    axes[1,0].set_title('Evolución de la pérdida')
    axes[1,0].set_xlabel('Actualización')
    axes[1,0].set_ylabel('MSE Loss')
    axes[1,0].grid(True)

# Gráfico 4: Comparación Q-values en diferentes estados
estados_test = [
    [-0.5, 0.0],    # Estado inicial típico
    [0.0, 0.0],     # Centro
    [0.5, 0.01],    # Cerca del objetivo
    [-0.3, -0.01]   # Retrocediendo
]

q_values_por_estado = []
for estado in estados_test:
    state_tensor = torch.FloatTensor(estado).unsqueeze(0).to(device)
    with torch.no_grad():
        q_vals = agente_dqn.q_network(state_tensor).cpu().numpy()[0]
    q_values_por_estado.append(q_vals)

q_values_array = np.array(q_values_por_estado)
x = np.arange(len(estados_test))
width = 0.25

for i in range(agente_dqn.action_size):
    axes[1,1].bar(x + i*width, q_values_array[:, i], width, 
                  label=f'Acción {i}', alpha=0.8)

axes[1,1].set_title('Q-values por estado y acción')
axes[1,1].set_xlabel('Estados de prueba')
axes[1,1].set_ylabel('Q-value')
axes[1,1].set_xticks(x + width)
axes[1,1].set_xticklabels([f'Estado {i+1}' for i in range(len(estados_test))])
axes[1,1].legend()
axes[1,1].grid(True)

plt.tight_layout()
plt.show()

print("Estados de prueba:")
for i, estado in enumerate(estados_test):
    print(f"Estado {i+1}: posición={estado[0]:.2f}, velocidad={estado[1]:.3f}")