In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import torch.nn.functional as F
from collections import deque
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import pandas as pd
import os
import math
from sklearn.metrics import davies_bouldin_score

from utils.Loader import NEUDataset
from utils.Perspectiver import Perspectiver

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class RL_CNN(nn.Module):
    """Small CNN mapping a single-channel 100x100 image to two real values.

    The network has three stages; each stage is two 3x3 convolutions
    (padding 1, ReLU) followed by a 2x2 max-pool. Spatial size therefore
    goes 100 -> 50 -> 25 -> 12, and the final linear layer expects
    32 * 12 * 12 flattened features. Inputs of another spatial size will
    not match the linear layer.
    """

    def __init__(self):
        super(RL_CNN, self).__init__()
        # Attribute names are part of the state_dict keys; keep them stable
        # so previously saved checkpoints remain loadable.
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(8, 8, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(8, 16, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(16, 16, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # 32 channels * 12 * 12 pixels after three poolings (100 -> 50 -> 25 -> 12).
        self.fc = nn.Linear(32 * 144, 2)

    def forward(self, x):
        """Run the network.

        Args:
            x: Either a 4-D tensor (batch, 1, 100, 100) or a 3-D tensor
                (batch, 100, 100); in the 3-D case a channel axis is
                inserted at position 1.

        Returns:
            Tensor of shape (batch, 2).
        """
        if x.ndim == 3:
            x = x.unsqueeze(1)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(x)                   # 100 -> 50
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool(x)                   # 50 -> 25
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool(x)                   # 25 -> 12
        x = x.view(x.size(0), -1)          # (batch, 32 * 12 * 12)
        return self.fc(x)

    def save(self, path="rl_cnn.pth"):
        """Write the model weights (state_dict) to `path`."""
        torch.save(self.state_dict(), path)
        print(f"Modelo guardado en {path}")

    def load(self, path="rl_cnn.pth"):
        """Load model weights from `path` and switch the model to eval mode.

        The checkpoint is mapped to CPU while deserializing so that weights
        saved from a GPU can be loaded on a machine without one;
        `load_state_dict` then copies the values into this module's
        existing parameters.
        """
        state_dict = torch.load(path, map_location="cpu")
        self.load_state_dict(state_dict)
        self.eval()
        print(f"Modelo cargado desde {path}")
    

if __name__ == "__main__":
    # Smoke test: push one 3-D input and one 4-D input through the model
    # and print the resulting output shapes.
    modelo = RL_CNN()
    cases = (
        ("Salida final unbatched:", (1, 100, 100)),      # -> [1, 2]
        ("Salida final batched:", (32, 1, 100, 100)),    # -> [32, 2]
    )
    for label, shape in cases:
        entrada = torch.randn(*shape)
        salida = modelo(entrada)
        print(label, salida.shape)

Salida final unbatched: torch.Size([1, 2])
Salida final batched: torch.Size([32, 2])


In [3]:
class ReplayMemory:
    """Bounded buffer of transitions with uniform random sampling.

    Once `capacity` entries are stored, appending a new one discards the
    oldest (behaviour of `deque(maxlen=...)`).
    """

    def __init__(self, capacity=10000):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self, transition):
        """Append one transition, e.g. (state, action, reward, next_state, done)."""
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random.

        Raises whatever `random.sample` raises when `batch_size` exceeds
        the number of stored transitions.
        """
        return random.sample(self.memory, k=batch_size)

In [4]:
class RL_Agent:
    """Agent bundling an RL_CNN model, an Adam optimizer and a replay buffer.

    The model emits two continuous values per state. `train_step` regresses
    those values onto a one-step bootstrapped target built from the stored
    reward and the model's output for the next state.
    """

    def __init__(self, gamma=0.99, lr=1e-3, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01):
        """Create the model, optimizer, loss and an empty replay buffer.

        Args:
            gamma: Discount factor applied to the next-state output.
            lr: Learning rate for Adam.
            epsilon: Initial epsilon value.
            epsilon_decay: Multiplicative decay applied after each train step.
            min_epsilon: Lower bound for epsilon.
        """
        self.gamma = gamma
        # NOTE(review): epsilon is decayed in train_step but select_action
        # never reads it, so no exploration happens -- confirm intent.
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon

        # Neural network, optimizer and regression loss
        self.model = RL_CNN()
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

        # Experience replay buffer; the oldest entries are dropped past 10000.
        self.memory = deque(maxlen=10000)

    def select_action(self, state):
        """Return the model output for `state`, computed without gradients."""
        with torch.no_grad():
            output = self.model(state)
        return output

    def store_experience(self, experience):
        """Save an experience tuple (state, output, reward, next_state, done)."""
        self.memory.append(experience)

    def train_step(self, batch_size=32):
        """Run one optimization step on a random batch from the replay buffer.

        The target is ``reward + gamma * (1 - done) * model(next_state)``,
        computed without gradients. The prediction is ``model(state)``,
        recomputed here so that gradients reach the parameters (the outputs
        stored in the buffer were produced under ``no_grad``).

        NOTE(review): this treats the two model outputs as value estimates,
        while the environment consumes them as action parameters -- confirm
        that this is the intended learning signal.

        Args:
            batch_size: Number of transitions to sample.

        Returns:
            The scalar loss as a float, or None when the buffer holds fewer
            than `batch_size` transitions (nothing is trained in that case).
        """
        if len(self.memory) < batch_size:
            return None

        batch = random.sample(self.memory, batch_size)
        # The stored outputs carry no gradient and are not needed here.
        states, _stored_outputs, rewards, next_states, dones = zip(*batch)
        states = torch.stack(states)
        next_states = torch.stack(next_states)
        rewards = torch.tensor(rewards).float().unsqueeze(1)   # (batch, 1)
        dones = torch.tensor(dones).float().unsqueeze(1)       # (batch, 1)

        # Bellman-style target; kept out of the autograd graph so that only
        # the prediction for the current state is optimized.
        with torch.no_grad():
            next_outputs = self.model(next_states)
            target_values = rewards + (1 - dones) * self.gamma * next_outputs

        predictions = self.model(states)
        loss = self.loss_fn(predictions, target_values)

        # Optimize model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

        return loss.item()


In [5]:

# ------------------------------
# 3. Define Environment for NEU Dataset with New Reward Function
# ------------------------------
class NEUEnvironment:
    """Sequential environment over an indexable dataset of (image, label) pairs.

    Each step scores the agent's two outputs (used as the `sp` and `sr`
    arguments of `Perspectiver.meanShift`) on the current image and then
    advances to the next sample. An episode ends when the index wraps
    around to 0.
    """

    # Reward returned when the clustering cannot be scored.
    DEGENERATE_PENALTY = -10000

    def __init__(self, dataset):
        self.dataset = dataset
        self.index = 0  # Track current index

    def reset(self):
        """Reset to the first sample and return its image."""
        # Start at 0 so the first sample is also acted upon; `step` signals
        # the end of the epoch when the index wraps back to 0.
        self.index = 0
        return self.dataset[self.index][0]

    def step(self, output, n_jobs = 1):
        """Score `output` on the current image and move to the next sample.

        Args:
            output: Model output; indexed as output[0][0] (sp) and
                output[0][1] (sr) by `reward_function`.
            n_jobs: Unused; kept for interface compatibility.

        Returns:
            (next_state, reward, done) where `done` is True once the index
            has wrapped around to 0.
        """
        image, _label = self.dataset[self.index]
        reward = self.reward_function(output, image)
        self.index = (self.index + 1) % len(self.dataset)  # Move to next sample
        next_state = self.dataset[self.index][0]
        done = self.index == 0  # End of epoch
        return next_state, reward, done

    def calculate_reward(self, image, sp, sr):
        """Reward for mean-shift filtering `image` with parameters (sp, sr).

        Returns ``log2(davies_bouldin_score) * n_clusters``, or
        `DEGENERATE_PENALTY` when the clustering cannot be scored.

        NOTE(review): per the scikit-learn documentation, a lower
        Davies-Bouldin score means better separation, yet this reward grows
        with the score -- confirm the sign is intended.
        """
        # Only the first channel of the tensor is used.
        rgb = Perspectiver.grayscale_to_rgb(
            Perspectiver.normalize_to_uint8(image.detach().cpu().numpy()[0])
        )
        after = Perspectiver.meanShift(rgb, sp, sr)
        original_gray = Perspectiver.rgb_to_grayscale(rgb).flatten()
        clustered_gray = Perspectiver.rgb_to_grayscale(after).flatten()

        # davies_bouldin_score requires 2 <= n_labels <= n_samples - 1 and
        # raises ValueError otherwise, so reject degenerate labelings first.
        n_labels = len(np.unique(clustered_gray))
        if n_labels < 2 or n_labels > clustered_gray.size - 1:
            return self.DEGENERATE_PENALTY

        score = davies_bouldin_score(original_gray.reshape(-1, 1), clustered_gray)

        # log2 is undefined for non-positive (or non-finite) scores.
        if not math.isfinite(score) or score <= 0:
            return self.DEGENERATE_PENALTY

        n_clusters = len(np.unique(after))
        return math.log2(score) * n_clusters

    def reward_function(self, output, image):
        """Reward for the predicted (sp, sr) pair on `image`.

        Non-positive parameters are penalized proportionally to their value,
        sp > sr receives a fixed penalty, and otherwise the clustering-based
        reward from `calculate_reward` is returned.
        """
        output = output.detach().cpu().numpy()
        # Convert to plain floats so every branch returns a Python number.
        sp = float(output[0][0])
        sr = float(output[0][1])
        if sp <= 0:
            return 2000 * sp
        if sr <= 0:
            return 2000 * sr
        if sp > sr:
            return -5000.0
        return self.calculate_reward(image, sp, sr)

In [6]:
def train_rl_model(num_episodes=1000, batch_size=32):
    """Train an RL_Agent on the NEU training split and save its model.

    Every episode resets the environment and iterates until the environment
    reports `done`. Each transition is stored in the agent's replay buffer
    and followed by one `train_step`. The summed reward and the current
    epsilon are printed per episode; the model weights are saved at the end.

    Args:
        num_episodes: Number of episodes to run.
        batch_size: Replay batch size passed to `agent.train_step`.
    """
    dataset = NEUDataset(set="train", scale=0.5)
    env = NEUEnvironment(dataset)
    agent = RL_Agent(
        gamma=0.99,
        lr=1e-2,
        epsilon=1.0,
        epsilon_decay=0.995,
        min_epsilon=0.05,
    )

    for episode in range(num_episodes):
        total_reward = 0
        state = env.reset()

        while True:
            output = agent.select_action(state)
            upcoming_state, reward, done = env.step(output)

            transition = (state, output, reward, upcoming_state, done)
            agent.store_experience(transition)

            total_reward += reward
            state = upcoming_state

            agent.train_step(batch_size)

            if done:
                break

        print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {total_reward:.3f}, Epsilon: {agent.epsilon:.3f}")

    agent.model.save()


In [None]:
# Short training run: 10 episodes, replay batches of 32 transitions.
train_rl_model(num_episodes=10, batch_size=32)