In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

root_dir = '/content/gdrive/MyDrive/Data'

Mounted at /content/gdrive


In [2]:
import os
import random
import shutil
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import csv
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import RMSprop
from pandas.core.frame import DataFrame
from time import sleep
import math
from tensorflow.keras import layers

PseudoLabeling module

In [3]:
from sklearn.metrics import confusion_matrix
from mlxtend.plotting import plot_confusion_matrix
import seaborn as sns
from sklearn import metrics


def get_confusion_matrix(y_test, y_pred):
  confusion_matrix = np.zeros((number_of_columns, number_of_columns))

  for i in range(y_test1.shape[0]):
    class1 = y_test1[i]
    class2 = y_test2[i]

    if y_test[i] == 0:
      confusion_matrix[class2][class1] = confusion_matrix[class2][class1] + 1
    elif y_pred[i] == 0 and y_test[i] == 1:
      confusion_matrix[class2][class1] = confusion_matrix[class1][class2] + 1

  return confusion_matrix

def printConfusionMatrix(cf_matrix, Labels, show=True):
  #cf_matrix = confusion_matrix(test_labels, y_pred)
  #cf_matrix = confusion_matrix(y_test.argmax(axis=1), y_pred.argmax(axis=1))
  
  print(cf_matrix)

  precisions = []
  recalls = []
  FPRs = []
  n = cf_matrix.shape[0]

  total_vals = 0
  tps = 0
  for i in range(cf_matrix.shape[0]):
    sum_vals = cf_matrix[i].sum()
    sum_cols = cf_matrix[:, i].sum()

    total_vals += sum_vals
    tps += cf_matrix[i][i]
    recall = round(cf_matrix[i][i]/sum_vals, 2)
    recalls.append(recall)
    precision = round(cf_matrix[i][i]/sum_cols, 2)
    precisions.append(precision)
    f1_score = round(2 * precision * recall / (recall + precision), 2)

    # ROC
    TN = n - sum_vals - sum_cols + cf_matrix[i][i]
    FP = sum_cols - cf_matrix[i][i]
    FPR = round(FP / (FP + TN), 2)
    FPRs.append(FPR)
    if show:
      print("Precision, Recall and F1 Score for " + Labels[i] + " are " + str(precision) + ", " + str(recall) + ", " + str(f1_score))
      plt.plot([0,FPR, 1],[0,recall,1])

  accuracy = round(tps/total_vals, 2)

  cf_mat = np.zeros((cf_matrix.shape[0], cf_matrix.shape[0]))
  print(cf_mat.shape)
  for i in range(cf_matrix.shape[0]):
    for j in range(cf_matrix.shape[0]):
      cols_sum = cf_matrix[:,j].sum()
      cf_mat[i][j] = round(cf_matrix[i][j]/cols_sum, 2)

  if show:
    print("Accuracy is " + str(accuracy))
    #plot_confusion_matrix(conf_mat=cf_matrix, show_normed=True, figsize=(7,7))

    #create ROC curve
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.legend(Labels)
    plt.show()

    fig, siz = plt.subplots(figsize=(10,10))
    ax = sns.heatmap(cf_mat, annot=True, cmap='Blues', ax=siz)


    ax.set_title('Pseudo-labeling Confusion Matrix with labels\n\n');
    ax.set_xlabel('\nPredicted Pseudo-labeling Category')
    ax.set_ylabel('Actual Pseudo-labeling Category ');

    ## Ticket labels - List must be in alphabetical order

    ax.xaxis.set_ticklabels(Labels)
    ax.yaxis.set_ticklabels(Labels)

    ## Display the visualization of the Confusion Matrix.
    plt.show()

  return precisions, accuracy

In [4]:
experience = 10

features_dir = os.path.join(root_dir, "18 participants/AX3/TransformedData" + str(experience))
file_name = os.path.join(features_dir, "Labelled")

def encode_sleep_stage(val):
  arr = {'N1':0, 'N2':1, 'N3':2, 'REM':3, 'Wake':4, 'Artefact':np.NaN, 'A':np.NaN}
  return arr[val]

def encode_sleep_state(val):
  arr = {'N1':0, 'N2':1, 'N3':2, 'REM':3, 'Wake':4, 'Artefact':np.NaN, 'A':np.NaN}
  return arr[val]

