# EEG Preprocessor

In [1]:
# Import dependencies.
import mne
import os
import shutil # High-level file organisation.
from pymatreader import read_mat # Reading .mat files.
import numpy as np
import re
from itertools import chain
import ipywidgets as widgets
import matplotlib

## Create MNE data structures:

In [2]:
# Folder that holds the recordings to preprocess. The name is defined once and
# reused so the directory that gets created is always the one that is read later.
data_folder = "eeg_data"

# exist_ok=True makes this cell safe to re-run and avoids the check-then-create
# race; it also fails loudly if "eeg_data" exists but is a regular file.
os.makedirs(data_folder, exist_ok=True)

### Before running the cells below, add your files to the `eeg_data` folder. They should be .mat, .fif, .gdf or .edf files.

In [3]:
# Sort every loose file in data_folder into a sub-folder named after its
# extension (e.g. eeg_data/mat, eeg_data/edf), so that the loading step can
# decide how to read a file from the name of the folder it lives in.
for file in os.listdir(data_folder):
    source_path = os.path.join(data_folder, file)

    # Only regular files are sorted. Sub-folders (including the extension
    # folders created by an earlier run) are left where they are, even if
    # their name happens to contain a dot.
    if not os.path.isfile(source_path):
        continue

    # Drop the leading '.' and normalise the case so "rec.MAT" and "rec.mat"
    # end up in the same "mat" folder.
    ext = os.path.splitext(file)[1][1:].lower()

    # Files without an extension cannot be classified and stay in place.
    if ext == '':
        continue

    target_folder = os.path.join(data_folder, ext)
    os.makedirs(target_folder, exist_ok=True)
    shutil.move(source_path, target_folder)

In [4]:
def simplify_dict(d):
    """Strip redundant single-element list wrappers from a dictionary's values.

    A list value of the form ``[[...]]`` (one element that is itself a list) is
    unwrapped repeatedly until it either holds more than one element or its
    only element is no longer a list. Dictionary values are processed
    recursively. Every other value is left untouched.

    The dictionary is modified in place and also returned.
    """
    for key in d:
        entry = d[key]

        if isinstance(entry, dict):
            # Nested dictionaries get the same treatment.
            d[key] = simplify_dict(entry)
            continue

        if not isinstance(entry, list):
            continue

        # Peel off one wrapper at a time: [[[a, b]]] -> [[a, b]] -> [a, b].
        while len(entry) == 1 and isinstance(entry[0], list):
            entry = entry[0]
        d[key] = entry

    return d

def regex_search_dict(data, regex_pattern):
    """Collect the values of every key that matches a regular expression.

    The search covers ``data`` itself and, recursively, every value that is a
    dictionary.

    Parameters
    ----------
    data : dict
        Dictionary to search.
    regex_pattern : str
        Pattern handed to ``re.search`` for each string key.

    Returns
    -------
    list
        Flat list of the matching values, in traversal order. Empty when no
        key matches.
    """
    matches = []

    for key, value in data.items():
        # Non-string keys cannot be searched with a regex; skip them instead
        # of letting re.search raise a TypeError.
        if isinstance(key, str) and re.search(regex_pattern, key):
            matches.append(value)

        if isinstance(value, dict):
            # extend (not append) keeps the result flat: a nested dictionary
            # without any match contributes nothing instead of a stray [],
            # and nested matches are not wrapped in one extra list per level.
            matches.extend(regex_search_dict(value, regex_pattern))

    return matches

def create_mne_from_mat_dict(mat_dict):
    """Build an MNE raw object from the dictionary form of a .mat file.

    The dictionary (and any nested dictionaries) is searched for keys that look
    like channel names, sampling frequency, channel types and the EEG samples.
    Missing or inconsistent channel names fall back to ``channel0, channel1,
    ...`` and missing or inconsistent channel types fall back to ``"eeg"``.

    Parameters
    ----------
    mat_dict : dict
        Contents of a .mat file as a (possibly nested) dictionary.

    Returns
    -------
    mne.io.RawArray
        Raw object created from the matched data and the assembled info.

    Raises
    ------
    ValueError
        If no single sampling frequency can be determined, or if the matched
        data does not reduce to a 2-D array.
    """
    # These regexp search patterns will be used for searching .mat file types.
    # NOTE: the patterns are case-sensitive as written.
    regex_info = {
        "ch_names": r"\b(ch(?:annel)?_?names?)\b", # Different permutations of channel name.
        "sfreq": r"\b((?:samp(?:ling)?|s)_?Freq(?:uency)?)\b", # Different permutations of sampling frequency.
        "ch_types": r"\b(ch(?:annel)?_?types?)\b"
    }
        #"projs": r"\bproj(?:ector?s?)\b", # Different permutations of projectors.
        #"events": r"\b(?:event\w*|marker\w*)\b", # Different permutations of events.

    parameters = {
        field: regex_search_dict(mat_dict, pattern)
        for field, pattern in regex_info.items()
    }
    simplify_dict(parameters)

    # mne.create_info expects a single number for sfreq. Unwrap a one-element
    # list and fail with a readable message when nothing usable was found,
    # rather than handing a list to MNE.
    sfreq = parameters['sfreq']
    if isinstance(sfreq, list) and len(sfreq) == 1:
        sfreq = sfreq[0]
    try:
        parameters['sfreq'] = float(sfreq)
    except (TypeError, ValueError) as err:
        raise ValueError(
            f"Could not determine a single sampling frequency from the .mat file (found {sfreq!r})."
        ) from err

    data_patterns = r"\b(?:eeg|raw[_\s]?eeg|data|eeg[_\s]?data|data[_\s]?eeg)\b" # Different permutations of eeg data.
    data = np.array(regex_search_dict(mat_dict, data_patterns))
    data = data.squeeze() # Drop any superfluous dimensions of size 1.

    # Everything below indexes two axes, so anything else (no match, several
    # matches, a 1-D vector, ...) is rejected up front with a clear message.
    if data.ndim != 2:
        raise ValueError(
            f"Expected 2-D EEG data (channels x samples) in the .mat file, found an array of shape {data.shape}."
        )

    # Put channels on the first axis and samples on the second. Almost
    # universally there are more samples than channels (and if not, the
    # recording is too short to be useful), so the longer axis is taken to be
    # the sample axis.
    if data.shape[0] > data.shape[1]:
        data = data.T

    n_channels = len(data)

    # If the number of channel names does not match the number of channels in
    # the data (too few or too many), default to ['channel0', 'channel1', ...]
    # so that the info structure and the data always agree:
    if len(parameters['ch_names']) != n_channels:
        parameters['ch_names'] = [f"channel{i}" for i in range(n_channels)]

    # If there is no (or inconsistent) input for channel types, they are all assumed to be EEG:
    if len(parameters['ch_types']) != len(parameters['ch_names']):
        parameters['ch_types'] = ["eeg"]*len(parameters['ch_names'])

    info = mne.create_info(**parameters) # Create the MNE info structure.

    raweeg = mne.io.RawArray(data, info) # Create the MNE raw object.

    return raweeg

