This notebook converts raw EEG data (CNT format) into NumPy arrays.

The output arrays contain only the segments recorded during watching and imaging.

If you already have the NumPy arrays (files in ./data/sliced_eeg/watching or ./data/sliced_eeg/imaging), you can skip this step and start with [01_extract_PSD_DE.ipynb](./01_extract_PSD_DE.ipynb).

## CONFIGURATIONS

In [33]:
# Paths and sampling settings read by the cells below.
CONFIG = dict(
    # Directory holding the raw CNT recordings; put them under ./data/raw_eeg.
    CNT_PATH='./data/raw_eeg',
    # Output directories for the sliced arrays; make sure both exist.
    watching_save_path='./data/sliced_eeg/watching',
    imaging_save_path='./data/sliced_eeg/imaging',
    # Sampling frequency (Hz) used to convert seconds into sample indices.
    frequency=200,
)

## Downsample and slice the EEG signal by videos

In [34]:
import mne
import numpy as np
from scipy.signal import resample

def downsample_data(data, downsample_rate=5):
    '''Downsample a signal along axis 1 by the given factor.
    Args:
        data: np.array whose axis 1 is the one being resampled,
            e.g. shape (channels, samples)
        downsample_rate: int or float, positive factor by which the number of
            samples is reduced (e.g. 5 or 2.5)
    Returns:
        resampled_data: np.array, same as data except that axis 1 has
            int(data.shape[1] / downsample_rate) samples
    Raises:
        ValueError: if downsample_rate is not a positive, finite number
    '''
    if not 0 < downsample_rate < float("inf"):
        raise ValueError(
            f"Downsample rate should be a positive finite number, got {downsample_rate!r}"
        )

    # Truncating the quotient reproduces the previous hard-coded lengths:
    # n // 5 for a rate of 5 and n * 2 // 5 for a rate of 2.5.
    resample_len = int(data.shape[1] / downsample_rate)

    resampled_data = resample(data, resample_len, axis=1)
    return resampled_data

def slice_by_video(raw, downsample_rate, max_length=505000):
    '''Slice the raw data by video and downsample the data.
    Args:
        raw: mne.Raw object. NOTE: the reference channels M1, M2, VEO and HEO
            are dropped from this object in place.
        downsample_rate: int or float, the factor passed to downsample_data;
            choose it so that the output frequency is 200Hz
        max_length: int, the maximum number of raw (not yet downsampled)
            samples kept per video
            NOTE(review): the default 505000 equals 505 seconds only when
            downsample_rate is 5 (505000 / 5 = 505 * 200); confirm the value
            used for recordings that need another rate.
    Returns:
        slice_data: np.array, the sliced data, shape (videos, channels, samples).
            Expected to be (5, 62, 505*200) for a standard recording:
            5: the number of videos
            62: the number of channels
            505*200: 505 seconds per video, data frequency is 200Hz
    Raises:
        ValueError: if fewer than two '1'/'2' annotations are found, or
            (raised by np.stack) if the video segments end up with
            different lengths.
    '''
    bad_channels = ['M1', 'M2', 'VEO', 'HEO']
    # drop_channels modifies `raw` in place, so on a second call with the same
    # object these channels are already gone; only drop what is still there.
    present = [ch for ch in bad_channels if ch in raw.ch_names]
    if present:
        raw.drop_channels(present)

    # Keep only the annotations described as '1' and '2'. Mapping the
    # descriptions explicitly avoids relying on the integer ids that MNE
    # assigns automatically, which depend on which other descriptions
    # (e.g. '255') happen to be present in the recording.
    events, _ = mne.events_from_annotations(raw, event_id={'1': 1, '2': 2})
    if len(events) < 2:
        raise ValueError(
            "Expected at least one start/end pair of '1'/'2' annotations, "
            f"found {len(events)} event(s)"
        )

    # Events alternate start, end, start, end, ...; column 0 is the sample index.
    # An unpaired trailing event is ignored by zip.
    start_events = events[0::2, 0]
    end_events = events[1::2, 0]

    slice_data = []
    for start, end in zip(start_events, end_events):
        data = raw.get_data(start=start, stop=end)
        # Cap every video at max_length raw samples before downsampling.
        data = data[:, :max_length]
        slice_data.append(downsample_data(data, downsample_rate))
    return np.stack(slice_data, axis=0)

## Slice the data by clips

