In [None]:
import argparse
import os
gpus = [1]
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, gpus))
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
import numpy as np
import math
import glob
import random
import itertools
import datetime
import time
import datetime
import sys
import scipy.io

import torchvision.transforms as transforms
from torchvision.utils import save_image, make_grid

import torch.utils.data as Data
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torchsummary import summary
import torch.autograd as autograd
from torchvision.models import vgg19

import torch.nn as nn
import torch.nn.functional as F
import torch
import torch.nn.init as init

from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms
from sklearn.decomposition import PCA

import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce
# from common_spatial_pattern import csp

import matplotlib.pyplot as plt
import mne
from matplotlib import mlab as mlab
from torch.backends import cudnn
# from tSNE import plt_tsne
# from grad_cam.utils import GradCAM, show_cam_on_image
from utils import GradCAM, show_cam_on_image

cudnn.benchmark = False
cudnn.deterministic = True

import myimporter
from BCI_functions import *

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch.utils.data as Data
import torch.nn.functional as F

import pandas as pd
import ot

In [None]:
# keep the overall model class, omitted here
class ViT(nn.Sequential):
    def __init__(self, emb_size=40, depth=2, n_classes=2, **kwargs):
        super().__init__(
            # ... the model
        )
        
# ! A crucial step for adaptation on Transformer
# reshape_transform  b 61 40 -> b 40 1 61
def reshape_transform(tensor):
    result = rearrange(tensor, 'b (h w) e -> b e (h) (w)', h=1)
    return result

In [None]:
# Convolution module
# use conv to capture local features, instead of postion embedding.
class PatchEmbedding(nn.Module):
    def __init__(self, emb_size=40):
        # self.patch_size = patch_size
        super().__init__()

        self.shallownet = nn.Sequential(
            nn.Conv2d(1, 40, (1, 25), (1, 1)),
            nn.Conv2d(40, 40, (22, 1), (1, 1)),
            nn.BatchNorm2d(40),
            nn.ELU(),
            nn.AvgPool2d((1, 75), (1, 15)),  # pooling acts as slicing to obtain 'patch' along the time dimension as in ViT
            nn.Dropout(0.5),
        )

        self.projection = nn.Sequential(
            nn.Conv2d(40, emb_size, (1, 1), stride=(1, 1)),  # transpose, conv could enhance fiting ability slightly
            Rearrange('b e (h) (w) -> b (h w) e'),
        )


    def forward(self, x: Tensor) -> Tensor:
        b, _, _, _ = x.shape
        x = self.shallownet(x.float())
        x = self.projection(x)
        return x


class MultiHeadAttention(nn.Module):
    def __init__(self, emb_size, num_heads, dropout):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.keys = nn.Linear(emb_size, emb_size)
        self.queries = nn.Linear(emb_size, emb_size)
        self.values = nn.Linear(emb_size, emb_size)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)

    def forward(self, x: Tensor, mask: Tensor = None) -> Tensor:
        queries = rearrange(self.queries(x), "b n (h d) -> b h n d", h=self.num_heads)
        keys = rearrange(self.keys(x), "b n (h d) -> b h n d", h=self.num_heads)
        values = rearrange(self.values(x), "b n (h d) -> b h n d", h=self.num_heads)
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys)  
        if mask is not None:
            fill_value = torch.finfo(torch.float32).min
            energy.mask_fill(~mask, fill_value)

        scaling = self.emb_size ** (1 / 2)
        att = F.softmax(energy / scaling, dim=-1)
        att = self.att_drop(att)
        out = torch.einsum('bhal, bhlv -> bhav ', att, values)
        out = rearrange(out, "b h n d -> b n (h d)")
        out = self.projection(out)
        return out


