## Análise da base de dados `Beta` utilizando algoritmos de ML

Neste notebook será analisado o `Beta dataset` utilizando algoritmos de ML para realizar a (1) extração de características, (2) seleção de características e (3) classificação dos dados

### Pontos importantes do dataset

- Frequências estimuladas (total de 40, com a diferença de 0.2 Hz uma da outra): 8.0, 8.2, ..., 15.6, 15.8;
- Taxa de amostragem: 250 Hz

### Analisar os "momentos" em que ocorrem evocação do sinal SSVEP

1. Criar o objeto `MNE` a partir dos dados dados do participante;
2. Aplicar no objeto `MNE` o filtro passa-faixa nos valores de 6 - 18 Hz;
3. Criar cópias do objeto `MNE` com fatias de tempo menores para analisar momentos que ocorrem estimulos ou não (verificar artigo);
    a) 0.0 - 0.5 segundos e 2.5 - 3.0 segundos ocorre apenas ruído;
    b) 0.5 - 2.5 segundos ocorre sinal SSVEP (com ruídos)
4. Com os sinais separados em objetos `MNE`, aplicar a `FFT`, para que seja possível plotar gráficos que contenham (ou não) as informações.
    - Os dados devem ser plotados no domínio da frequência (após a transformada de Fourier). O FFT pode ser realizado pela biblioteca `scipy.fft`.
    - Deve ser observado que as janelas (a) com ruído não aparecerão de fato o sinal SSVEP.

### Extração de características

Uma característica importante de acordo com o artigo base do dataset `BETA` é o *signal-to-noise ratio* (SNR).
São dois tipos de características SNR que podem ser implementadas: SNR de banda estreita (`narrow-SNR`) e SNR de banda larga (`wide-band SNR`).

Uma boa prática, é considerar o ruído das medidas de `SNR`, uma vez que os dados `SSVEP` não estão estimulados durante os períodos de 0 a 0,5 segundos e de 2,5 a 3 segundos. O ruído pode afetar a precisão das medidas de `SNR` e, portanto, é aconselhável levar isso em consideração.

Vamos realizar todos esses cálculos com dados fictícios:

# Carregando os dados

In [69]:
%matplotlib inline
import numpy as np
from sklearn.preprocessing import LabelEncoder
import mne
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler
from sklearn.svm import SVC
from sklearn import svm
from copy import deepcopy
from scipy.io import loadmat
from sklearn.metrics import accuracy_score, f1_score
from sklearn.feature_selection import RFECV, SelectKBest, chi2
import itertools
from joblib import Parallel, delayed

Carregando dados

In [2]:
data = loadmat(f"../../dataset/beta/S12.mat")['data'][0][0]
eeg_data = data[0]
eeg = eeg_data.reshape(eeg_data.shape[0], eeg_data.shape[1], eeg_data.shape[2] * eeg_data.shape[3])
labels = np.array(list(data[1]['freqs'][0][0].flatten()) * 4)
print(eeg.shape, labels.shape)

(64, 750, 160) (160,)


In [3]:
data_correct = eeg.swapaxes(0, 2)
data_correct = data_correct.swapaxes(1, 2)
print(data_correct.shape)

(160, 64, 750)


Agora iremos estimar o ruído de fundo, para calcular posteriormente o `narrow SNR` e o `wide-band SNR`. 

In [4]:
dataset = deepcopy(data_correct)
print(dataset.shape)

# intervalos de tempo sem estímulo (0 a 0,5 segundos e 2,5 a 3 segundos)
base_start = 0
base_end = 125
rest_start = 625
rest_end = 750

# Estimando o ruído de fundo
estimated_background_noise = []

for trial in dataset: #para cada sinal   
    noise_power = [] # armazena uma lista com as médias de potência para cada canal

    for electrode in trial:
        psd = np.abs(np.fft.fft(electrode)) ** 2

        # média da potência nos intervalos de tempo sem estímulo
        base_power = np.mean(psd[base_start:base_end])
        rest_power = np.mean(psd[rest_start:rest_end])
        mean_noise_power = (base_power + rest_power) / 2

        noise_power.append(mean_noise_power)

    #média das médias de potência de todos os canais para estimar o ruído de fundo
    estimated_background_noise.append(np.mean(noise_power))
