### 1. Lógica de NN para el 2 armed bandit

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

# ======= 1. ENTORNO: HierarchicalBandit =======
class HierarchicalBandit:
    def __init__(self, episode_length, swap1=None, swap2=None):
        """
        Estructura por fases dentro de un mismo episodio:
        - Fase 1 (t < swap1): puertas A vs B
        - Fase 2 (swap1 <= t < swap2): puertas C vs D
        - Fase 3 (t >= swap2): si mode='best', elegir entre la mejor de {A,B} vs la mejor de {C,D};
                             si mode='worst', entre la peor de {A,B} vs la peor de {C,D}.
        El agente debe inferir en cada fase cuál es la puerta óptima.
        """
        mapping = {
            'A': (0.75, 10),
            'B': (0.25, 10),
            'C': (0.75,  3),
            'D': (0.25,  3),
        }
        # Guardamos las probabilidades y recompensas para cada puerta
        self.mapping = mapping

        # Determinar puntos de swap
        self.episode_length = episode_length
        self.swap1 = swap1 if swap1 is not None else np.random.randint(1, episode_length - 2)
        self.swap2 = swap2 if swap2 is not None else np.random.randint(self.swap1 + 1, episode_length - 1)

        # Modo: 'best' o 'worst'
        self.mode = 'best' if np.random.rand() < 0.5 else 'worst'

        # Elegir valores de A/B en Fase1 y C/D en Fase2
        self.aby_pair = ('A', 'B')
        self.cdy_pair = ('C', 'D')

        # Calcular cuál es la mejor/peor puerta en AB, y en CD
        # “Mejor” = mayor p; “Peor” = menor p
        a_p, a_r = mapping['A']
        b_p, b_r = mapping['B']
        if a_p > b_p:
            self.best_ab, self.worst_ab = 'A', 'B'
        else:
            self.best_ab, self.worst_ab = 'B', 'A'

        c_p, c_r = mapping['C']
        d_p, d_r = mapping['D']
        if c_p > d_p:
            self.best_cd, self.worst_cd = 'C', 'D'
        else:
            self.best_cd, self.worst_cd = 'D', 'C'

        # Estado temporal
        self.step_count = 0

    def pull(self, action):
        t = self.step_count

        # Fase 1: A vs B
        if t < self.swap1:
            label = self.aby_pair[action]      # action=0 -> 'A',  action=1 -> 'B'
        # Fase 2: C vs D
        elif t < self.swap2:
            label = self.cdy_pair[action]      # action=0 -> 'C',  action=1 -> 'D'
        # Fase 3: best vs best  (o worst vs worst)
        else:
            if self.mode == 'best':
                # action=0 -> puerta best_ab, action=1 -> puerta best_cd
                label = self.best_ab if action == 0 else self.best_cd
            else:
                # action=0 -> puerta worst_ab, action=1 -> puerta worst_cd
                label = self.worst_ab if action == 0 else self.worst_cd

        # Obtenemos probabilidad y magnitud para “label”
        p, r = self.mapping[label]
        reward = r if np.random.rand() < p else -r

        self.step_count += 1
        return reward

    def current_info(self):
        """Información para debug/evaluación: fases y parejas ocultas."""
        return {
            'swap1': self.swap1,
            'swap2': self.swap2,
            'mode': self.mode,
            'best_ab': self.best_ab,
            'best_cd': self.best_cd,
            'worst_ab': self.worst_ab,
            'worst_cd': self.worst_cd
        }

### 2. Entrenamiento con PPO