In [5]:
# One MNE raw object per recording found in the extension sub-folders.
raw_eeg_list = []

# sorted() makes the loading order, and therefore the indices of raw_eeg_list,
# reproducible between runs; os.listdir gives no ordering guarantee.
for folder in sorted(os.listdir(data_folder)):
    path = os.path.join(data_folder, folder)

    # Loose files that were not sorted into an extension folder (e.g. files
    # without an extension) are not folders and cannot be listed; skip them.
    if not os.path.isdir(path):
        continue

    for file in sorted(os.listdir(path)):
        file_path = os.path.join(path, file)

        # While .mat files are common, they don't have a standardised recording format, and so they need to be handled separately.
        if folder.lower() == "mat":
            matfile = read_mat(file_path)
            raw_eeg_list.append(create_mne_from_mat_dict(matfile))
        else:
            raw_eeg_list.append(mne.io.read_raw(file_path, verbose=False))

Creating RawArray with float64 data, n_channels=22, n_times=621892
    Range : 0 ... 621891 =      0.000 ...  3109.455 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=666800
    Range : 0 ... 666799 =      0.000 ...  3333.995 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=621884
    Range : 0 ... 621883 =      0.000 ...  3109.415 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=620168
    Range : 0 ... 620167 =      0.000 ...  3100.835 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=671600
    Range : 0 ... 671599 =      0.000 ...  3357.995 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=729400
    Range : 0 ... 729399 =      0.000 ...  3646.995 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=667800
    Range : 0 ... 667799 =      0.000 ...  3338.995 secs
Ready.
Creating RawArray with float64 data, n_channels=22, n_times=667400
    Range : 0 ..

## Apply filters:

In [10]:
# Switch that decides whether a band-pass filter is applied at all.
filter_check = widgets.Checkbox(description='Apply bandpass', value=False, disabled=False)

# Lower cut-off of the band-pass; bounded to the range 0-150 in steps of 0.1.
lowfreq = widgets.BoundedFloatText(
    description='l_freq filter:', value=8.0, min=0, max=150, step=0.1, disabled=False
)

# Upper cut-off of the band-pass, with the same bounds as the lower one.
highfreq = widgets.BoundedFloatText(
    description='h_freq filter:', value=35.0, min=0, max=150, step=0.1, disabled=False
)

# Only the checkbox is shown here; the frequency boxes are displayed by a
# later cell once the checkbox has been ticked.
display(filter_check)

Checkbox(value=False, description='Apply bandpass')

In [20]:
# Reveal the cut-off inputs only when the band-pass checkbox is ticked.
if filter_check.value:
    display(lowfreq, highfreq)

In [21]:
# Cut-offs handed to MNE. [None, None] leaves the signal unfiltered.
if filter_check.value:
    # MNE treats l_freq > h_freq as a band-stop filter, which is not what the
    # "Apply bandpass" checkbox promises, so reject that input explicitly.
    if lowfreq.value >= highfreq.value:
        raise ValueError(
            f"l_freq ({lowfreq.value}) must be lower than h_freq ({highfreq.value}) for a band-pass filter."
        )
    freqs = [lowfreq.value, highfreq.value]
else:
    freqs = [None, None]

# Filter a copy so the original recording stays untouched. load_data() is
# needed because recordings opened with mne.io.read_raw are not preloaded and
# filtering requires the samples in memory; it does nothing if the data is
# already loaded (as for objects built with RawArray).
test = raw_eeg_list[0].copy().load_data().filter(l_freq=freqs[0], h_freq=freqs[1])

Filtering raw data in 1 contiguous segment

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal allpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Filter length: 1 samples (0.005 s)



In [22]:
# Open MNE's browser on `test`, the (optionally filtered) copy of the first recording.
test.plot()

<mne_qt_browser._pg_figure.MNEQtBrowser at 0x7f665b5632f0>

Channels marked as bad:
none
