In [1]:
import numpy as np
import os
import torch
import pandas as pd
from autoencoder import NNAutoencoder
from read_data import read_raw, read_and_perform, train_test_split, scalings
import matplotlib.pyplot as plt

In [2]:
import seaborn as sns

## Carga de datos

In [3]:
# Read the raw files from the two data directories, in the same order as
# before ('Date' first, then 'Date2').
dataframes1, dataframes2 = (
    read_raw(os.path.join('..', dir_name)) for dir_name in ('Date', 'Date2')
)
# `folder` is left pointing at the second directory.
folder = os.path.join('..', 'Date2')

In [4]:
# Convert every raw table into NumPy arrays holding series of equal length.
series_blocks = [
    read_and_perform(dataframes1, row_range=300, col_range=(3,12), split=True),
    read_and_perform(dataframes2, row_range=99, col_range=(2,5), split=False),
]
data_1, data_2 = series_blocks

# Stack the transposed blocks on top of each other into one array.
data_total = np.vstack([block.T for block in series_blocks])

# Split into train and test sets.
# NOTE(review): no seed is passed here; whether the split is reproducible
# depends on train_test_split's implementation - confirm.
train, test = train_test_split(data_total)

[+] Se procesaron 297 series de longitud 99
[+] Se procesaron 32 series de longitud 99


In [5]:
# Scale the data: the scaler comes from the project helper `scalings`, is
# fitted on the training split and then applied unchanged to the test split.
# NOTE(review): `train` and `test` are rebound to their scaled versions, so
# re-running this cell alone would presumably refit the scaler on data that is
# already scaled (and break later inverse transforms) - re-run the split cell
# first.
scaler = scalings(train)
train = scaler.fit_transform(train)
print(f'[+] Train shape {train.shape}')
test = scaler.transform(test)
print(f'[+] Test shape {test.shape}')

[+] StandardScaler entrenado
[+] Train shape (310, 99)
[+] Test shape (19, 99)


## Entrenar modelo para un solo espacio latente

In [6]:
# Dropout sweep: train one autoencoder per dropout rate for a fixed latent
# size and save the train/test loss curves of each run as a PNG.
epochs = 1000
lr = 1e-3
lat = 2
drop = np.linspace(0.2,0.6,5)

# Create the output directory for the figures if it does not exist
path_img = os.path.join("..","img","Drop-Optimo",f"Loos-lat{lat}")
os.makedirs(path_img, exist_ok=True)

# Loop-invariant objects: the data tensors and the (stateless) loss are the
# same for every epoch and every dropout rate, so build them once.
x_train = torch.FloatTensor(train)
x_test = torch.FloatTensor(test)
criterio = torch.nn.MSELoss()

