In [None]:
#install ASR artifact rejection
!pip install asrpy -q


In [None]:

#importing all requirements

import mne
from mne.datasets import ssvep
from asrpy import ASR
import numpy as np
import matplotlib.pyplot as plt
import glob
import os
%matplotlib inline
from mne_icalabel import label_components
from mne.preprocessing import ICA
from mne import export
from scipy.stats import linregress


In [None]:
# Defining the path where the data is secured

path = r'C:\Users\PC1\Documents\MATLAB\test' # use your path
all_files = glob.glob(os.path.join(path, "*.set")) 
output_folder = 'processed'
os.makedirs(output_folder, exist_ok=True)  # Create folder if it doesn't exist



In [None]:
# Function to contain all the code which runs all preprocessing steps and then call PSD function for the quality check



for filename in all_files:
    raw = mne.io.read_raw_eeglab(filename, preload=True)
    print(filename)
    
    # Select EEG channels
    picks = mne.pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False)
    raw.pick(picks)

    # Set EEG reference and apply filters
    raw.set_eeg_reference("average")
    raw.filter(l_freq=1., h_freq=100.)
    raw.notch_filter(freqs=[60])  # Bandstop for power grid

    # Make a copy of uncleaned data
    raw_uncleaned = raw.copy()

    # Apply ASR
    asr = ASR(sfreq=raw.info["sfreq"], cutoff=15)
    asr.fit(raw)
    CleanData = asr.transform(raw)

    # Initialize and fit ICA with extended infomax
    ica = ICA(method='infomax', fit_params=dict(extended=True), random_state=97)
    ica.fit(CleanData)

    # Label components and exclude those with brain label and proba <= 0.8
    IC_COMP = label_components(CleanData, ica, method='iclabel')
    indices = [i for i, (proba, label) in enumerate(zip(IC_COMP['y_pred_proba'], IC_COMP['labels']))
               if label == 'brain' and proba <= 0.8]
    ica.exclude.extend(indices)

    # Apply ICA to CleanData
    CleanData = ica.apply(CleanData)

    # Generate a unique output filename based on the original filename
    base_name = os.path.splitext(os.path.basename(filename))[0]  # Get base name without extension
    output_filename = os.path.join(output_folder, f"{base_name}_processed.set")

    # Save each processed file to the "processed" folder
    export.export_raw(output_filename, CleanData, fmt='eeglab', overwrite=True)
    print(f"Saved: {output_filename}")


In [None]:
#CleanData.plot()
events = mne.events_from_annotations(CleanData)

# Check the event IDs
print(events[1]) 

Removing Boundary events

In [None]:
# Get current annotations
annotations = CleanData.annotations

# Filter out annotations labeled as 'BAD boundary'
filtered_annotations = annotations[annotations.description != 'boundary']

# Replace the annotations in the CleanData object
CleanData.set_annotations(filtered_annotations)


In [None]:
window_duration = 2.0  # seconds
step_size = 1.0        # seconds
# Total recording time
recording_duration = CleanData.times[-1]

# Create overlapping windows
start_times = []
t = 0.0
while t + window_duration <= recording_duration:
    start_times.append(t)
    t += step_size

print(f"Number of windows: {len(start_times)}")


In [None]:

# Extract data for each window
epochs_data = []
for start_time in start_times:
    stop_time = start_time + window_duration
    epochs_data.append(CleanData.copy().crop(tmin=start_time, tmax=stop_time).get_data())

# Convert to MNE Epochs object
info = CleanData.info  # Use the same info as the CleanData object
epochs = mne.EpochsArray(data=np.array(epochs_data), info=info)


In [None]:
# Example: Label all pseudo-epochs as "resting" (label = 1)
event_ids = {'rest': 1}
events = np.array([[int(start_time * CleanData.info['sfreq']), 0, 1] for start_time in start_times])

# Use events to create epochs
epochs = mne.Epochs(CleanData, events, event_id=event_ids, tmin=0, tmax=window_duration,
                    baseline=None, detrend=1)


In [None]:
CleanData.plot()
epochs.plot()


SOURCE LOCALIZATION

Forward Model

In [19]:
from mne import make_bem_model, make_bem_solution, setup_source_space, make_forward_solution

# BEM model
bem_model = make_bem_model(subject='subject_name', conductivity=[0.3], ico=4)
bem_solution = make_bem_solution(bem_model)

