In [14]:
import sys
import mne
import pandas as pd
import numpy as np
import os.path
from os import path
import pyprep
from mne import Epochs, pick_types, find_events, pick_types, set_eeg_reference, channels, preprocessing
from mne.io import concatenate_raws, read_raw_bdf, read_raw_eeglab, eeglab
from mne.preprocessing import ICA, create_eog_epochs, create_ecg_epochs, corrmap,  annotate_muscle_zscore
import matplotlib.pyplot as plt
from scipy.integrate import simps
from mne.viz import plot_topomap
from matplotlib import cm, colors, colorbar
from fooof.bands import Bands
from scipy import stats
from mne.epochs import equalize_epoch_counts
from tqdm import tqdm
import time



from mne.datasets import fetch_fsaverage
import os.path as op

from mne.minimum_norm import make_inverse_operator, apply_inverse, \
                             write_inverse_operator

from mne.datasets import sample
from mne.minimum_norm import apply_inverse_epochs, read_inverse_operator
from mne.viz import circular_layout
from mne_connectivity import spectral_connectivity_epochs
from mne_connectivity.viz import plot_connectivity_circle

## Absolute Band Power
Calculate the absolute power for the different channels, decompose these into five frequency bands. Plot the results. This method will need to be extended to compare the pre and post outcomes. Currently takes as input continuous data, but the same compute_psd data will also work for epochs. This blog has a nice rationale for the approach --> https://raphaelvallat.com/bandpower.html. 

import tkinter as tk
import mne
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

# Create a fake Raw object with random EEG data for demonstration
sfreq = 1000  # Sampling frequency in Hz
n_channels = 1  # Number of EEG channels
n_samples = 10000  # Number of time points
data = np.random.randn(n_channels, n_samples)  # Simulated EEG data
ch_names = ['EEG 1']  # Channel names
info = mne.create_info(ch_names=ch_names, sfreq=sfreq)
raw = mne.io.RawArray(data, info)

def analyze_data_and_plot():
    # Calculate PSD using MNE's psd method
    fmin, fmax = 0, 100  # Define your frequency range of interest
    power, freqs = mne.time_frequency.psd_multitaper(raw, fmin=fmin, fmax=fmax)
    
    # Create a plot
    plt.figure(figsize=(6, 4))
    plt.semilogy(freqs, power[0], color='k')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Power/Frequency')
    plt.title('Power Spectral Density')
    
    # Embed the plot in the Tkinter GUI
    canvas = FigureCanvasTkAgg(plt.gcf(), master=root)
    canvas.get_tk_widget().pack()
    canvas.draw()

# Create a main window
root = tk.Tk()
root.title("MNE Analysis GUI")

# Create a button to trigger the analysis and plot
analyze_button = tk.Button(root, text="Analyze Data and Plot", command=analyze_data_and_plot)
analyze_button.pack()

# Run the GUI main loop
root.mainloop()


## Methods that apply across different metrics

In [15]:
    
def perform_permutation_test(X, n_perms):
    t_obs, clusters, clusters_pv, H0 = mne.stats.permutation_cluster_1samp_test(X, n_permutations=n_perms, tail=0, seed=42)
    return t_obs, clusters, clusters_pv, H0

def plot_band_results(band_def, t_obs, colourmap, axes):
    im, cm = plot_topomap(t_obs, pos=preraw.info, cmap=colourmap, show=False, axes=axes)
    cbar = plt.colorbar(im, ax=axes, shrink=0.3)
    cbar.set_label('t')
    axes.set_title(band_def, {'fontsize': 20})
    
def prepare_data_for_permutation_test(pre, post, n_subjects):
    np.random.seed(0)
    X = np.ones((n_subjects, 62, 2))
    print(X.shape)
    X[:, :, 0] = pre[:, :]
    X[:, :, 1] = post[:, :]
    X = X[:, :, 1] - X[:, :, 0]
    return X




## Absolute Power band

