In [None]:
import numpy as np
import mne
from moabb.datasets import BNCI2014_001
from sklearn.preprocessing import StandardScaler
from scipy.signal import savgol_filter
from sklearn.metrics import accuracy_score, recall_score, precision_score
import tensorflow as tf

In [12]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer, Input, BatchNormalization, Activation, AveragePooling1D, Dropout, Flatten, Dense
from tensorflow.keras.models import Model

# 1. Basis Function: Fourier (사인/코사인 파형)
def fourier_basis_functions(q, length):
    x = np.linspace(0, 1, length)
    basis = [np.ones_like(x)]
    for k in range(1, q // 2 + 1):
        basis.append(np.sin(2 * np.pi * k * x))
        basis.append(np.cos(2 * np.pi * k * x))
    basis = np.stack(basis, axis=0)  # (q+1, length)
    return basis.astype(np.float32)

# 2. Functional Conv1D Layer (basis function 합성)
class FunctionalConv1D(Layer):
    def __init__(self, filters, basis_functions, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.basis_functions = tf.constant(basis_functions)  # (q+1, kernel_size)
        self.q = basis_functions.shape[0]
        self.kernel_size = basis_functions.shape[1]

    def build(self, input_shape):
        # input: (batch, time, channels)
        self.in_channels = input_shape[-1]
        # 학습 파라미터: (in_channels, filters, q)
        self.weight = self.add_weight(
            shape=(self.in_channels, self.filters, self.q),
            initializer='glorot_uniform',
            trainable=True,
            name='basis_weights'
        )
        self.bias = self.add_weight(
            shape=(self.filters,),
            initializer='zeros',
            trainable=True,
            name='bias'
        )

    def call(self, inputs):
        # inputs: (batch, time, in_channels)
        basis = self.basis_functions  # (q, kernel_size)
        # basis function의 선형결합으로 커널 생성
        # kernel: (in_channels, filters, kernel_size)
        kernel = tf.einsum('ifq,qk->ifk', self.weight, basis)
        # 커널을 (in_channels, filters, kernel_size, 1)로 reshape (conv1d와 호환)
        kernel = tf.transpose(kernel, [1, 2, 0])  # (filters, kernel_size, in_channels)
        kernel = tf.transpose(kernel, [2, 1, 0])  # (in_channels, kernel_size, filters)
        kernel = tf.transpose(kernel, [2, 1, 0])  # (filters, kernel_size, in_channels)

        # Conv1D: 각 필터에 대해 (in_channels, kernel_size)로 conv
        # tf.nn.conv1d expects (batch, time, in_channels)
        outputs = []
        for f in range(self.filters):
            # (batch, time, in_channels) * (in_channels, kernel_size) conv
            filt = kernel[f, :, :]  # (kernel_size, in_channels)
            filt = tf.transpose(filt, [1, 0])  # (in_channels, kernel_size)
            # (batch, time, in_channels)와 (in_channels, kernel_size)로 conv1d
            out = tf.nn.conv1d(inputs, filt[None, :, :], stride=1, padding='SAME')
            outputs.append(out[..., 0] + self.bias[f])
        outputs = tf.stack(outputs, axis=-1)
        return outputs

# 3. FNN(40) 전체 모델 구현
def build_FNN40_basis(input_shape=(250, 22), num_classes=4, q=5):
    basis = fourier_basis_functions(q, kernel_size:=25)
    inputs = Input(shape=input_shape)  # (time, channels)
    x = FunctionalConv1D(40, basis)(inputs)  # (batch, time, 40)
    x = BatchNormalization()(x)
    x = Activation('elu')(x)
    x = AveragePooling1D(pool_size=2)(x)
    x = Dropout(0.5)(x)

    # 두 번째 functional conv (EEG 채널 합성, spatial pooling)
    # 보통 (batch, time, filters) 형태로 맞추면 됨
    x = FunctionalConv1D(40, basis)(x)
    x = BatchNormalization()(x)
    x = Activation('elu')(x)
    x = AveragePooling1D(pool_size=5)(x)
    x = Dropout(0.5)(x)

    x = Flatten()(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs, outputs)
    return model


In [None]:
import numpy as np
import mne
from moabb.datasets import BNCI2014_001
from sklearn.preprocessing import StandardScaler
from scipy.signal import savgol_filter
from sklearn.metrics import accuracy_score, recall_score, precision_score
import tensorflow as tf
from pprint import pprint

# --- 1. BCI IV 2a 세션별 trial 전체 신호, 이벤트 정보 로딩 ---
def load_raw_trials(subject, session):
    dataset = BNCI2014_001()
    runs = dataset._get_single_subject_data(subject)[session]
    X_all, y_all = [], []
    sfreq = None

    print(f"\n[load_raw_trials_debug] Subject: {subject}, Session: {session}")
    print(f"  ▶ Runs available: {list(runs.keys())}")

    for run_key in runs.keys():
        print(f"\n  ▶▶ RUN {run_key}")
        raws = runs[run_key]

        if sfreq is None:
            sfreq = int(raws.info["sfreq"])
            print(f"  Sampling frequency (sfreq): {sfreq} Hz")

        print(f"  Raw data shape: {raws.get_data().shape} (channels, samples)")
        print(f"  Channel names: {raws.ch_names}")

        # ✅ stim 채널로부터 이벤트 추출
        events = mne.find_events(raws, stim_channel='stim', shortest_event=0, verbose=False)
        print(f"{len(events)} events found on stim channel 'stim'")
        print(f"Event IDs: {np.unique(events[:, 2])}")

        if len(events) == 0:
            print("    [Warning] No events found in this run.")
            continue

        # ✅ MOABB에서 1~4는 실제 movement (769~772) 매핑된 것
        movement_ids = [1, 2, 3, 4]  # left, right, feet, tongue
        movement_events = np.array([e for e in events if e[2] in movement_ids])

        if len(movement_events) == 0:
            print("    [Skipped] No movement events (1~4) in this run.")
            continue

        print(f"  Filtered movement events shape: {movement_events.shape}")
        print(f"  Unique kept movement labels: {np.unique(movement_events[:, 2])}")

        # ✅ event_id를 수동으로 지정
        event_id = {
            'left': 1,
            'right': 2,
            'feet': 3,
            'tongue': 4
        }

        # ✅ Epoch 생성
        epochs = mne.Epochs(raws, movement_events, event_id=event_id, tmin=0, tmax=8, baseline=None, preload=True)
        print(f"  Epochs shape: {epochs.get_data().shape} (trials, channels, time)")
        print(f"  Epoch event labels: {np.unique(epochs.events[:, 2])}")
        print(f"Original events found: {len(events)}")
        print(f"Epochs created: {len(epochs)}")
        print(f"Dropped: {len(events) - len(epochs)}")        # ✅ EEG 데이터 22채널만 사용
        X = epochs.get_data()[:, :22, :]
        y_events = epochs.events[:, 2]  # class: 1~4

        print(f"  X shape: {X.shape}, y_events shape: {y_events.shape}")
        print(f"  y_events (first 10): {y_events[:10]}")

        X_all.append(X)
        y_all.append(y_events)

    # ✅ 모든 run 통합
    if len(X_all) == 0:
        print("\n❌ No valid data collected. Check event triggers in stim channel.")
        return None, None, None

    X_all = np.concatenate(X_all, axis=0)
    y_all = np.concatenate(y_all, axis=0)
    print(f"\n[Finished] All X shape: {X_all.shape}, y_all shape: {y_all.shape}")
    print(f"           Unique y labels: {np.unique(y_all)}")

    return X_all, y_all, sfreq



# --- 2. 7-class 레이블 생성: 논문 trial 시간구간 정의에 맞춤 ---
def get_7class_labels(trial_event, sfreq, trial_len=8.0):
    labels = np.zeros(int(trial_len * sfreq), dtype=int)
    labels[:int(2*sfreq)] = 1
    labels[int(2*sfreq):int(2.5*sfreq)] = 3
    labels[int(2.5*sfreq):int(3*sfreq)] = 2

    # 🔁 1~4 (stim) → 4~7 (논문 class)로 매핑
    movement_class = 4 + (trial_event - 1)
    labels[int(3*sfreq):int(6*sfreq)] = movement_class
    labels[int(6*sfreq):] = 1
    return labels

# --- 3. 슬라이딩 윈도우 생성 + 라벨 (majority voting) ---
def make_sliding_windows(X, labels_7class, window_size=1.0, step_size=0.004, sfreq=250):
    win_len = int(window_size * sfreq)
    step = int(step_size * sfreq)
    windows, y_windows = [], []
    for trial_data, trial_labels in zip(X, labels_7class):
        for start in range(0, trial_data.shape[1] - win_len + 1, step):
            window = trial_data[:, start:start+win_len]
            # LLE smoothing + standardization
            window = savgol_filter(window, window_length=51, polyorder=3, axis=-1)
            window = StandardScaler().fit_transform(window.T).T
            window = window.T
            # majority voting label
            window_label = np.bincount(trial_labels[start:start+win_len]).argmax()
            windows.append(window)
            y_windows.append(window_label)
    return np.array(windows), np.array(y_windows)

# --- 4. tf.data.Dataset 생성 ---
def make_tf_dataset_generator(X, y, batch_size=32, repeat=False):
    import tensorflow as tf
    import numpy as np

    print(f"\n[make_tf_dataset_generator] Efficient tf.data.Dataset creation...")
    print(f"  Input X shape: {X.shape}, dtype: {X.dtype}")
    print(f"  Input y shape: {y.shape}, dtype: {y.dtype}")

    def generator():
        for xi, yi in zip(X, y):
            yield xi.astype(np.float32), int(yi)

    ds = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(X.shape[1], X.shape[2]), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.int64),
        )
    )

    ds = ds.batch(batch_size)
    if repeat:
        ds = ds.repeat()

    for xb, yb in ds.take(1):
        print(f"  ✅ Example batch X shape: {xb.shape}, y shape: {yb.shape}")
        print(f"  ✅ y batch unique values: {np.unique(yb.numpy())}")
    return ds