class ResidualAdd(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        res = x
        x = self.fn(x, **kwargs)
        x += res
        return x


class FeedForwardBlock(nn.Sequential):
    def __init__(self, emb_size, expansion, drop_p):
        super().__init__(
            nn.Linear(emb_size, expansion * emb_size),
            nn.GELU(),
            nn.Dropout(drop_p),
            nn.Linear(expansion * emb_size, emb_size),
        )


class GELU(nn.Module):
    def forward(self, input: Tensor) -> Tensor:
        return input*0.5*(1.0+torch.erf(input/math.sqrt(2.0)))


class TransformerEncoderBlock(nn.Sequential):
    def __init__(self,
                 emb_size,
                 num_heads=10,
                 drop_p=0.5,
                 forward_expansion=4,
                 forward_drop_p=0.5):
        super().__init__(
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                MultiHeadAttention(emb_size, num_heads, drop_p),
                nn.Dropout(drop_p)
            )),
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                FeedForwardBlock(
                    emb_size, expansion=forward_expansion, drop_p=forward_drop_p),
                nn.Dropout(drop_p)
            )
            ))


class TransformerEncoder(nn.Sequential):
    def __init__(self, depth, emb_size):
        super().__init__(*[TransformerEncoderBlock(emb_size) for _ in range(depth)])


class ClassificationHead(nn.Sequential):
    def __init__(self, emb_size, n_classes):
        super().__init__()
        
        # global average pooling
        self.clshead = nn.Sequential(
            Reduce('b n e -> b e', reduction='mean'),
            nn.LayerNorm(emb_size),
            nn.Linear(emb_size, n_classes)
        )
        self.fc = nn.Sequential(
            nn.Linear(8600, 256), # 25800 for 2s, 8600 for 1s # 2440 for bci
            nn.ELU(),
            nn.Dropout(0.5),
            nn.Linear(256, 32),
            nn.ELU(),
            nn.Dropout(0.3),
            nn.Linear(32, n_classes) #4 # change here for classes
        )

    def forward(self, x):
        x = x.contiguous().view(x.size(0), -1)
        out = self.fc(x)
        return out


class Conformer(nn.Sequential):
    def __init__(self, emb_size=40, depth=2, n_classes=4, **kwargs):
        super().__init__(

            PatchEmbedding(emb_size),
            TransformerEncoder(depth, emb_size),
            ClassificationHead(emb_size, n_classes)
        )

In [None]:
# TODO: This class if has list of subject id can later support combination of sub ids
# TODO: add a function transform to convert dataset to train test, avoiding repetition of same code

class EEGMMIDTrSet(Data.Dataset):
    def __init__(self, subject_id, transform=None):
        root_dir = "../Deep-Learning-for-BCI/dataset/"
        dataset = np.load(root_dir + str(subject_id) + '.npy')
        # keep 4,5 which are left and right fist open close imagery classes, remove rest
        # refer 1-Data.ipynb for the details
        removed_label = [0,1,6,7,8,9,10]  # [0,1,2,3,4,5,10] for hf # [0,1,6,7,8,9,10] for lr
        for ll in removed_label:
            id = dataset[:, -1]!=ll
            dataset = dataset[id]

        # Pytorch needs labels to be sequentially ordered starting from 0
        dataset[:, -1][dataset[:, -1] == 2] = 0
        dataset[:, -1][dataset[:, -1] == 4] = 0
        dataset[:, -1][dataset[:, -1] == 3] = 1
        dataset[:, -1][dataset[:, -1] == 5] = 1
