# PARTE 0 | Importes Necessários

In [1]:
import kagglehub
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PowerTransformer

# PARTE 1 | Conjunto de Dados

In [2]:
# 1. Carregar Dados do Kaggle
def carregarDados(caminhoData: str) -> pd.DataFrame:
    """
    Carrega e unifica os arquivos CSV do dataset de HRV via 'uuid'.
    """

    dataframesBrutos = {}
    try:
        arquivosCsv = [f for f in os.listdir(caminhoData) if f.endswith('.csv')]
        for arquivo in arquivosCsv:
            nomeChave = arquivo.replace('.csv', '')
            dataframesBrutos[nomeChave] = pd.read_csv(os.path.join(caminhoData, arquivo))

        listaChaves = list(dataframesBrutos.keys())
        dfUnificado = dataframesBrutos[listaChaves[0]]
        for chave in listaChaves[1:]:
            dfUnificado = pd.merge(dfUnificado, dataframesBrutos[chave], on='uuid', how='inner')

        print(f"Dataset carregado! E unificado:\n    {dfUnificado.shape}")
        return dfUnificado
    except Exception as e:
        print(f"Erro no carregamento: {e}")
        return None



# 2. Seleção e Filtragem dos Dados
def selecionarPreditoresETarget(dfEntrada: pd.DataFrame):
    """
    Isola os preditores, remove classes intermediárias e elimina variáveis
    com multicolinearidade extrema (correlação > 0.95).
    """

    # 1. Filtragem: Focar nos extremos (No Stress vs Time Pressure)
    # Removemos 'interruption' para limpar a fronteira de decisão e
    # realizar uma classificação binária, apenas
    dfLimpo = dfEntrada[dfEntrada['condition'] != 'interruption'].copy()
    dfLimpo = dfLimpo.reset_index(drop=True)

    # 2. Mapeamento Binário
    mapeamentoClasses = {'no stress': 0, 'time pressure': 1}
    y = dfLimpo['condition'].map(mapeamentoClasses).values

    # 3. Identificação Automática de Preditores Numéricos
    # Removemos colunas não-preditoras e o próprio alvo
    colunasExcluir = ['uuid', 'datasetId', 'condition', 'target']
    xBruto = dfLimpo.drop(columns=[c for c in colunasExcluir if c in dfLimpo.columns])
    xBruto = xBruto.select_dtypes(include=[np.number]) # Garante apenas números

    # 4. Filtro de Multicolinearidade (Estratégia Inteligente)
    # Calculamos a matriz de correlação absoluta
    matrizCorr = xBruto.corr().abs()

    # Selecionamos o triângulo superior da matriz para identificar pares
    superior = matrizCorr.where(np.triu(np.ones(matrizCorr.shape), k=1).astype(bool))

    # Identificamos colunas com correlação acima de 0.95
    colunasParaRemover = [coluna for coluna in superior.columns if any(superior[coluna] > 0.95)]

    x = xBruto.drop(columns=colunasParaRemover)

    print(f"Filtragem concluída.\n  Preditores finais: {x.shape[1]} (Removidas {len(colunasParaRemover)} redundantes).")

    return x, y

# 2.1 Equilíbrio de Amostras (Opcional)
def aplicarSubamostragem(xDados, yAlvo):
    """
    Realiza o Undersampling aleatório da classe majoritária para equilibrar o dataset.
    Retorna X e Y balanceados (proporção 50/50).
    """

    # Identificamos os índices de cada classe
    indicesClasse0 = np.where(yAlvo == 0)[0]
    indicesClasse1 = np.where(yAlvo == 1)[0]

    # Determinamos o tamanho da menor classe (geralmente Estresse)
    n_minoria = len(indicesClasse1)

    # Sorteamos aleatoriamente a mesma quantidade na classe majoritária
    np.random.seed(27) # Para reprodutibilidade
    indicesClasse0_Reduzidos = np.random.choice(indicesClasse0, n_minoria, replace=False)

    # Combinamos os índices e extraímos os dados
    indicesFinais = np.concatenate([indicesClasse0_Reduzidos, indicesClasse1])
    np.random.shuffle(indicesFinais) # Mistura os dados

    xBalanceado = xDados.iloc[indicesFinais] if isinstance(xDados, pd.DataFrame) else xDados[indicesFinais]
    yBalanceado = yAlvo[indicesFinais]

    print(f"Subamostragem concluída: {len(yBalanceado)} amostras totais (50/50).")

    return xBalanceado, yBalanceado


# 3. Divisão dos Conjuntos de Dados
def dividirDados(xDados, yAlvo, proporcaoTeste=0.2):
    """
    Apenas separa os dados em conjuntos de treino e teste de forma estratificada.
    """

    xTreino, xTeste, yTreino, yTeste = train_test_split(
        xDados, yAlvo, test_size=proporcaoTeste, random_state=42, stratify=yAlvo
    )
    print(f"Divisão dos conjuntos concluída (Teste: {proporcaoTeste*100}%).")
    return xTreino, xTeste, yTreino, yTeste


