Load the data

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import sys
# Add the project root to the module search path, presumably so that local
# modules (e.g. `prep`, imported further down) can be found -- TODO confirm.
# Exactly one of the two machine-specific variants should be active.
# path for desktop PC
# sys.path.append(r'C:\Users\test\Masterarbeit')
# path for surface PC
sys.path.append(r'C:\Users\Surface\Masterarbeit')

# IPython magic: render matplotlib figures inside the notebook
%matplotlib inline
# Location of the Excel workbook that is read below (machine-specific as well)
# path for desktop PC
#path = r"C:\Users\test\Masterarbeit\data\WZ_2_Feature_Engineered_Fynn6.xlsx"
# path for surface PC
path = r"C:\Users\Surface\Masterarbeit\data\Produktionsdaten\WZ_2_Feature_Engineered_Fynn6.xlsx"

# Load the workbook into a DataFrame; `df` is the raw data used by the following cells
df = pd.read_excel(path)


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from IPython.display import clear_output
import numpy as np
from sklearn.metrics import r2_score

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import prep
import importlib
importlib.reload(prep)

# Name of the column the network is trained to predict
target = 'C1_V01_delta_kan'

# Feature frame: every column except the target (df itself is not modified)
data_num = df.drop(columns=[target])
# Target values as a NumPy vector
data_labels = df[target].to_numpy()

# Two-stage split into 60% training, 20% validation and 20% test:
# first hold out 20% for testing, then take 25% of the remaining 80% for validation.
X_temp, X_test_prep, y_temp, y_test = train_test_split(
    data_num, data_labels, test_size=0.2, random_state=42
)
X_train_prep, X_val_prep, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42
)

# Custom helper from prep.py; per the original comment it maps the listed
# categorical columns to their frequencies.
X_train_prep, X_val_prep, X_test_prep = prep.cat_transform(
    X_train_prep, X_val_prep, X_test_prep, ['BT_NR', 'STP_NR']
)
print(X_train_prep.columns)

# Preprocessing pipeline: standardise every feature to zero mean and unit variance
data_pipeline = Pipeline(steps=[('std_scaler', StandardScaler())])

# Keep the column names, because the scaler returns plain arrays
# (needed later for feature importance)
feature_names = X_train_prep.columns

# Fit the scaler on the training split only, then apply it to all three splits
X_train = data_pipeline.fit_transform(X_train_prep)
X_val = data_pipeline.transform(X_val_prep)
X_test = data_pipeline.transform(X_test_prep)


def _as_float_tensor(array):
    """Convert a NumPy array into a float32 torch tensor."""
    return torch.from_numpy(array).float()


X_train_tensor = _as_float_tensor(X_train)
X_val_tensor = _as_float_tensor(X_val)
X_test_tensor = _as_float_tensor(X_test)
# Targets get a trailing dimension so that they match the (n, 1) model output
y_train_tensor = _as_float_tensor(y_train).unsqueeze(1)
y_val_tensor = _as_float_tensor(y_val).unsqueeze(1)
y_test_tensor = _as_float_tensor(y_test).unsqueeze(1)

# Report the shapes of the full feature frame and of the three splits
print(data_num.shape, X_train_tensor.shape, X_val_tensor.shape, X_test_tensor.shape)
# print(pd.DataFrame(X_train, columns=feature_names).describe())
# print(pd.DataFrame(X_train, columns=feature_names).describe())

In [None]:
# Fully connected feed-forward network with a configurable stack of hidden layers
class Cusom_NN_Model(torch.nn.Module):
    """
    Multilayer perceptron whose depth and width are given by a list.

    Every hidden width contributes the triple Linear -> ReLU -> Dropout; a final
    Linear layer maps the last hidden width (or the input width, if there are no
    hidden layers) to the output width.
    """

    def __init__(self, input_dim, hidden_dims, output_dim, do_rate):
        """
        Build the layer stack.
        @param input_dim:   number of input features
        @param hidden_dims: list of integers, one per hidden layer, giving its number of neurons e.g. [64, 128, 64, 32]
        @param output_dim:  number of output features (usually 1 for regression tasks)
        @param do_rate:     dropout probability applied after every hidden activation
        """
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        self.do_rate = do_rate

        # Consecutive pairs of this list are the (in, out) sizes of the hidden Linear layers
        widths = [input_dim, *hidden_dims]
        stack = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack += [
                torch.nn.Linear(fan_in, fan_out),
                torch.nn.ReLU(),
                torch.nn.Dropout(do_rate),
            ]

        # Output layer: no activation, the raw value is the prediction
        stack.append(torch.nn.Linear(widths[-1], output_dim))

        # ModuleList registers all sub-modules so their parameters are trainable
        self.layers = torch.nn.ModuleList(stack)

    def forward(self, x):
        """Pass x through every layer in order and return the final output."""
        out = x
        for module in self.layers:
            out = module(out)
        return out
    

