# TCC - Análise de Sinais EEG

Este notebook apresenta um fluxo de trabalho completo para a análise de sinais de EEG, desde a leitura e pré-processamento dos dados até a construção, treinamento e avaliação de um modelo de aprendizado profundo para classificação.

## 1. Bibliotecas e Configurações Iniciais

Importação das bibliotecas necessárias e configuração do ambiente, incluindo a alocação de memória da GPU, se disponível.

In [None]:
!rclone mount drive-thiago: ~/gdrive --daemon

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dropout, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Configuração da GPU (opcional)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4000)]
        )
    except RuntimeError as e:
        print(e)

# Caminhos dos arquivos
DATALAKE_DIR = '/home/thiago/gdrive/Acadêmico/UFRGS/tcc-ufrgs/datalake'
RAW_MUSE_PATH = f'{DATALAKE_DIR}/raw/Muse-v1.0/MU.txt'
INDEX = 10_000

## 2. Leitura dos Dados

Leitura do arquivo bruto e conversão dos dados em um formato adequado para o treinamento do modelo de deep learning. Cada amostra é convertida em um array NumPy com shape `(n_amostras, TARGET_LEN, n_canais)`.

In [None]:
def load_data():
    MAX_LEN = 440

    df = pd.read_csv(RAW_MUSE_PATH, sep='\t', header=None, names=["id", "event", "device", "channel", "code", "size", "data"])

    df = df[df['code'] != -1]

    df["data_array"] = df["data"].apply(lambda x: np.array([int(v) for v in x.split(",")]))

    df = df.sort_values(["event", "channel"]).reset_index(drop=True)

    def pad_or_truncate(arr):
        if len(arr) > MAX_LEN:
            return arr[:MAX_LEN]
        elif len(arr) < MAX_LEN:
            return np.pad(arr, (0, MAX_LEN - len(arr)), mode='constant')
        return arr

    df["data_array"] = df["data_array"].apply(pad_or_truncate)

    X = []
    y = []

    for event, group in df.groupby("event"):
        channel_arrays = [arr for arr in group["data_array"].to_list()]
        sample = np.stack(channel_arrays, axis=-1)  # (timesteps, n_channels)
        X.append(sample)
        y.append(group["code"].iloc[0])

    X = np.array(X)
    y = np.array(y)

    print("Shape final de X:", X.shape)
    print("Shape final de y:", y.shape)

    return X, y

X, y = load_data()

## 3. Pré-processamento e Filtragem dos Sinais

Aplicação de filtros para remover ruídos e artefatos dos sinais de EEG. As seguintes técnicas são utilizadas:
- **Filtro Butterworth Passa-Alta:** Para remover a flutuação da linha de base.
- **Filtro Notch:** Para remover a interferência da rede elétrica (60 Hz).
- **Denoising com Transformada Wavelet Discreta (DWT):** Para atenuar ruídos de alta frequência.

In [None]:
from scipy.signal import butter, filtfilt, iirnotch
import pywt