In [16]:
def compute_power(low,high,data):
    # Extract the names of the EEG channels
    picks = data.info['ch_names']
    
    # Reset bad channels to an empty list
    data.info['bads'] = []
    
    # Print the number of channels and bad channels
    print("number of chans ", len(data.info['ch_names']), data.info['bads'])
    
    # Compute power spectral density (PSD) using multitaper method
    spectrum = data.compute_psd(picks=picks,method = 'multitaper',fmin=0, fmax=45, verbose = True)#, reject_by_annotation=True) #
    # Get PSD values and corresponding frequency values
    psds, freqs = spectrum.get_data(exclude=(), return_freqs=True, picks=picks)
    

    
    # Frequency resolution
    freq_res = freqs[1] - freqs[0]
    idx_band = np.logical_and(freqs >= low, freqs <= high)
  
    
     # Initialize lists and arrays for computed power values
    abp_averaged = []
    abp_epochs = np.reshape(np.ones(psds.shape[0]*psds.shape[1]), (psds.shape[0], psds.shape[1]))
    
    # Loop through each EEG channel
    for i in range(len(data.info['ch_names'])):
        bp_epoch = []
        for epoch in range(psds.shape[0]):
           # Compute band power using Simpson's rule (integral approximation)
            bp = simps(psds[epoch][i][idx_band], dx=freq_res)
            bp_epoch.append(bp)
            
            # Store computed band power for each epoch and channel
            abp_epochs[epoch][i] = bp
        # Calculate average band power across epochs for the current channel
        bp = sum(bp_epoch)/len(bp_epoch) 
        abp_averaged.append(bp)
    # convert list to numpy array
    abp_averaged = np.asarray(abp_averaged)
    print(psds.shape, freqs.shape)
    
    # Return computed average band power, per-epoch band power, and spectrum object
    return abp_averaged, abp_epochs, spectrum

In [17]:
def process_abs_band(band_def, pre_subjects, post_subjects, merged_subjects, n_perms=8500):
    low, high = band_def
    all_pre_abp = []
    all_post_abp = []
    pre_psds = []
    post_psds = []
    band_data = {}

    for sub in tqdm(range(len(merged_subjects)), desc="Processing subject"):
        preraw = pre_subjects[sub]
        postraw = post_subjects[sub]

        pre_abp_averaged, pre_abp_epochs, pre_spectrum = compute_power(low, high, preraw)
        post_abp_averaged, post_abp_epochs, post_spectrum = compute_power(low, high, postraw)

        all_pre_abp.append(pre_abp_averaged)
        all_post_abp.append(post_abp_averaged)

        power_difference = post_abp_averaged - pre_abp_averaged
        subject_data = {
            'pre_abp': pre_abp_averaged,
            'post_abp': post_abp_averaged,
            'difference': power_difference
        }

        band_data[merged_subjects[sub]] = subject_data

    X = prepare_data_for_permutation_test(all_pre_abp, all_post_abp, n_subjects)
    
    print(band_def, np.mean(np.mean(np.array(all_pre_abp))) * 1e3)
    print(band_def, np.mean(np.mean(np.array(all_post_abp))) * 1e3)

    t_obs, clusters, clusters_pv, H0 = perform_permutation_test(X, n_perms)
    preraw.info['bads'] = []

    return t_obs, clusters, clusters_pv, band_data, X



In [18]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

