In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch.utils.data as data
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.metrics import accuracy_score, log_loss
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt
import numpy as np


import random


import torch.backends.cudnn as cudnn

# Reproducibility: a single shared seed feeds every random number generator
# this notebook relies on.
seed_ = 2023

# CPU-side generators: PyTorch, NumPy's global state and the Python stdlib.
torch.manual_seed(seed_)
np.random.seed(seed_)
random.seed(seed_)

# CUDA generators are only seeded when a CUDA device is actually available.
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed_)
    torch.cuda.manual_seed_all(seed_)

# Turn off cuDNN benchmarking and request deterministic cuDNN behaviour.
cudnn.benchmark = False
cudnn.deterministic = True

In [2]:




# Load the data
# NOTE(review): this assignment rebinds `data`, which the import cell bound to
# the `torch.utils.data` module (`import torch.utils.data as data`). From this
# line on `data` is the DataFrame read from the CSV below.
data = pd.read_csv('../data/PROCESS/encoded_tch_prediction_data_zafrav3.2.csv')

In [3]:
# Display the column labels of the loaded frame.
data.columns

# Disabled experiment (not executed); the same two columns are dropped later
# when the train/test split is built.
#data.drop(columns=['ABS_IDCOMP','ZAFRA'])

In [4]:
# # Define features (X) and target variable (y)
# X = data.drop('rendimiento', axis=1)
# y = data['rendimiento']
# 
# # Split the data into 80% training and 20% testing
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 
# X_train, X_test, y_train, y_test = X_train.drop(columns=['ABS_IDCOMP','ZAFRA']), X_test.drop(columns=['ABS_IDCOMP','ZAFRA']), y_train.drop(columns=['ABS_IDCOMP','ZAFRA']), y_test.drop(columns=['ABS_IDCOMP','ZAFRA'])
# print("80-20 Split:")
# print(f"Training set shape: {X_train.shape}")
# print(f"Testing set shape: {X_test.shape}")

In [5]:
# Histogram of the ZAFRA column. Series.hist returns the Axes it drew on, so
# the title and axis labels are attached directly and the figure stands alone.
data['ZAFRA'].hist().set(title='Rows per ZAFRA', xlabel='ZAFRA', ylabel='Rows')
plt.show()

In [6]:
# Hold out one ZAFRA as the test set and train on every other one.
TEST_ZAFRA = '23-24'
mask_test_zafra = data['ZAFRA'] == TEST_ZAFRA
# Legacy alias: the mask used to be called `mask_22_23` even though it has
# always selected ZAFRA '23-24'. Kept so any cell still using the old name works.
mask_22_23 = mask_test_zafra

# Feature matrix: everything except the target ('TCH') and the two columns
# that are excluded from modelling ('ABS_IDCOMP', 'ZAFRA').
features_zafra = data.drop(columns=['TCH', 'ABS_IDCOMP', 'ZAFRA'])

X_train_zafra = features_zafra.loc[~mask_test_zafra]
X_test_zafra = features_zafra.loc[mask_test_zafra]

# The targets are plain Series, so there are no columns to drop from them
# (Series.drop(columns=...) makes no change to the Series).
y_train_zafra = data.loc[~mask_test_zafra, 'TCH']
y_test_zafra = data.loc[mask_test_zafra, 'TCH']

# Fail here, with a clear message, rather than later inside the scaler/model.
assert len(X_train_zafra) > 0, f"no training rows: every row has ZAFRA == {TEST_ZAFRA!r}"
assert len(X_test_zafra) > 0, f"no test rows: no row has ZAFRA == {TEST_ZAFRA!r}"

print("\nZAFRA Split:")
print(f"Training set shape: {X_train_zafra.shape}")
print(f"Testing set shape: {X_test_zafra.shape}")

In [7]:
# Preview the training feature frame produced by the ZAFRA split.
X_train_zafra

In [8]:
# from sklearn.ensemble import RandomForestRegressor
# from sklearn.metrics import mean_squared_error, r2_score
# from sklearn.metrics import accuracy_score, log_loss
# 
# # Function to train and evaluate a model
# def train_and_evaluate(X_train, X_test, y_train, y_test, model_name):
#     model = RandomForestRegressor(n_estimators=100, random_state=42)
#     model.fit(X_train, y_train)
#     y_pred = model.predict(X_test)
# 
#     mse = mean_squared_error(y_test, y_pred)
#     r2 = r2_score(y_test, y_pred)
#     print(f"\n{model_name} Results:")
#     print(f"Mean Squared Error: {mse}")
#     print(f"R-squared Score: {r2}")
# 
# 
# # Evaluate using 80-20 split
# train_and_evaluate(X_train, X_test, y_train, y_test, "80-20 Split Model")
# 
# # Evaluate using ZAFRA split
# train_and_evaluate(X_train_zafra, X_test_zafra, y_train_zafra, y_test_zafra, "ZAFRA Split Model")

