# Multi-Layer Perceptron Classification Model

This notebook demonstrates how to load data, preprocess it, define an MLP model, train the model, and evaluate its performance. The data is assumed to be in CSV format and stored in a directory.

## Setup

First, we need to install the necessary libraries. Run the following cell to install them.

In [None]:
%pip install torch torchvision torchaudio
%pip install pandas scikit-learn
%pip install wandb onnx -Uq
%pip install joblib



## Import Libraries and seed
Import the necessary libraries for data processing, model building, training, and evaluation. Adding a seed ensures reproducibility by making sure that the random number generation is consistent across different runs.

In [None]:
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import joblib
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from torch.utils.data import DataLoader, TensorDataset

import wandb

def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)


Using device: cpu
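
As a quick check of what seeding provides, the sketch below resets the generators twice with the same seed and compares the draws. It restates a CPU-only version of `set_seed` so the block is self-contained; the helper `draw` and the variable names are illustrative assumptions, not part of the notebook.

```python
import random

import numpy as np
import torch

def set_seed(seed):
    """CPU-only restatement of the seeding helper defined above."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

def draw():
    """Draw one value from each of the three random number generators."""
    return random.random(), np.random.rand(), torch.rand(1).item()

set_seed(40)
first = draw()
set_seed(40)
second = draw()   # identical to `first` because the generators were reset

third = draw()    # no reseed, so the generators have advanced
print(first == second, first == third)
```

Seeding only takes effect when `set_seed` is actually called. In this notebook the call happens inside `train_model`, so anything random that runs earlier is seeded only if it receives its own seed.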


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
wandb.login()  # API key removed; wandb prompts for it or reads the WANDB_API_KEY environment variable

wandb: Currently logged in as: danimp94 (danimp94-university-carlos-iii-of-madrid). Use `wandb login --relogin` to force relogin


True

## Load Data from GitHub Repository


In [None]:
# Remove any previous clone of the PIC-PAPER-01 folder
!rm -rf PIC-PAPER-01

# Clone the private GitHub repo with a fine-grained token (token redacted):
# https://stackoverflow.com/questions/74532852/clone-github-repo-with-fine-grained-token/78280453#78280453
# !git clone --no-checkout https://<GITHUB_TOKEN>@github.com/danimp94/PIC-PAPER-01.git

# To check out the data folder only:
# %cd PIC-PAPER-01                   # Navigate to the repository directory
# !git sparse-checkout init --cone   # Initialize sparse-checkout
# !git sparse-checkout set data      # Include only the data/ folder
# !git checkout                      # Check out the selected folder

In [None]:
def load_data_from_directory(input_path):
    data_frames = []
    for file in os.listdir(input_path):
        if file.endswith('.csv'):
            df = pd.read_csv(os.path.join(input_path, file), delimiter=';', header=0)
            data_frames.append(df)
    data = pd.concat(data_frames, ignore_index=True)

    print(data)
    print(data.shape)

    return data

## Preprocessing Data
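Define the preprocessing functions. The raw measurements are first averaged over fixed-size windows for each sample and frequency, and the categorical labels are then encoded as integers. Feature standardization is present in the code but currently commented out.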

In [None]:
def calculate_averages_and_dispersion(data, data_percentage):
    df = data
    results = []
    for (sample, freq), group in df.groupby(['Sample', 'Frequency (GHz)']):
        window_size = max(1, int(len(group) * data_percentage / 100))
        # print(f"Processing sample: {sample}, frequency: {freq} with window size: {window_size}")
        for start in range(0, len(group), window_size):
            window_data = group.iloc[start:start + window_size]
            mean_values = window_data[['LG (mV)', 'HG (mV)']].mean()
            std_deviation_values = window_data[['LG (mV)', 'HG (mV)']].std()
            results.append({
                'Frequency (GHz)': freq,
                'LG (mV) mean': mean_values['LG (mV)'],
                'HG (mV) mean': mean_values['HG (mV)'],
                'LG (mV) std deviation': std_deviation_values['LG (mV)'],
                'HG (mV) std deviation': std_deviation_values['HG (mV)'],
                'Thickness (mm)': window_data['Thickness (mm)'].iloc[0], # iloc[0]
                'Sample': sample,
            })
    results_df = pd.DataFrame(results)
    # results_df.to_csv(output_file, sep=';', index=False)
    # print(f"Processed {input_file} and saved to {output_file}")
    print(results_df)
    return results_df
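
To make the windowing rule concrete, the following minimal sketch applies the same window-size formula to a toy group of 10 rows. The toy values and the helper name `window_stats` are assumptions for illustration only; they are not taken from the measurement data.

```python
import pandas as pd

def window_stats(group, data_percentage):
    """Split one group into consecutive windows and summarise each window."""
    # Same rule as above: window length is a percentage of the group length
    window_size = max(1, int(len(group) * data_percentage / 100))
    rows = []
    for start in range(0, len(group), window_size):
        window = group.iloc[start:start + window_size]
        rows.append({
            'n_rows': len(window),
            'LG (mV) mean': window['LG (mV)'].mean(),
            'LG (mV) std deviation': window['LG (mV)'].std(),
        })
    return pd.DataFrame(rows)

# Toy group: 10 readings for a single (sample, frequency) pair
toy = pd.DataFrame({'LG (mV)': [float(v) for v in range(10)]})

# 40 % of 10 rows gives windows of 4 rows; the last window keeps the 2 leftover rows
stats = window_stats(toy, data_percentage=40)
print(stats)
```

The last window of a group can be shorter than the others. A window with a single row has an undefined sample standard deviation, which pandas reports as `NaN`, so very small `data_percentage` values may need extra handling.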

In [None]:
def preprocess_data(data, data_percentage):
    # Windowing the data
    data = calculate_averages_and_dispersion(data, data_percentage)
    print(data.shape)

    # The last column ('Sample') is the target; all other columns are features
    X = data.iloc[:, :-1].values
    y = data.iloc[:, -1].values

    # Encode the target labels as integers and save the fitted encoder,
    # so that inference can map predictions back to the original labels
    le = LabelEncoder()
    y = le.fit_transform(y)
    joblib.dump(le, 'label_encoder.pkl')

    # Get the original labels and their encoded values
    original_labels = le.classes_
    encoded_values = le.transform(original_labels)

    # Create a DataFrame to display the mapping
    label_mapping_df = pd.DataFrame({
        'Original Label': original_labels,
        'Encoded Value': encoded_values
    })

    # Display the DataFrame
    print(label_mapping_df)

    # Standardize the features
    # print('prestandarization: ',X)
    # scaler = StandardScaler()
    # X = scaler.fit_transform(X)
    # print('post-std: ', X)

    # # Convert to PyTorch tensors
    # X = torch.tensor(X, dtype=torch.float32)
    # y = torch.tensor(y, dtype=torch.long)
    # print(X.shape)
    # print(y.shape)


    return X, y
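
The standardization step in `preprocess_data` is commented out. If it were re-enabled, one common approach would be to fit the scaler on the training features only and reuse the fitted scaler at inference time, saving it with `joblib` in the same way as the label encoder. The sketch below shows that pattern on synthetic arrays; the file name `scaler.pkl` and all variable names are hypothetical.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
# Synthetic stand-ins for the windowed features (6 columns, as in this notebook)
X_train_demo = rng.normal(loc=100.0, scale=20.0, size=(200, 6))
X_new_demo = rng.normal(loc=100.0, scale=20.0, size=(50, 6))

# Fit on training features only, so no statistics leak from unseen data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_demo)

# Persist the fitted scaler (hypothetical file name) and reload it for inference
scaler_path = os.path.join(tempfile.mkdtemp(), 'scaler.pkl')
joblib.dump(scaler, scaler_path)
loaded_scaler = joblib.load(scaler_path)

# Apply the stored training statistics to new data; do not refit here
X_new_scaled = loaded_scaler.transform(X_new_demo)

print(X_train_scaled.mean(axis=0).round(6))  # approximately 0 per column
print(X_train_scaled.std(axis=0).round(6))   # approximately 1 per column
```

Calling `fit_transform` again on test data would rescale it with its own mean and variance, so the model would see inputs on a different scale than during training.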

In [None]:
input_path = '/content/drive/MyDrive/PhD/Colab Notebooks/training_data/'
data = load_data_from_directory(input_path)

# Load and preprocess data
X, y = preprocess_data(data, data_percentage=3.7) # 1s window size


# print(le.classes_)

        Sample  Frequency (GHz)     LG (mV)    HG (mV)  Thickness (mm)
0           A1            100.0   -7.080942  -0.854611             0.2
1           A1            100.0   67.024785   0.244141             0.2
2           A1            100.0  124.893178  -1.098776             0.2
3           A1            100.0   91.075571   0.000000             0.2
4           A1            100.0   48.956174   0.122094             0.2
...        ...              ...         ...        ...             ...
2737958    REF            600.0    0.366256  16.237333             0.0
2737959    REF            600.0    0.000000  -7.080942             0.0
2737960    REF            600.0   -0.244170  15.260652             0.0
2737961    REF            600.0    0.366256  20.021975             0.0
2737962    REF            600.0    0.122085  13.185203             0.0

[2737963 rows x 5 columns]
(2737963, 5)
       Frequency (GHz)  LG (mV) mean  HG (mV) mean  LG (mV) std deviation  \
0                100.0     54.

## Config

In [None]:
config = dict(
    epochs = 500,
    seed = 40,
    classes = 17,
    learning_rate = 0.001,
    dataset = "experiment_1",
    architecture = "MLP",
    hidden_dim = 64,
    batch_size = 32
)

print(config)

{'epochs': 500, 'seed': 40, 'classes': 17, 'learning_rate': 0.001, 'dataset': 'experiment_1', 'architecture': 'MLP', 'hidden_dim': 64, 'batch_size': 32}


## NN Model

In [None]:
class Multiclass(nn.Module):
    ''' Multiclass classification
        input_size: number of features
        hidden_size: number of neurons in the hidden layer
        num_classes: number of classes to classify
    '''

    def __init__(self, input_size, hidden_size, num_classes):
        super(Multiclass, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # First fully connected layer
        self.relu = nn.ReLU() # Activation layer (ReLU)
        self.fc_out = nn.Linear(hidden_size, num_classes) # Last fully connected layer

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc_out(out)
        return out
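
The forward pass returns raw logits, one per class, which is what `nn.CrossEntropyLoss` expects because it applies log-softmax internally. The sketch below restates the model compactly and checks the output shape on a random batch; the batch values and the names `demo_model` and `demo_batch` are illustrative.

```python
import torch
import torch.nn as nn

class Multiclass(nn.Module):
    """Same single-hidden-layer MLP as defined above."""
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc_out = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        return self.fc_out(self.relu(self.fc1(x)))

torch.manual_seed(0)
demo_model = Multiclass(input_size=6, hidden_size=64, num_classes=17)

# Random batch of 4 rows with 6 features each (illustrative values only)
demo_batch = torch.randn(4, 6)
logits = demo_model(demo_batch)
print(logits.shape)  # torch.Size([4, 17])

# Softmax is only needed when probabilities are wanted; argmax is unchanged by it
probs = torch.softmax(logits, dim=1)

# Total number of trainable parameters (weights and biases of both layers)
n_params = sum(p.numel() for p in demo_model.parameters())
print(n_params)
```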


## Run Training

Skip this section if you only want to run inference.

In [None]:
def train_model(X_train, y_train, X_test, y_test, config):
    # Set random seed
    set_seed(config['seed'])

    # Initialize the model
    input_dim = X_train.shape[-1]  # Use the function argument, not the global X
    hidden_dim = config['hidden_dim']
    output_dim = config['classes']
    print(input_dim, hidden_dim, output_dim)
    model = Multiclass(input_dim, hidden_dim, output_dim).to(device)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'])

    # Convert data to PyTorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)

    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)

    # Create TensorDataset and DataLoader for training data
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)  # Shuffle for better training

    # Create TensorDataset and DataLoader for test data
    test_dataset = TensorDataset(X_test, y_test)
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False)

    # Training loop
    for epoch in range(config['epochs']):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)

            # Forward pass
            outputs = model(data)
            loss = criterion(outputs, target)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            # Update weights
            optimizer.step()

        # Evaluate on test data
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                outputs = model(data)
                test_loss += criterion(outputs, target).item()  # Sum up batch loss
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == target).sum().item()

        # CrossEntropyLoss returns the batch mean, so average over the number of batches
        test_loss /= len(test_loader)
        accuracy = 100. * correct / len(test_loader.dataset)

        print(f'Epoch: {epoch+1}, Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%')

    return model


In [None]:
# Split training data (seeded so the split is reproducible)
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8, shuffle=True, random_state=config['seed'])

model = train_model(X_train, y_train, X_test, y_test, config)

6 64 17
y_train:  [ 8  8  4 ... 10 10  8]
y_train:  tensor([ 8,  8,  4,  ..., 10, 10,  8])
y_shape:  torch.Size([19420])
X_train:  tensor([[ 1.6000e+02,  3.0245e+01,  8.0642e-02,  3.2782e+00,  7.2364e-01,
          1.0000e-01],
        [ 3.1000e+02,  1.1193e+00,  5.0588e+02,  8.0973e-01,  4.0931e+01,
          1.0000e-01],
        [ 1.4000e+02,  6.9175e+01, -9.6320e-02,  1.2824e+01,  8.2268e-01,
          1.0000e-01],
        ...,
        [ 1.7000e+02,  1.9603e+01,  1.0464e-01,  1.5905e+00,  7.3450e-01,
          3.6000e-01],
        [ 1.9000e+02,  1.0697e+01,  2.1643e-01,  2.3917e+00,  7.5252e-01,
          3.6000e-01],
        [ 1.5000e+02,  6.5257e+01,  1.5205e-01,  7.2693e+00,  7.7181e-01,
          1.0000e-01]])
X_shape:  torch.Size([19420, 6])
Epoch: 1, Test Loss: 0.0969, Accuracy: 12.31%
Epoch: 2, Test Loss: 0.0934, Accuracy: 12.21%
Epoch: 3, Test Loss: 0.0901, Accuracy: 13.10%
Epoch: 4, Test Loss: 0.0802, Accuracy: 16.64%
Epoch: 5, Test Loss: 0.0869, Accuracy: 15.47%
Epoch: 6, 
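
The train/test split in this notebook does not stratify by class, so class proportions can differ between the two parts. One possible variation, sketched here on synthetic labels rather than the notebook's data, passes `stratify` and `random_state` to `train_test_split`; the array names and the seed value are assumptions.

```python
import numpy as np
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
# Synthetic stand-in: 170 rows, 6 features, 17 balanced classes (10 rows each)
X_demo = rng.normal(size=(170, 6))
y_demo = np.repeat(np.arange(17), 10)

X_tr, X_te, y_tr, y_te = train_test_split(
    X_demo, y_demo,
    train_size=0.8,
    shuffle=True,
    stratify=y_demo,   # keep class proportions equal in both splits
    random_state=40,   # fixed seed so the split is repeatable
)

print(np.bincount(y_tr))  # 8 rows per class
print(np.bincount(y_te))  # 2 rows per class
```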

## Evaluation

## Save the model

In [None]:
# Save the model weights; the inference cell loads a file named MLP_model.pth
torch.save(model.state_dict(), 'MLP_model.pth')

# # Save the model as ONNX
# torch.onnx.export(model, X_train, 'MLP_model.onnx')
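
Saving a `state_dict` stores only the parameters, so loading requires rebuilding the same architecture first. The round trip can be sketched as follows, using a small stand-in network and a temporary file; the names `net`, `restored` and `demo_model.pth` are placeholders, not files used by this notebook.

```python
import os
import tempfile

import torch
import torch.nn as nn

# Small stand-in network; any nn.Module is saved the same way
net = nn.Sequential(nn.Linear(6, 64), nn.ReLU(), nn.Linear(64, 17))

# Save only the parameters (state_dict); the file name is a placeholder
path = os.path.join(tempfile.mkdtemp(), 'demo_model.pth')
torch.save(net.state_dict(), path)

# Rebuild the same architecture, then load the parameters into it
restored = nn.Sequential(nn.Linear(6, 64), nn.ReLU(), nn.Linear(64, 17))
state = torch.load(path, map_location='cpu', weights_only=True)
restored.load_state_dict(state)
restored.eval()

# Both networks should now produce identical outputs for the same input
x = torch.randn(3, 6)
with torch.no_grad():
    same = torch.equal(net(x), restored(x))
print(same)
```

Passing `map_location` lets weights saved on a GPU be loaded on a CPU-only machine, and `weights_only=True` restricts unpickling to tensor data.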

In [None]:
def preprocess_test_data(data, data_percentage):
    # Windowing the data
    data = calculate_averages_and_dispersion(data, data_percentage)

    # Assuming the last column is the target
    X = data.iloc[:, :-1].values
    y = data.iloc[:, -1].values

    # Encode the target labels with the encoder saved during training
    label_encoder_path = '/content/drive/MyDrive/PhD/Colab Notebooks/label_encoder.pkl'
    le = joblib.load(label_encoder_path)
    y = le.transform(y)
    print('y: ', y)

    # # Standardize the features
    # scaler = StandardScaler()
    # X = scaler.fit_transform(X)

    return X, y

## Load New Testing Data

In [None]:
# Load new data
input_data_test = '/content/drive/MyDrive/PhD/Colab Notebooks/test_data/'
print(os.listdir(input_data_test))

data_test = load_data_from_directory(input_data_test)

# Load and preprocess data
X_test, y_test = preprocess_test_data(data_test, data_percentage=8.33) # 1s window size
#X_test, y_test = preprocess_test_data(data_test, data_percentage=100) # 1s window size

# Convert to PyTorch tensors
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

print(X_test.shape)
print(y_test.shape)


['H1_1 - Copy.csv', '.ipynb_checkpoints']
      Sample  Frequency (GHz)    LG (mV)    HG (mV)  Thickness (mm)
0         H1              100  69.100232   0.244141            0.07
1         H1              100  53.229153   0.366211            0.07
2         H1              100  62.019289   1.587129            0.07
3         H1              100  67.268954  -0.244141            0.07
4         H1              100  75.326578   1.220798            0.07
...      ...              ...        ...        ...             ...
63947     H1              600   0.244170  24.417043            0.07
63948     H1              600  -0.732511  12.086436            0.07
63949     H1              600   0.122085  29.300451            0.07
63950     H1              600  -0.244170   1.220852            0.07
63951     H1              600  -0.610426  33.573434            0.07

[63952 rows x 5 columns]
(63952, 5)
     Frequency (GHz)  LG (mV) mean  HG (mV) mean  LG (mV) std deviation  \
0                100     66.10

## Run inference

In [None]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

# Initialize the model
input_dim = X_test.shape[-1]
hidden_dim = config['hidden_dim']  # Replace with the hidden dimension used during training
output_dim = config['classes']  # Replace with the number of output classes used during training
model = Multiclass(input_dim, hidden_dim, output_dim).to(device)

# Load label encoder
label_encoder_path = '/content/drive/MyDrive/PhD/Colab Notebooks/label_encoder.pkl'
le = joblib.load(label_encoder_path)

# Load pretrained model
model_path = '/content/drive/MyDrive/PhD/Colab Notebooks/MLP_model.pth'
model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
model.eval()

# Run inference
with torch.no_grad():
    X_test = X_test.to(device)
    outputs = model(X_test)
    _, predicted = torch.max(outputs, 1)

# Move predictions to the CPU so they can be compared with y_test
predicted = predicted.cpu()
print(predicted)

# Decode the predicted class indices back to the original labels
predicted_labels = le.inverse_transform(predicted.numpy())

# Print the results
print("Predicted labels:", predicted_labels)
print("Classes in label encoder:", le.classes_)
print("Number of classes:", len(le.classes_))

# Calculate percentage of correct predictions
correct_predictions = (predicted == y_test).sum().item()
total_samples = len(y_test)
accuracy = correct_predictions / total_samples * 100
print(f"Accuracy: {accuracy:.2f}%")

# Calculate number of predictions that are class = 9
num_class_9_predictions = (predicted == 9).sum().item()
print(f"Number of predictions that are class 9: {num_class_9_predictions}")


# Calculate the classification report
print(classification_report(le.inverse_transform(y_test.cpu().numpy()), predicted_labels))

# Confusion matrix
conf_matrix = confusion_matrix(le.inverse_transform(y_test.cpu().numpy()), predicted_labels, labels=le.classes_)
print(conf_matrix)

Using device: cpu
tensor([ 4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4, 11,  4,  4,
         4,  4,  4,  4,  4,  4,  4,  4, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
        15, 15, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,  8,  8,  8,
         8,  8,  8,  8,  8, 15,  8,  8,  8,  8,  0, 15, 15, 15, 15, 15, 15, 15,
        15, 15, 15, 15, 15,  8, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
        15,  6, 15,  6,  6, 15,  6,  6, 15, 15, 15,  6,  6,  6, 15, 15, 15, 15,
        15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,  9, 15, 15, 15,  9, 15, 15,
        15, 15, 15,  9, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,  9, 15,
        15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,  9, 15, 15, 15, 15, 15, 15,
        15, 15, 15, 15, 15, 15, 15, 15, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11,
        11,  0, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,  0,  6,  6,  6,
         6,  6,  4,  4,  4, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
        15, 15, 15, 15

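
The test set here contains a single sample type (H1), so most classes have no true samples and some have no predicted samples. In that situation scikit-learn treats precision or recall as undefined and emits a warning. A minimal sketch of making that choice explicit with the `zero_division` argument of `classification_report` follows; the three-class labels and predictions are made up for illustration.

```python
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

# Illustrative labels: the true class is always 'H1', predictions are mixed
classes = np.array(['E1', 'H1', 'N1'])
y_true = np.array(['H1'] * 6)
y_pred = np.array(['H1', 'N1', 'N1', 'E1', 'N1', 'H1'])

# zero_division=0 reports undefined precision/recall as 0 without a warning
report = classification_report(
    y_true, y_pred, labels=classes, zero_division=0, output_dict=True
)
print(report['H1'])

# Rows are true classes, columns are predicted classes, ordered as in `classes`
cm = confusion_matrix(y_true, y_pred, labels=classes)
print(cm)
```

With a single true class, only one row of the confusion matrix is non-zero, and that row shows how the windows of that sample were distributed over the predicted classes.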


| Original Label | Encoded Value |
|----------------|---------------|
| A1             | 0             |
| B1             | 1             |
| C1             | 2             |
| D1             | 3             |
| E1             | 4             |
| E2             | 5             |
| E3             | 6             |
| F1             | 7             |
| G1             | 8             |
| H1             | 9             |
| I1             | 10            |
| J1             | 11            |
| K1             | 12            |
| L1             | 13            |
| M1             | 14            |
| N1             | 15            |
| REF            | 16            |
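
The mapping in the table follows from how `LabelEncoder` assigns codes: each label gets its position in the sorted list of unique labels. The sketch below refits a fresh encoder on the 17 labels from the table instead of loading the saved pickle, so `demo_le` and `mapping` are illustrative names only.

```python
from sklearn.preprocessing import LabelEncoder

# The 17 sample labels listed in the table above
labels = ['A1', 'B1', 'C1', 'D1', 'E1', 'E2', 'E3', 'F1', 'G1',
          'H1', 'I1', 'J1', 'K1', 'L1', 'M1', 'N1', 'REF']

# A fresh encoder fitted here for illustration (the notebook loads a saved one)
demo_le = LabelEncoder()
demo_le.fit(labels)

# Encoded values are positions in the sorted list of unique labels
codes = demo_le.transform(demo_le.classes_)
mapping = {label: int(code) for label, code in zip(demo_le.classes_, codes)}
print(mapping['H1'], mapping['N1'], mapping['REF'])

# Decoding goes the other way, from class index to label
decoded = demo_le.inverse_transform([4, 15])
print(decoded)
```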