In [1]:
# all imports, numpy, scipy, matplotlib, seaborn, pandas, plotly


import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import sklearn 
import os
from matplotlib import cm as colours 
from colorama import Fore, Back, Style
from scipy.stats import sem
import joblib
import pickle 


In [2]:
# load the data for both mfa and normal annotations 
pkl_path = r"Pickle Files\patient_data.pkl"

if os.path.exists(pkl_path):
    with open(pkl_path, "rb") as f:
        patient_data = pickle.load(f)
    print("Loaded patient_data from pickle.")
else:
    path = r"C:\Users\Nabiya\Box\Academic-Duke\CoganLab\Summer 2024\intraop\ieeg_data_intraop\Zac_intraop_data"
    patients = ['S14', 'S22', 'S23', 'S26', 'S33']
    positions = ['p1', 'p2', 'p3']

    patient_data = {}

    for patient in patients:
        patient_data[patient] = {}
        for position in positions:
            patient_data[patient][position] = {'Kumar': {}, 'MFA': {}}

            data_kumar = sp.io.loadmat(f"{path}\\{patient}\\{patient}_HG_{position}_sigChannel_goodTrials.mat")
            data_mfa = sp.io.loadmat(f"{path}\\{patient}\\{patient}_HG_{position}_sigChannel_goodTrials_MFA.mat")

            patient_data[patient][position]['Kumar'] = {
                'hg_trace': data_kumar['hgTrace'],
                'hg_map': data_kumar['hgMap'],
                'phon_seq': data_kumar['phonSeqLabels']
            }

            patient_data[patient][position]['MFA'] = {
                'hg_trace': data_mfa['hgTrace'],
                'hg_map': data_mfa['hgMap'],
                'phon_seq': data_mfa['phonSeqLabels']
            }

    # Save to pickle
    with open(pkl_path, "wb") as f:
        pickle.dump(patient_data, f)

    print("Processed and saved patient_data to pickle.")
# dictionary structure is patient_data[patient][position][method][hg_trace, hg_map, phon_seq]
# you can access the data as follows: patient_data['S14']['p1']['Kumar']['hg_trace'] etc. 



Loaded patient_data from pickle.


# Decoding

In [3]:
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.decomposition import PCA
from sklearn.svm import SVC, LinearSVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier, BaggingClassifier
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.metrics import balanced_accuracy_score, accuracy_score, confusion_matrix
from tqdm.notebook import tqdm
from sklearn.base import BaseEstimator, TransformerMixin


In [4]:
from sklearnex import patch_sklearn
patch_sklearn()

