In [None]:
import numpy as np
import os
import csv

import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error

from scipy.stats import randint, uniform
from random import choice

# One seed for the whole notebook; it is also passed as random_state to the KFold splitters below.
seed = 42
np.random.seed(seed)  # global NumPy RNG (used by np.random.shuffle in permutation_importance)
torch.manual_seed(seed)  # global PyTorch RNG

# First run and selection of the most important features

In [None]:
# Load the labelled sample and keep only rows flagged as true electrons.
Data_train = pd.read_csv('AppML_InitialProject_train.csv')
Data_train = Data_train[Data_train['p_Truth_isElectron'] == 1]

# Decide the train/validation rows BEFORE fitting the scaler, so that validation
# rows do not contribute to the standardisation statistics. Splitting row
# positions with the same test_size/random_state selects the same rows as
# splitting X and y directly.
train_rows, val_rows = train_test_split(
    np.arange(len(Data_train)), test_size=0.25, random_state=42
)

scaler = StandardScaler()
scaler.fit(Data_train.iloc[train_rows])

# Mean/scale that standardise the target; kept so predictions can be mapped
# back to the original energy units later.
energy_col = Data_train.columns.get_loc('p_Truth_Energy')
y_mean = np.copy(scaler.mean_[energy_col])
y_std = np.copy(scaler.scale_[energy_col])

# Every column (features and target) is standardised with the training-row statistics.
Data_train = pd.DataFrame(scaler.transform(Data_train), columns=Data_train.columns)

X = Data_train.drop(['p_Truth_isElectron', 'p_Truth_Energy'], axis=1)
y = Data_train['p_Truth_Energy']

X_train, X_val = X.iloc[train_rows], X.iloc[val_rows]
y_train, y_val = y.iloc[train_rows], y.iloc[val_rows]

# Targets are reshaped to (n, 1) to match the network's single-output layer.
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32).view(-1, 1)

class SimpleNN(nn.Module):
    """Fully connected regressor: input -> hidden_dim1 -> hidden_dim2 -> 1.

    ReLU is applied after the first two linear layers; the final layer is
    linear and produces one value per input row.
    """

    def __init__(self, input_dim, hidden_dim1=128, hidden_dim2=64):
        super().__init__()
        # The widths are stored so that other code can rebuild a network of the same shape.
        self.hidden_dim1, self.hidden_dim2 = hidden_dim1, hidden_dim2
        self.fc1 = nn.Linear(input_dim, hidden_dim1)
        self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
        self.fc3 = nn.Linear(hidden_dim2, 1)

    def forward(self, x):
        """Run a batch of shape (n, input_dim) through the three layers."""
        first_hidden = torch.relu(self.fc1(x))
        second_hidden = torch.relu(self.fc2(first_hidden))
        return self.fc3(second_hidden)

def evaluate_model(model, X_val_tensor, y_val_tensor):
    """Return the mean absolute error of `model` on the given tensors.

    The model is switched to eval mode (and left there) and the forward pass
    runs without gradient tracking.
    """
    model.eval()
    with torch.no_grad():
        predicted = model(X_val_tensor).numpy()
    targets = y_val_tensor.numpy()
    return mean_absolute_error(targets, predicted)

def permutation_importance(model, X_val_tensor, y_val_tensor, baseline_mae, n_repeats=5):
    """Score each input column by how much shuffling it worsens the MAE.

    For every column, the column is shuffled in a copy of the validation
    features `n_repeats` times; the importance is the mean MAE over those
    repeats minus `baseline_mae`. Larger values mean the model's error grew
    more when that column was scrambled.

    Returns a 1-D NumPy array with one entry per column of `X_val_tensor`.
    Shuffling draws from the global NumPy RNG.
    """
    features = X_val_tensor.numpy()
    targets = y_val_tensor.numpy()
    n_features = X_val_tensor.shape[1]
    importances = np.zeros(n_features)

    for col in range(n_features):
        shuffled_maes = []
        for _ in range(n_repeats):
            # Work on a copy so the caller's tensor is never modified.
            shuffled = features.copy()
            np.random.shuffle(shuffled[:, col])

            with torch.no_grad():
                preds = model(torch.tensor(shuffled, dtype=torch.float32)).numpy()
            shuffled_maes.append(mean_absolute_error(targets, preds))

        importances[col] = np.mean(shuffled_maes) - baseline_mae

    return importances

# Baseline network on all input variables, trained full-batch with an L1 (MAE) loss.
model = SimpleNN(X_train.shape[1])
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

n_epochs = 20
# Logging interval must be smaller than n_epochs; the previous interval of 100
# meant the progress line was never printed during a 20-epoch run.
log_every = 5
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch+1) % log_every == 0:
        print(f'Epoch [{epoch+1}/{n_epochs}], Loss: {loss.item():.4f}')

