In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


##Packages##

In [None]:
%pip install nina-helper
%pip install tensorflow_addons
%pip install pyts

from functools import reduce
import joblib
import matplotlib.pyplot as plt
from nina_helper import *
import numpy as np
import os
import pandas as pd
from pywt import wavedec

import scipy as sp
from scipy import signal, interp
from scipy.fft import fft, ifft
from scipy.io import loadmat
from scipy.stats import entropy

import seaborn as sns

from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, balanced_accuracy_score, classification_report
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold, cross_validate
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

from tensorflow import keras
from tensorflow.keras import layers, models, optimizers, initializers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Conv1D,Conv2D, Add, MaxPool1D, MaxPooling2D
from tensorflow.keras.layers import Dense, Activation, Flatten, concatenate, Input, Dropout
from tensorflow.keras.layers import LSTM, Bidirectional,BatchNormalization,PReLU,ReLU,Reshape
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from tensorflow_addons.layers import WeightNormalization

import tensorflow.keras.backend as K




##Load_Data_Functions##

In [None]:
def load_data(subdir = "Normal Processing/Unrectified", subject = 1, spec = False, GAF= False):
  
  X_train = np.load("/content/drive/MyDrive/MERIIT Smart Wearable Devices/Software Team/Baseline LSTM/Prepared Subject Data/{}/subject{}_train_matrix.npy".format(subdir, subject),mmap_mode='r')
  X_test = np.load("/content/drive/MyDrive/MERIIT Smart Wearable Devices/Software Team/Baseline LSTM/Prepared Subject Data/{}/subject{}_test_matrix.npy".format(subdir, subject),mmap_mode='r')
  y_train = np.load("/content/drive/MyDrive/MERIIT Smart Wearable Devices/Software Team/Baseline LSTM/Prepared Subject Data/{}/subject{}_train_labels.npy".format(subdir, subject),mmap_mode='r')
  y_test = np.load("/content/drive/MyDrive/MERIIT Smart Wearable Devices/Software Team/Baseline LSTM/Prepared Subject Data/{}/subject{}_test_labels.npy".format(subdir, subject),mmap_mode='r')
  
  # Reduce sample size
  """
  X_train = X_train[:300,:,:]
  X_test = X_test[:100,:,:]
  y_train = y_train[:300]
  y_test = y_test[:100]
  """

  if spec == True:
    X_train = Win2Spec(X_train)
    X_test = Win2Spec(X_test)

  if GAF == True:
    X_train = Win2GAF(X_train)
    X_test = Win2GAF(X_test)

  # y_train = pd.get_dummies(y_train)
  # y_test = pd.get_dummies(y_test)
  y_train = get_categorical(y_train)
  y_test = get_categorical(y_test)

  X_train = X_train.astype('float32')
  X_test = X_test.astype('float32')
  y_train = y_train.astype('float32')
  y_test = y_test.astype('float32')

  return X_train, X_test, y_train, y_test

In [None]:
from tqdm import tqdm
def Win2Spec(data):
  # data shape must be (samples, windows, channels)
  spec_list = []
  spec_mat_list = []
  for sample in tqdm(range(data.shape[0])):
    for channel in range(data.shape[2]):
        s,f,t,im = plt.specgram(data[sample,:,channel],Fs=2000,cmap='gray')
        spec_list.append(s)

    spec_mat = np.hstack(spec_list)
    
    spec_mat = spec_mat/spec_mat.max()
    spec_mat_list.append(spec_mat)
    spec_list = []

  spec_data = np.stack(spec_mat_list)
  spec_data = spec_data.reshape(-1,129,36,1) # grayscale spectrograms
  return spec_data



In [None]:
from tqdm import tqdm
from pyts.image import GramianAngularField
gasf = GramianAngularField(method='summation')

def Win2GAF(data):
  # data shape must be (samples, windows, channels)
  spec_list = []
  spec_mat_list = []
  for sample in tqdm(range(data.shape[0])):
    for channel in range(data.shape[2]):
        X_gasf = np.squeeze(gasf.fit_transform(data[sample:sample+1,:,channel]))
        spec_list.append(X_gasf)

    spec_mat = np.dstack(spec_list)
    spec_mat_list.append(spec_mat)
    spec_list = []

  spec_data = np.stack(spec_mat_list)
  return spec_data


