In [1]:
# Demonstration of the overall paper implementation, starting from data loading

import os
from glob import glob

import numpy as np

import torch

from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

from braindecode.datasets.moabb import MOABBDataset
from braindecode.datautil.preprocess import exponential_moving_standardize
from braindecode.datautil.preprocess import MNEPreproc
from braindecode.datautil.preprocess import NumpyPreproc
from braindecode.datautil.preprocess import preprocess
from braindecode.datautil.windowers import create_windows_from_events

import sys

from matplotlib import pyplot as plt

import scipy.io as sio

In [2]:
# Bookkeeping shared by print_off()/print_on(): the stream that was active
# before silencing and the devnull handle that replaced it. Both are None
# while output is not silenced.
_print_saved_stdout = None
_print_devnull = None


def print_off():
    """Silence ``print`` by redirecting ``sys.stdout`` to ``os.devnull``.

    The stream that is active at call time is remembered so ``print_on`` can
    restore exactly that stream. Calling this while output is already
    silenced does nothing, so the remembered stream is never overwritten by
    the devnull handle.
    """
    global _print_saved_stdout, _print_devnull
    if _print_devnull is not None:
        return
    _print_saved_stdout = sys.stdout
    _print_devnull = open(os.devnull, 'w')
    sys.stdout = _print_devnull


def print_on():
    """Undo ``print_off``: restore the previous ``sys.stdout`` and close devnull.

    The previously active stream is restored rather than ``sys.__stdout__``,
    because the host (e.g. a notebook kernel or a test runner) may have
    installed its own ``sys.stdout``; resetting to ``sys.__stdout__`` would
    send later output to the wrong place. Does nothing if output is not
    currently silenced.
    """
    global _print_saved_stdout, _print_devnull
    if _print_devnull is None:
        return
    sys.stdout = _print_saved_stdout
    # Release the file handle opened by print_off() instead of leaking it.
    _print_devnull.close()
    _print_saved_stdout = None
    _print_devnull = None
    
def bcic4_2a(subject, low_hz=None, high_hz=None):
    """Load, preprocess and window BNCI2014001 recordings for the given subject(s).

    For every subject the recording is fetched through ``MOABBDataset``,
    restricted to EEG channels, scaled by 1e6 (volt -> microvolt), filtered
    with ``l_freq=low_hz`` / ``h_freq=high_hz``, exponentially
    moving-standardized, and cut into event-based windows whose start is
    shifted by -0.5 s relative to the event and whose stop offset is 0.

    Parameters
    ----------
    subject : int or iterable of int
        A single subject id or a collection of ids. NumPy integer scalars
        (e.g. ``np.int64``) are accepted as a single id as well.
    low_hz : float or None
        Forwarded as ``l_freq`` to the MNE ``filter`` step.
    high_hz : float or None
        Forwarded as ``h_freq`` to the MNE ``filter`` step.

    Returns
    -------
    X : np.ndarray
        Stack of the first element of every window over all subjects
        (presumably (n_trials, n_channels, n_times) -- TODO confirm).
    y : np.ndarray
        Stack of the second element of every window (the window targets).
    """
    # A lone id (Python int or NumPy integer scalar) is wrapped so the loop
    # below treats every input uniformly. Without the np.integer case a NumPy
    # scalar would hit the for-loop and raise "object is not iterable".
    if isinstance(subject, (int, np.integer)):
        subject = [int(subject)]

    # Settings that do not depend on the subject.
    factor_new = 1e-3
    init_block_size = 1000
    trial_start_offset_seconds = -0.5

    X = []
    y = []
    for subject_id in subject:
        # Load data
        dataset = MOABBDataset(dataset_name="BNCI2014001", subject_ids=[subject_id])

        # Preprocess (applied to `dataset` by preprocess()).
        preprocessors = [
            # keep only EEG sensors
            MNEPreproc(fn='pick_types', eeg=True, meg=False, stim=False, eog=False),
            # convert from volt to microvolt
            NumpyPreproc(fn=lambda x: x * 1e+06),
            # bandpass filter
            MNEPreproc(fn='filter', l_freq=low_hz, h_freq=high_hz),
            # exponential moving standardization
            NumpyPreproc(fn=exponential_moving_standardize, factor_new=factor_new,
                         init_block_size=init_block_size)
        ]
        preprocess(dataset, preprocessors)

        # Divide data by trial.
        # The offset is converted from seconds to samples, so all recordings
        # of this subject must share one sampling frequency.
        sfreq = dataset.datasets[0].raw.info['sfreq']
        assert all(ds.raw.info['sfreq'] == sfreq for ds in dataset.datasets)
        trial_start_offset_samples = int(trial_start_offset_seconds * sfreq)
        windows_dataset = create_windows_from_events(
            dataset,
            trial_start_offset_samples=trial_start_offset_samples,
            trial_stop_offset_samples=0,
            preload=True
        )

        # Merge subject: append this subject's windows to the shared lists.
        for trial in windows_dataset:
            X.append(trial[0])
            y.append(trial[1])

    return np.array(X), np.array(y)