# 4. Transformação e normalização dos dados
def aplicarTransformacoes(xTreino, xTeste):
    """
    Aplica transformações para aproximar a distribuição normal e padroniza as escalas.
    Utiliza Yeo-Johnson para corrigir assimetria e StandardScaler para Z-score.
    """

    # 1. Transformação de Potência (Para aproximar da normalidade multivariada)
    # O método Yeo-Johnson é ideal para corrigir a assimetria (skewness) das features de HRV!!
    powerTrans = PowerTransformer(method='yeo-johnson')

    # 2. Padronização (Média 0, Variância 1)
    scaler = StandardScaler()

    # Aplicamos o pipeline de transformação
    # Nota: fit() apenas no treino para evitar vazamento de informação!
    xTreinoTrans = powerTrans.fit_transform(xTreino)
    xTreinoFinal = scaler.fit_transform(xTreinoTrans)

    xTesteTrans = powerTrans.transform(xTeste)
    xTesteFinal = scaler.transform(xTesteTrans)

    print("Transformação Yeo-Johnson e padronização z-score aplicadas.")
    return xTreinoFinal, xTesteFinal

In [4]:
# >> Pipeline de Execução

# 1. Download e definição de caminhos
path = kagglehub.dataset_download("vinayakshanawad/heart-rate-prediction-to-monitor-stress-level")
caminhoDados = os.path.join(path, 'Train Data', 'Train Data Zip')

Downloading from https://www.kaggle.com/api/v1/datasets/download/vinayakshanawad/heart-rate-prediction-to-monitor-stress-level?dataset_version_number=1...


100%|██████████| 140M/140M [00:00<00:00, 154MB/s]

Extracting files...





In [5]:
# 2. Ingestão e Unificação
dfHrv = carregarDados(caminhoDados)


Dataset carregado! E unificado:
    (369289, 37)


In [6]:
if dfHrv is not None:
    # 3. Seleção de Features e Mapeamento do Target (Binário)
    # Aqui a função retorna os dados brutos ainda, apenas selecionados
    xBruto, yAlvo = selecionarPreditoresETarget(dfHrv)

    # 3.1 Aplicação da Subamostragem
    xBalanceado, yBalanceado = aplicarSubamostragem(xBruto, yAlvo)

    # 4. Divisão do Conjunto de Dados (Estratificada)
    # Fundamental dividir ANTES de qualquer transformação para evitar Data Leakage!!!
    xTreinoBruto, xTesteBruto, yTreino, yTeste = dividirDados(xBalanceado, yBalanceado, proporcaoTeste=0.2)

    # 5. Transformação (Normalidade via Yeo-Johnson + Padronização Z-score)
    # Agora aplicamos a matemática para atender às premissas do LDA
    xTreino, xTeste = aplicarTransformacoes(xTreinoBruto, xTesteBruto)

    print("\nPipeline concluído com sucesso!")

Filtragem concluída.
  Preditores finais: 23 (Removidas 11 redundantes).
Subamostragem concluída: 128114 amostras totais (50/50).
Divisão dos conjuntos concluída (Teste: 20.0%).
Transformação Yeo-Johnson e padronização z-score aplicadas.

Pipeline concluído com sucesso!


# PARTE 2 | Regressão Logística

In [13]:
class MeuClassificadorLogistico:
    """
    Implementação manual de Regressão Logística Binária.
    Baseada no método de Máxima Verossimilhança e Gradiente Descendente.
    """

    def __init__(self, taxaAprendizado=0.1, iteracoes=5000):
        self.taxaAprendizado = taxaAprendizado
        self.iteracoes = iteracoes
        self.weights = None  # Coeficientes Beta (incluindo intercepto)

    def _sigmoid(self, z):
        """Função logística descrita nos slides: 1 / (1 + e^-z)"""
        return 1 / (1 + np.exp(-z))

    def treinarModelo(self, xTreino, yTreino):
        """
        Encontra os melhores coeficientes Beta minimizando a Log-Loss
        (equivalente a maximizar a Verossimilhança).
        """
        nAmostras, nPreditores = xTreino.shape

        # Adiciona uma coluna de 1s para o Intercepto (Beta 0)
        X = np.hstack([np.ones((nAmostras, 1)), xTreino])

        # Inicializa pesos com zero
        self.weights = np.zeros(nPreditores + 1)

        # Gradiente Descendente
        for _ in range(self.iteracoes):
            # Modelo linear: z = beta0 + beta1*X1 + ...
            modelo_linear = np.dot(X, self.weights)
            # Predição probabilística (p(X))
            previsoes = self._sigmoid(modelo_linear)

            # Cálculo do erro e do gradiente (derivada da verossimilhança)
            erro = previsoes - yTreino
            gradiente = np.dot(X.T, erro) / nAmostras

            # Atualização dos pesos: beta = beta - learning_rate * gradiente
            self.weights -= self.taxaAprendizado * gradiente

        print(f"Treino Logístico concluído.\n  Betas: {self.weights[:3]}...")

    def predizerProbabilidade(self, xTeste):
        """Retorna a probabilidade P(Y=1|X)"""
        X = np.hstack([np.ones((xTeste.shape[0], 1)), xTeste])
        return self._sigmoid(np.dot(X, self.weights))

    def predizer(self, xTeste, threshold=0.5):
        """Classifica como 1 se p(X) > 0.5, senão 0"""
        probabilidades = self.predizerProbabilidade(xTeste)
        return np.array([1 if p >= threshold else 0 for p in probabilidades])