# --- 5. 실험 (9명 × 10회) ---
results = []
for subject in range(1, 10):   # 9명
    print(f"\n=== Subject {subject} ===")
    # 세션 1(train), 세션 2(test)
    X_train, y_events_train, sfreq = load_raw_trials(subject, "0train")
    X_test, y_events_test, _ = load_raw_trials(subject, "1test")
    # 7-class 레이블: movement 구간은 이벤트에 따라 각각(4~7)
    labels_7class_train = [get_7class_labels(ev, sfreq) for ev in y_events_train]
    labels_7class_test = [get_7class_labels(ev, sfreq) for ev in y_events_test]
    # 슬라이딩 윈도우 & 라벨
    Xw_train, yw_train = make_sliding_windows(X_train, labels_7class_train, sfreq=sfreq)
    Xw_test, yw_test = make_sliding_windows(X_test, labels_7class_test, sfreq=sfreq)
    print(f"Train windows: {len(Xw_train)}, Test windows: {len(Xw_test)}")

    # tf.data.Dataset
    train_ds = make_tf_dataset_generator(Xw_train, yw_train, batch_size=32, repeat=True)
    test_ds = make_tf_dataset_generator(Xw_test, yw_test, batch_size=32, repeat=False)

    for repeat in range(10):
        print(f"  Repeat {repeat+1}/10")
        model = build_FNN40_basis(input_shape=(250, 22), num_classes=8, q=5)  # num_classes=8 (1~7, +0 for any null)
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        model.fit(train_ds, epochs=1, steps_per_epoch=16000, verbose=1)
        # 평가
        y_true, y_pred = [], []
        for x_batch, y_batch in test_ds:
            y_pred_batch = model.predict(x_batch, verbose=0)
            y_pred.append(np.argmax(y_pred_batch, axis=1))
            y_true.append(y_batch.numpy())
        y_pred = np.concatenate(y_pred)
        y_true = np.concatenate(y_true)
        acc = accuracy_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred, average='macro', zero_division=0)
        prec = precision_score(y_true, y_pred, average='macro', zero_division=0)
        results.append([subject, repeat, acc, rec, prec])
        print(f"    ACC: {acc:.4f} / RECALL: {rec:.4f} / PRECISION: {prec:.4f}")

