# Import Libs


Load the TensorBoard notebook extension

In [None]:
# Load the TensorBoard notebook extension (the %tensorboard magic is used at the end of the notebook).
%load_ext tensorboard

Install UMAP (with its plotting extras), used to visualize the learned features

In [None]:
!pip install umap-learn[plot]

Import libs

In [None]:
from numpy import mean
from numpy import std
from numpy import dstack
import numpy as np
import random
import cv2
from pandas import read_csv
import datetime

from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler
from sklearn.utils import shuffle

import matplotlib.pyplot as plt
import umap
import umap.plot

import tensorflow as tf
import tensorflow.keras as keras
from keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from tensorflow.keras.utils import to_categorical

# Load Data

Create the necessary functions to load the data

In [None]:
def load_file(filepath):
    """Load a header-less, whitespace-delimited text file as a numpy array.

    Args:
        filepath: path of the file to read.

    Returns:
        The parsed table, as returned by ``DataFrame.values``.
    """
    # sep=r'\s+' is the supported replacement for the deprecated delim_whitespace=True
    dataframe = read_csv(filepath, header=None, sep=r'\s+')
    return dataframe.values
 
def load_group(filenames, prefix=''):
    """Load several files and stack them into a single 3D numpy array.

    Args:
        filenames: iterable of file names to load.
        prefix: string prepended to every file name before loading.

    Returns:
        The loaded arrays stacked with ``dstack``, so each file occupies one
        index of the third axis.
    """
    arrays = [load_file(prefix + fname) for fname in filenames]
    # stack so that each file becomes one slice of the 3rd dimension
    return dstack(arrays)
 
def load_dataset_group(group, prefix=''):
    """Load the inertial signals and the labels of one dataset group.

    Args:
        group: name of the group, such as 'train' or 'test'; it is used both as
            the sub-directory name and as the file-name suffix.
        prefix: string prepended to every path.

    Returns:
        A tuple ``(X, y)``: ``X`` is the result of ``load_group`` over the nine
        signal files, ``y`` is the content of the group's label file.
    """
    signals_dir = prefix + group + '/Inertial Signals/'

    # nine files: total acceleration, body acceleration and body gyroscope, each on x/y/z
    signal_files = [
        sensor + '_' + axis + '_' + group + '.txt'
        for sensor in ('total_acc', 'body_acc', 'body_gyro')
        for axis in ('x', 'y', 'z')
    ]

    # input data
    X = load_group(signal_files, signals_dir)
    # class output
    y = load_file(prefix + group + '/y_' + group + '.txt')
    return X, y
 
def load_dataset(prefix=''):
    """Load the train and test splits and one-hot encode their labels.

    Both splits are read from ``prefix + 'HARDataset/'``. Labels are shifted by
    -1 so they start at zero, then one-hot encoded. The shapes of the resulting
    arrays are printed.

    Args:
        prefix: string prepended to the dataset directory name.

    Returns:
        A tuple ``(trainX, trainy, testX, testy)``.
    """
    # load all train
    trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')

    # load all test
    testX, testy = load_dataset_group('test', prefix + 'HARDataset/')

    # zero-offset class values
    trainy = trainy - 1
    testy = testy - 1

    # one hot encode y; the width is shared by both splits so the two label
    # matrices have the same number of columns even when one split does not
    # contain the highest class
    n_classes = int(max(trainy.max(), testy.max())) + 1
    trainy = to_categorical(trainy, num_classes=n_classes)
    testy = to_categorical(testy, num_classes=n_classes)

    print("#### DATASET ####")
    print(" TRAIN DATA")
    print(" Data shape", trainX.shape, "Labels shape", trainy.shape)
    print(" TEST DATA")
    print(" Data shape", testX.shape, "Labels shape", testy.shape)

    return trainX, trainy, testX, testy

# Model

Define a function to create the convolutional network