baseline_mae = evaluate_model(model, X_val_tensor, y_val_tensor)
print(f'Baseline MAE: {baseline_mae:.4f}')

# Rank columns by permutation importance (largest MAE increase first) and keep
# the positions of the 25 highest-ranked ones.
importances = permutation_importance(model, X_val_tensor, y_val_tensor, baseline_mae)
sorted_indices = np.argsort(importances)[::-1]
top_25_features = sorted_indices[:25]

# Second run with the selected features and **hyperparameter optimization**

In [None]:
# Restrict both splits to the 25 selected columns (positions within X's columns).
X_train_top_25 = X_train.iloc[:, top_25_features]
X_val_top_25 = X_val.iloc[:, top_25_features]

# Search space for the random search. scipy's randint(low, high) excludes `high`.
param_distributions = {
    'hidden_dim1': randint(16, 256),
    # Lower bound is 1, not 0: a width of 0 would create an empty layer and the
    # network could only ever output a constant.
    'hidden_dim2': randint(1, 256),
    # scipy's uniform(loc, scale) samples from [loc, loc + scale], i.e. [0.01, 0.11].
    # NOTE(review): if [0.01, 0.1] was intended, the scale should be 0.09 - confirm.
    'learning_rate': uniform(0.01, 0.1),
    # NOTE(review): batch_size is sampled and reported, but the training loops
    # in this notebook run full-batch and never read it.
    'batch_size': randint(16, 128)
}

def create_model(input_dim, hidden_dim1, hidden_dim2):
    """Build a new SimpleNN with the given input size and hidden-layer widths."""
    return SimpleNN(input_dim, hidden_dim1, hidden_dim2)

def cross_val_score(model, X_train, y_train, criterion, optimizer, n_splits=5):
    """Mean validation MAE of a SimpleNN configuration over shuffled K-fold splits.

    A new network is built and trained for 20 full-batch epochs on every fold;
    the `model` argument itself is never trained.

    Parameters
    ----------
    model : object exposing `hidden_dim1`, `hidden_dim2` and `learning_rate`
        Template describing the configuration to evaluate. `learning_rate` is
        not set by SimpleNN and must be attached by the caller.
    X_train, y_train : torch.Tensor or NumPy array
        Features of shape (n, d) and targets of shape (n,) or (n, 1).
    criterion : callable
        Loss used for training, called as criterion(outputs, targets).
    optimizer : any
        Unused; every fold creates its own Adam optimizer. Kept so existing
        calls keep working.
    n_splits : int
        Number of folds.

    Returns
    -------
    The mean of the per-fold MAE values.
    """
    # as_tensor reuses the input when it is already a float32 tensor, which
    # avoids the copy-construct warning torch.tensor() emits for tensor input.
    features = torch.as_tensor(X_train, dtype=torch.float32)
    targets = torch.as_tensor(y_train, dtype=torch.float32).reshape(-1, 1)

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    val_scores = []

    # Only the number of rows matters to KFold, so split plain row positions.
    for train_index, val_index in kf.split(np.arange(features.shape[0])):
        train_rows = torch.as_tensor(train_index, dtype=torch.long)
        val_rows = torch.as_tensor(val_index, dtype=torch.long)
        X_train_fold, y_train_fold = features[train_rows], targets[train_rows]
        X_val_fold, y_val_fold = features[val_rows], targets[val_rows]

        # Fresh weights and optimizer state for every fold.
        model_fold = create_model(features.shape[1], model.hidden_dim1, model.hidden_dim2)
        optimizer_fold = optim.Adam(model_fold.parameters(), lr=model.learning_rate)

        n_epochs = 20
        for _ in range(n_epochs):
            model_fold.train()
            optimizer_fold.zero_grad()
            loss = criterion(model_fold(X_train_fold), y_train_fold)
            loss.backward()
            optimizer_fold.step()

        val_scores.append(evaluate_model(model_fold, X_val_fold, y_val_fold))

    return np.mean(val_scores)

n_iter = 50
best_score = float('inf')
best_params = None

# The loss and the training tensors are the same for every candidate, so they
# are built once here instead of on each of the n_iter iterations.
criterion = nn.L1Loss()
X_train_tensor = torch.tensor(X_train_top_25.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)

for _ in range(n_iter):
    # Draw one value from every distribution in the search space.
    params = {key: dist.rvs() for key, dist in param_distributions.items()}

    # cross_val_score reads the layer widths and the learning rate off `model`.
    model = create_model(X_train_top_25.shape[1], params['hidden_dim1'], params['hidden_dim2'])
    model.learning_rate = params['learning_rate']
    optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])

    val_score = cross_val_score(model, X_train_tensor, y_train_tensor, criterion, optimizer)

    # Lower MAE is better.
    if val_score < best_score:
        best_score = val_score
        best_params = params

