# SSVEP Dataset
## Classification Using Canonical Correaltion Analysis (CCA)
### The following is implemented on a 12-Class publicly available SSVEP EEG Dataset

<img src="../images/12_classSSVEP.png">

#### Dataset URL: 
https://github.com/mnakanishi/12JFPM_SSVEP/tree/master/data

#### Dataset Paper:
Masaki Nakanishi, Yijun Wang, Yu-Te Wang and Tzyy-Ping Jung, 
"A Comparison Study of Canonical Correlation Analysis Based Methods for Detecting Steady-State Visual Evoked Potentials," 
PLoS One, vol.10, no.10, e140703, 2015. 

In [14]:
import sys
import os
sys.path.insert(0, os.path.abspath('..'))

In [15]:
%%capture
import math

import numpy as np
import scipy.io as sio
from sklearn.cross_decomposition import CCA
from sklearn.metrics import confusion_matrix,accuracy_score
from scipy.signal import butter, filtfilt
from  utils import *

#### Canonical Correlation Analysis (CCA)
$$\DeclareMathOperator*{\argmax}{argmax}$$

Consider two multidimensional variables $X$, $Y$ where $X$ refers to the set of multi-channel EEG data and $Y$ refers to the set of reference signals of the same length as $X$. The linear combinations of $X$ and $Y$ are given as $x = X'W_{x}$ and $y = Y'W_{y}$. CCA finds the weights, $W_{x}$ and $W_{y}$ that maximize the correlation between $x$ and $y$ by solving (1). The maximum of $\rho$ with respect to $W_{x}$ and $W_{y}$ is the maximum correlation.

$$\max_{W_{x},W_{y}} \rho(x,y) = \frac{\mathbb{E}{[W_{x}'XY'W_{y}]}}{\sqrt{\mathbb{E}{[W_{x}'XX'W_{x}]}\mathbb{E}{[W_{y}'YY'W_{y}]}}}$$

The reference signals $Y_{n}$  are defined as:

$$Y_{n} = \begin{bmatrix} \sin({2 \pi f_{n}t}) \\ \cos({2 \pi f_{n}t}) \\ \vdots \\ \sin({4 \pi  f_{n}t}) \\ \cos({4 \pi  f_{n}t}) \end{bmatrix},t = \begin{bmatrix} 
    \frac{1}{f_{s}}
    \frac{2}{f_{s}}
    \dots
    \frac{N_{s}}{f_{s}}
    \end{bmatrix}$$
    
where $Y_{n} \in \mathbb{R}^{2 N_{h} \times N_{s}} $, $f_{n}$ is the stimulation frequency, $f_{s}$ is the sampling frequency, $N_{s}$ is number of samples, and $N_{h}$ is the number of harmonics. Here, $N_{h}=2$. The canonical correlation features $\rho_{f_{i}}$, where $i = 1,2,...,7$ are extracted for each segment of the EEG data, and the output class $C$ for a given sample can be determined as: $C = \argmax (\rho_{f_{i}})$



## CCA 

In [16]:
import numpy as np
import scipy.io as sio
from sklearn.cross_decomposition import CCA
from sklearn.metrics import accuracy_score

data_path = r"C:\Users\lenovo\Desktop\bci\data"
all_segment_data = dict()
all_acc = list()
window_len = 4
shift_len = 4
sample_rate = 256

# Bandpass filter function
def get_filtered_eeg(eeg, lowcut, highcut, order, sample_rate):
    nyquist_freq = 0.5 * sample_rate
    low = lowcut / nyquist_freq
    high = highcut / nyquist_freq
    b, a = butter(order, [low, high], btype='band')
    
    filtered_eeg = np.zeros_like(eeg)
    for target in range(eeg.shape[0]):
        for ch in range(eeg.shape[1]):
            for trial in range(eeg.shape[3]):
                filtered_eeg[target, ch, :, trial] = filtfilt(b, a, eeg[target, ch, :, trial])
    
    return filtered_eeg

