# Entrenamiento Local de Agentes Pokémon
Este cuaderno coordina el entrenamiento local de los agentes especializados y del agente híbrido usando las utilidades de `advanced_agents`. Cada sección describe qué configura o ejecuta para que puedas seguir el flujo sin consultar otros archivos. Para más detalles sobre los scripts equivalentes por lotes revisa `README_LOCAL_TRAINING.md`.

In [5]:
import sys
import os

# FIX: Resolver conflicto de OpenMP (Error #15) que causa crash del kernel
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import json
import shutil
import types
import importlib
from gymnasium import spaces

# Configuración de rutas locales
project_path = os.getcwd()
if project_path not in sys.path:
    sys.path.append(project_path)

baselines_path = os.path.join(project_path, 'baselines')
if baselines_path not in sys.path:
    sys.path.append(baselines_path)

print(f"Directorio de trabajo: {project_path}")

Directorio de trabajo: c:\Users\javi1\Documents\repos_git\TEL351-PokemonRed


## 1. Configuración de entorno
Inicializa rutas y variables de entorno necesarias para que PyBoy y Stable-Baselines3 funcionen sin conflictos (por ejemplo, se habilita `KMP_DUPLICATE_LIB_OK` para evitar errores de OpenMP).

In [6]:
# --- RELOAD MODULES ---
def reload_modules():
    modules_to_reload = [
        'v2.red_gym_env_v2',
        'advanced_agents.features',
        'advanced_agents.wrappers',
        'advanced_agents.base',
        'advanced_agents.train_agents',
        'advanced_agents.combat_apex_agent',
        'advanced_agents.puzzle_speed_agent',
        'advanced_agents.hybrid_sage_agent',
        'advanced_agents.transition_models'
    ]
    for mod_name in modules_to_reload:
        if mod_name in sys.modules:
            try:
                importlib.reload(sys.modules[mod_name])
                print(f"♻️ Recargado: {mod_name}")
            except Exception as e:
                print(f"⚠️ No se pudo recargar {mod_name}: {e}")

reload_modules()

♻️ Recargado: v2.red_gym_env_v2
♻️ Recargado: advanced_agents.features
♻️ Recargado: advanced_agents.wrappers
♻️ Recargado: advanced_agents.base
♻️ Recargado: advanced_agents.train_agents
♻️ Recargado: advanced_agents.combat_apex_agent
♻️ Recargado: advanced_agents.puzzle_speed_agent
♻️ Recargado: advanced_agents.hybrid_sage_agent
♻️ Recargado: advanced_agents.transition_models


## 2. Recarga de módulos
Permite refrescar los módulos clave de `advanced_agents` y del entorno RedGym cada vez que hagas cambios en el código fuente sin tener que reiniciar el kernel. Ejecuta esta celda si modificas archivos Python relacionados.

In [7]:
# Copiar events.json si es necesario
events_source = os.path.join(project_path, 'baselines', 'events.json')
events_dest = os.path.join(project_path, 'events.json')
if os.path.exists(events_source) and not os.path.exists(events_dest):
    shutil.copy(events_source, events_dest)
    print(f"Copiado events.json a {events_dest}")

## 3. Sincronización de `events.json`
Garantiza que el archivo de eventos requerido por PyBoy esté disponible en la raíz del proyecto copiándolo desde `baselines/events.json` cuando falta.

## 4. Utilidades de entrenamiento
Define el registro de agentes, valida que existan los archivos `.state`, construye las configuraciones de entorno y expone `train_single_run`/`train_plan`, que son los puntos de entrada para disparar los entrenamientos desde las celdas siguientes.

In [8]:
import json
import shutil
import types
import importlib
from typing import Dict, Iterable, List, Optional

from gymnasium import spaces

