### Metodologia para ajustar uma MLP com PSO

In [1]:
import os
import pandas as pd
import pyswarms as ps
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, Dataset, DataLoader, random_split

In [2]:
# User configuration
seed = 42  # seed handed to the torch.Generator that drives the dataset split

# Particle swarm settings (forwarded to the pyswarms optimizer)
n_particles = 100
iters = 1000
options = {'c1': 1.2, 'c2': 1.5, 'w': 0.8}

# Hidden-layer count: MIN_LAYERS is the minimum a decoded particle is padded to,
# MAX_LAYERS is the number of layer-size dimensions in the search space.
MIN_LAYERS = 2
MAX_LAYERS = 3  # 12 (alternative value left by the author; presumably a larger search)

# Search bounds for the number of neurons in each hidden layer
H_MIN = 8
H_MAX = 256

# Clipping range / search bounds for the batch size
MIN_BATCH_SIZE, MAX_BATCH_SIZE = 3, 16

# Clipping range / search bounds for the number of training epochs
MIN_EPOCHS, MAX_EPOCHS = 100, 1000

In [3]:
# 1) Device: use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Device:", device)

Device: cuda


In [4]:
# 2) Dataset
class MeuDataset(Dataset):
    """Paired samples read from the "Nix" and "pXRF" sheets of one Excel workbook.

    Features are the 31 band columns "R400 nm" ... "R700 nm" of the "Nix" sheet
    (presumably reflectance per wavelength - TODO confirm); the target is the
    "Fe" column of the "pXRF" sheet. Rows of both sheets are matched by index
    label, so the two sheets are assumed to be row-aligned (not checked here).

    Args:
        path: Location of the workbook. Defaults to the path originally
            hard-coded in this notebook, so ``MeuDataset()`` keeps working.
    """

    def __init__(self, path=r"D:\Mestrado\Trabalho Final\Dados\Levantamento em Campo\Compiled.xlsx"):
        # Passing a list of sheet names parses the workbook once and returns a
        # {sheet_name: DataFrame} mapping, instead of opening the file twice.
        sheets = pd.read_excel(path, sheet_name=["Nix", "pXRF"])
        self.nix = sheets["Nix"]
        self.pXRF = sheets["pXRF"]

        # "R400 nm", "R410 nm", ..., "R700 nm" (31 columns, 10 nm apart)
        self.nix_bands = [f"R{wavelength} nm" for wavelength in range(400, 701, 10)]

    def __len__(self):
        """Number of rows in the "Nix" sheet."""
        return len(self.nix)

    def __getitem__(self, idx):
        """Return ``(x, y)`` tensors on ``device`` for an index label or a label slice.

        For a single label, ``x`` holds the band values of that row and ``y``
        has shape ``(1,)``. For a slice, ``.loc`` is label-based and therefore
        includes the end label; ``x`` then has one row per sample and ``y`` has
        shape ``(1, n_rows)``.
        """
        x = self.nix.loc[idx][self.nix_bands].values
        x = torch.from_numpy(x.astype(np.float32))
        x = x.to(device=device)

        y = self.pXRF.loc[idx]['Fe']
        # np.asarray handles Python/NumPy scalars and Series alike, so NumPy
        # integer indices (e.g. np.int64) no longer hit `.values` on a scalar.
        y = np.array([np.asarray(y, dtype=np.float32)])
        y = torch.from_numpy(y)
        y = y.to(device=device)

        return x, y
    
# Build the dataset (this reads the Excel workbook) and exercise both indexing modes.
dataset = MeuDataset()
dataset[1:3]  # slice lookup; the result is discarded because it is not the cell's last expression
dataset[1]  # single-row lookup; shown as the cell output

(tensor([  425425.5312,  1593439.6250,  2454990.7500,  2904233.0000,
          3200625.2500,  3386087.0000,  3502249.7500,  3584784.7500,
          3711732.0000,  3955087.7500,  4242401.0000,  4539171.5000,
          4945589.0000,  5439565.5000,  6010108.5000,  6861901.5000,
          8223011.5000,  9917463.0000, 11664266.0000, 13935964.0000,
         16049387.0000, 17868272.0000, 19369568.0000, 20319352.0000,
         21157144.0000, 22240874.0000, 23280512.0000, 24311956.0000,
         25211888.0000, 26024682.0000, 26692372.0000], device='cuda:0'),
 tensor([10.9538], device='cuda:0'))

In [5]:
# 3) Model (plain MLP)
class MLP(nn.Module):
    """Fully connected network: Linear + ReLU per hidden layer, then a linear output layer.

    Args:
        in_features: Size of each input sample.
        hidden: Sizes of the hidden layers, in order. An empty sequence yields
            a single linear layer from input to output.
        out_features: Size of each output sample.
    """

    def __init__(self, in_features=4, hidden=[16], out_features=1):
        super().__init__()

        # widths[i] feeds widths[i + 1]; the last width feeds the output layer.
        widths = [in_features, *hidden]

        modules = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            modules += [nn.Linear(fan_in, fan_out), nn.ReLU()]
        modules.append(nn.Linear(widths[-1], out_features))

        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """Run ``x`` through the sequential stack."""
        return self.net(x)

