In [3]:
import os
import sys
import random
import itertools
import time
import numpy as np
import gymnasium as gym
from gymnasium import spaces

# --- PATH CONFIGURATION ---
# The notebook is assumed to be run from RISK-AI/PPO/, so the parent directory
# (RISK-AI/) is added to sys.path to make the game modules importable.
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

print(f"Directorio padre añadido al path: {parent_dir}")

# --- GAME IMPORTS ---
try:
    import risktools
    from clases.state import RiskState
    # NOTE(review): the star import is kept because later cells use names it
    # provides (e.g. ECON_START, HAPP_START, DEVP_START); prefer an explicit
    # import list once the full set of required names is confirmed.
    from config_atrib import *

    print("✅ Librerías de RISK importadas correctamente.")
except ImportError as e:
    print(f"❌ Error importando risktools: {e}")
    print("Verifica que estás ejecutando el notebook desde la carpeta RISK-AI/PPO/")
    # Fail here, with the hint above, instead of letting later cells die with
    # an unrelated NameError on `risktools`.
    raise

Directorio padre añadido al path: c:\Users\murci\Documents\Master_IA\Reinforcement_Learning\RISK-AI
✅ Librerías de RISK importadas correctamente.


In [9]:
class RiskTotalControlEnv(gym.Env):
    """
    Gymnasium environment for playing RISK with full control over every action.

    The board is loaded once from ``world.zip`` and the players are registered
    on it manually at every reset, so no GUI code path is required.

    Action space
        ``MultiDiscrete([n_types, T, T, n_amounts])`` interpreted as
        ``(action type, source territory id, destination territory id,
        amount index)``, where ``T`` is the number of territories on the
        loaded board.

    Observation space
        Flat ``float32`` vector in ``[0, 1]`` of length ``3 * T + 20``:
        three features per territory (owned by the agent, owned by someone
        else, armies / 100), four global scalars, a four-slot one-hot of the
        current phase, and zero padding up to the declared length.

    Randomness
        Every random draw uses ``self.np_random`` so that
        ``reset(seed=...)`` makes an episode reproducible.
    """
    metadata = {'render_modes': ['human']}

    # Action-type names in the order of their index in the first sub-action.
    ACTION_TYPES = ('Pasar', 'Comprar_Soldados', 'Place', 'Attack',
                    'Occupy', 'Fortify', 'Invertir')
    # Number of discrete choices for the "amount" sub-action.
    N_AMOUNTS = 10
    # Upper bound on engine actions simulated for the opponent per agent step.
    MAX_ENEMY_STEPS = 50

    def __init__(self, enemy_ai_class=None):
        """
        Load the board and declare the action/observation spaces.

        Args:
            enemy_ai_class: Stored on ``self.enemy_ai``. It is not used by the
                opponent simulation in this class, which always plays randomly.
        """
        super().__init__()

        # Load the board only once; reset() reuses this same object.
        world_path = os.path.join(parent_dir, "world.zip")
        self.board_base = risktools.loadBoard(world_path)
        self.board = self.board_base
        self.n_territories = len(self.board_base.territories)

        # Default card values (loadBoard does not provide them).
        self.board_base.set_turn_in_values([4, 6, 8, 10, 12, 15])
        self.board_base.set_increment_value(5)

        # ACTIONS: (type, source, destination, amount). Territory dimensions
        # follow the loaded board instead of a hard-coded count.
        self.action_space = spaces.MultiDiscrete([
            len(self.ACTION_TYPES),
            self.n_territories,
            self.n_territories,
            self.N_AMOUNTS,
        ])

        # OBSERVATION
        self.obs_dim = (self.n_territories * 3) + 20
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.obs_dim,), dtype=np.float32)

        self.state = None
        self.player_idx = 0
        self.enemy_ai = enemy_ai_class

    def reset(self, seed=None, options=None):
        """
        Start a new two-player game with a random territory split.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        # 1. Create the players using the constants imported from config_atrib.
        p1 = risktools.RiskPlayer("Agent_PPO", 0, 0, False, ECON_START, HAPP_START, DEVP_START)
        p2 = risktools.RiskPlayer("Enemy_Bot", 1, 0, False, ECON_START, HAPP_START, DEVP_START)

        # Compatibility patch (legacy camelCase attributes).
        for player in (p1, p2):
            player.freeArmies = player.free_armies
            player.conqueredTerritory = player.conquered_territory

        # 2. Reuse the loaded map but clear any previously registered players.
        self.board = self.board_base
        self.board.players = []
        self.board.player_to_id = {}
        self.board.id_to_player = {}

        self.board.add_player(p1)
        self.board.add_player(p2)

        # 3. Configure the global engine (kept for safety; this class does not
        #    otherwise read it).
        risktools.riskengine.playerorder = [p1, p2]
        risktools.riskengine.currentplayer = p1
        risktools.riskengine.phase = 'Preposition'

        # 4. Initial state; getInitialState is given the board configured above.
        self.state = risktools.getInitialState(self.board)

        # 5. Fast setup (skips the manual territory-assignment phase).
        self._fast_random_setup()

        return self._get_obs(), {}

    def step(self, action):
        """
        Apply one agent action and, if the turn passes, play the opponent.

        Args:
            action: Sequence of four integers
                ``(type, source id, destination id, amount index)``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``info["valid"]`` is False when the action could not be mapped to
            an action currently allowed by the engine; in that case the state
            is unchanged and the reward is -1.0. If the engine raises while
            applying the action, the episode ends with reward -10 and the
            message is reported in ``info["error"]``.
        """
        act_type, act_src, act_dst, act_amt = (int(v) for v in action)

        game_action = self._decode_action(act_type, act_src, act_dst, act_amt)

        reward = 0
        terminated = False
        truncated = False
        info = {"valid": game_action is not None}

        if game_action is None:
            return self._get_obs(), -1.0, False, False, info

        try:
            self._apply_action(game_action)
        except Exception as e:
            # Deliberate best-effort: an internal engine error is penalised and
            # ends the episode instead of crashing the training loop.
            return self._get_obs(), -10, True, False, {"error": str(e), "valid": True}

        reward += self._calculate_reward()

        if self.state.turn_type == 'GameOver':
            terminated = True
            winners = [i for i, p in enumerate(self.state.players) if not p.game_over]
            # The agent wins only when it is the sole surviving player.
            if self.player_idx in winners and len(winners) == 1:
                reward += 100
            else:
                reward -= 100

        elif self.state.current_player != self.player_idx:
            self._simulate_enemy_turn()
            if self.state.players[self.player_idx].game_over:
                terminated = True
                reward -= 100
            elif self.state.turn_type == 'GameOver':
                # Rare case: the game ended during the opponent's turn without
                # the agent being eliminated. No terminal bonus is applied.
                terminated = True

        return self._get_obs(), reward, terminated, truncated, info

    def action_masks(self):
        """
        Build the concatenated validity mask for the four sub-actions.

        Returns:
            1-D boolean array laid out as
            ``[type | source | destination | amount]``. Each segment always
            has at least one True entry (index 0 is used as a fallback), so a
            masked policy never faces a dimension with no selectable value.
            The amount segment is always fully enabled.
        """
        allowed_dict = risktools.getAllowedFaseActions(self.state)

        type_map = {name: idx for idx, name in enumerate(self.ACTION_TYPES)}
        # 'PrePlace' shares the slot of 'Place'.
        type_map['PrePlace'] = type_map['Place']

        mask_type = np.zeros(len(self.ACTION_TYPES), dtype=bool)
        mask_src = np.zeros(self.n_territories, dtype=bool)
        mask_dst = np.zeros(self.n_territories, dtype=bool)
        territory_to_id = self.board.territory_to_id

        for key, actions in allowed_dict.items():
            if key not in type_map or len(actions) == 0:
                continue
            mask_type[type_map[key]] = True

            for act in actions:
                src = getattr(act, 'from_territory', None)
                if src:
                    if src in territory_to_id:
                        mask_src[territory_to_id[src]] = True
                else:
                    # Actions without a source accept the placeholder id 0.
                    mask_src[0] = True

                dst = getattr(act, 'to_territory', None)
                if dst:
                    if dst in territory_to_id:
                        mask_dst[territory_to_id[dst]] = True
                else:
                    # Actions without a destination accept the placeholder id 0.
                    mask_dst[0] = True

        # Never return an all-False segment.
        for segment in (mask_type, mask_src, mask_dst):
            if not segment.any():
                segment[0] = True

        mask_amt = np.ones(self.N_AMOUNTS, dtype=bool)
        return np.concatenate([mask_type, mask_src, mask_dst, mask_amt])

    def _decode_action(self, type_idx, src_id, dst_id, amt_idx):
        """
        Map a MultiDiscrete action to an engine action object.

        Args:
            type_idx: Index into ``ACTION_TYPES``.
            src_id: Source territory id.
            dst_id: Destination territory id.
            amt_idx: Amount index.

        Returns:
            The first currently allowed action of the requested type whose
            source/destination (when the action defines them) match, with its
            ``armies`` / ``amount`` attribute overwritten when present; or
            ``None`` if no allowed action matches.
        """
        allowed_dict = risktools.getAllowedFaseActions(self.state)

        if not 0 <= type_idx < len(self.ACTION_TYPES):
            return None
        target_type = self.ACTION_TYPES[type_idx]

        if target_type == 'Place' and 'PrePlace' in allowed_dict:
            target_type = 'PrePlace'

        if target_type not in allowed_dict:
            return None

        candidates = allowed_dict[target_type]

        # Territory names for the given ids; out-of-range ids (including
        # negative ones, which would otherwise wrap around) match nothing.
        src_name = self.board.territories[src_id].name if 0 <= src_id < self.n_territories else None
        dst_name = self.board.territories[dst_id].name if 0 <= dst_id < self.n_territories else None

        best_match = None
        for act in candidates:
            act_src = getattr(act, 'from_territory', None)
            act_dst = getattr(act, 'to_territory', None)
            # An attribute that is missing or empty does not constrain the match.
            if act_src and act_src != src_name:
                continue
            if act_dst and act_dst != dst_name:
                continue
            best_match = act
            break

        if best_match is not None:
            # The amount index is used directly as the quantity, with a floor
            # of 1 (so indices 0 and 1 both mean "1").
            # NOTE(review): the quantity is not checked against limits in the
            # game state — confirm the engine tolerates arbitrary values.
            quantity = max(1, amt_idx)
            if hasattr(best_match, 'armies'):
                best_match.armies = quantity
            if hasattr(best_match, 'amount'):
                best_match.amount = quantity

        return best_match

    def _get_obs(self):
        """
        Encode the current state as a ``float32`` vector of length ``obs_dim``.

        Values are clipped to ``[0, 1]`` so the result always lies inside the
        declared observation space.
        """
        obs = []
        for i in range(self.n_territories):
            owner = self.state.owners[i]
            armies = self.state.armies[i]
            obs.append(1.0 if owner == self.player_idx else 0.0)
            obs.append(1.0 if owner is not None and owner != self.player_idx else 0.0)
            obs.append(min(armies / 100.0, 1.0))

        me = self.state.players[self.player_idx]
        enemy = self.state.players[1 if self.player_idx == 0 else 0]

        obs.extend([
            min(me.economy / 200.0, 1.0),
            min(me.free_armies / 50.0, 1.0),
            min(enemy.economy / 200.0, 1.0),
            min(sum(self.state.armies) / 500.0, 1.0)
        ])

        # One-hot of the current phase; unknown phases fall back to slot 0.
        phase_map = {'fase_0': 0, 'fase_1': 1, 'fase_2': 2, 'fase_3': 3}
        phase_vec = [0.0] * 4
        phase_vec[phase_map.get(self.state.fase, 0)] = 1.0
        obs.extend(phase_vec)

        # Zero-pad up to the declared length.
        if len(obs) < self.obs_dim:
            obs.extend([0.0] * (self.obs_dim - len(obs)))

        vector = np.array(obs[:self.obs_dim], dtype=np.float32)
        # Lower clamp guards against negative inputs (e.g. a negative economy).
        return np.clip(vector, 0.0, 1.0).astype(np.float32, copy=False)

    def _calculate_reward(self):
        """Return 0.01 per territory currently owned by the agent."""
        territories = sum(1 for o in self.state.owners if o == self.player_idx)
        return territories * 0.01

    def _apply_action(self, game_action):
        """
        Advance ``self.state`` by one engine action.

        When the engine returns several possible successor states, one is
        sampled according to the returned probabilities using the
        environment's seeded generator.
        """
        next_states, probs = risktools.simulateAction(self.state, game_action)
        if len(next_states) > 1:
            idx = int(self.np_random.choice(len(next_states), p=probs))
            self.state = next_states[idx]
        else:
            self.state = next_states[0]

    def _simulate_enemy_turn(self):
        """
        Play uniformly random allowed actions until control returns to the
        agent, the game ends, no action is available, or ``MAX_ENEMY_STEPS``
        actions have been applied.
        """
        steps = 0
        while (self.state.current_player != self.player_idx
               and self.state.turn_type != 'GameOver'
               and steps < self.MAX_ENEMY_STEPS):
            actions_dict = risktools.getAllowedFaseActions(self.state)
            all_actions = list(itertools.chain.from_iterable(actions_dict.values()))
            if not all_actions:
                break

            # Uniform random choice from the seeded generator.
            action = all_actions[int(self.np_random.integers(len(all_actions)))]
            self._apply_action(action)
            steps += 1

    def _fast_random_setup(self):
        """
        Deal territories alternately to the two players in random order, with
        3 armies each, and jump straight to the soldier-buying step.
        """
        shuffled_ids = self.np_random.permutation(self.n_territories).tolist()
        for i, tid in enumerate(shuffled_ids):
            self.state.owners[tid] = i % 2
            self.state.armies[tid] = 3
        self.state.fase = 'fase_1'
        self.state.turn_type = 'Comprar_Soldados'

In [11]:
# --- SMOKE TEST ---
# Runs a short episode to check that reset()/step()/action_masks() do not
# crash. Each sub-action is drawn from its own slice of the mask. The slices
# are independent, so a sampled (type, source, destination) combination can
# still be rejected by the environment; that is reported as Valid=False.
SMOKE_SEED = 0
MAX_SMOKE_STEPS = 20

env = RiskTotalControlEnv()
obs, _ = env.reset(seed=SMOKE_SEED)
print("Estado inicial shape:", obs.shape)

smoke_rng = np.random.default_rng(SMOKE_SEED)
# Boundaries of the [type | source | destination | amount] mask segments.
mask_split_points = np.cumsum(env.action_space.nvec)[:-1]

done = False
steps = 0

print("Probando bucle de juego aleatorio...")
while not done and steps < MAX_SMOKE_STEPS:
    masks = env.action_masks()

    # One random enabled index per segment; fall back to 0 if a segment has
    # no enabled entry.
    action = np.array([
        smoke_rng.choice(np.flatnonzero(segment)) if segment.any() else 0
        for segment in np.split(masks, mask_split_points)
    ])
    obs, reward, terminated, truncated, info = env.step(action)

    print(f"Step {steps}: Reward={reward}, Valid={info['valid']}")

    if terminated:
        print("Juego terminado.")
        done = True
    steps += 1

print("✅ Test finalizado sin crashes críticos.")

Estado inicial shape: (146,)
Probando bucle de juego aleatorio...
Step 0: Reward=0.21, Valid=True
Step 1: Reward=-1.0, Valid=False
Step 2: Reward=-1.0, Valid=False
Step 3: Reward=-1.0, Valid=False
Step 4: Reward=-1.0, Valid=False
Step 5: Reward=-1.0, Valid=False
Step 6: Reward=0.21, Valid=True
Step 7: Reward=-1.0, Valid=False
Step 8: Reward=-1.0, Valid=False
Step 9: Reward=-1.0, Valid=False
Step 10: Reward=-1.0, Valid=False
Step 11: Reward=-1.0, Valid=False
Step 12: Reward=-1.0, Valid=False
Step 13: Reward=-1.0, Valid=False
Step 14: Reward=-1.0, Valid=False
Step 15: Reward=-1.0, Valid=False
Step 16: Reward=-1.0, Valid=False
Step 17: Reward=-1.0, Valid=False
Step 18: Reward=0.21, Valid=True
Step 19: Reward=-1.0, Valid=False
✅ Test finalizado sin crashes críticos.