try:
    from advanced_agents.train_agents import _base_env_config
    from advanced_agents.combat_apex_agent import CombatApexAgent, CombatAgentConfig
    from advanced_agents.puzzle_speed_agent import PuzzleSpeedAgent, PuzzleAgentConfig
    from advanced_agents.hybrid_sage_agent import HybridSageAgent, HybridAgentConfig
except ImportError as e:
    print("ERROR CRÍTICO: Fallo en imports.")
    raise e

# --- Cargar escenarios ---
SCENARIO_PATH = os.path.join(project_path, 'gym_scenarios', 'scenarios.json')
with open(SCENARIO_PATH, 'r') as f:
    scenarios_data = json.load(f)

SCENARIOS: Dict[str, Dict] = {scenario['id']: scenario for scenario in scenarios_data['scenarios']}

AGENT_REGISTRY = {
    'combat': {
        'agent_cls': CombatApexAgent,
        'config_cls': CombatAgentConfig,
        'default_phase': 'battle'
    },
    'puzzle': {
        'agent_cls': PuzzleSpeedAgent,
        'config_cls': PuzzleAgentConfig,
        'default_phase': 'puzzle'
    },
    'hybrid': {
        'agent_cls': HybridSageAgent,
        'config_cls': HybridAgentConfig,
        'default_phase': 'battle'
    }
}

MODELS_DIR = os.path.join(project_path, 'models_local')
os.makedirs(MODELS_DIR, exist_ok=True)

def resolve_phase(scenario_id: str, phase_name: Optional[str]) -> Dict:
    scenario = SCENARIOS.get(scenario_id)
    if scenario is None:
        raise ValueError(f"Escenario {scenario_id} no encontrado en {SCENARIO_PATH}")
    target_phase = phase_name or AGENT_REGISTRY['combat']['default_phase']
    selected_phase = next((p for p in scenario['phases'] if p['name'] == target_phase), None)
    if selected_phase is None:
        raise ValueError(f"Fase {target_phase} no encontrada en el escenario {scenario_id}")
    return selected_phase

def ensure_state_file(state_file_path: str) -> str:
    abs_path = os.path.join(project_path, state_file_path) if not os.path.isabs(state_file_path) else state_file_path
    if not os.path.exists(abs_path):
        raise FileNotFoundError(
            f"No se encontró el archivo de estado requerido: {abs_path}. "
            "Genera los .state con generate_gym_states.py o ajusta la ruta."
        )
    return abs_path

def build_env_overrides(state_file_path: str, headless: bool) -> Dict:
    return {
        'init_state': state_file_path,
        'headless': headless,
        'save_video': False,
        'gb_path': os.path.join(project_path, 'PokemonRed.gb'),
        'session_path': os.path.join(project_path, 'sessions', f"local_{os.path.basename(state_file_path)}"),
        'render_mode': 'rgb_array' if headless else 'human',
        'fast_video': headless
    }

def _patch_callbacks(agent, additional_callbacks: Optional[List] = None):
    base_callbacks_method = agent.extra_callbacks

    def _patched_callbacks(self):
        callbacks = list(base_callbacks_method())
        if additional_callbacks:
            callbacks.extend(additional_callbacks)
        return callbacks

    agent.extra_callbacks = types.MethodType(_patched_callbacks, agent)