In [None]:
def get_categorical(y):
    return pd.get_dummies(pd.Series(y)).values

##Model_Functions##

In [None]:
import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

class Length(layers.Layer):
    """
    Compute the length of vectors. This is used to compute a Tensor that has the same shape with y_true in margin_loss
    inputs: shape=[dim_1, ..., dim_{n-1}, dim_n]
    output: shape=[dim_1, ..., dim_{n-1}]
    """
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1))

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]

class Mask(layers.Layer):
  
    def call(self, inputs, **kwargs):
        # use true label to select target capsule, shape=[batch_size, num_capsule]
        if type(inputs) is list:  # true label is provided with shape = [batch_size, n_classes], i.e. one-hot code.
            assert len(inputs) == 2
            inputs, mask = inputs
        else:  # if no true label, mask by the max length of vectors of capsules
            x = inputs
            # Enlarge the range of values in x to make max(new_x)=1 and others < 0
            x = (x - K.max(x, 1, True)) / K.epsilon() + 1
            mask = K.clip(x, 0, 1)  # the max value in x clipped to 1 and other to 0

        # masked inputs, shape = [batch_size, dim_vector]
        inputs_masked = K.batch_dot(inputs, mask, [1, 1])
        return inputs_masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:  # true label provided
            return tuple([None, input_shape[0][-1]])
        else:
            return tuple([None, input_shape[-1]])



def squash(vectors, axis=-1):
   
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm)
    return scale * vectors

class CapsuleLayer(layers.Layer):

    def __init__(self, num_capsule, dim_capsule, routings=3,
                 kernel_initializer='glorot_uniform',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.kernel_initializer = initializers.get(kernel_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3, "The input Tensor should have shape=[None, input_num_capsule, input_dim_capsule]"
        self.input_num_capsule = input_shape[1]
        self.input_dim_capsule = input_shape[2]

        # Transform matrix
        self.W = self.add_weight(shape=[self.num_capsule, self.input_num_capsule,
                                        self.dim_capsule, self.input_dim_capsule],
                                 initializer=self.kernel_initializer,
                                 name='W')

        self.built = True

    def call(self, inputs, training=None):
        # Expand the input in axis=1, tile in that axis to num_capsule, and 
        # expands another axis at the end to prepare the multiplication with W.
        #  inputs.shape=[None, input_num_capsule, input_dim_capsule]
        #  inputs_expand.shape=[None, 1, input_num_capsule, input_dim_capsule]
        #  inputs_tiled.shape=[None, num_capsule, input_num_capsule, 
        #                            input_dim_capsule, 1]
        inputs_expand = tf.expand_dims(inputs, 1)
        inputs_tiled  = tf.tile(inputs_expand, [1, self.num_capsule, 1, 1])
        inputs_tiled  = tf.expand_dims(inputs_tiled, 4)

        # Compute `W * inputs` by scanning inputs_tiled on dimension 0 (map_fn).
        # - Use matmul (without transposing any element). Note the order!
        # Thus:
        #  x.shape=[num_capsule, input_num_capsule, input_dim_capsule, 1]
        #  W.shape=[num_capsule, input_num_capsule, dim_capsule,input_dim_capsule]
        # Regard the first two dimensions as `batch` dimension,
        # then matmul: [dim_capsule, input_dim_capsule] x [input_dim_capsule, 1]-> 
        #              [dim_capsule, 1].
        #  inputs_hat.shape=[None, num_capsule, input_num_capsule, dim_capsule, 1]
        
        inputs_hat = tf.map_fn(lambda x: tf.matmul(self.W, x), elems=inputs_tiled)     

        # Begin: Routing algorithm ----------------------------------------------#
        # The prior for coupling coefficient, initialized as zeros.
        #  b.shape = [None, self.num_capsule, self.input_num_capsule, 1, 1].
        b = tf.zeros(shape=[tf.shape(inputs_hat)[0], self.num_capsule, 
                            self.input_num_capsule, 1, 1])

        assert self.routings > 0, 'The routings should be > 0.'
        for i in range(self.routings):
          # Apply softmax to the axis with `num_capsule`
          #  c.shape=[batch_size, num_capsule, input_num_capsule, 1, 1]
          c = layers.Softmax(axis=1)(b)

          # Compute the weighted sum of all the predicted output vectors.
          #  c.shape =  [batch_size, num_capsule, input_num_capsule, 1, 1]
          #  inputs_hat.shape=[None, num_capsule, input_num_capsule,dim_capsule,1]
          # The function `multiply` will broadcast axis=3 in c to dim_capsule.
          #  outputs.shape=[None, num_capsule, input_num_capsule, dim_capsule, 1]
          # Then sum along the input_num_capsule
          #  outputs.shape=[None, num_capsule, 1, dim_capsule, 1]
          # Then apply squash along the dim_capsule
          outputs = tf.multiply(c, inputs_hat)
          outputs = tf.reduce_sum(outputs, axis=2, keepdims=True)
          outputs = squash(outputs, axis=-2)  # [None, 10, 1, 16, 1]

          if i < self.routings - 1:
            # Update the prior b.
            #  outputs.shape =  [None, num_capsule, 1, dim_capsule, 1]
            #  inputs_hat.shape=[None,num_capsule,input_num_capsule,dim_capsule,1]
            # Multiply the outputs with the weighted_inputs (inputs_hat) and add  
            # it to the prior b.  
            outputs_tiled = tf.tile(outputs, [1, 1, self.input_num_capsule, 1, 1])
            agreement = tf.matmul(inputs_hat, outputs_tiled, transpose_a=True)
            b = tf.add(b, agreement)

        # End: Routing algorithm ------------------------------------------------#
        # Squeeze the outputs to remove useless axis:
        #  From  --> outputs.shape=[None, num_capsule, 1, dim_capsule, 1]
        #  To    --> outputs.shape=[None, num_capsule,    dim_capsule]
        outputs = tf.squeeze(outputs, [2, 4])
        return outputs

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_capsule])

    def get_config(self):
        config = {
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings
        }
        base_config = super(CapsuleLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


def PrimaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding):
 
    output = layers.Conv2D(filters=dim_capsule*n_channels, kernel_size=kernel_size, strides=strides, padding=padding,
                           name='primarycap_conv2d')(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_capsule], name='primarycap_reshape')(output)
    return layers.Lambda(squash, name='primarycap_squash')(outputs)



