In [None]:
from sklearn.model_selection import KFold, train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch.optim import Adam
import torch.nn as nn
import torch
import json
import optuna
import numpy as np
import joblib
from sklearn.metrics import r2_score
from torch.utils.data import DataLoader, TensorDataset, Subset
import matplotlib.pyplot as plt
from _utils import NeuralNetwork, test_model_with_new_data

In [None]:
# Load the raw dataset. Each entry of dados["dados"] provides a series of
# coordinate samples under "coords" and the regression targets under "params".
with open("./dataset.json", "r") as arquivo:
    dados = json.load(arquivo)

data = [d["coords"] for d in dados["dados"]]
target = [d["params"] for d in dados["dados"]]

# Stack the samples of every series into one 2-D collection so the input
# scaler is fitted over all samples at once.
data_flattened = [sample for series in data for sample in series]

# Inputs and targets get their own scaler. Re-fitting a single shared scaler
# made its meaning depend on which statement ran last, which is fragile when
# notebook cells are re-run or executed out of order.
# NOTE(review): both scalers are fitted before the train/test split, so the
# held-out rows influence the scaling ranges — confirm this is acceptable.
data_scaler = MinMaxScaler()
data_scaler.fit(data_flattened)
data_normalized = [data_scaler.transform(series) for series in data]

target_scaler = MinMaxScaler()
target_scaler.fit(target)
target_normalized = target_scaler.transform(target)

# Kept for the cells that still use the shared name: as before, `scaler`
# leaves this cell fitted on the targets. It is a separate instance so later
# re-fits of `scaler` cannot disturb `target_scaler`.
scaler = MinMaxScaler()
scaler.fit(target)

# Hold out 20% of the series for the final test.
test_size = 0.2
train_data, test_data, train_target, test_target = train_test_split(
    data_normalized, target_normalized, test_size=test_size, random_state=42
)

In [None]:
def evaluate_model(model, test_loader, metric="mse"):
    """Evaluate `model` on every batch yielded by `test_loader`.

    Args:
        model: callable torch module; it is switched to eval mode.
        test_loader: iterable of ``(batch_x, batch_y)`` tensor pairs.
        metric: ``"r2"`` returns the R² score, ``"pond"`` returns the combined
            score ``r2 - mse``, any other value returns the MSE.

    Returns:
        The requested metric as a float.

    Raises:
        ValueError: if `test_loader` yields no elements to score.
    """
    needs_r2 = metric in ("r2", "pond")
    model.eval()
    all_predictions = []
    all_targets = []
    squared_error_sum = 0.0
    element_count = 0

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            outputs = model(batch_x)

            # Accumulate the raw squared error and the element count instead of
            # averaging per-batch means: a smaller final batch would otherwise
            # weigh as much as a full one and skew the reported MSE.
            squared_error = (outputs - batch_y) ** 2
            squared_error_sum += squared_error.sum().item()
            element_count += squared_error.numel()

            if needs_r2:
                all_predictions.extend(outputs.cpu().numpy())
                all_targets.extend(batch_y.cpu().numpy())

    if element_count == 0:
        raise ValueError("test_loader yielded no samples to evaluate")

    mse = squared_error_sum / element_count
    if not needs_r2:
        return mse

    r2 = r2_score(np.array(all_targets), np.array(all_predictions))
    if metric == "pond":
        return r2 - mse
    return r2

