In [1]:
%matplotlib inline
import numpy as np
import mne
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, f1_score
from sklearn.feature_selection import RFECV
from sklearn.feature_selection import SelectKBest, chi2, f_regression, f_classif

## Análise  da base de dados `Beta` utilizando algoritmos de ML

Neste notebook será analisado o `Beta dataset` utilizando algoritmos de ML para realizar a (1) extração de características. (2) seleção de características e (3) classificação dos dados.

### Pontos Importantes do Dataset
 
- Frequências estimuladas (total de 40, com a diferença de 0.2 Hz uma da outra): 8.0, 8.2, ..., 15.6, 15.8;
- Taxa de amostragem: 250 Hz


### Análisar os "momentos" em que ocorrem evocação do sinal SSVEP

1. Criar o objeto `MNE` a partir dos dados do participante.
2. Aplicar no objeto `MNE` o filtro passa-faixa nos valores de 6 - 18 Hz.
3. Criar cópias do objeto `MNE` com fatias de tempo menores para analisar momentos que ocorrem estimulos ou não (verificar artigo).
   - **a)** 0.0 - 0.5 segundos e  2.5 - 3.0 segundos ocorre apenas ruído;
   - **b**) 0.5 - 2.5 segundos ocorre sinal SSVEP (com ruídos)
4. Com os sinais separados em objetos MNE, aplicar a `FFT`, para que seja possível plotar gráficos que contenham (ou não) as informações. 
   - Os dados devem ser plotados no dominio da frequência (após a transformada de Fourier). O FFT pode ser realizada pela biblioteca `scipy.fft`.
   - Deve ser observado que as janelas (a) com ruídos não aparecerão de fato o sinal SSVEP.

In [2]:
data = np.load("../../datasets/beta/data.npy")

print(data.shape)

(160, 64, 750)


In [4]:
n_channels = 64
sfreq = 250
ch_names = list(np.load("../../datasets/beta/channels.npy"))
ch_types = ['eeg'] * len(ch_names)
info = mne.create_info(ch_names, sfreq=sfreq, ch_types=ch_types)
best = ['P6', 'PO3']

In [5]:
labels = np.load("../../datasets/beta/labels.npy")
unique_labels = sorted(set(labels))
event_dict = {str(value): index  for index, value in enumerate(unique_labels)}
labels.shape

(160,)

In [6]:
# método para transformar labels categóricos
le = LabelEncoder()
events = np.column_stack((
    np.array(range(len(labels))),
    np.zeros(160, dtype=int),
    le.fit_transform(labels))
)

# print(events.shape)

mne_data = mne.EpochsArray(data, info, events, event_id=event_dict)
filtered_mne_data = mne_data.filter(6, 18)

filtered_mne_data