In [None]:
from keras import layers, models
from keras import backend as K
from tensorflow.keras.utils import to_categorical

def CapsNet(input_shape, n_class, num_routing):
    """
    :param input_shape: data shape, 4d, [None, width, height, channels]
    :param n_class: number of classes
    :param num_routing: number of routing iterations
    :return: A Keras Model with 2 inputs and 2 outputs
    """
    x = layers.Input(shape=input_shape)

    # Layer 1: Just a conventional Conv2D layer
    conv1 = tf.keras.layers.Conv2D(filters=128,kernel_size=3, padding='valid',name='conv1',
                                   kernel_initializer= initializers.glorot_uniform(),activation='relu')(x)
    bn1 = tf.keras.layers.Dropout(0.5)(conv1)
    bn1 = tf.keras.layers.BatchNormalization()(bn1)

    conv2 = tf.keras.layers.Conv2D(filters=128,kernel_size=3,padding='valid',
                                   kernel_initializer= initializers.glorot_uniform(),activation='relu')(bn1)
    bn2 = tf.keras.layers.Dropout(0.5)(conv2)
    bn2 = tf.keras.layers.BatchNormalization()(bn2)

    conv3 = tf.keras.layers.Conv2D(filters=256,kernel_size=3,padding='valid',
                                   kernel_initializer= initializers.glorot_uniform(),activation='relu')(bn2)
    bn3 = tf.keras.layers.Dropout(0.5)(conv3)
    bn3 = tf.keras.layers.BatchNormalization()(bn3)

    conv4 = tf.keras.layers.Conv2D(filters=256,kernel_size=3,padding='valid',
                                   kernel_initializer= initializers.glorot_uniform(),activation='relu')(bn3)
    bn4 = tf.keras.layers.Dropout(0.5)(conv4)
    bn4 = tf.keras.layers.BatchNormalization()(bn4)

    # Layer 2: Conv2D layer with `squash` activation, then reshape to [None, num_capsule, dim_vector]
    primarycaps = PrimaryCap(bn4, dim_capsule=4, n_channels=4, kernel_size=4, strides=2, padding='valid')

    # Layer 3: Capsule layer. Routing algorithm works here.
    digitcaps = CapsuleLayer(num_capsule=n_class, dim_capsule=8, routings=num_routing, name='digitcaps')(primarycaps)


    digit_probs = tf.keras.layers.Lambda(lambda x: tf.norm(x, axis=-1),
                                         name="digit_probs")(digitcaps)

    model = tf.keras.Model(inputs=x,
                           outputs=digit_probs,
                           name="Efficient-CapsNet")

    return model