#         dataset[:, -1][dataset[:, -1] == 10] = 2
        
        # data segmentation
        n_class = 2 #int(11-len(removed_label))  # 0~9 classes ('10:rest' is not considered)
        no_feature = 64  # the number of the features
        segment_length = 160 #160  # selected time window; 16=160*0.1
        
        #Overlapping is removed to avoid training set overlap with test set
        data_seg = extract(dataset, n_classes=n_class, n_fea=no_feature, 
                           time_window=segment_length, moving=(segment_length))  # /2 for 50% overlapping
        print('After segmentation, the shape of the data:', data_seg.shape)

        # split training and test data
        no_longfeature = no_feature*segment_length
        data_seg_feature = data_seg[:, :no_longfeature]
        self.data_seg_label = data_seg[:, no_longfeature:no_longfeature+1]
        
        # Its important to have random state set equal for Training and test dataset
        train_feature, test_feature, train_label, test_label = train_test_split(
            data_seg_feature, self.data_seg_label,random_state=0, shuffle=True,stratify=self.data_seg_label)

        # Check the class label splits to maintain balance
        unique, counts = np.unique(self.data_seg_label, return_counts=True)
        left_perc = counts[0]/sum(counts)
        if left_perc < 0.4 or left_perc > 0.6:
            print("Imbalanced dataset with split of: ",left_perc,1-left_perc)
        else:
            print("Classes balanced.")
        unique, counts = np.unique(train_label, return_counts=True)
        print("Class label splits in training set \n ",np.asarray((unique, counts)).T)
        unique, counts = np.unique(test_label, return_counts=True)
        print("Class label splits in test set\n ",np.asarray((unique, counts)).T)



        # normalization
        # before normalize reshape data back to raw data shape
        train_feature_2d = train_feature.reshape([-1, no_feature])
        test_feature_2d = test_feature.reshape([-1, no_feature])

        scaler1 = StandardScaler().fit(train_feature_2d)
        train_fea_norm1 = scaler1.transform(train_feature_2d) # normalize the training data
        test_fea_norm1 = scaler1.transform(test_feature_2d) # normalize the test data
        print('After normalization, the shape of training feature:', train_fea_norm1.shape,
              '\nAfter normalization, the shape of test feature:', test_fea_norm1.shape)

        # after normalization, reshape data to 3d
        train_fea_norm1 = train_fea_norm1.reshape([-1, segment_length, no_feature])
        test_fea_norm1 = test_fea_norm1.reshape([-1, segment_length, no_feature])
        print('After reshape, the shape of training feature:', train_fea_norm1.shape,
              '\nAfter reshape, the shape of test feature:', test_fea_norm1.shape)
        
        # reshape for data shape: (trial, conv channel, electrode channel, time samples)
        # earlier it was (trial,timesamples,electrode_channel)
        train_fea_reshape1 = np.swapaxes(np.expand_dims(train_fea_norm1,1),2,3)
        test_fea_reshape1 = np.swapaxes(np.expand_dims(test_fea_norm1,1),2,3)
        print('After expand dims, the shape of training feature:', train_fea_reshape1.shape,
              '\nAfter expand dims, the shape of test feature:', test_fea_reshape1.shape)
        
        self.data = torch.tensor(train_fea_reshape1)
        self.targets = torch.tensor(train_label.flatten()).long()
        
        print("data and target type:",type(self.data),type(self.targets))


    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        data, target = self.data[idx], self.targets[idx]
        return data, target
    
    def get_class_weights(self):
        class_weights=class_weight.compute_class_weight('balanced',np.unique(self.data_seg_label),
                                                        self.data_seg_label[:,0])
        return class_weights



In [None]:
import pandas as pd
device = torch.device("cpu")
model = Conformer(n_classes=2)
cat_dict = {0:'left hand movement',1:'right hand movement'}
biosemi_montage = mne.channels.make_standard_montage('biosemi64')
index = [8, 9, 10, 46, 45, 44, 43, 13, 12, 11, 47, 48, 49, 50, 16, 17, 18, 
         31, 55, 54, 53, 0, 32, 33, 1, 2, 36, 35, 34, 6, 5, 4, 3, 37, 38, 
         39, 40, 41, 7, 42, 14, 51, 23, 60, 15, 52, 22, 21, 20, 19, 30, 56, 
         57, 58, 59, 24, 25, 29, 62, 61, 26, 28, 63, 27]#range(64)#[37, 9, 10, 46, 45, 44, 13, 12, 11, 47, 48, 49, 50, 17, 18, 31, 55, 54, 19, 30, 56, 29]  # for bci competition iv 2a
