<a href="https://colab.research.google.com/github/BRomans/IdMind/blob/main/eeg_biometrics_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# EEG Biometrics


In [None]:
# Run this cell to load required libraries and mount your Drive folder
import numpy as np
from numpy.random import choice
from matplotlib import pyplot as plt
from google.colab import drive
import os
from sklearn.svm import SVC
import pandas as pd
import itertools
import random

In [None]:
# One seed shared by every source of randomness configured in this cell.
seed_value = 10

# numpy's global generator (used by numpy.random.choice when sampling pairs).
np.random.seed(seed_value)

# Python's built-in `random` module.
random.seed(seed_value)

# Hash seed exported through the environment. The interpreter reads
# PYTHONHASHSEED at start-up, so this affects processes launched from here on.
os.environ['PYTHONHASHSEED'] = str(seed_value)

In [None]:
# Mount Google Drive into the Colab filesystem under /content/drive.
drive.mount('/content/drive')
# Folder inside the mounted Drive that the .npy files are loaded from.
dirpath = "/content/drive/MyDrive/ml2-eeg-biometrics/train-test-data/" 

In [None]:
# Empty iterator
def empty_gen():
  """Return a generator that is exhausted immediately (it yields nothing)."""
  return
  yield  # Never reached; its presence is what makes this a generator function.
  
# Helper function for constructing pairs.
def get_index_pairs(y, size='balanced'):
  """
  Build index pairs of samples that share a class and of samples that do not.

  Parameters
  ----------
  y : array-like of class labels, one per sample.
  size : 'balanced', int or None
      How many different-class samples are drawn (without replacement) for each class.
      'balanced' draws ceil(n_c / 2) samples for a class with n_c members; the caller is
      expected to add every different-class pair in both orders, hence the division by 2.
      An int draws that many samples for every class.
      None keeps every different-class sample.
      The request is capped at the number of different-class samples available.

  Returns
  -------
  iter_same_class : iterator of (i, j) with y[i] == y[j] and i != j, both orders included.
  iter_diff_class : iterator of (i, j) with y[i] != y[j], where i belongs to the current class.
  total_same, total_diff : number of pairs each iterator will produce.
  """
  y = np.asarray(y)
  same_class_iters, diff_class_iters = [], []
  total_same, total_diff = 0, 0

  # Loop over the classes
  for c in np.unique(y):
    c_indexes = np.where(y == c)[0]        # indexes belonging to the current class.
    non_c_indexes = np.where(y != c)[0]    # indexes belonging to any other class.

    if size is not None:
      # Work out the sample size for THIS class in a local variable.
      # Overwriting `size` here would freeze the first class's value for all later classes.
      n_wanted = int(np.ceil(len(c_indexes)/2)) if size == 'balanced' else int(size)
      # Sampling without replacement cannot return more items than exist.
      n_wanted = min(n_wanted, len(non_c_indexes))
      if n_wanted > 0:
        non_c_indexes = choice(non_c_indexes, size=n_wanted, replace=False)
      else:
        non_c_indexes = non_c_indexes[:0]

    # Plain Python ints are cheaper to iterate than numpy scalars and index arrays identically.
    c_list = c_indexes.tolist()

    if len(c_list) > 1:
      same_class_iters.append(itertools.permutations(c_list, 2))

    if len(non_c_indexes) > 0:
      diff_class_iters.append(itertools.product(c_list, non_c_indexes.tolist()))

    total_same += len(c_list)*len(c_list) - len(c_list)
    total_diff += len(non_c_indexes)*len(c_list)

  iter_same_class = itertools.chain.from_iterable(same_class_iters)
  iter_diff_class = itertools.chain.from_iterable(diff_class_iters)
  return iter_same_class, iter_diff_class, total_same, total_diff

In [None]:

class pairwiseSVM:
  """
  SVM that is trained on pairs of feature vectors instead of single samples.

  Every pair is the concatenation of two feature vectors, labelled 1 when both samples
  belong to the same class and 0 otherwise. The class handles the pair construction,
  training and prediction; the underlying classifier is available as `self.svm`.
  """
  def __init__(self, C=1.0, kernel='rbf', degree=3, random_state=None):
    """Create the underlying sklearn SVC with the given hyperparameters."""
    self.svm = SVC(C=C, kernel=kernel, degree=degree, random_state=random_state)

  def read_train_data(self, filename, label_col = 'labels', id_col = 'id'):
    """
    Read the training data from a CSV file and store it on the instance.

    The file must have column names and contain a class-label column (`label_col`) and a
    trial-ID column (`id_col`); every remaining column is treated as a feature.
    `filename` is passed straight to `pandas.read_csv`.

    Sets `self.train_id`, `self.y_train` and `self.x_train` (all numpy arrays).
    """
    data = pd.read_csv(filename)

    # Separate the trial IDs, class labels and feature data into separate numpy arrays.
    self.train_id = np.array(data[id_col])
    self.y_train = np.array(data[label_col])
    self.x_train = np.array(data.drop([label_col, id_col], axis=1))

  def construct_pairs(self, x_train=None, y_train=None, x_test=None, y_test=None):
    """
    Construct concatenated feature-vector pairs from the training or testing data.

    If `x_train`/`y_train` are omitted, the data stored on the instance is used.
    Without `x_test`, pairs are built from the training data with itself (index pairs come
    from `get_index_pairs` with size='balanced'; different-class pairs are added in both orders).
    With `x_test`, every (training sample, test sample) combination is built, ordered
    training-sample-major.

    Returns
    -------
    x_pairs : float array (n_pairs, 2 * n_features), left half = first sample of the pair.
    y_pairs : int8 array, 1 if both samples share a class else 0; None when `x_test` is
        given without `y_test`.
    training_label : int64 array, class label of the left-hand (training) sample.
    test_index : integer array, index of the right-hand sample of each pair.

    Raises
    ------
    Exception : if only one of `x_train` / `y_train` is supplied.
    """
    if (x_train is None) ^ (y_train is None):
      raise Exception("Both x_train and y_train datasets should be supplied, or neither.")
    elif x_train is None and y_train is None:
      x_train = self.x_train
      y_train = self.y_train

    x_train = np.asarray(x_train)
    y_train = np.asarray(y_train)
    n_features = x_train.shape[1]

    # If x_test is not supplied, we want to construct pairs of the training data with itself.
    if x_test is None:
      # Same-class pairs arrive in both orders, (i,j) and (j,i), never with i == j.
      index_pairs_same, index_pairs_diff, n_same, n_diff = get_index_pairs(y_train, size='balanced')

      n_pairs = n_same + n_diff*2                          # Different-class pairs are stored in both orders below.
      x_pairs = np.zeros((n_pairs, n_features*2))          # Concatenated feature vector pairs.
      y_pairs = np.zeros(n_pairs, dtype=np.int8)           # Class similarity flag.
      # Wide integer types: int8 / uint16 buffers overflow for labels > 127 or indexes > 65535.
      training_label = np.zeros(n_pairs, dtype=np.int64)   # Class label of the left-hand side of the comparison.
      test_index = np.zeros(n_pairs, dtype=np.intp)        # Index of the right-hand side, used by the voting scheme.

      count = 0
      for i, j in index_pairs_same:
        x_pairs[count] = np.concatenate((x_train[i], x_train[j]))
        y_pairs[count] = 1                                 # These pairs come from the same class.
        training_label[count] = y_train[i]
        test_index[count] = j
        count += 1

      for i, j in index_pairs_diff:
        x_pairs[count] = np.concatenate((x_train[i], x_train[j]))
        x_pairs[count+1] = np.concatenate((x_train[j], x_train[i]))
        y_pairs[count:count+2] = 0                         # These pairs come from different classes.
        training_label[count] = y_train[i]
        training_label[count+1] = y_train[j]
        test_index[count] = j
        test_index[count+1] = i
        count += 2                                         # Two pairs are added per iteration.

    # If x_test is supplied, we want to construct all pairs combining the training data and the test data.
    else:
      x_test = np.asarray(x_test)
      n_train, n_test = len(x_train), len(x_test)
      n_pairs = n_train*n_test

      # Vectorised equivalent of looping over itertools.product(range(n_train), range(n_test)):
      # each training row is repeated n_test times while the test block is tiled n_train times.
      x_pairs = np.zeros((n_pairs, n_features*2))
      x_pairs[:, :n_features] = np.repeat(x_train, n_test, axis=0)
      x_pairs[:, n_features:] = np.tile(x_test, (n_train, 1))

      left_labels = np.repeat(y_train, n_test)
      training_label = left_labels.astype(np.int64)
      test_index = np.tile(np.arange(n_test, dtype=np.intp), n_train)

      # If y_test is also supplied (for evaluating classification accuracy for example),
      #   flag the pairs whose train and test labels agree.
      if y_test is not None:
        y_pairs = (left_labels == np.tile(np.asarray(y_test), n_train)).astype(np.int8)
      else:
        y_pairs = None

    # Return the concatenated feature vectors for each pair, and the binary label whether they are from the same class.
    return x_pairs, y_pairs, training_label, test_index


  def fit(self, x_train = None, y_train = None):
    """
    Fit the SVM on the pairwise training data.

    When `x_train` and `y_train` are supplied they are stored on the instance (so that later
    predictions can pair test samples against them); otherwise the stored data is used.
    Raises Exception if only one of the two is supplied.
    """
    if (x_train is None) ^ (y_train is None):
      raise Exception("Either both the x_train and y_train datasets should be supplied, or neither.")

    # Get the pairwise combinations of the training data.
    elif x_train is None and y_train is None:
      x_pairs, y_pairs, _,_ = self.construct_pairs()

    else:
      self.x_train = x_train
      self.y_train = y_train
      x_pairs, y_pairs, _,_ = self.construct_pairs(x_train, y_train)

    self.svm.fit(x_pairs, y_pairs)

  def predict_pairwise(self, x_test, y_test=None):
    """
    Predict the pairwise class similarity between the stored training data and `x_test`.

    Returns the similarity predictions, the ground-truth similarities (None without `y_test`),
    the class label of the training sample of each pair, and the test index of each pair.
    """
    x_pairs, y_pairs, training_label, test_index = self.construct_pairs(x_test=x_test, y_test=y_test)

    return self.svm.predict(x_pairs), y_pairs, training_label, test_index

  def predict_class(self, x_test, y_test=None):
    """
    Predict a class label for every row of `x_test` by majority vote.

    Each training sample whose pair with a test sample is predicted "same class" casts one
    vote for its own label. Ties go to the smallest label. Test samples that received no
    vote at all are reported as -1.

    Returns an int64 array with one entry per row of `x_test`.
    """
    y_pairs_pred, _, training_label, test_index = self.predict_pairwise(x_test, y_test)

    predicted = np.full(len(x_test), -1, dtype=np.int64)
    matched = y_pairs_pred == 1
    if matched.any():
      # Rows: test samples, columns: candidate labels, cells: number of votes.
      votes = pd.crosstab(test_index[matched], training_label[matched])
      winners = votes.idxmax(axis=1)
      predicted[winners.index.to_numpy()] = winners.to_numpy()
    return predicted

  def add_class(self, new_train, new_class):
    """ Add new participant for prediction purposes. Not implemented yet: currently a no-op. """
    pass

  def tune_hyperparameters(self, x_validation, y_validation):
    """ Optimise the values of C and the degree using the validation set. Not implemented yet: currently a no-op. """
    pass