print(f'Best score: {best_score:.4f}')
print(f'Best hyperparameters: {best_params}')

Best score: 0.2069
Best hyperparameters: {'hidden_dim1': 179, 'hidden_dim2': 6, 'learning_rate': 0.01595187774544761, 'batch_size': 42}


# Third run with the best hyperparameters and **cross validation**

In [None]:
# Rebuild the network with the configuration that won the random search.
best_lr = best_params['learning_rate']
best_model = create_model(
    input_dim=X_train_top_25.shape[1],
    hidden_dim1=best_params['hidden_dim1'],
    hidden_dim2=best_params['hidden_dim2'],
)
best_model.learning_rate = best_lr
criterion = nn.L1Loss()
optimizer = optim.Adam(best_model.parameters(), lr=best_lr)

def train_and_evaluate(model, X_train, y_train, criterion, optimizer, n_splits=5):
    """Cross-validated MAE for the configuration described by `model`.

    This performs exactly the K-fold procedure of `cross_val_score` (defined
    in the hyperparameter-search cell), so it delegates to it rather than
    keeping a second copy of the same loop.

    Despite the name, `model` itself is not trained: a new network with the
    same `hidden_dim1`, `hidden_dim2` and `learning_rate` is fitted per fold.
    `optimizer` is accepted for signature compatibility.

    Returns the mean of the per-fold validation MAE values.
    """
    return cross_val_score(model, X_train, y_train, criterion, optimizer, n_splits=n_splits)

X_train_tensor = torch.tensor(X_train_top_25.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)

final_val_score = train_and_evaluate(best_model, X_train_tensor, y_train_tensor, criterion, optimizer)

print(f'Final cross-validated MAE: {final_val_score:.4f}')

# train_and_evaluate only fits throw-away per-fold networks, so at this point
# best_model still carries its random initial weights. Fit it here, with the
# same full-batch recipe and epoch count used inside the folds, so that later
# predictions come from a trained network. `optimizer` is already bound to
# best_model's parameters.
final_n_epochs = 20
for _ in range(final_n_epochs):
    best_model.train()
    optimizer.zero_grad()
    final_loss = criterion(best_model(X_train_tensor), y_train_tensor)
    final_loss.backward()
    optimizer.step()

Final cross-validated MAE: 0.2106


# Testing on the test set

In [None]:
Data_test = pd.read_csv('AppML_InitialProject_test_regression.csv')

# Standardise the test features with the statistics learned on the training
# data. Refitting the scaler on the test file would put the inputs on a
# different scale from the one the network was trained on.
# scaler was fit on the columns of Data_train, in that order.
train_stats = pd.DataFrame(
    {'mean': scaler.mean_, 'scale': scaler.scale_}, index=Data_train.columns
)

# Select the features by name rather than by position, so the result does not
# depend on the test file having its columns in the same order as X.
top_25_names = X.columns[top_25_features]
feature_means = train_stats.loc[top_25_names, 'mean'].to_numpy()
feature_scales = train_stats.loc[top_25_names, 'scale'].to_numpy()
X_test_25 = (Data_test[top_25_names] - feature_means) / feature_scales

X_test_tensor = torch.tensor(X_test_25.values, dtype=torch.float32)

# NOTE(review): confirm best_model has actually been fitted before this point.
best_model.eval()
with torch.no_grad():
    predictions = best_model(X_test_tensor).numpy()

# Undo the target standardisation to get energies in the original units.
Rescaled_y_pred_test = y_std * predictions + y_mean
print('Rescaled Predicted Energies:', Rescaled_y_pred_test)
# Flatten the (n, 1) array into a flat list with one value per test row.
Rescaled_y_pred_test = list(Rescaled_y_pred_test.ravel())

Rescaled Predicted Energies: [[48198.11 ]
 [45722.133]
 [45759.805]
 ...
 [45152.555]
 [45629.92 ]
 [47362.465]]


# Saving the variable list and predictions (disabled by default; set `Write = True` to export)

In [None]:
folder_name = 'solutions'
# exist_ok=True replaces the separate exists() check with a single call.
os.makedirs(folder_name, exist_ok=True)

# Set to True to (over)write the two CSV files below.
Write = False
if Write:

    # Names of the selected input variables, most important first.
    top_25_variable_names = X.columns[top_25_features]
    variables = top_25_variable_names
    csv_file_path = os.path.join(folder_name, 'Regression_Pytorch_VariableList.csv')

    # One variable name per row.
    with open(csv_file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerows([variable] for variable in variables)

    data = Rescaled_y_pred_test

    csv_file_path = os.path.join(folder_name, 'Regression_Pytorch.csv')

    # One "index,prediction" row per test sample, with the index starting at 0.
    with open(csv_file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(enumerate(data))