# Import

In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import logging
import shap

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_squared_log_error
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK

# Dataframes

In [2]:
"""
We convert the data to float32 because PyTorch expects float32 values by default
- Compatibility with deep-learning (DL) code
- Memory efficiency
- Prevents dtype errors in training (backpropagation etc.)
"""
# Sub-directory (below the impute export folder) of the dataset variant to load
subfolder = "o6_GAN/o02"


def _read_features(csv_path):
    """Read a feature CSV and return it as a float32 DataFrame."""
    return pd.read_csv(csv_path).astype('float32')


def _read_target(csv_path):
    """Read a target CSV and return its values as a flat float32 NumPy array."""
    return pd.read_csv(csv_path).values.ravel().astype('float32')


# External-validation, train, validation and test splits (features X, target y)
X_external = _read_features(f"../03_External_Validation/CSV/exports/impute/{subfolder}/X_external.csv")
y_external = _read_target(f"../03_External_Validation/CSV/exports/impute/{subfolder}/y_external.csv")
X_train = _read_features(f"../03_External_Validation/CSV/exports/impute/{subfolder}/X_train.csv")
y_train = _read_target(f"../03_External_Validation/CSV/exports/impute/{subfolder}/y_train.csv")
X_validate = _read_features(f"../03_External_Validation/CSV/exports/impute/{subfolder}/X_validate.csv")
y_validate = _read_target(f"../03_External_Validation/CSV/exports/impute/{subfolder}/y_validate.csv")
X_test = _read_features(f"../03_External_Validation/CSV/exports/impute/{subfolder}/X_test.csv")
y_test = _read_target(f"../03_External_Validation/CSV/exports/impute/{subfolder}/y_test.csv")

In [3]:
"""
All computations in PyTorch are performed on tensors,
not on pandas DataFrames or NumPy arrays.

X_train.values extracts the NumPy array from the pandas DataFrame,
and torch.tensor converts it to a PyTorch tensor.
"""

# Convert data to PyTorch tensors.
# The feature sets are still DataFrames here, so `.values` exposes their NumPy data.
X_train, X_validate, X_test, X_external = (
    torch.tensor(frame.values)
    for frame in (X_train, X_validate, X_test, X_external)
)

# The targets are already NumPy arrays (flattened when the CSVs were read).
y_train, y_validate, y_test, y_external = (
    torch.tensor(target)
    for target in (y_train, y_validate, y_test, y_external)
)

# Feed Forward ANN without HP
## Three hidden layers

In [None]:
"""
Layers
- Input -> 128 -> 64 -> 32 -> 1
- Each hidden layer uses the ReLU activation function; the output layer is linear.
"""

# ANN architecture
class ANNModel(nn.Module):
    """Feed-forward regressor: input_dim -> 128 -> 64 -> 32 -> 1.

    ReLU follows each of the three hidden linear layers and dropout (p=0.2)
    follows the first two of them. The output layer is linear (no activation).

    Args:
        input_dim: Number of input features per sample (the number of
            feature columns in the data).
    """

    def __init__(self, input_dim):
        super().__init__()
        # Sub-modules are registered in data-flow order; the attribute names
        # double as the key prefixes of the state_dict.
        # Weights and biases are initialised by nn.Linear itself.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=128)
        # Dropout randomly zeroes activations (20% probability) while the
        # module is in training mode, to reduce overfitting.
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(in_features=128, out_features=64)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(in_features=64, out_features=32)
        # Output layer: 32 inputs -> 1 predicted value.
        self.output = nn.Linear(in_features=32, out_features=1)

    def forward(self, x):
        """Return the network output for the batch ``x``."""
        hidden = self.dropout1(nn.functional.relu(self.fc1(x)))
        hidden = self.dropout2(nn.functional.relu(self.fc2(hidden)))
        hidden = nn.functional.relu(self.fc3(hidden))  # no dropout after fc3
        return self.output(hidden)

# Feed Forward ANN without HP
## Four hidden layers