# mlp = MLP(5, [10, 20], 1)
# mlp

In [6]:
# 4) Train / validation / test split
# (the DataLoaders themselves are created inside run_mlp, because the batch size
# is one of the tuned hyper-parameters; random_split is already imported in the
# notebook's first cell, so it is not imported again here)

n = len(dataset)
n_train = int(0.7*n)            # 70 % of the samples for training
n_val = int(0.15*n)             # 15 % for validation
n_test = n - n_train - n_val    # remainder, so the three sizes always add up to n

# Fail here with a clear message instead of later inside the training loop,
# where an empty split surfaces as a division by zero or a sampler error.
if min(n_train, n_val, n_test) < 1:
    raise ValueError(
        f"Dataset with {n} samples is too small for a 70/15/15 split "
        f"(train={n_train}, val={n_val}, test={n_test})."
    )

# Dedicated generator with a fixed seed -> the same split on every run
g = torch.Generator().manual_seed(seed)

train_ds, val_ds, test_ds = random_split(dataset, [n_train, n_val, n_test], generator=g)

train_ds, val_ds, test_ds

(<torch.utils.data.dataset.Subset at 0x1bc9f2b8f20>,
 <torch.utils.data.dataset.Subset at 0x1bc9f1f25a0>,
 <torch.utils.data.dataset.Subset at 0x1bc9f3c6d80>)

In [7]:
# 5) Training + validation loop (keeps the best-validation checkpoint; all epochs are run)
def run_mlp(hidden_layers:list[int], epochs:int, batch_size:int, lr:float):
    """Train an MLP and score its best-validation checkpoint on the test split.

    The model is trained with Adam and MSE loss on ``train_ds``. After every
    epoch the mean validation loss on ``val_ds`` is computed, and whenever it
    improves a snapshot of the weights is stored. There is no early stop: all
    ``epochs`` are always executed. Finally the best snapshot is restored and
    evaluated on ``test_ds``.

    Args:
        hidden_layers: Sizes of the hidden layers, in order.
        epochs: Number of training epochs.
        batch_size: Batch size used for all three loaders.
        lr: Learning rate for Adam.

    Returns:
        ``(best_state, test_loss)``. ``best_state`` is a dict with the keys
        ``lr``, ``epoch``, ``epochs``, ``val_loss``, ``train_loss``,
        ``batch_size``, ``hidden_layers`` and ``model_state``. If the
        validation loss never improved (for example it was NaN in every epoch,
        or ``epochs < 1``), the state describes the last weights instead, and
        its losses may be NaN.
    """
    import copy  # local import: only needed to snapshot the weights

    model = MLP(len(dataset.nix_bands), hidden_layers, 1).to(device=device)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=batch_size)
    test_loader  = DataLoader(test_ds, batch_size=batch_size)

    def _mean_loss(loader):
        # Sample-weighted mean loss over a loader, without gradient tracking.
        model.eval()
        total = 0.0
        with torch.no_grad():
            for x, y in loader:
                total += criterion(model(x), y).item() * x.size(0)
        return total / len(loader.dataset)

    def _snapshot(epoch, train_loss, val_loss):
        # state_dict() hands out references to the live parameters, which the
        # optimizer keeps updating in place; deep-copy them so the snapshot
        # really holds this epoch's weights.
        return {
            "lr":lr,
            "epoch": epoch,
            "epochs":epochs,
            "val_loss":val_loss,
            "train_loss":train_loss,
            "batch_size":batch_size,
            "hidden_layers":hidden_layers,
            "model_state":copy.deepcopy(model.state_dict()),
        }

    best_val_loss = torch.inf
    best_state = None
    # Defaults used by the fallback snapshot when the loop never improves or never runs
    epoch, train_loss, val_loss = 0, float("nan"), float("nan")

    for epoch in range(1, epochs+1):
        # --- training ---
        model.train()
        running_loss = 0.0
        for x, y in train_loader:

            optimizer.zero_grad()
            y_sim = model(x)

            loss = criterion(y_sim, y)
            running_loss += loss.item() * x.size(0)

            loss.backward()
            optimizer.step()

        train_loss = running_loss / len(train_loader.dataset)

        # --- validation ---
        val_loss = _mean_loss(val_loader)

        print(f"Epoch {epoch}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}", end="\r", flush=True)

        # keep the checkpoint with the lowest validation loss seen so far
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = _snapshot(epoch, train_loss, val_loss)

    if best_state is None:
        # NaN never compares as "less than", so a diverged run leaves no best
        # checkpoint; report the final weights instead of crashing.
        best_state = _snapshot(epoch, train_loss, val_loss)
    else:
        model.load_state_dict(best_state['model_state'])

    test_loss = _mean_loss(test_loader)

    return best_state, test_loss

# a = run_mlp([8, 16, 32], 50, 3, 1E-3)
# a