dataset = pd.read_pickle(file_name)
dataset['Sleep State'] = dataset['Sleep State'].apply(lambda x: encode_sleep_stage(x))
dataset['Sleep State'] = pd.Series(np.array(dataset['Sleep State'])).interpolate()
dataset['Sleep State'] = np.array(dataset['Sleep State'].replace(to_replace = np.nan, value=4), dtype=np.int64)

Pairing the data

In [5]:
def trad_test_split(dataset, test_train_ratio):
  dataset_size = dataset.shape[0]
  split_point = int(dataset_size * test_train_ratio)
  dataset_train = dataset[:split_point]
  dataset_test = dataset[split_point:]
  return dataset_train, dataset_test

def conv2_reshape(data):
  data = data.reshape(data.shape[0], data.shape[1], 1, 1)
  return data

def decompose_dataset(dataset, number_of_classes):
  datasets = []
  data_count = int(dataset.shape[0])
  for i in range(number_of_classes):
    dataset_portion = dataset[dataset['Sleep State']==i].reset_index(drop=True)
    datasets.append(dataset_portion)
  return datasets

def extract_anchors(datasets, number_of_classes, anchor_size):
  anchor_datasets = []
  for i in range(number_of_classes):
    anchor_dataset = datasets[i].sample(anchor_size).reset_index(drop=True)
    anchor_datasets.append(anchor_dataset)
  return anchor_datasets

def get_positive_negative_dataset(datasets, class_no, number_of_classes):
  positives_dataset = datasets[class_no]
  negatives_dataset = None

  for i in range(number_of_classes):
    if i != class_no:
      class_dataset = datasets[i]
      negatives_dataset = pd.concat([negatives_dataset, class_dataset])

  return positives_dataset, negatives_dataset

def build_pairs_per_anchor(positives_dataset, negatives_dataset, class_no, anchor, pairs_per_anchor):
  positives = positives_dataset.sample(n=pairs_per_anchor).reset_index(drop=True)
  positives = np.array(positives.drop(['Label', 'Sleep State'], axis=1))
  negatives = negatives_dataset.sample(n=pairs_per_anchor).reset_index(drop=True)
  negatives = np.array(negatives.drop(['Label', 'Sleep State'], axis=1))

  anchors = []
  for i in range(0, pairs_per_anchor):
    anchors.append(anchor)

  return positives, np.array(anchors), negatives

def build_pairs_per_class_no(datasets, class_no, anchors, pairs_per_anchor):
  number_of_classes = len(datasets)
  positives_dataset, negatives_dataset = get_positive_negative_dataset(datasets, class_no, number_of_classes)
  anchor_dataset = np.array(anchors[class_no].drop(['Label', 'Sleep State'], axis=1))
  positives, anchors, negatives = build_pairs_per_anchor(positives_dataset, negatives_dataset, class_no, anchor_dataset[0], pairs_per_anchor)

  for i in range(1, anchor_dataset.shape[0]):
    positives_new, anchors_new, negatives_new = build_pairs_per_anchor(positives_dataset, negatives_dataset, class_no, anchor_dataset[i], pairs_per_anchor)
    positives = np.append(positives, positives_new, axis=0)
    anchors = np.append(anchors, anchors_new, axis=0)
    negatives = np.append(negatives, negatives_new, axis=0)
    
  return positives, anchors, negatives

def build_pairs(datasets, anchor_datasets, pairs_per_anchor):
  number_of_classes = len(datasets)
  positives, anchors, negatives = build_pairs_per_class_no(datasets, 0, anchor_datasets, pairs_per_anchor)
  for class_no in range(1, number_of_classes):
    positives_new, anchors_new, negatives_new = build_pairs_per_class_no(datasets, class_no, anchor_datasets, pairs_per_anchor)
    positives = np.append(positives, positives_new, axis=0)
    anchors = np.append(anchors, anchors_new, axis=0)
    negatives = np.append(negatives, negatives_new, axis=0)
  return positives, anchors, negatives

In [31]:
number_of_classes = 5
anchor_size = 5
pairs_per_anchor = 60
class_no = 0

#positives_dataset, negatives_dataset = get_positive_negative_dataset(datasets, class_no, number_of_classes)
#anchor_dataset = np.array(anchor_datasets[class_no].drop(['Label', 'Sleep State'], axis=1))
#db = datasets[0].drop(['Label', 'Sleep State'], axis=1).head(3)
#positives, anchors, negatives = build_pairs_per_class_no(datasets, class_no, anchor_datasets, pairs_per_anchor)

test_train_ratio = 0.9
dataset_train, dataset_test = trad_test_split(dataset, test_train_ratio)