def train_single_run(
    agent_key: str,
    scenario_id: str,
    phase_name: str,
    total_timesteps: int = 200_000,
    headless: bool = False,
    additional_callbacks: Optional[List] = None
):
    registry_entry = AGENT_REGISTRY.get(agent_key)
    if registry_entry is None:
        raise ValueError(f"Agente desconocido: {agent_key}")

    phase = resolve_phase(scenario_id, phase_name)
    state_file_path = ensure_state_file(phase['state_file'])

    env_overrides = build_env_overrides(state_file_path, headless=headless)
    config = registry_entry['config_cls'](
        env_config=_base_env_config(env_overrides),
        total_timesteps=total_timesteps
    )

    agent = registry_entry['agent_cls'](config)

    env_for_check = agent.make_env()
    obs_space = getattr(env_for_check, 'observation_space', None)
    if isinstance(obs_space, spaces.Dict):
        print("Observación Dict detectada -> MultiInputPolicy")
        agent.policy_name = types.MethodType(lambda self: "MultiInputPolicy", agent)
    env_for_check.close()

    if additional_callbacks:
        _patch_callbacks(agent, additional_callbacks)

    print(
        f"\n=== Entrenando {agent_key.upper()} en {scenario_id} ({phase_name}) por {total_timesteps:,} pasos ===")
    runtime = agent.train()

    agent_dir = os.path.join(MODELS_DIR, agent_key)
    os.makedirs(agent_dir, exist_ok=True)
    model_path = os.path.join(agent_dir, f"{scenario_id}_{phase_name}.zip")
    runtime.model.save(model_path)
    print(f"Modelo guardado en {model_path}")

    return runtime

def train_plan(
    agent_key: str,
    plan: List[Dict],
    default_timesteps: int = 200_000,
    headless: bool = False,
    callback_factory: Optional[callable] = None
) -> Dict[tuple, object]:
    results = {}
    total_runs = len(plan)
    for run_idx, entry in enumerate(plan, start=1):
        scenario_id = entry['scenario']
        phase_name = entry.get('phase') or AGENT_REGISTRY[agent_key]['default_phase']
        run_timesteps = entry.get('timesteps', default_timesteps)
        callbacks = None
        if callback_factory is not None:
            callbacks = callback_factory(entry)
        print(f"\n>>> [{agent_key.upper()}] Ejecución {run_idx}/{total_runs}")
        runtime = train_single_run(
            agent_key=agent_key,
            scenario_id=scenario_id,
            phase_name=phase_name,
            total_timesteps=run_timesteps,
            headless=headless,
            additional_callbacks=callbacks
        )
        results[(scenario_id, phase_name)] = runtime
    return results

## 5. Planes de entrenamiento
Ajusta aquí qué escenarios, fases y pasos quieres cubrir para cada agente. Usa esto como checklist antes de lanzar ejecuciones largas; puedes sobreescribir timesteps por fila y alternar `headless` para ver la ventana del emulador.

### Configura planes de entrenamiento locales
Especifica los escenarios, fases y timesteps que quieres para cada agente. Puedes ejecutar cada bloque por separado y combinar headless=True/False según quieras ver la ventana del emulador.

In [9]:
combat_plan_local = [
    {"scenario": "pewter_brock", "phase": "battle", "timesteps": 200_000},
    # {"scenario": "cerulean_misty", "phase": "battle", "timesteps": 220_000},
]

puzzle_plan_local = [
    {"scenario": "pewter_brock", "phase": "puzzle", "timesteps": 180_000},
    # {"scenario": "cerulean_misty", "phase": "puzzle", "timesteps": 200_000},
]

hybrid_plan_local = [
    {"scenario": "pewter_brock", "phase": "battle", "timesteps": 220_000},
    # {"scenario": "vermillion_lt_surge", "phase": "battle", "timesteps": 250_000},
]

DEFAULT_TIMESTEPS_LOCAL = 200_000
DEFAULT_HEADLESS_LOCAL = False  # Cambia a True si no necesitas la ventana SDL

## 6. Ejecutar plan de combate
Lanza las corridas definidas en `combat_plan_local`. Cada iteración verifica el `.state`, arma el entorno y guarda el modelo en `models_local/combat/`.

In [None]:
combat_runs_local = train_plan(
    agent_key='combat',
    plan=combat_plan_local,
    default_timesteps=DEFAULT_TIMESTEPS_LOCAL,
    headless=DEFAULT_HEADLESS_LOCAL
)


>>> [COMBAT] Ejecución 1/1
Observación Dict detectada -> MultiInputPolicy

