In [None]:
!python3 -m pip install xmltodict numpy pandas matplotlib mne scikit-learn scipy joblib autoreject tqdm PyWavelets spectrum xgboost seaborn mock



In [None]:
#%% #* Import Statements
import os
import sys
import xmltodict
import json
import numpy as np
import pandas as pd
import scipy.stats as sp
import matplotlib.pyplot as plt
import mne
import mock
import pywt

from tqdm import tqdm as tqdm
from spectrum import arburg
import sklearn.preprocessing as skpr
from scipy import signal

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
DATA_DIR = '/content/drive/My Drive/mahnob/Sessions'
WINSIZE = 3  # 3 seconds
NCHAN = 32

In [None]:
def getDataFiles(rootFolder):
  sessionFolders, sessions = list(sorted(os.listdir(rootFolder))), []
  for sessionFolder in sessionFolders:
    _path = os.path.join(DATA_DIR, sessionFolder)
    if os.path.isdir(_path):
      node = {'folder': sessionFolder}
      for subfile in os.listdir(_path):
        if subfile.endswith('.bdf'):
          node['bdf'] = subfile
        elif subfile.endswith('.xml'):
          node['xml'] = subfile
      if 'bdf' in node and 'xml' in node and 'S_Trial' in node['bdf']:
        sessions.append(node)
  return sessions

In [None]:
# %%
# ratio of the biased standard deviation to the mean
def coeff_var(epochs):
  return sp.variation(epochs, axis=2)

# sharpness of the peak
def kurtosis(epochs):
  return sp.kurtosis(epochs, axis=2)


#! Returns d1_mean, d1_max, d2_mean, d2_max
def diff(epochs):
  d1, d2 = np.diff(epochs, n=1, axis=2), np.diff(epochs, n=2, axis=2)
  return np.mean(
      d1, axis=2), np.max(
          d1, axis=2), np.mean(
              d2, axis=2), np.max(
                  d2, axis=2)


def skew(epochs):
  return sp.skew(epochs, axis=2)


def ar_burg(epochs):

  model_order = 3

  def ar(row):
    v1, _, _ = arburg(row, model_order) # Estimate the complex autoregressive parameters by the Burg algorithm.
    return v1

  def my_arburg(x):
    return np.apply_along_axis(lambda x: x.real, 0, ar(x))

  abc = np.apply_along_axis(my_arburg, axis=2, arr=epochs)
  return [abc[:, :, i] for i in range(model_order)]


def hjorth(epochs):
  d1 = np.diff(epochs, axis=2)
  d2 = np.diff(epochs, axis=2, n=2)
  h_activity = np.var(epochs, axis=2)
  h_mobility = np.sqrt(np.var(d1, axis=2) / h_activity)
  h_mobility_diff = np.sqrt(np.var(d2, axis=2) / np.var(d1, axis=2))
  h_complexity = h_mobility_diff / h_mobility
  return h_activity, h_mobility, h_complexity


def max_power_welch(epochs, sfreq):
  BandF = [0.1, 3, 7, 12, 30]
  f, Psd = signal.welch(
      epochs,
      sfreq,
  )
  return [
      np.max(
          Psd[:, :, np.where((f > BandF[i]) & (f <= BandF[i + 1]))].squeeze(),
          axis=2) for i in range(len(BandF) - 1)
  ]


def wavelet_features(epochs, nchan=NCHAN):
  cA, cD = pywt.dwt(epochs, 'coif1')

  def w_mean(c):
    return np.mean(c, axis=2)

  def w_std(c):
    return np.std(c, axis=2)

  def w_energy(c):
    return np.sum(np.square(c), axis=2)

  def w_entropy(c):
    return np.sum(np.square(c) * np.log(np.square(c)), axis=2)

  feats = [w_mean, w_std, w_energy, w_entropy]
  return [feat(c) for c in [cA, cD] for feat in feats]