Extension for Scikit-learn* enabled (https://github.com/uxlfoundation/scikit-learn-intelex)


In [5]:
class PCA_noCenter(BaseEstimator, TransformerMixin):

    def __init__(self, n_components=None):
        self.n_components = n_components

    def fit(self, X, y=None):
        U, S, Vt = np.linalg.svd(X, full_matrices=False)
        k = self._get_components(X, S)
        self.components_ = Vt[:k].T
        self.explained_variance_ = S**2
        return self

    def transform(self, X):
        return X @ self.components_

    def fit_transform(self, X, y=None):
        self.fit(X, y)
        return self.transform(X)
    
    def _get_components(self, X, S):
        if self.n_components is None or self.n_components >= min(X.shape):
            print("n_components is None or greater than the number of features/samples. Using n_components = min(X.shape)")
            return min(X.shape)
        elif self.n_components < 1:
            cum_var = np.cumsum(S**2) / np.sum(S**2)
            return np.argmax(cum_var >= self.n_components) + 1
        else:
            return int(self.n_components)

In [6]:
var_expl = np.arange(0.1, 1, 0.1)  
n_folds = 20  
do_cv = True 

# main cv - defines train/test splits
cv = StratifiedKFold(n_splits=n_folds, shuffle=True)

# combine PCA and decoder into pipeline for ease of use with grid search
# clf = Pipeline([('pca', PCA_noCenter(n_components=0.8)), ('decoder', SVC(kernel='rbf', class_weight='balanced'))])
clf = Pipeline([('pca', PCA_noCenter(n_components=0.8)), ('classifier', LinearDiscriminantAnalysis())])

param_grid = {'pca__n_components': var_expl, 'decoder__C': np.logspace(-3, 3, 7), 'decoder__gamma': np.logspace(-3, 3, 7)}

In [7]:
def warn(*args, **kwargs):
    
    pass
import warnings
warnings.warn = warn

## Decoder for each position separately for S14 Kumar and S14 MFA

In [8]:
def run_decoder(X, y, num_folds=20, do_cv=True, var_expl=[0.8], num_iter=5):
    cv = StratifiedKFold(n_splits=num_folds, shuffle=True)
    accs = []
    for _ in range(num_iter):
        y_true_all = []
        y_pred_all = []
        if do_cv:
            iter = cv.split(X, y)
        else:
            iter = cv.split(X, y)
        for train_idx, test_idx in iter:
        # for train_idx, test_idx in cv.split(X, y):
            # split data
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            # print(X_train.shape)

            if do_cv:
                # nested cv - searches for best variance explained parameter by splitting training data over multiple folds
                grid = GridSearchCV(clf, {'pca__n_components': var_expl}, cv=num_folds, scoring='balanced_accuracy', n_jobs=-1)
                grid.fit(X_train, y_train)
                #print('Best variance explained:', grid.best_params_['pca__n_components'], 'Best score:', grid.best_score_)

                # grid search automatically refits the best model to the training data so we can just predict on the test data
                y_pred = grid.predict(X_test)
                
            else:
                clf.fit(X_train, y_train)
                y_pred = clf.predict(X_test)

            y_true_all.extend(y_test)
            y_pred_all.extend(y_pred)

        cmat = confusion_matrix(y_true_all, y_pred_all)
        acc_cmat = np.trace(cmat) / np.sum(cmat)
        # cmat_norm = cmat / np.sum(cmat, axis=1)[:, None]
        # acc_norm = np.trace(cmat_norm) / cmat_norm.shape[0]
        acc_bal = balanced_accuracy_score(y_true_all, y_pred_all)
        # cmat = cmat / np.sum(cmat, axis=1)[:, None]

        accs.append(acc_cmat)
        #print('Balanced accuracy:', round(acc_bal, 3), 'Confusion matrix accuracy:', round(acc_cmat, 3))

    return accs

## 10 Iterations -- Every position -- Every patient -- Kumar and MFA

In [9]:
# run the decoder for each patient and position separately
patients = ['S14', 'S22', 'S23', 'S26', 'S33']
positions = ['p1', 'p2', 'p3']
methods = ['Kumar', 'MFA']
DWs = ["1S", "0.8S", "0.6S", "0.4S", "0.2S", "0.1S", "0.05S", 
       "0.025S", "0.015S", "0.01S"]

dwdict = {
    "1S": [-0.5, 0.5], 
    "0.8S": [-0.4, 0.4],
    "0.6S": [-0.3, 0.3],
    "0.4S": [-0.2, 0.2],
    "0.2S": [-0.1, 0.1],
    "0.1S": [-0.05, 0.05],
    "0.05S": [-0.025, 0.025], 
    "0.025S": [-0.015, 0.010],
    "0.015S": [-0.010, 0.005],
    "0.01S": [-0.005, 0.005]
    
}


accuraciesDWs = {dw: {method: {patient: {position: [] for position in positions} for patient in patients}
                      for method in methods} for dw in DWs}
def cutX(X, dw):
    timevec = np.linspace(-0.5, 0.5, 200) 
    dws, dwe =  dwdict[dw][0], dwdict[dw][1] 
    maski = (timevec >= dws) & (timevec <= dwe)
    X = X[:, :, maski]
    timevec = timevec[maski]
    return timevec, X
    

for patient_idx, patient in tqdm(enumerate(patients), desc='Patients', total=len(patients), colour='#b58f48'):
    numfoldds = 5 if patient == "S33" else 20
    for pos_idx, position in enumerate(positions):
        for method_idx, method in enumerate(methods):
            for dw in DWs:
                if dw in ["1S", "0.8S", "0.6S", "0.4S", "0.2S", "0.1S", "0.05S"]:
                    continue
                X = patient_data[patient][position][method]['hg_trace']
                X = X.transpose(0,2,1)
                print(f"This is X before cutting {X.shape}")
                print(f"This is dw: {dw}")
                timevec,X = cutX(X, dw)
                print(f"This is X after cutting {X.shape}")
                print(timevec) 
                X = X.reshape(X.shape[0], -1) # reshape to (n_trials, n_features)
                y = patient_data[patient][position][method]['phon_seq'][:,pos_idx]
                accuraciesDWs[dw][method][patient][position] = run_decoder(X, y, num_folds=numfoldds, do_cv=True, var_expl=[0.8], num_iter=10)
                print(f"Phoneme Decoding for {patient}, {position}, {method}, {dw} completed.")
                avacc = np.mean(accuraciesDWs[dw][method][patient][position])
                print(f"Average Accuracy for {patient}, {position}, {method}, {dw}: {avacc}")
                


Patients:   0%|          | 0/5 [00:00<?, ?it/s]

This is X before cutting (144, 111, 200)
This is dw: 0.025S
This is X after cutting (144, 111, 5)
[-0.01256281 -0.00753769 -0.00251256  0.00251256  0.00753769]
Phoneme Decoding for S14, p1, Kumar, 0.025S completed.
Average Accuracy for S14, p1, Kumar, 0.025S: 0.33472222222222225
This is X before cutting (144, 111, 200)
This is dw: 0.015S
This is X after cutting (144, 111, 3)
[-0.00753769 -0.00251256  0.00251256]
Phoneme Decoding for S14, p1, Kumar, 0.015S completed.
Average Accuracy for S14, p1, Kumar, 0.015S: 0.3305555555555556
This is X before cutting (144, 111, 200)
This is dw: 0.01S
This is X after cutting (144, 111, 2)
[-0.00251256  0.00251256]
Phoneme Decoding for S14, p1, Kumar, 0.01S completed.
Average Accuracy for S14, p1, Kumar, 0.01S: 0.3402777777777778
This is X before cutting (148, 111, 200)
This is dw: 0.025S
This is X after cutting (148, 111, 5)
[-0.01256281 -0.00753769 -0.00251256  0.00251256  0.00753769]
Phoneme Decoding for S14, p1, MFA, 0.025S completed.
Average Accu

In [10]:
# get dictionary full structure 

print(accuraciesDWs.items())

dict_items([('1S', {'Kumar': {'S14': {'p1': [], 'p2': [], 'p3': []}, 'S22': {'p1': [], 'p2': [], 'p3': []}, 'S23': {'p1': [], 'p2': [], 'p3': []}, 'S26': {'p1': [], 'p2': [], 'p3': []}, 'S33': {'p1': [], 'p2': [], 'p3': []}}, 'MFA': {'S14': {'p1': [], 'p2': [], 'p3': []}, 'S22': {'p1': [], 'p2': [], 'p3': []}, 'S23': {'p1': [], 'p2': [], 'p3': []}, 'S26': {'p1': [], 'p2': [], 'p3': []}, 'S33': {'p1': [], 'p2': [], 'p3': []}}}), ('0.8S', {'Kumar': {'S14': {'p1': [], 'p2': [], 'p3': []}, 'S22': {'p1': [], 'p2': [], 'p3': []}, 'S23': {'p1': [], 'p2': [], 'p3': []}, 'S26': {'p1': [], 'p2': [], 'p3': []}, 'S33': {'p1': [], 'p2': [], 'p3': []}}, 'MFA': {'S14': {'p1': [], 'p2': [], 'p3': []}, 'S22': {'p1': [], 'p2': [], 'p3': []}, 'S23': {'p1': [], 'p2': [], 'p3': []}, 'S26': {'p1': [], 'p2': [], 'p3': []}, 'S33': {'p1': [], 'p2': [], 'p3': []}}}), ('0.6S', {'Kumar': {'S14': {'p1': [], 'p2': [], 'p3': []}, 'S22': {'p1': [], 'p2': [], 'p3': []}, 'S23': {'p1': [], 'p2': [], 'p3': []}, 'S26': {'

In [11]:
data = []
for patient in patients:
    for position in positions:
        for method in methods:
            for dw in DWs:
                data.extend([(patient, position, method, dw, acc) for acc in accuraciesDWs[dw][method][patient][position]])

Accuracy_DWs = pd.DataFrame(data, columns=['Patient', 'Position', 'Method', 'Decoding TW', 'Accuracy'])
# print head
print(Accuracy_DWs.head())


  Patient Position Method Decoding TW  Accuracy
0     S14       p1  Kumar      0.025S  0.333333
1     S14       p1  Kumar      0.025S  0.340278
2     S14       p1  Kumar      0.025S  0.333333
3     S14       p1  Kumar      0.025S  0.340278
4     S14       p1  Kumar      0.025S  0.333333


In [12]:
# save it to a csv file
Accuracy_DWs.to_csv('Accuracy_DWs3.csv', index=False) # :) this is the 3rd time we add more DWs



In [None]:
# load kumar mfa dataframe
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import sklearn 
import os
from matplotlib import cm as colours 
from colorama import Fore, Back, Style
from scipy.stats import sem
import joblib

Accuracy_df_Kumar_MFA = pd.read_csv(r"C:\Users\Nabiya\Box\Academic-Duke\CoganLab\Summer 2024\intraop\ieeg_data_intraop\Accuracy_df_Kumar_MFA.csv")
print(Accuracy_df_Kumar_MFA.head(20))
# print shape of the dataframe
print(Accuracy_df_Kumar_MFA.shape)


## Bar Plots to show difference between Kumar and MFA 

In [None]:
from scipy.stats import ttest_rel, ttest_ind

In [None]:
# rename method column to Annotator



In [None]:
# before plotting, run statistical tests, ttest_rel



kumar_array = Accuracy_df_Kumar_MFA[Accuracy_df_Kumar_MFA['Method'] == 'Kumar']['Accuracy']
mfa_array = Accuracy_df_Kumar_MFA[Accuracy_df_Kumar_MFA['Method'] == 'MFA']['Accuracy']

result = ttest_rel(kumar_array, mfa_array)
print(result)


# plot the accuracies as a barplot. for now, let's not separate by patient nor position. just hueing by method

fig, ax = plt.subplots(figsize=(10, 10))
sns.barplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', ax=ax)
sns.stripplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', ax=ax, dodge=True, legend=False, palette='dark')
ax.set_title('HG Power Decoding Accuracy for Kumar and MFA', fontsize=20, fontweight='bold', fontname='Arial')
ax.set_xlabel('Method', fontsize=15)
ax.set_ylabel('Accuracy', fontsize=15)
# put the pvalue up there
ax.text(1.1, 1.1, f"Paired t-test pvalue={result.pvalue}", fontsize=15, ha='center', va='center', transform=ax.transAxes,
        color='blue', bbox=dict(facecolor='yellow', alpha=0.5, edgecolor='black', boxstyle='round,pad=1'))

plt.show()


## Distribution: All Patients, by positions, Kumar vs. MFA

In [None]:
from statsmodels.stats.anova import AnovaRM

model = AnovaRM(Accuracy_df_Kumar_MFA, depvar='Accuracy', subject='Patient', within=['Position', 'Method'], 
                aggregate_func=np.mean)
result = model.fit().anova_table
print(result)


## Barplot: All Patients, by positions, Kumar vs. MFA

fig, ax = plt.subplots(figsize=(10, 10))
sns.barplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', x='Position')
sns.stripplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', ax=ax, dodge=True, legend=False, palette='dark', x='Position')
ax.set_title('Decoding Accuracy for Kumar and MFA - By Position', fontsize=20, fontweight='bold', fontname='Arial')
ax.set_xlabel('Position', fontsize=15)
ax.set_ylabel('Accuracy', fontsize=15)
# put the pvalue up there
ax.text(1.1, 1.1, f"Paired t-test pvalue={result.loc['Method', 'Pr > F']:.3g}", fontsize=15, ha='center', va='center', transform=ax.transAxes,
        color='blue', bbox=dict(facecolor='yellow', alpha=0.5, edgecolor='black', boxstyle='round,pad=1'))

plt.show()


## Distribution: All positions, by Patients, Kumar vs. MFA

In [None]:
## Distribution: All positions, by Patients, Kumar vs. MFA

import statsmodels.api as sm
import statsmodels.formula.api as smf

formula = 'Accuracy ~ Method'
model = smf.mixedlm(formula, Accuracy_df_Kumar_MFA, groups=Accuracy_df_Kumar_MFA['Patient'])
result = model.fit()
print(result.summary())

# plotting now 

sns.set_theme(style='whitegrid')
plt.figure(figsize=(10,8))
fig, ax = plt.subplots(figsize=(10, 10))
sns.barplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', x='Patient')
sns.stripplot(data=Accuracy_df_Kumar_MFA, hue='Method', y='Accuracy', ax=ax, dodge=True, legend=False, palette='dark', x='Patient')
ax.set_title('Decoding Accuracy for Kumar and MFA - By Patient', fontsize=20, fontweight='bold', fontname='Arial')
ax.set_xlabel('Position', fontsize=15)
ax.set_ylabel('Accuracy', fontsize=15)
# put the pvalue up there
ax.text(1.1, 1.1, f"Paired t-test pvalue={result.loc['Method', 'Pr > F']:.3g}", fontsize=15, ha='center', va='center', transform=ax.transAxes,
        color='blue', bbox=dict(facecolor='yellow', alpha=0.5, edgecolor='black', boxstyle='round,pad=1'))

plt.show()