In [35]:
def divide_by_clip(video_segments, frequency=None):
    '''Divide the video segments by clip
    Args:
        video_segments: np.array, the video segments, shape (5, 62, 505*200)
        frequency: int or None, sampling frequency (Hz) of video_segments.
            When None, CONFIG["frequency"] is used.
    Returns:
        watching_clip: the watching data corresponding to each video, shape is (5, 50, 62, 2*200)
            5: 5 videos, 50: 50 clips per video, 62: 62 electrodes, 400: 2*200: 2 seconds * 200 Hz
        imaging_clip: the imaging data corresponding to each video, shape is (5, 50, 62, 3*200)
            5: 5 videos, 50: 50 clips per video, 62: 62 electrodes, 600: 3*200: 3 seconds * 200 Hz
    Raises:
        ValueError: if video_segments has too few samples on its last axis
            to contain the final imaging clip.
    '''
    fre = CONFIG["frequency"] if frequency is None else frequency

    # Clip timing in seconds: a new clip every 10 s, watching starts 5 s in
    # and lasts 2 s, imaging starts 4 s after watching starts and lasts 3 s.
    watching_start = np.arange(5, 500, 10)
    imaging_start = watching_start + 4

    # Slicing past the end would silently give shorter clips and make
    # np.stack fail with an unrelated-looking shape error, so check up front.
    required = (int(imaging_start[-1]) + 3) * fre
    if video_segments.shape[2] < required:
        raise ValueError(
            f"video_segments has {video_segments.shape[2]} samples per video, "
            f"but at least {required} are required"
        )

    watching_clip = np.stack(
        [video_segments[:, :, start*fre:(start+2)*fre] for start in watching_start],
        axis=1,
    )
    imaging_clip = np.stack(
        [video_segments[:, :, start*fre:(start+3)*fre] for start in imaging_start],
        axis=1,
    )
    return watching_clip, imaging_clip

## Start!

In [None]:
import os
from tqdm import tqdm

# np.save does not create missing directories; make sure both output
# directories exist before spending time on the first recording.
for save_dir in (CONFIG["watching_save_path"], CONFIG["imaging_save_path"]):
    os.makedirs(save_dir, exist_ok=True)

# Recordings that need a downsample rate other than the default of 5.
downsample_overrides = {'zhangyiran_20250722_session3.cnt': 2.5}

# Filter first so the progress bar counts only CNT files; sort for a stable order.
cnt_files = sorted(f for f in os.listdir(CONFIG["CNT_PATH"]) if f.endswith('.cnt'))

for file in tqdm(cnt_files):
    cnt_file = os.path.join(CONFIG["CNT_PATH"], file)
    cnt_data = mne.io.read_raw_cnt(cnt_file)
    video_segments = slice_by_video(
        cnt_data, downsample_rate=downsample_overrides.get(file, 5)
    )
    watching_clip, imaging_clip = divide_by_clip(video_segments=video_segments)
    # splitext strips only the final extension, so names containing extra
    # dots keep their full stem instead of being cut at the first dot.
    stem = os.path.splitext(file)[0]
    np.save(os.path.join(CONFIG["watching_save_path"], stem + '_watching.npy'), watching_clip)
    np.save(os.path.join(CONFIG["imaging_save_path"], stem + '_imaging.npy'), imaging_clip)

print("="*50, "\nAll files are done!")

  0%|          | 0/6 [00:00<?, ?it/s]

Used Annotations descriptions: [np.str_('1'), np.str_('2')]


  cnt_data = mne.io.read_raw_cnt(cnt_file)
 17%|█▋        | 1/6 [00:07<00:37,  7.42s/it]

Used Annotations descriptions: [np.str_('1'), np.str_('2')]


 33%|███▎      | 2/6 [00:14<00:27,  6.99s/it]

Used Annotations descriptions: [np.str_('1'), np.str_('2'), np.str_('255')]


 50%|█████     | 3/6 [00:20<00:20,  6.75s/it]

Used Annotations descriptions: [np.str_('1'), np.str_('2')]


 67%|██████▋   | 4/6 [00:27<00:13,  6.66s/it]

Used Annotations descriptions: [np.str_('1'), np.str_('2')]


  cnt_data = mne.io.read_raw_cnt(cnt_file)
 83%|████████▎ | 5/6 [00:33<00:06,  6.53s/it]

Used Annotations descriptions: [np.str_('1'), np.str_('2')]


  cnt_data = mne.io.read_raw_cnt(cnt_file)
100%|██████████| 6/6 [00:39<00:00,  6.63s/it]

All files are done!



