In [1]:
# conda environment eeg

In [2]:
from sklearn.model_selection import train_test_split
from os import listdir
from os.path import isfile, join
from scipy.io import savemat, loadmat
from sklearn.metrics import confusion_matrix

# import mat73
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns

import os
import re
import random
import einops
from scipy.io import loadmat
from pathlib import Path
from PIL import Image
# import PIL.Image as Image

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# the following import is required for matplotlib < 3.2:
from mpl_toolkits.mplot3d import Axes3D  # noqa
import matplotlib
# matplotlib.use('Agg') # use a non-interactive backend such as Agg (for PNGs), PDF, SVG or PS.

In [3]:
# using CNN with TensorFlow core
import tensorflow as tf
from tensorflow import keras
from keras import Model, layers
print("TensorFlow version:", tf.__version__)
# from tensorflow.keras import datasets, layers, models
# from tensorflow.keras.layers import Dense, Flatten, Conv2D
import matplotlib.pyplot as plt
import collections
# TensorFlow version: 2.13.0

TensorFlow version: 2.10.0


In [4]:
# tf.config.list_physical_devices('GPU')
from tensorflow.python.client import device_lib
def get_available_device():
    local_device_protos = device_lib.list_local_devices()
    return [x.name for x in local_device_protos]

In [5]:
print(get_available_device())

['/device:CPU:0']


# Configure video data

In [6]:
path = './image dataset/'
files = os.listdir(path)
sub_num = 4
files_head = [f for f in files if re.match(r'^sub' + str(sub_num) + '.*_1\.jpg$', f)]

In [7]:
def get_class(fname):
  return fname.split('_')[1]

In [8]:
def get_sub(fname):
  return fname.split('_')[0]

In [9]:
def integrate_video(fname):
    img = Image.open('./image dataset/' + fname).convert('CMYK')
    data = np.expand_dims(np.asarray(img), axis=0)
    for i in range(1, 7):
        fn = fname.replace('_1.jpg',f'_{i+1}.jpg')
        img_ = Image.open('./image dataset/' + fn).convert('CMYK')
        arr = np.expand_dims(np.asarray(img_), axis=0)
        data = np.concatenate((data, arr), axis=0)  # shape: (frame_num, width, height, channel)
    return data


In [10]:
def save_video(files):
    ds = []
    sub_dict = []
    class_dict = []
    for f in files:
        data = integrate_video(f)
        sub = get_sub(f)
        cls = get_class(f)
        ds.append(data)
        sub_dict.append(sub)
        class_dict.append(cls)

    return ds, sub_dict, class_dict

In [11]:
data, sub_dict, class_dict = save_video(files_head)

Load image: read as video frames (Tensorflow tutorial)

In [12]:
# load data depends on class

In [13]:
def get_files_per_class(files):
  files_for_class = collections.defaultdict(list)
  for fname in files:
    class_name = get_class(fname)
    files_for_class[class_name].append(fname)
  return files_for_class

In [14]:
files_for_class = get_files_per_class(files)
classes = list(files_for_class.keys())

In [15]:
print('Num classes:', len(classes))
print('Num videos for class[0]:', len(files_for_class[classes[0]]))

Num classes: 6
Num videos for class[0]: 2205


In [16]:
def select_subset_of_classes(files_for_class, classes, files_per_class):
  """ 
    Use for 3-class/in-subclass comparison
  """
  files_subset = dict()

  for class_name in classes:
    class_files = files_for_class[class_name]
    files_subset[class_name] = class_files[:files_per_class]

  return files_subset

In [17]:
NUM_CLASSES = 2
FILES_PER_CLASS = 90   # 270 trials for each sub, 45 trials for each class per sub, 315 trials for each class in general

In [18]:
files_subset = select_subset_of_classes(files_for_class, classes[:NUM_CLASSES], FILES_PER_CLASS)
list(files_subset.keys())

['back', 'down']

In [19]:
# load data depends on subject and class

In [20]:
def get_files_per_sub_per_class(files):
  files_for_sub_for_class = collections.defaultdict(list)
  for fname in files:
    class_name = get_class(fname)
    sub_num = get_sub(fname)
    files_for_sub_for_class[(sub_num, class_name)].append(fname)
  return files_for_sub_for_class

In [21]:
files_for_sub_for_class = get_files_per_sub_per_class(files)
classes_2stage = list(files_for_sub_for_class.keys())

In [22]:
print('Num classes:', len(classes_2stage))
print('Num videos for class[0]:', len(files_for_sub_for_class[classes_2stage[0]]))

