In [2]:
from scipy.io import loadmat
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
"""
# this block is for getting familiar with the structure of the dataset

# taking eeg_record1.mat for example
mat = loadmat('/EEG Data/eeg_record1.mat')

#mat
#mat['o']
#mat['o']['data']
#mat['o']['data'][0]
#mat['o']['data'][0][0]
mat['o']['data'][0][0].shape   #(308868, 25) = 308868 samples * 25 channels
"""

# **Dataset Preparation**

In [4]:
# frequency = 128Hz
fs = 128

# 5 participants
n_subjects = 5

# samples of the first 10 minutes are for 'focused'
# 10~20 for 'unfocused'
# 20~ for 'drowsed'
mkpt1 = int(fs*60*10)
mkpt2 = int(fs*60*20)
mkpt3 = int(fs*60*30)

In [5]:
# according to the paper, each participant took part in 7 experiments (except for the 5th participant who only participated in 6),
# but the first two were for getting familiar with the experiment, so only the last 5 were considered
subject_map = {1: [3, 4, 5, 6, 7], 2: [10, 11, 12, 13, 14], 3: [17, 18, 19, 20, 21], 4: [24, 25, 26, 27, 28], 5: [31, 32, 33, 34]}

# names of useful EEG channels (14 out of 25 channels)
channels = ['F7','F3','P7','O1','O2','P8','AF4']
c_list = [4, 5, 8, 9, 10, 11, 16]

In [6]:
dir = "/EEG Data/"

# for each participant:
for s in range(1, n_subjects+1):
  data = {}
  data['channels'] = channels
  data['fs'] = fs

  # for each experiment the participant has been engaged in:
  for i, record in enumerate(subject_map[s]):
    exp = {}

    exp_data = loadmat(dir+f'eeg_record{record}.mat')
    # only select channels [4, 5, 8, 9, 10, 11, 16]
    eeg = exp_data['o']['data'][0][0][:, c_list]
    # print(eeg.shape)

    # split the data into 'focused', 'unfocused', and 'drowsed' phases
    exp['focused'] = eeg[:mkpt1]
    exp['unfocused'] = eeg[mkpt1:mkpt2]
    exp['drowsed'] = eeg[mkpt2:mkpt3]

    data[f'exp_{i+1}'] = exp

  # save the data of this participant
  with open(f'subject_{s}.pkl', 'wb') as f:
    pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)

# **Visualization**

In [None]:
# load the data of the first participant for example
with open('subject_1.pkl', 'rb') as f:
  data = pickle.load(f)
data

In [None]:
# plot the EEG graph of 'exp_1' during the 'focused' phase for inspection
fig, ax = plt.subplots(7,1)
fig.set_figwidth(20)
fig.set_figheight(40)

num_samples = data['exp_1']['focused'].shape[0]
time = np.arange(num_samples) / fs

for c in range(7):
  channel = data['exp_1']['focused'][:, c]
  ax[c].plot(time, channel)
  ax[c].set_xlabel('time (s)')
  ax[c].set_ylabel(f'Channel {channels[c]}')
  ax[c].set_title(channels[c])

plt.show()

# **Data Analysis**

In [9]:
subject = 1

with open(f'subject_{subject}.pkl', 'rb') as f:
  data = pickle.load(f)

phases = ['focused', 'unfocused', 'drowsed']

Lag Plot: 輸出資料分布看是否適合使用線性模型

In [None]:
# 輸出受試者的3個phase的7個channel資料
## note: change this parameter to test different lags
lag = 1

for exp_num in range(len(subject_map[subject])):
  for phase in phases:
    phase_data = data[f'exp_{exp_num+1}'][phase]
    #print(phase_data.shape)
    for i in range(phase_data.shape[1]):
      data_series = pd.Series(phase_data[:, i])
      #print("data: ", data_series.shape)
      pd.plotting.lag_plot(data_series, lag=lag)
      plt.show()

ADF Test: 判斷資料是否平穩 (p_value <= 0.05)

In [None]:
from statsmodels.tsa.stattools import adfuller

def adf_test(series, title=''):
    print(f'Augmented Dickey-Fuller Test: {title}')
    result = adfuller(series.dropna(), autolag='AIC') # .dropna() handles differenced data
    labels = ['ADF test statistic', 'p-value', 'Number of lags used', 'Number of observations used']
    out = pd.Series(result[0:4], index=labels)
    for k, v in result[4].items():
        out[f'critical value ({k})'] = v

    #print(out)
    if result[1] <= 0.05:  # 有顯著性，推翻虛無假設
        print("Data has no unit root and is stationary")
        return 1
    else:
        print("Data has a unit root and is non-stationary")
        return 0