# Segment epochs function
def get_segmented_epochs(eeg_data, window_len, shift_len, sample_rate):
    num_targets, num_channels, num_samples, num_trials = eeg_data.shape
    window_samples = int(window_len * sample_rate)
    shift_samples = int(shift_len * sample_rate)
    
    num_segments = (num_samples - window_samples) // shift_samples + 1
    segmented_data = np.zeros((num_targets, num_channels, num_trials, num_segments, window_samples))
    
    for target in range(num_targets):
        for trial in range(num_trials):
            for seg_idx in range(num_segments):
                start_idx = seg_idx * shift_samples
                end_idx = start_idx + window_samples
                segmented_data[target, :, trial, seg_idx, :] = eeg_data[target, :, start_idx:end_idx, trial]
    
    return segmented_data

# Generate CCA reference signals
def get_cca_reference_signals(data_len, target_freq, sampling_rate):
    reference_signals = []
    t = np.arange(0, (data_len/(sampling_rate)), step=1.0/(sampling_rate))
    reference_signals.append(np.sin(np.pi*2*target_freq*t))
    reference_signals.append(np.cos(np.pi*2*target_freq*t))
    reference_signals.append(np.sin(np.pi*4*target_freq*t))
    reference_signals.append(np.cos(np.pi*4*target_freq*t))
    reference_signals = np.array(reference_signals)
    
    return reference_signals

# Find correlation function
def find_correlation(n_components, np_buffer, freq):
    cca = CCA(n_components)
    corr = np.zeros(n_components)
    result = np.zeros(freq.shape[0])

    for freq_idx in range(0,freq.shape[0]):
        cca.fit(np_buffer.T,np.squeeze(freq[freq_idx, :, :]).T)
        O1_a,O1_b = cca.transform(np_buffer.T, np.squeeze(freq[freq_idx, :, :]).T)
        corr = np.corrcoef(O1_a[: ,0], O1_b[:, 0])[0 ,1]        
        result[freq_idx] = corr
    
    return result

# CCA classification function
def cca_classify(segmented_data, reference_templates):
    predicted_class = []
    labels = []
    for target in range(0, segmented_data.shape[0]):
        for trial in range(0, segmented_data.shape[2]):
            for segment in range(0, segmented_data.shape[3]):
                labels.append(target)

                result = find_correlation(1, segmented_data[target, :, trial, segment, :], 
                                      reference_templates)
                predicted_class.append(np.argmax(result)+1) 
    labels = np.array(labels)+1
    predicted_class = np.array(predicted_class)

    return labels, predicted_class


# Main loop for processing data
for subject in np.arange(0, 10):
    dataset = sio.loadmat(f'{data_path}/s{subject+1}.mat')
    eeg = np.array(dataset['eeg'], dtype='float32')
    print(eeg.shape)
    num_classes = eeg.shape[0]
    n_ch = eeg.shape[1]
    total_trial_len = eeg.shape[2]
    num_trials = eeg.shape[3]
    
    filtered_data = get_filtered_eeg(eeg, 6, 80, 4, sample_rate)
    # print("filtered_data.shape")
    
    all_segment_data[f's{subject+1}'] = get_segmented_epochs(filtered_data, window_len, shift_len, sample_rate)

# Generate reference signals
duration = int(window_len * sample_rate)
flicker_freq = np.array([9.25, 11.25, 13.25, 9.75, 11.75, 13.75, 
                         10.25, 12.25, 14.25, 10.75, 12.75, 14.75])

reference_templates = []
for fr in range(0, len(flicker_freq)):
    reference_templates.append(get_cca_reference_signals(duration, flicker_freq[fr], sample_rate))
reference_templates = np.array(reference_templates, dtype='float32')

# Classification and accuracy calculation
all_acc = []
for subject in all_segment_data.keys():
    labels, predicted_class = cca_classify(all_segment_data[subject], reference_templates)

    acc = accuracy_score(labels, predicted_class)
    all_acc.append(acc)
    print(f'Subject: {subject}, Accuracy: {acc*100} %')

average_accuracy = np.mean(all_acc)
print(f'Average Accuracy: {average_accuracy * 100:.2f} %')  # Print average accuracy with 2 decimal places