In [None]:
#%% #*The MahnobEEG class
class MahnobEEG:

  def __init__(self, sessionInfo, rootFolder):
    self.sessionInfo, self.rootFolder = sessionInfo, rootFolder
    self.metafile = '{}/{}/{}'.format(self.rootFolder,
                                      self.sessionInfo['folder'],
                                      self.sessionInfo['xml'])
    self.eegfile = '{}/{}/{}'.format(self.rootFolder,
                                     self.sessionInfo['folder'],
                                     self.sessionInfo['bdf'])
    self.featureFile = '{}/{}/features.pkl'.format(self.rootFolder,
                                                   self.sessionInfo['folder'])
    self.channels = []

  def extractMetadata(self):
    emodims = [
        '@feltArsl', '@feltCtrl', '@feltEmo', '@feltPred', '@feltVlnc',
        '@isStim'
    ]
    # *Extract metadata into meta
    temp = None
    with open(self.metafile) as f:
      temp = xmltodict.parse('\n'.join(f.readlines()))
    temp = json.loads(json.dumps(temp))['session']
    metadata = {
        'subjectid': temp['subject']['@id'],
        'results': {k[1:]: int(temp[k]) for k in emodims},
        'media': {
            'name': temp['@mediaFile'],
            'durationSec': float(temp['@cutLenSec'])
        }
    }
    self.metadata = metadata

  def extractBDF(self):
    stdout_old, stderr_old = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = mock.MagicMock(), mock.MagicMock()

    raw = mne.io.read_raw_bdf(self.eegfile, preload=True, stim_channel='Status')
    t20 = mne.channels.make_standard_montage(kind='biosemi32')
    raw.set_montage(t20, raise_if_subset=False)
    events = mne.find_events(raw, stim_channel='Status')
    start_samp, end_samp = events[0][0] + 1, events[1][0] - 1
    raw.crop(raw.times[start_samp], raw.times[end_samp])
    self.nchan = 32
    ch2idx = dict(
        map(lambda x: (x[1], x[0]), list(enumerate(raw.ch_names[:self.nchan]))))
    raw.pick_channels(raw.ch_names[:self.nchan])
    self.ch2idx = dict(
        map(lambda x: (x[1], x[0]), list(enumerate(raw.ch_names[:self.nchan]))))
    self.df = raw.to_data_frame().rename(columns=ch2idx).T
    self.nda = self.df.to_numpy()
    # self.nda = self.nda - np.mean(self.nda, axis=-1, keepdims=True)
    self.raw = raw
    self.nsamp = self.nda.shape[1]
    self.sfreq = int(raw.info['sfreq'])
    self.samp_step = self.sfreq * WINSIZE
    self.chunk_shape = (self.nchan, WINSIZE * self.sfreq)

    sys.stdout, sys.stderr = stdout_old, stderr_old

  def createEpochs(self):
    split_idcs = [
        self.chunk_shape[1] * (i + 1)
        for i in range(self.nda.shape[1] // self.chunk_shape[1])
    ]
    epochsArr = np.split(self.nda, split_idcs, axis=1)
    #? num_of_epochs * nchan * pts_per_win
    self.epochs = np.stack(epochsArr[:-1])

  def extract_features(self):
    fd = {}
    df = pd.DataFrame()
    ep = self.epochs
    fd['coeff_var'] = coeff_var(ep)
    fd['kurtosis'] = kurtosis(ep)
    fd['skew'] = skew(ep)
    fd['d1_mean'], fd['d1_max'], fd['d2_mean'], fd['d2_max'] = diff(ep)
    fd['ar1'], fd['ar2'], fd['ar3'] = ar_burg(ep)

    h = 'hjworth_'
    fd[f'{h}activity'], fd[f'{h}mobility'], fd[f'{h}complexity'] = hjorth(ep)

    a, b, c, d = max_power_welch(ep, self.sfreq)
    pr, pm = 'PRatio', 'PMax'
    fd[f'{pm}1'], fd[f'{pm}2'], fd[f'{pm}3'], fd[f'{pm}4'] = a, b, c, d
    fd[f'{pr}1'], fd[f'{pr}2'], fd[f'{pr}3'], fd[
        f'{pr}4'] = a / b, a / c, b / d, (a + b) / c

    wvf_names = [
        f'{c}_{feat}' for c in ['cA', 'cD']
        for feat in ['mean', 'std', 'energy', 'entropy']
    ]
    wvf_values = wavelet_features(ep)
    for i, name in enumerate(wvf_names):
      fd[name] = wvf_values[i]

    for i in range(self.nchan):
      for feat in fd.keys():
        df[f'ch{i}_{feat}'] = fd[feat][:, i]

    ##
    # for feat in fd.keys():
    #   df[f'{feat}'] = fd[feat].mean(axis=-1)
    ##

    df['valence'] = self.metadata['results']['feltVlnc']
    df['arousal'] = self.metadata['results']['feltArsl']
    df['control'] = self.metadata['results']['feltCtrl']
    df['prediction'] = self.metadata['results']['feltPred']
    df['emotion'] = self.metadata['results']['feltEmo']
    df['stim_video'] = self.metadata['media']['name']
    df['subjectid'] = self.metadata['subjectid']

    self.features = df

  def save_features(self):
    self.features.to_pickle(self.featureFile)

In [None]:
#%% #* Extract BDF
sessions = getDataFiles(DATA_DIR)
for session in tqdm(sessions):
  pt = MahnobEEG(session, DATA_DIR)
  pt.extractMetadata()
  pt.extractBDF()
  pt.createEpochs()
  pt.extract_features()
  pt.save_features()

100%|██████████| 527/527 [1:32:24<00:00, 10.52s/it]
