# Formatting Functions

In [None]:
import scipy.io as sio
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from skimage.feature import hog
import matplotlib.pyplot as plt
from sklearn.svm import SVC
import os
import scipy.sparse as sp
import scipy.io as sio
import time
import cuml
from cuml.svm import SVC as cuSVC
import cudf
import scipy.stats
import librosa
import cupy as cp
import h5py
import gc
from scipy.stats import mode
import string

# Split the data by trial (repetition) the same way the papers do, for reproducibility
def split_trials(trial_data, num_gestures, training_trials, test_trials):
    """Find the sample indices at which each (trial, gesture) segment starts and stops.

    The trial signal is scanned sample by sample; every change of value marks
    the end of one segment and the start of the next.

    Args:
        trial_data: 1-D sequence holding the trial number of every sample.
            Trial numbers are shifted by -1 before being used as row indices,
            so trial 1 addresses row 0.
        num_gestures: number of gesture (label) slots to allocate per trial.
        training_trials: trial numbers used for training (only its length is used).
        test_trials: trial numbers used for testing (only its length is used).

    Returns:
        Float array of shape (num_trials, num_gestures, 2) where
        ``[..., 0]`` is the first sample index of a segment and ``[..., 1]`` is
        the index one past its last sample (suitable for ``range``/slicing).
        Entries that are never visited stay 0.

    Note:
        A trial value of 0 becomes -1 after the shift and therefore addresses
        the *last* trial row through negative indexing.
    """
    total_trials = len(training_trials) + len(test_trials)
    boundaries = np.zeros((total_trials, num_gestures, 2))

    # Gesture slots are zero-indexed; the counter advances whenever trial 1 starts.
    gesture = 0
    for pos in range(1, len(trial_data)):
        now = trial_data[pos] - 1
        before = trial_data[pos - 1] - 1

        if now == before:
            continue  # still inside the same segment

        # The segment that just finished stops (exclusively) at this sample ...
        boundaries[before][gesture][1] = pos

        # ... trial 1 (row 0) starting means a new gesture has begun ...
        if now == 0:
            gesture += 1

        # ... and the segment that just started begins at this sample.
        boundaries[now][gesture][0] = pos

    # The start recorded for the last trial of the last gesture is not reliable,
    # so it is replaced by the stop index of the preceding trial of that gesture.
    last_trial, last_gesture = total_trials - 1, num_gestures - 1
    boundaries[last_trial][last_gesture][0] = boundaries[last_trial - 1][last_gesture][1]

    return boundaries

def split_labels(index_info, label_data, training_trials, test_trials):
    """Split the per-sample labels into training and test sets by trial number.

    Args:
        index_info: array indexed as ``[trial][gesture] -> (begin, end)``, as
            produced by ``split_trials``; ``end`` is exclusive.
        label_data: 1-D sequence of per-sample labels.
        training_trials: 1-based trial numbers whose samples go to the training set.
        test_trials: 1-based trial numbers whose samples go to the test set.

    Returns:
        ``(training_labels, test_labels)``: two 1-D float64 arrays holding the
        label slices of the requested trials, concatenated in trial order and,
        within a trial, in gesture order. An empty trial list yields an empty
        array.
    """

    def _gather(trial_numbers):
        # Seeding with an empty float64 array keeps the result dtype identical
        # to what repeated np.append(...) starting from [] would produce.
        pieces = [np.empty(0)]
        for trial_number in trial_numbers:
            # Trial numbers are 1-based, rows of index_info are 0-based.
            for bounds in index_info[trial_number - 1]:
                begin, end = int(bounds[0]), int(bounds[1])
                pieces.append(np.ravel(label_data[begin:end]))
        # Single concatenation instead of re-copying the accumulated array
        # once per segment.
        return np.concatenate(pieces)

    return _gather(training_trials), _gather(test_trials)