datasets = decompose_dataset(dataset_train, number_of_classes)
anchor_datasets = extract_anchors(datasets, number_of_classes, anchor_size)
positives, anchors, negatives = build_pairs(datasets, anchor_datasets, pairs_per_anchor)

print(len(anchor_datasets))

positives = conv2_reshape(positives)
anchors = conv2_reshape(anchors)
negatives = conv2_reshape(negatives)
labels = np.zeros((positives.shape[0],))

unchanged_positives = positives
positives = np.append(positives, negatives, axis=0)
negatives = np.append(negatives, unchanged_positives, axis=0)
anchors = np.append(anchors, anchors, axis=0)

sec_half_labels = [2 for i in range(labels.shape[0])]
labels = np.append(labels, sec_half_labels, axis=0)

positives_train, positives_val = trad_test_split(positives, test_train_ratio)
anchors_train, anchors_val = trad_test_split(anchors, test_train_ratio)
negatives_train, negatives_val = trad_test_split(negatives, test_train_ratio)
labels_train, labels_val = trad_test_split(labels, test_train_ratio)

print(positives.shape)
print(anchors.shape)
print(negatives.shape)
print(labels.shape)

5
(3000, 700, 1, 1)
(3000, 700, 1, 1)
(3000, 700, 1, 1)
(3000,)


Siamese Contrastive model

In [36]:
margin = 1
def loss(margin=1):
    def reduce_loss_value(y, loss_val):
        return tf.math.reduce_mean(tf.math.abs(loss_val-y))
    return reduce_loss_value

def euclidean_distance(vects):
    x, y = vects
    sum_square = tf.math.reduce_sum(tf.math.square(x - y), axis=1, keepdims=True)
    return tf.math.maximum(sum_square, tf.keras.backend.epsilon())

def triplet_loss(vects):
    d_pos, d_neg = vects
    return tf.math.maximum(d_pos-d_neg+margin, 0)

input = layers.Input((positives.shape[1], 1, 1))
x = tf.keras.layers.BatchNormalization()(input)
x = layers.Conv2D(70, (10, 1), activation="tanh")(input)
x = layers.Conv2D(35, (10, 1), activation="tanh")(x)
x = layers.Flatten()(x)
x = tf.keras.layers.BatchNormalization()(x)
x = layers.Dense(128, activation="tanh")(x)
embedding_network = keras.Model(input, x)

#input = layers.Input((49,))
#x = tf.keras.layers.BatchNormalization()(input)
#x = layers.Dense(49, activation="tanh")(x)
#x = layers.Dense(23,, activation="tanh")(x)
#x = layers.Flatten()(x)
#x = tf.keras.layers.BatchNormalization()(x)
#x = layers.Dense(10, activation="tanh")(x)

input_1 = layers.Input((positives.shape[1],1,1))
input_2 = layers.Input((positives.shape[1],1,1))
input_3 = layers.Input((positives.shape[1],1,1))

# As mentioned above, Siamese Network share weights between
# tower networks (sister networks). To allow this, we will use
# same embedding network for both tower networks.
tower_1 = embedding_network(input_1)
tower_2 = embedding_network(input_2)
tower_3 = embedding_network(input_3)

merge_layer_1 = layers.Lambda(euclidean_distance)([tower_1, tower_2])
merge_layer_2 = layers.Lambda(euclidean_distance)([tower_2, tower_3])
merge_layer_3 = layers.Lambda(triplet_loss)([merge_layer_1, merge_layer_2])
normal_layer = tf.keras.layers.BatchNormalization()(merge_layer_2)
output_layer = layers.Dense(1, activation="sigmoid")(normal_layer)
siamese = keras.Model(inputs=[input_1, input_2, input_3], outputs=output_layer)

output_layer_cnn = layers.Dense(5, activation="softmax")(tower_1)
cnn = keras.Model(inputs=[input_1], outputs=output_layer_cnn)

from tensorflow.keras.optimizers import RMSprop
rms = RMSprop(learning_rate=0.001)
cnn.compile(loss='categorical_crossentropy', optimizer=rms, metrics=["accuracy"])

siamese.compile(loss=loss(margin=1), optimizer='RMSprop', metrics=["accuracy"])
#siamese.summary()