# Source space
src = setup_source_space(subject='subject_name', spacing='oct6', 
                         subjects_dir='/path/to/freesurfer/subjects')

# Forward solution
fwd = make_forward_solution(epochs.info, trans='subject_name-trans.fif', 
                            src=src, bem=bem_solution, eeg=True)


KeyError: 'Key "SUBJECTS_DIR" not found in the environment or in the the mne-python config file (C:\\Users\\PC1\\.mne\\mne-python.json). Try either os.environ["SUBJECTS_DIR"] = VALUE for a temporary solution, or mne.utils.set_config("SUBJECTS_DIR", VALUE, set_env=True) for a permanent one. You can also set the environment variable before running python.'

Noise COV

In [None]:
from mne import compute_covariance

# Estimate noise covariance
noise_cov = compute_covariance(epochs, tmax=0., method='auto')  # Use baseline


Inverse Model

In [None]:
from mne.minimum_norm import make_inverse_operator, apply_inverse

# Create the inverse operator
inverse_operator = make_inverse_operator(epochs.info, fwd, noise_cov, loose=0.2, depth=0.8)

# Apply inverse solution to epochs
stc = apply_inverse(epochs.average(), inverse_operator, lambda2=1/9., method='dSPM')


In [None]:
from mne.time_frequency import tfr_multitaper, tfr_morlet

# Define frequencies of interest
frequencies = np.arange(1, 40, 1)  # 1-40 Hz
n_cycles = frequencies / 2.0       # Number of cycles, adaptable to frequency

# Morlet wavelet
tfr = tfr_morlet(epochs, freqs=frequencies, n_cycles=n_cycles, 
                 use_fft=True, return_itc=False, decim=3, average=True)

# Plot TFR for a single channel
tfr.plot([0], baseline=(-0.5, 0), mode='logratio', title='TFR for Channel 0')


In [None]:
from mne.time_frequency import tfr_multitaper

# Define frequencies of interest
frequencies = np.arange(1, 40, 1)  # 1-40 Hz

# Define the number of cycles for each frequency
n_cycles = frequencies / 2.0  # Adjust as needed (e.g., cycles = frequency / 2)

# Multitaper Time-Frequency Analysis
tfr_mt = tfr_multitaper(epochs, freqs=frequencies, n_cycles=n_cycles,
                        time_bandwidth=4.0, return_itc=False, average=True)

# Plot TFR for a single channel
tfr_mt.plot([1], baseline=(-0.5, 0), mode='logratio', title='Multitaper TFR')



In [None]:
PSD_CHECK()

In [None]:


#Quality check function on the processed data 

# Define the folder where processed files are saved
def PSD_CHECK():
 processed_folder = 'processed'

  # Get a list of all .set files in the processed folder
 processed_files = [os.path.join(processed_folder, f) for f in os.listdir(processed_folder) if f.endswith('.set')]

 # Loop through each processed file and calculate the PSD
 for processed_file in processed_files:
    # Read the processed .set file
    raw = mne.io.read_raw_eeglab(processed_file, preload=True)
    print(f"Calculating PSD for {processed_file}")
    psd_data = raw.compute_psd(fmin=1, fmax=80)
    psd, freqs = psd_data.get_data(return_freqs=True)
    # Plot the PSD similar to MNE's style
    # Convert PSD to dB scale
    psd_db = 10 * np.log10(psd.mean(axis=0))  # Mean across channels

    # Choose the frequency range to fit (e.g., from 1 Hz to 80 Hz)
    freq_range = (freqs >= 1) & (freqs <= 80)  # Boolean mask for selected frequencies
    freqs_selected = freqs[freq_range]
    psd_db_selected = psd_db[freq_range]

    # Fit a linear regression line to the selected frequency range
    slope, intercept, r_value, p_value, std_err = linregress(freqs_selected, psd_db_selected)
    # Data quality assessment based on slope
    if slope >= 0:
     quality = "Garbage"
    elif 0 > slope >= -0.1:
     quality = "Poor"
    elif -0.1 > slope >= -0.2:
     quality = "Fair"
    elif -0.2 > slope >= -0.3:
     quality = "Good"
    else:  # slope < -0.3
     quality = "Excellent"

    print(f"Data Quality: {quality}")