In [None]:
from keras import backend as K
def margin_loss(y_true, y_pred):
    """
    Margin loss for Eq.(4). When y_true[i, :] contains not just one `1`, this loss should work too. Not test it.
    :param y_true: [None, n_classes]
    :param y_pred: [None, num_capsule]
    :return: a scalar loss value.
    """
    L = y_true * K.square(K.maximum(0., 0.9 - y_pred)) + \
        0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))

    return K.mean(K.sum(L, 1))

In [None]:
def create_compiled_model(X_train):
  model = CapsNet(input_shape=(600,12,1), n_class = 17,num_routing=3)
  opt_adam = keras.optimizers.Adam(learning_rate=1e-4)
  model.compile(optimizer=opt_adam,
                  loss=[margin_loss, 'mse'],
                  loss_weights=[1., 0.0005],
                  metrics='categorical_accuracy')

  return model

In [None]:
def train_model(model, X_train, y_train, X_test, y_test, save_to, batch_size= 32, epochs = 2, epoch_size_frac=1.0):

        es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30)
        mc = ModelCheckpoint(save_to + '_best_model.h5',
             monitor='val_categorical_accuracy', mode='max', verbose=1, save_best_only=True)
        #lr_schedule = tf.keras.callbacks.LearningRateScheduler(lambda epoch: 1e-8 * 10**(epoch/20))
        #lr_schedule = tf.keras.callbacks.LearningRateScheduler(lambda epoch: learning_rate * np.exp(-epoch / 200), verbose=1)
        #lr_schedule = tf.keras.callbacks.LearningRateScheduler(lambda epoch, lr: lr*1.2 if epoch%40==0 else lr * np.exp(-epoch / 13000.), verbose=1)   

        history = model.fit(X_train, y_train,
                            batch_size=batch_size,
                            epochs=epochs,
                            steps_per_epoch=int(epoch_size_frac*y_train.shape[0] / batch_size),
                            validation_data=(X_test, y_test),
                            callbacks=[es,mc],verbose=1)


        saved_model = load_model(save_to + '_best_model.h5', custom_objects={'CapsuleLayer': CapsuleLayer, 'Mask': Mask, 'Length': Length, 'margin_loss': margin_loss})

        # evaluate the model
        _, train_acc = saved_model.evaluate(X_train, y_train, verbose=1)
        _, test_acc = saved_model.evaluate(X_test, y_test, verbose=1)
        print('Train: %.3f, Test: %.3f' % (train_acc, test_acc))

        return history,saved_model

##Enable TPU/GPU##

In [None]:
try: # detect TPUs
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect() # TPU detection
    strategy = tf.distribute.TPUStrategy(tpu)
except ValueError: # detect GPUs
    strategy = tf.distribute.MirroredStrategy() # for GPU or multi-GPU machines
    #strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
    #strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() # for clusters of multi-GPU machines

print("Number of accelerators: ", strategy.num_replicas_in_sync)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Number of accelerators:  1


In [None]:
with strategy.scope(): # creating the model in the TPUStrategy scope places the model on the TPU
    model = create_compiled_model(X_train)
model.summary()

##Model_Training##

In [None]:
# load_data returns data as (samples, windows, channels) and categorical labels
# current subdir options: "Minimal Processing", "Normal Processing/Rectified", "Normal Processing/Unrectified"
X_train, X_test, y_train, y_test = load_data(subdir= "Minimal Processing", subject=1, spec=False, GAF=False) 

In [None]:
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((24912, 600, 12), (12271, 600, 12), (24912, 17), (12271, 17))

##ES and MC are Validation Based (Original)##

In [None]:
# Hyper-parameters
epochs = 2
batch_size = 32

In [None]:
 model = create_compiled_model(X_train)

In [None]:
histories, model = train_model(model, X_train, y_train, X_test , y_test, batch_size=batch_size, save_to= 'temp', epochs = epochs) 

