In [1]:
import matplotlib.pyplot as plt
import numpy as np
import flet as ft
import os
from pylsl import StreamInlet, resolve_stream
from scipy.signal import welch, spectrogram
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, accuracy_score
import pandas as pd
from joblib import dump, load
import pyxdf
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
import seaborn as sns
from keras.models import load_model

Authorization classes are not loaded, using fake implementations.


In [2]:
def create_overlapping_sets(data, set_size=500, overlap_fraction=0.5):
    """Split ``data`` into fixed-length windows that overlap by a given fraction.

    Parameters
    ----------
    data : sequence
        Anything that supports ``len()`` and slicing (list, 1-D NumPy array, ...).
    set_size : int
        Number of samples in each window.
    overlap_fraction : float
        Fraction of a window shared with the next one. It must leave a hop of
        at least one sample between consecutive windows.

    Returns
    -------
    list
        The slices ``data[i:i + set_size]`` for ``i = 0, step, 2 * step, ...``.
        Only full windows are returned, so trailing samples that do not fill a
        window are dropped; the list is empty when ``data`` is shorter than
        ``set_size``.

    Raises
    ------
    ValueError
        If ``set_size`` and ``overlap_fraction`` give a hop smaller than one
        sample (for example ``overlap_fraction >= 1``).
    """
    # Round before truncating: 1 - 0.9 is 0.09999999999999998 in floating
    # point, so a bare int() would turn an intended hop of 1 into 0 (and 50
    # into 49). Values that are genuinely fractional are still truncated.
    step = int(round(set_size * (1 - overlap_fraction), 6))
    if step < 1:
        raise ValueError(
            "set_size and overlap_fraction must give a hop of at least one "
            f"sample, got set_size={set_size}, overlap_fraction={overlap_fraction}"
        )
    return [data[i:i + set_size] for i in range(0, len(data) - set_size + 1, step)]

In [3]:
import numpy as np
from scipy.signal import welch, find_peaks, butter, filtfilt
from sklearn.preprocessing import StandardScaler
from sklearn.cross_decomposition import CCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support

def calculate_psd(data, fs=250, nperseg=250*4):
    """Estimate the power spectral density of ``data`` with Welch's method.

    Parameters
    ----------
    data : array-like
        Signal passed straight to ``scipy.signal.welch``.
    fs : float
        Sampling frequency in Hz.
    nperseg : int
        Samples per Welch segment; the default is 1000 samples, i.e. four
        seconds at the default ``fs`` of 250 Hz.

    Returns
    -------
    tuple
        ``(frequencies, power)`` exactly as returned by ``welch``.
    """
    return welch(data, nperseg=nperseg, fs=fs)

def detect_peaks(Pxx):
    """Return the indices of the local maxima of ``Pxx``.

    Thin wrapper around ``scipy.signal.find_peaks`` called with its default
    settings; the properties dictionary it also returns is discarded.
    """
    peak_indices = find_peaks(Pxx)[0]
    return peak_indices

def band_pass_filter(data, lowcut, highcut, fs=250, order=5):
    """Apply a Butterworth band-pass filter to ``data``, forward and backward.

    Parameters
    ----------
    data : array-like
        Signal to filter.
    lowcut, highcut : float
        Band edges in Hz.
    fs : float
        Sampling frequency in Hz.
    order : int
        Order passed to ``scipy.signal.butter``.

    Returns
    -------
    numpy.ndarray
        The output of ``scipy.signal.filtfilt``.
    """
    nyquist = fs / 2
    # butter() is given the band edges as fractions of the Nyquist frequency.
    band = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, band, btype='band')
    # NOTE(review): SciPy's documentation recommends the second-order-sections
    # form (output='sos' with sosfiltfilt) for numerical stability. It is not
    # used here because it would change the filtered values — consider it if
    # the downstream features can be regenerated.
    return filtfilt(numerator, denominator, data)

def process_data(data_set, n_bins=121):
    """Band-pass filter each epoch and keep the first bins of its PSD.

    Every epoch is filtered with ``band_pass_filter(epoch, 0.5, 50)`` and its
    power spectral density is computed with ``calculate_psd`` (default
    arguments).

    Parameters
    ----------
    data_set : iterable of array-like
        One signal per epoch.
    n_bins : int, optional
        Number of leading PSD bins kept per epoch. With ``calculate_psd``'s
        defaults (fs=250, nperseg=1000, i.e. 0.25 Hz bin spacing when epochs
        are at least 1000 samples long) the default of 121 keeps 0-30 Hz.

    Returns
    -------
    list of numpy.ndarray
        One array of at most ``n_bins`` PSD values per epoch, in input order.
    """
    features = []
    for epoch in data_set:
        filtered = band_pass_filter(epoch, 0.5, 50)
        # Only the power values are used; the frequency axis is discarded.
        # (Peak detection was previously run here but its result was unused.)
        _, Pxx = calculate_psd(filtered)
        features.append(Pxx[:n_bins])
    return features