def rolling_window(arr, window_len, step, arr_dimension):
    """Cut overlapping windows out of ``arr`` along its first axis.

    Args:
        arr: 1-D array (``arr_dimension == 1``) or 2-D array
            (``arr_dimension == 2``) whose first axis is the one being windowed.
        window_len: number of samples per window (must be positive).
        step: number of samples between the starts of consecutive windows
            (must be positive).
        arr_dimension: 1 or 2, the dimensionality of ``arr``.

    Returns:
        A new array with the dtype of ``arr`` and shape
        ``(num_windows, window_len)`` for 1-D input or
        ``(num_windows, window_len, arr.shape[1])`` for 2-D input, where
        ``num_windows = (len(arr) - window_len) // step + 1``. If ``arr`` is
        shorter than one window, an empty array with ``num_windows == 0`` is
        returned.

    Raises:
        ValueError: if ``arr_dimension`` is not 1 or 2, or if ``window_len`` or
            ``step`` is not positive.
    """
    from numpy.lib.stride_tricks import sliding_window_view

    if arr_dimension not in (1, 2):
        raise ValueError(f"arr_dimension must be 1 or 2, got {arr_dimension!r}")
    if window_len <= 0 or step <= 0:
        raise ValueError("window_len and step must be positive")

    arr = np.asarray(arr)
    # Clamp at zero so an input shorter than one window always gives an empty
    # result instead of sometimes raising on a negative dimension.
    num_windows = max((arr.shape[0] - window_len) // step + 1, 0)

    if arr_dimension == 2:
        out_shape = (num_windows, window_len, arr.shape[1])
    else:
        out_shape = (num_windows, window_len)

    if num_windows == 0:
        return np.zeros(out_shape, dtype=arr.dtype)

    # The view has one window per start index; keep every `step`-th start.
    view = sliding_window_view(arr, window_len, axis=0)[::step]
    if arr_dimension == 2:
        # sliding_window_view appends the window axis last: (n, cols, window).
        view = view.transpose(0, 2, 1)

    # Copy so the caller gets an independent, writable, contiguous array.
    return view.copy()

def rolling_window_electrodes(arr, window_len, step):
    """Cut overlapping windows out of a 2-D array along its second axis.

    Args:
        arr: 2-D array; windows are taken along axis 1 for every row.
        window_len: number of samples per window (must be positive).
        step: number of samples between the starts of consecutive windows
            (must be positive).

    Returns:
        A new array with the dtype of ``arr`` and shape
        ``(arr.shape[0], num_windows, window_len)`` where
        ``num_windows = (arr.shape[1] - window_len) // step + 1``. If axis 1 is
        shorter than one window, an empty array with ``num_windows == 0`` is
        returned.

    Raises:
        ValueError: if ``window_len`` or ``step`` is not positive.
    """
    from numpy.lib.stride_tricks import sliding_window_view

    if window_len <= 0 or step <= 0:
        raise ValueError("window_len and step must be positive")

    arr = np.asarray(arr)
    # Clamp at zero so a too-short input always gives an empty result instead
    # of sometimes raising on a negative dimension.
    num_windows = max((arr.shape[1] - window_len) // step + 1, 0)
    if num_windows == 0:
        return np.zeros((arr.shape[0], 0, window_len), dtype=arr.dtype)

    # View shape is (rows, starts, window); keep every `step`-th start and copy
    # so the caller gets an independent, writable array.
    return sliding_window_view(arr, window_len, axis=1)[:, ::step].copy()

def one_hot_encoder(labels, gestures):
    """Turn a sequence of class labels into a one-hot matrix.

    Args:
        labels: sequence of labels; each one is converted with ``int()`` and
            used as the column index of its row.
        gestures: number of columns (classes) in the output.

    Returns:
        Float array of shape ``(len(labels), gestures)`` that is 0 everywhere
        except for a single 1 per row, at the column given by that row's label.
    """
    encoded = np.zeros((len(labels), gestures))

    # Convert label by label with int() and set all ones in one indexed
    # assignment: row i, column int(labels[i]).
    columns = np.array([int(label) for label in labels], dtype=int)
    rows = np.arange(len(labels))
    encoded[rows, columns] = 1

    return encoded


# To run

In [None]:
import numpy as np
from google.colab import drive

# CHANGE AS APPROPRIATE: key of the database to evaluate (must exist in data_dict)
database = 'DB3'
# CHANGE AS APPROPRIATE: which time-frequency feature files to load ('HHT' or 'STFT')
evaluation = 'STFT'

# Per-database settings.
#   'E1'/'E2'/'E3'      - number of gesture slots allocated for each exercise
#   'E*_adjusted'       - offset added to the non-zero labels of that exercise so they
#                         run from 1..N again; without it the labels would be e.g. 18-40,
#                         depending on which gesture set was recorded first
#   'fs'                - sampling frequency used for the windows
#   'electrodes'        - number of electrodes
#   'subjects'          - number of subjects
#   'train' / 'test'    - 1-based trial (repetition) numbers of each split
#   'window length'     - window size in samples
#   'step'              - hop between consecutive windows in samples
data_dict = {
    'DB1': {
        'E1': 13, 'E2': 18, 'E3': 24,
        'E1_adjusted': 0, 'E2_adjusted': 0, 'E3_adjusted': 0,
        'fs': 100, 'electrodes': 10, 'subjects': 27,
        'train': [1, 3, 4, 6, 7, 8, 9], 'test': [2, 5, 10],
        'window length': 20, 'step': 1,
    },
    'DB2': {
        'E1': 18, 'E2': 24, 'E3': 10,
        'E1_adjusted': 0, 'E2_adjusted': -17, 'E3_adjusted': -40,
        'fs': 100, 'electrodes': 12, 'subjects': 40,
        'train': [1, 3, 4, 6], 'test': [2, 5],
        'window length': 20, 'step': 1,
    },
    'DB3': {
        'E1': 18, 'E2': 24, 'E3': 10,
        'E1_adjusted': 0, 'E2_adjusted': -17, 'E3_adjusted': -40,
        'fs': 200, 'electrodes': 12, 'subjects': 11,
        'train': [1, 3, 4, 6], 'test': [2, 5],
        'window length': 40, 'step': 2,
    },
    'DB4': {
        'E1': 13, 'E2': 18, 'E3': 24,
        'E1_adjusted': 0, 'E2_adjusted': 0, 'E3_adjusted': 0,
        'fs': 200, 'electrodes': 12, 'subjects': 10,
        'train': [1, 3, 4, 6], 'test': [2, 5],
        'window length': 40, 'step': 2,
    },
    'DB5': {
        'E1': 13, 'E2': 18, 'E3': 24,
        'E1_adjusted': 0, 'E2_adjusted': 0, 'E3_adjusted': 0,
        'fs': 200, 'electrodes': 16, 'subjects': 10,
        'train': [1, 3, 4, 6], 'test': [2, 5],
        'window length': 40, 'step': 2,
    },
}

# Pull the settings of the selected database into module-level names.
_db_settings = data_dict[database]
num_subjects = _db_settings['subjects']
fs = _db_settings['fs']
num_electrodes = _db_settings['electrodes']

train_trials = _db_settings['train']
test_trials = _db_settings['test']
M = _db_settings['window length']
step = _db_settings['step']

# Frequency axis: a window of M samples lasts M/fs seconds, giving a frequency
# resolution of 1 / (M/fs); the bin count is the Nyquist frequency divided by
# that resolution.
_window_duration = 1/fs * M
_freq_resolution = 1 / _window_duration
num_freq_bins = int((fs / 2) / _freq_resolution)
freq_bins = np.linspace(0, fs/2, num_freq_bins)

# Make the Google Drive folders holding the data available under /content/drive.
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# javascript to prevent idle timeout
%%javascript
// Click the Colab connect button once a minute so the session is not treated as idle.
function ClickConnect(){
    // querySelector returns null when no matching element exists; calling
    // .click() on null would throw on every tick, so bail out instead.
    const connectButton = document.querySelector("colab-connect-button");
    if (connectButton === null) {
        console.log("Connect button not found; skipping click");
        return;
    }
    console.log("Clicked on connect button");
    connectButton.click();
}
// Repeat every 60000 ms (one minute).
setInterval(ClickConnect,60000)

<IPython.core.display.Javascript object>

# Training

In [None]:
import gc
from sklearn.metrics import accuracy_score
import scipy.sparse as sp

# Training / evaluation cell: for every exercise and subject, load the labels and the
# pre-computed features, fit a GPU RBF-kernel SVM on the training trials and record the
# accuracy on the test trials.

# Sum of the three per-exercise gesture counts minus 2 -- presumably because the rest
# class is counted once per exercise and should only be counted once overall (TODO confirm).
tot_num_gestures = data_dict[database]['E1'] + data_dict[database]['E2'] + data_dict[database]['E3'] - 2
# NOTE(review): combined_cm is allocated here but never updated in this cell.
combined_cm = np.zeros((tot_num_gestures, tot_num_gestures), dtype=int)
# Per-exercise list of per-subject test accuracies, in percent.
accuracy_dict = {'E1': [], 'E2': [], 'E3': []}

for exercise in ['E1', 'E2', 'E3']:

  # NOTE(review): label_dict and features_dict are re-created for every exercise but are
  # not filled anywhere in this cell.
  label_dict = {f'S{num}': [] for num in range(1, num_subjects+1)}
  features_dict = {f'S{num}': [] for num in range(1, num_subjects+1)}
  num_gestures = data_dict[database][exercise]

  # for all subjects (subject numbers are 1-based)
  for subject in range(1,(num_subjects+1)):
      # force a garbage-collection pass before each subject - per the original note,
      # this is to prevent issues with the SVM
      gc.collect()

      # subjects 6 and 7 were not evaluated for DB3
      if database == 'DB3':
        if subject in [6,7]:
          continue

      # per-sample gesture labels ('restimulus') and trial numbers ('rerepetition')
      file = sio.loadmat(f'/content/drive/My Drive/{database}/Electrode Data/S{subject}_{exercise}_A1.mat')
      label = file['restimulus'].flatten()
      trials = np.int8(file['rerepetition']).flatten()

      # for DB3 and DB4 keep only every 10th label/trial sample
      # (presumably to match the rate of the saved features - TODO confirm)
      if database in ['DB3', 'DB4']:
        downsample_factor = 10
        label = label[::downsample_factor]
        trials = trials[::downsample_factor]

      # find indexes of where trials begin and end
      trial_split_index = split_trials(trials, num_gestures, train_trials, test_trials)
      # split labels with trial info
      train_labels, test_labels = split_labels(trial_split_index, label, train_trials, test_trials)

      # pre-computed training features, one HDF5 dataset per feature
      train_feature_path = f'/content/drive/My Drive/{database}/Features_down/Training_{exercise}_S{subject}_features.h5'
      with h5py.File(train_feature_path, 'r') as train_file:
          train_mav = train_file['MAV'][:]
          train_mavs = train_file['MAVS'][:]
          train_wap = train_file['WAP'][:]
          train_zcr = train_file['ZC'][:]
          train_ssc = train_file['SSC'][:]
          train_ar1 = train_file['ar1'][:]
          train_ar2 = train_file['ar2'][:]
          train_ar3 = train_file['ar3'][:]
          train_ar4 = train_file['ar4'][:]
          train_wl = train_file['WL'][:]
          train_rms = train_file['RMS'][:]
          # NOTE(review): 'SSC' was already read above; this second read is redundant
          train_ssc = train_file['SSC'][:]
          train_var = train_file['VAR'][:]
          train_iemg = train_file['IEMG'][:]

      # pre-computed test features, same datasets as for training
      test_feature_path = f'/content/drive/My Drive/{database}/Features_down/Test_{exercise}_S{subject}_features.h5'
      with h5py.File(test_feature_path, 'r') as test_file:
          test_mav = test_file['MAV'][:]
          test_mavs = test_file['MAVS'][:]
          test_wap = test_file['WAP'][:]
          test_zcr = test_file['ZC'][:]
          test_ssc = test_file['SSC'][:]
          test_ar1 = test_file['ar1'][:]
          test_ar2 = test_file['ar2'][:]
          test_ar3 = test_file['ar3'][:]
          test_ar4 = test_file['ar4'][:]
          test_wl = test_file['WL'][:]
          test_rms = test_file['RMS'][:]
          # NOTE(review): 'SSC' was already read above; this second read is redundant
          test_ssc = test_file['SSC'][:]
          test_var = test_file['VAR'][:]
          test_iemg = test_file['IEMG'][:]

      # time-frequency features: the 'hht_*' names are used for both HHT and STFT files.
      # NOTE(review): if evaluation is neither 'HHT' nor 'STFT', hht_train_path is not
      # assigned here (NameError, or a stale path from an earlier run of the cell).
      if evaluation == 'HHT':
          hht_train_path = f'/content/drive/My Drive/{database}/HHT/Training_{exercise}_S{subject}_hht.h5'
      elif evaluation == 'STFT':
          hht_train_path = f'/content/drive/My Drive/{database}/STFT/Training_{exercise}_S{subject}_stft.h5'

      with h5py.File(hht_train_path, 'r') as hht_file:
          train_mean_freq = hht_file['mean freq'][:]
          train_skew_freq = hht_file['skew freq'][:]
          train_psr = hht_file['psr'][:]
          #train_imfs = hht_file['num imfs'][:]
          train_peak_freq = hht_file['peak freq'][:]
          train_mean_power = hht_file['mean power'][:]
          train_kurt_freq = hht_file['kurt freq'][:]
          train_var_freq = hht_file['var freq'][:]

      # same selection for the test split (the same NOTE about unset paths applies)
      if evaluation == 'HHT':
          hht_test_path = f'/content/drive/My Drive/{database}/HHT/Test_{exercise}_S{subject}_hht.h5'
      elif evaluation == 'STFT':
          hht_test_path = f'/content/drive/My Drive/{database}/STFT/Test_{exercise}_S{subject}_stft.h5'

      with h5py.File(hht_test_path, 'r') as hht_file:
          test_mean_freq = hht_file['mean freq'][:]
          test_skew_freq = hht_file['skew freq'][:]
          test_psr = hht_file['psr'][:]
          #test_imfs = hht_file['num imfs'][:]
          test_peak_freq = hht_file['peak freq'][:]
          test_mean_power = hht_file['mean power'][:]
          test_kurt_freq = hht_file['kurt freq'][:]
          test_var_freq = hht_file['var freq'][:]

      # for HHT, drop singleton axes from the arrays below
      # (presumably the HHT files store an extra length-1 axis - TODO confirm)
      if evaluation == 'HHT':
          train_mean_power = np.squeeze(train_mean_power)
          train_mean_freq = np.squeeze(train_mean_freq)
          train_psr = np.squeeze(train_psr)
          test_mean_power = np.squeeze(test_mean_power)
          test_mean_freq = np.squeeze(test_mean_freq)
          test_psr = np.squeeze(test_psr)

      # Candidate feature sets: exactly one train/test pair should be uncommented.
      # Each feature array is transposed and the results are joined along axis 1.

      # set 1
      #train_features = np.concatenate([train_mean_freq.T, train_psr.T, train_wl.T], axis=1)
      #test_features = np.concatenate([test_mean_freq.T, test_psr.T, test_wl.T], axis=1)

      # set 2
      #train_features = np.concatenate([train_mean_power.T, train_wl.T], axis=1)
      #test_features = np.concatenate([test_mean_power.T, test_wl.T], axis=1)

      # set 3
      #train_features = np.concatenate([train_mean_power.T, train_wl.T, train_mav.T], axis=1)
      #test_features = np.concatenate([test_mean_power.T, test_wl.T, test_mav.T], axis=1)

      # set 4
      #train_features = np.concatenate([train_iemg.T, train_var.T, train_wap.T, train_wl.T, train_ssc.T, train_zcr.T, train_mean_power.T], axis=1)
      #test_features = np.concatenate([test_iemg.T, test_var.T, test_wap.T, test_wl.T, test_ssc.T, test_zcr.T, test_mean_power.T], axis=1)

      # set 5
      #train_features = np.concatenate([train_iemg.T, train_var.T, train_wap.T, train_wl.T, train_ssc.T, train_zcr.T], axis=1)
      #test_features = np.concatenate([test_iemg.T, test_var.T, test_wap.T, test_wl.T, test_ssc.T, test_zcr.T], axis=1)

      # set 6
      #train_features = np.concatenate([train_mav.T, train_wl.T, train_ssc.T, train_zcr.T], axis=1)
      #test_features = np.concatenate([test_mav.T, test_wl.T, test_ssc.T, test_zcr.T], axis=1)

      # set 7
      #train_features = np.concatenate([train_mav.T, train_wl.T, train_ssc.T, train_zcr.T, train_mean_power.T], axis=1)
      #test_features = np.concatenate([test_mav.T, test_wl.T, test_ssc.T, test_zcr.T, test_mean_power.T], axis=1)

      # set 8 (currently active)
      train_features = np.concatenate([train_mav.T, train_mavs.T, train_wap.T, train_zcr.T, train_ar1.T, train_ar2.T, train_ar3.T, train_ar4.T, train_wl.T, train_mean_freq.T, train_psr.T], axis=1)
      test_features = np.concatenate([test_mav.T, test_mavs.T, test_wap.T, test_zcr.T, test_ar1.T, test_ar2.T, test_ar3.T, test_ar4.T, test_wl.T, test_mean_freq.T, test_psr.T], axis=1)

      print(train_features.shape, test_features.shape)

      # window labels - only to predict whether gesture or not
      # (same window length M and step as configured for the database)
      test_label_arr = rolling_window(test_labels, M, step, 1)
      train_label_arr = rolling_window(train_labels, M, step, 1)

      # format labels: one label per window, taken as the maximum label inside the window
      train_label_one = [np.max(arr) for arr in train_label_arr]
      test_label_one = [np.max(arr) for arr in test_label_arr]

      print(np.unique(test_label_one))

      # adjust gestures back to 1-N - it's a list; label 0 is left untouched,
      # every other label gets the exercise-specific offset added
      train_label_ = [x + data_dict[database][f'{exercise}_adjusted'] if x != 0 else 0 for x in train_label_one]
      test_label_ = [x + data_dict[database][f'{exercise}_adjusted'] if x != 0 else 0 for x in test_label_one]
      print("fitting model now")

      # SVM: move training features and labels into cuDF containers as float32
      X_cudf = cudf.DataFrame(train_features, dtype=np.float32)
      y_cudf = cudf.Series(train_label_, dtype=np.float32)

      # fit SVM (cuML SVC, RBF kernel, C=5.0)
      clf = cuSVC(kernel='rbf', C=5.0)
      clf.fit(X_cudf, y_cudf)
      x_test = cudf.DataFrame(test_features, dtype=np.float32)
      y_test = cudf.Series(test_label_, dtype=np.float32)

      # predict on the test windows and score against the true labels on the host
      label_prediction = clf.predict(x_test)
      accuracy = accuracy_score(y_test.to_numpy(), label_prediction.to_numpy())
      print(f"{exercise}, S{subject} Accuracy: {(accuracy*100):.2f}")

      accuracy_dict[exercise].append(accuracy*100)
      # drop the fitted model before moving on to the next subject
      del clf

# per-exercise mean and sample standard deviation (ddof=1) of the subject accuracies
for key in accuracy_dict.keys():
  print(f'Average accuracy for {key}: {np.mean(accuracy_dict[key])}, and std: {np.std(accuracy_dict[key], ddof=1)}')
  print(f'num elements {len(accuracy_dict[key])}')

# pool the accuracies of all exercises into one flat list
ye = []
for val in accuracy_dict.values():
  ye.extend(val)
# display average and sample standard deviation
print(f'Overall average: {np.mean(ye)} and std: {np.std(ye, ddof=1)}')