In [14]:
# 1. Instanciar o modelo manual
modeloLogistico = MeuClassificadorLogistico()

# 2. Treinar com os dados transformados
modeloLogistico.treinarModelo(xTreino, yTreino)

# 3. Realizar as predições no conjunto de treino e teste
yPreditoTreino = modeloLogistico.predizer(xTreino)
yPreditoTeste = modeloLogistico.predizer(xTeste)

Treino Logístico concluído.
  Betas: [ 0.02999352  0.14667867 -1.27874175]...


# EXTRA | Avaliação de Desempenho

In [15]:
def calcularMetricas(yReal, yPrevisto):
    """
    Calcula métricas de desempenho para classificação binária e exibe
    um relatório estruturado no console.
    """

    # Garante que os dados sejam arrays do numpy para as operações lógicas
    yReal = np.array(yReal)
    yPrevisto = np.array(yPrevisto)

    # 1. Cálculos de Base (Matriz de Confusão)
    tp = np.sum((yReal == 1) & (yPrevisto == 1))
    tn = np.sum((yReal == 0) & (yPrevisto == 0))
    fp = np.sum((yReal == 0) & (yPrevisto == 1))
    fn = np.sum((yReal == 1) & (yPrevisto == 0))

    # 2. Cálculo das Métricas Derivadas
    acuracia = (tp + tn) / (tp + tn + fp + fn)

    # Sensibilidade (Recall): Capacidade de detectar a classe 1 (Estresse)
    sensibilidade = tp / (tp + fn) if (tp + fn) > 0 else 0

    # Especificidade: Capacidade de detectar a classe 0 (Calma)
    especificidade = tn / (tn + fp) if (tn + fp) > 0 else 0

    # Precisão: Quando o modelo diz que é estresse, qual a chance de ser verdade?
    precisao = tp / (tp + fp) if (tp + fp) > 0 else 0

    # F1-Score: Média harmônica entre precisão e sensibilidade
    f1 = 2 * (precisao * sensibilidade) / (precisao + sensibilidade) if (precisao + sensibilidade) > 0 else 0

    # 3. Prints Organizados e Estruturados)
    print("Relatório de Desempenho")
    print("="*45)

    print(f"{'Métrica':<25} | {'Valor':<15}")
    print("-" * 45)
    print(f"{'Acurácia Global':<25} | {acuracia:.2%}")
    print(f"{'Sensibilidade (Recall)':<25} | {sensibilidade:.2%}")
    print(f"{'Especificidade':<25} | {especificidade:.2%}")
    print(f"{'Precisão':<25} | {precisao:.2%}")
    print(f"{'F1-Score':<25} | {f1:.4f}")

    print("\n\n")
    print("Matriz de Confusão")
    print("-" * 45)
    print(f"{'':>18} | {'Previsto: 0':<12} | {'Previsto: 1':<12}")
    print(f"{'Real: 0 (Calma)':>18} | {tn:<12} | {fp:<12}")
    print(f"{'Real: 1 (Estresse)':>18} | {fn:<12} | {tp:<12}")
    print("="*45 + "\n")

    matrizConfusao = np.array([[tn, fp], [fn, tp]])
    return acuracia, matrizConfusao

In [16]:
# 4. Calcular métricas finais no teste
print("Treino")
acuraciaGeralTreino, matrizConfusaoTreino = calcularMetricas(yTreino, yPreditoTreino)

Treino
Relatório de Desempenho
Métrica                   | Valor          
---------------------------------------------
Acurácia Global           | 75.09%
Sensibilidade (Recall)    | 76.84%
Especificidade            | 73.35%
Precisão                  | 74.25%
F1-Score                  | 0.7552



Matriz de Confusão
---------------------------------------------
                   | Previsto: 0  | Previsto: 1 
   Real: 0 (Calma) | 37590        | 13655       
Real: 1 (Estresse) | 11871        | 39375       



In [17]:
# 4. Calcular métricas finais no teste
print("Teste")
acuraciaGeralTeste, matrizConfusaoTeste = calcularMetricas(yTeste, yPreditoTeste)

Teste
Relatório de Desempenho
Métrica                   | Valor          
---------------------------------------------
Acurácia Global           | 75.28%
Sensibilidade (Recall)    | 76.86%
Especificidade            | 73.70%
Precisão                  | 74.50%
F1-Score                  | 0.7566



Matriz de Confusão
---------------------------------------------
                   | Previsto: 0  | Previsto: 1 
   Real: 0 (Calma) | 9442         | 3370        
Real: 1 (Estresse) | 2965         | 9846        