In [None]:
def get_HARNet(input_shape, n_outputs):
    """Build the sequential 1D convolutional network.

    The network is made of two identical blocks (Conv1D with 32 filters and
    kernel size 5, Dropout 0.5, MaxPooling1D with pool size 2) followed by a
    Flatten layer named "features", a 128-unit ReLU Dense layer and a softmax
    Dense layer with ``n_outputs`` units.

    Args:
        input_shape: shape passed as ``input_shape`` to the first Conv1D layer.
        n_outputs: number of units of the final softmax layer.

    Returns:
        The (uncompiled) Sequential model.
    """
    net = Sequential()

    # Two convolutional blocks; only the first one declares the input shape
    conv_settings = dict(filters=32, kernel_size=5, activation='relu')
    for extra_settings in ({'input_shape': input_shape}, {}):
        net.add(Conv1D(**conv_settings, **extra_settings))
        net.add(Dropout(0.5))
        net.add(MaxPooling1D(pool_size=2))

    # Fully connected head; the Flatten layer is named so it can be looked up later
    head = (
        Flatten(name="features"),
        Dense(128, activation='relu'),
        Dense(n_outputs, activation='softmax'),
    )
    for layer in head:
        net.add(layer)

    return net

1D-ResNet model

In [None]:
def conv_block(n_filters):
    """Build the convolutional branch of a residual block.

    The branch chains three 'same'-padded Conv1D layers with kernel sizes 8, 5
    and 3, each followed by BatchNormalization. A ReLU activation follows the
    first two stages; the last stage has no activation.

    Args:
        n_filters: number of filters used by each of the three convolutions.

    Returns:
        A Sequential model holding the branch.
    """
    block = Sequential()

    kernel_sizes = (8, 5, 3)
    last_stage = len(kernel_sizes) - 1

    for stage, width in enumerate(kernel_sizes):
        block.add(keras.layers.Conv1D(filters=n_filters, kernel_size=width, padding='same'))
        block.add(keras.layers.BatchNormalization())
        # the final stage is left without an activation
        if stage != last_stage:
            block.add(keras.layers.Activation('relu'))

    return block

In [None]:
def shortcut(n_filters=None):
    """Build the shortcut branch of a residual block.

    Args:
        n_filters: when truthy, a kernel-size-1 Conv1D with this many filters is
            placed before the normalization, changing the number of channels of
            the input. When omitted (or falsy) the branch only normalizes.

    Returns:
        A Sequential model ending with BatchNormalization.
    """
    branch = Sequential()

    stages = []
    # optional 1x1 convolution that changes the shape of the input
    if n_filters:
        stages.append(keras.layers.Conv1D(filters=n_filters, kernel_size=1, padding='same'))
    stages.append(keras.layers.BatchNormalization())

    for stage in stages:
        branch.add(stage)

    return branch

In [None]:
def get_resnet(input_shape, n_outputs):
    """Build the 1D-ResNet model.

    Three residual blocks are chained. In each block the output of
    ``conv_block`` is added to the output of ``shortcut`` (both applied to the
    block input) and passed through a ReLU. The result is pooled by a
    GlobalAveragePooling1D layer named "features" and classified by a softmax
    Dense layer.

    Args:
        input_shape: shape given to the Input layer.
        n_outputs: number of units of the final softmax layer.

    Returns:
        The (uncompiled) functional Keras model.
    """
    input_layer = keras.layers.Input(input_shape)

    # (filters of the conv branch, filters of the shortcut projection) per block;
    # None means the shortcut only applies batch normalization
    block_specs = ((64, 64), (128, 128), (128, None))

    tensor = input_layer
    for conv_filters, shortcut_filters in block_specs:
        conv_branch = conv_block(n_filters=conv_filters)(tensor)
        skip_branch = shortcut(n_filters=shortcut_filters)(tensor)

        merged = keras.layers.add([conv_branch, skip_branch])
        tensor = keras.layers.Activation('relu')(merged)

    # Final output
    pooled = keras.layers.GlobalAveragePooling1D(name="features")(tensor)
    probabilities = keras.layers.Dense(n_outputs, activation='softmax')(pooled)

    return keras.models.Model(inputs=input_layer, outputs=probabilities)

In [None]:
# Instantiate the ResNet for inspection
# (presumably 128 timesteps x 9 signal channels and 6 activity classes - confirm against the dataset)
model = get_resnet(input_shape=(128, 9), n_outputs=6)

In [None]:
# Render the architecture (with tensor shapes) to cnn.png
tf.keras.utils.plot_model(model, show_shapes=True, to_file="cnn.png")

# Data Transformation