Not setting metadata
160 matching events found
No baseline correction applied
0 projection items activated
Setting up band-pass filter from 6 - 18 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 6.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 5.00 Hz)
- Upper passband edge: 18.00 Hz
- Upper transition bandwidth: 4.50 Hz (-6 dB cutoff frequency: 20.25 Hz)
- Filter length: 413 samples (1.652 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done  71 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 161 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 287 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 449 tasks      | elapsed:    0.0s
[Parallel(n_jobs=1)]: Done 647 tasks      | elapsed:    0.1s
[Parallel(n_jobs=1)]: Done 881 tasks      | elapsed:    0.1s
[Parallel(n_jobs=1)]: Done 1151 tasks      | elapsed:    0.1s
[Parallel(n_jobs=1)]: Done 1457 tasks      | elapsed:    0.1s
[Parallel(n_jobs=1)]: Done 1799 tasks      | elapsed:    0.2s
[Parallel(n_jobs=1)]: Done 2177 tasks      | elapsed:    0.2s
[Parallel(n_jobs=1)]: Done 2591 tasks      | elapsed:    0.2s
[Parallel(n_jobs=1)]: Done 3041 tasks      | elapsed:    0.3s
[Parallel(n_jobs=1)]: Done 3527 tasks      | elapsed:    0.3s
[Parallel(n_jobs=1)]: Done 4049 tasks      | elapsed:    0.4s
[Parallel(n_jobs=1)]: Done 4607 tasks      | elapsed:    0.4s
[Parallel(n_job

0,1
Number of events,160
Events,10.0: 4 10.2: 4 10.4: 4 10.6: 4 10.8: 4 11.0: 4 11.2: 4 11.4: 4 11.6: 4 11.8: 4 12.0: 4 12.2: 4 12.4: 4 12.600000000000001: 4 12.8: 4 13.0: 4 13.200000000000001: 4 13.4: 4 13.600000000000001: 4 13.8: 4 14.0: 4 14.200000000000001: 4 14.4: 4 14.600000000000001: 4 14.8: 4 15.0: 4 15.200000000000001: 4 15.4: 4 15.600000000000001: 4 15.8: 4 8.0: 4 8.2: 4 8.4: 4 8.6: 4 8.799999999999999: 4 9.0: 4 9.2: 4 9.4: 4 9.6: 4 9.8: 4
Time range,0.000 – 2.996 s
Baseline,off


In [7]:
start_noise = filtered_mne_data.copy().crop(tmin=0.0, tmax=0.5)
middle = filtered_mne_data.copy().crop(tmin=0.5, tmax=2.5)
end_noise = filtered_mne_data.copy().crop(tmin=2.5, tmax=3.0)

noise_power = []

fft_start_result = np.fft.fft(start_noise)
fft_end_result = np.fft.fft(end_noise)

# densidade espectral de potência (PSD)
psd_start = np.abs(fft_start_result) ** 2
psd_end = np.abs(fft_end_result) ** 2

# média da potência nos intervalos de tempo sem estímulo
base_power = np.mean(psd_start, axis=-1)
rest_power = np.mean(psd_end, axis=-1)

# média das duas médias de potência obtidas anteriorment
mean_noise_power = np.mean([base_power + rest_power])

mean_noise_power

  end_noise = filtered_mne_data.copy().crop(tmin=2.5, tmax=3.0)


5896.0918793349665

In [8]:
sr = 250

# frequências alvo
target_frequencies = np.arange(8, 16, 0.2)

# lista para armazenar as amplitudes nas frequências alvo
target_amplitudes = []

for channel_data in middle.get_data():
    target = []
    for electrode in channel_data:

        fft_result = np.fft.fft(electrode)
        psd = np.abs(fft_result) ** 2
        frequencies = np.fft.fftfreq(len(fft_result), 1 / sr)
        target_amplitudes_trial = []
        for target_frequency in target_frequencies:
            # encontrando o índice da frequência alvo no espectro de frequência
            index = np.argmin(np.abs(frequencies - target_frequency))
            # amplitude na frequência alvo
            amplitude = np.sqrt(psd[index])
            target_amplitudes_trial.append(amplitude)

        target.append(target_amplitudes_trial)


    target_amplitudes.append(target)

target_amplitudes = np.array(target_amplitudes)

In [9]:
# forçando (estragando) valor de "estimated_background_noise" para não sobrar valores negativos
target_amplitudes_adjusted = np.abs(target_amplitudes - mean_noise_power)

# subtraindo o ruído de fundo das amplitudes
narrow_band_SNR = 10 * np.log10(target_amplitudes_adjusted / mean_noise_power)

total_power = np.sum(target_amplitudes_adjusted)
wide_band_SNR = 10 * np.log10(target_amplitudes_adjusted / total_power)

print(target_amplitudes_adjusted.shape)

data = np.array([target_amplitudes_adjusted, narrow_band_SNR, wide_band_SNR])
data = data.swapaxes(0, 1)
data = data.swapaxes(1, 2)
data = data.reshape(data.shape[0], data.shape[1], data.shape[2] * data.shape[3])
print(data.shape)

(160, 64, 40)
(160, 64, 120)


### Primeira alternativa (remoção manual de características)

In [10]:
# Removendo eletrodos não listados

ch_names = np.load("../../datasets/beta/channels.npy")

channels_to_keep = ['PZ', 'PO3', 'PO5', 'PO4', 'PO6', 'POZ', 'O1', 'OZ', 'O2']
indexes_to_remove = np.where(np.isin(ch_names, channels_to_keep, invert=True))

print("Before", data.shape)
data_filtered = np.delete(data, indexes_to_remove, axis=1)
print("After", data_filtered.shape)
print("Labels", labels.shape)

Before (160, 64, 120)
After (160, 9, 120)
Labels (160,)


In [11]:
data_filtered_reshape = data_filtered.reshape(data_filtered.shape[0], data_filtered.shape[1] * data_filtered.shape[2])
print(data_filtered_reshape.shape)

(160, 1080)


In [12]:
X = StandardScaler().fit_transform(data_filtered_reshape)
y = LabelEncoder().fit_transform(labels)

x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

clf = SVC(kernel='linear', C=1, random_state=42, probability=True)
clf.fit(x_train, y_train)
y_pred = clf.predict(x_test)

print("accuracy", accuracy_score(y_test, y_pred))
print("f1_score", f1_score(y_test, y_pred, average="weighted"))

accuracy 0.25
f1_score 0.2284722222222222


### Segunda alternativa (utilizando remoção automático de características RFE)

In [13]:
# reorganizando os dados

X_reshape_auto = data.reshape(data.shape[0], data.shape[1] * data.shape[2])
print(X_reshape_auto.shape)


(160, 7680)


In [14]:
# Adicionando mais características

middle = filtered_mne_data.copy().crop(tmin=0.5, tmax=2.5)
print(middle.get_data().shape)

fft_middle_result = np.fft.fft(middle)

# densidade espectral de potência (PSD)
psd_middle = np.abs(fft_middle_result) ** 2

# média da potência nos intervalos de tempo sem estímulo
middle_power = np.mean(psd_middle, axis=-1)

sr = 250

# frequências alvo
target_frequencies = np.arange(8, 16, 0.2)

# lista para armazenar as amplitudes nas frequências alvo
target_amplitudes = []

for channel_data in middle.get_data():
    target = []
    for electrode in channel_data:

        fft_result = np.fft.fft(electrode)
        psd = np.abs(fft_result) ** 2
        frequencies = np.fft.fftfreq(len(fft_result), 1 / sr)
        target_amplitudes_trial = []

        for target_frequency in target_frequencies:
            # encontrando o índice da frequência alvo no espectro de frequência
            index = np.argmin(np.abs(frequencies - target_frequency))
            # amplitude na frequência alvo
            amplitude = np.sqrt(psd[index])
            target_amplitudes_trial.append(amplitude)

        target.append(target_amplitudes_trial)


    target_amplitudes.append(target)

target_amplitudes = np.array(target_amplitudes)
target_amplitudes.shape


(160, 64, 501)


(160, 64, 40)

In [23]:

X_reshape_auto = StandardScaler().fit_transform(X_reshape_auto)

rfe = RFECV(SVC(kernel="linear"), step=0.0001, min_features_to_select=1, cv=3)
X_final = rfe.fit_transform(X_reshape_auto, y)

print(X_final.shape)

KeyboardInterrupt: 

In [24]:
x_train, x_test, y_train, y_test = train_test_split(X_final, y, test_size=0.3, random_state=42)

clf = SVC(kernel='linear', C=1, random_state=42, probability=True)
clf.fit(x_train, y_train)
y_pred = clf.predict(x_test)

print("accuracy", accuracy_score(y_test, y_pred))
print("f1_score", f1_score(y_test, y_pred, average="weighted"))

accuracy 0.4583333333333333
f1_score 0.40972222222222227


# Utilizando validação cruzada

In [15]:
# reorganizando os dados

X_reshape_auto = data.reshape(data.shape[0], data.shape[1] * data.shape[2])
print(X_reshape_auto.shape)

(160, 7680)


In [16]:
# Adicionando mais características

middle = filtered_mne_data.copy().crop(tmin=0.5, tmax=2.5)
print(middle.get_data().shape)

fft_middle_result = np.fft.fft(middle)

# densidade espectral de potência (PSD)
psd_middle = np.abs(fft_middle_result) ** 2

# média da potência nos intervalos de tempo sem estímulo
middle_power = np.mean(psd_middle, axis=-1)

sr = 250

# frequências alvo
target_frequencies = np.arange(8, 16, 0.2)

# lista para armazenar as amplitudes nas frequências alvo
target_amplitudes = []

for channel_data in middle.get_data():
    target = []
    for electrode in channel_data:

        fft_result = np.fft.fft(electrode)
        psd = np.abs(fft_result) ** 2
        frequencies = np.fft.fftfreq(len(fft_result), 1 / sr)
        target_amplitudes_trial = []

        for target_frequency in target_frequencies:
            # encontrando o índice da frequência alvo no espectro de frequência
            index = np.argmin(np.abs(frequencies - target_frequency))
            # amplitude na frequência alvo
            amplitude = np.sqrt(psd[index])
            target_amplitudes_trial.append(amplitude)

        target.append(target_amplitudes_trial)


    target_amplitudes.append(target)

target_amplitudes = np.array(target_amplitudes)
target_amplitudes.shape


(160, 64, 501)


(160, 64, 40)

In [26]:

X_reshape_auto = StandardScaler().fit_transform(X_reshape_auto)

rfe = RFECV(SVC(kernel="linear"), step=0.0001, min_features_to_select=1, cv=3)
X_final = rfe.fit_transform(X_reshape_auto, y)

print(X_final.shape)

(160, 1066)


In [43]:
x_train, x_test, y_train, y_test = train_test_split(X_final, y, test_size=0.2, random_state=42)


svm = GridSearchCV(SVC(), param_grid={'C' : [1, 10, 100, 1000], 'kernel': ['linear']}, cv=StratifiedKFold(n_splits=3, shuffle=True), n_jobs=-1, scoring='accuracy')

svm.fit(x_train, y_train)
y_pred = svm.predict(x_test)

print("accuracy", accuracy_score(y_test, y_pred))
print("f1_score", f1_score(y_test, y_pred, average="weighted"))

accuracy 0.5
f1_score 0.49999999999999994




In [268]:




num_of_chars = X_reshape_auto.shape[1]

# <!-- usar o k best 

# usar o k inicial como metade
# e ir reduzindo na potencia de 2

# 512
# 256
# 128
# ...
# 2
#  -->

ks = [2**i for i in range(0, 12)]
initial_k = num_of_chars
accs = []

for k in ks:
    X_new = SelectKBest(f_classif, k=initial_k - k).fit_transform(X_reshape_auto, y)
    X_train, X_test, y_train, y_test = train_test_split(X_new, y, shuffle=True, test_size=0.3, random_state=42)
    
    # unique, counts = np.unique(y_train, return_counts=True)
    # print(np.asarray((unique, counts)).T)

    parameters = {'C':[1, 10, 100, 1000], 'kernel': ['linear']}
    svm = GridSearchCV(SVC(), cv=4, param_grid=parameters, scoring="accuracy")
    svm.fit(X_train, y_train.ravel())
    predicts = svm.predict(X_test)
    accs.append(accuracy_score(predicts, y_test))


print(np.average(accs))



0.13194444444444445


#### Extração de características

Uma característica importante de acordo com o artigo base do dataset `BETA` é o *signal-to-noise ratio* (SNR).

Ao final desta etapa, será obtido um vetor de características. Estas podem ser:
- `SNR` (obrigatória);
- Maior valor espectral (FFT);
- Médio dos valores espectrais (FFT);


Dimensionalidade dos dados será explicada da seguinte forma:

`40, 4, 64, 750` -> 40 targets, 4 trials, 64 channels e 750 values.

`160, 64 (SRN) + 64 (MÉDIA) + 64 (MAIOR) ...`

Resultando em  `160, 192`

#### Seleção de características e classsificação

Como existem diversos eletrodos (canais) que não obtem sinal SSVEP, podemos extrair as características que não contribuem para a classificação dos dados.


Podemos utilizar o método `RFE` (*Recursive Feature Elimination*) aplicador por meio de `sklearn.feature_selection.RFE`, aprimorand o parâmetro `n_features_to_select` até obter o melhor resultado de classificação.

Para a classificação propriamente dita, é considerado o uso do método `SVM`.

# How to calculate SNR https://saturncloud.io/blog/calculating-signaltonoise-ratio-in-python-with-scipy-v11/