biosemi_montage.ch_names = [biosemi_montage.ch_names[i] for i in index]
biosemi_montage.dig = [biosemi_montage.dig[i+3] for i in index]
info = mne.create_info(ch_names=biosemi_montage.ch_names, sfreq=160,ch_types='eeg')

top_channel_dict = {}

for sub_id in [7, 12, 22, 42, 43, 48, 49, 53, 70, 80, 82, 85, 94, 102]:#range(1,11):
    
    model.load_state_dict(torch.load("../results/eegconformer/for_topfr/eegmmid_ws_offline_"+ str(sub_id)+ "_model0" + '.pth', map_location=device)) 
    target_layers = [model[1]]  # set the target layer 
    cam = GradCAM(model=model, target_layers=target_layers, use_cuda=False, reshape_transform=reshape_transform)

    train_ds = EEGMMIDTrSet(subject_id=sub_id)


    data = train_ds.data
    
    
    for target_category in range(2):

        all_cam = []
        # this loop is used to obtain the cam of each trial/sample
        for i in range(data.shape[0]):
            test = torch.as_tensor(data[i:i+1, :, :, :], dtype=torch.float32)
            test = torch.autograd.Variable(test, requires_grad=True)
            print("Ip shape: ",test.shape)

            grayscale_cam = cam(input_tensor=test,target_category=target_category) #,target_category=2
            grayscale_cam = grayscale_cam[0, :]
            all_cam.append(grayscale_cam)

        # the mean of all data
        test_all_data = np.squeeze(np.mean(data.detach().cpu().numpy(), axis=0)) #.detach().cpu().numpy()
        test_all_data = (test_all_data - np.mean(test_all_data)) / np.std(test_all_data)
        mean_all_test = np.mean(test_all_data, axis=1)

        # the mean of all cam
        test_all_cam = np.mean(all_cam, axis=0)
#         test_all_cam = (test_all_cam - np.mean(test_all_cam)) / np.std(test_all_cam)
        mean_all_cam = np.mean(test_all_cam, axis=1)

        # apply cam on the input data
        hyb_all = test_all_data * test_all_cam
        hyb_all = (hyb_all - np.mean(hyb_all)) / np.std(hyb_all)
        mean_hyb_all = np.mean(hyb_all, axis=1)
        
        # get top 3 channel index -> feature relevance
        nind = np.argsort(mean_hyb_all)[-10:]
        print([biosemi_montage.ch_names[i] for i in nind])
        
        imp_ind = [mean_hyb_all[i] for i in nind]

        ch_names = [biosemi_montage.ch_names[i] for i in nind]
        print(imp_ind)
        if not math.isnan(imp_ind[0]):
            top_channel_dict[(sub_id,target_category)] = ch_names 



In [None]:
left_list = []
right_list = []
for key in top_channel_dict:
    if key[1] == 0:
        left_list.extend(top_channel_dict[key])
    elif key[1] == 1:
        right_list.extend(top_channel_dict[key])

In [None]:
from collections import Counter,OrderedDict
freq_left = Counter(left_list)
freq_right = Counter(right_list)

In [None]:
f, ax = plt.subplots(figsize=(15, 6))
plt.xticks(rotation = 45)
ax.tick_params(axis='x', which='major')
ax.set_xlabel('Channel Names', fontsize=10)
ax.set_ylabel('Frequency in Top 10', fontsize=10)
plt.bar(OrderedDict(freq_left.most_common()).keys(), OrderedDict(freq_left.most_common()).values())
plt.tight_layout()

