<a href="https://colab.research.google.com/github/emmazchen/SSVEP-Speller/blob/main/filteredCCA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple CCA Classifier on SSVEP Data (GG)

In [None]:
!git clone -b MatheuBranch https://github.com/Neurotech-X-Columbia/Data.git

Cloning into 'Data'...
remote: Enumerating objects: 4382, done.[K
remote: Counting objects: 100% (484/484), done.[K
remote: Compressing objects: 100% (371/371), done.[K
remote: Total 4382 (delta 121), reused 472 (delta 110), pack-reused 3898[K
Receiving objects: 100% (4382/4382), 307.15 MiB | 24.13 MiB/s, done.
Resolving deltas: 100% (136/136), done.
Checking out files: 100% (297/297), done.


In [None]:
import pandas as pd 
df = pd.read_csv('Data/SSVEP/Unity GUI 0.1/JE2/BottomLeft/64sec_1_18-34-41.csv')
df.shape

#avg ch 15 and 16, full avg and then moving avg if it doesn't work
#apply cca with two channels
#don't fold time dimension
#avg across trials, don't avg across subjects

(16, 8001)

In [None]:
from scipy import signal

In [None]:
num_chans = 16
fs = 125
notch_f = 60 #was 60
Q_factor = 30 #was 30
lp_f = 2
iir_hp_2 = signal.iirfilter(2, 2. / fs, btype='highpass')
a, b = signal.iirnotch(notch_f, Q_factor, fs)

In [None]:
def filtering(d):
    filtered_sig = signal.filtfilt(iir_hp_2[0], iir_hp_2[1], d, padlen=0)
    filtnnotch_sig = signal.filtfilt(a, b, filtered_sig, padlen=0)
    return filtnnotch_sig

In [None]:
import numpy as np
from sklearn.cross_decomposition import CCA
import pandas as pd

# Code for CCAAnalysis taken from https://github.com/Mentalab-hub/explorepy/blob/master/examples/ssvep_demo/analysis.py
class CCAAnalysis:
    """Canonical Correlation Analysis for SSVEP paradigm"""
    def __init__(self, freqs, win_len, s_rate, n_harmonics=1):
        """
        Args:
            freqs (list): List of target frequencies
            win_len (float): Window length
            s_rate (int): Sampling rate of EEG signal
            n_harmonics (int): Number of harmonics to be considered
        """
        self.freqs = freqs
        self.win_len = win_len
        self.s_rate = s_rate
        self.n_harmonics = n_harmonics
        self.train_data = self._init_train_data()
        self.cca = CCA(n_components=1)

    def _init_train_data(self):
        t_vec = np.linspace(0, self.win_len, int(self.s_rate * self.win_len))
        targets = {}
        for freq in self.freqs:
            sig_sin, sig_cos = [], []
            for harmonics in range(self.n_harmonics):
                sig_sin.append(np.sin(2 * np.pi * harmonics * freq * t_vec))
                sig_cos.append(np.cos(2 * np.pi * harmonics * freq * t_vec))
            targets[freq] = np.array(sig_sin + sig_cos).T
        return targets


    def apply_cca(self, eeg):
        """Apply CCA analysis to EEG data and return scores for each target frequency

        Args:
            eeg (np.array): EEG array [n_samples, n_chan]

        Returns:
            list of scores for target frequencies
        """
        scores = []
        for key in self.train_data:
            sig_c, t_c = self.cca.fit_transform(eeg, self.train_data[key])
            scores.append(np.corrcoef(sig_c.T, t_c.T)[0, 1])
        return scores

# Main method edited, with specific data
if __name__ == '__main__':
    freqs = [8, 10, 15, 18, 22] # twice the stimulation frequencies COM 1,2,3,4,5
    t_len = 2 # 2 second window
    s_rate = 125
    #t_vec = np.linspace(0, t_len, s_rate * t_len)

    class_names = ['TopLeft', 'TopRight', 'BottomLeft', 'BottomMiddle', 'BottomRight']

    # Reading one trial from each stimulus for GG subject
    fnames = ['Data/SSVEP/Unity GUI 0.1/JE2/' + i + '/64sec_3.csv' for i in class_names]
    dfs = [pd.read_csv(name) for name in fnames]

    for i,df in enumerate(dfs):
        # test_sig = np.sin(2 * np.pi * 10 * t_vec) + 0.05 * np.random.rand(len(t_vec))
        # Looking at channel 15, with a 2 second window 251:501 which is 250 datapoints
        test_sig = filtering(np.array(list(df.loc[14,:])[251:1251])) #3 segments
        cca_analysis = CCAAnalysis(freqs=freqs, win_len=t_len, s_rate=s_rate, n_harmonics=2)
        r1 = cca_analysis.apply_cca(np.array(test_sig[0:250])[:, np.newaxis]) 
        r2 = cca_analysis.apply_cca(np.array(test_sig[250:500])[:, np.newaxis]) 
        r3 = cca_analysis.apply_cca(np.array(test_sig[500:750])[:, np.newaxis])
        r4 = cca_analysis.apply_cca(np.array(test_sig[750:1000])[:, np.newaxis])

        # Printing the max correlation coefficient across all stimulus frequencies, the Command Number (1-5), the Predicted Stimulus Frequency (highest correlated) from CCA, The Actual Stimulus that was tested 
        print(np.max(r1), '  Pred:', np.argmax(r1)+1, np.argmax(r2)+1, np.argmax(r3)+1, np.argmax(r4)+1, class_names[np.argmax(r1)], freqs[np.argmax(r1)], '           Actual:', i+1, class_names[i], freqs[i],   "           correct?", i==np.argmax(r1))


0.023800428131054215   Pred: 2 1 5 3 TopRight 20            Actual: 1 TopLeft 16            correct? False
0.047229967196901174   Pred: 3 2 1 1 BottomLeft 30            Actual: 2 TopRight 20            correct? False
0.08363553808654446   Pred: 2 1 1 2 TopRight 20            Actual: 3 BottomLeft 30            correct? False
0.05173002117630312   Pred: 2 1 1 1 TopRight 20            Actual: 4 BottomMiddle 36            correct? False
0.03375419190257397   Pred: 5 2 1 3 BottomRight 44            Actual: 5 BottomRight 44            correct? True


In [None]:
    def _avg_train_data(self):
        t_vec = np.linspace(0, self.win_len, int(self.s_rate * self.win_len))
        targets = {}
        for freq in self.freqs:
            sig_sin, sig_cos = [], []
            for harmonics in range(self.n_harmonics):
                sig_sin.append(np.sin(2 * np.pi * harmonics * freq * t_vec))
                sig_cos.append(np.cos(2 * np.pi * harmonics * freq * t_vec))
            targets[freq] = np.array(sig_sin + sig_cos).T
        return targets