In [None]:
"""
-------------------------------------------------------------------------------
Trabajo Final - G8 Sección A

Nombre del código  : MsPac-Man Inference
Curso: APRENDIZAJE POR REFORZAMIENTO
Fecha de creación   : 14/01/2025

Descripción :
Este código genera un video a partir del entrenamiento de un agente para superar el juego de MsPac-Man.

Integrantes:
* Boza Gutarra, Fernando
* Callomamani Buendia, Johan Manuel
* De La Cruz Rodríguez, Lewis Umbert
* Gomez Villanueva, Kevin
* Romero Ramos, Yovany
-------------------------------------------------------------------------------
"""

In [13]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
from torch.utils.data import DataLoader, TensorDataset
from PIL import Image
from torchvision import transforms
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder
import gymnasium as gym
import ale_py

In [18]:
learning_rate = 5e-4
minibatch_size = 512
discount_factor = 0.99
MAX_MEMORY =20_000

Se utilizan las mismas clases implementadas en el entrenamiento

In [19]:
class Network(nn.Module):
    """
    Red neuronal convolucional para estimar los valores Q.
    """
    def __init__(self, action_size, seed=42):
        super(Network, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.conv1 = nn.Conv2d(3, 32, kernel_size=8, stride=4)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.fc1 = nn.Linear(10 * 10 * 128, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, action_size)

    def forward(self, state):
        """Realiza el pase hacia adelante en la red."""
        x = F.relu(self.bn1(self.conv1(state)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [20]:
class Agent:
    """
    Define un agente que utiliza una red Q para aprender y jugar.
    """
    def __init__(self, action_size):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size

        # Redes Q local y objetivo
        self.local_qnetwork = Network(action_size).to(self.device)
        self.target_qnetwork = Network(action_size).to(self.device)

        self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = deque(maxlen=MAX_MEMORY)

        # Para registro de pérdidas (loss) en cada batch de aprendizaje
        self.losses = []

        # Para registro de TD-errors (guardaremos la diferencia q_expected - q_targets)
        self.td_errors = []

        # Para ver la distribución de Q-values de forma periódica
        self.q_values_samples = []

    def step(self, state, action, reward, next_state, done):
        """
        Almacena la experiencia y, si hay suficientes muestras, entrena la red.
        """
        state = preprocess_frame(state)
        next_state = preprocess_frame(next_state)
        self.memory.append((state, action, reward, next_state, done))

        # Cada vez que tenemos minibatch_size o más en memoria, entrenamos
        if len(self.memory) > minibatch_size:
            experiences = random.sample(self.memory, k=minibatch_size)
            self.learn(experiences, discount_factor)

    def act(self, state, epsilon=0.0):
        """
        Selecciona acción con política epsilon-greedy.
        """
        state = preprocess_frame(state).to(self.device)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()

        # Epsilon-greedy
        if random.random() > epsilon:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def learn(self, experiences, gamma):
        """
        Actualiza los parámetros de la red Q usando un batch de experiencias.
        """
        states, actions, rewards, next_states, dones = zip(*experiences)

        states = torch.cat(states).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().unsqueeze(1).to(self.device)
        rewards = torch.from_numpy(np.array(rewards)).float().unsqueeze(1).to(self.device)
        next_states = torch.cat(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().unsqueeze(1).to(self.device)

        # Q valores objetivo
        next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
        q_targets = rewards + (gamma * next_q_targets * (1 - dones))

        # Q valores esperados (de la red local)
        q_expected = self.local_qnetwork(states).gather(1, actions)

        # Cálculo de la pérdida (MSE)
        loss = F.mse_loss(q_expected, q_targets)

        # Backprop
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Guardamos la pérdida para graficar
        self.losses.append(loss.item())

        # Guardamos la TD-error (podemos tomar la diferencia)
        td_error = (q_expected - q_targets).detach().cpu().numpy()  # shape: [batch_size, 1]
        self.td_errors.extend(td_error.flatten())  # lo aplanamos y lo guardamos

        # Actualizamos la red objetivo con la local cada cierto tiempo
        self.soft_update(tau=1e-3)

    def soft_update(self, tau=1e-3):
        """
        Actualiza los pesos de la red objetivo con la red local
        usando factor tau (soft update).
        """
        for target_param, local_param in zip(self.target_qnetwork.parameters(),
                                             self.local_qnetwork.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

    def sample_q_values(self, sample_states):
        """
        Dado un lote de estados, extrae los Q-values con la red local.
        Se usa para visualizar la distribución de Q-values cada cierto número de episodios.
        """
        self.local_qnetwork.eval()
        with torch.no_grad():
            q_vals = self.local_qnetwork(sample_states.to(self.device))
        self.local_qnetwork.train()
        return q_vals.cpu().numpy()

    def load(self, file_name):
        """
        Carga pesos de un archivo .pth.
        """
        checkpoint = torch.load(file_name, map_location=self.device)
        self.local_qnetwork.load_state_dict(checkpoint)
        self.target_qnetwork.load_state_dict(checkpoint)

Inicializar el agente y especificar el directorio del experimento

In [21]:
agent = Agent(action_size=9)  # Initialize the agent

exp_folder = "exp_0b5e7e79-73c1-4a3a-8b82-85c7f882ee48"

ckp_path = f"./{exp_folder}/checkpoints/checkpoint_solved_1.pth"
# Load the model weights
agent.load(ckp_path)

  checkpoint = torch.load(file_name, map_location=self.device)


In [22]:
def preprocess_frame(frame):
    """
    Preprocesa el frame del juego para el modelo.
    Cambia la resolución a 128x128 y convierte a tensor.
    """
    frame = Image.fromarray(frame)
    preprocess = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor()
    ])
    return preprocess(frame).unsqueeze(0)

Guardar y mostrar el video generado

In [26]:
def show_video_of_model(agent, env_name, video_filename):
    """
    Mostrar un video del agente interactuando con el entorno.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action)
    env.close()
    imageio.mimsave(f'./{exp_folder}/{video_filename}', frames, fps=30)

video_filename = "video_6.mp4"
show_video_of_model(agent, 'ALE/MsPacman-v5', video_filename)

def show_video(video_filename):
    """
    Muestra el video del directorio.
    """
    mp4list = glob.glob(f'./{exp_folder}/{video_filename}')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video(video_filename)