In [8]:
# 6) Particle decoder
# P[0]  -> epochs
# P[1]  -> batch size
# P[2]  -> log10(lr)
# P[3:] -> number of neurons in each hidden layer (one entry per layer)

def decode_particle(p:np.ndarray):
    """Translate a particle position into MLP training hyper-parameters.

    Args:
        p: Sequence laid out as ``[epochs, batch_size, log10(lr), n_1, n_2, ...]``.

    Returns:
        ``(epochs, batch_size, lr, layers)`` where ``epochs`` and ``batch_size``
        are rounded and clipped to their configured ranges, ``lr`` is
        ``10 ** p[2]``, and ``layers`` holds the rounded layer sizes. Sizes
        that round to zero are dropped; the list is then padded with
        1-neuron layers until it has at least ``MIN_LAYERS`` entries.
    """
    epochs = int(np.clip(int(round(p[0])), MIN_EPOCHS, MAX_EPOCHS))
    batch_size = int(np.clip(int(round(p[1])), MIN_BATCH_SIZE, MAX_BATCH_SIZE))

    # the swarm searches the exponent, not the learning rate itself
    lr = 10.0 ** float(p[2])

    rounded_sizes = [int(round(raw_size)) for raw_size in p[3:]]
    layers = [size for size in rounded_sizes if size != 0]

    # pad up to the minimum depth (a non-positive repeat count adds nothing)
    layers.extend([1] * (MIN_LAYERS - len(layers)))

    return epochs, batch_size, lr, layers

# Quick check: epochs and batch size lie outside their ranges here and get clipped.
decode_particle([99, 17, -3, 1, 7, 10, 15])


(100, 16, 0.001, [1, 7, 10, 15])

In [9]:
# 7) Objective function

os.makedirs("save_state/", exist_ok=True)
def objective(P):
    """Evaluate every particle of a swarm and return one cost per particle.

    Each row of ``P`` is decoded into hyper-parameters and an MLP is trained
    with them. The cost of a particle is the loss returned by ``run_mlp`` plus
    its number of epochs, so longer trainings are penalised. The best-scoring
    particle of this call is saved to ``save_state/<loss>.pth``, with the
    decimal point of its loss replaced by an underscore.

    NOTE(review): the loss returned by ``run_mlp`` is measured on the test
    split, so the swarm is tuned against test data - confirm this is intended.

    Args:
        P: Iterable of particle positions (one row per particle).

    Returns:
        1-D float array with one cost per particle, in the order of ``P``.
    """
    losses = []
    best_loss = np.inf
    best_state = None

    for part in P:
        epochs, batch_size, lr, hiddens  = decode_particle(part)

        state, loss = run_mlp(hiddens, epochs, batch_size, lr)

        # A diverged training yields NaN/inf; report it as the worst possible
        # cost so it can never be mistaken for a good particle.
        if not np.isfinite(loss):
            loss = np.inf

        if loss < best_loss:
            best_loss = loss

            best_state = {
                "particle": part,
                "state":state,
            }

        losses.append(loss + epochs)

    # Name the file after the loss of the state actually being saved (not the
    # last particle's loss) and skip saving when no particle gave a finite loss.
    if best_state is not None:
        file_stem = str(best_loss).replace(".", "_")
        torch.save(best_state, f"save_state/{file_stem}.pth")

    return np.array(losses, dtype=float)

# objective([
#     [100, 4, -4, 3, 0, 6],
#     [100, 4, -4, 3, 5, 6],
# ])

In [None]:
# 8) Run PSO

# Seed the global RNGs so weight initialisation and batch shuffling (torch) and the
# swarm's random draws (NumPy global RNG - NOTE(review): confirm for the installed
# pyswarms version) repeat from run to run.
np.random.seed(seed)
torch.manual_seed(seed)

# Search space: [epochs, batch size, log10(lr), neurons of layer 1..MAX_LAYERS]
lower = [MIN_EPOCHS, MIN_BATCH_SIZE, -10] + [H_MIN] * MAX_LAYERS
upper = [MAX_EPOCHS, MAX_BATCH_SIZE, 1] + [H_MAX] * MAX_LAYERS

optimizer = ps.single.GlobalBestPSO(
    n_particles=n_particles,
    dimensions=3+MAX_LAYERS,
    options=options,
    # pyswarms documents bounds as a tuple of ndarrays
    bounds=(np.array(lower, dtype=float), np.array(upper, dtype=float))
)
best_cost, best_pos = optimizer.optimize(objective, iters=iters)

# Decoded hyper-parameters of the best position: (epochs, batch_size, lr, layers)
best_hidden = decode_particle(best_pos)
# The cost returned by objective() is the test loss plus the epoch count, not a validation loss.
print("Melhor custo (test loss + epochs):", best_cost)
print("Melhor vetor:", best_hidden)

In [None]:
# View Result
# Uncomment to inspect a checkpoint written by objective(). weights_only=False
# unpickles arbitrary Python objects, so only load files produced by this notebook.
# torch.load(r"save_state\916_9910390753495.pth", weights_only=False)
print("JOIA")