In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import os

class SingleStepImageEnv(gym.Env):
    """
    One-decision (bandit-like) environment over a fixed pair of images.

      - Observation: the two images concatenated along axis 0 into one array.
      - Actions: 2 (0 -> pick image A, 1 -> pick image B).
      - Rewards:
          * action == 0 pays ``rewardA`` with probability ``probA``, else 0.0
          * any other action pays ``rewardB`` with probability ``probB``, else 0.0
      - The episode terminates after a single step.

    Args:
        imageA, imageB: array-likes with identical shapes; the code comments
            assume (C, H, W) so that the observation is (2*C, H, W).
        rewardA, rewardB: payout of each action when it succeeds.
        probA, probB: success probability of each action.
    """
    def __init__(self, imageA, imageB, rewardA=10.0, rewardB=10.0, probA=0.75, probB=0.25):
        super().__init__()
        assert imageA.shape == imageB.shape, "Ambas imágenes deben tener la misma forma"
        self.imageA = imageA
        self.imageB = imageB
        self.rewardA = rewardA
        self.rewardB = rewardB
        self.probA = probA
        self.probB = probB

        # Build the stacked observation once; it never changes between episodes.
        # Cast to float32 so the array actually matches the dtype declared in
        # observation_space below.
        self.observation = np.concatenate(
            [np.asarray(imageA, dtype=np.float32), np.asarray(imageB, dtype=np.float32)],
            axis=0,
        )
        obs_shape = self.observation.shape

        # Gym spaces
        self.action_space = spaces.Discrete(2)  # 2 actions: A=0, B=1
        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=obs_shape, dtype=np.float32)

        self.done = False

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Passing ``seed`` re-seeds ``self.np_random``, which is the generator
        used by ``step`` to draw rewards.
        """
        super().reset(seed=seed)
        self.done = False
        info = {}
        return self.observation, info

    def step(self, action):
        """Resolve the single decision.

        Returns ``(observation, reward, terminated, truncated, info)`` where
        ``terminated`` is always True and ``truncated`` is always False.

        Raises:
            RuntimeError: if called again without an intervening ``reset()``.
        """
        if self.done:
            raise RuntimeError("Episodio ya terminado. Llama reset().")

        if action == 0:  # Action A
            payout, success_prob = self.rewardA, self.probA
        else:            # Action B
            payout, success_prob = self.rewardB, self.probB

        # Use the environment's own generator (seeded through reset(seed=...))
        # instead of the global numpy RNG so that runs are reproducible.
        reward = payout if self.np_random.random() < success_prob else 0.0

        self.done = True
        return self.observation, reward, self.done, False, {}


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNActorCritic(nn.Module):
    """
    Small CNN with two heads: action logits (actor) and a scalar value (critic).

    Args:
        in_channels: number of input channels (2 for two stacked grayscale
            images, 6 for two stacked RGB images, ...).
        num_actions: number of discrete actions, i.e. size of the logits head.
        input_size: ``(H, W)`` of the input images. Used to size the first
            fully connected layer; the default (28, 28) yields the original
            ``32 * 6 * 6`` features, so existing checkpoints remain loadable.

    Raises:
        ValueError: if ``input_size`` is too small to survive both convolutions.
    """
    def __init__(self, in_channels=2, num_actions=2, input_size=(28, 28)):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=2)

        # Spatial size after the two unpadded convolutions, derived from the
        # layers themselves rather than hard-coded.
        height, width = input_size
        for conv in (self.conv1, self.conv2):
            height = self._conv_output_size(height, conv.kernel_size[0], conv.stride[0])
            width = self._conv_output_size(width, conv.kernel_size[1], conv.stride[1])
        if height < 1 or width < 1:
            raise ValueError(f"input_size={tuple(input_size)} is too small for this network")

        self.fc1   = nn.Linear(32 * height * width, 128)
        self.actor = nn.Linear(128, num_actions)
        self.critic= nn.Linear(128, 1)

    @staticmethod
    def _conv_output_size(size, kernel, stride):
        """Output length of an unpadded, undilated convolution along one axis."""
        return (size - kernel) // stride + 1

    def forward(self, x):
        """Map a batch ``(batch_size, in_channels, H, W)`` to ``(logits, value)``.

        ``logits`` has shape ``(batch_size, num_actions)`` and ``value`` has
        shape ``(batch_size, 1)``.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        # Flatten everything except the batch dimension.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))

        logits = self.actor(x)
        value  = self.critic(x)
        return logits, value


In [3]:
import torch
from torch.distributions import Categorical
import torch.optim as optim
import numpy as np

