In this exercise, we will learn:
    
    1. How to perform multi-class classification with a deep neural network

In [1]:
import numpy as np 
# glob is used for manipulating files
import pandas as pd
import glob
import scipy
import random

from sklearn.neural_network import MLPClassifier

In [2]:
# Finding all .csv files in the directory of this python script
files = glob.glob("\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\*.csv")

print("Files found: ")
print(files)

dataframes = []
# Caricamento del file CSV
for file in files:
    df = pd.read_csv(file) 
    dataframes.append(df)

# Concatenazione di tutti i DataFrame in uno solo
df_final = pd.concat(dataframes, ignore_index=True)

# Visualizzazione delle prime righe per verifica
print(df_final.head())


Files found: 
['\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\0.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\1.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\2.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\3.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\4.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\5.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\6.csv', '\\Users\\Mattia\\Desktop\\Smart Werables\\Project\\data_collection\\Pietro\\7.csv']
             Timestamp  Sensor1  Sensor2  label
0  2025-03-13 17:13:19      384      200      0
1  2025-03-13 17:13:19      381      194      0
2  2025-03-13 17:13:19      377      197      0
3  2025-03-13 17:13:20      382      197      0
4  2025-03-13 17:13:20      375      194      0


In [3]:
# Finding the number of classes (the number of classes is the number of files since we assume each file is for a different class)
# Numero di classi (etichettato nei file)
num_classes = df_final["label"].nunique()  # Conta le classi uniche
num_channels = df_final.shape[1] - 2  # Escludiamo timestamp e label

# Loading all the files into a list
data_per_class        = []
num_samples_per_class = []
num_windows_per_class = []

# Defining window length and step size
window_length     = 50
window_step_size  = 25

# Loading the files and finding the number of samples and windows for each class
for label in range(num_classes):
    #data = np.loadtxt(file, delimiter=",",  usecols=range(1, num_channels + 1))
    #data = np.loadtxt(file, delimiter=",", skiprows=1, usecols=range(1, num_channels + 1))
    # data = np.loadtxt(file, delimiter=",")
    # Selezioniamo solo i dati della classe corrente
    data = df_final[df_final["label"] == label].iloc[:, 1:-1].values  # Escludiamo timestamp e label
    data_per_class.append(data) 
    num_samples = data.shape[0]
    num_samples_per_class.append(num_samples)
    num_windows = (num_samples - window_length) // window_step_size + 1
    num_windows_per_class.append(num_windows)
print(f"Numero di classi: {num_classes}")
print(f"Numero di canali (sensori): {num_channels}")
print(f"Numero di finestre per classe: {num_windows_per_class}")

Numero di classi: 8
Numero di canali (sensori): 2
Numero di finestre per classe: [10, 3, 4, 11, 11, 11, 13, 5]


In [4]:
sliding_windows_per_class = []

# Creating the sliding windows
for class_id in range(0, num_classes):
    num_windows = num_windows_per_class[class_id]
    sliding_windows = np.zeros((num_windows, window_length, num_channels))
    for i in range(0, num_windows):
        sliding_windows[i, ...] = data_per_class[class_id][i*window_step_size: i*window_step_size + window_length]
    sliding_windows_per_class.append(sliding_windows)
    print(f"Finestre generate per classe: {[len(w) for w in sliding_windows_per_class]}")

Finestre generate per classe: [10]
Finestre generate per classe: [10, 3]
Finestre generate per classe: [10, 3, 4]
Finestre generate per classe: [10, 3, 4, 11]
Finestre generate per classe: [10, 3, 4, 11, 11]
Finestre generate per classe: [10, 3, 4, 11, 11, 11]
Finestre generate per classe: [10, 3, 4, 11, 11, 11, 13]
Finestre generate per classe: [10, 3, 4, 11, 11, 11, 13, 5]


In [5]:
# Remember that we don't need to extract features. 

# We now split the data into train, val and test sets
train_portion = 0.70
val_portion   = 0.15

num_windows_all_classes = np.sum(num_windows_per_class)

