# CNN model

In [1]:
# Dependencies
import torch
import torch.nn as nn
from torch.autograd import Variable
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import copy
import sys
import sklearn.metrics as metrics
from sktime.performance_metrics.forecasting import MeanAbsoluteScaledError
from torchinfo import summary
import parameters
import random
from data_formatting import split_sequence_overlap, split_sequence_nooverlap, split_sequence, split_train_test, normalize_data, set_targets
parameters.initialize_parameters()

## Functions

In [2]:
def validation_accuracy(model, val_loader, nclasses, device, input_dim, error, loss, epoch, test_subj, xv, loss_list, val_loss_list, epoch_list, accuracy_list):
    all_val_predicted = []
    all_val_labels = []
    all_val_outputs = np.empty((0, nclasses), dtype='float')
    correct = 0
    total = 0

    # Iterate through validation dataset
    model.eval()
    with torch.no_grad():
        for features, labels in val_loader:
            features = Variable(features.view(-1, parameters.seq_dim, input_dim)).to(device)
            labels = Variable(labels).to(device)

            # Forward propagation
            outputs = model(features)
            val_loss = error(outputs, labels)

            # Get predictions from the maximum value
            predicted = torch.max(outputs.data, 1)[1]
            predicted = predicted.to('cpu')

            # Total number of labels
            total += labels.size(0)
            correct += (predicted == labels.cpu()).sum()
            all_val_predicted.extend(list(predicted.detach().numpy()))
            all_val_labels.extend(list(labels.cpu().detach().numpy()))
            all_val_outputs = np.concatenate((all_val_outputs, outputs.data.to('cpu').reshape(-1, nclasses)))

    al_np = np.array(all_val_labels)   
    ao_np = np.array(all_val_outputs)  
    accuracy = correct / float(total)

    # store loss and iteration
    loss_list.append(loss.data)
    val_loss_list.append(val_loss.data)
    epoch_list.append(epoch)
    accuracy_list.append(accuracy)
    print('Subject: {}/{}  Epoch: {:>3}  Loss: {:.6}/{:.6}  Validation accuracy: {:.2f}'.format(test_subj, xv, epoch, loss, val_loss, accuracy))
    return accuracy
    
def cross_accuracy(model, test_loader, avg_test_acc, test_acc_list, test_accuracies, nclasses, device, error_cpu, input_dim, features_test, targets_test, error, xv, test_subj):
    
    correct = 0
    total = 0
    prev_label = -1
    class_hist = np.zeros(nclasses, dtype='int')
    all_predicted = []
    all_labels = []
    all_outputs = np.empty((0, nclasses), dtype='float')

    # Iterate through test dataset
    model.eval()
    with torch.no_grad():
        if parameters.test_with_subsequences:
            for features, labels in test_loader:
                features = Variable(features.view(-1, parameters.test_seq_dim, input_dim)).to(device)
                labels = Variable(labels).to('cpu')

                # Forward propagation
                outputs = model(features)
                test_loss = error_cpu(outputs.to('cpu'), labels)
                
                # Get predictions from the maximum value
                predicted = torch.max(outputs.data, 1)[1]
                predicted = predicted.to('cpu')

                # Total number of labels
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                all_predicted.extend(list(predicted.detach().numpy()))
                all_labels.extend(list(labels.detach().numpy()))
                all_outputs = np.concatenate((all_outputs, outputs.data.to('cpu').reshape(-1, nclasses)))

        
        else:
            count=0
            for features in features_test:
                features = torch.tensor(features)
                features = torch.unsqueeze(features, 0).to(device)
                labels = torch.unsqueeze(torch.tensor(targets_test[count]), 0)
                features = Variable(features.view(-1, parameters.test_seq_dim, input_dim)).to(device)

                # Forward propagation
                outputs = model(features)

                test_loss = error(outputs.to('cpu'), labels)
                # Get predictions from the maximum value
                predicted = torch.max(outputs.data, 1)[1]
                predicted = predicted.to('cpu')

                # Total number of labels
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                count += 1

        al_np = np.array(all_labels)   
        ao_np = np.array(all_outputs)  

        accuracy = correct / float(total)

        print(f"Test accuracy for run {test_subj}/{xv}: {accuracy}")

    avg_test_acc += accuracy
    test_acc_list.append(accuracy)