In [None]:
def standardize_data(train_X, test_X):
    """Standardize the last axis of both arrays using statistics of the training set.

    The first two axes of each array are merged so that every row is one
    observation of the last axis; a StandardScaler is fitted on the training
    rows only and applied to both arrays, which are then restored to their
    original shapes.

    Args:
        train_X: 3D array used to fit the scaler.
        test_X: 3D array transformed with the scaler fitted on ``train_X``.

    Returns:
        A tuple ``(train_X_scaled, test_X_scaled)`` with the input shapes.
    """
    train_rows = train_X.shape[0] * train_X.shape[1]
    test_rows = test_X.shape[0] * test_X.shape[1]

    flat_train = train_X.reshape((train_rows, train_X.shape[2]))
    flat_test = test_X.reshape((test_rows, test_X.shape[2]))

    # fit on the training rows only, then apply the same transform to both sets
    scaler = StandardScaler()
    scaler.fit(flat_train)

    scaled_train = scaler.transform(flat_train)
    scaled_test = scaler.transform(flat_test)

    return scaled_train.reshape(train_X.shape), scaled_test.reshape(test_X.shape)

# Train and Test
Define train and test functions

In [None]:
def train(model, repeat, train_X, train_y, validation_split = 0.30, epochs = 10, batch_size = 64, log_dir = "logs/train/", early_stopping = False, patience = 10):
    """Fit ``model`` and return its training history.

    The weights with the lowest validation loss are saved to
    ``'best_model_r{repeat}'`` and TensorBoard logs are written to ``log_dir``.

    Args:
        model: compiled Keras model to train.
        repeat: identifier of the run, used in the checkpoint file name.
        train_X: training inputs.
        train_y: training targets.
        validation_split: fraction of the training data held out for validation.
        epochs: maximum number of training epochs.
        batch_size: mini-batch size.
        log_dir: directory receiving the TensorBoard logs.
        early_stopping: when True, training stops once ``val_loss`` has not
            improved for ``patience`` epochs. Disabled by default, which keeps
            the history exactly ``epochs`` entries long.
        patience: number of epochs without improvement tolerated by early
            stopping; only used when ``early_stopping`` is True.

    Returns:
        The ``history`` dictionary of the object returned by ``model.fit``.
    """
    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1),
        # keep only the weights of the epoch with the lowest validation loss
        tf.keras.callbacks.ModelCheckpoint('best_model_r{}'.format(repeat),
                                           monitor='val_loss',
                                           save_weights_only=True,
                                           save_best_only=True),
    ]

    # the stopping callback is only built when it is actually going to be used
    if early_stopping:
        callbacks.append(tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                          patience=patience))

    history = model.fit(train_X,
                        train_y,
                        epochs=epochs,
                        batch_size=batch_size,
                        validation_split=validation_split,
                        callbacks=callbacks,
                        verbose=1)

    return history.history

In [None]:
def test(model, test_X, test_y, batch_size = 64, plot_features = False):

  if plot_features:
    XX = model.input 
    YY = model.get_layer(name="features").output
    feature_extractor = Model(XX, YY)

    features = feature_extractor(test_X).numpy()
    features = np.reshape(features, (features.shape[0], -1))

    labels = np.argmax(test_y, axis = 1)
    mapper = umap.UMAP().fit(features)
    umap.plot.points(mapper, labels = labels)

  predictions = model.predict(test_X, batch_size=batch_size)
	
  return predictions

# Environment Preparation

In [None]:
# Mount Google Drive at /content/drive (google.colab is only available on Google Colab).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
folder = "drive/MyDrive/Tesi"

!ls drive/MyDrive/Tesi
!unzip drive/MyDrive/Tesi/HARDataset.zip
!mv UCI\ HAR\ Dataset HARDataset

In [None]:
# Delete the logs directory (where train() writes its TensorBoard logs) left by previous runs.
%rm -rf ./logs/

# Run Experiment

In [None]:
# Experiment configuration
epochs = 50      # training epochs of every run
repeats = 10     # number of independent training runs
batch_size = 64  # mini-batch size used for training and prediction

In [None]:

# load and standardize the data
trainX, trainy, testX, testy = load_dataset()
n_timesteps = trainX.shape[1]
n_features = trainX.shape[2]
n_outputs = trainy.shape[1]

trainX, testX = standardize_data(trainX, testX)

# training history of every run
histories = []

for r in range(repeats):

    print("\nExperiment #{}".format(r + 1))

    # reshuffle the training set before every run
    trainX, trainy = shuffle(trainX, trainy)

    # build and compile a fresh model for this run
    model = get_HARNet((n_timesteps, n_features), n_outputs)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # train; every run logs to its own TensorBoard directory
    run_log_dir = "logs/train/exp{}".format(r + 1)
    hist = train(model, r, trainX, trainy,
                 batch_size=batch_size, epochs=epochs, log_dir=run_log_dir)

    # save history
    histories.append(hist)