Epoch 1/2
Epoch 00001: val_categorical_accuracy improved from -inf to 0.66596, saving model to temp_best_model.h5
Epoch 2/2
Epoch 00002: val_categorical_accuracy improved from 0.66596 to 0.79537, saving model to temp_best_model.h5
Train: 0.823, Test: 0.795


In [None]:
histories.history

{'categorical_accuracy': [0.44617608189582825, 0.7638665437698364],
 'loss': [0.410224974155426, 0.22607210278511047],
 'val_categorical_accuracy': [0.6659603714942932, 0.7953711748123169],
 'val_loss': [0.28070980310440063, 0.20726031064987183]}

##ES and MC are Training Based##

In [None]:
model2 = create_compiled_model(X_train)

In [None]:
# Hyper-parameters
epochs = 2
batch_size = 32

epoch_size_frac = 1.0
save_to= 'train_only_temp'

In [None]:
es = EarlyStopping(monitor='loss', mode='min', verbose=1, patience=30)
mc = ModelCheckpoint(save_to + '_best_model.h5',
      monitor='categorical_accuracy', mode='max', verbose=1, save_best_only=True)


history = model2.fit(X_train, y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    steps_per_epoch=int(epoch_size_frac*y_train.shape[0] / batch_size),
                    #validation_data=(X_test, y_test),
                    callbacks=[es,mc],verbose=1)


Epoch 1/2
Epoch 00001: categorical_accuracy improved from -inf to 0.43959, saving model to train_only_temp_best_model.h5
Epoch 2/2
Epoch 00002: categorical_accuracy improved from 0.43959 to 0.76387, saving model to train_only_temp_best_model.h5


In [None]:
saved_model = load_model(save_to + '_best_model.h5', custom_objects={'CapsuleLayer': CapsuleLayer, 'Mask': Mask, 'Length': Length, 'margin_loss': margin_loss})

# evaluate the model
_, train_acc = saved_model.evaluate(X_train, y_train, verbose=1)
_, test_acc = saved_model.evaluate(X_test, y_test, verbose=1)
print('Train: %.3f, Test: %.3f' % (train_acc, test_acc))

Train: 0.812, Test: 0.786


##Frozen_Base_(Zero-Shot)##

In [None]:
# load_data returns data as (samples, windows, channels) and categorical labels
# current subdir options: "Minimal Processing", "Normal Processing/Rectified", "Normal Processing/Unrectified"
X_train, X_test, y_train, y_test = load_data(subdir= "Minimal Processing", subject=2, spec=False, GAF=False) 

In [None]:
# evaluate the model
_, train_acc = saved_model.evaluate(X_train, y_train, verbose=1)
_, test_acc = saved_model.evaluate(X_test, y_test, verbose=1)
print('Train: %.3f, Test: %.3f' % (train_acc, test_acc))

Train: 0.117, Test: 0.135


##Partially_Frozen_Base##

In [None]:
#base_model = keras.models.load_model('../input/model-1-weights/_best_model (1).h5',custom_objects={'CapsuleLayer': CapsuleLayer, 'Mask': Mask, 'Length': Length, 'margin_loss': margin_loss})
base_model = model2

base_model.layers[-1].trainable = False
base_model.layers[-2].trainable = False

opt_adam = keras.optimizers.Adam(learning_rate=1e-4)
base_model.compile(optimizer=opt_adam,
                  loss=[margin_loss, 'mse'],
                  loss_weights=[1., 0.0005],
                  metrics='categorical_accuracy')

In [None]:
# Hyper-parameters
epochs = 2
batch_size = 32

In [None]:
histories, model = train_model(base_model, X_train, y_train, X_test , y_test, batch_size=batch_size, save_to= 'partially_frozen_temp', epochs = epochs) 

Epoch 1/2
Epoch 00001: val_categorical_accuracy improved from -inf to 0.71078, saving model to partially_frozen_temp_best_model.h5
Epoch 2/2
Epoch 00002: val_categorical_accuracy improved from 0.71078 to 0.73636, saving model to partially_frozen_temp_best_model.h5
Train: 0.787, Test: 0.736


In [None]:
 subject2_model = create_compiled_model(X_train)

In [None]:
histories, model = train_model(subject2_model, X_train, y_train, X_test , y_test, batch_size=batch_size, save_to= 'subject2_temp', epochs = epochs) 