## Architecture

In [29]:
class CNN(nn.Module):
    def __init__(self, input_dim, seq_dim):
        super(CNN, self).__init__()
        
        # Define the architecture with layers based on the input arguments
        self.conv1 = nn.Conv1d(seq_dim, 32, 5)
        self.conv2 = nn.Conv1d(32, 64, 3)
        self.conv3 = nn.Conv1d(64, 128, 3)
        self.conv4 = nn.Conv1d(128, 256, 3)
        self.relu = nn.ReLU()
        
        # Mise à jour de fc_input_size en fonction des couches de pooling
        self.fc_input_size = 256 * (( (input_dim - 5) - 3 - 3 - 3 + 4) )
        self.fc = nn.Linear(self.fc_input_size, 2)

    def forward(self, x):
        # Forward pass
        x = self.conv1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.relu(x)
        
        x = self.conv4(x)
        x = self.relu(x)
        
        x = x.view(x.size(0), -1)
        
        x = self.fc(x)
        
        return x

## Parameters

In [31]:
# Classes we want to predict (0 et 3) and binary outputs
list_targets = [0, 3] #SelfStim & CtrlRest
list_labels = [0, 1]

# number of subjects used for validation
num_validation_subjects = 1

learning_rate = 0.0001
weight_decay = 10e-4
epochs = 3

print(torch.__version__)
device = torch.device('cpu')
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using {device}")

2.0.1+cu117
Using cpu


## Training

In [32]:
# Get data
csvfile = "../../data/video/All_Subs_Diff_Modules_nofilter_withoutAUc.csv"
train_df = pd.read_csv(csvfile,  delimiter=",")  # 101 features (only AU_r)

# Select only the classes we want to predict
train_df, nclasses, targets_numpy = set_targets(train_df, list_targets, list_labels)

# Convert the subject names (strings) into numbers
subjects = pd.factorize(train_df['Subject'])[0]

# Normalise the features
features_numpy = normalize_data(train_df, False) #parameters.normalise_individual_subjects
input_dim = features_numpy.shape[1]
print(f"Number of features: {input_dim}")

del train_df

Number of features: 100


In [33]:
# Variable we will use throughout the training and testing
test_accuracies = []
calibrated_test_accuracies = []
all_outputs = np.empty((0, nclasses), dtype='float')

# Validation accuracy
loss_list = []
val_loss_list = []
epoch_list = []
accuracy_list = []

# Get distinct subjects
subj = np.unique(subjects)