In [4]:
# Load the three test recordings. `header` is overwritten by each call, so it
# ends up holding the header of the last file.
streams_test1, header = pyxdf.load_xdf('../../../../data_ssvep/Toey/SSVEP_data/test/6hz_10')
streams_test2, header = pyxdf.load_xdf('../../../../data_ssvep/Toey/SSVEP_data/test/20hz_10')
streams_test3, header = pyxdf.load_xdf('../../../../data_ssvep/Toey/SSVEP_data/test/0hz_10')

# Take the time series of the first stream in each recording and transpose it.
# NOTE(review): assumes stream 0 is the EEG stream and that the transpose
# yields (channels, samples) — TODO confirm against the recordings.
raw_test1, raw_test2, raw_test3 = (
    streams[0]["time_series"].T
    for streams in (streams_test1, streams_test2, streams_test3)
)

In [5]:
def _rereference(channels):
    """Subtract row 1 of ``channels`` from rows 0, 2 and 3.

    Returns the three differences in that order (named oz, o1, o2 by the
    callers below — presumably electrode sites; verify the channel mapping).
    """
    reference = channels[1]
    return channels[0] - reference, channels[2] - reference, channels[3] - reference


def _window_channels(*signals):
    """Cut each signal into 1000-sample windows with 50% overlap."""
    return tuple(
        create_overlapping_sets(signal, set_size=1000, overlap_fraction=0.5)
        for signal in signals
    )


# Recording 1: keep the first four rows, re-reference, then window.
data_test1 = raw_test1[0:4, :]
data_test1_oz, data_test1_o1, data_test1_o2 = _rereference(data_test1)
data_test1_set_oz, data_test1_set_o1, data_test1_set_o2 = _window_channels(
    data_test1_oz, data_test1_o1, data_test1_o2
)

# Recording 2: same steps.
data_test2 = raw_test2[0:4, :]
data_test2_oz, data_test2_o1, data_test2_o2 = _rereference(data_test2)
data_test2_set_oz, data_test2_set_o1, data_test2_set_o2 = _window_channels(
    data_test2_oz, data_test2_o1, data_test2_o2
)

# Recording 3: same steps.
data_test3 = raw_test3[0:4, :]
data_test3_oz, data_test3_o1, data_test3_o2 = _rereference(data_test3)
data_test3_set_oz, data_test3_set_o1, data_test3_set_o2 = _window_channels(
    data_test3_oz, data_test3_o1, data_test3_o2
)

In [6]:
# Turn every windowed channel into PSD features (one entry per window).
data1_fft_oz, data1_fft_o1, data1_fft_o2 = map(
    process_data, (data_test1_set_oz, data_test1_set_o1, data_test1_set_o2)
)
data2_fft_oz, data2_fft_o1, data2_fft_o2 = map(
    process_data, (data_test2_set_oz, data_test2_set_o1, data_test2_set_o2)
)
data3_fft_oz, data3_fft_o1, data3_fft_o2 = map(
    process_data, (data_test3_set_oz, data_test3_set_o1, data_test3_set_o2)
)

# Per recording, place the three channels' features side by side ...
combined_data1 = np.hstack((data1_fft_oz, data1_fft_o1, data1_fft_o2))
combined_data2 = np.hstack((data2_fft_oz, data2_fft_o1, data2_fft_o2))
combined_data3 = np.hstack((data3_fft_oz, data3_fft_o1, data3_fft_o2))

# ... then stack the recordings on top of each other.
combined_test = np.vstack((combined_data1, combined_data2, combined_data3))

# One label per window: 0, 1 and 2 for recordings 1, 2 and 3. The window
# count is taken from the oz feature list of each recording.
labels_test = np.array([
    label
    for label, windows in enumerate((data1_fft_oz, data2_fft_oz, data3_fft_oz))
    for _ in windows
])

# Report the shapes so the row count of the features can be compared with
# the number of labels.
print(f"Combined data shape: {combined_test.shape}")
print(f"Labels shape: {labels_test.shape}")

Combined data shape: (83, 363)
Labels shape: (83,)


In [7]:
# Transform the combined PSD features with an object named `cca`.
# NOTE(review): `cca` is not defined in any cell shown above, and the recorded
# output of this cell is a NameError. Presumably it is a CCA transformer that
# must be fitted (or loaded from disk) before this cell runs — TODO confirm
# where it comes from and add that step so the notebook survives
# Restart & Run All.
processed_new_data_cca = cca.transform(combined_test)

NameError: name 'cca' is not defined