In [4]:
import numpy as np
import mne
import matplotlib.pyplot as plt

In [5]:
def load_single_record(
    path: str,
    sample_rate: int = 250,
    low_freq: float = 0.5,
    high_freq: float = 45,
    epoch_size: float = 2,
    epoch_overlap: float = 1,
    normalize: bool = False,
    remove_dc: bool = False,
) -> np.ndarray:
    """Load one EEGLAB recording and split it into fixed-length epochs.

    Processing order: read the ``.set`` file, resample, band-pass filter,
    optionally standardise, optionally high-pass at 0.1 Hz, then segment.

    Parameters
    ----------
    path
        Path to the EEGLAB ``.set`` file.
    sample_rate
        Target sampling frequency in Hz passed to ``Raw.resample``.
    low_freq, high_freq
        Lower / upper edge (Hz) of the band-pass filter.
    epoch_size
        Epoch duration in seconds.
    epoch_overlap
        Overlap between consecutive epochs in seconds.
    normalize
        If True, standardise every channel with
        ``mne.decoding.Scaler(scalings="mean")`` before epoching.
    remove_dc
        If True, apply an additional 0.1 Hz high-pass filter. Note that with
        the default ``low_freq=0.5`` the band-pass edge is already above it.

    Returns
    -------
    np.ndarray
        Epoched data as returned by ``Epochs.get_data``; per the MNE
        documentation this is ``(n_epochs, n_channels, n_times)``.
    """
    raw = mne.io.read_raw_eeglab(
        path,
        preload=True,
    )
    raw.resample(sample_rate)

    # `raw` is local and preloaded, so filter it in place instead of
    # duplicating the whole recording with .copy().
    raw.filter(l_freq=low_freq, h_freq=high_freq, verbose=False)

    if normalize:
        # Scaler is documented for (n_epochs, n_channels, n_times) input.
        # Continuous data is (n_channels, n_times), so present it as a single
        # epoch; passing the 2-D array directly would let the axes be
        # interpreted differently and not scale per channel.
        continuous = raw.get_data()
        scaler = mne.decoding.Scaler(scalings="mean")
        scaled_data = scaler.fit_transform(continuous[np.newaxis, ...])[0]
        processed = mne.io.RawArray(scaled_data, raw.info)
    else:
        processed = raw

    if remove_dc:
        # High-pass only (h_freq=None); silenced like the band-pass above.
        processed.filter(l_freq=0.1, h_freq=None, verbose=False)

    # Segmentation into (possibly overlapping) fixed-length epochs
    epochs = mne.make_fixed_length_epochs(
        processed,
        duration=epoch_size,
        overlap=epoch_overlap,
        preload=True,
        verbose=False,
    )

    return epochs.get_data(copy=True)

In [6]:
# healthy subject_ids = ["037", "043", "044", "057", "065"]

train_ids = [1, 5, 20, 37, 43, 57, 65, 67, 76, 82, 83]
validation_ids = [2, 44, 70, 84]

# Select the split here. The same name drives both the subject list and the
# output filenames, so switching splits can no longer overwrite the other
# split's .npy files by accident.
split = "validation"  # "train" or "validation"
subject_ids = {"train": train_ids, "validation": validation_ids}[split]

# Collect per-subject epoch arrays and concatenate once at the end; stacking
# inside the loop would re-copy the growing array on every iteration.
raw_chunks = []
clean_chunks = []

for s_id in subject_ids:
    print(f"Loading subject {s_id}")
    subject = f"sub-{s_id:03d}"
    raw_single = load_single_record(
        f"./data_raw/{subject}/eeg/{subject}_task-eyesclosed_eeg.set",
        normalize=False,
        remove_dc=False,
    )
    clean_single = load_single_record(
        f"./data_clean/{subject}/eeg/{subject}_task-eyesclosed_eeg.set",
        normalize=False,
        remove_dc=False,
    )
    # Raw/clean epochs are paired by position, so check each subject rather
    # than only the grand total (which could hide offsetting mismatches).
    assert len(raw_single) == len(clean_single), (
        f"Subject {s_id}: raw has {len(raw_single)} epochs, "
        f"clean has {len(clean_single)}"
    )
    raw_chunks.append(raw_single)
    clean_chunks.append(clean_single)

# Stack along the epoch axis (axis 0), as np.vstack did for these 3-D arrays.
raw_data = np.concatenate(raw_chunks, axis=0)
clean_data = np.concatenate(clean_chunks, axis=0)

assert len(raw_data) == len(clean_data), "Raw and clean data are not of the same length"

save = True
if save:
    np.save(f"./data_npy/raw_data_{split}_nonorm.npy", raw_data)
    np.save(f"./data_npy/clean_data_{split}_nonorm.npy", clean_data)

Loading subject 2
Loading subject 44
Loading subject 70
Loading subject 84