for exp_num in range(len(subject_map[subject])):
  for i, phase in enumerate(phases):
    phase_data = data[f'exp_{exp_num+1}'][phase]
    # print(phase_data[:, 0].shape)
    for c in range(phase_data.shape[1]):
      data_series = pd.DataFrame(phase_data[:, c])
      adf_test(data_series, title=i)
      print()

# **Feature Extraction**  
4 methods

In [14]:
from scipy.signal import get_window

# 以8秒為單位切割
segment_length = 8*fs
# 相鄰切割視窗重疊量
overlap = segment_length - int(fs * 0.5)
# 窗函數
window = get_window('blackman', segment_length)

phases = ['focused', 'unfocused', 'drowsed']

## **Method1 - Standard Statistics**
Standard Deviation + Skewness + Kurtosis

In [15]:
from scipy.stats import kurtosis, skew

def Stats(subject, data):
  processed_data = []
  labels = []

  for exp_num in range(len(subject_map[subject])):
    for i, phase in enumerate(phases):
      phase_data = data[f'exp_{exp_num+1}'][phase]
      time_points = len(phase_data)

      for start in range(0, time_points-segment_length+1, segment_length-overlap):
        end = start+segment_length
        segment = phase_data[start:end, :]

        deviation = np.std(segment, axis=0)
        skewness = skew(segment, axis=0)
        kurtosis_data = kurtosis(segment, axis=0)
        concatenated_data = np.concatenate((deviation, skewness, kurtosis_data))

        processed_data.append(concatenated_data)
        labels.append(i)

  return np.array(processed_data), np.array(labels)

## **Method2 - STFT**

In [16]:
from scipy.signal import stft


def STFT(subject, data):
  processed_data = []
  labels = []

  for exp_num in range(len(subject_map[subject])):
    for i, phase in enumerate(phases):
      phase_data = data[f'exp_{exp_num+1}'][phase]

      all_c_data = []
      for c in range(len(data['channels'])):
      # 計算每個channel的STFT
        f, t, Zxx = stft(phase_data[:, c], fs=fs, window=window, nperseg=segment_length, noverlap=overlap)
        all_c_data.append((np.abs(Zxx))**2)

      # 堆疊all_c_data list為矩陣
      all_c_data = np.concatenate(all_c_data, axis=0)
      # 切割每個時間步
      for t in range(all_c_data.shape[1]):
        feature = all_c_data[:, t]
        vector = 10*np.log(feature+1e-10)

        processed_data.append(vector)
        labels.append(i)

  return np.array(processed_data), np.array(labels)

## **Method3- parametric spectral estimation**
VAR model (多變數)

In [17]:
from statsmodels.tsa.api import VAR

def Var(subject, data):
  processed_data = []
  labels = []
  lag = 1

  for exp_num in range(len(subject_map[subject])):
    for i, phase in enumerate(phases):
      phase_data = data[f'exp_{exp_num+1}'][phase]
      time_points = len(phase_data)

      for start in range(0, time_points-segment_length+1, segment_length-overlap):
        end = start+segment_length
        segment = phase_data[start:end, :]

        model = VAR(segment)
        results = model.fit(lag)   # best: 44(11.75)
        # print(results.summary())
        # 提取係數、截距、殘差
        coefficients, intercepts, residuals = results.params, results.intercept, results.resid
        residuals = np.pad(residuals, ((lag, 0), (0, 0)), mode='constant', constant_values=0)
        coefficients = coefficients.reshape(-1)   # (8, 7) -> (56,)
        coefficients = np.tile(coefficients, (len(segment), 1))
        concat_data = np.concatenate((coefficients, residuals), axis=1) # (128, 63)
        concat_data = np.mean(concat_data, axis=0)
        c = len(concat_data)
        concat_data = np.reshape(concat_data, (1, c)) # (1, 128, 63)

        processed_data.append(concat_data)
        labels.append(i)

  processed_data = np.concatenate(processed_data, axis=0)

  return np.array(processed_data), np.array(labels)

## **Method4- Barlett**
channel分開

In [18]:
from scipy.signal import welch
from scipy.integrate import simps
import seaborn as sns