In [None]:
f, ax = plt.subplots(figsize=(15, 6))
# fig = plt.figure(figsize=(15,6))
plt.xticks(rotation = 45)
ax.tick_params(axis='x', which='major')
ax.set_xlabel('Channel Names', fontsize=10)
ax.set_ylabel('Frequency in Top 10', fontsize=10)
plt.bar(OrderedDict(freq_right.most_common()).keys(), OrderedDict(freq_right.most_common()).values())
plt.tight_layout()

In [None]:
print(freq_left.most_common()[:11])
print(freq_right.most_common()[:11])

## code to find float values corresponding to relevance based on freq

In [None]:
topfr_list = {}
# Remove the last element from top 22 which has least freq if no overlaps
for ch in ['CP2', 'Fp1', 'P6', 'AFz', 'C5', 'O2', 'FC6', 'F6', 'Fpz', 'FC4', 'AF8', 'AF7', 'P9', 'AF3', 'Fp2', 'F7', 'O1', 'F8', 'FT7', 'Oz', 'AF4']:
    topfr_list[ch] = freq_left[ch] + freq_right[ch]
print(topfr_list)
assert len(topfr_list) ==21

In [None]:
unit_weight = 21/sum(topfr_list.values())
print(unit_weight,sum(topfr_list.values()))

for key in topfr_list:
    topfr_list[key] = topfr_list[key] * unit_weight
    
print(sum(topfr_list.values()))
topfr_list['T9'] = topfr_list.pop('P9')
topfr_list['T10'] = topfr_list.pop('P10')

In [None]:
import pandas as pd
import numpy as np

# Replace 'your_file.csv' with the actual file path
file_path = 'channel_loc_mat.csv'

# Read CSV into a pandas DataFrame
data_frame = pd.read_csv(file_path,header=None)

# Extract values as a NumPy array
topo_ref = data_frame.values

# Display the NumPy array
print(topo_ref)

In [None]:
# Define the dimensions of the scalp grid (adjust as needed)
rows, cols = 11,11

# Create a 2D matrix to represent the location map
topo_pred = np.zeros((rows, cols), dtype=float)
for ch in topfr_list:
    topo_pred = np.where(topo_ref == ch,topfr_list[ch] , topo_pred) #topfr_list[ch]
print(sum(sum(topo_pred)))

In [None]:
plt.imshow(topo_pred)
plt.colorbar()