# end


background_noise = np.array(estimated_background_noise)
print(background_noise.shape)
print(np.mean(background_noise))

(160, 64, 750)
(160,)
28376881.989714384


In [57]:
teste = np.mean(background_noise)
teste

28376881.989714384

Antes de calcular os SNRs, precisamos obter as amplitudes alvo por meio dos dados EEG:

In [5]:
from scipy.signal import find_peaks

sr = 250

# frequências alvo
target_frequencies = np.arange(8, 16, 0.2)
# lista para armazenar as amplitudes nas frequências alvo
target_amplitudes = []

for x in range(dataset.shape[0]):
    samples = []
    
    for channel_data in dataset[x, :, :]:
        fft_result = np.fft.fft(channel_data)
        psd = np.abs(fft_result) ** 2
        frequencies = np.fft.fftfreq(len(fft_result), 1 / sr)
        target_amplitudes_trial = []
        for target_frequency in target_frequencies:
        # encontrando o índice da frequência alvo no espectro de frequência
            index = np.argmin(np.abs(frequencies - target_frequency))
            # amplitude na frequência alvo
            amplitude = np.sqrt(psd[index])
            target_amplitudes_trial.append(amplitude)
        samples.append(target_amplitudes_trial)
    target_amplitudes.append(samples)
target_amplitudes = np.array(target_amplitudes)
target_amplitudes.shape

(160, 64, 40)

Vamos calcular o SNR de "banda estreita". Pode ser observado pela seguinte equação:

$SNR_{banda\ estreita} = 10 \cdot \log_{10}\left(\frac{\text{energia total do espectro}}{\text{média das amplitudes nas frequências vizinhas}}\right)$

Já o SNR de banda larga é definido da seguinte forma:

$SNR_{banda\ larga} = 10 \cdot \log_{10}\left(\frac{\text{energia total do espectro}}{\text{energia total do espectro de amplitude}}\right)$

In [58]:
def narrow_band_SNR(target_amplitudes, estimated_background_noise):
  target_amplitudes_adjusted = np.abs(target_amplitudes - teste)
  return 10 * np.log10(target_amplitudes_adjusted / teste)

def wide_band_SNR(target_amplitudes, estimated_background_noise):
  target_amplitudes_adjusted = np.abs(target_amplitudes - teste)
  total_power = np.sum(target_amplitudes_adjusted)
  return 10 * np.log10(target_amplitudes_adjusted / total_power)

# Remoção manual de caracteristicas

In [59]:
X = []
y = labels

for i, trial in enumerate(dataset):
  X.append([
    narrow_band_SNR(target_amplitudes[i], background_noise[i]),
    wide_band_SNR(target_amplitudes[i], background_noise[i])
  ])
  
X = np.array(X)
X = X.swapaxes(1, 2)
X = X.reshape(X.shape[0], X.shape[1], X.shape[2]*X.shape[3])
print(X.shape)

(160, 64, 80)


In [44]:
# Remoção
# opções: ['PZ', 'PO3', 'PO5', 'PO4', 'PO6', 'POZ', 'O1', 'OZ', 'O2'] -> [47, 53, 54, 55, 56, 57, 60, 61, 62]
Xmanual = deepcopy(X)
Xmanual = Xmanual[:,[47, 53, 54, 55, 56, 57, 60, 61, 62],:]
Xmanual = Xmanual.reshape(Xmanual.shape[0], Xmanual.shape[1]*Xmanual.shape[2])

#preparing
Xmanual = StandardScaler().fit_transform(Xmanual)
ym = LabelEncoder().fit_transform(y)

x_train, x_test, y_train, y_test = train_test_split(Xmanual, ym, test_size=0.3, random_state=42)