def plot_individual_bars(results, *labels, title):
    """
    Plot individual differences for specified bands.

    Parameters:
        differences(dict): A dictionary containing the data.
        *labels (str): Labels for the bands to be plotted.

    Example:
        plot_individual_bars(absolute_power_changes, 'Alpha', 'Beta', 'Absolute Power Band Individual differences')
    """
    if not labels:
        print("No bands specified for plotting.")
        return

    # Create a figure with enhanced Seaborn styling
    sns.set(style="whitegrid")
    plt.figure(figsize=(10, 6))

    # Initialize data lists for the specified labels
    data = {label: [] for label in labels}

    # Extract the subjects (assumed to be the keys in the inner dictionaries)
    subjects = list(results[labels[0]].keys())

    # Extract the absolute power differences for specified bands
    for label in labels:
        differences = [np.mean(results[label][subject]['difference']) for subject in subjects]
        data[label] = differences

    # Define the width of the bars
    bar_width = 0.35

    # Create a list of x positions for the bars
    x = np.arange(len(subjects))

    # Plot the specified bands with different colors
    colors = ['blue', 'green', 'red', 'purple', 'orange']  # You can extend this list for more bands
    for i, label in enumerate(labels):
        plt.bar(x + i * bar_width, data[label], bar_width, label=label, color=colors[i])

    # Set the x-axis labels and increase font size
    plt.xticks(x + bar_width * len(labels) / 2, subjects, fontsize=13)

    # Set the y-axis label and increase font size
    plt.ylabel(title + ' Post > Pre', fontsize=16)

    # Set the title and increase font size
    plt.title(' and '.join(labels) + ' Band ' + title, fontsize=24)

    # Add a legend with increased font size
    plt.legend(fontsize=16)

    # Show the plot
    plt.tight_layout()
    plt.show()

# Example usage
#plot_individual_absolute_bar(absolute_power_changes, 'Alpha', 'Beta')
# Or you can call it with any number of bands, e.g., plot_individual_absolute_bar(absolute_power_changes, 'Alpha', 'Beta', 'Gamma')


# Lempel Ziv

In [19]:
from scipy import signal
from scipy.signal import (butter, lfilter, hilbert, resample)
from random import shuffle
#from pylab import *
import os as os

def Pre2(X):
    '''
    Linear-detrend and subtract mean of X, a multidimensional times series (channels x observations)
    '''
    ro, co = np.shape(X)
    Z = np.zeros((ro, co))
    for i in range(ro):
        Z[i,:]=signal.detrend((X[i,:]-np.mean(X[i,:]))/np.std(X[i,:]), axis=0)
    return Z

def PSpec(X):
    '''
    X: multidimensional time series, ch x obs
    fs: sampling rate in Hz
    '''
    def find_closest(A, target):
        '''
        helper function
        '''
        # A must be sorted
        idx = A.searchsorted(target)
        idx = np.clip(idx, 1, len(A) - 1)
        left = A[idx - 1]
        right = A[idx]
        idx -= target - left < right - target
        return idx

    fs = 1024  # 250
    de = [1, 4]  # in Hz
    th = [4, 8]
    al = [8, 13]
    be = [13, 30]
    ga = [30,45]
    F = [de, th, al, be, ga]  # ,ga,hga]

    ro, co = np.shape(X)
    Q = []

    for i in range(ro):
        v = X[i]
        co = len(v)
        N = co  # Number of samplepoints
        T = 1.0 / fs  # sample spacing (denominator in Hz)
        y = v
        yf = fft(y)
        xf = np.linspace(0.0, 1.0 / (2.0 * T), int(N / 2))
        yff = 2.0 / N * np.abs(yf[0:int(N / 2)])
        bands = np.zeros(len(F))
        for i in range(len(F)):
            bands[i] = sum(yff[find_closest(xf, F[i][0]):find_closest(xf, F[i][1])])
        bands = bands / sum(bands)
        Q.append(bands)
        print("Q ", Q)
    return Q

def cpr(string):
    '''
    Lempel-Ziv-Welch compression of binary input string, e.g. string='0010101'. It outputs the size of the dictionary of binary words.
    '''
    d = {}
    w = ''
    for c in string:
        wc = w + c
        if wc in d:
            w = wc
        else:
            d[wc] = wc
            w = c
    return len(d)

def str_col(X):
    '''
    Input: Continuous multidimensional time series
    Output: One string being the binarized input matrix concatenated column-by-column
    '''
    ro, co = np.shape(X)
    TH = np.zeros(ro)
    M = np.zeros((ro, co))
    for i in range(ro):
        M[i, :] = abs(hilbert(X[i, :]))
        TH[i] = np.mean(M[i, :])

    s = ''
    for j in range(co):
        for i in range(ro):
            if M[i, j] > TH[i]:
                s += '1'
            else:
                s += '0'

    return s