results = np.array(results)
for metric_idx, name in zip([2,3,4], ["ACC", "RECALL", "PRECISION"]):
    mean = results[:, metric_idx].astype(float).mean()
    std = results[:, metric_idx].astype(float).std()
    print(f"{name}: {mean:.4f} ± {std:.4f}")


2025-06-11 20:27:07.450342: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-06-11 20:27:07.469809: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-06-11 20:27:07.469831: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-06-11 20:27:07.470427: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-06-11 20:27:07.474001: I tensorflow/core/platform/cpu_feature_guar


=== Subject 1 ===

[load_raw_trials_debug] Subject: 1, Session: 0train
  ▶ Runs available: ['0', '1', '2', '3', '4', '5']

  ▶▶ RUN 0
  Sampling frequency (sfreq): 250 Hz
  Raw data shape: (26, 96735) (channels, samples)
  Channel names: ['Fz', 'FC3', 'FC1', 'FCz', 'FC2', 'FC4', 'C5', 'C3', 'C1', 'Cz', 'C2', 'C4', 'C6', 'CP3', 'CP1', 'CPz', 'CP2', 'CP4', 'P1', 'Pz', 'P2', 'POz', 'EOG1', 'EOG2', 'EOG3', 'stim']
48 events found on stim channel 'stim'
Event IDs: [1 2 3 4]
  Filtered movement events shape: (48, 3)
  Unique kept movement labels: [1 2 3 4]