# Creating lists that will store the train, val and test portions of all classes
X_train = []
Y_train = []
X_val = []
Y_val = []
X_test = []
Y_test = []

for class_id in range(0, num_classes):
    num_windows = num_windows_per_class[class_id]
    num_windows_train = int(num_windows*train_portion)
    num_windows_val   = int(num_windows*val_portion)
    num_windows_test  = num_windows - num_windows_train - num_windows_val
    
    windows = sliding_windows_per_class[class_id]
    x_train, x_val, x_test = np.split(windows, [num_windows_train, num_windows_train + num_windows_val], axis = 0)
    y_train, y_val, y_test = np.ones(num_windows_train)*class_id, np.ones(num_windows_val)*class_id, np.ones(num_windows_test)*class_id
    
    X_train.append(x_train)
    Y_train.append(y_train)
    
    X_val.append(x_val)
    Y_val.append(y_val)
    
    X_test.append(x_test)
    Y_test.append(y_test)

In [6]:
# Concatenating into a single array
X_train = np.concatenate(X_train, axis = 0)
Y_train = np.concatenate(Y_train, axis = 0)

X_val = np.concatenate(X_val, axis = 0)
Y_val = np.concatenate(Y_val, axis = 0)

X_test = np.concatenate(X_test, axis = 0)
Y_test = np.concatenate(Y_test, axis = 0)


In [7]:
X_train.shape

(44, 50, 2)

In [8]:
# Data normalization. We obtain the mean and std on the training set
data_mean = np.mean(X_train, axis = 0)
data_std  = np.std(X_train, axis = 0)

# We normalize the sets using the mean and std found above
X_train = (X_train - data_mean)/data_std
X_val   = (X_val - data_mean)/data_std
X_test  = (X_test - data_mean)/data_std

# Now we shuffle the training data
indices = np.arange(0, X_train.shape[0])
random.shuffle(indices)
X_train = X_train[indices, ...]
Y_train = Y_train[indices, ...]

In [9]:
print("Shape of the X_train tensor:")
print(X_train.shape)

# Note that we have a 3D tensor
# The first dimension is the number of sliding windows
# The second dimension is the number of time steps in each sliding window
# The third dimension is the number of channels

# We will transform this 3D tensor into a 2D tensor (matrix) because, for the neural network, we need a 2D tensor (num_of_sliding_windows, num_of_features)
X_train_flat = np.reshape(X_train, (X_train.shape[0], X_train.shape[1]*X_train.shape[2]))
X_val_flat = np.reshape(X_val, (X_val.shape[0], X_val.shape[1]*X_val.shape[2]))
X_test_flat = np.reshape(X_test, (X_test.shape[0], X_test.shape[1]*X_test.shape[2]))

print("New shape of the X_train tensor:")
print(X_train_flat.shape)

Shape of the X_train tensor:
(44, 50, 2)
New shape of the X_train tensor:
(44, 100)


In [10]:
# We create a classifier. Let's suppose we opt for 3 hidden layers
# https://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPClassifier.html
clf_mlp = MLPClassifier(hidden_layer_sizes=(128, 128, 128), max_iter=5000, learning_rate_init = 0.0001, activation = 'relu', solver = 'adam')
clf_mlp.fit(X_train_flat, Y_train)

val_accuracy = clf_mlp.score(X_val_flat, Y_val)*100
print(f"The validation accuracy for the MLP classifier: {val_accuracy:.2f}%")

The validation accuracy for the MLP classifier: 100.00%


In [32]:
%pip install torch torchvision torchaudio

Note: you may need to restart the kernel to use updated packages.


In [None]:

import torch
import torch.nn as nn