In [None]:
def objective(trial):
    """Optuna objective: train a `NeuralNetwork` with K-fold splits and score it.

    The hyperparameters (layer count and widths, batch size, total epochs,
    learning rate and number of folds) are sampled from `trial`. A single
    model and optimizer are created once and keep training as the folds are
    visited; the total epoch budget is divided between the folds.

    Returns:
        The mean of the validation MSE values recorded after every training
        epoch of every fold.
    """
    # ---- hyperparameter search space ----
    num_layers = trial.suggest_int("num_layers", 1, 5, step=1)
    units_fc = [
        trial.suggest_int(f"units_fc_layer_{i}", 16, 256, step=16)
        for i in range(num_layers)
    ]
    batch_size = trial.suggest_int("batch_size", 16, 128, step=16)
    epochs = trial.suggest_int("epochs", 10, 100, step=10)
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
    num_folds = trial.suggest_int("num_folds", 2, 20, step=1)
    # -------------------------------------

    model = NeuralNetwork(num_layers, units_fc)
    optimizer = Adam(model.parameters(), lr=learning_rate)
    criterion = torch.nn.MSELoss()

    # Materialise the training split once; each fold is then a cheap index
    # selection instead of rebuilding arrays element by element.
    x_all = np.array(train_data)
    y_all = np.array(train_target)

    # The search space allows epochs < num_folds (e.g. 10 epochs, 20 folds).
    # Plain truncation would give zero epochs per fold: nothing is trained,
    # no metric is recorded and the mean of an empty list is NaN.
    epochs_per_fold = max(1, epochs // num_folds)

    k_fold = KFold(n_splits=num_folds, shuffle=True, random_state=42)
    fold_metrics = []

    # NOTE(review): the model is not re-initialised between folds, so a fold's
    # validation rows were training rows in other folds — confirm intended.
    for train_indices, val_indices in k_fold.split(range(len(x_all))):
        train_loader = DataLoader(
            TensorDataset(
                torch.Tensor(x_all[train_indices]),
                torch.Tensor(y_all[train_indices]),
            ),
            batch_size=batch_size,
            shuffle=True,
        )
        val_loader = DataLoader(
            TensorDataset(
                torch.Tensor(x_all[val_indices]),
                torch.Tensor(y_all[val_indices]),
            ),
            batch_size=batch_size,
            shuffle=False,
        )

        for _ in range(epochs_per_fold):
            model.train()
            for batch_x, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

            # One validation score is recorded per epoch (not per fold).
            fold_metrics.append(evaluate_model(model, val_loader, "mse"))

    return np.mean(fold_metrics)

In [None]:
# The objective returns an MSE, so the study minimises it. Use
# direction="maximize" when the objective returns r² or the combined score.
study = optuna.create_study(direction="minimize")
study.optimize(func=objective, n_trials=100)

In [None]:
# Persist the finished study (not a model) so it can be reloaded later.
joblib.dump(value=study, filename="estudo.pkl")

In [None]:
# Inspect the best trial of the study: its hyperparameters and objective values.
best_params, metrica = study.best_trial.params, study.best_trial.values
(best_params, metrica)

In [None]:
# Visualise the trials of the study.
import optuna.visualization as vis

# A notebook cell only renders its last expression, so the first figure used
# to be built and silently discarded. Show both figures explicitly.
vis.plot_optimization_history(study).show()
vis.plot_parallel_coordinate(study).show()

## Ordena os 10 melhores resultados pela métrica (ordem descendente para r2, ascendente para mse)

In [None]:
# Rank the trials and keep the 10 best. "Best" depends on the study
# direction: highest first when maximising (r², combined score), lowest first
# when minimising (MSE). A hard-coded reverse=True listed the 10 *worst*
# trials of a minimising study.
best_is_highest = study.direction == optuna.study.StudyDirection.MAXIMIZE

trials = study.trials
# Build fresh dicts instead of writing a "value" key into each trial's params.
trials_params = [{**t.params, "value": t.value} for t in trials]
# Trials without a value cannot be ranked, so they are dropped.
trials_params_filtered = [
    trial for trial in trials_params if trial["value"] is not None
]
lista_ordenada = sorted(
    trials_params_filtered, key=lambda x: x["value"], reverse=best_is_highest
)[:10]
lista_ordenada

In [None]:
def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over `loader`: train if `optimizer` is given, else evaluate.

    Returns:
        ``(mse, r2)`` computed over every sample seen during the pass.
    """
    is_training = optimizer is not None
    model.train(is_training)  # train(False) is equivalent to eval()
    all_predictions = []
    all_targets = []

    with torch.set_grad_enabled(is_training):
        for batch_x, batch_y in loader:
            if is_training:
                optimizer.zero_grad()
            outputs = model(batch_x)
            if is_training:
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
            all_predictions.extend(outputs.detach().cpu().numpy())
            all_targets.extend(batch_y.detach().cpu().numpy())

    all_predictions = np.array(all_predictions)
    all_targets = np.array(all_targets)
    mse = float(np.mean((all_predictions - all_targets) ** 2))
    return mse, r2_score(all_targets, all_predictions)


def train_and_validate(params):
    """Train a `NeuralNetwork` from a hyperparameter dict using K-fold splits.

    Args:
        params: mapping with the keys ``num_layers``, ``units_fc_layer_<i>``
            for each layer, ``batch_size``, ``epochs``, ``learning_rate`` and
            ``num_folds``.

    Returns:
        ``(average_train_loss, average_val_loss, average_train_r2,
        average_val_r2, model)`` where the averages are taken over every
        epoch of every fold and `model` is the trained network.
    """
    num_layers = params["num_layers"]
    units_fc = [params[f"units_fc_layer_{i}"] for i in range(num_layers)]
    batch_size = params["batch_size"]
    epochs = params["epochs"]
    learning_rate = params["learning_rate"]
    num_folds = params["num_folds"]

    model = NeuralNetwork(num_layers, units_fc)
    optimizer = Adam(model.parameters(), lr=learning_rate)
    criterion = torch.nn.MSELoss()

    # Materialise the training split once; folds are plain index selections.
    x_all = np.array(train_data)
    y_all = np.array(train_target)

    # Never let the per-fold budget truncate to zero epochs: that would leave
    # the model untrained and every average below would be NaN.
    epochs_per_fold = max(1, epochs // num_folds)

    k_fold = KFold(n_splits=num_folds, shuffle=True, random_state=42)
    train_losses = []
    val_losses = []
    r2_train_values = []
    r2_val_values = []

    # The same model keeps training while the folds are visited.
    for train_indices, val_indices in k_fold.split(range(len(x_all))):
        train_loader = DataLoader(
            TensorDataset(
                torch.Tensor(x_all[train_indices]),
                torch.Tensor(y_all[train_indices]),
            ),
            batch_size=batch_size,
            shuffle=True,
        )
        val_loader = DataLoader(
            TensorDataset(
                torch.Tensor(x_all[val_indices]),
                torch.Tensor(y_all[val_indices]),
            ),
            batch_size=batch_size,
            shuffle=False,
        )

        for _ in range(epochs_per_fold):
            # Record the loss of the whole epoch. Previously only the loss of
            # the last batch was stored, which is a noisy, unrepresentative
            # sample of the epoch.
            train_mse, train_r2 = _run_epoch(model, train_loader, criterion, optimizer)
            train_losses.append(train_mse)
            r2_train_values.append(train_r2)

            val_mse, val_r2 = _run_epoch(model, val_loader, criterion)
            val_losses.append(val_mse)
            r2_val_values.append(val_r2)

    average_train_loss = np.mean(train_losses)
    average_val_loss = np.mean(val_losses)
    average_train_r2 = np.mean(r2_train_values)
    average_val_r2 = np.mean(r2_val_values)
    print(
        f"average train_loss: {average_train_loss}, \n"
        f"average val_loss: {average_val_loss}, \n"
        f"average train R²: {average_train_r2}, \n"
        f"average val R²: {average_val_r2}\n"
    )
    return average_train_loss, average_val_loss, average_train_r2, average_val_r2, model

## Carrega estudos feitos previamente para utilização ##

In [None]:
# Reload previously saved studies from their .pkl files with joblib.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code —
# only load study files that come from a trusted source.
study_only_mse = joblib.load("estudo_mse_2103.pkl")
study_mse_r2 = joblib.load("estudo_pond_2103.pkl")
study_only_r2 = joblib.load("estudo_r2_2103.pkl")

# Best trial of each study, in the order: MSE, r², combined score.
best_trial_mse, best_trial_r2, best_trial_pond = (
    loaded_study.best_trial
    for loaded_study in (study_only_mse, study_only_r2, study_mse_r2)
)

In [None]:
# Keep only the hyperparameter dict of each best trial. Reading from the
# studies (rather than from the names being rebound) keeps this cell
# re-runnable: a second run no longer fails on a dict without `.params`.
best_trial_mse = dict(study_only_mse.best_trial.params)
best_trial_r2 = dict(study_only_r2.best_trial.params)
best_trial_pond = dict(study_mse_r2.best_trial.params)

In [None]:
# Retrain one model per hyperparameter set. Each result is the 5-tuple
# returned by train_and_validate, with the trained model at index 4.
best_model_mse, best_model_r2, best_model_mser2 = [
    train_and_validate(chosen_params)
    for chosen_params in (best_trial_mse, best_trial_r2, best_trial_pond)
]

In [None]:
# Save the weights (state_dict) of each trained model to its own file.
for path, training_result in (
    ("modeloD1.pth", best_model_mse),
    ("modeloD2.pth", best_model_r2),
    ("modeloD3.pth", best_model_mser2),
):
    # Index 4 of the training result is the model itself.
    torch.save(training_result[4].state_dict(), path)

In [None]:
# Load new data: the coordinates of the first entry and the parameters of
# every entry.
with open("./newData.json", "r") as arquivo:
    dados = json.load(arquivo)
pontos = [d["coords"] for d in dados["dados"]][0]
params = [d["params"] for d in dados["dados"]]

# Scale the new points with the ranges of the *training* coordinates.
# Re-fitting the scaler on `pontos` stretched the new series to [0, 1] using
# its own min/max, a different mapping from the one the models were trained
# on, so the networks received inputs on the wrong scale.
coords_scaler = MinMaxScaler()
coords_scaler.fit(data_flattened)
pontos_n = coords_scaler.transform(pontos)
pontos_tensor = torch.Tensor(pontos_n)

# The models receive a batch containing this single series; inference runs
# in eval mode without gradient tracking.
entrada = pontos_tensor.unsqueeze(0)
with torch.no_grad():
    previsao1 = best_model_mse[4].eval()(entrada)
    previsao2 = best_model_r2[4].eval()(entrada)
    previsao3 = best_model_mser2[4].eval()(entrada)


def calcula_mse(predictions, labels):
    """Return the mean squared error between `predictions` and `labels` as a Python scalar."""
    squared_differences = (predictions - labels) ** 2
    return squared_differences.mean().item()

In [None]:
# Draws one comparison panel: predicted step line plus the observed points.
def plot_grafico(linhas, new_data, ax, titulo, x_lim=(0, 6000), y_lim=(0, 8)):
    """Plot a red step-shaped line and blue scatter points on `ax`.

    Args:
        linhas: indexable with four values ``[tl, th, dmin, dmax]``. The line
            sits at height ``tl`` outside ``[dmin, dmax]`` and at ``th`` inside.
        new_data: iterable of points; index 0 is used as x and index 1 as y.
        ax: axes-like object providing ``plot``, ``scatter``, ``set_xlim``,
            ``set_ylim`` and ``set_title``.
        titulo: title text, placed on the left.
        x_lim: ``(min, max)`` of the x axis; also where the line starts and ends.
        y_lim: ``(min, max)`` of the y axis.
    """
    x_min, x_max = x_lim
    y_min, y_max = y_lim
    tl, th = linhas[0], linhas[1]
    dmin, dmax = linhas[2], linhas[3]

    # Low level up to dmin, high level between dmin and dmax, low level again.
    x_line = [x_min, dmin, dmin, dmax, dmax, x_max]
    y_line = [tl, tl, th, th, tl, tl]
    ax.plot(x_line, y_line, color="red")

    ax.scatter(
        [point[0] for point in new_data],
        [point[1] for point in new_data],
        c="blue",
    )

    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    ax.set_title(titulo, loc="left")

In [None]:
fig, axs = plt.subplots(3, 1, figsize=(6, 18))

# Map the normalised predictions back to plot coordinates. The shared scaler
# is re-fitted on the targets first because other cells may have re-fitted it.
scaler.fit(target)
previsao_c1 = scaler.inverse_transform(previsao1.detach().numpy())
linhas1 = previsao_c1.tolist()[0]
previsao_c2 = scaler.inverse_transform(previsao2.detach().numpy())
linhas2 = previsao_c2.tolist()[0]
previsao_c3 = scaler.inverse_transform(previsao3.detach().numpy())
linhas3 = previsao_c3.tolist()[0]

# One panel per model.
plot_grafico(linhas1, pontos, axs[0], "A.   Modelo D.4")
plot_grafico(linhas2, pontos, axs[1], "B.   Modelo D.5")
plot_grafico(linhas3, pontos, axs[2], "C.   Modelo D.6")

# MSE of each prediction against the expected parameters, both in the
# normalised target space.
oParams = scaler.transform(params)
pondParams = previsao3.detach().numpy()
r2Params = previsao2.detach().numpy()
mseParams = previsao1.detach().numpy()

mse1 = calcula_mse(mseParams, oParams)
mse2 = calcula_mse(r2Params, oParams)
mse3 = calcula_mse(pondParams, oParams)

for ax, panel_mse in zip(axs, (mse1, mse2, mse3)):
    ax.text(
        0.5,
        0.9,
        f"MSE: {panel_mse:.5f}",
        transform=ax.transAxes,
        ha="center",
        fontsize=14,
    )
    ax.tick_params(axis="both", which="major", labelsize=14)
    ax.set_xlabel(ax.get_xlabel(), fontsize=14)
    ax.set_ylabel(ax.get_ylabel(), fontsize=14)
    ax.set_title(ax.get_title(), fontsize=14)

# Apply the layout BEFORE saving; saving first wrote a file without the
# tightened layout that is shown on screen.
plt.tight_layout()
plt.savefig("comparação_previsao_mse_r2_ponderada_new_space.png")
plt.show()

In [None]:
# Evaluate the three trained models on the held-out test split.
x_test_tensor = torch.Tensor(np.array(test_data))
y_test_tensor = torch.Tensor(np.array(test_target))
test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
# Evaluation gains nothing from shuffling; a fixed order keeps the batches
# identical between runs and matches the validation loaders used in training.
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

test_model_with_new_data(best_model_mse[4], test_loader)
test_model_with_new_data(best_model_r2[4], test_loader)
test_model_with_new_data(best_model_mser2[4], test_loader)