In [7]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import seaborn as sns
import os
from typing import List
from matrices import confusion_matrix, plot_confusion_matrix
from metrics import calculate_metrics, update_results, build_summary, plot_leaning_curves

In [16]:
def load_csv_data(filepath: str, columns: List[str], transpose: bool, sep: str = ',') -> pd.DataFrame:
    """
    Carrega o dataset EMG e organiza as colunas.

    Args:
        filepath (str): Caminho para o arquivo.
        columns (List[str]): Lista com os nomes das colunas.
        transpose (bool): Transpor o DataFrame.
        sep (str): Separador dos dados.

    Returns:
        pd.DataFrame: DataFrame com dados dos sensores e classes.
    """
    if transpose:
        df: pd.DataFrame = pd.read_csv(filepath, header=None, sep=sep).T
    else:
        df: pd.DataFrame = pd.read_csv(filepath, sep=sep)
    
    df.columns = columns
    
    return df

In [17]:
columns = ["x", "y", "spiral"]

df: pd.DataFrame = load_csv_data(filepath = "../../resources/spiral.csv", columns = columns, transpose = False)

print(df.head())

          x         y  spiral
0  15.07298  -1.56346     1.0
1 -15.43986   0.16502    -1.0
2  -9.26071  12.24981    -1.0
3   7.59201   7.56913    -1.0
4  -2.37130 -10.69521     1.0


In [18]:
def plot_data(df: pd.DataFrame) -> None:
    plt.figure(figsize=(8, 6))

    plt.scatter(df[df['spiral'] == 1.0]['x'], 
                df[df['spiral'] == 1.0]['y'], 
                color='blue', label='Classe 1.0', alpha=0.7)

    plt.scatter(df[df['spiral'] == -1.0]['x'], 
                df[df['spiral'] == -1.0]['y'], 
                color='red', label='Classe -1.0', alpha=0.7)

    plt.title('Gráfico de Dispersão (Espalhamento)', fontsize=16)
    plt.xlabel('X', fontsize=14)
    plt.ylabel('Y', fontsize=14)

    plt.legend()

    plt.grid(alpha=0.3)
    plt.show()

In [19]:
def plot_training_data(data_frame: pd.DataFrame) -> None:
    """Plota os dados de treinamento."""
    plt.scatter(data_frame[data_frame['spiral'] == 1.0]['x'],
                data_frame[data_frame['spiral'] == 1.0]['y'], 
                color='blue', label='Classe 1.0', alpha=0.7)
    plt.scatter(data_frame[data_frame['spiral'] == -1.0]['x'], 
                data_frame[data_frame['spiral'] == -1.0]['y'], 
                color='red', label='Classe -1.0', alpha=0.7)
    plt.title('Treinamento do Adaline - Linha de Decisão', fontsize=16)
    plt.xlabel('X', fontsize=14)
    plt.ylabel('Y', fontsize=14)
    plt.legend()
    plt.grid(alpha=0.3)
    plt.xlim(-20, 20)
    plt.ylim(-20, 20)

In [20]:
def update_decision_boundary(W: np.ndarray, x_axis: np.ndarray) -> None:
    """Atualiza a linha de decisão no gráfico."""
    if W[2, 0] != 0:
        x2 = -W[1, 0] / W[2, 0] * x_axis + W[0, 0] / W[2, 0]
        x2 = np.nan_to_num(x2)
        plt.plot(x_axis, x2, color='orange', alpha=0.1)
        plt.pause(0.1)

In [21]:
def plot_final_decision_boundary(W: np.ndarray, x_axis: np.ndarray) -> None:
    """Plota a linha de decisão final."""
    if W[2, 0] != 0:
        x2 = -W[1, 0] / W[2, 0] * x_axis + W[0, 0] / W[2, 0]
        x2 = np.nan_to_num(x2)
        plt.plot(x_axis, x2, color='green', linewidth=2)
    plt.show()