#### Small test for the pairwise SVM

In [None]:
# psvm = pairwiseSVM(C=10.0, kernel='rbf', degree=3, random_state=None) # Create a test instance of the class.

# # # Some small test data
# # a = np.array([[1,2,3],[4,5,6],[7,8,9],[9,10,11],[2,9,10]])
# # b = np.array([[901,801,701],[602,603,604]])
# # y_a = np.array([0,1,1,2,2])
# # y_b = np.array([1,1])

# a = np.array([[1],[2.5],[3.0],[3.7],[5.2],[5.8],[7.1],[7.2],[7.4],[10]])
# y_a = np.array([0,1,1,1,2,2,3,3,3,4])

# # x_pairs, y_pairs, training_label,_ = psvm.construct_pairs(x_train=a,y_train=y_a)
# psvm.fit(a,y_a)
# # psvm.svm.fit(a, y_a) # Test a regular SVM to separate the classes without a pairwise approach.
# psvm.predict_pairwise(a,y_a)

In [None]:
# Scatter the first two columns of every row of x_pairs, coloured by y_pairs.
# NOTE(review): y_pairs is only assigned in commented-out code and x_pairs only in a later
# cell, so this cell relies on notebook state left over from another run - confirm.
x = [z[0] for z in x_pairs]
y = [z[1] for z in x_pairs]
plt.scatter(x,y, c=y_pairs)

