# Pruebas con Optuna

### Colab

In [53]:
import os
import sys
import IPython

# Detectar si estamos en Colab
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# Ruta base
if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    BASE_PATH = "/content/drive/MyDrive/ia_thermal_colab"
else:
    BASE_PATH = os.path.expanduser("~/ia_thermal_colab")

DATASETS_PATH = os.path.join(BASE_PATH, "datasets")
MODELS_PATH = os.path.join(BASE_PATH, "models")

os.makedirs(DATASETS_PATH, exist_ok=True)
os.makedirs(MODELS_PATH, exist_ok=True)

print("Modo:", "Colab" if IN_COLAB else "Local")
print("Ruta datasets:", DATASETS_PATH)
print("Ruta modelos:", MODELS_PATH)

Modo: Local
Ruta datasets: C:\Users\ismael.gallo/ia_thermal_colab\datasets
Ruta modelos: C:\Users\ismael.gallo/ia_thermal_colab\models


In [54]:
# 🔄 Parámetros del repo
GIT_REPO_URL = "https://github.com/ismaelgallolopez/ia_thermal.git"  # 👈 Cambia esto
REPO_NAME = GIT_REPO_URL.split("/")[-1].replace(".git", "")
CLONE_PATH = os.path.join(BASE_PATH, REPO_NAME)

if IN_COLAB:
    # 🧬 Clonar el repositorio si no existe ya
    if not os.path.exists(CLONE_PATH):
        !git clone {GIT_REPO_URL} {CLONE_PATH}
    else:
        print(f"Repositorio ya clonado en: {CLONE_PATH}")

    # 📦 Instalar requirements.txt
    req_path = os.path.join(CLONE_PATH, "requirements.txt")
    if os.path.exists(req_path):
        !pip install -r {req_path}
    else:
        print("No se encontró requirements.txt en el repositorio.")

    print("🔄 Reinicia el entorno para aplicar los cambios...")
    IPython.display.display(IPython.display.Javascript('''google.colab.restartRuntime()'''))

## Inicializar

In [55]:
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, TensorDataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# get the directory path of the file
dir_path = os.getcwd()

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

if IN_COLAB:
  sys.path.append("/content/drive/MyDrive/ia_thermal_colab/ia_thermal")

from plot_functions import *
from Physics_Loss import *
from utils import *
sys.path.append('../Convolutional_NN')

if IN_COLAB:
  sys.path.append("/content/drive/MyDrive/ia_thermal_colab/ia_thermal/Convolutional_NN")

from Dataset_Class import *


if IN_COLAB:
  sys.path.append("/content/drive/MyDrive/ia_thermal_colab/ia_thermal/ismaelgallo")

from architectures.convlstm import *
from architectures.generic_spatiotemporal_decoder import *
from architectures.generic_spatiotemporal_regressor import *

from pathlib import Path

# Ruta absoluta y segura dentro de Drive
study_path = Path("/content/drive/MyDrive/ia_thermal_colab/optuna_studies")
study_path.mkdir(parents=True, exist_ok=True)

db_path = study_path / "test_study.db"
storage_url = f"sqlite:///{db_path}"

print("Ruta que usará SQLite:", storage_url)

Ruta que usará SQLite: sqlite:///\content\drive\MyDrive\ia_thermal_colab\optuna_studies\test_study.db


In [56]:
# def objective(trial):
#     x = trial.suggest_float("x", -10, 10)
#     return x ** 2

# study = optuna.create_study(
#     direction="minimize",
#     storage=storage_url,
#     study_name="test_study",
#     load_if_exists=True
# )

# study.optimize(objective, n_trials=1)


In [57]:
# print("✅ ¿Existe el archivo?", db_path.exists())


In [None]:
epochs = 500
n_train = 1000
n_val = 200

sequence_length = 20

In [59]:
if IN_COLAB:
  dir_path = BASE_PATH

# ⬅️ Esto se ejecuta una vez
dataset_train = load_trimmed_dataset(
    base_path=dir_path,
    dataset_type='train',
    max_samples=n_train,
    time_steps_output=sequence_length
)
dataset_val = load_trimmed_dataset(
    base_path=dir_path,
    dataset_type='val',
    max_samples=n_val,
    time_steps_output=sequence_length
)