# Loop over all subjects
for test_subj in subj:
    xv_max_val = 0
    avg_test_acc = 0
    val_acc_val_loss_list = []
    test_acc_list = []
    best_accuracy = 0

    # Cross validation
    for xv in range(parameters.cross_validation_passes):

        # Set up the train, validation and test sets
        test_idx = np.array([test_subj])

        # Take out test subject from trainval (Crooss validation)
        trainval_idx = np.delete(subj, np.where(subj==test_subj))
        val_idx = trainval_idx[random.sample(range(len(trainval_idx)), num_validation_subjects)]
        val_idx = val_idx%len(subj)

        # Remove test & validation subjects from trainval
        train_idx = np.setxor1d(subj, test_idx)
        train_idx = np.setxor1d(train_idx, val_idx)

        #print("Generating train/val/test split...")
        features_train, targets_train, features_val, targets_val, features_test, targets_test = split_train_test(targets_numpy, features_numpy, subjects, train_idx, val_idx, test_idx)

        #print("Generating sequences...")
        features_train, targets_train = split_sequence_overlap(features_train, targets_train, parameters.seq_dim, parameters.overlap_size)
        features_val, targets_val = split_sequence_overlap(features_val, targets_val, parameters.seq_dim, parameters.overlap_size)
        
        # Overlap or no
        if parameters.test_with_subsequences:
            features_test, targets_test = split_sequence_overlap(features_test, targets_test, parameters.test_seq_dim, parameters.test_overlap_size)
        else:
            features_test, targets_test = split_sequence_nooverlap(features_test, targets_test, parameters.test_seq_dim, parameters.test_overlap_size)

        #print(f"Number of training examples: {len(targets_train)}")
        #print(f"Number of validation examples: {len(targets_val)}")
        #print(f"Number of test examples: {len(targets_test)}")

        # Create feature and targets tensor for train set. We need variable to accumulate gradients. Therefore first we create tensor, then we will create variable
        featuresTrain = torch.from_numpy(features_train)
        targetsTrain = torch.from_numpy(targets_train).type(torch.LongTensor)  # data type is long

        featuresVal = torch.from_numpy(features_val)
        targetsVal = torch.from_numpy(targets_val).type(torch.LongTensor)  # data type is long

        # Pytorch train and validation sets
        train = TensorDataset(featuresTrain, targetsTrain)
        val = TensorDataset(featuresVal, targetsVal)
        
        # Data loader
        train_loader = DataLoader(train, batch_size=parameters.batch_size, shuffle=True)
        val_loader = DataLoader(val, batch_size=parameters.batch_size, shuffle=False)

        # Create feature and targets tensor for test set
        if parameters.test_with_subsequences:
            featuresTest = torch.from_numpy(features_test)
            targetsTest = torch.from_numpy(targets_test).type(torch.LongTensor)  # data type is long
            test = TensorDataset(featuresTest, targetsTest)
            test_loader = DataLoader(test, batch_size=parameters.batch_size, shuffle=False)
        
        # Model
        model = CNN(input_dim, parameters.seq_dim).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        error = nn.CrossEntropyLoss()
        error_cpu = nn.CrossEntropyLoss().to('cpu')

        # Early Stopping
        
        patience = epochs -1
        #patience = 4
        current_patience = 0

        # Train the model
        for epoch in range(epochs):
            model.train()
            running_loss = 0
            for data, target in train_loader:
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                outputs = model(data)
                loss = error(outputs, target)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

            # Validation accuracy
            accuracy = validation_accuracy(model, val_loader, nclasses, device, input_dim, error, loss, epoch, test_subj, xv, loss_list, val_loss_list, epoch_list, accuracy_list)

            ### Early stopping
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                torch.save(model.state_dict(), 'best_model_checkpoint.pth')
                current_patience = 0  # Reset patience counter
            else:
                current_patience += 1  # No improvement, increase patience counter
            
            if current_patience >= patience:
                # Early stopping condition met
                print(f'Early stopping at epoch {epoch} due to lack of improvement.')
                break

        # Restore the best model checkpoint
        model.load_state_dict(torch.load('best_model_checkpoint.pth'))
    
        # Cross validation accuracy
        cross_accuracy(model, test_loader, avg_test_acc, test_acc_list, test_accuracies, nclasses, device, error_cpu, input_dim, features_test, targets_test, error, xv, test_subj)

    avg_test_acc = np.mean(test_acc_list)
    test_accuracies.append(avg_test_acc)
  
print("Test accuracies:")
print(test_accuracies)
print(f"Mean accuracy: {np.mean(test_accuracies)}")

Subject: 0/0  Epoch:   0  Loss: 0.580261/0.0386062  Validation accuracy: 0.97
Subject: 0/0  Epoch:   1  Loss: 0.249173/0.00502749  Validation accuracy: 0.97
Subject: 0/0  Epoch:   2  Loss: 0.265492/0.000117812  Validation accuracy: 0.98
Test accuracy for run 0/0: 0.6889880952380952
Subject: 0/1  Epoch:   0  Loss: 0.226042/0.0317351  Validation accuracy: 0.60
Subject: 0/1  Epoch:   1  Loss: 0.673647/0.00237981  Validation accuracy: 0.63
Early stopping at epoch 1 due to lack of improvement.
Test accuracy for run 0/1: 0.6889880952380952
Subject: 0/2  Epoch:   0  Loss: 0.23551/0.792246  Validation accuracy: 0.69
Subject: 0/2  Epoch:   1  Loss: 0.21221/0.595318  Validation accuracy: 0.65
Early stopping at epoch 1 due to lack of improvement.
Test accuracy for run 0/2: 0.6889880952380952
Subject: 1/0  Epoch:   0  Loss: 0.363172/0.00570249  Validation accuracy: 0.60
Subject: 1/0  Epoch:   1  Loss: 0.131268/0.00417277  Validation accuracy: 0.60
Subject: 1/0  Epoch:   2  Loss: 0.0811955/0.000225

In [19]:
# save the model
PATH = './model_0_3.pth'
torch.save(model.state_dict(), PATH)