def plot_svc_decision_function(model, ax=None, plot_support=True):
    """Plot the decision function for a 2D SVC.

    Parameters
    ----------
    model : fitted classifier exposing `decision_function` and `support_vectors_`
        for two-dimensional inputs.
    ax : matplotlib axes to draw on; defaults to the current axes.
    plot_support : when True, circle the support vectors.

    The decision boundary (level 0) is drawn solid and the margins (levels -1 and 1)
    dashed. The axes limits that were in place on entry are restored before returning.
    """
    if ax is None:
        ax = plt.gca()
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()

    # create a 30 x 30 grid over the visible area to evaluate the model on
    x = np.linspace(xlim[0], xlim[1], 30)
    y = np.linspace(ylim[0], ylim[1], 30)
    Y, X = np.meshgrid(y, x)
    xy = np.vstack([X.ravel(), Y.ravel()]).T
    P = model.decision_function(xy).reshape(X.shape)

    # plot decision boundary and margins
    ax.contour(X, Y, P, colors='k',
               levels=[-1, 0, 1], alpha=0.5,
               linestyles=['--', '-', '--'])

    # plot support vectors
    if plot_support:
        # Hollow markers need an explicit edge colour: the default edge colour follows the
        # face colour, which is 'none' here, and would make the markers invisible.
        ax.scatter(model.support_vectors_[:, 0],
                   model.support_vectors_[:, 1],
                   s=300, linewidth=1, facecolors='none', edgecolors='k')
    ax.set_xlim(xlim)
    ax.set_ylim(ylim)

# Overlay the decision function of the pairwise model's underlying SVC on the current axes.
plot_svc_decision_function(psvm.svm)

In [None]:
# Structure to evaluate classification performance
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

def print_results(Y_test, predictions, label_names):
    """Print classification metrics and draw the confusion matrix.

    Parameters
    ----------
    Y_test : true labels.
    predictions : predicted labels, same length as `Y_test`.
    label_names : tick labels for the confusion matrix. They are applied only when their
        number equals the size of the confusion matrix; otherwise the default numeric
        ticks are kept.

    Prints sklearn's classification report and the accuracy (3 decimals), then plots the
    confusion matrix in a new 12 x 12 figure with every non-zero count written in its cell.
    """
    print(classification_report(Y_test, predictions))
    print("Classification Accuracy: {0:.3f}".format(accuracy_score(Y_test, predictions)))

    conf_mat = confusion_matrix(Y_test, predictions)

    # A single figure: creating one per call is enough, an extra one would stay empty.
    plt.figure(figsize=(12,12))
    plt.imshow(np.array(conf_mat), cmap=plt.cm.summer, interpolation='nearest')
    for i, row in enumerate(conf_mat):
        for j, c in enumerate(row):
            if c>0:
                plt.text(j-.2, i+.1, c, fontsize=16)

    plt.title('Confusion Matrix')

    # Label the axes only when there is exactly one name per matrix row/column.
    if label_names is not None and len(label_names) == conf_mat.shape[0]:
        tick_positions = range(len(label_names))
        plt.xticks(tick_positions, label_names, rotation=90)
        plt.yticks(tick_positions, label_names)

In [None]:
# Run this cell to save the changes

# drive.flush_and_unmount()
# print('All changes made in this colab session should now be visible in Drive.')

In [None]:
# x_train_1 = np.load(dirpath + 'encoding_model_2021-01-18 20:30:21.496480.npy')
# x_train_2 = np.load(dirpath + 'cand_encoding_model_2021-01-18 20:30:21.496480.npy')

# x_train_1 = np.load(dirpath + 'train_encoding_model_2021-01-18 22:24:31.844829.npy')
# x_train_2 = np.load(dirpath + 'train_cand_encoding_model_2021-01-18 22:24:31.844829.npy')

# x_valid_1 = np.load(dirpath + 'valid_encoding_model_2021-01-18 22:24:31.844829.npy')
# x_valid_2 = np.load(dirpath + 'valid_cand_encoding_model_2021-01-18 22:24:31.844829.npy')