# Show Results

Show training results

In [None]:
# plot losses and accuracies over epochs

if not histories:
    raise ValueError("no training histories to plot; run the experiment cell first")

# number of runs actually recorded; this is used instead of `repeats` so the
# averages stay correct if that setting was changed after training
n_runs = len(histories)

# one row per run, one column per epoch
metric_keys = ("loss", "val_loss", "accuracy", "val_accuracy")
curves = {key: np.array([h[key] for h in histories], dtype=float) for key in metric_keys}

# epoch axis taken from the recorded histories, starting at 1
x = np.arange(curves["loss"].shape[1]) + 1

total_losses = curves["loss"].sum(axis=0)
total_val_losses = curves["val_loss"].sum(axis=0)
total_accs = curves["accuracy"].sum(axis=0)
total_val_accs = curves["val_accuracy"].sum(axis=0)

mean_losses = total_losses / n_runs
mean_val_losses = total_val_losses / n_runs
mean_accs = total_accs / n_runs
mean_val_accs = total_val_accs / n_runs

# validation loss / accuracy of every run at its best (lowest val_loss) epoch
val_best_losses = []
val_best_accuracies = []
for h in histories:
    best_i = np.argmin(h["val_loss"])
    val_best_losses.append(h["val_loss"][best_i])
    val_best_accuracies.append(h["val_accuracy"][best_i])

fig1, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2)
fig1.subplots_adjust(hspace=0.6, wspace=0.4)
fig1.set_size_inches(14, 9)
fig1.suptitle("CNN Training", fontsize=20)

# (axes, metric, mean curve, scale factor, title, y label); accuracies are shown in percent
panels = (
    (ax1, "loss", mean_losses, 1, 'Training Loss', "loss"),
    (ax2, "val_loss", mean_val_losses, 1, 'Validation Loss', "loss"),
    (ax3, "accuracy", mean_accs, 100, 'Training Accuracy', "accuracy (%)"),
    (ax4, "val_accuracy", mean_val_accs, 100, 'Validation Accuracy', "accuracy (%)"),
)

for axis, key, mean_curve, scale, title, ylabel in panels:
    # individual runs in a light colour, their mean on top in a darker one
    for run_curve in curves[key]:
        axis.plot(x, np.multiply(run_curve, scale), c="bisque", linewidth=2)
    axis.plot(x, np.multiply(mean_curve, scale), c="darkorange", linewidth=2)

    axis.set_xlabel("epoch", fontsize=18)
    axis.set_ylabel(ylabel, fontsize=18)
    axis.set_title(title, fontsize=18)

    for label in (axis.get_xticklabels() + axis.get_yticklabels()):
        label.set_fontsize(14)

# plot val accuracies boxplot
fig2, ax = plt.subplots()
fig2.set_size_inches(5, 5)
ax.set_title("Validation Accuracy", fontsize=20)

ax.boxplot(np.multiply(val_best_accuracies, 100))
ax.set_ylabel("Validation Accuracy (%)", fontsize=18)
ax.set_xticklabels(["CNN"], fontsize=18)

for label in ax.get_yticklabels():
    label.set_fontsize(14)

plt.tight_layout()
plt.show()

Show test results

In [None]:
# index of the run with the lowest best validation loss; checkpoints were saved
# under the same zero-based run index
best_model_i = np.argmin(val_best_losses)
# `model` is the network built in the last run of the training loop; every run
# uses the same architecture, so the best run's weights can be loaded into it
model.load_weights("best_model_r{}".format(best_model_i))

print("\nTEST:")
# summary() prints the table itself and returns None, so it is not wrapped in print()
model.summary()
predictions = test(model, testX, testy, batch_size = batch_size, plot_features=True)
true_y = np.argmax(testy, axis = 1)
pred_y = np.argmax(predictions, axis = 1)
target_names = ['Walking', 'Walking Up', 'Walking Down', 'Sitting', 'Standing', 'Laying']
print("\nClassification Report")
print(classification_report(true_y, pred_y, target_names=target_names))

In [None]:
Launch TensorBoard to inspect the training logs

In [None]:
# Open the TensorBoard dashboard on the logs written under logs/train.
%tensorboard --logdir logs/train