Num classes: 42
Num videos for class[0]: 315


select random train/test dataset

In [23]:
trial_dict = {'front':0, 'back':1, 'left':2, 'right':3, 'up':4, 'down':5}
label = [trial_dict[x] for x in class_dict]

In [24]:
x_train, x_test, y_train, y_test = train_test_split(data, label, test_size=0.33, random_state=42)

In [25]:
# np.shape(x_train)

In [26]:
x_train = [tf.convert_to_tensor(item) for item in x_train]
x_test = [tf.convert_to_tensor(item) for item in x_test]
y_train = [tf.convert_to_tensor(item) for item in y_train]
y_test = [tf.convert_to_tensor(item) for item in y_test]

In [27]:
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))

In [28]:
batch_size = 8
train_ds = train_ds.batch(batch_size)
test_ds = test_ds.batch(batch_size)

In [29]:
test_batches = tf.data.experimental.cardinality(test_ds) # return the num of test batches
val_ds = test_ds.take(test_batches//3)
test_ds = test_ds.skip(test_batches//3)

# CNN Model

In [30]:
HEIGHT = 369
WIDTH = 433

In [31]:
class Conv2Plus1D(keras.layers.Layer):
  def __init__(self, filters, kernel_size, padding):
    """
      A sequence of convolutional layers that first apply the convolution operation over the
      spatial dimensions, and then the temporal dimension. 
    """
    super().__init__()
    self.seq = keras.Sequential([  
        # Spatial decomposition
        layers.Conv3D(filters=filters,
                      kernel_size=(1, kernel_size[1], kernel_size[2]),
                      padding=padding),
        # Temporal decomposition
        layers.Conv3D(filters=filters, 
                      kernel_size=(kernel_size[0], 1, 1),
                      padding=padding)
        ])
  
  def call(self, x):
    return self.seq(x)

In [32]:
class ResidualMain(keras.layers.Layer):
  """
    Residual block of the model with convolution, layer normalization, and the
    activation function, ReLU.
  """
  def __init__(self, filters, kernel_size):
    super().__init__()
    self.seq = keras.Sequential([
        Conv2Plus1D(filters=filters,
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization(),
        layers.ReLU(),
        Conv2Plus1D(filters=filters, 
                    kernel_size=kernel_size,
                    padding='same'),
        layers.LayerNormalization()
    ])
    
  def call(self, x):
    return self.seq(x)

In [33]:
class Project(keras.layers.Layer):
  """
    Project certain dimensions of the tensor as the data is passed through different 
    sized filters and downsampled. 
  """
  def __init__(self, units):
    super().__init__()
    self.seq = keras.Sequential([
        layers.Dense(units),
        layers.LayerNormalization()
    ])

  def call(self, x):
    return self.seq(x)

In [34]:
def add_residual_block(input, filters, kernel_size):
  """
    Add residual blocks to the model. If the last dimensions of the input data
    and filter size does not match, project it such that last dimension matches.
  """
  out = ResidualMain(filters, 
                     kernel_size)(input)
  
  res = input
  # Using the Keras functional APIs, project the last dimension of the tensor to
  # match the new filter size
  if out.shape[-1] != input.shape[-1]:
    res = Project(out.shape[-1])(res)

  return layers.add([res, out])

In [35]:
class ResizeVideo(keras.layers.Layer):
  def __init__(self, height, width):
    super().__init__()
    self.height = height
    self.width = width
    self.resizing_layer = layers.Resizing(self.height, self.width)

  def call(self, video):
    """
      Use the einops library to resize the tensor.  
      
      Args:
        video: Tensor representation of the video, in the form of a set of frames.
      
      Return:
        A downsampled size of the video according to the new height and width it should be resized to.
    """
    # b stands for batch size, t stands for time, h stands for height, 
    # w stands for width, and c stands for the number of channels.
    old_shape = einops.parse_shape(video, 'b t h w c')
    images = einops.rearrange(video, 'b t h w c -> (b t) h w c')
    images = self.resizing_layer(images)
    videos = einops.rearrange(
        images, '(b t) h w c -> b t h w c',
        t = old_shape['t'])
    return videos

In [36]:
input_shape = (None, 7, HEIGHT, WIDTH, 4)
input = layers.Input(shape=(input_shape[1:]))
x = input

x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 16, (3, 3, 3))
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 32, (3, 3, 3))
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 64, (3, 3, 3))
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 128, (3, 3, 3))

x = layers.GlobalAveragePooling3D()(x)
x = layers.Flatten()(x)
x = layers.Dense(6)(x)