class CNN_LSTM(nn.Module):
    def __init__(self, cnn_filters, lstm_units, num_classes):
        super(CNN_LSTM, self).__init__()
        
        self.conv1 = nn.Conv2d(1, cnn_filters[0], kernel_size=(2,2), stride=(1,1), padding='same') # 1 input,
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=(2,1)) # Pooling try to reduce the feature dimensions, from 2 features select 1

        self.conv2 = nn.Conv2d(cnn_filters[0], cnn_filters[1], kernel_size=(2,2), stride=(1,1), padding='same')
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=(2,1))

        self.conv3 = nn.Conv2d(cnn_filters[1], cnn_filters[2], kernel_size=(2,2), stride=(1,1), padding='same')
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=(2,1))

        self.lstm_input_size = 6 * cnn_filters[2]
        self.lstm = nn.LSTM(self.lstm_input_size, lstm_units, batch_first=True)

        self.fc = nn.Linear(lstm_units, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # input (32, 1, 50, 6)
        x = self.pool1(self.relu1(self.conv1(x)))
        # x (32, 16, 25, 6)
        x = self.pool2(self.relu2(self.conv2(x)))
        # x (32 ,32, 12, 6)
        x = self.pool3(self.relu3(self.conv3(x)))
        # x (32, 64, 6, 6)

        # Reshape for LSTM
        x = x.permute(0, 2, 1, 3)  # (batch, height, width, channels)
        # x (32, 6, 64, 6)
        x = x.contiguous().view(x.size(0), 6, -1)  # (batch, 6, 6 * cnn_filters[2])
        # x (32, 6, 384)
        _, (h_n, _) = self.lstm(x)  # Get the last hidden state from LSTM
        # h_n (1, 32, 64)
        h_n = h_n.squeeze(0)  # Remove the extra dimension from LSTM output
        # h_n (32, 64)
        x = self.fc(h_n)
        # x (32, 5)
        x = self.softmax(x)
        # x (32, 5)
        return x  # output probability of 5 classes
    


What is one hot label? digit corresponding to the label
4 - [0 0 0 0 1]
2 - [0 0 1 0 0]



In [12]:
import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Hyperparameters
learning_rate = 1e-3
epochs = 25
batch_size = 32  # Change this based on your dataset size

# Convert data to PyTorch tensors and add channel dimension
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  # Shape: (batch, 1, 50, 6), unsqueeze is a function of Pytorch that adds a dimension to the tensor
Y_train_tensor = torch.tensor(Y_train, dtype=torch.long)  # Labels
print(X_train_tensor.shape)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).unsqueeze(1)
Y_val_tensor = torch.tensor(Y_val, dtype=torch.long)

# One-hot encoding for labels
Y_train_one_hot = F.one_hot(Y_train_tensor, num_classes=num_classes).float()
Y_val_one_hot = F.one_hot(Y_val_tensor, num_classes=num_classes).float()
print(Y_train_one_hot[0])
# Create DataLoader
train_dataset = TensorDataset(X_train_tensor, Y_train_one_hot)
val_dataset = TensorDataset(X_val_tensor, Y_val_one_hot)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # Shuffle the data every epoch, only required in training loader
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN_LSTM([16, 32, 64], 64, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    model.train()
    train_loss = 0
    correct = 0
    total = 0
    
    for X_batch, Y_batch in train_loader:
        X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, Y_batch)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        correct += (outputs.argmax(1) == Y_batch.argmax(1)).sum().item()
        total += Y_batch.size(0)

    train_accuracy = correct / total
    print(f"Epoch {epoch+1}/{epochs}, Loss: {train_loss/len(train_loader):.4f}, Accuracy: {train_accuracy:.4f}")

    # Validation phase
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, Y_batch in val_loader:
            X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, Y_batch)
            val_loss += loss.item()
            correct += (outputs.argmax(1) == Y_batch.argmax(1)).sum().item()
            total += Y_batch.size(0)

    val_accuracy = correct / total
    print(f"Validation Loss: {val_loss/len(val_loader):.4f}, Accuracy: {val_accuracy:.4f}")

print("Training complete!")

torch.Size([44, 1, 50, 2])
tensor([0., 0., 0., 0., 0., 1., 0., 0.])


  return F.conv2d(


RuntimeError: input.size(-1) must be equal to input_size. Expected 384, got 128