# Hands-on project: SpaceInvaders with **PPO** (Gym Atari)

This notebook implements **Actor-Critic with Proximal Policy Optimization (PPO)** in **PyTorch** to learn to play **SpaceInvaders-v0** in Gym.

**Project goal:** after training, the agent should be able to *“score more than 20 points with reward clipping for more than 100 consecutive episodes”* (see the evaluation section).

> Note: training on Atari can take **several hours** depending on the CPU/GPU. The notebook saves *checkpoints* so that training can be resumed.

---

## PART 1: Installation and requirements

The library versions used in this notebook follow the dependency file provided with the project.

- `gym==0.17.3`
- `atari-py`
- `pyglet`
- `Pillow`
- `tensorflow/keras` (not used by PPO, but kept for consistency with the project environment)
- `torch==2.0.1`
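
One way to confirm that the pins in this list match what is actually installed is to query the package metadata. This is only an illustrative sketch and not part of the original notebook: `EXPECTED` and `check_versions` are hypothetical names, and only `gym` and `torch` carry a pin because those are the only versions the list states. It assumes Python 3.8 or newer, where `importlib.metadata` is in the standard library.

```python
from importlib import metadata

# Pins copied from the list above; None means "any installed version is fine".
EXPECTED = {
    "gym": "0.17.3",
    "torch": "2.0.1",
    "atari-py": None,
    "pyglet": None,
    "Pillow": None,
}

def check_versions(expected):
    """Return {package: (installed_version_or_None, matches_pin)}."""
    report = {}
    for name, pin in expected.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = None
        # Builds such as "2.0.1+cu118" carry a local suffix; compare the part before "+".
        base = installed.split("+")[0] if installed is not None else None
        ok = installed is not None and (pin is None or base == pin)
        report[name] = (installed, ok)
    return report

for name, (installed, ok) in check_versions(EXPECTED).items():
    print(f"{name:10s} installed={installed} ok={ok}")
```

A package reported with `ok=False` is either missing or installed at a different version than the one pinned here.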

> If you are on Google Colab, run the installation cell. Locally, make sure your environment has these versions.

In [None]:
# Detect whether we are running in Google Colab
import os, sys, platform
IN_COLAB = 'google.colab' in sys.modules
print("IN_COLAB:", IN_COLAB)
print("Python:", sys.version)
print("Platform:", platform.platform())

In [None]:
# Installation (only if needed). On Colab this can take a few minutes.
# IMPORTANT: if your environment is already set up, you can skip this cell.

if IN_COLAB:
    !pip -q install gym==0.17.3
    !pip -q install git+https://github.com/Kojoley/atari-py.git
    !pip -q install pyglet==1.5.0
    !pip -q install h5py==3.1.0
    !pip -q install Pillow==9.5.0
    !pip -q install keras-rl2==1.0.5
    !pip -q install Keras==2.2.4
    !pip -q install tensorflow==2.5.3
    !pip -q install torch==2.0.1
    !pip -q install agents==1.4.0

---

## PART 2: Imports and utilities

For PPO we will work with:
- Gym for the Atari environment.
- Wrappers (preprocessing + frame stacking + reward clipping).
- PyTorch for the Actor-Critic network.

In [1]:
import gym
import numpy as np
from PIL import Image
from collections import deque
import random
import time
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical

# Reproducibility (as far as RL allows)
SEED = 123
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cpu


---

## PART 3: Atari wrappers (preprocessing + reward clipping)

Typical pipeline (note that `make_env` applies `FireReset` right after `MaxAndSkip`, not last):

1. **No-op reset** (optional) to randomize the starting state.
2. **MaxAndSkip** to repeat each action over several frames and speed up training.
3. **WarpFrame**: resize to 84×84 and convert to grayscale.
4. **FrameStack (4)** to give the agent short-term “memory”.
5. **Reward clipping** to [-1, 1] to stabilize learning.
6. **FireReset**: some games require the `FIRE` action to start.
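
Step 5 changes what an episode "score" means, which matters for the project goal. The sketch below uses made-up reward values (not taken from a real run) and a hypothetical helper `clip_reward` to show that, once every positive reward is clipped to 1, the clipped return is essentially a count of scoring events rather than the game score.

```python
def clip_reward(reward, low=-1.0, high=1.0):
    """Clamp a single reward to [low, high], as the reward-clipping wrapper does."""
    return float(max(low, min(high, reward)))

# Hypothetical raw per-step rewards for one short episode.
raw_rewards = [0, 5, 0, 10, 0, 30, 0, 200]

raw_return = sum(raw_rewards)
clipped_return = sum(clip_reward(r) for r in raw_rewards)

print("raw return:    ", raw_return)      # 245
print("clipped return:", clipped_return)  # 4.0
```

Under this reading, a clipped return above 20 means the agent triggered more than 20 positive-reward events in the episode, whatever each one was worth in game points.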


In [2]:
class NoopResetEnv(gym.Wrapper):
    # Performs a random number of NOOP actions on reset to randomize the initial state.
    def __init__(self, env, noop_max=30):
        super().__init__(env)
        self.noop_max = noop_max
        self.noop_action = 0  # in Atari, action 0 is normally NOOP

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        noops = np.random.randint(1, self.noop_max + 1)
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

class MaxAndSkipEnv(gym.Wrapper):
    # Repeats the action for `skip` frames and returns the pixel-wise max of the last two frames.
    def __init__(self, env, skip=4):
        super().__init__(env)
        self._skip = skip
        self._obs_buffer = deque(maxlen=2)

    def step(self, action):
        total_reward = 0.0
        done = None
        info = {}
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.maximum(self._obs_buffer[0], self._obs_buffer[-1])
        return max_frame, total_reward, done, info

    def reset(self, **kwargs):
        self._obs_buffer.clear()
        obs = self.env.reset(**kwargs)
        self._obs_buffer.append(obs)
        return obs

class FireResetEnv(gym.Wrapper):
    # Presses FIRE on reset if the game requires it (common in Atari).
    def __init__(self, env):
        super().__init__(env)

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        try:
            meanings = self.env.unwrapped.get_action_meanings()
            if len(meanings) > 1 and meanings[1] == 'FIRE':
                obs, _, done, _ = self.env.step(1)  # FIRE
                if done:
                    obs = self.env.reset(**kwargs)
        except Exception:
            pass
        return obs

class WarpFrame(gym.ObservationWrapper):
    # Converts to grayscale and resizes to 84x84.
    def __init__(self, env, width=84, height=84, grayscale=True):
        super().__init__(env)
        self.width = width
        self.height = height
        self.grayscale = grayscale
        c = 1 if grayscale else 3
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(self.height, self.width, c), dtype=np.uint8
        )

    def observation(self, obs):
        img = Image.fromarray(obs)
        if self.grayscale:
            img = img.convert('L')  # single channel
        img = img.resize((self.width, self.height), resample=Image.BILINEAR)
        arr = np.array(img, dtype=np.uint8)
        if self.grayscale:
            arr = arr[:, :, None]  # (H,W,1)
        return arr

class ClipRewardEnv(gym.RewardWrapper):
    # Clips the reward to [-1, 1].
    def reward(self, reward):
        return np.clip(reward, -1.0, 1.0)

class FrameStack(gym.Wrapper):
    # Stacks the last k frames along the channel (last) axis.
    def __init__(self, env, k=4):
        super().__init__(env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape  # (H,W,C)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(shp[0], shp[1], shp[2]*k), dtype=np.uint8
        )

    def reset(self, **kwargs):
        # Accept and forward kwargs, as the other wrappers in this cell do.
        obs = self.env.reset(**kwargs)
        for _ in range(self.k):
            self.frames.append(obs)
        return self._get_obs()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_obs(), reward, done, info

    def _get_obs(self):
        return np.concatenate(list(self.frames), axis=2)

class RecordEpisodeStats(gym.Wrapper):
    # Stores the episode return and length in info['episode'].
    def __init__(self, env):
        super().__init__(env)
        self.episode_return = 0.0
        self.episode_length = 0

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        self.episode_return = 0.0
        self.episode_length = 0
        return obs

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.episode_return += reward
        self.episode_length += 1
        if done:
            info = dict(info)
            info["episode"] = {"r": float(self.episode_return), "l": int(self.episode_length)}
        return obs, reward, done, info

def make_env(env_id="SpaceInvaders-v0", seed=0, reward_clip=True):
    env = gym.make(env_id)
    env.seed(seed)
    env = NoopResetEnv(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    env = FireResetEnv(env)
    env = WarpFrame(env, width=84, height=84, grayscale=True)
    if reward_clip:
        env = ClipRewardEnv(env)
    env = FrameStack(env, k=4)
    env = RecordEpisodeStats(env)
    return env

In [3]:
# Quick check of the environment and the observation shapes
env_id = "SpaceInvaders-v0"
env = make_env(env_id, seed=SEED, reward_clip=True)

obs = env.reset()
print("Obs shape:", obs.shape, "dtype:", obs.dtype)
print("Action space:", env.action_space)
try:
    print("Action meanings:", env.unwrapped.get_action_meanings())
except Exception as e:
    print("No action meanings:", e)

Obs shape: (84, 84, 4) dtype: uint8
Action space: Discrete(6)
Action meanings: ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']


---

## PART 4: Actor-Critic network (CNN)

- Input: 84×84×4 (4 stacked grayscale frames).
- Actor: logits over the discrete actions.
- Critic: state value V(s).

Observations are converted to `float32` and normalized to [0, 1].
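
The size of the flattened feature vector follows from the standard output-size formula for a convolution without padding, `out = (in - kernel) // stride + 1`. The sketch below applies it to the three convolutional layers of this notebook's `AtariActorCritic` (kernel/stride pairs 8/4, 4/2 and 3/1, with 64 output channels in the last layer); `conv_out` is a hypothetical helper introduced only for this illustration.

```python
def conv_out(size, kernel, stride, padding=0):
    """Spatial output size of a 2D convolution along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1

# (kernel, stride) for each conv layer, in order.
layers = [(8, 4), (4, 2), (3, 1)]

size = 84
sizes = []
for kernel, stride in layers:
    size = conv_out(size, kernel, stride)
    sizes.append(size)

n_flat = 64 * size * size  # 64 channels in the last conv layer

print(sizes)   # [20, 9, 7]
print(n_flat)  # 3136
```

This is the value the network obtains at construction time by passing a dummy tensor through the convolutional stack, so the first linear layer has 3136 inputs.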

In [4]:
class AtariActorCritic(nn.Module):
    def __init__(self, n_actions: int):
        super().__init__()
        # Expected input: (B, 4, 84, 84)
        self.features = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten()
        )
        with torch.no_grad():
            dummy = torch.zeros(1, 4, 84, 84)
            n_flat = self.features(dummy).shape[1]

        self.fc = nn.Sequential(
            nn.Linear(n_flat, 512),
            nn.ReLU(),
        )
        self.policy_head = nn.Linear(512, n_actions)
        self.value_head = nn.Linear(512, 1)

    def forward(self, x):
        x = self.features(x)
        x = self.fc(x)
        logits = self.policy_head(x)
        value = self.value_head(x).squeeze(-1)
        return logits, value

def obs_to_tensor(obs_np: np.ndarray) -> torch.Tensor:
    # (84,84,4) uint8 -> (1,4,84,84) float32 in [0,1]
    x = torch.from_numpy(obs_np).to(device)
    x = x.permute(2, 0, 1).unsqueeze(0)  # (1,4,84,84)
    x = x.float() / 255.0
    return x

---

## PART 5: PPO (rollout + GAE + clipped objective)

In each iteration:
1. We collect a *rollout* of length `n_steps`.
2. We compute `returns` and `advantages` (with GAE).
3. We update the network on minibatches for `n_epochs` epochs.
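
Step 2 can be read as a backward recursion over the rollout: `delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t)` and `A_t = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}`, with `returns = advantages + values`. The following pure-Python sketch mirrors that recursion on a tiny hand-made rollout; the function name `gae_pure_python` and the toy numbers are assumptions made for illustration only.

```python
def gae_pure_python(rewards, dones, values, next_value, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation over one rollout, computed backwards."""
    T = len(rewards)
    advantages = [0.0] * T
    last_gae = 0.0
    for t in reversed(range(T)):
        nonterminal = 1.0 - dones[t]          # 0.0 cuts the recursion at episode ends
        next_val = next_value if t == T - 1 else values[t + 1]
        delta = rewards[t] + gamma * next_val * nonterminal - values[t]
        last_gae = delta + gamma * lam * nonterminal * last_gae
        advantages[t] = last_gae
    returns = [a + v for a, v in zip(advantages, values)]
    return returns, advantages

# Toy rollout: the episode ends at t=1, so the reward at t=2 must not leak backwards.
rewards = [1.0, 0.0, 1.0]
dones = [0.0, 1.0, 0.0]
values = [0.0, 0.0, 0.0]

toy_returns, toy_adv = gae_pure_python(rewards, dones, values, next_value=0.0,
                                       gamma=1.0, lam=1.0)
print(toy_adv)  # [1.0, 0.0, 1.0]
```

With `gamma=1`, `lam=1` and a zero critic, the advantage reduces to the reward-to-go inside each episode, which makes the effect of the `done` mask easy to check by hand.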

PPO clips the probability ratio between the new and the old policy to prevent overly large policy changes.
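
For a single sample, the clipped objective is `min(ratio * A, clip(ratio, 1 - eps, 1 + eps) * A)`, where `ratio = exp(logp_new - logp_old)`. The sketch below evaluates it for the four sign combinations; `clipped_surrogate` is a hypothetical scalar helper, whereas the notebook itself computes the same quantity on tensors.

```python
import math

def clipped_surrogate(logp_new, logp_old, advantage, clip_eps=0.2):
    """Per-sample PPO objective (to be maximized)."""
    ratio = math.exp(logp_new - logp_old)
    clipped_ratio = max(1.0 - clip_eps, min(1.0 + clip_eps, ratio))
    return min(ratio * advantage, clipped_ratio * advantage)

up = math.log(1.5)    # new policy makes the action 1.5x more likely
down = math.log(0.5)  # new policy makes the action half as likely

cases = {
    "ratio=1.5, A=+1": clipped_surrogate(up, 0.0, +1.0),    # capped at 1.2
    "ratio=1.5, A=-1": clipped_surrogate(up, 0.0, -1.0),    # not capped: -1.5
    "ratio=0.5, A=+1": clipped_surrogate(down, 0.0, +1.0),  # not capped: 0.5
    "ratio=0.5, A=-1": clipped_surrogate(down, 0.0, -1.0),  # capped at -0.8
}
for name, value in cases.items():
    print(f"{name}: {value:+.2f}")
```

The `min` makes the bound one-sided: the objective stops rewarding the policy for moving further in a direction that already looks good, but it never hides a move that makes things worse.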

In [5]:
@torch.no_grad()
def select_action(model, obs):
    logits, value = model(obs)
    dist = Categorical(logits=logits)
    action = dist.sample()
    logprob = dist.log_prob(action)
    return action.item(), logprob.item(), value.item()

def compute_gae(rewards, dones, values, next_value, gamma=0.99, gae_lambda=0.95):
    T = len(rewards)
    advantages = np.zeros(T, dtype=np.float32)
    last_gae = 0.0
    for t in reversed(range(T)):
        nonterminal = 1.0 - dones[t]
        next_val = next_value if t == T-1 else values[t+1]
        delta = rewards[t] + gamma * next_val * nonterminal - values[t]
        last_gae = delta + gamma * gae_lambda * nonterminal * last_gae
        advantages[t] = last_gae
    returns = advantages + values
    return returns, advantages

In [6]:
# Training hyperparameters
n_steps = 1024
total_timesteps = 5000000   # started at 1,000,000; the plan was to go up to at most 10,000,000
n_epochs = 4
minibatch_size = 256
normalize_advantages = True

log_every_episodes = 10
save_every_episodes = 50
checkpoint_path = "ppo_spaceinvaders.pt"

def ent_coef_schedule(step, total_steps):
    frac = step / float(total_steps)
    return 0.02 if frac < 0.3 else 0.005

In [7]:
class PPOAgent:
    def __init__(
        self, n_actions, lr=2.5e-4, gamma=0.99, gae_lambda=0.95,
        clip_eps=0.2, ent_coef=0.02, vf_coef=0.5, max_grad_norm=0.5,
    ):
        self.model = AtariActorCritic(n_actions).to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, eps=1e-5)
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_eps = clip_eps
        self.ent_coef = ent_coef
        self.vf_coef = vf_coef
        self.max_grad_norm = max_grad_norm

    def update(self, batch, n_epochs=n_epochs, minibatch_size=minibatch_size):
        obs = batch["obs"]
        actions = batch["actions"]
        old_logprobs = batch["old_logprobs"]
        returns = batch["returns"]
        advantages = batch["advantages"]

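        # Normalize advantages over the whole rollout (controlled by the global flag).
        if normalize_advantages:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)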

        N = obs.shape[0]
        idxs = np.arange(N)
        losses = []

        for _ in range(n_epochs):
            np.random.shuffle(idxs)
            for start in range(0, N, minibatch_size):
                mb_idx = idxs[start:start+minibatch_size]
                mb_obs = obs[mb_idx]
                mb_actions = actions[mb_idx]
                mb_old_logp = old_logprobs[mb_idx]
                mb_returns = returns[mb_idx]
                mb_adv = advantages[mb_idx]

                logits, values = self.model(mb_obs)
                dist = Categorical(logits=logits)
                logp = dist.log_prob(mb_actions)
                entropy = dist.entropy().mean()

                ratio = torch.exp(logp - mb_old_logp)
                surr1 = ratio * mb_adv
                surr2 = torch.clamp(ratio, 1.0 - self.clip_eps, 1.0 + self.clip_eps) * mb_adv
                policy_loss = -torch.min(surr1, surr2).mean()

                value_loss = F.mse_loss(values, mb_returns)

                loss = policy_loss + self.vf_coef * value_loss - self.ent_coef * entropy

                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
                self.optimizer.step()

                losses.append(loss.item())

        return float(np.mean(losses)) if losses else 0.0

---

## PART 6: Training

- `n_steps`: rollout length.
- `total_timesteps`: total step budget (Atari usually needs millions).
- Checkpoints to resume training.
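
To get a feel for what these settings imply, the sketch below counts rollouts and gradient steps for the hyperparameter values used in this notebook (`n_steps=1024`, `total_timesteps=5000000`, `n_epochs=4`, `minibatch_size=256`). The helper `training_budget` is a hypothetical name, and the figures are simple arithmetic, not measurements.

```python
import math

def training_budget(total_timesteps, n_steps, n_epochs, minibatch_size):
    """Return (number of rollouts, gradient steps per full-rollout update)."""
    n_rollouts = math.ceil(total_timesteps / n_steps)
    minibatches_per_epoch = math.ceil(n_steps / minibatch_size)
    steps_per_update = n_epochs * minibatches_per_epoch
    return n_rollouts, steps_per_update

n_rollouts, steps_per_update = training_budget(
    total_timesteps=5_000_000, n_steps=1024, n_epochs=4, minibatch_size=256
)
print(n_rollouts)        # 4883 (the last rollout is shorter than n_steps)
print(steps_per_update)  # 16
```

So a full run performs roughly 4883 policy updates of 16 optimizer steps each, which is why periodic checkpoints are worth having.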


In [9]:
import os
agent = PPOAgent(n_actions=env.action_space.n)

def save_checkpoint(path):
    torch.save({
        "model": agent.model.state_dict(),
        "optimizer": agent.optimizer.state_dict(),
    }, path)

def load_checkpoint(path):
    ckpt = torch.load(path, map_location=device)
    agent.model.load_state_dict(ckpt["model"])
    agent.optimizer.load_state_dict(ckpt["optimizer"])
    print("Checkpoint loaded:", path)

if os.path.exists(checkpoint_path):
    print("Checkpoint found. You can load it with load_checkpoint(checkpoint_path).")

In [10]:
# PPO training (single environment, for clarity and compatibility)

episode_returns = []
episode_lengths = []
global_step = 0
episode_count = 0
t0 = time.time()

obs_np = env.reset()

# (Optional) how often, in episodes, to evaluate the criterion.
# Note: these two values are not used by the loop below.
eval_every_episodes = 500  # for example
target_consecutive = 100

while global_step < total_timesteps:
    obs_buf, actions_buf, logp_buf = [], [], []
    rewards_buf, dones_buf, values_buf = [], [], []

    for _ in range(n_steps):
        obs_t = obs_to_tensor(obs_np)
        action, logp, value = select_action(agent.model, obs_t)

        next_obs, reward, done, info = env.step(action)

        obs_buf.append(obs_t.squeeze(0))          # (4,84,84)
        actions_buf.append(action)
        logp_buf.append(logp)
        rewards_buf.append(reward)
        dones_buf.append(1.0 if done else 0.0)
        values_buf.append(value)

        obs_np = next_obs
        global_step += 1

        if done:
            episode_count += 1

            ep_info = info.get("episode", {})
            ep_r = ep_info.get("r", None)
            ep_l = ep_info.get("l", None)

            if ep_r is not None:
                episode_returns.append(ep_r)
                episode_lengths.append(ep_l)

            obs_np = env.reset()

            # -------- Logging (includes episode lengths) --------
            if episode_count % log_every_episodes == 0 and len(episode_returns) > 0:
                avg_10 = np.mean(episode_returns[-10:])
                avg_100 = np.mean(episode_returns[-100:]) if len(episode_returns) >= 100 else np.mean(episode_returns)

                # mean episode length (when available)
                last10_len = [l for l in episode_lengths[-10:] if isinstance(l, (int, np.integer))]
                avg_len_10 = float(np.mean(last10_len)) if last10_len else float("nan")

                print(
                    f"[Ep {episode_count:5d}] step={global_step:8d}  "
                    f"avg10={avg_10:6.2f}  avg100={avg_100:6.2f}  "
                    f"avglen10={avg_len_10:6.1f}  "
                    f"elapsed={(time.time()-t0)/60:5.1f}m"
                )

            # -------- CHECKPOINT --------
            if episode_count % save_every_episodes == 0:
                save_checkpoint(checkpoint_path)
                print("Checkpoint saved:", checkpoint_path)

        if global_step >= total_timesteps:
            break

    # bootstrap the value of the last state of the rollout
    with torch.no_grad():
        next_value = agent.model(obs_to_tensor(obs_np))[1].item()

    returns, adv = compute_gae(
        rewards=np.array(rewards_buf, dtype=np.float32),
        dones=np.array(dones_buf, dtype=np.float32),
        values=np.array(values_buf, dtype=np.float32),
        next_value=next_value,
        gamma=agent.gamma,
        gae_lambda=agent.gae_lambda
    )

    # Advantages are normalized once, inside PPOAgent.update(); here we only fix the dtype.
    adv = adv.astype(np.float32)

    batch = {
        "obs": torch.stack(obs_buf).to(device),
        "actions": torch.tensor(actions_buf, dtype=torch.int64).to(device),
        "old_logprobs": torch.tensor(logp_buf, dtype=torch.float32).to(device),
        "returns": torch.tensor(returns, dtype=torch.float32).to(device),
        "advantages": torch.tensor(adv, dtype=torch.float32).to(device),
    }

    agent.ent_coef = ent_coef_schedule(global_step, total_timesteps)
    loss = agent.update(batch, n_epochs=n_epochs, minibatch_size=minibatch_size)

print("Training finished. Episodes:", episode_count, "Steps:", global_step)
save_checkpoint(checkpoint_path)

[Ep    10] step=    1743  avg10=  9.80  avg100=  9.80  avglen10= 174.3  elapsed=  0.1m
[Ep    20] step=    3396  avg10=  8.50  avg100=  9.15  avglen10= 165.3  elapsed=  0.2m
[Ep    30] step=    4982  avg10=  8.50  avg100=  8.93  avglen10= 158.6  elapsed=  0.3m
[Ep    40] step=    6625  avg10=  8.30  avg100=  8.78  avglen10= 164.3  elapsed=  0.4m
[Ep    50] step=    8356  avg10= 10.00  avg100=  9.02  avglen10= 173.1  elapsed=  0.5m
Checkpoint saved: ppo_spaceinvaders.pt
[Ep    60] step=    9896  avg10=  8.40  avg100=  8.92  avglen10= 154.0  elapsed=  0.6m
[Ep    70] step=   11322  avg10=  7.30  avg100=  8.69  avglen10= 142.6  elapsed=  0.6m
[Ep    80] step=   13018  avg10= 12.00  avg100=  9.10  avglen10= 169.6  elapsed=  0.7m
[Ep    90] step=   14420  avg10=  8.40  avg100=  9.02  avglen10= 140.2  elapsed=  0.8m
[Ep   100] step=   16250  avg10= 12.30  avg100=  9.35  avglen10= 183.0  elapsed=  0.9m
Checkpoint saved: ppo_spaceinvaders.pt
[Ep   110] step=   17811  avg10= 10.90  avg100

---

## PART 7: Evaluation of the goal “>20 for 100 consecutive episodes”

Evaluation is run **with reward clipping enabled**. We use the *greedy* policy (argmax) to measure performance.

Criterion:
- `best_consecutive >= 100` with `threshold=20`.
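
The criterion boils down to the longest run of consecutive episodes whose clipped return is strictly above the threshold. A minimal sketch of that bookkeeping, using a hypothetical helper `longest_streak` and made-up returns, looks like this:

```python
def longest_streak(returns, threshold=20.0):
    """Length of the longest run of consecutive returns strictly above threshold."""
    best = 0
    current = 0
    for r in returns:
        current = current + 1 if r > threshold else 0
        best = max(best, current)
    return best

# Made-up episode returns: the third one (exactly 20) breaks the streak.
example_returns = [25.0, 30.0, 20.0, 21.0, 22.0, 23.0]
print(longest_streak(example_returns))  # 3
```

Note that the comparison is strict, so an episode that scores exactly 20 resets the counter.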

In [11]:
from collections import Counter
import torch

def evaluate_agent(
    agent,
    env_id="SpaceInvaders-v0",
    n_eval_episodes=200,
    threshold=20.0,
    print_every=1,
    mode="greedy",          # "greedy" | "sample" | "epsilon_greedy"
    epsilon=0.05            # only used when mode="epsilon_greedy"
):
    eval_env = make_env(env_id, seed=SEED+999, reward_clip=True)
    agent.model.eval()

    consecutive = 0
    best_consecutive = 0
    returns = []
    lengths = []

    global_action_counter = Counter()

    for ep in range(1, n_eval_episodes+1):
        seed_ep = SEED + 999 + ep
        np.random.seed(seed_ep)
        torch.manual_seed(seed_ep)
        random.seed(seed_ep)

        try:
            eval_env.seed(seed_ep)
        except Exception:
            pass
        try:
            eval_env.action_space.seed(seed_ep)
        except Exception:
            pass

        obs = eval_env.reset()
        done = False
        info = {}

        ep_action_counter = Counter()
        steps = 0

        while not done:
            obs_t = obs_to_tensor(obs)

            with torch.no_grad():
                logits, _ = agent.model(obs_t)

                if mode == "greedy":
                    action = torch.argmax(logits, dim=-1).item()

                elif mode == "sample":
                    dist = torch.distributions.Categorical(logits=logits)
                    action = dist.sample().item()

                elif mode == "epsilon_greedy":
                    # with probability epsilon pick a random action, otherwise argmax
                    if np.random.rand() < epsilon:
                        action = eval_env.action_space.sample()
                    else:
                        action = torch.argmax(logits, dim=-1).item()
                else:
                    raise ValueError(f"Unknown mode: {mode}")

            ep_action_counter[action] += 1
            global_action_counter[action] += 1
            steps += 1

            obs, reward, done, info = eval_env.step(action)

        ep_info = info.get("episode", {})
        ep_r = float(ep_info.get("r", 0.0))
        ep_l = ep_info.get("l", None)

        returns.append(ep_r)
        lengths.append(ep_l)

        consecutive = consecutive + 1 if ep_r > threshold else 0
        best_consecutive = max(best_consecutive, consecutive)

        if ep % print_every == 0:
            len_str = f"{ep_l:4d}" if isinstance(ep_l, (int, np.integer)) else str(ep_l)

            uniq_actions = len(ep_action_counter)
            top_action, top_count = ep_action_counter.most_common(1)[0]
            top_frac = top_count / max(1, steps)

            print(
                f"[Eval-{mode} ep {ep:3d}] return={ep_r:6.2f}  len={len_str}  "
                f"uniq_actions={uniq_actions:2d}  top_action={top_action}({top_frac:5.2%})  "
                f"consecutive(>{threshold})={consecutive:3d}  best={best_consecutive:3d}"
            )

    avg = float(np.mean(returns)) if returns else 0.0
    valid_lengths = [l for l in lengths if isinstance(l, (int, np.integer))]
    avg_len = float(np.mean(valid_lengths)) if valid_lengths else float("nan")

    print(f"\nEvaluation complete ({mode}): avg_return={avg:.2f}, best_consecutive={best_consecutive}")
    if valid_lengths:
        print(f"Lengths: avg_len={avg_len:.1f}, min_len={min(valid_lengths)}, max_len={max(valid_lengths)}")
    else:
        print("Lengths: not available (info['episode']['l'] is missing).")

    if sum(global_action_counter.values()) > 0:
        top_a, top_c = global_action_counter.most_common(1)[0]
        total = sum(global_action_counter.values())
        print("Global action counts:", dict(global_action_counter))
        print(f"Globally dominant action: {top_a} ({top_c/total:.2%})")

    return returns, best_consecutive, lengths

# Official evaluation call (greedy):
returns_g, best_g, _ = evaluate_agent(agent, env_id=env_id, n_eval_episodes=100,
                                     threshold=20.0, print_every=10, mode="greedy")
# Diagnostic call (sampling):
returns_s, best_s, _ = evaluate_agent(agent, env_id=env_id, n_eval_episodes=100,
                                     threshold=20.0, print_every=10, mode="sample")
# Alternative diagnostic call (epsilon-greedy):
returns_e, best_e, _ = evaluate_agent(agent, env_id=env_id, n_eval_episodes=100,
                                     threshold=20.0, print_every=10, mode="epsilon_greedy", epsilon=0.05)

[Eval-greedy ep  10] return= 51.00  len= 380  uniq_actions= 5  top_action=1(48.68%)  consecutive(>20.0)= 10  best= 10
[Eval-greedy ep  20] return= 50.00  len= 341  uniq_actions= 5  top_action=1(50.44%)  consecutive(>20.0)= 20  best= 20
[Eval-greedy ep  30] return= 30.00  len= 247  uniq_actions= 5  top_action=1(43.32%)  consecutive(>20.0)= 30  best= 30
[Eval-greedy ep  40] return= 51.00  len= 449  uniq_actions= 5  top_action=1(46.33%)  consecutive(>20.0)= 40  best= 40
[Eval-greedy ep  50] return= 36.00  len= 290  uniq_actions= 5  top_action=1(37.93%)  consecutive(>20.0)= 50  best= 50
[Eval-greedy ep  60] return= 47.00  len= 380  uniq_actions= 5  top_action=1(49.47%)  consecutive(>20.0)= 60  best= 60
[Eval-greedy ep  70] return= 45.00  len= 382  uniq_actions= 5  top_action=1(39.79%)  consecutive(>20.0)= 70  best= 70
[Eval-greedy ep  80] return= 48.00  len= 341  uniq_actions= 5  top_action=1(46.04%)  consecutive(>20.0)= 80  best= 80
[Eval-greedy ep  90] return= 31.00  len= 223  uniq_actio

---

## PART 8: Play one episode (render)

Locally, `render(mode="human")` may work. On Colab it usually does not.

We also try to capture frames with `rgb_array` for an optional GIF.

In [12]:
play_env = make_env(env_id, seed=SEED+2024, reward_clip=True)
obs = play_env.reset()
done = False
info = {}

try:
    play_env.render(mode="human")
except Exception as e:
    print("Human render not available here:", e)

episode_frames = []
while not done:
    obs_t = obs_to_tensor(obs)
    with torch.no_grad():
        logits, _ = agent.model(obs_t)
        action = torch.argmax(logits, dim=-1).item()
    obs, reward, done, info = play_env.step(action)

    try:
        frame = play_env.render(mode="rgb_array")
        episode_frames.append(frame)
    except Exception:
        pass

ep_r = info.get("episode", {}).get("r", 0.0)
print("Episode return (clipped):", ep_r)

Episode return (clipped): 25.0


---

## PART 9: (Optional) Save a GIF of the episode

In [13]:
if len(episode_frames) > 0:
    gif_path = "spaceinvaders_ppo_episode.gif"
    pil_frames = [Image.fromarray(f) for f in episode_frames]
    pil_frames[0].save(gif_path, save_all=True, append_images=pil_frames[1:], duration=33, loop=0)
    print("GIF saved:", gif_path)
else:
    print("No frames captured (rgb_array unavailable or rendering disabled).")

GIF saved: spaceinvaders_ppo_episode.gif