In [9]:
# Fully connected regression network
class Net(nn.Module):
    """Deep multilayer perceptron that emits one value per input row.

    Layer stack, in execution order:

    * ``fc1``, ``fc2``, ``fc5`` ... ``fc10``: linear layers followed by a sigmoid;
      dropout (p=0.25) is applied after ``fc1``, ``fc2``, ``fc5`` and ``fc10``.
    * ``fc11`` ... ``fc13``: linear layers followed by a ReLU.
    * ``fc14``: final linear layer with a single output and no activation.

    There is no ``fc3``/``fc4``; the attribute names are kept as they are so
    the ``state_dict`` keys stay stable.
    """

    # (attribute name, apply dropout afterwards?) for the sigmoid stage.
    _SIGMOID_STAGE = (
        ("fc1", True),
        ("fc2", True),
        ("fc5", True),
        ("fc6", False),
        ("fc7", False),
        ("fc8", False),
        ("fc9", False),
        ("fc10", True),
    )
    # Attribute names of the ReLU stage.
    _RELU_STAGE = ("fc11", "fc12", "fc13")

    def __init__(self, input_size):
        """Create the layers for inputs with ``input_size`` features."""
        super().__init__()
        layer_plan = (
            ("fc1", input_size, input_size),
            ("fc2", input_size, 360),
            ("fc5", 360, 360),
            ("fc6", 360, 512),
            ("fc7", 512, 512),
            ("fc8", 512, 512),
            ("fc9", 512, 360),
            ("fc10", 360, 360),
            ("fc11", 360, 256),
            ("fc12", 256, 128),
            ("fc13", 128, 64),
            ("fc14", 64, 1),
        )
        # Registration order matters: it fixes both the parameter order and
        # the order in which the seeded initialiser draws random numbers.
        for attr, fan_in, fan_out in layer_plan:
            setattr(self, attr, nn.Linear(fan_in, fan_out))

        self.relu = nn.ReLU()
        self.sig = nn.Sigmoid()
        self.leak = nn.LeakyReLU()  # registered but not used in forward()
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Run ``x`` through the sigmoid stage, the ReLU stage and ``fc14``."""
        for attr, drop_after in self._SIGMOID_STAGE:
            x = self.sig(getattr(self, attr)(x))
            if drop_after:
                x = self.dropout(x)
        for attr in self._RELU_STAGE:
            x = self.relu(getattr(self, attr)(x))
        return self.fc14(x)

    def l1_loss(self):
        """Return the sum of absolute values over every parameter (weights and biases)."""
        return sum((weights.abs().sum() for weights in self.parameters()), 0)

    def l2_loss(self):
        """Return the sum of squares over every parameter (weights and biases)."""
        return sum((weights.pow(2).sum() for weights in self.parameters()), 0)


In [10]:
import torch
import torch.nn as nn

class AutoNET(nn.Module):
    """Regression MLP whose layer widths are generated automatically.

    Widths grow linearly from ``input_size`` up to a peak of
    ``input_size * peak_multiplier`` over the first half of the layers and
    then shrink linearly towards the single output over the second half.

    The network contains exactly ``num_layers`` linear layers, output layer
    included. An earlier revision appended an extra ``Linear(1, 1)`` after a
    one-unit ReLU hidden layer (which also received dropout), giving
    ``num_layers + 1`` layers with a single-neuron bottleneck. The final
    narrowing step is now the output layer itself. ``state_dict`` files saved
    from that earlier revision therefore do not match this layout.

    Args:
        input_size: Number of input features.
        num_layers: Total number of linear layers (>= 2). Defaults to 16.
        peak_multiplier: Widest layer as a multiple of ``input_size``.
        dropout: Dropout probability applied after every second hidden layer.

    Raises:
        ValueError: If ``num_layers`` is smaller than 2.
    """

    def __init__(self, input_size, num_layers=16, peak_multiplier=4, dropout=0.25):
        super().__init__()

        if num_layers < 2:
            raise ValueError("num_layers must be at least 2")

        output_size = 1  # For regression
        peak_size = int(input_size * peak_multiplier)

        widen_steps = num_layers // 2
        narrow_steps = num_layers - widen_steps

        # First half: widen from input_size up to peak_size (inclusive).
        hidden_sizes = [
            input_size + int((peak_size - input_size) * (step + 1) / widen_steps)
            for step in range(widen_steps)
        ]
        # Second half: narrow from peak_size towards output_size. The last
        # step of this schedule equals output_size, i.e. it *is* the output
        # layer, so only the first narrow_steps - 1 entries are hidden layers.
        hidden_sizes += [
            peak_size - int((peak_size - output_size) * (step + 1) / narrow_steps)
            for step in range(narrow_steps - 1)
        ]

        # Chain the linear layers: each one consumes the previous width.
        self.layers = nn.ModuleList()
        prev_size = input_size
        for size in hidden_sizes:
            self.layers.append(nn.Linear(prev_size, size))
            prev_size = size
        self.layers.append(nn.Linear(prev_size, output_size))  # Final output layer

        # Activations
        self.sigmoid = nn.Sigmoid()
        self.leaky_relu = nn.LeakyReLU()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply every hidden layer with its activation, then the linear output layer."""
        N = len(self.layers)
        for i, layer in enumerate(self.layers[:-1]):  # Exclude the last layer
            x = layer(x)

            # Activations: sigmoid for the first third of the layers, leaky
            # ReLU for the second third, ReLU for the rest.
            if i < N // 3:
                x = self.sigmoid(x)
            elif i < 2 * N // 3:
                x = self.leaky_relu(x)
            else:
                x = self.relu(x)

            # Dropout every two hidden layers
            if (i + 1) % 2 == 0:
                x = self.dropout(x)

        # Final layer without activation (for regression)
        x = self.layers[-1](x)
        return x

    def l1_loss(self):
        """Return the sum of absolute values over every parameter (weights and biases)."""
        l1_loss = 0
        for param in self.parameters():
            l1_loss += torch.sum(torch.abs(param))
        return l1_loss

    def l2_loss(self):
        """Return the sum of squares over every parameter (weights and biases)."""
        l2_loss = 0
        for param in self.parameters():
            l2_loss += torch.sum(param.pow(2))
        return l2_loss