# Training routine: Adam optimizer, MSE loss, mini-batches via DataLoader, early stopping on the validation loss
def train_model(model, X_train_tensor, y_train_tensor, X_val_tensor, y_val_tensor, batch_size=128, n_epochs=1000, lr=0.01, weight_decay=0.0001, patience=20):
    """
        Train a neural network with early stopping and return it with its best weights.

        The weights that achieved the lowest validation loss are restored before
        returning, both when early stopping triggers and when all epochs are used up.
        The model and all tensors are expected to live on the same device.

        @param model            The neural network model to be trained (modified in place).
        @param X_train_tensor   The matrix of features for the training data.
        @param y_train_tensor   The vector of target values for the training data.
        @param X_val_tensor     The matrix of features for the validation data.
        @param y_val_tensor     The vector of target values for the validation data.
        @param batch_size       The size of the batches for training.
        @param n_epochs         The maximum number of epochs for training.
        @param lr               The learning rate for the optimizer.
        @param weight_decay     The weight decay for the optimizer.
        @param patience         The number of epochs with no improvement after which training will be stopped.

        @return model          The trained neural network model (same object as the input).
    """
    import copy  # local import so the function does not depend on another cell

    # DataLoader for batching the data
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Define Mean Squared Error loss function
    loss_fn = torch.nn.MSELoss(reduction='mean')

    # Adam optimizer with weight decay for regularization
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Early stopping bookkeeping
    best_val_loss = np.inf
    epochs_no_improve = 0
    # state_dict() returns references to the live parameters, so a snapshot must be
    # deep-copied; starting from the initial weights guarantees there is always
    # something to restore, even if the validation loss never improves.
    best_model_state = copy.deepcopy(model.state_dict())
    loss_history = []
    val_loss_history = []

    for epoch in range(n_epochs):
        model.train()                           # Set model to training mode (dropout active)
        batch_losses = []
        for X_batch, y_batch in train_loader:   # loop over all batches in the DataLoader
            optimizer.zero_grad()               # Reset gradients
            y_pred = model(X_batch)             # Forward pass
            loss = loss_fn(y_pred, y_batch)     # Compute MSE loss
            loss.backward()                     # Backpropagation
            optimizer.step()                    # Update weights
            batch_losses.append(loss.item())
        # Epoch-level training loss is the mean over all batches, not just the last batch
        train_loss = float(np.mean(batch_losses))
        loss_history.append(train_loss)

        # calculate validation loss
        model.eval()                            # Set model to evaluation mode (dropout off)
        with torch.no_grad():
            y_val_pred = model(X_val_tensor)
            val_loss = loss_fn(y_val_pred, y_val_tensor).item()  # plain float, detached from the device
        val_loss_history.append(val_loss)

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
            best_model_state = copy.deepcopy(model.state_dict())
            print(f"Epoch {epoch+1}/{n_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Best Val Loss: {best_val_loss:.4f}")
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}, Best Val Loss: {best_val_loss:.4f}")
                break

    # Always hand back the best weights, not the weights of the last epoch
    model.load_state_dict(best_model_state)

    # Optional diagnostic plot of the training loss (uncomment to use)
    # plt.figure(figsize=(8, 4))
    # plt.plot(loss_history, label='Train Loss', color='tab:blue')
    # plt.xlabel('Epoch')
    # plt.ylabel('MSE Loss')
    # plt.title('Training Loss over Epochs')
    # plt.grid(True)
    # plt.legend()
    # plt.tight_layout()
    # plt.show()

    return model


In [None]:
import optuna