def compute_returns_and_advantages(rewards, values, gamma=1.0):
    """Pair each reward with its value estimate and derive returns/advantages.

    Episodes last a single step, so the return of an episode is just its
    reward and the advantage is ``reward - value``. ``gamma`` is accepted for
    signature compatibility but is not used.

    Returns:
        ``(returns, advantages)`` as two lists, truncated to the shorter of
        ``rewards`` and ``values``.
    """
    paired = [(reward, reward - baseline) for reward, baseline in zip(rewards, values)]
    returns = [ret for ret, _ in paired]
    advantages = [adv for _, adv in paired]
    return returns, advantages

def ppo_train(env, policy_net, optimizer, epochs=10, episodes_per_epoch=100, gamma=1.0, epsilon=0.2, entropy_coef=0.01, update_epochs=1):
    """Train ``policy_net`` with the PPO clipped objective on single-step episodes.

    Each epoch collects ``episodes_per_epoch`` one-step episodes with the
    current policy, then optimizes
    ``policy_loss + 0.5 * value_loss - entropy_coef * entropy`` on that batch
    and prints the metrics of the last optimization pass.

    Args:
        env: environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns a 5-tuple with the reward in position 1.
        policy_net: module mapping a batch of observations to ``(logits, value)``.
        optimizer: optimizer over ``policy_net``'s parameters.
        epochs: number of collect-then-update rounds.
        episodes_per_epoch: episodes (= samples) collected per round.
        gamma: forwarded to ``compute_returns_and_advantages``.
        epsilon: PPO clipping range for the probability ratio.
        entropy_coef: weight of the entropy bonus.
        update_epochs: gradient steps taken on each collected batch. With the
            default of 1 the new/old probability ratio is evaluated on the very
            policy that collected the data, so clipping cannot take effect;
            values > 1 reuse the batch as in standard PPO.

    Raises:
        ValueError: if ``episodes_per_epoch`` or ``update_epochs`` is < 1.
    """
    if episodes_per_epoch < 1:
        raise ValueError("episodes_per_epoch must be >= 1")
    if update_epochs < 1:
        raise ValueError("update_epochs must be >= 1")

    policy_net.train()

    for epoch in range(epochs):
        states, actions, rewards, log_probs_old, values_old = [], [], [], [], []

        for _ in range(episodes_per_epoch):
            obs, _ = env.reset()
            obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)  # add batch dim

            # Rollout only needs numbers, not gradients: skip graph construction.
            with torch.no_grad():
                logits, value = policy_net(obs_tensor)
                dist = Categorical(logits=logits)
                action = dist.sample()
                log_prob = dist.log_prob(action)

            _, reward, _, _, _ = env.step(action.item())

            states.append(obs_tensor)
            actions.append(action.item())
            rewards.append(reward)
            log_probs_old.append(log_prob.item())
            values_old.append(value.item())

        # Returns and advantages from the rollout-time value estimates.
        returns, advantages = compute_returns_and_advantages(rewards, values_old, gamma=gamma)

        # Concatenating the per-step tensors avoids the slow conversion of a
        # Python list of numpy arrays into a tensor.
        states_tensor        = torch.cat(states, dim=0)
        actions_tensor       = torch.tensor(actions, dtype=torch.long)
        old_log_probs_tensor = torch.tensor(log_probs_old, dtype=torch.float32)
        returns_tensor       = torch.tensor(returns, dtype=torch.float32)
        advantages_tensor    = torch.tensor(advantages, dtype=torch.float32)

        for _ in range(update_epochs):
            logits, values = policy_net(states_tensor)
            dist = Categorical(logits=logits)
            log_probs = dist.log_prob(actions_tensor)

            ratio = torch.exp(log_probs - old_log_probs_tensor)
            ratio_clipped = torch.clamp(ratio, 1.0 - epsilon, 1.0 + epsilon)

            policy_loss_1 = ratio * advantages_tensor
            policy_loss_2 = ratio_clipped * advantages_tensor
            policy_loss   = -torch.mean(torch.min(policy_loss_1, policy_loss_2))

            # squeeze(-1) drops only the trailing value dimension, so a batch
            # of size 1 keeps its batch axis.
            value_loss = torch.mean((values.squeeze(-1) - returns_tensor)**2)
            entropy = dist.entropy().mean()

            loss = policy_loss + 0.5 * value_loss - entropy_coef * entropy

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        avg_reward = np.mean(rewards)
        print(f"Epoch {epoch+1}/{epochs}: Loss={loss.item():.3f} PolicyLoss={policy_loss.item():.3f} ValueLoss={value_loss.item():.3f} Entropy={entropy.item():.3f} AvgReward={avg_reward:.2f}")