clf = SVC(kernel='linear', C=1, random_state=42, probability=True)
clf.fit(x_train, y_train)
y_pred = clf.predict(x_test)

print("accuracy", accuracy_score(y_test, y_pred))
print("f1_score", f1_score(y_test, y_pred, average="weighted"))

accuracy 0.020833333333333332
f1_score 0.013888888888888888


# Remoção automática de caracteristicas

In [67]:
#Cs e gammas são listas com os valores a serem avaliados para os respectivos parâmetros.
def selecionar_melhor_svm(Cs, gammas, X_treino : np.ndarray, X_val : np.ndarray, 
                          y_treino : np.ndarray, y_val : np.ndarray, n_jobs=4):
    
    def treinar_svm(C, gamma, X_treino, X_val, y_treino, y_val):
        svm = SVC(C=C, gamma=gamma)
        svm.fit(X_treino, y_treino)
        pred = svm.predict(X_val)
        return accuracy_score(y_val, pred)
    
    #gera todas as combinações de parametros C e gamma, de acordo com as listas de valores recebidas por parametro.
    #Na prática faz o produto cartesiano entre Cs e gammas.
    combinacoes_parametros = list(itertools.product(Cs, gammas))
    
    #Treinar modelos com todas as combinações de C e gamma
    acuracias_val = Parallel(n_jobs=n_jobs)(delayed(treinar_svm)
                                       (c, g, X_treino, X_val, y_treino, y_val) for c, g in combinacoes_parametros)       
    
    melhor_val = max(acuracias_val)
    #Encontrar a combinação que levou ao melhor resultado no conjunto de validação
    melhor_comb = combinacoes_parametros[np.argmax(acuracias_val)]   
    melhor_c = melhor_comb[0]
    melhor_gamma = melhor_comb[1]
    
    #Treinar uma SVM com todos os dados de treino e validação usando a melhor combinação de C e gamma.
    svm = SVC(C=melhor_c, gamma=melhor_gamma)
    svm.fit(np.vstack((X_treino, X_val)), [*y_treino, *y_val])

    return svm, melhor_comb, melhor_val

#Implementa a validação cruzada para avaliar o desempenho da SVM na base de dados com as instâncias X e as saídas y.
#cv_splits indica o número de partições que devem ser criadas.
#Cs é a lista com os valores C que devem ser avaliados na busca exaustiva de parametros para a SVM.
#gammas s é a lista com os valores gamma que devem ser avaliados na busca exaustiva de parametros para a SVM.
def do_cv_svm(X, y, cv_splits, Cs=[1], gammas=['scale']):

    skf = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=1)

    acuracias = []
    
    #pgb = tqdm(total=cv_splits, desc='Folds avaliados')
    
    for treino_idx, teste_idx in skf.split(X, y):

        X_treino = X[treino_idx]
        y_treino = y[treino_idx]

        X_teste = X[teste_idx]
        y_teste = y[teste_idx]

        X_treino, X_val, y_treino, y_val = train_test_split(X_treino, y_treino, test_size=0.2, random_state=42)

        ss = StandardScaler()
        ss.fit(X_treino)
        X_treino = ss.transform(X_treino)
        X_teste = ss.transform(X_teste)
        X_val = ss.transform(X_val)

        svm, _, _ = selecionar_melhor_svm(Cs, gammas, X_treino, X_val, y_treino, y_val)
        pred = svm.predict(X_teste)

        acuracias.append(accuracy_score(y_teste, pred))
        
        #pgb.update(1)
        
    #pgb.close()
    
    return acuracias

In [60]:
X = []
y = labels

for i, trial in enumerate(dataset):
  X.append([
    narrow_band_SNR(target_amplitudes[i], background_noise[i]),
    wide_band_SNR(target_amplitudes[i], background_noise[i])
  ])
  
X = np.array(X)
X = X.swapaxes(1, 2)
X = X.reshape(X.shape[0], X.shape[1], X.shape[2]*X.shape[3])
print(X.shape)

(160, 64, 80)