In [None]:
class ANNModel(nn.Module):
    """Wider/deeper feed-forward regressor: input_dim -> 320 -> 192 -> 128 -> 96 -> 1.

    Each of the four hidden linear layers is followed by ReLU and then
    dropout (p=0.2). The output layer is linear (no activation).

    Args:
        input_dim: Number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Registration order follows the data flow; attribute names are the
        # key prefixes of the state_dict.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=320)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(in_features=320, out_features=192)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(in_features=192, out_features=128)
        self.dropout3 = nn.Dropout(p=0.2)
        self.fc4 = nn.Linear(in_features=128, out_features=96)
        self.dropout4 = nn.Dropout(p=0.2)
        self.output = nn.Linear(in_features=96, out_features=1)

    def forward(self, x):
        """Return the network output for the batch ``x``."""
        hidden = self.dropout1(nn.functional.relu(self.fc1(x)))
        hidden = self.dropout2(nn.functional.relu(self.fc2(hidden)))
        hidden = self.dropout3(nn.functional.relu(self.fc3(hidden)))
        hidden = self.dropout4(nn.functional.relu(self.fc4(hidden)))
        return self.output(hidden)


# HyperOpt Feed Forward ANN
## Two hidden layers

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Define the ANN model class
class ANNModel(nn.Module):
    """Feed-forward regressor with two configurable hidden layers.

    Architecture: input_dim -> layer1 -> layer2 -> 1. Each hidden layer is
    followed by ReLU and dropout; the output layer is linear.

    Args:
        input_dim: Number of input features per sample.
        layer1: Width of the first hidden layer.
        layer2: Width of the second hidden layer.
        dropout_rate: Dropout probability used after both hidden layers.
    """

    def __init__(self, input_dim, layer1, layer2, dropout_rate):
        super().__init__()
        # Registration order follows the data flow; attribute names are the
        # key prefixes of the state_dict.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=layer1)
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.fc2 = nn.Linear(in_features=layer1, out_features=layer2)
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.fc3 = nn.Linear(in_features=layer2, out_features=1)

    def forward(self, x):
        """Return the network output for the batch ``x``."""
        hidden = self.dropout1(nn.functional.relu(self.fc1(x)))
        hidden = self.dropout2(nn.functional.relu(self.fc2(hidden)))
        # No activation on the output layer: this is a regression model.
        return self.fc3(hidden)

# Define the objective function
def objective(params):
    """Train a short-lived ANNModel for one HyperOpt trial and score it.

    The model is trained for 10 epochs on ``X_train``/``y_train`` in
    consecutive mini-batches of 32 rows (fixed order, no shuffling) and is
    then scored on ``X_validate``/``y_validate``.

    Args:
        params: Mapping with the keys ``'layer1'``, ``'layer2'``,
            ``'dropout_rate'`` and ``'learning_rate'``. The two layer widths
            are cast to ``int`` before use.

    Returns:
        dict: ``{'loss': <validation MSE>, 'status': STATUS_OK}``.
    """
    # Unpack every hyperparameter first
    hidden1 = int(params['layer1'])
    hidden2 = int(params['layer2'])
    drop_p = params['dropout_rate']
    lr = params['learning_rate']

    # Fresh model for this trial, placed on the selected device
    net = ANNModel(input_dim=X_train.shape[1], layer1=hidden1, layer2=hidden2, dropout_rate=drop_p)
    net.to(device)

    loss_fn = nn.MSELoss()
    opt = optim.Adam(net.parameters(), lr=lr)

    n_epochs = 10  # small on purpose: quick evaluation per trial
    chunk = 32
    n_rows = len(X_train)
    for _epoch in range(n_epochs):
        net.train()
        start = 0
        while start < n_rows:
            stop = start + chunk
            features = X_train[start:stop].to(device)
            # unsqueeze(1) gives the targets the same (batch, 1) shape as the output
            targets = y_train[start:stop].unsqueeze(1).to(device)

            opt.zero_grad()
            batch_loss = loss_fn(net(features), targets)
            batch_loss.backward()
            opt.step()
            start = stop

    # Validation (eval mode, no gradient tracking)
    net.eval()
    with torch.no_grad():
        val_pred = net(X_validate.to(device))
    val_mse = mean_squared_error(y_validate.cpu().numpy(), val_pred.cpu().numpy())

    # The validation MSE is the quantity HyperOpt minimises
    return {'loss': val_mse, 'status': STATUS_OK}

# Define the hyperparameter search space
# NOTE(review): hp.loguniform takes natural-log bounds, so (-4, -2) samples
# learning rates from exp(-4) ~ 0.018 up to exp(-2) ~ 0.135 -- confirm this is
# the intended range for a "small" learning rate.
param_space = dict(
    layer1=hp.quniform('layer1', 64, 256, 32),
    layer2=hp.quniform('layer2', 32, 128, 16),
    dropout_rate=hp.uniform('dropout_rate', 0.1, 0.5),
    learning_rate=hp.loguniform('learning_rate', -4, -2),
)

# Trials records the outcome of every evaluation
trials = Trials()

# Run the TPE search for 50 evaluations of `objective`
best = fmin(objective, param_space, algo=tpe.suggest, max_evals=50, trials=trials)

print("Best Hyperparameters:", best)

# Rebuild the model from the best hyperparameters and move it to the device
best_model = ANNModel(
    X_train.shape[1],
    int(best['layer1']),
    int(best['layer2']),
    best['dropout_rate'],
).to(device)

# Loss and optimizer for the best model.
# NOTE(review): only the loss and the optimizer are created here; no training
# loop for `best_model` follows in this cell.
criterion = nn.MSELoss()
optimizer = optim.Adam(best_model.parameters(), lr=best['learning_rate'])

 16%|█▌        | 8/50 [07:44<40:36, 58.02s/trial, best loss: 6.0211920738220215]

# Feed forward model

In [None]:
# Initialize the model, loss function, and optimizer

# The network input width equals the number of feature columns
input_dim = X_train.shape[1]
model = ANNModel(input_dim=input_dim)

# MSE measures how far the predictions are from the true values
criterion = nn.MSELoss()

# Adam updates the weights to minimise the loss (learning rate 0.001)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Early stopping parameters
patience = 5  # Number of epochs to wait for improvement
min_delta = 0.001  # Minimum change in validation loss to qualify as an improvement
best_val_loss = float('inf')  # Initialize to a very large value
patience_counter = 0  # Counter for epochs without improvement

# Configure logging once, before the loop (it only needs to happen one time).
logging.basicConfig(level=logging.INFO)

# Training the model with early stopping
train_losses = []  # per-epoch mean training loss (same scale as val_losses)
val_losses = []    # per-epoch validation loss
early_stop = False  # set to True when training is cut short by early stopping

epochs = 50
batch_size = 32
n_train = len(X_train)
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for i in range(0, n_train, batch_size):
        X_batch = X_train[i:i+batch_size]
        # unsqueeze(1) gives the targets the same (batch, 1) shape as the output
        y_batch = y_train[i:i+batch_size].unsqueeze(1)

        optimizer.zero_grad()
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        loss.backward()
        optimizer.step()
        # The loss is a per-batch mean (assuming `criterion` uses the default
        # reduction='mean'); weight it by the batch size so that the last,
        # possibly smaller, batch is not over-represented.
        running_loss += loss.item() * X_batch.size(0)

    # Mean loss per sample, so it is directly comparable with the validation
    # loss on the plot (a plain sum of batch losses grows with the batch count).
    epoch_loss = running_loss / max(n_train, 1)

    # Validation
    model.eval()
    with torch.no_grad():
        val_predictions = model(X_validate)
        val_loss = criterion(val_predictions, y_validate.view(-1, 1))
    val_loss_value = val_loss.item()

    # Append losses for plotting
    train_losses.append(epoch_loss)
    val_losses.append(val_loss_value)

    # Logging progress
    logging.info(
        "Epoch %d/%d, Training Loss: %.4f, Validation Loss: %.4f",
        epoch + 1, epochs, epoch_loss, val_loss_value,
    )

    # Check for improvement
    if best_val_loss - val_loss_value > min_delta:
        best_val_loss = val_loss_value
        patience_counter = 0  # Reset the counter
    else:
        patience_counter += 1  # Increment the counter
        if patience_counter >= patience:
            # Stop right away so the message is printed even when patience
            # runs out on the final epoch.
            early_stop = True
            print("Early stopping triggered!")
            break

# Plot training and validation losses, one point per completed epoch
fig, ax = plt.subplots(figsize=(10, 6))
for history, curve_label in ((train_losses, 'Training Loss'), (val_losses, 'Validation Loss')):
    ax.plot(range(1, len(history) + 1), history, label=curve_label)
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss Over Epochs with Early Stopping')
ax.legend()
ax.grid()
plt.show()

# Test & External Validation Evaluation

In [None]:
# Test and external validation
# Predict both held-out sets in eval mode, without gradient tracking
model.eval()
with torch.no_grad():
    y_test_pred, y_external_pred = (
        model(split).squeeze().numpy() for split in (X_test, X_external)
    )

# Test-set metrics (R2 is reported as a percentage)
test_mse, test_mae = mean_squared_error(y_test, y_test_pred), mean_absolute_error(y_test, y_test_pred)
test_rmse = np.sqrt(test_mse)
test_r2 = r2_score(y_test, y_test_pred)*100

# External-validation metrics (R2 is reported as a percentage)
external_mse, external_mae = (
    mean_squared_error(y_external, y_external_pred),
    mean_absolute_error(y_external, y_external_pred),
)
external_rmse = np.sqrt(external_mse)
external_r2 = r2_score(y_external, y_external_pred)*100

print(f"Test Set - MSE: {test_mse:.2f}, MAE: {test_mae:.2f}, RMSE: {test_rmse:.2f}, R2: {test_r2:.2f}")
print(f"External Validation - MSE: {external_mse:.2f}, MAE: {external_mae:.2f}, RMSE: {external_rmse:.2f}, R2: {external_r2:.2f}")

In [None]:
# Metrics calculation (test set); R2 is expressed as a percentage
mse = mean_squared_error(y_test, y_test_pred)
mae = mean_absolute_error(y_test, y_test_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_test_pred) * 100

print(f"Test Set MSE: {mse:.4f}")
print(f"Test Set MAE: {mae:.4f}")
print(f"Test Set RMSE: {rmse:.4f}")
print(f"Test Set R2: {r2:.4f}")

# Error metrics shown in the bar chart
error_metrics = ['MSE', 'MAE', 'RMSE']
values = [mse, mae, rmse]

# MSLE is optional: scikit-learn raises ValueError when it cannot be computed
try:
    msle = mean_squared_log_error(y_test, y_test_pred)
except ValueError:
    print("Mean Squared Logarithmic Error cannot be calculated because targets contain negative values.")
else:
    print(f"Test Set MSLE: {msle:.4f}")
    # Only chart MSLE when it could be computed
    error_metrics.append('MSLE')
    values.append(msle)

# Bar chart of the error metrics (with or without MSLE)
bar_colors = ['blue', 'green', 'red', 'orange'][:len(error_metrics)]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(error_metrics, values, color=bar_colors)
ax.set_xlabel('Error Metric')
ax.set_ylabel('Value')
ax.set_title('Comparison of Error Metrics')
plt.show()

# Pie chart of R-squared (R2) for the test set.
# When R2 is not >= 0 the whole pie is shown as unexplained variance.
if r2 >= 0:
    pie_sizes = [r2, 100 - r2]
    pie_labels = ['Explained Variance (R2)', 'Unexplained Variance']
    pie_colors = ['lightblue', 'lightgrey']
else:
    pie_sizes = [100]
    pie_labels = ['Unexplained Variance']
    pie_colors = ['lightgrey']

fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(pie_sizes, labels=pie_labels, colors=pie_colors, autopct='%1.1f%%')
ax.set_title('Test Set Explained Variance by R-squared (R2)')
plt.show()

In [None]:
# Metrics for external validation set; R2 is expressed as a percentage
mse_external = mean_squared_error(y_external, y_external_pred)
mae_external = mean_absolute_error(y_external, y_external_pred)
rmse_external = np.sqrt(mse_external)
r2_external = r2_score(y_external, y_external_pred) * 100

print(f"External Validation Set MSE: {mse_external:.4f}")
print(f"External Validation Set MAE: {mae_external:.4f}")
print(f"External Validation Set RMSE: {rmse_external:.4f}")
print(f"External Validation Set R2: {r2_external:.4f}")

# Error metrics shown in the bar chart
error_metrics = ['MSE', 'MAE', 'RMSE']
values = [mse_external, mae_external, rmse_external]

# MSLE is optional: scikit-learn raises ValueError when it cannot be computed
try:
    msle_external = mean_squared_log_error(y_external, y_external_pred)
except ValueError:
    print("Mean Squared Logarithmic Error cannot be calculated because targets contain negative values.")
else:
    print(f"External Validation Set MSLE: {msle_external:.4f}")
    # Only chart MSLE when it could be computed
    error_metrics.append('MSLE')
    values.append(msle_external)

# Plot error metrics (with or without MSLE)
plt.figure(figsize=(10, 6))
plt.bar(error_metrics, values, color=['blue', 'green', 'red', 'orange'][:len(error_metrics)])
plt.xlabel('Error Metric')
plt.ylabel('Value')
plt.title('Comparison of Error Metrics')
plt.show()

# Plotting R-squared (R2) for the external validation set.
# When R2 is not >= 0 the whole pie is shown as unexplained variance.
plt.figure(figsize=(6, 6))

if r2_external >= 0:
    plt.pie([r2_external, 100 - r2_external],
            labels=['Explained Variance (R2)', 'Unexplained Variance'],
            colors=['lightblue', 'lightgrey'], autopct='%1.1f%%')
else:
    plt.pie([100], labels=['Unexplained Variance'], colors=['lightgrey'], autopct='%1.1f%%')

# Name the external set explicitly: a plain "Validation Set" would be confused
# with the separate X_validate/y_validate split used during training.
plt.title('External Validation Set Explained Variance by R-squared (R2)')
plt.show()

In [None]:
# Test Set Plot: predicted vs. true values
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(y_test, y_test_pred, color='blue', label='Prediction')

# Reference diagonal (y = x) spanning the range of the true values
perfect_line = np.linspace(y_test.min(), y_test.max(), 100)
ax.plot(perfect_line, perfect_line, color='red', linestyle='--', label='Perfect Prediction')

# Labels, legend, and grid
ax.set_xlabel('True LOS')
ax.set_ylabel('Predicted LOS')
ax.legend()
ax.grid(True)
ax.set_title('Predicted vs. True LOS (Test Set)')

# Save the plot as a PNG image
#plt.savefig("plots/02_Prediction_Plot/02_true_vs_pred/57_true_vs_pred_test_plot.png", dpi=300, bbox_inches='tight')
plt.show()

# External Validation Set Plot: predicted vs. true values
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(y_external, y_external_pred, color='blue', label='Prediction')

# Reference diagonal (y = x) spanning the range of the true values
perfect_line_ext = np.linspace(y_external.min(), y_external.max(), 100)
ax.plot(perfect_line_ext, perfect_line_ext, color='red', linestyle='--', label='Perfect Prediction')

# Labels, legend, and grid
ax.set_xlabel('True LOS')
ax.set_ylabel('Predicted LOS')
ax.legend()
ax.grid(True)
ax.set_title('Predicted vs. True LOS (External Validation Set)')

# Save the plot as a PNG image
#plt.savefig("plots/02_Prediction_Plot/02_true_vs_pred/57_true_vs_pred_external_plot.png", dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Convert y_test to a 1D numpy array.
# The tensor check keeps this cell re-runnable: after the first run y_test is
# already a NumPy array, and calling .numpy() on it again would raise
# AttributeError.
if torch.is_tensor(y_test):
    y_test = y_test.detach().cpu().numpy()
y_test = np.asarray(y_test).flatten()


# Calculate residuals (true minus predicted)
residuals = y_test - y_test_pred

# Plot residuals against the true values.
# `mae` is the test-set MAE computed in the metrics cell; it is drawn as a
# symmetric band around zero.
plt.figure(figsize=(8, 6))
plt.scatter(y_test, residuals, color='blue', alpha=0.5, label="Residuals")
plt.axhline(y=0, color='red', linestyle='--', label="Zero Line")
plt.axhline(y=mae, color='green', linestyle='--', label=f"MAE = {mae:.2f}")
plt.axhline(y=-mae, color='green', linestyle='--')
plt.xlabel('True LOS')
plt.ylabel('Residuals (True - Predicted)')
plt.title('Residuals Plot with MAE Bounds')
plt.grid(True)

# Place the legend outside of the plot
plt.legend(loc="upper left", bbox_to_anchor=(1, 1))

# Save the plot as a PNG image
#plt.savefig("plots/02_Prediction_Plot/01_residuals/57_residuals_plot.png", dpi=300, bbox_inches='tight')
plt.show()

# Save model

In [None]:
# File name of the checkpoint inside the models/ directory
subfolder = "o01_feed_forward.pth"

# Persist only the learned parameters (the state_dict), not the whole module
torch.save(obj=model.state_dict(), f=f"models/{subfolder}")

# Load model

In [None]:
# File name of the checkpoint inside the models/ directory
subfolder = "o01_feed_forward.pth"

# Rebuild the architecture; the input width must match the one used when the
# checkpoint was saved
input_dim = X_test.shape[1]
model = ANNModel(input_dim)

# Read the saved parameters and copy them into the fresh model
saved_state = torch.load(f"models/{subfolder}")
model.load_state_dict(saved_state)
model.eval()  # evaluation mode

"""
After that I must run the block with the layers.
Be careful, the layers must be exactly the same.
"""

# SHAP

In [None]:
def model_predict(X):
    """Run the global ``model`` on ``X`` and return NumPy predictions.

    This is the prediction callback handed to SHAP.

    Args:
        X: Array-like of feature rows: a NumPy array, a nested list or a
            pandas DataFrame.

    Returns:
        numpy.ndarray: The model output for every row of ``X``.
    """
    # Normalise the input to a float32 array first: torch.tensor cannot ingest
    # a DataFrame directly, while np.asarray handles all array-likes.
    features = np.asarray(X, dtype=np.float32)
    model.eval()
    with torch.no_grad():
        tensor_X = torch.tensor(features, dtype=torch.float32)
        predictions = model(tensor_X).numpy()
    return predictions

In [None]:
# Derive the NumPy inputs for SHAP from the tensors, so that this cell does not
# depend on variables defined nowhere else in the notebook.
X_validate_np = X_validate.detach().cpu().numpy()
X_train_np = X_train.detach().cpu().numpy()

# Background data for KernelExplainer: a small, seeded random subset of the
# training rows (at most 100), because the explainer evaluates the model over
# the background set for every explained row.
# NOTE(review): the subset size and the use of training rows are choices made
# here -- adjust them if a different background sample was intended.
background_rows = np.random.default_rng(0).choice(
    len(X_train_np), size=min(100, len(X_train_np)), replace=False
)
X_sample_np = X_train_np[background_rows]

# Convert to Pandas DataFrames (generic names: the tensors carry no column names)
X_sample_df = pd.DataFrame(X_sample_np, columns=[f"Feature_{i}" for i in range(X_sample_np.shape[1])])
X_validate_df = pd.DataFrame(X_validate_np, columns=[f"Feature_{i}" for i in range(X_validate_np.shape[1])])

# Use KernelExplainer with Pandas DataFrame
explainer = shap.KernelExplainer(model_predict, X_sample_df)
shap_values = explainer.shap_values(X_validate_df)

# Summary plot
shap.summary_plot(shap_values, X_validate_df)