# Update log
2024/07/29
- model accuracy increased to over 90% with more data points as feature
- should incorporate optuna for visualisation

---

In [166]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np
import json

In [168]:

spk_data = "D:\\code\\uom_explore\\model_input\\3_features.csv"
spk_pca_data = "D:\\code\\uom_explore\\model_input\\pca_df.csv"

hkr_wsl_data = "/home/hk-wsl/code/uom_explore/model_input/feature_matrix.csv"
hkr_pca_data = "/home/hk-wsl/code/uom_explore/model_input/feature_pca.csv"

spk_json = "/home/gavinlouuu/coding/uom_explore/data_science/parameter.json"
hkr_wsl_json = "/home/hk-wsl/code/uom_explore/data_science/parameter.json"

data_path = spk_pca_data
param_path = hkr_wsl_json

with open('parameter.json','r') as file:
    params = json.load(file)

df = pd.read_csv(data_path)
# drop experiment_id column
df.drop('experiment_id', axis=1, inplace=True)
print(type(df))
df.head()

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,PC1,PC2,PC3,channel_id
0,-9.297113,-36.028848,-0.609223,0
1,-7.935828,-33.246099,-1.113492,1
2,-6.812983,-29.732352,-1.708627,2
3,-5.757883,-26.591866,-1.720893,3
4,-17.882516,-22.280264,0.086122,4


In [162]:
# check data for NaN
# df.isnull().sum()
df = df.dropna()
df.isnull().sum()
df.to_csv('df.csv', index=False)

In [169]:
# remove channel_id from the list

# Hyperparameters
# Extract parameters from the JSON object
hidden_size = params['hidden_size']
ground_truth = params['ground_truth']
num_epochs = params['num_epochs']
batch_size = params['batch_size']
learning_rate = params['learning_rate']
momentum_value = params['momentum_value']
dropout_rate = params['dropout']

features_raw = params['PCA_features']
# # Convert feature list to strings
features = list(map(str, features_raw))

# # Get header list and remove 'channel_id'
# header = df.columns.values.tolist()
# header.remove('channel_id')
# # Select features and remove ground_truth from feature list
# features = [col for col in header if col != ground_truth]



X = df.drop(ground_truth, axis=1)
# select features to be used
X = X[features]

input_size = len(X.columns)  # removing the ground truth from the number of columns counted
num_classes = df[ground_truth].nunique()

y = df[ground_truth]

# Split into training, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)  # This makes 60%, 20%, 20%

# Initialize the StandardScaler
scaler = StandardScaler()
# scaler = MinMaxScaler(feature_range=(0,255)) # 

# Fit the scaler to the training data and transform it
X_train_scaled = scaler.fit_transform(X_train)

# Apply the same transformation to validation and test sets
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

# Convert arrays to tensors
X_train_scaled = torch.tensor(X_train_scaled, dtype=torch.float32)#.unsqueeze(1)  # Shape: [batch_size, 1, num_features]
y_train = torch.tensor(y_train.to_numpy(), dtype=torch.long)  # Convert to NumPy array first
X_val_scaled = torch.tensor(X_val_scaled, dtype=torch.float32)#.unsqueeze(1)  # Shape: [batch_size, 1, num_features]
y_val = torch.tensor(y_val.to_numpy(), dtype=torch.long)  # Convert to NumPy array first
X_test_scaled = torch.tensor(X_test_scaled, dtype=torch.float32)#.unsqueeze(1)  # Shape: [batch_size, 1, num_features]
y_test = torch.tensor(y_test.to_numpy(), dtype=torch.long)  # Convert to NumPy array first

# Create datasets
train_dataset = TensorDataset(X_train_scaled, y_train)
val_dataset = TensorDataset(X_val_scaled, y_val)
test_dataset = TensorDataset(X_test_scaled, y_test)

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# MLP

In [170]:
# # Define the MLP model
    
class MLPClassifier(nn.Module):
    def __init__(self, input_size, hidden_sizes, num_classes, dropout_prob):
        super(MLPClassifier, self).__init__()
        self.layers = nn.ModuleList()
        self.batch_norms = nn.ModuleList()
        self.dropout_prob = dropout_prob
        
        # Input layer
        self.layers.append(nn.Linear(input_size, hidden_sizes[0]))
        self.batch_norms.append(nn.BatchNorm1d(hidden_sizes[0]))

        # Hidden layers
        for i in range(len(hidden_sizes) - 1):
            self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i + 1]))
            self.batch_norms.append(nn.BatchNorm1d(hidden_sizes[i+1]))
        
        # Output layer
        self.layers.append(nn.Linear(hidden_sizes[-1], num_classes))
        
        # Softmax activation for the output layer
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        for i in range(len(self.layers) - 1):
            x = torch.relu(self.batch_norms[i](self.layers[i](x)))
            x = F.dropout(x, p=self.dropout_prob, training=self.training)
        x = self.layers[-1](x)
        x = self.softmax(x)
        return x