# Feature arrays for the training and validation sets, loaded from dirpath.
x_train = np.load(dirpath + 'train_encoding_model_2021-01-19 19:36:47.206950.npy')
x_valid = np.load(dirpath + 'valid_encoding_model_2021-01-19 19:36:47.206950.npy')

# Class labels, flattened to 1-D vectors.
y_train = np.load(dirpath + 'y_train.npy')
y_train = y_train.reshape((-1,))

# NOTE(review): allow_pickle=True unpickles arbitrary objects; only load files you trust.
y_valid= np.load(dirpath + 'y_valid.npy', allow_pickle=True)
y_valid = y_valid.reshape((-1,))
y_valid = np.array(y_valid, dtype='int64') # Read in as object vector with allow_pickle, not sure why.

# Trial identifiers, reshaped to 5 columns per row.
id_train = np.load(dirpath + 'id_train.npy', allow_pickle=True)
id_train = id_train.reshape((-1, 5))

id_valid = np.load(dirpath + 'id_valid.npy', allow_pickle=True)
id_valid = id_valid.reshape((-1, 5))

In [None]:
# svm_1 = SVC(C=10.0, kernel='rbf', degree=3, random_state=0)
# svm_2 = SVC(C=10.0, kernel='rbf', degree=3, random_state=0)

# svm_1.fit(x_train_1, y_train)
# svm_2.fit(x_train_2, y_train)

# y_pred_1 = svm_1.predict(x_train_1)
# y_pred_2 = svm_2.predict(x_train_2)

# Baseline: a regular (non-pairwise) multi-class SVC fitted directly on the features.
svm = SVC(C=100, kernel='rbf', degree=3, random_state=0)
svm.fit(x_train, y_train)

# Class predictions on the training data itself and on the validation data.
train_pred = svm.predict(x_train)
valid_pred = svm.predict(x_valid)

In [None]:
# NOTE(review): svm_1, svm_2, x_valid_1 and x_valid_2 are only assigned in commented-out
# code in this notebook, so this cell raises NameError on a fresh run - confirm whether
# it should be removed or the corresponding cells re-enabled.
valid_pred_1 =  svm_1.predict(x_valid_1)
valid_pred_2 = svm_2.predict(x_valid_2)

In [None]:
from time import time
# Start of the timed section.
prev=time()

psvm = pairwiseSVM(C=1.0, kernel='rbf', degree=3, random_state=0)

# Keep every 10th training sample.
x_subset = x_train[::10,:]
y_subset = y_train[::10]

# pairs = psvm.construct_pairs(x_subset, y_subset)
# Builds the training pairs and fits the SVC; also stores the subset on psvm.
psvm.fit(x_subset, y_subset)

# Elapsed wall-clock time for the cell.
print(round(time()-prev, 5), " seconds")

In [None]:
# Keep every 5th validation sample.
x_valid_subset = x_valid[::5,:]
y_valid_subset = y_valid[::5]
print(x_valid_subset.shape)

In [None]:
# y_pairs_pred, y_pairs_true, training_label, test_index = psvm.predict_pairwise(x_subset, y_subset)
# With no arguments, construct_pairs pairs the training data stored on psvm with itself.
# The pair set comes from a fresh random draw in get_index_pairs.
x_pairs, y_pairs_true, training_label, test_index = psvm.construct_pairs()

In [None]:
# Time the same/different prediction over the training pairs.
prev=time()

y_pairs_pred = psvm.svm.predict(x_pairs)

print(round(time()-prev, 5), " seconds")

In [None]:
# Time the construction of every (training sample, validation-subset sample) pair.
prev=time()

x_pairs_valid, y_pairs_true_valid, training_label_valid, test_index_valid = psvm.construct_pairs(x_test=x_valid_subset, y_test=y_valid_subset)

print(x_pairs_valid.shape)
print(round(time()-prev, 5), " seconds")

In [None]:
# Time the same/different prediction over the validation pairs.
prev=time()

y_pairs_pred_valid = psvm.svm.predict(x_pairs_valid)
print(y_pairs_pred_valid.shape)
print(round(time()-prev, 5), " seconds")