for dr in drop:
    # np.linspace can yield values such as 0.30000000000000004; the 'g'
    # format keeps clean values unchanged ("0.2") and trims the float noise,
    # so titles and file names stay readable.
    dr_label = f"{dr:g}"

    # Training
    hist_train = []
    hist_test = []
    autoencoder = NNAutoencoder(99, lat, dr)
    optimizer = torch.optim.Adam(autoencoder.parameters(), lr = lr)

    for e in range(epochs):
        # One full-batch optimisation step on the training set
        autoencoder.train()
        y_pred = autoencoder(x_train)
        loss = criterio(y_pred, x_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if e%100 == 0:
            print(e, "loss =",loss.item())
        hist_train.append(loss.item())

        # Reconstruction loss on the held-out set, in eval mode and without
        # building an autograd graph
        autoencoder.eval()
        with torch.no_grad():
            test_loss = criterio(autoencoder(x_test), x_test)
        hist_test.append(test_loss.item())

    # Save the loss curves on an explicit figure so nothing leaks between runs
    fig, ax = plt.subplots()
    ax.semilogy(hist_train, label = 'train loss')
    ax.semilogy(hist_test, label = 'test loss')
    ax.set_title(f"Loss train -eval, drop = {dr_label}")
    ax.legend()
    fig.savefig(os.path.join(path_img,f"lat{lat}-drop{dr_label}.png"), bbox_inches='tight')
    plt.close(fig)


0 loss = 1.0050108432769775
100 loss = 0.12534865736961365
200 loss = 0.10246244072914124
300 loss = 0.056759122759103775
400 loss = 0.05344502627849579
500 loss = 0.04684300348162651
600 loss = 0.043836914002895355
700 loss = 0.039924874901771545
800 loss = 0.035315319895744324
0 loss = 1.0048973560333252
100 loss = 0.15116634964942932
200 loss = 0.13384070992469788
300 loss = 0.10776885598897934
400 loss = 0.06144615262746811
500 loss = 0.06489058583974838
600 loss = 0.05098755657672882
700 loss = 0.0531793087720871
800 loss = 0.049250781536102295
0 loss = 1.0052117109298706
100 loss = 0.1592426896095276
200 loss = 0.12709692120552063
300 loss = 0.10867245495319366
400 loss = 0.07594852149486542
500 loss = 0.06924033164978027
600 loss = 0.056386206299066544
700 loss = 0.055969227105379105
800 loss = 0.056842222809791565
0 loss = 1.0037742853164673
100 loss = 0.1805136799812317
200 loss = 0.15593664348125458
300 loss = 0.13938485085964203
400 loss = 0.12770497798919678
500 loss = 0.11

## Early stopping

El *early stopping* es una técnica que permite detener el entrenamiento cuando la pérdida en el conjunto de validación (aquí, el conjunto de test) comienza a aumentar. Esto ayuda a prevenir el sobreajuste y permite guardar el mejor modelo, es decir, el que alcanza el mínimo valor de pérdida en el conjunto de validación.

In [None]:
# Create the directory where the trained model checkpoints are saved, if it
# does not exist yet
path_model = os.path.join("..","Save_Models")
os.makedirs(path_model, exist_ok=True)

In [None]:
# Train one autoencoder while tracking the best test loss, write the best
# weights to disk once the patience window has elapsed, and save the loss
# curves with the selected epoch marked.
epochs = 1000
lr = 1e-3
dr = 0.2
lat = 1
# Create the directory for the figures if it does not exist
path_img = os.path.join("..","img","Drop-Optimo",f"Loos-lat{lat}")
os.makedirs(path_img, exist_ok=True)

# Training state
hist_train = []
hist_test = []
best_test_loss = float("inf")  # any real loss beats this
best_epoch = 0
best_model = None
espera = 100   # patience: epochs without improvement before a checkpoint is written
b = 0          # number of checkpoints written so far (capped at 3 below)
m_epoch = None       # epoch of the checkpoint that was last written
m_test_loss = None   # test loss of that checkpoint
checkpoints = []     # (epoch, loss) of every checkpoint written, for plotting

autoencoder = NNAutoencoder(99, lat, dr)
optimizer = torch.optim.Adam(autoencoder.parameters(), lr = lr)
criterio = torch.nn.MSELoss()

# The data tensors do not change between epochs: build them once
x_train = torch.FloatTensor(train)
x_test = torch.FloatTensor(test)

# NOTE(review): the loop never breaks, so training always runs for `epochs`
# epochs and the full curves are recorded; only checkpoint selection is
# "early".
for e in range(epochs):
    # One full-batch optimisation step
    autoencoder.train()
    y_pred = autoencoder(x_train)
    loss = criterio(y_pred, x_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    train_loss = loss.item()
    hist_train.append(train_loss)

    # Held-out loss as a plain float (eval mode, no autograd graph)
    autoencoder.eval()
    with torch.no_grad():
        test_loss = criterio(autoencoder(x_test), x_test).item()
    hist_test.append(test_loss)

    if e%100 == 0:
        print(f'Epoch {e}, train Loss: {train_loss:.4f}, test Loss: {test_loss:.4f}')

    # Checkpoints are only considered while the test loss is below the train
    # loss (gate kept exactly as in the original logic).
    if test_loss < train_loss:
        if test_loss < best_test_loss:
            best_test_loss = test_loss
            best_epoch = e
            # state_dict() returns references to the live parameters and
            # dict.copy() is shallow, so clone every tensor; otherwise the
            # "best" snapshot keeps changing with each optimizer step.
            best_model = {
                name: tensor.detach().clone()
                for name, tensor in autoencoder.state_dict().items()
            }

        if (e - best_epoch >= espera) and (b < 3):
            m_epoch = best_epoch
            m_test_loss = best_test_loss
            torch.save(best_model, os.path.join(path_model,f"model-lat{lat}.pth"))
            b += 1
            checkpoints.append((m_epoch, m_test_loss))

# If the patience window never elapsed, fall back to the best snapshot seen so
# a checkpoint is still written and the summary below cannot hit an undefined
# name.
if m_epoch is None and best_model is not None:
    m_epoch = best_epoch
    m_test_loss = best_test_loss
    torch.save(best_model, os.path.join(path_model,f"model-lat{lat}.pth"))

if m_epoch is None:
    print('No checkpoint selected: test loss never dropped below train loss')
else:
    print(f'Best epoch was {m_epoch} with val loss {m_test_loss:.4f}')

# Save the loss curves with the selected checkpoint(s) marked
fig, ax = plt.subplots()
ax.semilogy(hist_train, label = 'train loss')
ax.semilogy(hist_test, label = 'test loss')
for ck_epoch, ck_loss in checkpoints:
    ax.plot(ck_epoch, ck_loss, 'x', color = "red")
if m_epoch is not None:
    ax.plot(m_epoch,m_test_loss,'x', color = "red", label = "best model")
ax.set_xlabel("epoch")
ax.set_ylabel("loss")
ax.set_title(f"dim_lat: {lat}, drop = {dr}")
ax.legend()
fig.savefig(os.path.join(path_img,f"BestModel-lat{lat}-drop{dr}.png"), bbox_inches='tight')
plt.close(fig)

# Model Load

In [None]:
# Index the saved checkpoints by latent dimension:
# dicc_model[<latent dim>] = "<file name>", for files named
# "model-lat<N>.pth" inside ../Save_Models.
models_dir = os.path.join("..","Save_Models")

# Start from an empty list so the loop below is safe when the directory is
# missing (previously `filename` was undefined in that case).
filename = []
if os.path.isdir(models_dir):
    filename = os.listdir(models_dir)
    print(filename)
else:
    print(f"[!] Directory not found: {models_dir}")

model_prefix, model_suffix = "model-lat", ".pth"
dicc_model = {}
for f in filename:
    # Skip anything that is not a checkpoint written by the training cell
    # (e.g. hidden files or notebook checkpoints) instead of crashing on int().
    if not (f.startswith(model_prefix) and f.endswith(model_suffix)):
        continue
    lat_text = f[len(model_prefix):-len(model_suffix)]
    if lat_text.isdecimal():
        dicc_model[int(lat_text)] = f

# Create the directory for the MAE figures if it does not exist
path_maes = os.path.join("..","img","MAES")
os.makedirs(path_maes, exist_ok=True)

In [None]:
# Accumulators: one mean and one standard deviation of the MAE per latent size
mean_list, std_list = [], []

# Dropout rate chosen for each latent dimension; position k holds the rate
# for latent dimension k + 1.
drop = [0.3,0.3,0.3,0.4,0.2,0.4,0.3,0.2,0.2,0.3,0.3,0.4,0.3,0.2,0.4,0.4,0.3,0.2,0.4,0.2]

# Lookup table keyed by latent dimension, starting at 1
dicc_drop = dict(enumerate(drop, start=1))

In [None]:
# Load the saved model for one latent size, run 500 stochastic forward passes
# over the test set and collect the mean absolute error (in original units)
# of each pass; then store the mean/std and save a histogram.
MAES_list = []
lat = 20
dr = dicc_drop[lat]

autoencoder = NNAutoencoder(99, lat, dr)
autoencoder.load_state_dict(torch.load(os.path.join("..","Save_Models",dicc_model[lat])))

# The input and its un-scaled version are identical for every pass, so
# compute them once. The scaler receives a NumPy array rather than a tensor.
x = torch.FloatTensor(test)
sample = scaler.inverse_transform(x.numpy())

# Train mode is kept on purpose: presumably it leaves dropout active so each
# pass gives a different reconstruction (otherwise all 500 values would be
# equal).
# NOTE(review): if NNAutoencoder contains layers with running statistics
# (e.g. BatchNorm), train mode would also update them here - confirm.
autoencoder.train()
with torch.no_grad():  # inference only: no autograd graph needed
    for i in range(500):
        y_pred = autoencoder(x)
        sample_pred = scaler.inverse_transform(y_pred.numpy())
        average_MAES = np.abs(sample-sample_pred).mean()
        MAES_list.append(average_MAES)
        # `i` is the index of the forward pass, not a dimension
        print(f'Pass= {i} & Average MAE= {average_MAES:.3f}')

# NOTE: every execution of this cell appends one more entry, so run it exactly
# once per latent size (in increasing order) before plotting.
array = np.array(MAES_list)
mean_list.append(np.mean(array))
std_list.append(np.std(array))

# Histogram of the per-pass MAE values, drawn on an explicit figure
fig, ax = plt.subplots()
sns.histplot(MAES_list, kde = True, bins=50, edgecolor='black', ax=ax)
ax.set_xlabel('MAES')
ax.set_ylabel('f')
ax.set_title(f'MAES, lat{lat} drop {dr}')
fig.savefig(os.path.join(path_maes,f"MAES{lat}-drop{dr}.png"), bbox_inches='tight')
plt.close(fig)

In [None]:
# Show the statistics of the most recently evaluated latent size. Index -1 is
# used because a fixed index of 21 is out of range for the (at most 20)
# latent sizes handled in this notebook.
print(mean_list[-1], std_list[-1])

### MAE vs Dim Latent

In [None]:
# Mean MAE (with its standard deviation as error bars) against latent size.
# Expects mean_list / std_list to hold exactly 20 entries, one per latent
# dimension from 1 to 20.
latent = np.linspace(1,20,20)
fig, ax = plt.subplots(figsize=(10, 6))
ax.errorbar(latent, mean_list, yerr=std_list, fmt='o', linewidth=2, capsize=6)
ax.plot(latent, mean_list)
ax.set(ylabel="MAES", xlabel="Laten Dim", xticks=np.arange(0, 22, 1))
ax.grid()

In [None]:
# Print the string representation of the current `autoencoder` object (the one
# most recently assigned by the cells above)
print(autoencoder)