Not setting metadata
48 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 48 events and 2001 original time points ...
1 bad epochs dropped
  Epochs shape: (47, 26, 2001) (trials, channels, time)
  Epoch event labels: [1 2 3 4]
Original events found: 48
Epochs created: 47
Dropped: 1
  X shape: (47, 22, 2001), y_events shape: (47,)
  y_events (first 10): [4 3 2 1 1 2 3 4 2 3]

KeyboardInterrupt: 

In [7]:
import numpy as np
import mne
from moabb.datasets import BNCI2014_001

subject = 1    # 예시로 1번
session = "0train"  # 또는 "1test"

dataset = BNCI2014_001()
runs = dataset._get_single_subject_data(subject)[session]

for run_key in runs.keys():
    raws = runs[run_key]
    print(f"\n=== RUN {run_key} ===")
    print(f"  Raw shape: {raws.get_data().shape} (channels, samples)")
    # 이벤트와 이벤트코드 확인
    events, event_id = mne.events_from_annotations(raws)
    print(f"  events: {events.shape}")
    print(f"  event_id: {event_id}")
    # 각 이벤트 하나씩 출력 (이벤트 인덱스, 샘플 위치, 코드)
    if events is not None and len(events) > 0:
        for i, e in enumerate(events):
            print(f"    event[{i}] - sample: {e[0]}, before: {e[1]}, code: {e[2]}")
    else:
        print("    [NO EVENTS]")

    # 실제 epochs 생성 가능 여부/shape 확인
    try:
        epochs = mne.Epochs(raws, events, event_id=None, tmin=0, tmax=8, baseline=None, preload=True)
        print(f"  epochs: {len(epochs)} (trials), shape: {epochs.get_data().shape}")
        if len(epochs) > 0:
            print(f"    epoch label(events[:,2]): {epochs.events[:,2]}")
    except Exception as e:
        print(f"  [Epoch ERROR] {e}")



=== RUN 0 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence

=== RUN 1 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence

=== RUN 2 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence

=== RUN 3 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence

=== RUN 4 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence

=== RUN 5 ===
  Raw shape: (26, 96735) (channels, samples)
  events: (0, 3)
  event_id: {}
    [NO EVENTS]
  [Epoch ERROR] max() arg is an empty sequence


In [9]:
import numpy as np
import mne
from moabb.datasets import BNCI2014_001

subject = 1
session = "0train"

dataset = BNCI2014_001()
runs = dataset._get_single_subject_data(subject)[session]

for run_key in runs.keys():
    raws = runs[run_key]
    print(f"\n=== RUN {run_key} ===")
    print(f"  Raw shape: {raws.get_data().shape} (channels, samples)")

    # stim 채널에서 이벤트 추출
    events = mne.find_events(raws, stim_channel='stim')
    print(f"  events: {events.shape}")
    if events.shape[0] > 0:
        for i, e in enumerate(events):
            print(f"    event[{i}] - sample: {e[0]}, before: {e[1]}, code: {e[2]}")
    else:
        print("    [NO EVENTS]")

    # 실제 epochs 생성 가능 여부/shape 확인
    try:
        epochs = mne.Epochs(raws, events, event_id=None, tmin=0, tmax=8, baseline=None, preload=True)
        print(f"  epochs: {len(epochs)} (trials), shape: {epochs.get_data().shape}")
        if len(epochs) > 0:
            print(f"    epoch label(events[:,2]): {epochs.events[:,2]}")
    except Exception as e:
        print(f"  [Epoch ERROR] {e}")



=== RUN 0 ===
  Raw shape: (26, 96735) (channels, samples)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
  events: (48, 3)
    event[0] - sample: 250, before: 0, code: 4
    event[1] - sample: 2253, before: 0, code: 3
    event[2] - sample: 4171, before: 0, code: 2
    event[3] - sample: 6123, before: 0, code: 1
    event[4] - sample: 8131, before: 0, code: 1
    event[5] - sample: 10242, before: 0, code: 2
    event[6] - sample: 12159, before: 0, code: 3
    event[7] - sample: 14209, before: 0, code: 4
    event[8] - sample: 16140, before: 0, code: 2
    event[9] - sample: 18138, before: 0, code: 3
    event[10] - sample: 20044, before: 0, code: 1
    event[11] - sample: 21939, before: 0, code: 1
    event[12] - sample: 23911, before: 0, code: 1
    event[13] - sample: 25855, before: 0, code: 4
    event[14] - sample: 27822, before: 0, code: 2
    event[15] - sample: 29942, before: 0, code: 2
    event[16] - sample: 31950, before: 0, code: 1
    event[17] - sample: 34016,

In [5]:
dataset = BNCI2014_001()
subject = 1  # 아무 subject 번호
data = dataset._get_single_subject_data(subject)
print(data.keys())           # ['0train', '1test']
print(data['0train'].keys()) # run 이름들(예: ['1', '2', ...] 혹은 ['run_0', ...])
print(data['1test'].keys())


dict_keys(['0train', '1test'])
dict_keys(['0', '1', '2', '3', '4', '5'])
dict_keys(['0', '1', '2', '3', '4', '5'])


In [18]:
for run_key in runs.keys():
    raws = runs[run_key]
    events = mne.find_events(raws, stim_channel='stim', shortest_event=0)
    print(f"run {run_key} events shape: {events.shape}")


48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 0 events shape: (48, 3)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 1 events shape: (48, 3)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 2 events shape: (48, 3)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 3 events shape: (48, 3)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 4 events shape: (48, 3)
48 events found on stim channel stim
Event IDs: [1 2 3 4]
run 5 events shape: (48, 3)


In [17]:
print(raws.info["ch_names"])

['Fz', 'FC3', 'FC1', 'FCz', 'FC2', 'FC4', 'C5', 'C3', 'C1', 'Cz', 'C2', 'C4', 'C6', 'CP3', 'CP1', 'CPz', 'CP2', 'CP4', 'P1', 'Pz', 'P2', 'POz', 'EOG1', 'EOG2', 'EOG3', 'stim']