# Convertir a tensores de entrada y salida
input_train, output_train = prepare_data_for_convlstm(dataset_train, device='cpu')
input_val, output_val = prepare_data_for_convlstm(dataset_val, device='cpu')


def get_data_loaders_from_tensors(batch_size):
    train_loader = DataLoader(TensorDataset(input_train, output_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(input_val, output_val), batch_size=batch_size, shuffle=False)
    return train_loader, val_loader

✅ Cargando dataset train desde: c:\Users\ismael.gallo\Desktop\ia_thermal\ismaelgallo\datasets\PCB_transient_dataset_train.pth
✅ Cargando dataset val desde: c:\Users\ismael.gallo\Desktop\ia_thermal\ismaelgallo\datasets\PCB_transient_dataset_val.pth


## ConvLSTM

In [60]:
# print("Ruta completa del .db:", storage_url.replace("sqlite:///", ""))


In [61]:
# import os
# print("¿Existe el archivo?:", os.path.exists(storage_url.replace("sqlite:///", "")))


In [62]:
def objective(trial):
    return trial.suggest_float("x", -10, 10) ** 2

study = optuna.create_study(
    direction="minimize",
    storage="sqlite:////content/drive/MyDrive/ia_thermal_colab/optuna_studies/test_dummy.db",
    study_name="test_study",
    load_if_exists=True
)
study.optimize(objective, n_trials=1)

import os
print(os.path.exists("/content/drive/MyDrive/ia_thermal_colab/optuna_studies/test_dummy.db"))


[I 2025-05-06 12:36:43,959] A new study created in RDB with name: test_study
[I 2025-05-06 12:36:45,626] Trial 0 finished with value: 4.7560397290313965 and parameters: {'x': -2.180834640460252}. Best is trial 0 with value: 4.7560397290313965.


True


In [66]:
class ConvLSTMWrapper(nn.Module):
    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers):
        super().__init__()

        self.convlstm = ConvLSTM(
            input_dim=input_dim,
            hidden_dim=hidden_dim,
            kernel_size=kernel_size,
            num_layers=num_layers,
            batch_first=True,
            bias=True,
            return_all_layers=False
        )

        self.output_conv = nn.Conv2d(
            in_channels=hidden_dim[-1],
            out_channels=1,
            kernel_size=1  # ✅ produce salida de 1 canal sin cambiar tamaño
        )

    def forward(self, x):  # x: [B, T, C, 13, 13]
        lstm_output, _ = self.convlstm(x)  # lstm_output is [layer_output_list]
        y = lstm_output[0]  # [B, T, hidden_dim[-1], 13, 13]

        # Aplicar conv final a cada paso temporal
        B, T, C, H, W = y.shape
        y = y.view(B * T, C, H, W)
        y = self.output_conv(y)             # [B*T, 1, 13, 13]
        y = y.view(B, T, 1, H, W)           # [B, T, 1, 13, 13]
        return y


In [67]:
def objective(trial):
    # Hiperparámetros
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32, 64])
    hidden_dim_val = trial.suggest_categorical("hidden_dim", [16, 32, 64, 128])
    kernel_size_val = trial.suggest_categorical("kernel_size", [1, 3, 5, 7])
    num_layers = trial.suggest_int("num_layers", 1, 4)

    # Adaptar al formato requerido por ConvLSTM
    hidden_dim = [hidden_dim_val] * num_layers
    kernel_size = [(kernel_size_val, kernel_size_val)] * num_layers

    # Crear modelo ConvLSTM
    model = ConvLSTMWrapper(
        input_dim=3,  # o el número de canales reales
        hidden_dim=hidden_dim,
        kernel_size=kernel_size,
        num_layers=num_layers
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
    criterion = torch.nn.MSELoss()

    # Obtener dataloaders (train y val)
    train_loader, val_loader = get_data_loaders_from_tensors(batch_size=batch_size)

    best_val_loss = float('inf')
    for epoch in range(epochs):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device, epoch, epochs)
        val_loss = evaluate(model, val_loader, criterion, device)
        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss

        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return best_val_loss