Epoch 1/2
Epoch 00001: val_categorical_accuracy improved from -inf to 0.71766, saving model to subject2_temp_best_model.h5
Epoch 2/2
Epoch 00002: val_categorical_accuracy improved from 0.71766 to 0.76641, saving model to subject2_temp_best_model.h5
Train: 0.806, Test: 0.766


##Results##

In [None]:
# summarize history for accuracy
plt.plot(histories.history['categorical_accuracy'])
plt.plot(histories.history['val_categorical_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
#plt.savefig("LSTM+CNN_categorical_accuracy_{}ms_{}epochs_{}lr_{}s_exp_decay_adam.jpg".format(int(window_len/2), epochs, initial_learning_rate,batch_size))

In [None]:
# summarize history for loss
plt.clf
plt.plot(histories.history['loss'])
plt.plot(histories.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()
#plt.savefig("LSTM+CNN_loss_{}ms_{}epochs_{}lr_{}s_exp_decay_adam.jpg".format(int(window_len/2), epochs, initial_learning_rate,batch_size))

In [None]:
def Statistics(data):
  # Classification Report
  report = classification_report(data['actual labels'],data['predicted labels'],output_dict=True)
  report = pd.DataFrame(report).T
  # Confusion matrix
  print("Confusion matrix is shown below")
  c_matrix=confusion_matrix(data['actual labels'],data['predicted labels'])
  FP = c_matrix.sum(axis=0) - np.diag(c_matrix)
  FN = c_matrix.sum(axis=1) - np.diag(c_matrix)
  TP = np.diag(c_matrix)
  TN = c_matrix.sum() - (FP + FN + TP)
  FP = FP.astype(float)
  FN = FN.astype(float)
  TP = TP.astype(float)
  TN = TN.astype(float)
  # Sensitivity or positive recall
  TPR = TP/(TP+FN)
  # Specificity or true negative rate or negative recall
  TNR = TN/(TN+FP)
  accuracy = accuracy_score(data['actual labels'],data['predicted labels'])
  TNR = np.append(TNR,[accuracy, TNR.mean(), ((TNR * report['support'][:17]).sum())/(report['support'][:17].sum())])
  report['specificity'] = TNR # adding new column specificity
  plt.figure(figsize=(40,20))
  norm_c_matrix = c_matrix.astype('float') / c_matrix.sum(axis=1)[:, np.newaxis]
  sns.heatmap(norm_c_matrix, annot=True,cmap='Blues', fmt='.2f')
  #sns.heatmap(c_matrix, annot=True,cmap='Blues', fmt='g')
  plt.xlabel('Predicted')
  plt.ylabel('Truth')
  plt.savefig("CapsNet_confusion_mat_{}ms.jpg".format(int(window_len/2)))
  print("Balanced_accuracy:{}".format(balanced_accuracy_score(data['actual labels'],data['predicted labels'])))
  print("Classification Report is shown below")
  return report, c_matrix, norm_c_matrix

In [None]:
# Reports + Confusion Matrix
#
# If loading model from another directory, set model path
# model_path = "model/directory/"
# model = load_model(os.path.normpath(model_path + model_name))
window_len = 600

save_to = 'temp_subject{}_{}epochs_{}lr_{}bs'.format(subject, epochs, initial_learning_rate, batch_size)
model_name = save_to + '_best_model.h5'
model =  load_model(model_name)

#y_test = np.array(y_test)
y_test = np.argmax(y_test,axis=1)
y_pred = model.predict(X_test, verbose=1)
y_pred=np.argmax(y_pred, axis=1)
print("y_test, y_pred:", y_test.shape, y_pred.shape)

data = []
data.append(y_test)
data.append(y_pred)
data = np.vstack(data).T
data_df = pd.DataFrame(data, columns = ['actual labels', 'predicted labels'])
report, c_matrix, norm_c_matrix = Statistics(data_df)


# Save 3 outputs from statistics 
# joblib.dump(report,'LSTM+CNN_report_{}ms'.format(int(window_len/2)))
# joblib.dump(c_matrix,'LSTM+CNN_c_matrix_{}ms'.format(int(window_len/2)))
# joblib.dump(norm_c_matrix,'LSTM+CNN_norm_c_matrix_{}ms'.format(int(window_len/2)))