# Helper needed when selecting a subset of labels
def label_selection(X, y, labels):
    """Keep only the samples whose label is in ``labels`` and renumber the labels.

    Parameters
    ----------
    X : array-like
        Samples, indexed along the first axis in step with ``y``.
    y : array-like
        One label per sample.
    labels : iterable
        Label values to keep. An empty collection selects no samples.

    Returns
    -------
    X_selected : np.ndarray
        The rows of ``X`` whose label is in ``labels``.
    y_selected : np.ndarray
        The matching labels, renumbered to 0..k-1 following the ascending
        order of the distinct label values actually present (not the order
        of ``labels``). Keeps the dtype of ``y``.

    The inputs are not modified.
    """
    X = np.asarray(X)
    y = np.asarray(y)

    # One membership test replaces the incremental OR-ing of per-label masks,
    # which relied on a length-1 sentinel and therefore misbehaved for a
    # single-sample ``y`` and for an empty ``labels``.
    keep = np.isin(y, list(labels))
    X = X[keep]
    y = y[keep]

    # The inverse indices of np.unique are the rank of each label among the
    # sorted distinct values. Mapping all labels at once avoids the collisions
    # a sequential in-place rewrite produces (e.g. labels -1 and 0 both ending
    # up as 1).
    _, ranks = np.unique(y, return_inverse=True)
    y = ranks.reshape(y.shape).astype(y.dtype)

    return X, y

# Data and initial-value setup

class CustomDataset(Dataset):
    """Map-style dataset serving (sample, label) tensor pairs.

    ``data`` is indexed as ``data[0]`` (samples) and ``data[1]`` (labels).
    Samples are stored through ``torch.FloatTensor`` and labels through
    ``torch.LongTensor``.
    """

    def __init__(self, data):
        super().__init__()

        samples, targets = data[0], data[1]

        self.X = torch.FloatTensor(samples)
        self.y = torch.LongTensor(targets)

    def __len__(self):
        """Return the number of samples, i.e. the length of ``self.X``."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the pair ``(self.X[idx], self.y[idx])``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

In [None]:
# Load and preprocess every trial of the subject(s) in `train_subject`.
# NOTE(review): `train_subject` is not assigned in any cell visible above;
# it must be defined before this cell runs, otherwise a fresh kernel raises
# NameError here.
X, y = bcic4_2a(train_subject)

# Keep only the trials labelled 0 or 1 (label_selection also renumbers the
# kept labels).
train_X, train_y = label_selection(X,y,[0,1])

In [None]:
# Wrap each training array in a single-key dict for scipy.io.savemat; the
# dict key is the name the array is stored under in the .mat file.
data_X = {'train_X' : train_X}
data_y = {'train_y' : train_y}

# Write the training samples and labels to separate .mat files; the paths
# are relative, so they land in the current working directory.
sio.savemat('train_X.mat', data_X)
sio.savemat('train_y.mat', data_y)

In [None]:
# Load and preprocess every trial of the subject(s) in `val_subject`.
# This overwrites the `X` / `y` produced by the training cell.
# NOTE(review): `val_subject` is not assigned in any cell visible above;
# it must be defined before this cell runs, otherwise a fresh kernel raises
# NameError here.
X, y = bcic4_2a(val_subject)

# Keep only the trials labelled 0 or 1 (label_selection also renumbers the
# kept labels).
# NOTE(review): data loaded from `val_subject` is stored under `test_*`
# names -- confirm this naming is intended.
test_X, test_y = label_selection(X,y,[0,1])

In [None]:
# Wrap each held-out array in a single-key dict for scipy.io.savemat; the
# dict key is the name the array is stored under in the .mat file.
# This rebinds the `data_X` / `data_y` names used by the training save cell.
data_X = {'test_X' : test_X}
data_y = {'test_y' : test_y}

# Write the held-out samples and labels to separate .mat files; the paths
# are relative, so they land in the current working directory.
sio.savemat('test_X.mat', data_X)
sio.savemat('test_y.mat', data_y)