In [72]:
# preparing
Xauto = deepcopy(X)
Xauto = Xauto.reshape(Xauto.shape[0], Xauto.shape[1]*Xauto.shape[2])
Xauto = StandardScaler().fit_transform(Xauto)
#Xauto = MinMaxScaler().fit_transform(Xauto)
ya = LabelEncoder().fit_transform(y)
print(Xauto.shape, ya.shape)

#rfe
#rfe = RFECV(svm.SVC(kernel="linear"), step=0.0001, min_features_to_select=1, cv=3)
#X_final = rfe.fit_transform(Xauto, ya)
#print(X_final.shape)

#kbest
ks = [2**i for i in range ( 10 ) ]
for k in ks:
    
    kbest= SelectKBest(k=k)
    X_final = kbest.fit_transform(Xauto,ya)
    print(X_final.shape)

    accs_svm = do_cv_svm(X_final, ya, 3, Cs=[1, 10, 100, 1000], gammas=['scale', 'auto', 2e-2, 2e-3, 2e-4])
    print ( f" para k = {k}")
    print("min: %.2f, max: %.2f, avg +- std: %.2f +- %.2f" % (min(accs_svm), max(accs_svm), np.mean(accs_svm), np.std(accs_svm)))
    #svm
    #x_train, x_test, y_train, y_test = train_test_split(X_final, ya, test_size=0.3, random_state=42)
    
    #clf = SVC(kernel='linear', C=1, random_state=42, probability=True)
    #clf.fit(x_train, y_train)
    #y_pred = clf.predict(x_test)
    
    #svm cross validation
    #accs = do_cv_svm(X_final,ya,3)
    #print(f"média : {np.mean(accs)}")

    #print("accuracy", accuracy_score(y_test, y_pred))
    #print("f1_score", f1_score(y_test, y_pred, average="weighted"))
    

(160, 5120) (160,)
(160, 1)
 para k = 1
min: 0.00, max: 0.06, avg +- std: 0.02 +- 0.02
(160, 2)
 para k = 2
min: 0.00, max: 0.06, avg +- std: 0.02 +- 0.02
(160, 4)
 para k = 4
min: 0.02, max: 0.04, avg +- std: 0.03 +- 0.01
(160, 8)
 para k = 8
min: 0.02, max: 0.04, avg +- std: 0.02 +- 0.01
(160, 16)
 para k = 16
min: 0.04, max: 0.04, avg +- std: 0.04 +- 0.00
(160, 32)
 para k = 32
min: 0.08, max: 0.13, avg +- std: 0.10 +- 0.02
(160, 64)
 para k = 64
min: 0.08, max: 0.11, avg +- std: 0.09 +- 0.02
(160, 128)
 para k = 128
min: 0.13, max: 0.23, avg +- std: 0.18 +- 0.04
(160, 256)
 para k = 256
min: 0.17, max: 0.32, avg +- std: 0.26 +- 0.06
(160, 512)
 para k = 512
min: 0.20, max: 0.34, avg +- std: 0.26 +- 0.06


Ao final desta etapa, será obtido um vetor de características. Estas podem ser:
- `narrow SNR` (brigatória);
- `wide-band SNR` (brigatória);
- Maior valor espectral (FFT);
- Média dos valores espectrais (FFT).

Dimensionalidade dos dados será explicada da seguinte forma:

`40, 4, 64, 750` -> 40 targets, 4 trials, 64 canais e 750 valores
`160, 64 (SNR) + 64 (média) + 64 (maior) ...`
Resultando em `160, 192`.

### Seleção de características e classificação

Como existem diversos eletrodos (canais) que não obtém sinal SSVEP, podemos extrair as caracteríscas que não contribuem para a classificação dos dados.

Podemos utilizar o método `RFE` (*Recursive Feature Elimination*) aplicado por meio de `sklearn.feature_selection.RFE`, aprimorando o parâmetro `n_features_to_select` até obter o melhor resultado de classificação.

Para a classificação propriamente dita, é considerado o uso do método `SVM`.