# HDA - Project 3: TASK A

Rows labelled with category 0 have been dropped, so we don't consider the _idle_ state of the user.

In [3]:
#import of all modules

import numpy as np
import scipy.io
import matplotlib.pyplot as plt
import itertools

from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import f1_score, roc_curve, auc, confusion_matrix

import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore",category=FutureWarning)
    
    from keras import regularizers
    from keras.layers import Conv1D, Conv2D, BatchNormalization, Dropout, LeakyReLU, Flatten, Activation, Dense, MaxPooling1D, MaxPooling2D
    from keras.models import Model, Sequential
    from keras.optimizers import Adam
    import keras.backend as K
    
K.set_image_data_format('channels_last')

%matplotlib inline

In the following cell we put all the useful functions that we need for **preprocessing** and **results visualization**.

In [9]:
def AUC(y_true, y_pred):
    """Compute the ROC AUC of every class, one class column at a time.

    Parameters
    ----------
    y_true : 2-D array
        Ground truth with one column per class; column ``i`` is passed to
        ``roc_curve`` as the true labels of class ``i``.
    y_pred : 2-D array
        Scores with the same column layout as ``y_true``; column ``i`` is
        passed to ``roc_curve`` as the scores of class ``i``.

    Returns
    -------
    dict
        Maps each class column index to the area under its ROC curve.
    """
    # Take the number of classes from the input itself rather than from a
    # notebook-level global, so the function works on a fresh kernel and
    # regardless of the order in which the cells were executed.
    n_classes = y_true.shape[1]

    roc_auc = dict()
    for i in range(n_classes):
        fpr, tpr, _ = roc_curve(y_true[:, i], y_pred[:, i])
        roc_auc[i] = auc(fpr, tpr)

    return roc_auc

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Draw a confusion matrix as an annotated heat map on the current pyplot figure.

    cm        -- 2-D array; rows are shown on the 'True label' axis and
                 columns on the 'Predicted label' axis.
    classes   -- sequence of tick labels, used for both axes.
    normalize -- when True, every row is divided by its own sum and the
                 cells are annotated with two decimals instead of integers.
    title     -- text placed above the plot.
    cmap      -- colormap handed to `plt.imshow`.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = cm.astype('float') / row_totals

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    positions = np.arange(len(classes))
    plt.xticks(positions, classes, rotation=45)
    plt.yticks(positions, classes)

    cell_format = '.2f' if normalize else 'd'
    # Cells above half of the maximum value get white text so the annotation
    # stays readable on the darker end of the colormap.
    half_max = cm.max() / 2.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            value = cm[row, col]
            text_color = "white" if value > half_max else "black"
            plt.text(col, row, format(value, cell_format),
                     horizontalalignment="center",
                     color=text_color)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

This part of the preprocessing is intended to be temporary: we still need to extend these elaborations to the bigger set of samples.

In [4]:
folder = "prep/A_nonzero/"

# Only these two variables are read from each .mat file. They are requested
# through `variable_names`: loadmat's `mdict` argument is the dictionary the
# loaded variables get inserted into, not a rename map, so placeholder values
# passed through it are simply overwritten.
mat_variables = ['filled_features', 'labels']

x1 = scipy.io.loadmat(folder+"S1-ADL1", variable_names=mat_variables)
x2 = scipy.io.loadmat(folder+"S1-ADL2", variable_names=mat_variables)
x3 = scipy.io.loadmat(folder+"S1-ADL3", variable_names=mat_variables)
x4 = scipy.io.loadmat(folder+"S1-ADL4", variable_names=mat_variables)
x5 = scipy.io.loadmat(folder+"S1-ADL5", variable_names=mat_variables)
x6 = scipy.io.loadmat(folder+"S1-Drill", variable_names=mat_variables)

print("Session shapes:\n")
print("ADL1:  ", x1['filled_features'].shape)
print("ADL2:  ", x2['filled_features'].shape)
print("ADL3:  ", x3['filled_features'].shape)
print("ADL4:  ", x4['filled_features'].shape)
print("ADL5:  ", x5['filled_features'].shape)
print("Drill: ", x6['filled_features'].shape)
print("\nTraining set: ADL1 + ADL2 + ADL3 + Drill = ", x1['filled_features'].shape[0] + x2['filled_features'].shape[0] + x3['filled_features'].shape[0] + x6['filled_features'].shape[0])
print("Test set: ADL4 + ADL5 = ", x4['filled_features'].shape[0] + x5['filled_features'].shape[0])

# Split by session: ADL1, ADL2, ADL3 and Drill are used for training,
# ADL4 and ADL5 are held out for testing.

# features
X_train = np.concatenate((x1['filled_features'],x2['filled_features'],x3['filled_features'],x6['filled_features']),axis=0)
X_test = np.concatenate((x4['filled_features'],x5['filled_features']),axis=0)