In [68]:
study = optuna.create_study(
    direction="minimize",
    study_name="convlstm",  # puedes elegirlo tú
    storage="sqlite:///" + os.path.join(study_path, "study_convlstm.db"),  # 🧠 crea archivo .db en tu carpeta
    load_if_exists=True  # por si ya existe, continúa donde lo dejaste
)

study.optimize(objective, n_trials=10)

[I 2025-05-06 12:37:07,487] Using an existing study with name 'convlstm' instead of creating a new one.

Using a target size (torch.Size([64, 1, 20, 13, 13])) that is different to the input size (torch.Size([64, 1, 1, 13, 13])). This will likely lead to incorrect results due to broadcasting. Please ensure they have the same size.


Using a target size (torch.Size([40, 1, 20, 13, 13])) that is different to the input size (torch.Size([40, 1, 1, 13, 13])). This will likely lead to incorrect results due to broadcasting. Please ensure they have the same size.

[I 2025-05-06 12:37:10,710] Trial 1 finished with value: 0.05529384035617113 and parameters: {'lr': 0.00837817254286601, 'batch_size': 64, 'hidden_dim': 64, 'kernel_size': 1, 'num_layers': 3}. Best is trial 1 with value: 0.05529384035617113.
[I 2025-05-06 12:37:12,042] Trial 2 finished with value: 0.8635045737028122 and parameters: {'lr': 0.00029795286210330935, 'batch_size': 64, 'hidden_dim': 32, 'kernel_size': 3, 'num_layers': 1}. B

KeyboardInterrupt: 

In [None]:
print("Best trial:")
for key, val in study.best_trial.params.items():
    print(f"{key}: {val}")

In [None]:
optuna.visualization.plot_optimization_history(study).show()
optuna.visualization.plot_param_importances(study).show()


## Decoder

In [None]:
def objective(trial):
    # Hiperparámetros estructurales
    embedding_dim = trial.suggest_categorical("embedding_dim", [64, 128, 256, 512])
    num_layers = trial.suggest_int("num_layers", 1, 6)
    nhead = trial.suggest_categorical("nhead", [1, 2, 4, 8])
    dim_feedforward_factor = trial.suggest_int("dim_ff_factor", 2, 6)
    use_temporal_channel = trial.suggest_categorical("use_temporal_channel", [False, True])

    # Hiperparámetros de entrenamiento
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True)
    dropout = trial.suggest_float("dropout", 0.0, 0.3)

    # Elegir optimizador
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW", "RMSprop"])

    # Dataloaders
    train_loader, val_loader = get_data_loaders_from_tensors(batch_size)

    # Modelo
    class CustomTransformerDecoder(TransformerDecoder):
        def __init__(self, embedding_dim, num_layers, nhead, dim_ff, dropout):
            super().__init__(embedding_dim, num_layers, nhead)
            encoder_layer = nn.TransformerEncoderLayer(
                d_model=embedding_dim,
                nhead=nhead,
                dim_feedforward=dim_ff,
                dropout=dropout,
                batch_first=True
            )
            self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    model = GenericSpatioTemporalDecoder(
        embedding_dim=embedding_dim,
        num_layers=num_layers,
        nhead=nhead,
        in_channels=3,
        use_temporal_channel=use_temporal_channel
    ).to(device)

    # Reemplazar el decoder por uno con dropout y tamaño FF ajustado
    model.temporal_decoder = CustomTransformerDecoder(
        embedding_dim=embedding_dim,
        num_layers=num_layers,
        nhead=nhead,
        dim_ff=embedding_dim * dim_feedforward_factor,
        dropout=dropout
    ).to(device)

    # Optimizador
    optimizer_cls = {"Adam": torch.optim.Adam, "AdamW": torch.optim.AdamW, "RMSprop": torch.optim.RMSprop}[optimizer_name]
    optimizer = optimizer_cls(model.parameters(), lr=lr, weight_decay=weight_decay)

    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=5)
    criterion = torch.nn.MSELoss()

    # Entrenamiento
    best_val_loss = float("inf")
    for epoch in range(epochs):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device, epoch, epochs)
        val_loss = evaluate(model, val_loader, criterion, device)
        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss

        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return best_val_loss