class Preprocessing:
    def __init__(self, fs=220):
        self.fs = fs

    def butterworth_highpass(self, data, cutoff=0.1, order=5):
        b, a = butter(order, cutoff / (self.fs / 2), btype="high", analog=False)
        return filtfilt(b, a, data)

    def notch_filter(self, data, freq=60.0, Q=30.0):
        b, a = iirnotch(w0=freq/(self.fs/2), Q=Q)
        return filtfilt(b, a, data)

    def dwt_denoise_reconstruct(self, signal, wavelet='db4', level=3, mode='soft'):
        coeffs = pywt.wavedec(signal, wavelet=wavelet, level=level)
        n = len(signal)
        for i in range(1, len(coeffs)):
            cd = coeffs[i]
            sigma = np.median(np.abs(cd)) / 0.6745 if cd.size > 0 else 0.0
            thresh = sigma * np.sqrt(2 * np.log(n)) if sigma > 0 else 0.0
            coeffs[i] = pywt.threshold(cd, thresh, mode=mode)
        rec = pywt.waverec(coeffs, wavelet=wavelet)
        return np.asarray(rec[:n])
    
    def _save_sample(self, X, y, etapa):
        df_sample = pd.DataFrame(X, columns=[f"canal_{i+1}" for i in range(X.shape[1])])
        df_sample["label"] = y
        df_sample.to_csv(f"sample_{etapa}.csv", index=False)

    def execute(self, X, y):
        n_samples, timesteps, n_channels = X.shape
        print(f"🔹 Iniciando pré-processamento: {n_samples} amostras, {n_channels} canais, {timesteps} timesteps")

        # Arrays intermediários
        X_raw = np.zeros((1, 440, 4), dtype=float)
        X_highpass = np.zeros((1, 440, 4), dtype=float)
        X_notch = np.zeros((1, 440, 4), dtype=float)
        X_dwt = np.zeros((1, 440, 4), dtype=float)
        X_filtered = np.zeros_like(X, dtype=float)

        for i in range(n_samples):
            for ch in range(n_channels):
                signal = X[i, :, ch].astype(float)
                if i == INDEX:
                    X_raw[0, :, ch] = signal

                # Etapa 1: High-pass
                signal = self.butterworth_highpass(signal, cutoff=0.1, order=5)
                if i == INDEX:
                    X_highpass[0, :, ch] = signal

                # Etapa 2: Notch
                signal = self.notch_filter(signal, freq=60.0, Q=30.0)
                if i == INDEX:
                    X_notch[0, :, ch] = signal

                # Etapa 3: DWT
                signal = self.dwt_denoise_reconstruct(signal, wavelet='db4', level=3, mode='soft')
                if i == INDEX:
                    X_dwt[0, :, ch] = signal

                X_filtered[i, :, ch] = signal
        
        print(X_highpass.shape, X_notch.shape, X_dwt.shape)
        self._save_sample(X_raw[0], y[INDEX], "raw")
        self._save_sample(X_highpass[0], y[INDEX], "highpass")
        self._save_sample(X_notch[0], y[INDEX], "notch")
        self._save_sample(X_dwt[0], y[INDEX], "dwt")

        print("✅ Pré-processamento concluído.")
        return X_filtered, y

# Exemplo de uso:
X, y = Preprocessing().execute(X, y)

## 4. Divisão dos Dados

Os dados são divididos em conjuntos de treino, validação e teste

In [None]:
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

## 5. Normalização

Os dados são normalizados utilizando Z-score seguido por `MinMaxScaler` para escalar os valores entre 0 e 1. 

In [None]:
def normalize(X_train: np.ndarray, X_val: np.ndarray, X_test: np.ndarray):
    mu = X_train.mean(axis=(0, 1), keepdims=True)
    sigma = X_train.std(axis=(0, 1), keepdims=True)
    sigma[sigma == 0] = 1.0

    X_train_z = (X_train - mu) / sigma
    X_val_z   = (X_val - mu) / sigma
    X_test_z  = (X_test - mu) / sigma

    X_train_final = np.zeros_like(X_train_z)
    X_val_final   = np.zeros_like(X_val_z)
    X_test_final  = np.zeros_like(X_test_z)

    n_channels = X_train_z.shape[2]

    for ch in range(n_channels):
        vals = X_train_z[:, :, ch].reshape(-1, 1)  # Flatten canal: (N*T, 1)
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaler.fit(vals)  # um único min/max para todo o canal
    
        for i in range(X_train_z.shape[0]):  # aplica em cada amostra
            X_train_final[i, :, ch] = scaler.transform(X_train_z[i, :, ch].reshape(-1, 1)).flatten()
        
        for i in range(X_val_z.shape[0]):
            X_val_final[i, :, ch]   = scaler.transform(X_val_z[i, :, ch].reshape(-1, 1)).flatten()
        
        for i in range(X_test_z.shape[0]):
            X_test_final[i, :, ch]  = scaler.transform(X_test_z[i, :, ch].reshape(-1, 1)).flatten()

    return X_train_final, X_val_final, X_test_final

X_train, X_val, X_test = normalize(X_train, X_val, X_test)

teste, _, _ = normalize(X, X, X)
teste = teste[INDEX]
print(teste.shape)
df_sample = pd.DataFrame(teste, columns=[f"canal_{i+1}" for i in range(teste.shape[1])])
df_sample.to_csv(f"sample_minmax.csv", index=False)