model = keras.Model(input, x)

In [37]:
frames, label = next(iter(train_ds))
model.build(frames)

In [38]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 7, 369, 433  0           []                               
                                , 4)]                                                             
                                                                                                  
 conv2_plus1d (Conv2Plus1D)     (None, 7, 369, 433,  3936        ['input_1[0][0]']                
                                 16)                                                              
                                                                                                  
 batch_normalization (BatchNorm  (None, 7, 369, 433,  64         ['conv2_plus1d[0][0]']           
 alization)                      16)                                                          

In [39]:
model.compile(loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
              optimizer = keras.optimizers.Adam(learning_rate = 0.0001), 
              metrics = ['accuracy'])

In [40]:
checkpoint_path = "./sub3_training_1/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

In [41]:
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True, # save the entire model or weights only
                                                 verbose=1,
                                                 monitor='val_accuracy',
                                                 mode='max',  # max for acc, min for loss
                                                 save_best_only=True)

In [212]:
# print('hi')
history = model.fit(x = train_ds,
                    epochs = 60,
                    validation_data = val_ds,
                    callbacks=[cp_callback])

Epoch 1/60
Epoch 1: val_accuracy improved from -inf to 0.18750, saving model to ./sub3_training_1\cp.ckpt
Epoch 2/60
Epoch 2: val_accuracy did not improve from 0.18750
Epoch 3/60
Epoch 3: val_accuracy did not improve from 0.18750
Epoch 4/60
Epoch 4: val_accuracy did not improve from 0.18750
Epoch 5/60
Epoch 5: val_accuracy improved from 0.18750 to 0.21875, saving model to ./sub3_training_1\cp.ckpt
Epoch 6/60
Epoch 6: val_accuracy did not improve from 0.21875
Epoch 7/60
 5/23 [=====>........................] - ETA: 4:26 - loss: 2.0059 - accuracy: 0.1250

if the training is interrupted

In [42]:
# if the training is interrupted
model.load_weights('./sub3_training_1/cp.ckpt')
# model.load_weights(checkpoint_path)
history = model.fit(x = train_ds,
                    epochs = 30, 
                    validation_data = val_ds,
                    callbacks=[cp_callback])

Epoch 1/30
Epoch 1: val_accuracy improved from -inf to 0.21875, saving model to ./sub3_training_1\cp.ckpt
Epoch 2/30
Epoch 2: val_accuracy did not improve from 0.21875
Epoch 3/30
Epoch 3: val_accuracy did not improve from 0.21875
Epoch 4/30
Epoch 4: val_accuracy did not improve from 0.21875
Epoch 5/30
Epoch 5: val_accuracy did not improve from 0.21875
Epoch 6/30
Epoch 6: val_accuracy did not improve from 0.21875
Epoch 7/30
Epoch 7: val_accuracy did not improve from 0.21875
Epoch 8/30
Epoch 8: val_accuracy did not improve from 0.21875
Epoch 9/30
Epoch 9: val_accuracy did not improve from 0.21875
Epoch 10/30
Epoch 10: val_accuracy did not improve from 0.21875
Epoch 11/30
Epoch 11: val_accuracy did not improve from 0.21875
Epoch 12/30
Epoch 12: val_accuracy did not improve from 0.21875
Epoch 13/30
Epoch 13: val_accuracy did not improve from 0.21875
Epoch 14/30
Epoch 14: val_accuracy did not improve from 0.21875
Epoch 15/30
Epoch 15: val_accuracy did not improve from 0.21875
Epoch 16/30
Ep

: 

In [None]:
# train acc: 93.89%; val acc: 28.125%; test acc: 18.9%   # sub1 train0
# train acc: 81.11%; val acc: 31.250%; test acc: 18.966%  # sub1 train1
# train acc: 91.11%; val acc: 31.250%; test acc: 22.41% # sub1 train2
# train acc: 91.67%; val acc: 31.250%;
# need to be at least 30% to have statistical meanings

# train acc: 74.44%; val acc:56.25%; test acc: 36.21% # sub2 train0
# train acc: 67.22%; val acc:34.38%; test acc: 39.66% # sub2 train1
# train acc: 67.78%; val acc:40.63%; test acc: 39.66% # sub2 train2-1

# train acc: 65.00%; val acc: 23.33%; test acc: 20.00% # sub3 train00

# train acc: 61.11%; val acc: 25.00%; test acc: 20.69% # sub4 train0
# train acc: 52.78%; val acc: 28.125%; test acc: 22,41% # sub4 train0-1
# train acc: 61.45%; val acc: 28.125%; test acc: 29.31% # sub4 train1

