In [1]:
import pathlib
import mne
import numpy as np
import pandas as pd

# 專案根目錄（與 notebooks 同級）
ROOT     = pathlib.Path(r'D:\COGBCI')
PREP_DIR = ROOT / 'data' / 'preproc'    # 存放 *-epo.fif 的資料夾
RES_DIR  = ROOT / 'results'
RES_DIR.mkdir(exist_ok=True)

# P300 要用的通道與時間窗
P300_CHANNELS = ['Cz', 'Pz']  # 視資料實際通道命名而定
P300_TMIN = 0.300  # 300 ms
P300_TMAX = 0.500  # 500 ms

# θ‐power 要用的通道與頻段
THETA_CHANNELS = ['Fz', 'FCz', 'Pz']
THETA_FREQ_BAND = (4, 7)   # 4–7 Hz

In [2]:
#  epoch Naming sub-XX_ses-SX_0b-epo.fif、1b-epo.fif、2b-epo.fif
epoch_paths = sorted(PREP_DIR.glob('*-epo.fif'))
len(epoch_paths)  # 應該是 261

180

In [3]:
# 隨機挑一個檔案看結構 Testing
example_path = epoch_paths[0]
epochs = mne.read_epochs(example_path, preload=True, verbose=False)
print(epochs)
# 應該看到 (n_epochs, n_channels, n_times) 以及 event_id 字典
# 檢查通道是否包含 P300_CHANNELS 與 THETA_CHANNELS
print("Channels:", epochs.ch_names)

<EpochsFIF | 201 events (all good), -0.2 – 0.8 s (baseline -0.2 – 0 s), ~47.7 MiB, data loaded,
 '601': 0
 '6011': 6
 '6012': 3
 '6021': 96
 '6022': 48
 '6031': 1
 '6032': 47
 'boundary': 0>
Channels: ['Fp1', 'Fz', 'F3', 'F7', 'FT9', 'FC5', 'FC1', 'C3', 'T7', 'CP5', 'CP1', 'Pz', 'P3', 'P7', 'O1', 'Oz', 'O2', 'P4', 'P8', 'TP10', 'CP6', 'CP2', 'FCz', 'C4', 'T8', 'FT10', 'FC6', 'FC2', 'F4', 'F8', 'Fp2', 'AF7', 'AF3', 'AFz', 'F1', 'F5', 'FT7', 'FC3', 'C1', 'C5', 'TP7', 'CP3', 'P1', 'P5', 'PO7', 'PO3', 'POz', 'PO4', 'PO8', 'P6', 'P2', 'CPz', 'CP4', 'TP8', 'C6', 'C2', 'FC4', 'FT8', 'F6', 'AF8', 'AF4', 'F2']


In [8]:
def extract_p300(epochs, channels, tmin, tmax):
    #"""
    #從 Epochs 物件抽出 P300 特徵：
    #- channels: list of channel names, e.g. ['Cz','Pz']
    #- tmin, tmax: time window in seconds, e.g. 0.300, 0.500
    #回傳：num_trials × len(channels) 的 numpy array，內容是各 trial 各通道平均振幅
    #""'

    # 1. 取出資料陣列 (n_epochs, n_channels, n_times)
    data_dict = {}
    # 2. 找到 300–500 ms 在 time 索引的位置
    times = epochs.times
    idx_min = np.argmin(np.abs(times - tmin))
    idx_max = np.argmin(np.abs(times - tmax))
    # 3. 取出該時間窗的資料，並對 axis=2 做平均
    for ch in channels:
        if ch in epochs.ch_names:
            vals = epochs.get_data(picks=[ch])[:, 0, idx_min:idx_max].mean(axis=1)
            data_dict[ch] = vals
        else:
            # 該檔沒有這個通道，就回傳全 NaN
            data_dict[ch] = np.full((len(epochs),), np.nan)
    # 組成 n_epochs×n_channels 的 array
    arr = np.column_stack([data_dict[ch] for ch in channels])
    return arr

In [14]:
import numpy as np