print(f"Treino: {X_train.shape}, Validação: {X_val.shape}, Teste: {X_test.shape}")

## 6. Construção do Modelo

Definição da arquitetura do modelo, que consiste em uma rede neural recorrente com camadas LSTM bidirecionais, dropout para regularização e camadas densas para a classificação final.

In [None]:
def create_simple_model():
    N_CHANNELS = 4
    N_SAMPLES = 440
    N_CLASSES = 10

    input = Input(shape=(N_SAMPLES, N_CHANNELS))
    x = Bidirectional(LSTM(units=32, return_sequences=True))(input)
    x = Dropout(0.1)(x)
    x = Bidirectional(LSTM(units=16, return_sequences=False))(x)
    x = Dense(64, activation='elu')(x)
    output = Dense(N_CLASSES, activation='softmax')(x)

    model = tf.keras.Model(input, output)
    
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

def create_model():
    N_CHANNELS = 4
    N_SAMPLES = 440
    N_CLASSES = 10

    input = Input(shape=(N_SAMPLES, N_CHANNELS))
    x = Bidirectional(LSTM(units=N_SAMPLES, return_sequences=True))(input)
    x = Dropout(0.1)(x)
    x = Bidirectional(LSTM(units=N_SAMPLES // 2, return_sequences=True))(x)
    x = Dropout(0.1)(x)
    x = Bidirectional(LSTM(units=N_SAMPLES // 4, return_sequences=False))(x)
    x = Dropout(0.1)(x)
    x = Flatten()(x)
    x = Dense(128, activation='elu')(x)
    output = Dense(N_CLASSES, activation='softmax')(x)

    model = tf.keras.Model(input, output)
    
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model


model = create_model()

model.summary()

## 7. Treinamento do Modelo

Treinamento do modelo com os dados preparados. São utilizados callbacks para `EarlyStopping` (interromper o treino se a performance não melhorar) e `ModelCheckpoint` (salvar o melhor modelo encontrado durante o treino).

In [None]:
early_stop = EarlyStopping(
    monitor='val_accuracy',
    patience=10,
    restore_best_weights=True
)

checkpoint = ModelCheckpoint(
    'melhor_modelo.keras',
    monitor='val_accuracy',
    save_best_only=True,
    mode='max',
    verbose=0
)

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=10, 
    batch_size=64,
    callbacks=[early_stop, checkpoint],
    verbose=1
)

In [None]:
import matplotlib.pyplot as plt
accuracy = history.history["accuracy"]
val_accuracy = history.history["val_accuracy"]
loss = history.history["loss"]
val_loss = history.history["val_loss"]
epochs = range(1, len(accuracy) + 1)
plt.plot(epochs, accuracy, "bo", label="Training accuracy")
plt.plot(epochs, val_accuracy, "b", label="Validation accuracy")
plt.title("Training and validation accuracy with Preprocessing and Min Max")
plt.legend()
plt.figure()
plt.plot(epochs, loss, "bo", label="Training loss")
plt.plot(epochs, val_loss, "b", label="Validation loss")
plt.title("Training and validation loss with Preprocessing and Min Max")
plt.legend()
plt.show()

## 8. Avaliação do Modelo

Avaliação da performance do modelo treinado no conjunto de teste. São calculadas métricas como acurácia, precisão, recall e F1-score, além da exibição de um relatório de classificação detalhado por classe.

In [None]:
def validate(model, X_test, y_test):
    y_pred_probs = model.predict(X_test)
    y_pred = np.argmax(y_pred_probs, axis=1)

    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average='macro')
    rec = recall_score(y_test, y_pred, average='macro')
    f1 = f1_score(y_test, y_pred, average='macro')

    print(f"\n📊 Desempenho no conjunto de teste:")
    print(f"Acurácia: {acc:.4f}")
    print(f"Precisão (macro): {prec:.4f}")
    print(f"Recall (macro): {rec:.4f}")
    print(f"F1-score (macro): {f1:.4f}")

    print("\nRelatório por classe:")
    print(classification_report(y_test, y_pred, digits=4))

model = load_model('melhor_modelo.keras')
validate(model, X_test, y_test)