=== Entrenando COMBAT en pewter_brock (battle) por 200,000 pasos ===


## 7. Ejecutar plan de puzzles
Corre el plan `puzzle_plan_local` usando `PuzzleSpeedAgent` y guarda salidas en `models_local/puzzle/`. Útil para medir tiempos de navegación y resolución de puzzles previos al combate.

In [None]:
puzzle_runs_local = train_plan(
    agent_key='puzzle',
    plan=puzzle_plan_local,
    default_timesteps=DEFAULT_TIMESTEPS_LOCAL,
    headless=DEFAULT_HEADLESS_LOCAL
)

## 8. Ejecutar plan híbrido
Activa `HybridSageAgent` sobre los escenarios definidos en `hybrid_plan_local`, mezclando comportamientos de combate y navegación y almacenando resultados en `models_local/hybrid/`.

In [None]:
hybrid_runs_local = train_plan(
    agent_key='hybrid',
    plan=hybrid_plan_local,
    default_timesteps=DEFAULT_TIMESTEPS_LOCAL,
    headless=DEFAULT_HEADLESS_LOCAL
)

## 9. Guardado manual (opcional)
Fragmento de ejemplo para guardar un modelo entrenado con un nombre personalizado. Solo úsalo si traes a la sesión variables como `model`, `AGENT_TYPE`, `SCENARIO_ID` y `PHASE_NAME`; de lo contrario producirá errores.

In [None]:
# Guardar modelo
save_dir = "models_local"
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, f"{AGENT_TYPE}_{SCENARIO_ID}_{PHASE_NAME}")
model.save(save_path)
print(f"Modelo guardado en {save_path}")

## 10. Comparación con Baseline (PPO v2)
Esta sección permite comparar el desempeño de tus agentes entrenados (Combat, Puzzle, Hybrid) contra el modelo PPO genérico de v2 (`poke_26214400.zip`). Se ejecutarán episodios de evaluación en los mismos escenarios para contrastar recompensas y pasos.

In [None]:
import pandas as pd
import numpy as np
from stable_baselines3 import PPO
from v2.red_gym_env_v2 import RedGymEnv

def load_baseline_model(path):
    if not os.path.exists(path):
        print(f"No se encontró el modelo baseline en: {path}")
        return None
    try:
        return PPO.load(path)
    except Exception as e:
        print(f"Error cargando baseline: {e}")
        return None

def evaluate_agent_model(model, env, num_episodes=3):
    """Ejecuta episodios de evaluación y retorna métricas promedio."""
    rewards = []
    steps = []
    
    for i in range(num_episodes):
        obs, _ = env.reset()
        done = False
        truncated = False
        total_reward = 0
        step_count = 0
        
        while not done and not truncated:
            # Usar predict del modelo SB3
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, truncated, info = env.step(action)
            total_reward += reward
            step_count += 1
            
        rewards.append(total_reward)
        steps.append(step_count)
        
    return {
        'mean_reward': np.mean(rewards),
        'std_reward': np.std(rewards),
        'mean_steps': np.mean(steps)
    }