def extract_theta_power(epochs, channels, fmin, fmax):
    """
    從 Epochs 物件抽出 theta (4–7 Hz) 功率特徵：
    - channels: list of channel names, e.g. ['Fz','FCz','Pz']
    - fmin, fmax: 頻段下限與上限，e.g. 4, 7
    回傳：num_trials × len(channels) 的 numpy array，內容是各 trial 各通道的平均功率
    """
    # 先找出該 Epochs 中實際存在的通道
    avail = [ch for ch in channels if ch in epochs.ch_names]
    n_epochs = len(epochs)
    n_chans  = len(channels)

    # 如果一個都不存在，直接回傳全 NaN
    if len(avail) == 0:
        return np.full((n_epochs, n_chans), np.nan)

    # 只對存在的通道做 compute_psd
    # compute_psd(method='welch', ...) 會回傳一個 PSD-Epochs 物件
    psd_epochs = epochs.copy().pick_channels(avail).compute_psd(
        method='welch',
        fmin=fmin,
        fmax=fmax,
        n_fft=int(epochs.info['sfreq'] * 0.5),
        n_overlap=0,
        verbose=False
    )
    # psd_epochs.get_data() 形狀 = (n_epochs, len(avail), n_freqs)
    psds   = psd_epochs.get_data()
    # psd_epochs.freqs 是對應的頻率陣列 shape = (n_freqs,)
    # 直接算出平均功率 (axis=2 對頻率平均)
    theta_mean_avail = psds.mean(axis=2)  # (n_epochs, len(avail))

    # 最後把結果放回「所有 channels」的順序，缺的通道設為 NaN
    out = np.full((n_epochs, n_chans), np.nan)
    for i, ch in enumerate(channels):
        if ch in avail:
            j = avail.index(ch)
            out[:, i] = theta_mean_avail[:, j]

    return out

In [15]:
records = []  # 存放所有 trial 的資訊
for epo_path in epoch_paths:
    # 範例路徑： D:/COGBCI/data/preproc/sub-01_ses-S1_0b-epo.fif
    fname = epo_path.name
    parts = fname.split('_')  # ['sub-01', 'ses-S1', '0b-epo.fif']
    sub = parts[0]            # 'sub-01'
    ses = parts[1]            # 'ses-S1'
    cond = parts[2].split('-')[0]  # '0b' 或 '1b' 或 '2b'
    
    # 讀取 Epochs
    epochs = mne.read_epochs(epo_path, preload=True, verbose=False)
    
    # 提取 P300 與 θ
    p300_vals = extract_p300(epochs, P300_CHANNELS, P300_TMIN, P300_TMAX)
    # 用 nanmean 只對非 NaN 值取平均
    p300_mean = np.nanmean(p300_vals, axis=1)
    theta_vals = extract_theta_power(epochs, THETA_CHANNELS, *THETA_FREQ_BAND)
    theta_mean = np.nanmean(theta_vals, axis=1)
    # 把每個 trial 的資訊加到 records
    for i in range(len(epochs)):
        rec = {
            'sub': sub,
            'ses': ses,
            'cond': cond,  # '0b','1b','2b'
            # P300 可取 Cz 和 Pz 平均值，也可分開，這裡分開存
            'p300_Cz': p300_vals[i, P300_CHANNELS.index('Cz')],
            'p300_Pz': p300_vals[i, P300_CHANNELS.index('Pz')],
            # theta_power 也分開存 Fz, FCz, Pz
            'theta_Fz':  theta_vals[i, THETA_CHANNELS.index('Fz')],
            'theta_FCz': theta_vals[i, THETA_CHANNELS.index('FCz')],
            'theta_Pz':  theta_vals[i, THETA_CHANNELS.index('Pz')],
            # 也可以加上 trial number, reaction time (若存於 epochs.metadata)
            'trial': i + 1
        }
        records.append(rec)

# 整理成 DataFrame，再算各 trial 的「平均 P300」（Cz & Pz）與「平均 θ」（Fz/FCz/Pz）
df = pd.DataFrame(records)
df['p300_mean'] = df[['p300_Cz', 'p300_Pz']].mean(axis=1)
df['theta_mean'] = df[['theta_Fz', 'theta_FCz', 'theta_Pz']].mean(axis=1)

# 最後只保留子集欄位：sub, ses, cond, trial, p300_mean, theta_mean
df_features = df[['sub', 'ses', 'cond', 'trial', 'p300_mean', 'theta_mean']].copy()
df_features.to_csv(RES_DIR / 'features_all.csv', index=False)

print("✅ 特徵擷取完成，已存到 features_all.csv")

NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy functi