def LZc(X):
    '''
    Compute LZc and use shuffled result as normalization
    '''
    X = Pre2(X)
    SC = str_col(X)
    M = list(SC)
    shuffle(M)
    w = ''
    for i in range(len(M)):
        w += M[i]
    return cpr(SC) / float(cpr(w))


In [20]:

def process_lpz_band(band, low, high, pre_subjects, post_subjects, subjects, LZc_results, n_subjects, n_perms):
    band_data = {}
    LZc_df = pd.DataFrame()
    LZc_df['subjects'] = subjects
    channels_pre = []
    channels_post = []

    for sub in range(n_subjects):
        row = [subjects[sub]]
        preraw = pre_subjects[sub]
        postraw = post_subjects[sub]

        preraw = preprocess_raw_LZc(preraw, low, high)
        postraw = preprocess_raw_LZc(postraw, low, high)

        chan_pre_LPz, chan_post_LPz = calculate_LZc(preraw, postraw)

        row.extend([np.mean(chan_pre_LPz), np.mean(chan_post_LPz), np.mean(chan_post_LPz) - np.mean(chan_pre_LPz)])

        channels_pre.append(chan_pre_LPz)
        channels_post.append(chan_post_LPz)
        band_data[subjects[sub]] = {
            'pre_LPz': chan_pre_LPz,
            'post_LPz': chan_post_LPz,
            'LPz_difference': chan_post_LPz - chan_pre_LPz
        }

        LZc_results[band] = band_data

      #  LZc_df = LZc(LZc_df, subjects, chan_pre_LPz, chan_post_LPz, row)
    print(len(channels_pre), len(channels_post), n_subjects)
    pre = np.array(channels_pre)
    post = np.aray(channels_post)
    X = prepare_data_for_permutation_test(pre, post,n_subjects)
    t_obs, clusters, clusters_pv, H0 = perform_permutation_test(X, n_perms)
    print(t_obs, clusters, clusters_pv, H0)


    return LZc_df, t_obs, X, LZc_results

def preprocess_raw_LZc(raw_data, low, high):
    transp = raw_data._data.transpose([1, 0, 2])
    data = transp.reshape([transp.shape[0], -1])
    info_recon = mne.create_info(ch_names=raw_data.info['ch_names'], sfreq=1024)
    raw_data = mne.io.RawArray(data, info_recon)
    raw_data = raw_data.filter(l_freq=low, h_freq=high, picks='all')
    return raw_data

def calculate_LZc(pre_raw, post_raw):
    channels_pre_LPz = []
    channels_post_LPz = []

    for channel in range(62):
        pre_LPz = LZc(np.expand_dims(pre_raw.get_data()[channel], axis=0))
        post_LPz = LZc(np.expand_dims(post_raw.get_data()[channel], axis=0))
        channels_pre_LPz.append(pre_LPz)
        channels_post_LPz.append(post_LPz)

    return np.array(channels_pre_LPz), np.array(channels_post_LPz)



def merge_dataframes_LZc(LZc_df, subjects, chan_pre_LPz, chan_post_LPz, row):
    df = pd.DataFrame()
    df['subjects'] = subjects
    df[f'{band}_pre_LPz'] = chan_pre_LPz
    df[f'{band}_post_LPz'] = chan_post_LPz
    df[f'{band}_change'] = chan_post_LPz - chan_pre_LPz

    LZc_df = pd.merge(LZc_df, df, on="subjects", how='outer')





In [21]:
def global_LPz(pre_subjects, post_subjects, subjects, n_subjects, n_perms):
    channels_pre = []
    channels_post = []

    for sub in range(n_subjects):
        row = [subjects[sub]]
        preraw = pre_subjects[sub]
        postraw = post_subjects[sub]

        preraw = preprocess_raw_LZc(preraw, low, high)
        postraw = preprocess_raw_LZc(postraw, low, high)

        chan_pre_LPz, chan_post_LPz = calculate_LZc(preraw, postraw)
        channels_pre.append(chan_pre_LPz)
        channels_post.append(chan_post_LPz)


      #  LZc_df = LZc(LZc_df, subjects, chan_pre_LPz, chan_post_LPz, row)
    print(len(channels_pre), len(channels_post), n_subjects)
    X = prepare_data_for_permutation_test(channels_pre, channels_post,n_subjects)
    t_obs, clusters, clusters_pv, H0 = perform_permutation_test(X, n_perms)
    print(t_obs, clusters, clusters_pv, H0)
    return LZc_df, t_obs, X, LZc_results
    