def training_executor(image_a=None, image_b=None, reward_a=10.0, reward_b=10.0, prob_a=0.75, prob_b=0.25, checkpoint='ppo_checkpoint.pth', model_save_path='ppo_checkpoint.pth'):
    """Train (optionally resuming) a PPO agent on one image pair, save it, and evaluate it.

    Args:
        image_a, image_b: the two images shown to the agent; both are required
            and must have the same shape.
        reward_a, reward_b: payout of choosing A / B when the choice succeeds.
        prob_a, prob_b: success probability of choosing A / B.
        checkpoint: path of a checkpoint to resume from. Training starts from
            scratch when it is None or the file does not exist.
        model_save_path: path where the trained model and optimizer state are
            written.

    Raises:
        ValueError: if either image is missing.
    """
    if image_a is None or image_b is None:
        raise ValueError("image_a e image_b son obligatorias.")

    env = SingleStepImageEnv(image_a, image_b, rewardA=reward_a, rewardB=reward_b, probA=prob_a, probB=prob_b)

    # Size the network from the environment instead of assuming two grayscale
    # images (2 channels) and two actions.
    policy_net = CNNActorCritic(
        in_channels=env.observation_space.shape[0],
        num_actions=int(env.action_space.n),
    )

    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)

    if checkpoint is not None and os.path.exists(checkpoint):
        print(f"Cargando modelo desde {checkpoint}")
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        model = torch.load(checkpoint)
        policy_net.load_state_dict(model['model_state_dict'])
        optimizer.load_state_dict(model['optimizer_state_dict'])
        print("Modelo y optimizador cargados correctamente.")
    else:
        print("Entrenando desde cero.")

    ppo_train(env, policy_net, optimizer,
              epochs=10, episodes_per_epoch=100,
              gamma=1.0, epsilon=0.2, entropy_coef=0.01)

    # Save model and optimizer state.
    torch.save({
        "model_state_dict": policy_net.state_dict(),
        "optimizer_state_dict": optimizer.state_dict()
    }, model_save_path)

    # Report the path that was actually written (previously this printed the
    # path the model had been loaded from).
    print("Modelo guardado en", model_save_path)

    # Evaluation: sample actions from the trained policy over 1000 episodes.
    policy_net.eval()
    test_rewards = []
    for _ in range(1000):
        obs, _ = env.reset()
        obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            logits, value = policy_net(obs_tensor)
            dist = Categorical(logits=logits)
            action = dist.sample().item()
        _, reward, done, truncated, info = env.step(action)
        test_rewards.append(reward)

    print(f"\nRecompensa promedio en 1000 episodios de test: {np.mean(test_rewards):.2f}")


In [4]:
# Preprocesamiento de imágenes
import cv2

def preprocess_image(image):
    """Load an image file and turn it into a normalized grayscale tensor.

    Args:
        image: path (``str`` or path-like) of the image file to read.

    Returns:
        A ``torch.float32`` tensor of shape ``(1, 28, 28)``: the image
        converted to grayscale, resized to 28x28 and divided by 255
        (i.e. in [0, 1] assuming an 8-bit image).

    Raises:
        FileNotFoundError: if ``image`` does not point to an existing file.
        ValueError: if the file exists but OpenCV cannot decode it.
    """
    path = os.fspath(image)
    if not os.path.isfile(path):
        raise FileNotFoundError(f"No se encontró la imagen: {path}")

    # cv2.imread signals failure by returning None rather than raising, which
    # would otherwise surface as an obscure error inside cvtColor.
    pixels = cv2.imread(path)
    if pixels is None:
        raise ValueError(f"No se pudo leer la imagen: {path}")

    # convert to grayscale
    gray = cv2.cvtColor(pixels, cv2.COLOR_BGR2GRAY)
    # resize to 28x28
    resized = cv2.resize(gray, (28, 28), interpolation=cv2.INTER_AREA)
    # normalize
    normalized = resized / 255.0

    # convert to tensor, adding a leading channel dimension
    tensor = torch.tensor(normalized, dtype=torch.float32).unsqueeze(0)
    return tensor

In [5]:
# Directory holding the sample images. Defaults to the original location and
# can be overridden with the PPO_IMAGE_DIR environment variable so the notebook
# runs on other machines.
IMAGE_DIR = os.environ.get("PPO_IMAGE_DIR", "/Users/mtazc/Documents/AI_projects/test_ppo_rl")

image_a = preprocess_image(os.path.join(IMAGE_DIR, "apple.jpg"))
image_b = preprocess_image(os.path.join(IMAGE_DIR, "cabbage.jpg"))
image_c = preprocess_image(os.path.join(IMAGE_DIR, "rock.jpg"))
image_d = preprocess_image(os.path.join(IMAGE_DIR, "stick.jpg"))

