# Imports

In [47]:
import numpy as np
from scipy.io import loadmat
from scipy.fft import fft, fftfreq
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import math
from statistics import mean

# TESTING .mat file

Load test

In [4]:
data = loadmat('Data/train_data_1.mat')

 Show the data test

In [5]:
user_vars = {k: v for k, v in data.items() if not k.startswith('__')}

for key, value in user_vars.items():
    print(f"\n--- {key} ---")
    print(f"Type: {type(value)}")
    print("Shape:", getattr(value, "shape", "N/A"))

    # Print scalar values or array samples
    if isinstance(value, np.ndarray):
        if value.ndim == 0:
            print("Scalar value:", value.item())
        elif value.ndim == 1:
            print("First 5 elements:", value[::])
        elif value.ndim == 2:
            print("First 5 rows:\n", value[::])
        elif value.ndim == 3:
            print("First 5 slices along axis 0:\n", value[:5])


--- channels ---
Type: <class 'numpy.ndarray'>
Shape: (7,)
First 5 elements: ['F3' 'F4' 'Fz' 'C3' 'C4' 'Cz' 'Pz']

--- fs ---
Type: <class 'numpy.ndarray'>
Shape: (1, 1)
First 5 rows:
 [[128]]

--- data ---
Type: <class 'numpy.ndarray'>
Shape: (288, 1920, 7)
First 5 slices along axis 0:
 [[[ 158.65887621  -40.93540601  -60.65252916 ...  259.24820604
    -63.47535199  -32.21123372]
  [ 155.3602066   -47.43275047  -62.33666314 ...  255.04493307
    -66.78496375  -29.23758656]
  [ 153.6532714   -50.26994114  -66.64789913 ...  251.72664108
    -68.04001645  -29.6722016 ]
  ...
  [ -11.53179161   -8.78207989   -6.81586856 ...   -1.05662389
    -12.1903469     0.41410004]
  [ -14.68037102   -8.68984428   -4.82200136 ...   -0.85407445
    -15.27431229   -1.70154376]
  [ -17.30795586  -13.54489686   -8.04649496 ...   -7.18342427
    -20.66120881   -3.96859021]]

 [[ -14.58824193  -19.21987525  -10.41882248 ...  -10.82545612
    -25.08487698   -4.73698357]
  [ -15.90785163  -15.70998555   -8.5

# Load Subjects

In [6]:
Train = []
Test = []
for i in range(1,6):
    Train.append(loadmat(f'Data/train_data_{i}.mat'))
    Test.append(loadmat(f'Data/test_data_{i}.mat'))

In [7]:
len(Test)

5

# 1. Common Average Reference filter

Apply Common Average Reference (CAR) filter to the (EEG) data <br>
Input shape: (n_trials, n_samples, n_channels) <br> output shape: (n_trials, n_samples, n_channels)

In [8]:
def apply_car_filter(data):
    # Compute mean across channels for each trial and time sample
    car_data = data - data.mean(axis=2, keepdims=True)
    return car_data

## CAR Filter To Subjects

In [9]:
Train_filtered = []
Test_filtered = []
for i in range(0,5):
    Temp_train = Train[i]
    Temp_test = Test[i]
    Temp_test['data'] = apply_car_filter(Test[i]['data'])
    Temp_train['data'] = apply_car_filter(Train[i]['data'])
    Train_filtered.append(Temp_train)
    Test_filtered.append(Temp_test)

In [10]:
Train_filtered[0]['labels']

array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1]])

# 2. For each electrode

## 2.1. For each trial of each class of attention

In [11]:
BANDS = {
    'delta': (0.5, 4),
    'theta': (4, 8),
    'alpha': (8, 13),
    'beta':  (13, 30),
    'gamma': (30, 45)
}

### 2.1.1 Compute the Fourier Transform of the trial signal

In [12]:
def compute_fft(signal, fs):
    fft_vals = fft(signal)
    freqs = fftfreq(len(signal), d=1/fs)
    power = np.abs(fft_vals) ** 2
    return freqs, power

### 2.1.2 Compute the power in each of the delta, theta, alpha, beta, and gamma bands as the mean of the power of the frequencies in each band

In [13]:
def compute_band_powers(freqs, power):
    band_powers = []
    for band_range in BANDS.values():
        low, high = band_range
        mask = (freqs >= low) & (freqs <= high) & (freqs >= 0)
        band_power = power[mask].mean()
        band_powers.append(band_power)
    return band_powers

## 2.2 Form a feature vector as required in each deliverable in the project description