def Welch(subject, data):
  processed_data = []
  labels = []
  feature = 5

  for exp_num in range(len(subject_map[subject])):
    for i, phase in enumerate(phases):
      phase_data = data[f'exp_{exp_num+1}'][phase]
      time_points = len(phase_data)

      for start in range(0, time_points-segment_length+1, segment_length-overlap):
        features_channels = np.empty((feature, 0))
        end = start+segment_length
        segment = phase_data[start:end, :]

        # 對每個channel計算 Welch 方法的功率谱密度
        for channel in range(segment.shape[1]):
            freqs, psd = welch(segment[:, channel], fs=fs, nperseg=segment_length)
            # Frequency resolution
            freq_res = freqs[1] - freqs[0]  # = 1 / 4 = 0.25
            total_power = simps(psd, dx=freq_res)
            delta_power = simps(psd[(freqs >= 0.5) & (freqs < 4)], dx=freq_res) / total_power
            theta_power = simps(psd[(freqs >= 4) & (freqs < 8)], dx=freq_res) / total_power
            alpha_power = simps(psd[(freqs >= 8) & (freqs < 13)], dx=freq_res) / total_power
            beta_power = simps(psd[(freqs >= 13) & (freqs < 30)], dx=freq_res) / total_power
            gamma_power = simps(psd[(freqs >= 30)], dx=freq_res) / total_power
            features = np.array([delta_power, theta_power, alpha_power, beta_power, gamma_power])  # 有加入delta
            # features = np.array([theta_power, alpha_power, beta_power, gamma_power])      # 不加入delta
            # print("feature: ", features)
            features = features.reshape(-1, 1)
            # print("feature: ", features.shape)

            features_channels = np.concatenate((features_channels, features), axis=1)
            # print("scale feature: ", features_channels.shape)

        labels.append(i)
        processed_data.append(features_channels)

  processed_data = np.array(processed_data)
  concat_processed_data = processed_data.reshape(processed_data.shape[0], processed_data.shape[1] * processed_data.shape[2])
  concat_processed_data = np.array(concat_processed_data)
  labels = np.array(labels)
  #print("labels: ", len(labels))
  #print("process: ", len(processed_data))
  return concat_processed_data, labels

# **Testing**
Show the results of one subject. (Subject-Specific paradigm)

In [22]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# Subject-Specific paradigm:
# train the classifier individually for each participant based on the data collected for that participant

## note: change this parameter for inspecting other participants
subject = 1

with open(f'subject_{subject}.pkl', 'rb') as f:
  data = pickle.load(f)

In [None]:
# method 1
processed_data, labels = Stats(subject, data)  # processed_data.shape = (17775, 21)
train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

svm = SVC(C=40.0, kernel='rbf', gamma='scale')
print("------\nMethod 1\n------")

cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
print("Cross-Validation scores:", cv_scores)
print("Average Cross-Validation score:", np.mean(cv_scores))

svm.fit(train_x,train_y)
train_score = svm.score(train_x, train_y)
test_score = svm.score(test_x, test_y)

print('\nTraining score:', train_score)
print('Testing score:', test_score)
md1 = [round(train_score, 2), round(test_score, 2)]

In [None]:
# method 2
# 初始化PCA，並指定要保留的主成分數量
pca = PCA(n_components=200)

processed_data, labels = STFT(subject, data)  # processed_data.shape = (18015, 252)

# 對數據進行PCA
processed_data = pca.fit_transform(processed_data)
train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

svm = SVC(C=5.0, kernel='rbf', gamma='scale')
print("------\nMethod 2\n------")

cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
print("Cross-Validation scores:", cv_scores)
print("Average Cross-Validation score:", np.mean(cv_scores))

svm.fit(train_x,train_y)
train_score = svm.score(train_x, train_y)
test_score = svm.score(test_x, test_y)

print('\nTraining score:', train_score)
print('Testing score:', test_score)
md2 = [round(train_score, 2), round(test_score, 2)]

In [None]:
# method 3
processed_data, labels = Var(subject, data)
train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

svm = SVC(C=40.0, kernel='rbf', gamma='scale')
print("------\nMethod 3\n------")

cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
print("Cross-Validation scores:", cv_scores)
print("Average Cross-Validation score:", np.mean(cv_scores))

svm.fit(train_x,train_y)
train_score = svm.score(train_x, train_y)
test_score = svm.score(test_x, test_y)

print('\nTraining score:', train_score)
print('Testing score:', test_score)
md3 = [round(train_score, 2), round(test_score, 2)]

In [None]:
# method 4
processed_data, labels = Welch(subject, data)
train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

svm = SVC(C=20.0, kernel='rbf', gamma='scale')
print("------\nMethod 4\n------")

cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
print("Cross-Validation scores:", cv_scores)
print("Average Cross-Validation score:", np.mean(cv_scores))

svm.fit(train_x,train_y)
train_score = svm.score(train_x, train_y)
test_score = svm.score(test_x, test_y)