In [22]:
def prepare_data(
    df: pd.DataFrame, 
    input_columns: List[str], 
    target_column: str,
    transpose: bool = False,
    normalize: bool = False,
    test_size: float = 0.2, 
    random_state: int = 42,
) -> dict:
    """
    Prepara o conjunto de dados para redes neurais, organizando entradas, saídas e divisões.

    Args:
        df (pd.DataFrame): DataFrame contendo o conjunto de dados.
        input_columns (list[str]): Lista com os nomes das colunas de entrada.
        target_column (str): Nome da coluna de saída (rótulos).
        transpose (bool): Transpor o DataFrame. Default é False.
        test_size (float): Proporção do conjunto de teste (0 a 1). Default é 0.2 (20%).
        random_state (int): Semente para reprodutibilidade da divisão. Default é 42.

    Returns:
        dict: Um dicionário contendo os conjuntos organizados:
            - 'X_train': Entradas para treinamento.
            - 'X_test': Entradas para teste.
            - 'Y_train': Rótulos para treinamento.
            - 'Y_test': Rótulos para teste.
    """
    X = df[input_columns].values
    Y = df[target_column].values
    
    if transpose:
        X = X.T
        Y = Y.T
        
        p, N = X.shape
        X = np.concatenate((
            -np.ones((1, N)),
            X
        ))
        
    else:
        N, p = X.shape
        X = np.concatenate((
            -np.ones((N, 1)),
            X
        ), axis = 1)
    
    if normalize:
        X = 2 * (X - X.min()) / (X.max() - X.min()) - 1

    return {
        'X': X,
        'Y': Y,
    }

In [24]:
data = prepare_data(
    df=df, 
    input_columns=["x", "y"], 
    target_column="spiral",
    transpose=True,
    test_size=0.2, 
    random_state=42
)

print(data['X'].shape)
print(data['Y'].shape)

(3, 1999)
(1999,)


In [25]:
def compute_root_mean_square_error(
    X_train: np.ndarray,
    Y_train: np.ndarray,
    w: np.ndarray
) -> float:
    """
    Calcula o erro quadrático médio.

    Args:
        X_train (np.ndarray): Entradas para treinamento.
        Y_train (np.ndarray): Rótulos para treinamento.
        w (np.ndarray): Vetor de pesos.

    Returns:
        float: Erro quadrático médio.
    """
    p_1, N = X_train.shape
    square_error = 0

    for t in range(N):
        x_t = X_train[:, t].reshape(p_1, 1)
        u_t = (w.T @ x_t)[0, 0]
        d_t = Y_train[0, t]
        square_error += (d_t - u_t)**2

    return square_error / (2 * N)

In [26]:
def activation_function(
    u: np.ndarray, 
    logistic: bool = True, 
    hyperbolic: bool = False
) -> np.ndarray:
    """
    Função de ativação para a rede MLP. Pode ser logística ou hiperbólica.
    
    Args:
        u (np.ndarray): Vetor de entradas.
        logistic (bool): Função logística. Default é True.
        hyperbolic (bool): Função hiperbólica. Default é False.
    
    Returns:
        np.ndarray: Vetor de saídas.
    """
    
    if np.min(u) == np.max(u):
        raise ValueError("Input array 'u' must have more than one unique value.")
    
    if logistic:
        return (u - np.min(u)) / (np.max(u) - np.min(u))
    
    if hyperbolic:
        return 2 * ((u - np.min(u)) / (np.max(u) - np.min(u))) - 1
    
    raise ValueError("Either 'logistic' or 'hyperbolic' must be True.")

In [27]:
def activation_derivate(
    u: np.ndarray, 
    logistic: bool = True, 
    hyperbolic: bool = False
) -> np.ndarray:
    """
    Derivada da função de ativação para a rede MLP. Pode ser logística ou hiperbólica.
    
    Args:
        u (np.ndarray): Vetor de entradas.
        logistic (bool): Função logística. Default é True.
        hyperbolic (bool): Função hiperbólica. Default é False.
    
    Returns:
        np.ndarray: Vetor de saídas.
    """
    
    if np.min(u) == np.max(u):
        raise ValueError("Input array 'u' must have more than one unique value.")
    
    if logistic:
        return u * (1 - u)
    
    if hyperbolic:
        return 1 - u**2
    
    raise ValueError("Either 'logistic' or 'hyperbolic' must be True.")