In [None]:
study = optuna.create_study(
    direction="minimize",
    study_name="decoder1",  # puedes elegirlo tú
    storage=storage_url,  # 🧠 crea archivo .db en tu carpeta
    load_if_exists=True  # por si ya existe, continúa donde lo dejaste
)

study.optimize(objective, n_trials=100)

# Mostrar el mejor resultado
print("🔍 Best trial:")
for k, v in study.best_trial.params.items():
    print(f"{k}: {v}")


In [None]:
import optuna.visualization as vis

vis.plot_optimization_history(study).show()
vis.plot_param_importances(study).show()


## Regressor

In [None]:
# Cargar los datos una sola vez
dataset_train = load_trimmed_dataset(
    base_path=dir_path, dataset_type='train',
    max_samples=n_train, time_steps_output=sequence_length
)
dataset_val = load_trimmed_dataset(
    base_path=dir_path, dataset_type='val',
    max_samples=n_val, time_steps_output=sequence_length
)

x_train, y_train = prepare_data_for_convlstm(dataset_train, device='cpu')
x_val, y_val = prepare_data_for_convlstm(dataset_val, device='cpu')

train_dataset = TemporalRegressionDataset(x_train, y_train)
val_dataset = TemporalRegressionDataset(x_val, y_val)


In [None]:
def objective(trial):
    # Hiperparámetros
    embedding_dim = trial.suggest_categorical("embedding_dim", [64, 128, 256])
    num_layers = trial.suggest_int("num_layers", 1, 4)
    nhead = trial.suggest_categorical("nhead", [1, 2, 4])
    dim_ff_factor = trial.suggest_int("dim_ff_factor", 2, 6)
    dropout = trial.suggest_float("dropout", 0.0, 0.3)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW", "RMSprop"])

    # DataLoaders desde datasets pre-cargados
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Modelo
    model = GenericSpatioTemporalRegressor(
        embedding_dim=embedding_dim,
        num_layers=num_layers,
        nhead=nhead,
        in_channels=3
    ).to(device)

    # Reemplazo del decoder por uno personalizado con dropout y FF dimensionado
    class CustomTransformerDecoder(TransformerDecoder):
        def __init__(self, embedding_dim, num_layers, nhead, dim_ff, dropout):
            super().__init__(embedding_dim, num_layers, nhead)
            encoder_layer = nn.TransformerEncoderLayer(
                d_model=embedding_dim,
                nhead=nhead,
                dim_feedforward=dim_ff,
                dropout=dropout,
                batch_first=True
            )
            self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    model.temporal_decoder = CustomTransformerDecoder(
        embedding_dim=embedding_dim * 2,
        num_layers=num_layers,
        nhead=nhead,
        dim_ff=embedding_dim * dim_ff_factor,
        dropout=dropout
    ).to(device)

    # Optimizador
    optimizer_cls = {"Adam": torch.optim.Adam, "AdamW": torch.optim.AdamW, "RMSprop": torch.optim.RMSprop}[optimizer_name]
    optimizer = optimizer_cls(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=5)
    criterion = torch.nn.MSELoss()

    best_val_loss = float("inf")
    for epoch in range(epochs):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device, epoch, epochs)
        val_loss = evaluate(model, val_loader, criterion, device)
        scheduler.step(val_loss)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return best_val_loss


In [None]:
study = optuna.create_study(
    direction="minimize",
    study_name="nombre_del_estudio",  # puedes elegirlo tú
    storage="sqlite:///optuna_studies/study_regressor.db",  # 🧠 crea archivo .db en tu carpeta
    load_if_exists=True  # por si ya existe, continúa donde lo dejaste
)

study.optimize(objective, n_trials=100)

# Mostrar el mejor resultado
print("🔍 Best trial:")
for k, v in study.best_trial.params.items():
    print(f"{k}: {v}")