# Initialize the model
model = MLPClassifier(input_size, hidden_size, num_classes, dropout_rate)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.2)

# Function to predict the class of new data
def predict(model, data):
    model.eval()
    with torch.no_grad():
        output = model(data)
        _, predicted_class = torch.max(output, dim=1)
    return predicted_class

# Function to compute the accuracy
def calculate_accuracy(y_pred, y_true):
    _, predicted = torch.max(y_pred, dim=1)  # Get the index of the max log-probability
    correct = (predicted == y_true).float().sum()
    return correct / y_true.shape[0]

# Training and evaluation loop
def train_and_evaluate(model, criterion, optimizer, train_loader, val_loader, epochs=10):
    for epoch in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
        
        # scheduler.step()
        model.eval()
        val_loss = 0
        val_accuracy = 0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                y_val_pred = model(X_val)
                val_loss += criterion(y_val_pred, y_val).item()
                val_accuracy += calculate_accuracy(y_val_pred, y_val)
                _, predicted_classes = torch.max(y_val_pred, dim=1)
                # print(f'Predicted: {predicted_classes}, Actual: {y_val}') # actual predicted values
        
        
        # Average the loss and accuracy over all validation batches
        val_loss /= len(val_loader)
        val_accuracy /= len(val_loader)
        
        print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

# Assuming the rest of your setup (model initialization, data loaders, etc.) is already done
# Now you would just call train_and_evaluate
train_and_evaluate(model, criterion, optimizer, train_loader, val_loader, epochs=num_epochs)

Epoch 1, Validation Loss: 1.6084, Validation Accuracy: 0.3828
Epoch 2, Validation Loss: 1.6094, Validation Accuracy: 0.3203
Epoch 3, Validation Loss: 1.6061, Validation Accuracy: 0.3438
Epoch 4, Validation Loss: 1.6006, Validation Accuracy: 0.3672
Epoch 5, Validation Loss: 1.5901, Validation Accuracy: 0.4688
Epoch 6, Validation Loss: 1.5775, Validation Accuracy: 0.4453
Epoch 7, Validation Loss: 1.5661, Validation Accuracy: 0.5469
Epoch 8, Validation Loss: 1.5527, Validation Accuracy: 0.6328
Epoch 9, Validation Loss: 1.5388, Validation Accuracy: 0.6328
Epoch 10, Validation Loss: 1.5274, Validation Accuracy: 0.6406
Epoch 11, Validation Loss: 1.5093, Validation Accuracy: 0.6484
Epoch 12, Validation Loss: 1.4973, Validation Accuracy: 0.6484
Epoch 13, Validation Loss: 1.4825, Validation Accuracy: 0.6484
Epoch 14, Validation Loss: 1.4717, Validation Accuracy: 0.6484
Epoch 15, Validation Loss: 1.4570, Validation Accuracy: 0.6562
Epoch 16, Validation Loss: 1.4474, Validation Accuracy: 0.6562
E

# Optuna MLP

In [171]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import optuna

# Define the MLP model
class MLPClassifier(nn.Module):
    def __init__(self, input_size, hidden_sizes, num_classes, dropout_prob):
        super(MLPClassifier, self).__init__()
        self.layers = nn.ModuleList()
        self.batch_norms = nn.ModuleList()
        self.dropout_prob = dropout_prob
        
        # Input layer
        self.layers.append(nn.Linear(input_size, hidden_sizes[0]))
        self.batch_norms.append(nn.BatchNorm1d(hidden_sizes[0]))

        # Hidden layers
        for i in range(len(hidden_sizes) - 1):
            self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i + 1]))
            self.batch_norms.append(nn.BatchNorm1d(hidden_sizes[i + 1]))
        
        # Output layer
        self.layers.append(nn.Linear(hidden_sizes[-1], num_classes))
        
        # Softmax activation for the output layer
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        for i in range(len(self.layers) - 1):
            x = torch.relu(self.batch_norms[i](self.layers[i](x)))
            x = F.dropout(x, p=self.dropout_prob, training=self.training)
        x = self.layers[-1](x)
        x = self.softmax(x)
        return x