In [40]:
def extract_features_per_class(data_class, fs):
    n_trials, _, n_channels = data_class.shape
    all_features = []

    for ch in range(n_channels):  # For each electrode
        ch_features = []
        for trial in range(n_trials):  # For each trial
            signal = data_class[trial, :, ch]
            freqs, power = compute_fft(signal, fs)                
            band_powers = compute_band_powers(freqs, power)        
            ch_features.append(band_powers)  
        all_ch_features = np.array(ch_features)  
        all_features.append(ch_features)

    return np.hstack(all_features)

In [44]:
extract_features_per_class(Train_filtered[0]['data'], Train_filtered[0]['fs'][0][0])[0][4]

27985.168239487837

## 2.3 Apply K-Nearest Neighbor (KNN) classifier examining K from 1 to 10 using the training and test datasets

In [91]:
def KNN(data_train,labels_train,data_test,k):
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(data_train, labels_train)
    return knn.predict(data_test)

In [110]:
def classify_samp(data_train,data_test,k,labels_train,labels_test):
    n_trial, n_channel_band = data_train.shape
    ch = 0
    band = 0
    accuracy = 0
    for i in range(0,n_channel_band):
        curr_chan_band_train = data_train[:,i].reshape(-1, 1)
        curr_chan_band_test = data_test[:,i].reshape(-1, 1)
        labels_pred = KNN(curr_chan_band_train,labels_train,curr_chan_band_test,k)
        if accuracy_score(labels_test,labels_pred) > accuracy:
            accuracy = accuracy_score(labels_test,labels_pred)
            ch = math.ceil((i + 1) / 5)
            band = list(BANDS.keys())[(i % 5)]
    return accuracy , ch, band

In [111]:
# 288 x 35 or 72 x 35
# combine all bands for each channel - mean method
def combfreq(data):
    res = []
    for i in range (len(data)):
        arr = []
        for n in range(0,35,5):
            arr.append(mean(data[i][n:n+5]))
        res.append(arr)
    return np.array(res)

In [112]:
def Calculate_acc(channel, band, chb):
    accuracys = []
    channels = []
    bands = []
    Ks = []
    K_error = []
    for sub in range(0,5):
        # 288 x 35 vector
        dataTrain = extract_features_per_class(Train_filtered[sub]['data'], Train_filtered[sub]['fs'][0][0])
        dataTest = extract_features_per_class(Test_filtered[sub]['data'], Test_filtered[sub]['fs'][0][0])
        labelsTrain = Train_filtered[sub]['labels'][0]
        labelsTest = Test_filtered[sub]['labels'][0]
        if channel: 
            dataTrain = combfreq(dataTrain)
            # dataTest = combfreq(dataTest)
        # if band:
        #     dataTrain = combbands(dataTrain)
        #     dataTest = combbands(dataTest)
        # if chb:
        #     dataTrain = combchbanc(dataTrain)
        #     dataTest = combchbanc(dataTest)
        for k in range(1,11):
            if chb:
                 KNN(dataTrain,labelsTrain,dataTest,k)
            else:
                acc , ch, band = classify_samp(dataTrain,dataTest,k,labelsTrain,labelsTest)
                if k == 1:
                    accuracys.append(acc)
                    channels.append(ch)
                    bands.append(band)
                    Ks.append(k)
                    K_error.append(1 - acc)
                elif accuracys[sub] < acc:
                    accuracys[sub] = acc
                    channels[sub] = ch
                    bands[sub] = band
                    Ks[sub] = k
                    K_error[sub] = 1 - acc

    return accuracys,channels,bands,Ks,K_error

In [113]:
dataTrain = extract_features_per_class(Train_filtered[0]['data'], Train_filtered[0]['fs'][0][0])
combfreq(dataTrain).shape


(288, 7)

In [114]:
accuracys, channels, bands, Ks, K_error = Calculate_acc(channel=True, band=False, chb=False)
print("Accuracys: ", accuracys)
print("Channels: ", channels)
print("Bands: ", bands)
print("Ks: ", Ks)
print("K_error: ", K_error)

Accuracys:  [0.6388888888888888, 0.6111111111111112, 0.7083333333333334, 0.7230769230769231, 0.8194444444444444]
Channels:  [1, 2, 1, 1, 2]
Bands:  ['alpha', 'delta', 'delta', 'alpha', 'theta']
Ks:  [9, 2, 9, 3, 2]
K_error:  [0.36111111111111116, 0.38888888888888884, 0.29166666666666663, 0.27692307692307694, 0.18055555555555558]


In [None]:
# Calculate_acc(True,False,False)