In [None]:
def mlp_train(
    data: np.ndarray,
    labels: np.ndarray,
    hidden_units: int,
    last_layer_units: int = 1,
    learning_rate: float = 0.01,
    epochs: int = 100,
    tolerance: float = 1e-3,
    patience: int = 10,
    transpose: bool = False
):
    """
    Treina uma rede MLP.
    
    Args:
        data (np.ndarray): Dados de entrada.
        labels (np.ndarray): Rótulos.
        hidden_units (int): Número de neurônios na camada oculta.
        last_layer_units (int): Número de neurônios na última camada. Default é 1.
        learning_rate (float): Taxa de aprendizado. Default é 0.01.
        epochs (int): Número máximo de épocas. Default é 100.
        tolerance (float): Tolerância para convergência. Default é 1e-3.
        patience (int): Paciência para early stopping. Default é 10.
        transpose (bool): Transpor o conjunto de dados. Default é False.
    
    Returns:
        tuple: Tupla contendo os pesos treinados e o histórico de erro.
    """
    
    if transpose:
        data = data.T
        labels = labels.T
    
    N, p = data.shape
    
    layers = [N] + hidden_units + [last_layer_units]
    
    weights = [np.random.randn(layers[i + 1], layers[i] + 1) * np.sqrt(2 / layers[i]) for i in range(len(layers) - 1)]
    
    mse_history = []
    no_improvement = 0
    
    for epoch in range(epochs):
        # Forward pass
        activations = [data]
        z_s = []
        
        for w in range(len(weights)):
            a_with_bias = np.vstack([np.ones((1, activations[-1].shape[1])), activations[-1]])
            z = np.dot(w, a_with_bias)
            z_s.append(z)
            a = activation_function(u=z, logistic=False, hyperbolic=True)
            activations.append(a)

        y_pred = activations[-1]
        mse = np.mean(np.sum((y_pred - labels) ** 2, axis=1))
        mse_history.append(mse)
        
        if epoch > 0 and mse >= mse_history[-2]:
            no_improvement += 1
            if no_improvement >= patience:
                print(f"Early stopping at epoch {epoch}.")
                break
        else:
            no_improvement = 0
        
        if epoch > 0 and abs(mse_history[-1] - mse_history[-2]) < tolerance:
            print(f"Converged at epoch {epoch}.")
            break
        
        # Backpropagation
        deltas = [None] * len(weights)
        deltas[-1] = (y_pred - labels) * activation_derivate(u=y_pred, logistic=False, hyperbolic=True)
        
        for l in range(len(weights) - 2, -1, -1):
            a_with_bias = np.vstack([np.ones((1, activations[l + 1].shape[1])), activations[l + 1]])
            deltas[l] = np.dot(weights[l + 1][:, 1:].T, deltas[l + 1]) * activation_derivate(u=activations[l + 1], logistic=False, hyperbolic=True)
        
        for l in range(len(weights)):
            a_with_bias = np.vstack([np.ones((1, activations[l].shape[1])), activations[l]])
            weights[l] -= learning_rate * np.dot(deltas[l], a_with_bias.T) / p
    
    return weights, mse_history

In [29]:
def mlp_predict(
    data: np.ndarray,
    weights: List[np.ndarray]
):
    activations = data
    
    for w in range(len(weights)):
        a_with_bias = np.vstack([np.ones((1, activations.shape[1])), activations])
        z = np.dot(w, a_with_bias)
        activations = activation_function(u=z, logistic=False, hyperbolic=True)
    
    final_output = activations
    predictions = np.argmax(final_output, axis=1)
    
    return predictions