In [21]:
# ======= 2. MODELO: Agente PPO con LSTM =======
class PPOAgent(nn.Module):
    def __init__(self, input_size=4, hidden_size=32, num_actions=2):
        super(PPOAgent, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTMCell(input_size, hidden_size)
        self.policy_head = nn.Linear(hidden_size, num_actions)
        self.value_head  = nn.Linear(hidden_size, 1)

    def reset_state(self):
        self.hx = torch.zeros(1, self.hidden_size)
        self.cx = torch.zeros(1, self.hidden_size)

    def forward(self, x):
        self.hx, self.cx = self.lstm(x, (self.hx, self.cx))
        logits = self.policy_head(self.hx)
        value  = self.value_head(self.hx)
        return logits, value


# ======= 3. Crear la entrada del agente =======
def get_input(last_action, last_reward, timestep, num_actions=2):
    a = F.one_hot(torch.tensor([last_action]), num_classes=num_actions).float()
    r = torch.tensor([[last_reward]], dtype=torch.float32)
    t = torch.tensor([[timestep / 10.0]], dtype=torch.float32)
    return torch.cat([a, r, t], dim=1)


# ======= 4. HIPERPARÁMETROS DE PPO =======
gamma        = 0.99
clip_epsilon = 0.2
ppo_epochs   = 4
lr           = 0.001

agent     = PPOAgent()
optimizer = optim.Adam(agent.parameters(), lr=lr)

num_episodes   = 2000
episode_length = 20


# ======= 5. ENTRENAMIENTO CON PPO =======
for episode in range(num_episodes):
    env = HierarchicalBandit(episode_length=episode_length)
    agent.reset_state()

    states, actions, rewards, logps, values = [], [], [], [], []
    last_action, last_reward = 0, 0.0

    for t in range(episode_length):
        x      = get_input(last_action, last_reward, t)
        logits, v = agent(x)
        probs  = F.softmax(logits, dim=1)
        dist   = torch.distributions.Categorical(probs)
        action = dist.sample()
        logp   = dist.log_prob(action)

        states.append(x)
        actions.append(action)
        logps.append(logp)
        values.append(v)

        reward = env.pull(action.item())
        reward = reward / 10
        rewards.append(reward)

        last_action, last_reward = action.item(), reward

    # Calcular returns y ventajas
    returns, G = [], 0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    returns    = torch.tensor(returns, dtype=torch.float32).unsqueeze(1)
    values     = torch.cat(values)
    advantages = returns - values.detach()
    old_logps  = torch.cat(logps).detach()

    # Actualización PPO
    for _ in range(ppo_epochs):
        new_logps, new_vals, entropies = [], [], []
        agent.reset_state()
        for i, x in enumerate(states):
            logits, v   = agent(x)
            probs       = F.softmax(logits, dim=1)
            dist        = torch.distributions.Categorical(probs)
            new_logps.append(dist.log_prob(actions[i]))
            new_vals.append(v)
            entropies.append(dist.entropy())
        new_logps    = torch.cat(new_logps)
        new_vals     = torch.cat(new_vals)
        entropy_term = torch.cat(entropies).mean()

        ratio       = torch.exp(new_logps - old_logps)
        s1          = ratio * advantages
        s2          = torch.clamp(ratio, 1-clip_epsilon, 1+clip_epsilon) * advantages
        policy_loss = -torch.min(s1, s2).mean()
        value_loss  = F.mse_loss(new_vals, returns)
        loss        = policy_loss + 0.5*value_loss - 0.01*entropy_term

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (episode + 1) % 200 == 0:
        print(f"Episode {episode+1}, Total Reward: {sum(rewards)}, Loss: {loss.item():.2f}")

print("Entrenamiento completado.")

Episode 200, Total Reward: 6.6, Loss: 3.71
Episode 400, Total Reward: 3.3, Loss: 4.46
Episode 600, Total Reward: 3.9, Loss: 0.66
Episode 800, Total Reward: 6.6, Loss: 0.11
Episode 1000, Total Reward: 4.6, Loss: 3.95
Episode 1200, Total Reward: 5.8, Loss: 0.44
Episode 1400, Total Reward: -4.8, Loss: 9.13
Episode 1600, Total Reward: 4.0, Loss: 1.01
Episode 1800, Total Reward: 6.6, Loss: 2.90
Episode 2000, Total Reward: 11.3, Loss: 3.36
Entrenamiento completado.


In [9]:
model_path = "ppo_lstm_hierarchical.pth"
torch.save(agent.state_dict(), model_path)
print(f" Modelo guardado en: {model_path}")

 Modelo guardado en: ppo_lstm_hierarchical.pth


# 3. Ejemplo de funcionamiento con un agente en modo evaluación

In [22]:
# ======= 6. EVALUACIÓN =======
agent.eval()
env = HierarchicalBandit(episode_length=episode_length)
agent.reset_state()

info = env.current_info()
print(f"swap1={info['swap1']}, swap2={info['swap2']}, mode={info['mode']}")
print(f"best_ab={info['best_ab']}, best_cd={info['best_cd']}, worst_ab={info['worst_ab']}, worst_cd={info['worst_cd']}\n")

last_action, last_reward = 0, 0
total_reward = 0

for t in range(episode_length):
    with torch.no_grad():
        x      = get_input(last_action, last_reward, t)
        logits, _ = agent(x)
        probs  = F.softmax(logits, dim=1)[0].tolist()
        action = torch.multinomial(torch.tensor(probs), 1).item()

    reward = env.pull(action)
    total_reward += reward
    print(f"Paso {t:2d} | probs_política={probs} | Acción={action} | Recompensa={reward}")
    last_action, last_reward = action, reward

print(f"\nRecompensa total: {total_reward}")

swap1=16, swap2=18, mode=best
best_ab=A, best_cd=C, worst_ab=B, worst_cd=D

Paso  0 | probs_política=[0.9988411068916321, 0.0011589151108637452] | Acción=0 | Recompensa=-10
Paso  1 | probs_política=[0.925616443157196, 0.07438356429338455] | Acción=0 | Recompensa=10
Paso  2 | probs_política=[0.9999978542327881, 2.1924056454736274e-06] | Acción=0 | Recompensa=10
Paso  3 | probs_política=[0.9999991655349731, 7.87527199008764e-07] | Acción=0 | Recompensa=10
Paso  4 | probs_política=[0.9999992847442627, 7.002342385931115e-07] | Acción=0 | Recompensa=10
Paso  5 | probs_política=[0.9999992847442627, 6.77510058721964e-07] | Acción=0 | Recompensa=10
Paso  6 | probs_política=[0.9999992847442627, 7.041181788736139e-07] | Acción=0 | Recompensa=-10
Paso  7 | probs_política=[0.9993491768836975, 0.00065076001919806] | Acción=0 | Recompensa=-10
Paso  8 | probs_política=[0.9888448119163513, 0.011155142448842525] | Acción=0 | Recompensa=10
Paso  9 | probs_política=[0.999997615814209, 2.4349662908207392e