In [11]:
# Experiment 1: apple (A) vs. cabbage (B) with the default settings: both
# choices pay 10.0, A succeeds with probability 0.75 and B with 0.25. Resumes
# from 'ppo_checkpoint.pth' if that file exists and saves to the same path.
training_executor(image_a, image_b)

Cargando modelo desde ppo_checkpoint.pth
Modelo y optimizador cargados correctamente.
Epoch 1/10: Loss=21.615 PolicyLoss=3.441 ValueLoss=36.348 Entropy=0.002 AvgReward=5.70
Epoch 2/10: Loss=12.411 PolicyLoss=1.287 ValueLoss=22.248 Entropy=0.002 AvgReward=7.10
Epoch 3/10: Loss=7.714 PolicyLoss=-0.346 ValueLoss=16.119 Entropy=0.003 AvgReward=8.00
Epoch 4/10: Loss=6.891 PolicyLoss=-1.146 ValueLoss=16.074 Entropy=0.004 AvgReward=8.20
Epoch 5/10: Loss=7.837 PolicyLoss=-1.290 ValueLoss=18.253 Entropy=0.005 AvgReward=7.90
Epoch 6/10: Loss=9.125 PolicyLoss=-1.100 ValueLoss=20.450 Entropy=0.005 AvgReward=7.40
Epoch 7/10: Loss=8.752 PolicyLoss=-1.514 ValueLoss=20.531 Entropy=0.005 AvgReward=7.60
Epoch 8/10: Loss=10.198 PolicyLoss=-0.925 ValueLoss=22.245 Entropy=0.005 AvgReward=6.90
Epoch 9/10: Loss=9.042 PolicyLoss=-1.577 ValueLoss=21.238 Entropy=0.005 AvgReward=7.50
Epoch 10/10: Loss=9.024 PolicyLoss=-1.545 ValueLoss=21.138 Entropy=0.005 AvgReward=7.50
Modelo guardado en ppo_checkpoint.pth

Rec

In [12]:
# Experiment 2: rock (A) vs. stick (B) with both payouts lowered to 1.0 and
# the same 0.75 / 0.25 success probabilities. Starts from 'ppo_checkpoint.pth'
# and writes the result to 'c_d_ppo_checkpoint.pth'.
training_executor(image_c, image_d, reward_a=1.0, reward_b=1.0, prob_a=0.75, prob_b=0.25, checkpoint='ppo_checkpoint.pth', model_save_path='c_d_ppo_checkpoint.pth')

Cargando modelo desde ppo_checkpoint.pth
Modelo y optimizador cargados correctamente.
Epoch 1/10: Loss=28.199 PolicyLoss=6.566 ValueLoss=43.265 Entropy=0.001 AvgReward=0.81
Epoch 2/10: Loss=26.938 PolicyLoss=6.392 ValueLoss=41.091 Entropy=0.001 AvgReward=0.63
Epoch 3/10: Loss=21.515 PolicyLoss=5.622 ValueLoss=31.786 Entropy=0.002 AvgReward=0.77
Epoch 4/10: Loss=17.271 PolicyLoss=4.944 ValueLoss=24.655 Entropy=0.003 AvgReward=0.69
Epoch 5/10: Loss=12.337 PolicyLoss=4.051 ValueLoss=16.574 Entropy=0.006 AvgReward=0.79
Epoch 6/10: Loss=8.735 PolicyLoss=3.279 ValueLoss=10.912 Entropy=0.012 AvgReward=0.80
Epoch 7/10: Loss=6.475 PolicyLoss=2.706 ValueLoss=7.538 Entropy=0.022 AvgReward=0.68
Epoch 8/10: Loss=4.139 PolicyLoss=2.016 ValueLoss=4.246 Entropy=0.038 AvgReward=0.76
Epoch 9/10: Loss=2.759 PolicyLoss=1.515 ValueLoss=2.489 Entropy=0.061 AvgReward=0.74
Epoch 10/10: Loss=1.711 PolicyLoss=1.060 ValueLoss=1.305 Entropy=0.092 AvgReward=0.76
Modelo guardado en ppo_checkpoint.pth

Recompensa pr

In [13]:
# Experiment 3: cabbage (A) vs. rock (B). A pays 10.0 with probability 0.25
# (expected 2.5) and B pays 1.0 with probability 0.75 (expected 0.75). Starts
# from 'c_d_ppo_checkpoint.pth' and writes to 'b_c_ppo_checkpoint.pth'.
training_executor(image_b, image_c, reward_a=10.0, reward_b=1.0, prob_a=0.25, prob_b=0.75, checkpoint='c_d_ppo_checkpoint.pth', model_save_path='b_c_ppo_checkpoint.pth')