In [11]:

# Standardise the features: the scaler is fitted on the training rows only and
# the same fitted transform is then applied to the test rows.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_zafra)
X_test_scaled = scaler.transform(X_test_zafra)

# Float tensors for PyTorch. Each target tensor gets an extra axis at
# position 1 via unsqueeze.
X_train_tensor = torch.FloatTensor(X_train_scaled)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_train_tensor = torch.FloatTensor(y_train_zafra.values).unsqueeze(dim=1)
y_test_tensor = torch.FloatTensor(y_test_zafra.values).unsqueeze(dim=1)

# Short names used by the training and evaluation cells.
X_train, y_train = X_train_tensor, y_train_tensor
X_test, y_test = X_test_tensor, y_test_tensor

In [12]:
# Display the unscaled training feature frame again; the scaled values live in
# X_train_scaled / X_train_tensor.
X_train_zafra

In [13]:
# Number of input features: the size of the second dimension of X_train.
input_size = X_train.shape[1]
input_size

In [14]:
# Coefficient of determination (R^2)
def calculate_r2(y_true, y_pred):
    """Return R^2 = 1 - SS_res / SS_tot as a Python float.

    Both inputs are flattened before comparison, so a column vector ``(n, 1)``
    and a flat vector ``(n,)`` are compared element by element instead of
    being silently broadcast to an ``(n, n)`` matrix.

    Args:
        y_true: Observed targets (tensor or array-like).
        y_pred: Predicted targets with the same number of elements.

    Returns:
        The R^2 score. When ``y_true`` has zero variance the ratio is
        undefined; following scikit-learn's finite convention the result is
        1.0 for a perfect prediction and 0.0 otherwise.

    Raises:
        ValueError: If the inputs are empty or differ in number of elements.
    """
    def _flatten(values):
        # Accept tensors as well as NumPy/pandas/list inputs.
        if not isinstance(values, torch.Tensor):
            values = torch.as_tensor(np.asarray(values))
        values = values.detach().reshape(-1)
        # mean() is only defined for floating-point tensors.
        if not values.is_floating_point():
            values = values.to(torch.get_default_dtype())
        return values

    y_true = _flatten(y_true)
    y_pred = _flatten(y_pred)

    if y_true.numel() == 0:
        raise ValueError("calculate_r2 needs at least one sample")
    if y_true.numel() != y_pred.numel():
        raise ValueError(
            f"y_true has {y_true.numel()} elements but y_pred has {y_pred.numel()}"
        )

    ss_res = ((y_true - y_pred) ** 2).sum()
    ss_tot = ((y_true - y_true.mean()) ** 2).sum()

    if ss_tot.item() == 0:
        # Constant targets: avoid returning nan / -inf.
        return 1.0 if ss_res.item() == 0 else 0.0

    return (1 - ss_res / ss_tot).item()

