# **E1-201 : Hardware Acceleration on Machine Learning**
---
## **Course Project :**
### **Implementation of a Brain-Computer Interface (BCI) Decoder using Arduino Nano 33 BLE Sense Lite**
---
**Authors :**
1. Hitesh Pavan Oleti (SR.No.:19804, ESE, IISc)
2. Anand Chauhan      (SR.No.:*****, ESE, IISc)

**Submitted to :**

Prof. Chetan Singh Thakur,

NeuRonICS Lab, IISc.

---

**TensorFlow Version**

In [39]:
import tensorflow as tf
tf.print(tf.__version__)

2.9.2


**Installing MNE Package**

(An open-source Python package for exploring, visualizing, and analyzing human neurophysiological data: MEG, EEG, sEEG, ECoG and NIRS)

In [None]:
!pip install mne

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


**Importing Libraries**

In [None]:
import numpy as np

# mne imports
import mne
from mne import io
from mne.datasets import sample

# EEGNet-specific imports
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Permute, Dropout
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import SpatialDropout2D
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.layers import Input, Flatten
from tensorflow.keras.constraints import max_norm

# tools for plotting confusion matrices
from matplotlib import pyplot as plt

**Mounting Google Drive**

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

## **EEGNet Model**

In [None]:
def EEGNet(nb_classes, Chans = 64, Samples = 128, 
             dropoutRate = 0.5, kernLength = 64, F1 = 8, 
             D = 2, F2 = 16, norm_rate = 0.25, dropoutType = 'Dropout'):
    """ Keras Implementation of EEGNet
    http://iopscience.iop.org/article/10.1088/1741-2552/aace8c/meta
    Note that this implements the newest version of EEGNet and NOT the earlier
    version (version v1 and v2 on arxiv). We strongly recommend using this
    architecture as it performs much better and has nicer properties than
    our earlier version. For example:
        
        1. Depthwise Convolutions to learn spatial filters within a 
        temporal convolution. The use of the depth_multiplier option maps 
        exactly to the number of spatial filters learned within a temporal
        filter. This matches the setup of algorithms like FBCSP which learn 
        spatial filters within each filter in a filter-bank. This also limits 
        the number of free parameters to fit when compared to a fully-connected
        convolution. 
        
        2. Separable Convolutions to learn how to optimally combine spatial
        filters across temporal bands. Separable Convolutions are Depthwise
        Convolutions followed by (1x1) Pointwise Convolutions. 
        
    
    While the original paper used Dropout, we found that SpatialDropout2D 
    sometimes produced slightly better results for classification of ERP 
    signals. However, SpatialDropout2D significantly reduced performance 
    on the Oscillatory dataset (SMR, BCI-IV Dataset 2A). We recommend using
    the default Dropout in most cases.
        
    Assumes the input signal is sampled at 128Hz. If you want to use this model
    for any other sampling rate you will need to modify the lengths of temporal
    kernels and average pooling size in blocks 1 and 2 as needed (double the 
    kernel lengths for double the sampling rate, etc). Note that we haven't 
    tested the model performance with this rule so this may not work well. 
    
    The model with default parameters gives the EEGNet-8,2 model as discussed
    in the paper. This model should do pretty well in general, although it is
    advised to do some model searching to get optimal performance on your
    particular dataset.
    We set F2 = F1 * D (number of input filters = number of output filters) for
    the SeparableConv2D layer. We haven't extensively tested other values of this
    parameter (say, F2 < F1 * D for compressed learning, and F2 > F1 * D for
    overcomplete). We believe the main parameters to focus on are F1 and D. 
    Inputs:
        
      nb_classes      : int, number of classes to classify
      Chans, Samples  : number of channels and time points in the EEG data
      dropoutRate     : dropout fraction
      kernLength      : length of temporal convolution in first layer. We found
                        that setting this to be half the sampling rate worked
                        well in practice. For the SMR dataset in particular
                        since the data was high-passed at 4Hz we used a kernel
                        length of 32.     
      F1, F2          : number of temporal filters (F1) and number of pointwise
                        filters (F2) to learn. Default: F1 = 8, F2 = F1 * D. 
      D               : number of spatial filters to learn within each temporal
                        convolution. Default: D = 2
      dropoutType     : Either SpatialDropout2D or Dropout, passed as a string.
    """
    
    if dropoutType == 'SpatialDropout2D':
        dropoutType = SpatialDropout2D
    elif dropoutType == 'Dropout':
        dropoutType = Dropout
    else:
        raise ValueError('dropoutType must be one of SpatialDropout2D '
                         'or Dropout, passed as a string.')
    
    input1   = Input(shape = (Chans, Samples, 1))

    ##################################################################
    block1       = Conv2D(F1, (1, kernLength), padding = 'same',
                                   input_shape = (Chans, Samples, 1),
                                   use_bias = False)(input1)
    block1       = BatchNormalization()(block1)
    block1       = DepthwiseConv2D((Chans, 1), use_bias = False, 
                                   depth_multiplier = D,
                                   depthwise_constraint = max_norm(1.))(block1)
    block1       = BatchNormalization()(block1)
    block1       = Activation('relu')(block1)
    block1       = AveragePooling2D((1, 4))(block1)
    block1       = dropoutType(dropoutRate)(block1)
    
    block2       = SeparableConv2D(F2, (1, 16),
                                   use_bias = False, padding = 'same')(block1)
    block2       = BatchNormalization()(block2)
    block2       = Activation('relu')(block2)
    block2       = AveragePooling2D((1, 8))(block2)
    block2       = dropoutType(dropoutRate)(block2)
        
    flatten      = Flatten(name = 'flatten')(block2)
    
    dense        = Dense(nb_classes, name = 'dense', 
                         kernel_constraint = max_norm(norm_rate))(flatten)
    softmax      = Activation('softmax', name = 'softmax')(dense)
    
    return Model(inputs=input1, outputs=softmax)
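To make the layer dimensions concrete, the sketch below traces the feature-map width through the two pooling stages for this project's input (60 channels, 151 samples). It is plain arithmetic, assuming that Keras pooling with the default 'valid' padding floors the division and that 'same'-padded convolutions preserve the width. `eegnet_flatten_size` is a hypothetical helper, not part of EEGNet.

```python
def eegnet_flatten_size(samples, F2=16, pool1=4, pool2=8):
    """Number of features entering the final Dense layer (hypothetical helper)."""
    # Block 1: the 'same'-padded temporal conv keeps the width; the depthwise
    # conv with kernel (Chans, 1) collapses the channel axis to height 1.
    width = samples // pool1      # AveragePooling2D((1, 4)), 'valid' padding
    # Block 2: the 'same'-padded separable conv keeps the width.
    width = width // pool2        # AveragePooling2D((1, 8)), 'valid' padding
    return F2 * width


flat = eegnet_flatten_size(samples=151)
dense_params = flat * 4 + 4       # weights plus biases for 4 classes
print(flat, dense_params)
```

Because both pooling steps floor the width, 151 samples and 128 samples end up with the same number of flattened features under these assumptions.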

**Loading Input Dataset**

In [None]:
from pandas import read_csv
import pandas as pd
EPOC_dataset = read_csv('/content/gdrive/MyDrive/HAOML project/mne_iir_elliptic_5thorder.csv')
EPOC_dataset = EPOC_dataset.values
display(pd.DataFrame(EPOC_dataset))
chans = 60; samples = 151;
row, col = EPOC_dataset.shape

y =  EPOC_dataset[:,col-1]
X = np.zeros([row,chans,samples])
for i in range(row):
  X[i,:,:] = np.reshape(EPOC_dataset[i,0:col-1],(chans,samples))

print(X.shape)
print(y.shape)
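The loop above assumes that each CSV row stores one trial as `chans * samples` feature values in channel-major order, followed by the label in the last column. A small sketch with toy dimensions (3 channels, 4 samples, synthetic values) shows the same unpacking done with a single vectorized reshape:

```python
import numpy as np

chans, samples = 3, 4              # toy sizes for illustration only
n_trials = 2

# Synthetic table: each row is chans*samples features followed by a label
features = np.arange(n_trials * chans * samples, dtype=float).reshape(n_trials, -1)
labels = np.array([[1.0], [3.0]])
table = np.hstack([features, labels])

y = table[:, -1]                                  # last column holds the label
X = table[:, :-1].reshape(-1, chans, samples)     # row-major, same as the loop

print(X.shape, y.shape)
print(X[1, 2])                     # channel 2 of trial 1
```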

**Train, Validation and Test split**

In [None]:
kernels, chans, samples = 1, 60, 151

# take 50/25/25 percent of the data to train/validate/test
X_train      = X[0:144,]
Y_train      = y[0:144]
X_validate   = X[144:216,]
Y_validate   = y[144:216]
X_test       = X[216:,]
Y_test       = y[216:]
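The hard-coded boundaries 144 and 216 correspond to a 50/25/25 split of 287 trials (216 plus the 71 test trials that appear later in the notebook). As a sketch, the boundaries could be derived from the fractions instead. `split_bounds` is a hypothetical helper, and rounding up is an assumption chosen to reproduce these indices.

```python
import math


def split_bounds(n_trials, train_frac=0.5, val_frac=0.25):
    """Return (train_end, val_end) slice boundaries (hypothetical helper)."""
    train_end = math.ceil(train_frac * n_trials)
    val_end = train_end + math.ceil(val_frac * n_trials)
    return train_end, val_end


train_end, val_end = split_bounds(287)
print(train_end, val_end, 287 - val_end)
```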


## **EEGNet model training**

In [None]:
############################# EEGNet portion ##################################

# convert labels to one-hot encodings.
Y_train      = np_utils.to_categorical(Y_train-1)
Y_validate   = np_utils.to_categorical(Y_validate-1)
Y_test       = np_utils.to_categorical(Y_test-1)
print(Y_train.shape, 'train samples Y')
print(Y_test.shape, 'test samples Y')


# convert data to NHWC (trials, channels, samples, kernels) format. Data 
# contains 60 channels and 151 time-points. Set the number of kernels to 1.
X_train      = X_train.reshape(X_train.shape[0], chans, samples, kernels)
X_validate   = X_validate.reshape(X_validate.shape[0], chans, samples, kernels)
X_test       = X_test.reshape(X_test.shape[0], chans, samples, kernels)
   
print('X_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')

# configure the EEGNet-8,2,16 model with kernel length of 32 samples (other 
# model configurations may do better, but this is a good starting point)
model = EEGNet(nb_classes = 4, Chans = chans, Samples = samples, 
               dropoutRate = 0.5, kernLength = 32, F1 = 8, D = 2, F2 = 16, 
               dropoutType = 'Dropout')

# compile the model and set the optimizers
model.compile(loss='categorical_crossentropy', optimizer='adam', 
              metrics = ['accuracy'])

# count number of parameters in the model
numParams    = model.count_params()    

# set a valid path for your system to record model checkpoints
checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1,
                               save_best_only=True)

class_weights = {0:1, 1:1, 2:1, 3:1}

################################################################################
# fit the model. Because the dataset is very small, results can vary
# noticeably from run to run.
################################################################################
fittedModel = model.fit(X_train, Y_train, batch_size = 16, epochs = 300, 
                        verbose = 2, validation_data=(X_validate, Y_validate),
                        callbacks=[checkpointer])  # optionally: class_weight = class_weights

# load optimal weights
model.load_weights('/tmp/checkpoint.h5')
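The labels in this dataset appear to run from 1 to 4, which is why 1 is subtracted before calling `to_categorical`. A NumPy-only sketch of the same encoding, using made-up labels, is:

```python
import numpy as np

labels = np.array([1, 4, 2, 3, 1])        # made-up 1-based class labels
num_classes = 4

# Shift to 0-based indices, then pick the matching rows of an identity matrix
one_hot = np.eye(num_classes, dtype=np.float32)[labels - 1]
print(one_hot)

# argmax + 1 recovers the original 1-based labels
recovered = one_hot.argmax(axis=-1) + 1
print(recovered)
```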

### **Classification Accuracy**

In [None]:
probs       = model.predict(X_test)
preds       = probs.argmax(axis = -1)  
acc         = np.mean(preds == Y_test.argmax(axis=-1))
print("Classification accuracy: %f " % (acc))

### **Confusion Matrix**

In [None]:
# plot the confusion matrix for the EEGNet classifier
from sklearn.metrics import confusion_matrix
import seaborn as sns

#Generate the confusion matrix
plt.figure(figsize = (7,7))
cf_matrix = confusion_matrix(Y_test.argmax(axis = -1), preds)
ax = sns.heatmap(cf_matrix, annot=True, cmap='Blues')
ax.set_title('Seaborn Confusion Matrix with labels\n');
ax.set_xlabel('\nPredicted Values')
ax.set_ylabel('Actual Values ');

## Tick labels - list must follow the class index order (alphabetical here)
ax.xaxis.set_ticklabels(['Audio_left', 'Audio_right', 'Visual_left', 'Visual_right'])
ax.yaxis.set_ticklabels(['Audio_left', 'Audio_right', 'Visual_left', 'Visual_right'])

## Display the visualization of the Confusion Matrix.
plt.show()
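For reference, a confusion matrix simply counts (actual, predicted) pairs, with rows indexed by the actual class and columns by the predicted class, as in `sklearn.metrics.confusion_matrix`. A dependency-free sketch with made-up class indices:

```python
def confusion_counts(actual, predicted, num_classes):
    """Rows are actual classes, columns are predicted classes."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for a, p in zip(actual, predicted):
        matrix[a][p] += 1
    return matrix


actual = [0, 0, 1, 2, 3, 3]       # made-up class indices
predicted = [0, 1, 1, 2, 3, 0]

cm = confusion_counts(actual, predicted, num_classes=4)
# Accuracy is the sum of the diagonal divided by the number of trials
accuracy = sum(cm[k][k] for k in range(4)) / len(actual)
for row in cm:
    print(row)
print(accuracy)
```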

### **Define paths to model files**

In [None]:
import os
MODELS_DIR = 'models/'
if not os.path.exists(MODELS_DIR):
    os.mkdir(MODELS_DIR)


In [None]:
#Defining model name
MODEL_TF = MODELS_DIR + 'model'
MODEL_NO_QUANT_TFLITE = MODELS_DIR + 'model_no_quant.tflite'
tflite_model_name = "model_tflite"
MODEL_TFLITE = MODELS_DIR + "model.tflite"
MODEL_TFLITE_MICRO = MODELS_DIR + 'model.cc'

**Saving the model**

In [None]:
model.save(MODEL_TF)

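**Preparing the representative dataset for quantization**

In [None]:
import tensorflow as tf

# The training set serves as calibration data for post-training quantization
x_rep = tf.convert_to_tensor(X_train, dtype=tf.float32)
print(x_rep.shape)

def representative_dataset():
  # Yield one trial at a time in the model's input shape (1, chans, samples, 1)
  for i in range(x_rep.shape[0]):
    yield [tf.reshape(x_rep[i], (1, chans, samples, 1))]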

### **Convert the model to TFLite without quantization**

In [None]:
# Convert the model to TFLite without quantization
converter = tf.lite.TFLiteConverter.from_saved_model(MODEL_TF)
model_no_quant_tflite = converter.convert()
# Write inside a context manager so the file is flushed and closed
with open(MODEL_NO_QUANT_TFLITE, "wb") as f:
    f.write(model_no_quant_tflite)

### **Convert and save the model with quantization**

In [None]:
# Convert and save the model with quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Enforce integer only quantization
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]  #TFLITE_BUILTINS
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
# Create and provide a representative dataset
converter.representative_dataset = representative_dataset
model_tflite = converter.convert()
# Write inside a context manager so the file is flushed and closed
with open(MODEL_TFLITE, "wb") as f:
    f.write(model_tflite)
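The representative dataset lets the converter observe the range of values flowing through the model so that it can pick a scale and zero point for each tensor. The sketch below shows the usual asymmetric int8 mapping derived from an observed range. It is a simplified illustration, not the converter's actual calibration code, and `int8_params` is a hypothetical helper.

```python
def int8_params(x_min, x_max):
    """Scale and zero point mapping [x_min, x_max] onto [-128, 127]."""
    # Widen the range to contain 0 so that 0.0 is exactly representable
    x_min, x_max = min(x_min, 0.0), max(x_max, 0.0)
    # Assumes x_max > x_min; a constant-zero tensor would need special handling
    scale = (x_max - x_min) / 255.0
    zero_point = int(round(-128 - x_min / scale))
    return scale, zero_point


scale, zero_point = int8_params(-1.0, 3.0)    # made-up activation range
print(scale, zero_point)
```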

In [None]:
print(X_test.shape)
x_test1 = X_test.astype(np.float32)
print(x_test1.shape)

### **TFLite with no Quantization**

In [None]:
# Initialize the interpreter and allocate memory
no_q_interpreter = tf.lite.Interpreter(model_content=model_no_quant_tflite)
no_q_interpreter.allocate_tensors()

no_q_input = no_q_interpreter.get_input_details()[0]
no_q_output = no_q_interpreter.get_output_details()[0]

n_test = len(x_test1)
y_test_pred_no_quant_tflite = np.empty([n_test, 4], dtype=np.float32)
success = 0
res_pred = []
res_act = []

for i in range(n_test):
  # Input tensor, reshaped to the model's expected input shape
  inputtensor = x_test1[i].reshape(no_q_input['shape']).astype(np.float32)
  # Invoke the model interpreter
  no_q_interpreter.set_tensor(no_q_input['index'], inputtensor)
  no_q_interpreter.invoke()
  y_test_pred_no_quant_tflite[i] = no_q_interpreter.get_tensor(no_q_output['index'])[0]
  # Predicted and actual class indices (argmax resolves ties to the first index)
  pred_idx = int(np.argmax(y_test_pred_no_quant_tflite[i]))
  act_idx = int(np.argmax(Y_test[i]))
  # Store 1-based class labels and count correct predictions
  res_pred.append(pred_idx + 1)
  res_act.append(act_idx + 1)
  if pred_idx == act_idx:
    success += 1

# Display the predicted and actual labels
pred = pd.DataFrame(np.reshape(x_test1, (n_test, -1)))
pred = pred.assign(Pred_Class1 = y_test_pred_no_quant_tflite[:,0],
                   Pred_Class2 = y_test_pred_no_quant_tflite[:,1],
                   Pred_Class3 = y_test_pred_no_quant_tflite[:,2],
                   Pred_Class4 = y_test_pred_no_quant_tflite[:,3],
                   Predicted_Class = res_pred,
                   Actual_class = res_act)

display(pred)

print("\nClassification Accuracy for non_quantized model = ", success / n_test)

**Input Details**

In [None]:
no_q_interpreter.get_input_details()

**Output details**

In [None]:
no_q_interpreter.get_output_details()

## **TFLite with Quantization**

**Allocating tensors**

In [None]:
q_interpreter = tf.lite.Interpreter(model_content=model_tflite)
q_interpreter.allocate_tensors()

**Model input details**

In [None]:
#interpreter gives us information we'll need to prepare input and output data
q_interpreter.get_input_details()

**Model output details**

In [None]:
q_interpreter.get_output_details()

In [None]:
input_details = q_interpreter.get_input_details()[0]
output_details = q_interpreter.get_output_details()[0]
n_test = len(x_test1)

# Quantize the float inputs: q = round(x / scale) + zero_point
input_scale, input_zero_point = input_details["quantization"]
x_test_ = np.round(x_test1 / input_scale) + input_zero_point
display(pd.DataFrame(np.reshape(x_test1, (n_test, -1))))
display(pd.DataFrame(np.reshape(x_test_, (n_test, -1))))

print(x_test1[3])
print(x_test_[3])
# Saturate to the int8 range before casting so out-of-range values do not wrap
x_test_ = np.clip(x_test_, -128, 127).astype(input_details["dtype"])
display(pd.DataFrame(np.reshape(x_test_, (n_test, -1))))

print("\n\n\nQuantized output : \n")
# Invoke the interpreter on every test trial
y_test_pred_tflite = np.empty([n_test, 4], dtype=output_details["dtype"])
success = 0
res_pred = []
res_act = []
for i in range(n_test):
    # Input tensor, reshaped to the model's expected input shape
    inputtensor = x_test_[i].reshape(input_details["shape"])
    # Invoke the interpreter
    q_interpreter.set_tensor(input_details["index"], inputtensor)
    q_interpreter.invoke()
    y_test_pred_tflite[i] = q_interpreter.get_tensor(output_details["index"])[0]
    # Predicted and actual class indices (argmax resolves ties to the first index)
    pred_idx = int(np.argmax(y_test_pred_tflite[i]))
    act_idx = int(np.argmax(Y_test[i]))
    # Store 1-based class labels and count correct predictions
    res_pred.append(pred_idx + 1)
    res_act.append(act_idx + 1)
    if pred_idx == act_idx:
        success += 1

# If required, dequantize the output layer (from integer to float)
output_scale, output_zero_point = output_details["quantization"]
y_test_pred_tflite = y_test_pred_tflite.astype(np.float32)
y_test_pred_tflite1 = (y_test_pred_tflite - output_zero_point) * output_scale

pred = pd.DataFrame(np.reshape(x_test_, (n_test, -1)))
pred = pred.assign(Pred_Class1_q = y_test_pred_tflite[:,0],
                   Pred_Class2_q = y_test_pred_tflite[:,1],
                   Pred_Class3_q = y_test_pred_tflite[:,2],
                   Pred_Class4_q = y_test_pred_tflite[:,3],
                   Pred_Class1 = y_test_pred_tflite1[:,0],
                   Pred_Class2 = y_test_pred_tflite1[:,1],
                   Pred_Class3 = y_test_pred_tflite1[:,2],
                   Pred_Class4 = y_test_pred_tflite1[:,3],
                   Predicted_Class = res_pred,
                   Actual_class = res_act)
display(pred)

print("\nClassification Accuracy for Quantized model = ", success / n_test)
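The input quantization and output dequantization in this cell are affine maps between float and int8 values. A NumPy sketch, with a made-up scale and zero point and with explicit rounding and saturation, shows the round trip:

```python
import numpy as np

scale, zero_point = 0.05, -10            # made-up quantization parameters


def quantize(x):
    """Float to int8: q = round(x / scale) + zero_point, saturated."""
    q = np.round(x / scale) + zero_point
    return np.clip(q, -128, 127).astype(np.int8)


def dequantize(q):
    """Int8 back to float: x is approximately (q - zero_point) * scale."""
    return (q.astype(np.float32) - zero_point) * scale


x = np.array([-1.0, 0.0, 0.32, 2.5], dtype=np.float32)
q = quantize(x)
x_hat = dequantize(q)
print(q)
print(x_hat)
```

Within the representable range the reconstruction error is at most half a quantization step; values outside it saturate at -128 or 127.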

In [None]:
print(x_test_.shape)
x_test2 = np.reshape(x_test_,(71,60*151));
print(x_test2.shape)

dt = pd.DataFrame(x_test2);
dt[['y1','y2','y3','y4']] = Y_test;
display(dt)
dt.to_csv('quantized_int8_Xtest.csv')

## **Hex file generation of model files**

In [None]:
# Another Way of getting the array in .h file
def hex_to_c_array(hex_data, var_name):

  c_str = ''

  # Create header guard
  c_str += '#ifndef ' + var_name.upper() + '_H\n'
  c_str += '#define ' + var_name.upper() + '_H\n\n'

  # Add array length at top of file
  c_str += '\nunsigned int ' + var_name + '_len = ' + str(len(hex_data)) + ';\n'

  # Declare C variable
  c_str += 'unsigned char ' + var_name + '[] = {'
  hex_array = []
  for i, val in enumerate(hex_data) :

    # Construct string from hex
    hex_str = format(val, '#04x')

    # Add formatting so each line stays within 80 characters
    if (i + 1) < len(hex_data):
      hex_str += ','
    if (i + 1) % 12 == 0:
      hex_str += '\n '
    hex_array.append(hex_str)

  # Add closing brace
  c_str += '\n ' + format(' '.join(hex_array)) + '\n};\n\n'

  # Close out header guard
  c_str += '#endif //' + var_name.upper() + '_H'

  return c_str
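To see what the generated header looks like, here is a compact, self-contained variant of the same idea applied to four made-up bytes. `bytes_to_c_array` is a hypothetical name, and its layout is simplified relative to `hex_to_c_array` above.

```python
def bytes_to_c_array(data, var_name, per_line=12):
    """Render bytes as a C header with an include guard (simplified sketch)."""
    guard = var_name.upper() + '_H'
    # Format each byte as 0xNN and group per_line values into each row
    hex_vals = [format(b, '#04x') for b in data]
    rows = [', '.join(hex_vals[i:i + per_line])
            for i in range(0, len(hex_vals), per_line)]
    body = ',\n  '.join(rows)
    return (
        f'#ifndef {guard}\n#define {guard}\n\n'
        f'unsigned int {var_name}_len = {len(data)};\n'
        f'unsigned char {var_name}[] = {{\n  {body}\n}};\n\n'
        f'#endif // {guard}\n'
    )


header = bytes_to_c_array(bytes([0x1c, 0x00, 0xff, 0x54]), 'demo_model')
print(header)
```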

**Getting quantized model hex file**

In [None]:
c_model_name = "model_tflite"
with open(c_model_name + '.h', 'w') as file:
  file.write(hex_to_c_array(model_tflite, c_model_name))  # int8-quantized model

**Getting non-quantized (float) model hex file**

In [None]:
c_model_name = "model_tflite_float"
with open(c_model_name + '.h', 'w') as file:
  file.write(hex_to_c_array(model_no_quant_tflite, c_model_name))  # float32 model