(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
(12, 8, 1114, 15)
filtered_data.shape
Subject: s1, Accuracy: 60.55555555555555 %
Subject: s2, Accuracy: 64.44444444444444 %
Subject: s3, Accuracy: 97.77777777777777 %
Subject: s4, Accuracy: 99.44444444444444 %
Subject: s5, Accuracy: 98.33333333333333 %
Subject: s6, Accuracy: 100.0 %
Subject: s7, Accuracy: 99.44444444444444 %
Subject: s8, Accuracy: 100.0 %
Subject: s9, Accuracy: 98.33333333333333 %
Subject: s10, Accuracy: 95.0 %
Average Accuracy: 91.33 %


In [19]:
import numpy as np
import pandas as pd
import scipy.io as sio
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Function to convert EEG data to a DataFrame
def convert_to_dataframe(eeg_data, num_classes):
    rows = []
    for stimulus in range(num_classes):
        for trial in range(eeg_data.shape[3]):  # Loop through trials
            for channel in range(eeg_data.shape[1]):  # Loop through channels
                row = [stimulus, trial, channel] + eeg_data[stimulus, channel, :, trial].tolist()
                rows.append(row)
    df = pd.DataFrame(rows, columns=['Stimulus', 'Trial', 'Channel'] + [f'Sample_{i}' for i in range(eeg_data.shape[2])])
    return df

# Path to your data
data_path = r"C:\Users\lenovo\Desktop\bci\data"

# Initialize lists to store features and labels for all subjects
all_X, all_y = [], []

# Loop through subjects and process the data
for subject in np.arange(0, 10):
    # Load the dataset for the current subject
    dataset = sio.loadmat(f'{data_path}/s{subject+1}.mat')
    eeg = np.array(dataset['eeg'], dtype='float32')  # EEG data (shape: num_classes, n_ch, total_trial_len, num_trials)
    print(f"Subject {subject+1} EEG shape: {eeg.shape}")
    
    num_classes = eeg.shape[0]  # Number of classes (flicker frequencies)
    
    # Apply any filtering to the data if necessary
    filtered_data = get_filtered_eeg(eeg, 6, 80, 4, sample_rate)
    
    # Convert filtered EEG data to DataFrame
    df = convert_to_dataframe(filtered_data, num_classes)
    
    # Split the data into features (X) and target (y)
    X = df.drop(columns=['Stimulus', 'Trial', 'Channel'])
    y = df['Stimulus']  # The target variable (stimulus)
    
    # Append current subject's data to the overall data
    all_X.append(X)
    all_y.append(y)

# Concatenate all subjects' data into a single DataFrame
all_X = pd.concat(all_X, ignore_index=True)
all_y = pd.concat(all_y, ignore_index=True)

# Split the combined data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(all_X, all_y, test_size=0.2, random_state=42)

# Random Forest Classifier
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
y_pred_rf = rf_model.predict(X_test)
acc_rf = accuracy_score(y_test, y_pred_rf)

print(f'Random Forest Accuracy: {acc_rf * 100:.2f}%')
print(f"RF Classification Report:\n{classification_report(y_test, y_pred_rf)}")

# SVM Classifier
svm_model = SVC(kernel='linear', random_state=42)  # You can change the kernel if needed
svm_model.fit(X_train, y_train)
y_pred_svm = svm_model.predict(X_test)
acc_svm = accuracy_score(y_test, y_pred_svm)

print(f'SVM Accuracy: {acc_svm * 100:.2f}%')
print(f"SVM Classification Report:\n{classification_report(y_test, y_pred_svm)}")


Subject 1 EEG shape: (12, 8, 1114, 15)
Subject 2 EEG shape: (12, 8, 1114, 15)
Subject 3 EEG shape: (12, 8, 1114, 15)
Subject 4 EEG shape: (12, 8, 1114, 15)
Subject 5 EEG shape: (12, 8, 1114, 15)
Subject 6 EEG shape: (12, 8, 1114, 15)
Subject 7 EEG shape: (12, 8, 1114, 15)
Subject 8 EEG shape: (12, 8, 1114, 15)
Subject 9 EEG shape: (12, 8, 1114, 15)
Subject 10 EEG shape: (12, 8, 1114, 15)
Random Forest Accuracy: 98.65%
RF Classification Report:
              precision    recall  f1-score   support

           0       0.98      0.98      0.98       282
           1       0.99      0.99      0.99       215
           2       0.98      0.99      0.99       240
           3       1.00      0.97      0.98       233
           4       0.99      0.99      0.99       244
           5       0.98      1.00      0.99       230
           6       0.99      0.98      0.98       225
           7       0.98      1.00      0.99       216
           8       0.99      0.98      0.98       253
           