In [None]:
import numpy as np
from tqdm import tqdm
import logging
import os
import pickle
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import mutual_info_classif as MIBIF
from sklearn.model_selection import cross_val_score, StratifiedKFold
from scipy.signal import cheby1, butter, sosfilt

import tensorflow as tf
import matplotlib.pyplot as plt
from mne.decoding import CSP
from sklearn.metrics import accuracy_score, confusion_matrix

from utilities import get_npz_data, get_project_data, get_bci_iii_data, get_prev_project_data

logger = logging.getLogger('FBCSP-SVM')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

In [None]:
LOG_PATH = f'{os.getcwd()}/LOG/1dAX_LSTM'

HYBRID_DATA_PATH = f'{os.getcwd()}/Data/Hybrid'
BCI_IV_2a_DATA_PATH = f'{os.getcwd()}/Data/BCI'
BCI_III_IVa_DATA_PATH = f'{os.getcwd()}/Data/BCI III IVa'

EEG, TARGET = get_project_data(HYBRID_DATA_PATH, [2.0, 3.0], sec=3, offset=1)

if not os.path.exists(LOG_PATH):
    os.makedirs(LOG_PATH)

# Data Preprocessing
----

In [None]:
def _preprocessing(data):
    logger.info(f'Preprocessing data')
    return StandardScaler().fit_transform(X=data)

def ivstack(data, shape):
    unstack = np.empty(shape=shape)
    for idx, _ in enumerate(unstack):
        for idy, da in enumerate(data):
            unstack[idx, idy] = da[idx*shape[-1]:(idx+1)*shape[-1]]
    return unstack

# Data Processing
-----
### We pass the data through a filter bank and calculate CSP features afterwards.

In [None]:
def filter_bank(data, bands=None):
    if bands is None:
        keys = [f'b{i}' for i in range(9)]
        vals = [[i*4, (i+1)*4] for i in range(1, 10)]
        bands = dict(zip(keys, vals))
    else:
        assert isinstance(bands, dict), logger.error('Bands for filter bank have wrong data type')

    num_trials, num_samples, num_channels = data.shape
    num_bands = len(bands)
    filter_data = np.zeros(shape=(num_bands, num_trials, num_channels, num_samples))

    for band_idx, (band, frequencies) in enumerate(tqdm(bands.items(), desc='Filter bank')):
        for trial_idx, trial in enumerate(data):
            for channel_idx, channel in enumerate(trial.T):
                sos = butter(N=5, Wn=[i/128 for i in frequencies], output='sos', btype='bandpass')
                filter_data[band_idx, trial_idx, channel_idx, :] = sosfilt(sos, channel)

    return filter_data

In [None]:
def common_spatial_pattern(data, target, num_components):
    num_bands, num_trials, num_channels, num_samples = data.shape
    reduced_dim = 4
    csp_features = np.zeros(shape=(num_bands, num_trials, reduced_dim))
    
    if num_components == 4:
        csp_models = [CSP(n_components=num_channels, transform_into='csp_space')
                      for _ in range(num_bands)]
    else:
        csp_models = [CSP(n_components=num_channels, transform_into='csp_space', component_order='alternate')
                      for _ in range(num_bands)]
        
    for band_idx in tqdm(range(num_bands), desc='CSP feature extraction'):
        Z_p = csp_models[band_idx].fit(X=data[band_idx], y=target).transform(X=data[band_idx])[:, :reduced_dim]
        for trial_idx in range(num_trials):
            z = Z_p[trial_idx]
            log_var = np.log(np.std(z, axis=1)**2 / np.sum(np.std(z, axis=1)**2))
            csp_features[band_idx, trial_idx] = log_var
            
    return csp_features, csp_models

# Feature Selection
------
### Select num_features which maximise the mutual information.

In [None]:
def feature_selection(data, target, num_features=4):
    logger.info(f'Feature selection')
    all_idx = list()
    for band in data:
        all_idx.append(np.sum(MIBIF(band, target)))

    most_informative_idx = list()
    for i in range(num_features):
        idx = np.argmax(all_idx)
        most_informative_idx.append(idx)
        all_idx[idx] = -1

    return np.concatenate(np.copy(data[most_informative_idx]), axis=1), most_informative_idx

# Suport Vector Machine
-----------
### Training a SVM with a linear kernel.

In [None]:
def train(data, target):
    logger.info(f'Training model')
    model = SVC(kernel='linear', verbose=1, random_state=3333)
    model.fit(data, target)
    
    return model

# Pipeline
----------
### Combining all processing steps into a pipeline whih returns the needed parameters for further classification.

In [None]:
def pipeline(data, target, classes_idx):
    num_components = len(classes_idx)
    logger.info(f'Entering pipeline')
    filtered_data = filter_bank(data)
    csp_features, csp_models = common_spatial_pattern(filtered_data, target, num_components)
    fsdata, idx_params = feature_selection(csp_features, target)
    svm_model = train(fsdata, target)

    return csp_models, idx_params, svm_model

In [None]:
def csp_fit(data, csp_models, reduced_dim=4):
    logger.info(f'CSP transform')
    num_bands, num_trials, num_channels, num_samples = data.shape
    csp_features = np.zeros(shape=(num_bands, num_trials, reduced_dim))

    for band_idx, model in enumerate(csp_models):
        Z_p = model.transform(data[band_idx])[:,:reduced_dim]
        for trial_idx in range(num_trials):
            z = Z_p[trial_idx]
            log_var = np.log(np.std(z, axis=1)**2 / np.sum(np.std(z, axis=1)**2))
            csp_features[band_idx, trial_idx] = log_var

    return csp_features

def mi_fit(data, params):
    logger.info(f'MI transform')
    return np.concatenate(data[params], axis=1)

def predict(data, true_labels, csp_models, idx_params, svm_model):
    filtered_data = filter_bank(data)
    csp_features = csp_fit(filtered_data, csp_models)
    mi_features = mi_fit(csp_features, idx_params)
    logger.info(f'Prediction')
    pred_labels = svm_model.predict(mi_features)
    
    return accuracy_score(true_labels, pred_labels), confusion_matrix(true_labels, pred_labels)

In [None]:
k_cross_val = 5
t_sec = 3
session_idx = 0

for comb_idx, comb in enumerate([[769, 770], [770, 772]]):
    EEG, TARGET, _ = get_npz_data(path=BCI_IV_2a_DATA_PATH, user='A01E', labels=comb, sec=t_sec)
    accuracies = list()
    
    path = f'LOG/CIDX-{comb_idx}'
    with tf.summary.create_file_writer(path).as_default(): 
        for fold_idx, (train_idx, test_idx) in enumerate(StratifiedKFold(n_splits=k_cross_val).split(np.zeros(shape=TARGET.shape), TARGET)):
            EEG_TRAIN, TARGET_TRAIN = EEG[train_idx], TARGET[train_idx]
            EEG_TEST, TARGET_TEST = EEG[test_idx], TARGET[test_idx]

            trained_csp, trained_fs, trained_svm = pipeline(EEG_TRAIN, TARGET_TRAIN, comb)
            acc, conf_matrix = predict(EEG_TEST, TARGET_TEST, trained_csp, trained_fs, trained_svm)
            accuracies.append(acc)
        
            tf.summary.scalar(f'FIDX-{fold_idx}', tf.reduce_mean(accuracies), step=session_idx)
    
    session_idx += 1
        