## Connectivity

In [22]:
def montage_alignment(data, montage):

    data.set_montage(montage)
    data.set_eeg_reference(projection=True) # necerssary for inverse modeling
    # Check that the locations of EEG electrodes is correct with respect to MRI
    mne.viz.plot_alignment(
        data.info,
        src=src,
        eeg=["original", "projected"],
        trans=trans,
        show_axes=True,
        mri_fiducials=True,
        dig="fiducials",
    )
    return data

the purpose of this function is to generate a forward solution for EEG source localization based on the input data and the provided parameters. Specifies the Boundary Element Model (BEM) solution to be used for forward modeling.

In [23]:
def make_forward(data):
    fwd = mne.make_forward_solution(
        data.info, trans=trans, src=src, bem=bem, eeg=True, mindist=5.0, n_jobs=None, meg= False
    )
    return fwd

### Compute inverse operator
prepares the noise covariance and computes the inverse operator, which is crucial for estimating the sources of EEG signals in the brain.

- estimates the noise covariance matrix from the EEG data data. The noise covariance describes the statistical properties of noise in the recorded EEG signals. It's an essential component for inverse source localization because it helps separate the signal of interest from noise.
- After estimating the noise covariance, this line regularizes it to improve its numerical stability. 
- returns the inverse operator

In [24]:
def compute_inverse_operator(data, fwd):
    noise_cov = mne.compute_covariance(data) # stay away from the stim artifact
    noise_cov = mne.cov.regularize(noise_cov, data.info,
                               mag=0.1, grad=0.1, eeg=0.1, proj=True)

    inverse_operator = make_inverse_operator(data.info, fwd, noise_cov,
                                         loose=0.2, depth=0.8)
    return noise_cov, inverse_operator
#fname_inv = data_path + '/EEG/data-inv.fif'
#write_inverse_operator(fname_inv, inverse_operator)

In [25]:


# Define a function to compute inverse solutions
def compute_inverse_solutions(inverse_operator, stcs, band_def, con_methods):
    fmin, fmax = band_def

    # Average the source estimates within each label using sign-flips to reduce signal cancellations
    src = inverse_operator['src']
    label_ts = mne.extract_label_time_course(
        stcs, labels=labels, src=src, mode='mean_flip', return_generator=True)

    con = spectral_connectivity_epochs(
        label_ts, method=con_methods, mode='multitaper', sfreq=sfreq, fmin=fmin,
        fmax=fmax, faverage=True, mt_adaptive=True, n_jobs=1)

    # Extract connectivity data for each method
    con_res = {}
    for method, c in zip(con_methods, con):
        con_res[method] = c.get_data(output='dense')[:, :, 0]

    return con_res


In [26]:
#function to compute connectivity pre and psot
def compute_connectivity(pre, post, band_def, con_methods, montage):
    # Process the pre-EEG data
    preraw = montage_alignment(pre, montage)
    fwd = make_forward(preraw)
    noise_cov, inverse_operator = compute_inverse_operator(preraw, fwd)
    stcs = apply_inverse_epochs(preraw, inverse_operator, lambda2, method,
                             pick_ori="normal", return_generator=True)
    pre_con_res = compute_inverse_solutions(inverse_operator, stcs, band_def,con_methods)

    # Process the post-EEG data
    postraw = montage_alignment(post, montage)
    fwd = make_forward(postraw)
    noise_cov, inverse_operator = compute_inverse_operator(postraw, fwd)
    stcs = apply_inverse_epochs(postraw, inverse_operator, lambda2, method,
                             pick_ori="normal", return_generator=True)
    post_con_res = compute_inverse_solutions(inverse_operator, stcs, band_def,con_methods)
    
    return pre_con_res, post_con_res