def run_comparison(plans_dict, baseline_path, headless=True):
    results = []
    
    # Cargar Baseline una vez
    baseline_model = load_baseline_model(baseline_path)
    if not baseline_model:
        print("No se puede proceder sin el modelo baseline.")
        return None
    
    print(f"Modelo Baseline cargado desde: {baseline_path}")
    
    for agent_key, plan in plans_dict.items():
        for entry in plan:
            scenario_id = entry['scenario']
            phase_name = entry.get('phase') or AGENT_REGISTRY[agent_key]['default_phase']
            
            print(f"\n--- Comparando {agent_key.upper()} vs BASELINE en {scenario_id} ({phase_name}) ---")
            
            # 1. Preparar Configuración Común
            phase = resolve_phase(scenario_id, phase_name)
            state_file_path = ensure_state_file(phase['state_file'])
            env_overrides = build_env_overrides(state_file_path, headless=headless)
            base_config = _base_env_config(env_overrides)
            
            # ---------------------------------------------------------
            # 2. Evaluar Agente Local (con su propio wrapper/env)
            # ---------------------------------------------------------
            registry_entry = AGENT_REGISTRY[agent_key]
            # Config dummy para instanciar el agente y crear su env
            agent_config = registry_entry['config_cls'](
                env_config=base_config,
                total_timesteps=1000 
            )
            local_agent_wrapper = registry_entry['agent_cls'](agent_config)
            
            # Intentar cargar el modelo entrenado localmente
            local_model_path = os.path.join(MODELS_DIR, agent_key, f"{scenario_id}_{phase_name}.zip")
            
            if os.path.exists(local_model_path):
                print(f"Cargando modelo local: {local_model_path}")
                try:
                    # Crear entorno específico del agente (con wrappers)
                    env_local = local_agent_wrapper.make_env()
                    
                    # Cargar pesos en el modelo del agente
                    local_agent_wrapper.model = PPO.load(local_model_path)
                    
                    metrics_local = evaluate_agent_model(local_agent_wrapper.model, env_local)
                    env_local.close()
                    print(f"Local: R={metrics_local['mean_reward']:.2f}, Steps={metrics_local['mean_steps']:.1f}")
                except Exception as e:
                    print(f"Error evaluando local: {e}")
                    metrics_local = None
            else:
                print(f"No existe modelo local en {local_model_path}. Saltando.")
                metrics_local = None

            # ---------------------------------------------------------
            # 3. Evaluar Baseline (con entorno estándar RedGymEnv)
            # ---------------------------------------------------------
            print("Evaluando Baseline...")
            try:
                # Crear entorno estándar para el baseline
                env_baseline = RedGymEnv(base_config)
                
                metrics_baseline = evaluate_agent_model(baseline_model, env_baseline)
                env_baseline.close()
                print(f"Baseline: R={metrics_baseline['mean_reward']:.2f}, Steps={metrics_baseline['mean_steps']:.1f}")
            except Exception as e:
                print(f"Error evaluando baseline: {e}")
                metrics_baseline = {'mean_reward': 0, 'mean_steps': 0}
            
            # ---------------------------------------------------------
            # 4. Registrar Resultados
            # ---------------------------------------------------------
            if metrics_local:
                results.append({
                    'Agent': agent_key.upper(),
                    'Scenario': scenario_id,
                    'Phase': phase_name,
                    'Model': 'Local (Specialized)',
                    'Reward': metrics_local['mean_reward'],
                    'Steps': metrics_local['mean_steps']
                })
                
            results.append({
                'Agent': agent_key.upper(), # Agrupador
                'Scenario': scenario_id,
                'Phase': phase_name,
                'Model': 'Baseline (PPO v2)',
                'Reward': metrics_baseline['mean_reward'],
                'Steps': metrics_baseline['mean_steps']
            })

    return pd.DataFrame(results)

In [None]:
# Ruta al modelo baseline (ajusta si es necesario)
BASELINE_MODEL_PATH = os.path.join(project_path, 'v2', 'runs', 'poke_26214400.zip')

# Definir qué comparar (puedes reusar los planes definidos arriba)
comparison_plans = {
    'combat': combat_plan_local,
    'puzzle': puzzle_plan_local,
    'hybrid': hybrid_plan_local
}

# Ejecutar comparación (headless=True para ir rápido)
df_results = run_comparison(comparison_plans, BASELINE_MODEL_PATH, headless=True)

if df_results is not None and not df_results.empty:
    print("\n=== RESULTADOS DE COMPARACIÓN ===")
    print(df_results)
    
    # Opcional: Guardar CSV
    df_results.to_csv("comparacion_local_vs_baseline.csv", index=False)
else:
    print("No se generaron resultados (¿faltan modelos?).")