In [None]:
def plot_history(history):
  """
    Plotting training and validation learning curves.

    Args:
      history: model history with all the metric measures
  """
  fig, (ax1, ax2) = plt.subplots(2)

  fig.set_size_inches(18.5, 10.5)

  # Plot loss
  ax1.set_title('Loss')
  ax1.plot(history.history['loss'], label = 'train')
  ax1.plot(history.history['val_loss'], label = 'test')
  ax1.set_ylabel('Loss')
  
  # Determine upper bound of y-axis
  max_loss = max(history.history['loss'] + history.history['val_loss'])

  ax1.set_ylim([0, np.ceil(max_loss)])
  ax1.set_xlabel('Epoch')
  ax1.legend(['Train', 'Validation']) 

  # Plot accuracy
  ax2.set_title('Accuracy')
  ax2.plot(history.history['accuracy'],  label = 'train')
  ax2.plot(history.history['val_accuracy'], label = 'test')
  ax2.set_ylabel('Accuracy')
  ax2.set_ylim([0, 1])
  ax2.set_xlabel('Epoch')
  ax2.legend(['Train', 'Validation'])

  plt.show()

%matplotlib inline
plot_history(history)

In [170]:
model.load_weights('./sub3_training_1/cp.ckpt')
model.evaluate(test_ds, return_dict=True)



{'loss': 2.520019292831421, 'accuracy': 0.17241379618644714}

In [None]:
y_pred = []
y_true = []

for x, y in test_ds:
    y_pred_data = model.predict(x)
    y_pred.extend(tf.argmax(y_pred_data, axis=1).numpy())
    y_true.extend(y.numpy())

conf_mat = confusion_matrix(y_true, y_pred)

plt.figure(figsize=(10, 8))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted Labels')
plt.xticks([0, 1, 2, 3, 4, 5], ['front', 'back', 'left', 'right', 'up', 'down'])
plt.ylabel('True Labels')
plt.yticks([0, 1, 2, 3, 4, 5], ['front', 'back', 'left', 'right', 'up', 'down'])
plt.title(f'Confusion Matrix for Subject {sub_num} on Test Dataset')
plt.savefig(f'./illustration/Confusion Matrix for Subject {sub_num} on Test Dataset.png')

In [None]:
model.evaluate(val_ds, return_dict=True)

In [None]:
y_pred = []
y_true = []

for x, y in val_ds:
    y_pred_data = model.predict(x)
    y_pred.extend(tf.argmax(y_pred_data, axis=1).numpy())
    y_true.extend(y.numpy())

conf_mat = confusion_matrix(y_true, y_pred)

plt.figure(figsize=(10, 8))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted Labels')
plt.xticks([0, 1, 2, 3, 4, 5], ['front', 'back', 'left', 'right', 'up', 'down'])
plt.ylabel('True Labels')
plt.yticks([0, 1, 2, 3, 4, 5], ['front', 'back', 'left', 'right', 'up', 'down'])
plt.title(f'Confusion Matrix for Subject {sub_num} on Validation Dataset')
plt.savefig(f'./illustration/Confusion Matrix for Subject {sub_num} on Validation Dataset.png')

In [None]:
def get_actual_predicted_labels(dataset): 
  """
    Create a list of actual ground truth values and the predictions from the model.

    Args:
      dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.

    Return:
      Ground truth and predicted values for a particular dataset.
  """
  actual = [labels for _, labels in dataset.unbatch()]
  predicted = model.predict(dataset)

  actual = tf.stack(actual, axis=0)
  predicted = tf.concat(predicted, axis=0)
  predicted = tf.argmax(predicted, axis=1)

  return actual, predicted

In [None]:
def plot_confusion_matrix(actual, predicted, labels, ds_type):
  cm = tf.math.confusion_matrix(actual, predicted)
  ax = sns.heatmap(cm, annot=True, fmt='g')
  sns.set(rc={'figure.figsize':(12, 12)})
  sns.set(font_scale=1.4)
  ax.set_title('Confusion matrix of action recognition for ' + ds_type)
  ax.set_xlabel('Predicted Action')
  ax.set_ylabel('Actual Action')
  plt.xticks(rotation=90)
  plt.yticks(rotation=0)
  ax.xaxis.set_ticklabels(labels)
  ax.yaxis.set_ticklabels(labels)

In [None]:
# model.save('./CNN_3D_1.keras') # epoch around 100 times: train acc: 84.4%; val acc: 22.6%; test acc: 20.67%

Reload trained model