# Define the objective function for Optuna
def objective(trial):
    # Hyperparameter search space
    hidden_sizes = [trial.suggest_int(f'hidden_size_{i}', 32, 256) for i in range(trial.suggest_int('num_layers', 1, 3))]
    dropout_prob = trial.suggest_float('dropout_prob', 0.1, 0.5)
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-4, 1e-2)

    # Initialize the model
    model = MLPClassifier(input_size, hidden_sizes, num_classes, dropout_prob)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Training and evaluation loop
    for epoch in range(num_epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
        
        model.eval()
        val_loss = 0
        val_accuracy = 0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                y_val_pred = model(X_val)
                val_loss += criterion(y_val_pred, y_val).item()
                val_accuracy += calculate_accuracy(y_val_pred, y_val)
        
        # Average the loss and accuracy over all validation batches
        val_loss /= len(val_loader)
        val_accuracy /= len(val_loader)
    
    return val_accuracy

# Run the Optuna optimization
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

# Print the best hyperparameters
print('Best hyperparameters: ', study.best_params)

# Use the best hyperparameters to train the final model
best_params = study.best_params
hidden_sizes = [best_params[f'hidden_size_{i}'] for i in range(best_params['num_layers'])]
dropout_prob = best_params['dropout_prob']
learning_rate = best_params['learning_rate']

model = MLPClassifier(input_size, hidden_sizes, num_classes, dropout_prob)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the final model with the best hyperparameters
trained_model = train_and_evaluate(model, criterion, optimizer, train_loader, val_loader, epochs=num_epochs)


[I 2024-07-29 18:22:12,927] A new study created in memory with name: no-name-1dda0dba-a664-4f1c-a574-6c95f08156a0
  learning_rate = trial.suggest_loguniform('learning_rate', 1e-4, 1e-2)
[I 2024-07-29 18:22:17,071] Trial 0 finished with value: 0.90625 and parameters: {'num_layers': 3, 'hidden_size_0': 152, 'hidden_size_1': 67, 'hidden_size_2': 107, 'dropout_prob': 0.1498026405266693, 'learning_rate': 0.00023546533760684894}. Best is trial 0 with value: 0.90625.
[I 2024-07-29 18:22:19,561] Trial 1 finished with value: 0.9296875 and parameters: {'num_layers': 1, 'hidden_size_0': 223, 'dropout_prob': 0.17811266304731516, 'learning_rate': 0.0014861909138276365}. Best is trial 1 with value: 0.9296875.
[I 2024-07-29 18:22:23,100] Trial 2 finished with value: 0.890625 and parameters: {'num_layers': 2, 'hidden_size_0': 163, 'hidden_size_1': 89, 'dropout_prob': 0.388463092382339, 'learning_rate': 0.0002922865955114585}. Best is trial 1 with value: 0.9296875.
[I 2024-07-29 18:22:25,524] Trial 3 f

Best hyperparameters:  {'num_layers': 1, 'hidden_size_0': 218, 'dropout_prob': 0.10160024659180605, 'learning_rate': 0.0017468635757977298}
Epoch 1, Validation Loss: 1.4028, Validation Accuracy: 0.7266
Epoch 2, Validation Loss: 1.2402, Validation Accuracy: 0.7344
Epoch 3, Validation Loss: 1.1865, Validation Accuracy: 0.7578
Epoch 4, Validation Loss: 1.1643, Validation Accuracy: 0.8203
Epoch 5, Validation Loss: 1.1506, Validation Accuracy: 0.7969
Epoch 6, Validation Loss: 1.1334, Validation Accuracy: 0.8125
Epoch 7, Validation Loss: 1.1207, Validation Accuracy: 0.8125
Epoch 8, Validation Loss: 1.1126, Validation Accuracy: 0.8203
Epoch 9, Validation Loss: 1.1091, Validation Accuracy: 0.8281
Epoch 10, Validation Loss: 1.1043, Validation Accuracy: 0.8672
Epoch 11, Validation Loss: 1.0985, Validation Accuracy: 0.8672
Epoch 12, Validation Loss: 1.1023, Validation Accuracy: 0.8750
Epoch 13, Validation Loss: 1.0882, Validation Accuracy: 0.8750
Epoch 14, Validation Loss: 1.0817, Validation Accu

# 1D CNN Model 

In [165]:
# # Define the device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# in_channels = params['in_channels']
# out_channels = params['out_channels']
# kernel_sizes = params['kernel_sizes']

# class CNN1DClassifier(nn.Module):
#     def __init__(self, in_channels, out_channels, kernel_sizes, num_classes, dropout_rate=0.5):
#         super(CNN1DClassifier, self).__init__()
        
#         assert len(out_channels) == len(kernel_sizes), "The length of out_channels and kernel_sizes must be the same"
        
#         self.convs = nn.ModuleList()
#         self.bns = nn.ModuleList()  # Adding batch normalization if needed
        
#         current_in_channels = in_channels
        
#         for out_channel, kernel_size in zip(out_channels, kernel_sizes):
#             self.convs.append(nn.Conv1d(current_in_channels, out_channel, kernel_size=kernel_size, stride=1, padding=kernel_size // 2))
#             self.bns.append(nn.BatchNorm1d(out_channel))  # Optional: Add batch normalization
#             current_in_channels = out_channel
        
#         # Calculate the size after all convolutional and pooling layers
#         conv_output_size = input_size
#         for kernel_size in kernel_sizes:
#             conv_output_size = (conv_output_size + 2 * (kernel_size // 2) - (kernel_size - 1) - 1) // 1 + 1
#             conv_output_size = conv_output_size // 2  # After pooling
        
#         self.pool = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
#         self.fc1 = nn.Linear(out_channels[-1] * conv_output_size, 128)
#         self.dropout = nn.Dropout(dropout_rate)
#         self.fc2 = nn.Linear(128, num_classes)
#         self.softmax = nn.Softmax(dim=1)

#     def forward(self, x):
#         # print(f'Input shape: {x.shape}')
#         for conv, bn in zip(self.convs, self.bns):
#             x = self.pool(torch.relu(bn(conv(x))))
#             # print(f'After conv and pool: {x.shape}')
#         x = x.view(x.size(0), -1)  # Flatten the tensor
#         # print(f'After flatten: {x.shape}')
#         x = torch.relu(self.fc1(x))
#         # print(f'After fc1: {x.shape}')
#         x = self.dropout(x)
#         x = self.fc2(x)
#         # print(f'After fc2: {x.shape}')
#         x = self.softmax(x)
#         return x

# # Train the 1D CNN model

# def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=25, device='cpu'):
#     model = model.to(device)
    
#     for epoch in range(num_epochs):
#         # Training phase
#         model.train()
#         running_loss = 0.0
#         running_corrects = 0

#         for inputs, labels in train_loader:
#             inputs = inputs.to(device)
#             labels = labels.to(device)

#             optimizer.zero_grad()

#             outputs = model(inputs)
#             _, preds = torch.max(outputs, 1)
#             loss = criterion(outputs, labels)

#             loss.backward()
#             optimizer.step()

#             running_loss += loss.item() * inputs.size(0)
#             running_corrects += torch.sum(preds == labels.data)

#         epoch_loss = running_loss / len(train_loader.dataset)
#         epoch_acc = running_corrects.double() / len(train_loader.dataset)

#         print(f'Epoch {epoch}/{num_epochs - 1}')
#         print(f'Training Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

#         # Validation phase
#         model.eval()
#         val_loss = 0.0
#         val_corrects = 0

#         with torch.no_grad():
#             for inputs, labels in val_loader:
#                 inputs = inputs.to(device)
#                 labels = labels.to(device)

#                 outputs = model(inputs)
#                 _, preds = torch.max(outputs, 1)
#                 loss = criterion(outputs, labels)

#                 val_loss += loss.item() * inputs.size(0)
#                 val_corrects += torch.sum(preds == labels.data)

#         val_loss = val_loss / len(val_loader.dataset)
#         val_acc = val_corrects.double() / len(val_loader.dataset)

#         print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

#     return model

# # Initialize model, loss function, and optimizer
# model = CNN1DClassifier(in_channels, out_channels, kernel_sizes, num_classes)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# # Train the model
# trained_model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs)

# # Evaluate the model on the test set
# model.eval()
# test_loss = 0.0
# test_corrects = 0

# with torch.no_grad():
#     for inputs, labels in test_loader:
#         inputs = inputs.to(device)
#         labels = labels.to(device)

#         outputs = model(inputs)
#         _, preds = torch.max(outputs, 1)
#         loss = criterion(outputs, labels)

#         test_loss += loss.item() * inputs.size(0)
#         test_corrects += torch.sum(preds == labels.data)

# test_loss = test_loss / len(test_loader.dataset)
# test_acc = test_corrects.double() / len(test_loader.dataset)

# print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