def objective(trial):
    """
    Optuna objective: train one network with sampled hyperparameters.

    Reads the module-level tensors X_train_tensor, y_train_tensor, X_val_tensor
    and y_val_tensor, trains a Cusom_NN_Model with train_model and reports the
    validation MSE of the returned model.

    @param trial    Optuna trial used to sample the hyperparameters.
    @return         Validation MSE (float) of the trained model; lower is better.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Hyperparameters sampled by Optuna
    # NOTE(review): the Optuna docs list None/bool/int/float/str as supported
    # categorical choices; list-valued choices presumably trigger a warning and
    # may not work with persistent storage -- confirm before switching storage.
    hidden_dims = trial.suggest_categorical("hidden_dims", [[128, 64], [256, 128, 64], [512, 256, 128, 64]])
    do_rate = trial.suggest_float("do_rate", 0.05, 0.4)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256])

    # Initialise the model on the selected device
    model = Cusom_NN_Model(
        input_dim=X_train_tensor.shape[1],
        hidden_dims=hidden_dims,
        output_dim=1,
        do_rate=do_rate
    ).to(device)

    # The model lives on `device`, so the data must too. `.to` returns the same
    # tensor when it is already there, so this is free if the caller has already
    # moved the globals and prevents a device mismatch if it has not.
    X_tr_dev = X_train_tensor.to(device)
    y_tr_dev = y_train_tensor.to(device)
    X_va_dev = X_val_tensor.to(device)
    y_va_dev = y_val_tensor.to(device)

    # Train with the shared train_model routine (shorter budget than the final run)
    trained_model = train_model(
        model,
        X_tr_dev, y_tr_dev,
        X_va_dev, y_va_dev,
        batch_size=batch_size,
        n_epochs=500,
        lr=lr,
        weight_decay=weight_decay,
        patience=15
    )

    # Validation loss of the model that train_model returned
    loss_fn = torch.nn.MSELoss(reduction='mean')
    trained_model.eval()
    with torch.no_grad():
        y_val_pred = trained_model(X_va_dev)
        val_loss = loss_fn(y_val_pred, y_va_dev).item()

    # Store the value on the trial as well, for later inspection
    trial.set_user_attr("val_loss", val_loss)

    return val_loss


In [None]:
# Pick the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Training and validation tensors must be on the same device as the models
# that the objective creates
X_train_tensor, y_train_tensor = X_train_tensor.to(device), y_train_tensor.to(device)
X_val_tensor, y_val_tensor = X_val_tensor.to(device), y_val_tensor.to(device)

# Minimise the objective's return value with a TPE sampler.
# NOTE(review): the objective visible in this notebook never calls
# trial.report()/trial.should_prune(), so the pruner presumably has no effect -- confirm.
tpe_sampler = optuna.samplers.TPESampler()
median_pruner = optuna.pruners.MedianPruner(n_warmup_steps=5)
study = optuna.create_study(
    direction="minimize",
    sampler=tpe_sampler,
    pruner=median_pruner,
)

# Run 50 trials without a wall-clock limit
study.optimize(objective, n_trials=50, timeout=None)

print("Number of finished trials: ", len(study.trials))
print("Best trial:")
# `trial` is kept as a module-level name; the next cell reads trial.params
trial = study.best_trial

print(f"  Validation Loss: {trial.value:.4f}")
print("  Params:")
for param_name, param_value in trial.params.items():
    print(f"    {param_name}: {param_value}")


In [None]:
# Hyperparameters of the best Optuna trial
best_params = trial.params

# Fresh network with the best architecture, placed on the selected device
best_model = Cusom_NN_Model(
    X_train_tensor.shape[1],        # input_dim
    best_params['hidden_dims'],     # hidden_dims
    1,                              # output_dim
    best_params['do_rate'],         # do_rate
)
best_model = best_model.to(device)

# Optimiser settings for the final run
final_fit_kwargs = dict(
    batch_size=best_params['batch_size'],
    n_epochs=500,        # increase if needed
    lr=best_params['lr'],
    weight_decay=best_params['weight_decay'],
    patience=20,         # slightly higher for the final training run
)

trained_model = train_model(
    best_model,
    X_train_tensor, y_train_tensor,
    X_val_tensor, y_val_tensor,
    **final_fit_kwargs,
)


In [None]:
import copy  # needed to snapshot the best weights (see early stopping below)

# Select device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move training and validation data to the device
X_train_tensor = X_train_tensor.to(device)
X_val_tensor = X_val_tensor.to(device)
y_train_tensor = y_train_tensor.to(device)
y_val_tensor = y_val_tensor.to(device)


# DataLoader for batching the data
batch_size = 64  # Define batch size
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


# Dropout probability used by every Dropout layer of the model (27%)
dp = 0.27

# Earlier hand-written architecture, kept for reference:
# model = torch.nn.Sequential(
#     torch.nn.Linear(37, 64),     # Input layer -> 64 neurons
#     torch.nn.ReLU(),
#     torch.nn.Dropout(p = dp),   # Dropout layer with probability dp
#     torch.nn.Linear(64, 128),   # Hidden layer -> 128 neurons
#     torch.nn.ReLU(),
#     torch.nn.Dropout(p = dp),   # Dropout layer with probability dp
#     torch.nn.Linear(128, 64),   # Hidden layer -> 64 neurons
#     torch.nn.ReLU(),
#     torch.nn.Dropout(p = dp),   # Dropout layer with probability dp
#     torch.nn.Linear(64, 1)      # Output layer -> 1 value (regression)
# ).to(device)  # Move model to device (GPU or CPU)

model = Cusom_NN_Model(input_dim=X_train.shape[1], hidden_dims=[128, 64], output_dim=1, do_rate=dp).to(device)  # Create model instance and move to device

# Define Mean Squared Error loss function
loss_fn = torch.nn.MSELoss(reduction='mean')
# Set learning rate and optimizer
lr = 0.002
weight_decay = 0.0001  # Weight decay for regularization

# Adam optimizer with weight decay for regularization
optimizer = torch.optim.Adam(model.parameters(), lr, weight_decay=weight_decay)

# Early stopping parameters
patience = 20
best_val_loss = np.inf
epochs_no_improve = 0
n_epochs = 1000
loss_history = []
val_loss_history = []
# state_dict() returns references to the live parameters, so the snapshot must be
# deep-copied. Starting from the initial weights means there is always a state to restore.
best_model_state = copy.deepcopy(model.state_dict())

for epoch in range(n_epochs):
    model.train()                           # Set model to training mode
    batch_losses = []
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()               # Reset gradients
        y_pred = model(X_batch)             # Forward pass
        loss = loss_fn(y_pred, y_batch)     # Compute MSE loss
        loss.backward()                     # Backpropagation
        optimizer.step()                    # Update weights
        batch_losses.append(loss.item())
    # Epoch-level training loss is the mean over all batches, not just the last batch
    train_loss = float(np.mean(batch_losses))
    loss_history.append(train_loss)

    # calculate validation loss
    model.eval()                            # Set model to evaluation mode
    with torch.no_grad():
        y_val_pred = model(X_val_tensor)    # Forward pass on validation set
        val_loss = loss_fn(y_val_pred, y_val_tensor).item()  # plain float
    val_loss_history.append(val_loss)

    # Early stopping check
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        best_model_state = copy.deepcopy(model.state_dict())
        print(f"Epoch {epoch+1}/{n_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Best Val Loss: {best_val_loss:.4f}")

    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

# Restore the best weights whether training stopped early or ran out of epochs
model.load_state_dict(best_model_state)

# Plot the training loss curve once training has finished
plt.figure(figsize=(8, 4))
plt.plot(loss_history, label='Train Loss', color='tab:blue')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.title('Training Loss over Epochs')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
#print(loss_history)

In [None]:
# Second model with the same architecture and hyperparameters as the manual
# training loop, this time trained through the train_model helper
model1 = Cusom_NN_Model(
    input_dim=X_train.shape[1],
    hidden_dims=[128, 64],
    output_dim=1,
    do_rate=0.27,
)
model1 = model1.to(device)  # move the freshly created model to the selected device

tr_model = train_model(
    model1,
    X_train_tensor, y_train_tensor,
    X_val_tensor, y_val_tensor,
    batch_size=64,
    n_epochs=1000,
    lr=0.002,
    weight_decay=0.0001,
    patience=20,
)

In [None]:
# MC Dropout inference: repeat the forward pass many times with dropout active
# and treat the spread of the outputs as an uncertainty estimate.
# The model stays in training mode on purpose, otherwise dropout would be disabled.
model.train()
model.to('cpu')  # run inference on the CPU
X_tr = X_train_tensor.to('cpu')  # inputs must be on the same device as the model
X_te = X_test_tensor.to('cpu')
# Number of stochastic forward passes for MC Dropout
n_samples = 250

# No gradients are needed for inference. Without no_grad every one of the
# 2 * n_samples passes would build an autograd graph that is thrown away.
# Dropout is controlled by train()/eval() only, so it remains active here.
with torch.no_grad():
    # Stochastic predictions on the train data, stacked along a new leading axis
    y_train_pred = torch.stack([model(X_tr) for _ in range(n_samples)]).cpu().numpy()

    # Stochastic predictions on the test data, stacked along a new leading axis
    y_test_pred = torch.stack([model(X_te) for _ in range(n_samples)]).cpu().numpy()

In [None]:
# Precision parameter tau used for the uncertainty estimate.
# It depends on the dropout probability `dp` and the `weight_decay` that the
# evaluated model was trained with -- presumably the MC Dropout model-precision
# formula; confirm against the reference being followed.
l = 1                      # length scale for uncertainty estimation
N = len(X_train_tensor)    # number of training samples

keep_prob = 1 - dp         # probability that a unit is kept by dropout
tau = keep_prob * l**2 / (2 * weight_decay * N)
print(f"tau: {tau:.4f}")

In [None]:
#! handling the train data
# Mean and spread over the stochastic forward passes (axis 0 indexes the MC samples)
y_train_pred_mean = y_train_pred.mean(axis = 0)
y_train_Pred_std = y_train_pred.std(axis = 0)

# Calculate R² score on the train data
r2_train = r2_score(y_train, y_train_pred_mean)
print(f"R² on Train Data: {r2_train:.3f}")

#! handling the test data
# True test targets on the CPU, shaped (n_test, 1) like a single prediction pass
y_te = y_test_tensor.to('cpu')

# Select a random subset of test data for visualization
# (capped at the test-set size so sampling without replacement cannot fail)
num_points = min(100, len(X_te))
np.random.seed(42)

random_indices = np.random.choice(len(X_te), num_points, replace=False)
random_indices.sort()

# Calculate the mean and standard deviation of the predictions on the test data
y_test_pred_mean = y_test_pred.mean(axis = 0)
y_test_pred_std = y_test_pred.std(axis = 0)
# Adjust standard deviation for uncertainty estimation
#y_test_pred_std = y_test_pred_std + (1/tau)

# Calculate R² score for the test data
r2_test = r2_score(y_te.numpy(), y_test_pred_mean)
print(f"R² on Test Data: {r2_test:.3f}")

# Predictive log-likelihood per test point. The log-sum-exp has to run over the
# individual MC samples (axis 0 of y_test_pred), not over the MC mean; that is
# what the "- log(n_samples)" term normalises.
mc_samples = torch.from_numpy(y_test_pred)               # (n_samples, n_test, 1)
squared_error = (y_te.unsqueeze(0) - mc_samples) ** 2    # broadcast targets over the samples
ll = (torch.logsumexp(-0.5 * tau * squared_error, dim=0)
      - float(np.log(n_samples))
      - 0.5 * float(np.log(2 * np.pi))
      + 0.5 * float(np.log(tau)))
test_ll = ll.mean().item()  # mean log-likelihood over the test set
nll = -test_ll              # negative log-likelihood (lower is better)
print(f"Negative Log Likelihood (NLL) on Test Data: {nll:.3f}")


# Assign descriptive variable names for MC Dropout mean and standard deviation
mc_mean = y_test_pred_mean.flatten()  # Predicted mean for each test point
mc_std = y_test_pred_std.flatten()    # Predicted standard deviation for uncertainty

# True values of the plotted subset
y_true_subset = y_test[random_indices]

# Define the confidence interval bounds (95% CI ≈ mean ± 2*std)
mc_lower_bound = mc_mean[random_indices] - 2 * mc_std[random_indices]
mc_upper_bound = mc_mean[random_indices] + 2 * mc_std[random_indices]

# Check whether each true value lies within the 95% confidence interval
# If yes, the point will be green; if not, red
in_interval = (y_true_subset >= mc_lower_bound) & (y_true_subset <= mc_upper_bound)
colors = ['tab:green' if inside else 'tab:red' for inside in in_interval]

# Coverage: share of true values inside the interval.
# Note that it is computed on the plotted random subset only, not on the whole test set.
coverage = np.mean(in_interval) * 100  # in percentage

# Print the coverage value
print(f"Coverage: {coverage:.2f}%")

# Create the plot
plt.figure(figsize=(12, 5))
x_axis = np.arange(num_points)  # Create an index axis for plotting

# Plot the predicted mean
plt.plot(x_axis, mc_mean[random_indices], label="Prediction (mean)", color='tab:blue')

# Plot the confidence interval as a shaded region
plt.fill_between(x_axis, mc_lower_bound, mc_upper_bound, alpha=0.4,
                 color='tab:blue', label='95% Confidence Interval')

# Scatter plot of true values with color-coded points based on interval inclusion
plt.scatter(x_axis, y_true_subset, label="True Values", c=colors, s=25, zorder=3)

# Final plot settings
plt.title("MC Dropout Prediction with Uncertainty")
plt.xlabel("Test Point Index")
plt.ylabel("x_Einzug [mm]")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()