In [None]:
topo_true = np.array([ [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 1., 1., 1., 1., 1., 1., 1., 0., 0.],
                       [0., 0., 1., 1., 1., 1., 1., 1., 1., 0., 0.],
                       [0., 0., 1., 1., 1., 1., 1., 1., 1., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

def compare_topomaps(topo_pred,topo_true):
    """
    Show optimal transport on a moving disc in a 50x50 grid
    """
    ## Step 1: Setup problem
    pix = np.linspace(-1, 1, 11) # max channels are 13
    # Setup grid
    X, Y = np.meshgrid(pix, pix)
    # Compute pariwise distances between points on 2D grid so we know
    # how to score the Wasserstein distance
    coords = np.array([X.flatten(), Y.flatten()]).T
    coordsSqr = np.sum(coords**2, 1)
    M = coordsSqr[:, None] + coordsSqr[None, :] - 2*coords.dot(coords.T)
    M[M < 0] = 0
    M = np.sqrt(M)
    wass = ot.emd2(1e-5 +topo_pred.flatten(), 1e-5 +topo_true.flatten(), M, 1.0)
    return wass

compare_topomaps(topo_pred,topo_true)

## Code to find top FR channels and then use them to generate a spatial map as in riemannian_allsubs_topFR

In [None]:
top = [t[0] for t in freq_left.most_common()[:11]]

top.extend([t[0] for t in freq_right.most_common()[:11]] )
print(set(top))

In [None]:
topfr_list = []
# # Remove the last element from top 22 which has least freq
for ch in ['Oz', 'C5', 'O2', 'F7', 'F6', 'AFz', 'AF3', 'Fp2', 'CP2', 'O1', 'AF4', 'Fpz', 'F8', 'FC6', 'FT7', 'AF7', 'P9', 'FC4', 'P6', 'AF8', 'Fp1']:
    topfr_list.append(biosemi_montage.ch_names.index(ch))
print(sorted(topfr_list))

In [None]:
biosemi_layout = mne.channels.read_layout("biosemi")

In [None]:
biosemi_layout.pos = np.asarray([biosemi_layout.pos[i] for i in index])
biosemi_layout.names = [biosemi_layout.names[i] for i in index]

In [None]:
# Define the dimensions of the scalp grid (adjust as needed)
rows, cols = 11,11

# Create a 2D matrix to represent the location map
location_map = np.zeros((rows, cols), dtype=float)

# Define the positions of EEG sensors as 2D coordinates
sensor_positions = biosemi_layout.pos[[5, 6, 7, 18, 21, 22, 23, 24, 25, 26, 27, 28, 29, 36, 37, 38, 42, 53, 60, 61, 62],:2]*11  # Replace with your actual sensor positions

# Assign values in the matrix based on sensor positions
for position in sensor_positions:
    row, col = map(int, position)  # Convert coordinates to integers
    location_map[row, col] = 1.0  # You can use different values for different sensors if needed

# Now, location_map represents the 2D matrix of EEG sensor locations with floating-point coordinates
location_map.T


In [None]:
# Define the dimensions of the scalp grid (adjust as needed)
rows, cols = 11,11

# Create a 2D matrix to represent the location map
location_map = np.zeros((rows, cols), dtype=float)

# Define the positions of EEG sensors as 2D coordinates
sensor_positions = biosemi_layout.pos[:21,:2]*11  # Replace with your actual sensor positions

# Assign values in the matrix based on sensor positions
for position in sensor_positions:
    row, col = map(int, position)  # Convert coordinates to integers
    location_map[row, col] = 1.0  # You can use different values for different sensors if needed

# Now, location_map represents the 2D matrix of EEG sensor locations with floating-point coordinates
location_map.T

In [None]:
topo_pred = np.array([ [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 1., 1., 1., 0., 0., 0., 0., 0.],
                       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
                       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [1., 0., 0., 0., 0., 0., 1., 1., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
                       [0., 1., 0., 1., 1., 0., 1., 0., 1., 0., 0.],
                       [0., 0., 1., 1., 1., 1., 0., 1., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

topo_true = np.array([ [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0.],
                       [0., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0.],
                       [0., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [None]:
np.sum(topo_pred)

In [None]:
plt.imshow(topo_pred)

In [None]:
plt.imshow(topo_true)

In [None]:
def compare_topomaps(topo_pred,topo_true):
    """
    Show optimal transport on a moving disc in a 50x50 grid
    """
    ## Step 1: Setup problem
    pix = np.linspace(-1, 1, 11) # max channels are 13
    # Setup grid
    X, Y = np.meshgrid(pix, pix)
    # Compute pariwise distances between points on 2D grid so we know
    # how to score the Wasserstein distance
    coords = np.array([X.flatten(), Y.flatten()]).T
    coordsSqr = np.sum(coords**2, 1)
    M = coordsSqr[:, None] + coordsSqr[None, :] - 2*coords.dot(coords.T)
    M[M < 0] = 0
    M = np.sqrt(M)
    wass = ot.emd2(1e-5 +topo_pred.flatten(), 1e-5 +topo_true.flatten(), M, 1.0)
    return wass

In [None]:
# for i in range(10):
#     print(compare_topomaps(np.roll(topo_true, i,0),topo_true))

compare_topomaps(topo_pred,topo_true)

## The code for wasserstein distance comparison ends here. 
## Code for stat analysis

In [None]:
from scipy.stats import fisher_exact

# Example data
model_correct = 10
model_incorrect = 4
baseline_correct = 8
baseline_incorrect = 6

# Create a 2x2 contingency table
contingency_table = [[model_correct, model_incorrect], [baseline_correct, baseline_incorrect]]

# Perform Fisher's exact test
odds_ratio, p_value = fisher_exact(contingency_table, alternative='greater')

# Set significance level (alpha)
alpha = 0.05

# Compare p-value to alpha
if p_value < alpha:
    print(f"Reject the null hypothesis. Model accuracy is statistically significantly better than the baseline.")
else:
    print("Fail to reject the null hypothesis. No significant difference in accuracy between the model and baseline.")


In [None]:
70.97,68.82,69.15,69.89,68.82,82.80,69.89,73.12,73.12,70.97,70.97,77.42,68.82,72.04,82.80,72.04
65.59,62.37,58.51,62.37,66.67,63.44,65.59,74.19,66.67,69.89,74.19,79.57,58.06,65.59,65.59,67.74
77.42,65.59,70.21,70.97,73.12,80.65,63.44,69.89,68.82,70.97,78.49,68.82,69.89,60.22,75.27,69.89

In [None]:
from scipy.stats import ttest_rel, wilcoxon

#Riem
#r_all_channel_accuracies=[83.87,73.12,73.12,78.49,68.82,77.42,72.04,74.19,69.89,70.97,70.97,70.97,77.42,69.57]
# mi_accuracies=[75.27,69.89,61.29,75.27,64.52,75.27,73.12,62.37,60.22,60.22,66.67,74.19,84.95,71.74]
# topfr_acc = [74.19,73.12,63.44,77.42,61.29,72.04,65.59,73.12,67.74,64.52,67.74,65.59,75.27,58.70]

# # EEGConformer
c_all_channel_accuracies=[70.97,68.82,69.15,69.89,68.82,82.80,69.89,73.12,73.12,70.97,70.97,77.42,68.82,72.04,82.80,72.04]
mi_accuracies=[77.42,65.59,70.21,70.97,73.12,80.65,63.44,69.89,68.82,70.97,78.49,68.82,69.89,60.22,75.27,69.89]
topfr_acc=[65.59,62.37,58.51,62.37,66.67,63.44,65.59,74.19,66.67,69.89,74.19,79.57,58.06,65.59,65.59,67.74]


# # EEGNet
# topfr_acc = [66.67,62.37,56.99,56.99,59.14,64.52,73.12,65.59,61.29,51.61,63.44,63.44,62.37,59.78]
# e_all_channel_accuracies = [78.49,65.59,59.14,68.82,63.44,70.97,75.27,68.82,59.14,60.22,62.37,70.97,77.42,57.61]
# mi_accuracies = [81.72,58.06,61.29,78.49,58.06,63.44,73.12,64.52,58.06,59.14,54.84,65.59,78.49,60.87]

# Perform a paired t-test
t_stat, t_p_value = ttest_rel(topfr_acc, c_all_channel_accuracies)

# Perform a Wilcoxon signed-rank test as a non-parametric alternative
wilcoxon_stat, wilcoxon_p_value = wilcoxon(topfr_acc, c_all_channel_accuracies)

# Set significance level (alpha)
alpha = 0.05

# Compare p-values to alpha
if t_p_value < alpha:
    print(f"{t_p_value}: Paired t-test: Reject the null hypothesis. Model accuracies are statistically significantly better than the baseline.")
else:
    print(f"{t_p_value}: Paired t-test: Fail to reject the null hypothesis. No significant difference in accuracies between the model and baseline.")

if wilcoxon_p_value < alpha:
    print(f"{wilcoxon_p_value}: Wilcoxon signed-rank test: Reject the null hypothesis. Model accuracies are statistically significantly better than the baseline.")
else:
    print(f"{wilcoxon_p_value}: Wilcoxon signed-rank test: Fail to reject the null hypothesis. No significant difference in accuracies between the model and baseline.")