# Regularisation strengths applied to the model's l1_loss() / l2_loss() terms.
l1_lambda = l2_lambda = 0.01

# Mean-squared-error criterion.
criterion = nn.MSELoss()

# Model sized to the number of feature columns in X_train.
input_size = X_train.shape[1]
model = Net(input_size)

# Adam with an initial learning rate of 0.01; StepLR multiplies the learning
# rate by gamma every step_size calls to scheduler.step().
optimizer = optim.Adam(params=model.parameters(), lr=0.01)
scheduler = StepLR(optimizer=optimizer, step_size=30, gamma=0.1)


# Training function: see train_model, defined in a later cell.

In [15]:
# # create a nn class (just-for-fun choice :-) 
# class RMSELoss(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.mse = nn.MSELoss()
#         
#     def forward(self,yhat,y):
#         return torch.sqrt(self.mse(yhat,y))
# 
# criterion = RMSELoss()

In [16]:
def train_model(model, criterion, optimizer, scheduler, X_train, y_train, X_test, y_test,
                num_epochs=100, l1_lambda=None, l2_lambda=None):
    """Train ``model`` with full-batch updates and record per-epoch metrics.

    Every epoch performs one optimisation step on the whole training set,
    advances the learning-rate scheduler once, and then evaluates on the test
    set in eval mode without gradients.

    Args:
        model: Module exposing ``l1_loss()`` and ``l2_loss()`` penalty methods.
        criterion: Loss callable ``criterion(outputs, targets)``.
        optimizer: Optimiser bound to ``model``'s parameters.
        scheduler: Learning-rate scheduler; stepped once per epoch.
        X_train, y_train: Training inputs and targets.
        X_test, y_test: Evaluation inputs and targets.
        num_epochs: Number of epochs (one gradient step each).
        l1_lambda: Weight of ``model.l1_loss()``. ``None`` (default) reads the
            notebook-level variable ``l1_lambda`` so existing calls keep working.
        l2_lambda: Weight of ``model.l2_loss()``; same fallback as ``l1_lambda``.

    Returns:
        ``(train_losses, test_losses, train_r2s, test_r2s)``: four lists with
        one float per epoch. The recorded train loss includes the L1/L2
        penalties while the test loss is the bare criterion, so the two curves
        are not directly comparable when the penalties are non-zero. The train
        R^2 is computed from the train-mode forward pass of that epoch.

    Raises:
        KeyError: If a lambda is left as ``None`` and the corresponding
            notebook-level variable has not been defined.
    """
    # Resolve the regularisation strengths (explicit argument wins).
    notebook_globals = globals()
    if l1_lambda is None:
        l1_lambda = notebook_globals['l1_lambda']
    if l2_lambda is None:
        l2_lambda = notebook_globals['l2_lambda']

    train_losses = []
    test_losses = []
    train_r2s = []
    test_r2s = []

    for epoch in range(num_epochs):
        # Training step
        model.train()
        # Gradients must be cleared BEFORE backward(). Clearing them between
        # backward() and step() discards the freshly computed gradients and
        # the optimiser then leaves the weights unchanged.
        optimizer.zero_grad()
        outputs = model(X_train)
        train_loss = criterion(outputs, y_train)

        l1_reg = l1_lambda * model.l1_loss()
        l2_reg = l2_lambda * model.l2_loss()
        train_loss = train_loss + l1_reg + l2_reg

        train_loss.backward()
        optimizer.step()

        # Update the learning rate
        scheduler.step()

        # Evaluation on the test set
        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test)
            test_loss = criterion(test_outputs, y_test)

        train_losses.append(train_loss.item())
        test_losses.append(test_loss.item())

        # R^2 as the goodness-of-fit metric; detach so no graph is kept alive.
        train_r2 = calculate_r2(y_train, outputs.detach())
        test_r2 = calculate_r2(y_test, test_outputs)
        train_r2s.append(train_r2)
        test_r2s.append(test_r2)

        if (epoch + 1) % 50 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], LR: {scheduler.get_last_lr()[0]:.6f}, Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}, Train R^2: {train_r2:.4f}, Test R^2: {test_r2:.4f}')

    return train_losses, test_losses, train_r2s, test_r2s

In [17]:

# Train the model
num_epochs = 100
train_losses, test_losses, train_r2s, test_r2s = train_model(
    model, criterion, optimizer, scheduler,
    X_train, y_train, X_test, y_test,
    num_epochs=num_epochs,
)

# Save the per-epoch history to a CSV file
losses_df = pd.DataFrame({
    'Epoch': range(1, num_epochs + 1),
    'Train Loss': train_losses,
    'Test Loss': test_losses,
    'Train R^2': train_r2s,
    'Test R^2': test_r2s
})
losses_df.to_csv('training_losses.csv', index=False)

print('Training complete. Losses and accuracies saved to training_losses.csv')


def _plot_history(train_values, test_values, train_label, test_label, y_label, title):
    """Plot one train curve and one test curve against the epoch number."""
    epochs = range(1, num_epochs + 1)
    plt.figure(figsize=(12, 6))
    plt.plot(epochs, train_values, label=train_label)
    plt.plot(epochs, test_values, label=test_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(title)
    plt.show()


# Loss across epochs
_plot_history(train_losses, test_losses, 'Train Loss', 'Test Loss', 'Loss', 'Loss over Epochs')

# R^2 across epochs
_plot_history(train_r2s, test_r2s, 'Train R^2', 'Test R^2', 'R^2', 'R^2 over Epochs')

In [18]:
# Ground truth as a NumPy array for the scikit-learn metric functions.
y_test_np = y_test.cpu().detach().numpy()

# Predict on the test inputs in eval mode, without tracking gradients.
model.eval()
with torch.no_grad():
    predictions = model(X_test).cpu().detach().numpy()

# MSE, RMSE, MAE and R^2 on the test set
mse = mean_squared_error(y_true=y_test_np, y_pred=predictions)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true=y_test_np, y_pred=predictions)
r2 = r2_score(y_true=y_test_np, y_pred=predictions)

print(
    f"Mean Squared Error (MSE): {mse}",
    f"Root Mean Squared Error (RMSE): {rmse}",
    f"Mean Absolute Error (MAE): {mae}",
    f"R-squared (R²): {r2}",
    sep="\n",
)

In [44]:
# Recompute predictions so this cell does not depend on the metrics cell.
model.eval()

with torch.no_grad():
    predictions = model(X_test).cpu().detach().numpy()
    y_test_np = y_test.cpu().detach().numpy()  # y_test as a NumPy array

plt.figure(figsize=(12, 6))

# Model predictions against the true values
plt.scatter(y_test_np, predictions, alpha=0.5, color='blue', label='Predicciones')

# True values against themselves, plus the identity line a perfect model would follow
plt.scatter(y_test_np, y_test_np, alpha=0.5, color='red', label='Valores reales')
plt.plot([y_test_np.min(), y_test_np.max()], [y_test_np.min(), y_test_np.max()], 'r', lw=2)

# The series labels above are only rendered once a legend is drawn.
plt.xlabel('Actual')
plt.ylabel('Predicted')
plt.legend()
plt.title('Predicted vs Actual Values')
plt.show()

In [14]:
import seaborn as sns
import scipy.stats as stats


# NOTE(review): every other cell in this notebook uses 'TCH' as the target;
# confirm that the loaded frame really has a 'rendimiento' column.
rendimiento = data['rendimiento']

# Two transforms to compare against the raw values
log_transformed = np.log(rendimiento)
sqrt_transformed = np.sqrt(rendimiento)

# 2x2 grid of diagnostic panels
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))

# Top-left: histogram of the raw values
sns.histplot(rendimiento, bins=30, kde=True, ax=axs[0, 0])
axs[0, 0].set_title('Original')

# Top-right: Q-Q plot of the raw values against a normal distribution
stats.probplot(rendimiento, dist="norm", plot=axs[0, 1])
axs[0, 1].set_title('Original: Q-Q plot')

# Bottom row: histograms of the log- and sqrt-transformed values
for _panel, _values, _title in (
    (axs[1, 0], log_transformed, 'Log Transformation'),
    (axs[1, 1], sqrt_transformed, 'Sqrt Transformation'),
):
    sns.histplot(_values, bins=30, kde=True, ax=_panel)
    _panel.set_title(_title)

fig.suptitle('Normality Diagnosis Plot (rendimiento)')
fig.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()

In [24]:
# Save only the model's state_dict (not the module object) to '6monthNN.pth'.
# NOTE(review): loading presumably requires re-creating the same architecture
# with the same input_size before calling load_state_dict — confirm at the call site.
torch.save(model.state_dict(), '6monthNN.pth')