# labels (locomotion activity): only the first label column is kept because
# it is the one needed for this first task
Y_train = np.concatenate((x1['labels'][:,0],x2['labels'][:,0],x3['labels'][:,0],x6['labels'][:,0]),axis=0)
Y_test = np.concatenate((x4['labels'][:,0],x5['labels'][:,0]))

# Decision to overcome the problem of entire missing columns: NaN entries are
# replaced with 0 before scaling
X_train = np.nan_to_num(X_train)
X_test = np.nan_to_num(X_test)

# features normalization: the scaler is fitted on the training set only and
# then applied unchanged to the test set
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

print("X_train shape: ", X_train.shape)
print("X_test shape: ", X_test.shape)

Session shapes:

ADL1:   (37507, 113)
ADL2:   (24510, 113)
ADL3:   (25305, 113)
ADL4:   (24851, 113)
ADL5:   (22440, 113)
Drill:  (52105, 113)

Training set: ADL1 + ADL2 + ADL3 + Drill =  139427
Test set: ADL4 + ADL5 =  47291
X_train shape:  (139427, 113)
X_test shape:  (47291, 113)


Once we have both the training and test sets, we can proceed with the definition of the classes using one-hot encoding.

In [5]:
# NOTE(review): newer scikit-learn releases rename `sparse` to `sparse_output`;
# confirm against the installed version before upgrading.
onehot_encoder = OneHotEncoder(sparse=False)

# Learn the label -> column mapping from the training labels only.
YOH_train = onehot_encoder.fit_transform(Y_train.reshape(-1, 1))

# Reuse that mapping for the test labels (transform, not fit_transform):
# re-fitting on the test set would build an independent mapping, so a class
# missing from the test sessions would silently shift the column meaning.
# With the encoder's default settings, a test label never seen during fit
# raises an error instead of being encoded inconsistently.
YOH_test = onehot_encoder.transform(Y_test.reshape(-1, 1))

print("YOH_train shape: ", YOH_train.shape)
print("YOH_test shape: ", YOH_test.shape)

YOH_train shape:  (139427, 4)
YOH_test shape:  (47291, 4)


Now we have to build both the training and test sets so that we can feed them to the **CNN**.

In [7]:
# Windowing parameters: the stride equals the window length, so consecutive
# windows do not overlap.
window_size = 15
stride = int(window_size)

# --- training split ---
(samples, classes), num_features = YOH_train.shape, X_train.shape[1]
# NOTE(review): with stride == window_size, `samples // stride` full windows
# fit in the data; the `- 1` leaves the last one unused - confirm this is
# intended.
nWindows = int(samples // stride) - 1
print("Cardinality of the training set: ", nWindows)

# --- test split ---
(samples_test, classes_test), num_features_test = YOH_test.shape, X_test.shape[1]
nWindows_test = int(samples_test // stride) - 1
print("Cardinality of the test set: ", nWindows_test)

Cardinality of the training set:  9294
Cardinality of the test set:  3151


In [8]:
def build_windows(features, onehot_labels, n_windows, window_size, stride):
    """Cut a 2-D feature array into fixed-length windows with one label each.

    Parameters
    ----------
    features : 2-D array, one row per sample and one column per feature.
    onehot_labels : 2-D array, one row per sample and one column per class.
    n_windows : number of windows to extract.
    window_size : number of consecutive rows in each window.
    stride : row offset between the starts of two consecutive windows.

    Returns
    -------
    windows : array of shape (n_windows, window_size, features.shape[1]).
    labels : array of shape (n_windows,) holding, for each window, the
        one-hot column index of its most frequent class.
    """
    windows = np.zeros([n_windows, window_size, features.shape[1]])
    labels = np.zeros([n_windows])

    for w in range(n_windows):
        start = int(w * stride)
        stop = start + window_size
        windows[w, :, :] = features[start:stop, :]
        # Summing the one-hot rows counts the samples of each class inside
        # the window; argmax keeps the most frequent one (the lowest column
        # index wins a tie).
        labels[w] = np.argmax(np.sum(onehot_labels[start:stop, :], axis=0))

    return windows, labels


training_set, training_labels = build_windows(X_train, YOH_train, nWindows, window_size, stride)
test_set, test_labels = build_windows(X_test, YOH_test, nWindows_test, window_size, stride)

# The window labels are one-hot column indices, so the encoder is re-fitted
# on the training windows and then only *applied* to the test windows; this
# keeps the column order identical across the two splits even if a class
# never wins the majority vote in a test window.
training_labels_OH = onehot_encoder.fit_transform(training_labels.reshape(-1, 1))
# `test_labels` keeps its original name but holds the one-hot encoding from here on.
test_labels = onehot_encoder.transform(test_labels.reshape(-1, 1))