In [None]:
# Pairwise prediction against the FULL validation set (x_valid, not x_valid_subset).
# NOTE(review): this overwrites the *_valid variables computed from the subset above, while
# a later cell still scores the class predictions against y_valid_subset - confirm which is intended.
y_pairs_pred_valid, y_pairs_true_valid, training_label_valid, test_index_valid = psvm.predict_pairwise(x_valid, y_valid)

In [None]:
# Implement voting scheme to decide on class label.
# Only pairs predicted as "same class" (1) vote; each votes for the label of its training sample.
# NOTE(review): a test_index with no positive pair gets no row, so y_class_pred can be shorter
# than the number of samples; ties are resolved by whatever order sort_values leaves them in.
df = pd.DataFrame({'label':training_label[y_pairs_pred==1], 'test_index': test_index[y_pairs_pred==1]})
df = df.groupby(['test_index', 'label'], as_index=False).size().sort_values(by='size', ascending=False)            # For each test_index (sample of test data), get the count for each predicted label.
df = df.drop_duplicates(subset='test_index').sort_values(by='test_index')                                          # One row for each test_index: the first (highest-count) label is kept.
y_class_pred = df.label.to_numpy()

In [None]:
# Same voting scheme as for the training pairs, applied to the validation pairs.
# NOTE(review): a test_index with no positive pair gets no row, so y_class_pred_valid can be
# shorter than the number of validation samples.
df = pd.DataFrame({'label':training_label_valid[y_pairs_pred_valid==1], 'test_index': test_index_valid[y_pairs_pred_valid==1]})
df = df.groupby(['test_index', 'label'], as_index=False).size().sort_values(by='size', ascending=False)            # For each test_index (sample of test data), get the count for each predicted label.
df = df.drop_duplicates(subset='test_index').sort_values(by='test_index')                                          # One row for each test_index: the first (highest-count) label is kept.
y_class_pred_valid = df.label.to_numpy()

In [None]:
# Class-level results of the pairwise voting on the training subset.
print_results(y_subset, y_class_pred, np.unique(y_subset))

In [None]:
# Class-level results of the pairwise voting, scored against the validation subset labels.
print_results(y_valid_subset, y_class_pred_valid, np.unique(y_valid_subset))

In [None]:
# Pair-level (same class = 1 / different class = 0) results on the training pairs.
print_results(y_pairs_true, y_pairs_pred, label_names=[0,1])

In [None]:
# Pair-level (same class = 1 / different class = 0) results on the validation pairs.
print_results(y_pairs_true_valid, y_pairs_pred_valid, label_names=[0,1])

In [None]:
from sklearn.model_selection import GridSearchCV

# parameters = {'kernel':('linear', 'rbf'), 'C':[0.01, 0.1, 1, 10, 100], 'gamma':[0.001, 0.01, 0.1, 1, 10, 100], 'degree':[2,3,4]}
# Reduced grid: 3 values of C x 3 values of gamma.
parameters = {'C':[0.1, 1, 10], 'gamma':[0.0001, 0.01, 0.1]}

# Grid search over the baseline (non-pairwise) SVC, with GridSearchCV's default settings.
clf = GridSearchCV(svm, parameters)
clf.fit(x_train, y_train)

# Utility function to report best scores
def report(results, n_top=3):
    """Print the candidates holding ranks 1..n_top of a hyperparameter search.

    `results` is a mapping with the keys 'rank_test_score', 'mean_test_score',
    'std_test_score' and 'params' (e.g. the `cv_results_` of a fitted search).
    Candidates sharing a rank are all printed, in index order.
    """
    for rank in range(1, n_top + 1):
        for idx in np.flatnonzero(results['rank_test_score'] == rank):
            mean_score = results['mean_test_score'][idx]
            std_score = results['std_test_score'][idx]
            chosen_params = results['params'][idx]
            print(f"Model with rank: {rank}")
            print(f"Mean validation score: {mean_score:.3f} (std: {std_score:.3f})")
            print(f"Parameters: {chosen_params}")
            print("")

# Print the top-ranked (default n_top=3) parameter combinations of the grid search.
report(clf.cv_results_)

In [None]:
# Baseline SVC results on the training data it was fitted on.
print_results(y_train, train_pred, np.unique(y_train).tolist())

In [None]:
# Baseline SVC results on the validation data.
print_results(y_valid, valid_pred, np.unique(y_valid).tolist())