print('\nTraining score:', train_score)
print('Testing score:', test_score)
md4 = [round(train_score, 2), round(test_score, 2)]

In [None]:
labels = ['Training', 'Testing']
x = np.arange(len(labels))
width = 0.2  # 柱狀圖的寬度

fig, ax = plt.subplots()

# 繪製四組資料的柱狀圖
rects1 = ax.bar(x - 3*width/2, md1, width, label='Stats', color='skyblue')
rects2 = ax.bar(x - width/2, md2, width, label='STFT', color='lightgreen')
rects3 = ax.bar(x + width/2, md3, width, label='VAR', color='lightpink')
rects4 = ax.bar(x + 3*width/2, md4, width, label='PSD', color='gold')

ax.set_xlabel('Accuracy')
ax.set_title(f'Comparison of Four Methods for subject {subject}')
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.legend()

# 顯示圖表
plt.show()

# **Results**
Show the testing accuracy for all subjects using
different feature extraction methods.

In [None]:
md1 = []
md2 = []
md3 = []
md4 = []

for s in range(5):
  subject = s+1

  with open(f'subject_{subject}.pkl', 'rb') as f:
    data = pickle.load(f)

  # method 1
  processed_data, labels = Stats(subject, data)  # processed_data.shape = (17775, 21)
  train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

  svm = SVC(C=40.0, kernel='rbf', gamma='scale')
  print("------\nMethod 1\n------")

  cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
  print("Cross-Validation scores:", cv_scores)
  print("Average Cross-Validation score:", np.mean(cv_scores))

  svm.fit(train_x,train_y)
  train_score = svm.score(train_x, train_y)
  test_score = svm.score(test_x, test_y)

  print('\nTraining score:', train_score)
  print('Testing score:', test_score)
  md1.append(round(test_score, 2))


  # method 2
  pca = PCA(n_components=200)

  processed_data, labels = STFT(subject, data)  # processed_data.shape = (18015, 252)
  processed_data = pca.fit_transform(processed_data)
  train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

  svm = SVC(C=5.0, kernel='rbf', gamma='scale')
  print("------\nMethod 2\n------")

  cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
  print("Cross-Validation scores:", cv_scores)
  print("Average Cross-Validation score:", np.mean(cv_scores))

  svm.fit(train_x,train_y)
  train_score = svm.score(train_x, train_y)
  test_score = svm.score(test_x, test_y)

  print('\nTraining score:', train_score)
  print('Testing score:', test_score)
  md2.append(round(test_score, 2))


  # method 3
  processed_data, labels = Var(subject, data)
  train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

  svm = SVC(C=40.0, kernel='rbf', gamma='scale')
  print("------\nMethod 3\n------")

  cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
  print("Cross-Validation scores:", cv_scores)
  print("Average Cross-Validation score:", np.mean(cv_scores))

  svm.fit(train_x,train_y)
  train_score = svm.score(train_x, train_y)
  test_score = svm.score(test_x, test_y)

  print('\nTraining score:', train_score)
  print('Testing score:', test_score)
  md3.append(round(test_score, 2))


  # method 4
  processed_data, labels = Welch(subject, data)
  train_x, test_x, train_y, test_y = train_test_split(processed_data, labels, test_size=0.2)

  svm = SVC(C=20.0, kernel='rbf', gamma='scale')
  print("------\nMethod 4\n------")

  cv_scores = cross_val_score(svm, train_x, train_y, cv=5)
  print("Cross-Validation scores:", cv_scores)
  print("Average Cross-Validation score:", np.mean(cv_scores))

  svm.fit(train_x,train_y)
  train_score = svm.score(train_x, train_y)
  test_score = svm.score(test_x, test_y)

  print('\nTraining score:', train_score)
  print('Testing score:', test_score)
  md4.append(round(test_score, 2))

In [None]:
labels = ['#1', '#2', '#3', '#4', '#5']
x = np.arange(len(labels))
width = 0.2  # 柱狀圖的寬度

fig, ax = plt.subplots()

# 繪製四組資料的柱狀圖
rects1 = ax.bar(x - 3*width/2, md1, width, label='Stats', color='skyblue')
rects2 = ax.bar(x - width/2, md2, width, label='STFT', color='lightgreen')
rects3 = ax.bar(x + width/2, md3, width, label='VAR', color='lightpink')
rects4 = ax.bar(x + 3*width/2, md4, width, label='PSD', color='gold')

ax.set_xlabel('Subject')
ax.set_ylabel('Accuracy')
ax.set_title('window size = 8s')
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.legend()

# 顯示圖表
plt.show()