#from keras.utils.vis_utils import plot_model
#plot_model(siamese, show_shapes=True, show_layer_names=True)
history = siamese.fit(
    [positives_train, anchors_train, negatives_train],
    labels_train,
    validation_data=([positives_val, anchors_val, negatives_val], labels_val),
    batch_size=10,
    epochs=10,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [14]:
cont_model_file = os.path.join(features_dir, "TripletModel.pkl")
siamese.save(cont_model_file)




In [27]:
arr = [i for i in range(5)]
random.choice(arr)

dataset_test_array, labels_test = reform_dataset(dataset_test)
positives, anchors, negatives = construct_testing_pairs(dataset_test_array, anchor_datasets)

positives = conv2_reshape(positives)
anchors = conv2_reshape(anchors)
negatives = conv2_reshape(negatives)

print(labels_test.shape)
print(positives.shape)
print(anchors.shape)
print(negatives.shape)

res = siamese.predict([positives, anchors, negatives])
res

(121,)
(605, 700, 1, 1)
(605, 700, 1, 1)
(605, 700, 1, 1)


array([[7.4692124e-08],
       [7.4692693e-08],
       [7.4691840e-08],
       [7.4692551e-08],
       [7.4693688e-08],
       [7.4691698e-08],
       [7.4691982e-08],
       [7.4692409e-08],
       [7.4691698e-08],
       [7.4692409e-08],
       [7.4692551e-08],
       [7.4692551e-08],
       [7.4693261e-08],
       [7.4691840e-08],
       [7.4692409e-08],
       [7.4691982e-08],
       [7.4691840e-08],
       [7.4692693e-08],
       [7.4693979e-08],
       [7.4692267e-08],
       [7.4693979e-08],
       [7.4692124e-08],
       [7.4691556e-08],
       [7.4692551e-08],
       [7.4691698e-08],
       [7.4692267e-08],
       [7.4693979e-08],
       [7.4692977e-08],
       [7.4691840e-08],
       [7.4691698e-08],
       [7.4692124e-08],
       [7.4692124e-08],
       [7.4692124e-08],
       [7.4693261e-08],
       [7.4692551e-08],
       [7.4691698e-08],
       [7.4692551e-08],
       [7.4691698e-08],
       [7.4691698e-08],
       [7.4692124e-08],
       [7.4693837e-08],
       [7.469198

In [24]:
def reform_dataset(dataset_test):
  labels_test = np.array(dataset_test['Sleep State'])
  dataset_test_array = np.array(dataset_test.drop(['Label', 'Sleep State'], axis=1))
  return dataset_test_array, labels_test

def construct_testing_pair(dataset_row, anchor_datasets):
  number_of_classes = len(anchor_datasets)
  positives = [dataset_row for i in range(number_of_classes)]
  anchors = []
  negatives = []

  class_range = [i for i in range(number_of_classes)]
  for i in range(0, number_of_classes):
    class_range.remove(i)
    negative_class = random.choice(class_range)
    class_range.append(i)

    anchor = anchor_datasets[i].sample(n=1)
    anchor = np.array(anchor.drop(['Label', 'Sleep State'], axis=1))
    anchors.append(anchor[0])

    negative = anchor_datasets[negative_class].sample(n=1)
    negative = np.array(negative.drop(['Label', 'Sleep State'], axis=1))
    negatives.append(negative[0])    
  return np.array(positives), np.array(anchors), np.array(negatives)
  
def construct_testing_pairs(dataset_test_array, anchor_datasets):
  positives, anchors, negatives = construct_testing_pair(dataset_test_array[0], anchor_datasets)

  for i in range(1, dataset_test_array.shape[0]):
    positives_new, anchors_new, negatives_new = construct_testing_pair(dataset_test_array[i], anchor_datasets)
    positives = np.append(positives, positives_new, axis=0)
    anchors = np.append(anchors, anchors_new, axis=0)
    negatives = np.append(negatives, negatives_new, axis=0)

  return positives, anchors, negatives

dataset_test_array, labels_test = reform_dataset(dataset_test)

def predict_siamese(siamese, positives, anchors, negatives):
  siamese_pred = 
  return y_pred, siamese_pred

#y_pred, siamese_pred = predict_siamese(5, siamese, dataset_test, test_groups=400)

#y_test_test = np.array(dataset_test['Sleep State'])
#print(y_test_test.shape)
#cf_matrix = get_confusion_matrix(y_test, y_pred)
#cf_matrix = confusion_matrix(y_test_test, y_pred)

Labels = ['N1', 'N2', 'N3', 'REM', 'WAKE']
#printConfusionMatrix(cf_matrix, Labels, show=True)

In [None]:
siamese_pred.shape

(90000, 1)