In [1]:
# Reload modules
%load_ext autoreload
%autoreload 2

import math
import os
import re

import numpy as np
import scipy.io as sio

In [2]:
# Root directory of the datasets, relative to the current working directory.
data_path = "data"

In [3]:
# beta_path = os.path.join(data_path, "BETA")
# subject_files = {}

# import re
# # List the files with .mat suffix
# mat_files = list(filter(lambda x: re.search(".mat", x) != None, os.listdir(beta_path)))

# for file_name in mat_files:
#     file_path = os.path.join(beta_path, file_name)
#     subject_files[file_name[1:-4]] = sio.loadmat(os.path.join(beta_path, file_name))

In [4]:
import re

class BETALoader:
    """Lazy, caching loader for the per-subject ``.mat`` files of the BETA dataset.

    Every subject file ``S<number>.mat`` holds a ``data`` entry with two
    sub-entries, ``EEG`` and ``suppl_info``.

    EEG data shape: 64 x (750 or 1000) x 4 x 40.
    NOTE(review): the original comment labelled these axes
    "channels x samples x frequencies x trail"; the rest of this notebook
    treats axis 2 as the 4 blocks and axis 3 as the 40 targets. Confirm
    against the dataset documentation.

    ``suppl_info`` contains plenty of sub-entries:
      > suppl_info.sub
      > suppl_info.age
      > suppl_info.gender
      > suppl_info.chan
      > suppl_info.freqs
      > suppl_info.phases
      > suppl_info.bci_quotient
      > suppl_info.wide_snr
      > suppl_info.narrow_snr
      > suppl_info.srate

    Attributes:
        beta_path: directory that contains the ``S<number>.mat`` files.
        subject_files: cache mapping subject number -> loaded ``.mat`` content.
    """

    # Subject files are named "S<number>.mat". Matching the whole file name
    # (escaped dot, digits only) keeps unrelated directory entries such as
    # "format.txt" or "S3.mat.bak" out of the listing.
    _SUBJECT_FILE = re.compile(r"S(\d+)\.mat")

    def __init__(
        self,
        beta_path = "BETA"
    ) -> None:
        """
        Args:
            beta_path (str): dataset folder, resolved relative to the
                notebook-level ``data_path`` variable.
        """
        self.beta_path = os.path.join(data_path, beta_path)
        self.subject_files = {}

    def get_trail(self, subject_no, trail_no):
        """Return the EEG slice ``[:, :, :, trail_no]`` of one subject.

        Args:
            subject_no (int): subject number (the file ``S<subject_no>.mat``).
            trail_no (int): zero-based index along the last EEG axis.

        Returns:
            The EEG array of the subject with its last axis indexed at
            ``trail_no``.
        """
        return self.get(subject_no)["data"]["EEG"][0][0][:, :, :, trail_no]

    def list_subjects(self):
        """Return the sorted subject numbers available in ``beta_path``.

        Only entries named exactly ``S<digits>.mat`` are considered.

        Returns:
            list[int]: subject numbers in ascending order.
        """
        subjects = []
        for file_name in os.listdir(self.beta_path):
            match = self._SUBJECT_FILE.fullmatch(file_name)
            if match is not None:
                subjects.append(int(match.group(1)))
        return sorted(subjects)

    def __load_subject(self, subject_no):
        """Read ``S<subject_no>.mat`` from disk and store it in the cache."""
        self.subject_files[subject_no] = sio.loadmat(os.path.join(self.beta_path, f"S{subject_no}.mat"))

    def get(self, subject_no):
        """Return the content of a subject file, loading it on first access.

        Args:
            subject_no (int): subject number (the file ``S<subject_no>.mat``).

        Returns:
            The object produced by ``scipy.io.loadmat`` for that subject.
        """
        if subject_no not in self.subject_files:
            self.__load_subject(subject_no)
        return self.subject_files[subject_no]


In [5]:
import math

def buffer(data, duration, data_overlap):
    '''
    Returns segmented data based on the provided input window duration and overlap.

    Consecutive windows start ``duration - data_overlap`` samples apart. The
    last window is zero-padded on the right when the data runs out before the
    window is full.

    Args:
        data (numpy.ndarray): 1-D array of samples.
        duration (int): window length (number of samples).
        data_overlap (int): number of samples of overlap; must be smaller
            than ``duration``.

    Returns:
        (numpy.ndarray): segmented data of shape (number_of_segments, duration).
        An empty input yields an array of shape (0, duration).

    Raises:
        ValueError: if ``duration`` is not positive or ``data_overlap`` is not
            smaller than ``duration`` (the window would never advance).
    '''

    data = np.asarray(data)
    # Use the same integer overlap for the segment count and for the stride.
    data_overlap = int(data_overlap)
    step = duration - data_overlap
    if duration <= 0 or step <= 0:
        raise ValueError(
            "duration must be positive and larger than data_overlap "
            f"(got duration={duration}, data_overlap={data_overlap})"
        )

    if len(data) == 0:
        return np.empty((0, duration), dtype=data.dtype)

    # Data no longer than the overlap still produces one (padded) window.
    number_segments = max(1, int(math.ceil((len(data) - data_overlap) / step)))
    segments = [data[start:start + duration]
                for start in range(0, number_segments * step, step)]

    # Only the final window can be short; every earlier one is complete.
    shortfall = duration - segments[-1].shape[0]
    if shortfall > 0:
        segments[-1] = np.pad(segments[-1], (0, shortfall), 'constant')

    return np.vstack(segments)