Cargando modelo desde c_d_ppo_checkpoint.pth
Modelo y optimizador cargados correctamente.
Epoch 1/10: Loss=6.972 PolicyLoss=-0.509 ValueLoss=14.967 Entropy=0.178 AvgReward=1.82
Epoch 2/10: Loss=9.476 PolicyLoss=-1.685 ValueLoss=22.325 Entropy=0.222 AvgReward=2.75
Epoch 3/10: Loss=7.055 PolicyLoss=-1.120 ValueLoss=16.354 Entropy=0.264 AvgReward=2.00
Epoch 4/10: Loss=5.499 PolicyLoss=-0.725 ValueLoss=12.454 Entropy=0.302 AvgReward=1.46
Epoch 5/10: Loss=7.840 PolicyLoss=-1.541 ValueLoss=18.769 Entropy=0.336 AvgReward=2.16
Epoch 6/10: Loss=9.955 PolicyLoss=-2.251 ValueLoss=24.420 Entropy=0.364 AvgReward=2.78
Epoch 7/10: Loss=6.474 PolicyLoss=-1.349 ValueLoss=15.654 Entropy=0.386 AvgReward=1.81
Epoch 8/10: Loss=9.790 PolicyLoss=-2.232 ValueLoss=24.052 Entropy=0.404 AvgReward=2.64
Epoch 9/10: Loss=9.060 PolicyLoss=-2.131 ValueLoss=22.389 Entropy=0.417 AvgReward=2.50
Epoch 10/10: Loss=5.782 PolicyLoss=-1.278 ValueLoss=14.128 Entropy=0.425 AvgReward=1.62
Modelo guardado en c_d_ppo_checkpoint.p

In [14]:
def evaluate_model(image1, image2, reward1, reward2, prob1, prob2, checkpoint, num_episodes=1000):
    """Evaluate a saved policy on one image pair without training.

    Args:
        image1, image2: the two images shown to the agent (same shape).
        reward1, reward2: payout of choosing image 1 / image 2 on success.
        prob1, prob2: success probability of choosing image 1 / image 2.
        checkpoint: path of the checkpoint holding ``'model_state_dict'``.
        num_episodes: number of single-step test episodes to average over.

    Returns:
        The mean reward over ``num_episodes`` episodes as a float, or -1 if
        the checkpoint is missing and no evaluation could be run.

    Raises:
        ValueError: if ``num_episodes`` is < 1.
    """
    if num_episodes < 1:
        raise ValueError("num_episodes debe ser >= 1")

    # Fail fast: there is nothing to evaluate without a checkpoint, so check
    # for it before building the environment and the network.
    if checkpoint is None or not os.path.exists(checkpoint):
        print("Checkpoint no encontrado, la evaluación no se puede realizar.")
        return -1

    env = SingleStepImageEnv(image1, image2, rewardA=reward1, rewardB=reward2, probA=prob1, probB=prob2)

    # Size the network from the environment instead of assuming 2 channels / 2 actions.
    policy_net = CNNActorCritic(
        in_channels=env.observation_space.shape[0],
        num_actions=int(env.action_space.n),
    )

    print(f"Cargando modelo desde {checkpoint}")
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    saved_state = torch.load(checkpoint)
    policy_net.load_state_dict(saved_state['model_state_dict'])
    # Only the model weights are restored here (no optimizer), so say so.
    print("Modelo cargado correctamente.")

    policy_net.eval()
    test_rewards = []
    for _ in range(num_episodes):
        obs, _ = env.reset()
        obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            logits, value = policy_net(obs_tensor)
            dist = Categorical(logits=logits)
            action = dist.sample().item()
        _, reward, done, truncated, info = env.step(action)
        test_rewards.append(reward)

    mean_reward = float(np.mean(test_rewards))
    print(f"\nRecompensa promedio en {num_episodes} episodios de test: {mean_reward:.2f}")
    return mean_reward

In [15]:
# Evaluate the checkpoint saved by the cabbage-vs-rock run on that same setup
# (A pays 10.0 with probability 0.25, B pays 1.0 with probability 0.75).
evaluate_model(image_b, image_c, 10.0, 1.0, 0.25, 0.75, 'b_c_ppo_checkpoint.pth')

Cargando modelo desde b_c_ppo_checkpoint.pth
Modelo y optimizador cargados correctamente.

Recompensa promedio en 1000 episodios de test: 2.16