def data_segmentation(data, window_len, step_len, sample_rate):
    """
    Divide the dataset into segmented epochs.

    Windows of ``window_len * sample_rate`` samples are cut along the time
    axis, each starting ``step_len * sample_rate`` samples after the previous
    one. A final window that runs past the end of the recording is zero-padded.

    Args:
        data (numpy.ndarray): array of shape
            (trails, channels, time_series, blocks).
        window_len: time gap for each epoch.
        step_len: iteration distance between epochs.
        sample_rate: samples per unit of ``window_len`` / ``step_len``.

    Returns:
        (numpy.ndarray): float64 array of shape
        (trails, channels, blocks, number_of_segments, samples_per_window).

    Raises:
        ValueError: if the window or the step covers less than one sample.
    """

    num_trails, num_channel, num_time_point, num_block = data.shape

    # Round instead of truncating: (1 - 0.9) * 250 evaluates to 24.999...,
    # which int() alone would turn into 24 and shift every window start.
    sample_count = int(round(window_len * sample_rate))
    sample_step = int(round(step_len * sample_rate))
    if sample_count <= 0 or sample_step <= 0:
        raise ValueError("window_len and step_len must each span at least one sample")
    sample_overlap = sample_count - sample_step

    number_of_segments = int(math.ceil((num_time_point - sample_overlap) / sample_step))
    if num_time_point == 0:
        number_of_segments = 0
    elif number_of_segments < 1:
        # A recording no longer than the overlap still yields one padded window.
        number_of_segments = 1

    segmented_data = np.zeros((num_trails, num_channel,
        num_block, number_of_segments, sample_count))

    # One slice assignment per segment covers all targets, channels and blocks.
    for segment in range(number_of_segments):
        start = segment * sample_step
        window = data[:, :, start:start + sample_count, :]
        valid = window.shape[2]  # shorter than sample_count only at the end
        # Move the time axis last: (trails, channels, blocks, valid).
        segmented_data[:, :, :, segment, :valid] = np.moveaxis(window, 2, -1)

    return segmented_data

In [6]:
# Loader for the "BETA" folder under data_path; subject files are read on first access.
beta_loader = BETALoader(beta_path="BETA")

In [7]:
import numpy as np

# NOTE(review): the variable is called s1 but holds the file of subject 16.
s1 = beta_loader.get(16)

# Supplementary information stored next to the EEG recording.
supplemental = s1["data"]["suppl_info"]

# [0][0] presumably unwraps the 1x1 MATLAB struct array; the recording is
# copied into a float32 array.
eeg = np.array(
    s1["data"]["EEG"][0][0],
    dtype=np.float32,
)

In [8]:
# Display the dimensions of the EEG array loaded above.
eeg.shape

(64, 1000, 4, 40)

In [9]:
# NOTE(review): the returned slice is neither stored nor displayed (this is not
# the last expression of the cell); the only visible effect is that subject 16
# ends up in the loader's cache.
beta_loader.get_trail(16, 3)

def labelize(subject_no, loader=None):
    """Stack one subject's EEG per target and build the matching labels.

    The last axis of the subject's EEG array is moved to the front, so entry
    ``i`` of the result equals ``loader.get_trail(subject_no, i)``. Labels are
    1-based and their count is taken from the data instead of being fixed
    at 40.

    Args:
        subject_no (int): subject number passed to the loader.
        loader: object exposing ``get(subject_no)`` like ``BETALoader``;
            defaults to the notebook-level ``beta_loader``.

    Returns:
        tuple: ``(data, labels)`` where ``data`` is a float32 array with the
        target axis first and ``labels`` is an int32 array
        ``1 .. data.shape[0]``.
    """
    if loader is None:
        loader = beta_loader

    eeg = loader.get(subject_no)["data"]["EEG"][0][0]
    # Always copy, so the result never aliases the loader's cached array.
    data = np.array(np.moveaxis(np.asarray(eeg), -1, 0), dtype=np.float32, order="C")
    labels = np.arange(1, data.shape[0] + 1, dtype=np.int32)
    return data, labels

# Per-target EEG of subject 16 together with its 1-based labels.
data, labels = labelize(16)

print(data.shape, labels.shape)

# Non-overlapping windows: window and step are both 1 (presumably seconds) at
# a sample rate of 250 (presumably Hz) — confirm against suppl_info.srate.
segmented_data = data_segmentation(data, window_len=1, step_len=1, sample_rate=250)
segmented_data.shape

(40, 64, 1000, 4) (40,)


(40, 64, 4, 4, 250)

In [11]:
import importlib
from bcilib.ssvep_utils_pytorch_test import CNN, RasterizeSlice, CustomTensorDataset
from torchsummary import summary

# NOTE(review): the recorded output of this cell is a RuntimeError raised by
# torchsummary before any layer was executed. Things to verify against the
# CNN definition in bcilib: whether input_shape should omit the leading 4
# (it looks like a batch dimension), and which input rank the network expects.
input_shape = (4, 64, 4, 4, 250)
summary(
    CNN().network.cuda(),  # .cuda() moves the network to the GPU
    input_shape
)

RuntimeError: Failed to run torchsummary. See above stack traces for more details. Executed layers up to: []