In [None]:
import os
import pandas as pd
import kagglehub



In [None]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import ks_2samp
from scipy.signal import savgol_filter
from sklearn.preprocessing import RobustScaler, StandardScaler




In [2]:
import os
import pandas as pd
import kagglehub


class Dataset:
    """Loads the robotic-arm IMU recordings: one normal run plus four fault scenarios.

    Construction resolves the dataset directory, reads every CSV, tags each row
    with ``label`` (0 = normal, 1 = fault) and ``scenario`` and builds
    ``df_combined``.

    Attributes:
        dataset_name: Kaggle dataset identifier.
        dataset_path: Directory with the CSV files; always ends with a path separator.
        df_normal: The complete normal recording.
        df_faulty: Fault rows of the *validation* split only, i.e. the first
            half of every fault file (the second half goes to the test split).
        lista_dfs_anomaly: One DataFrame per scenario, obtained by grouping ``df_faulty``.
        df_combined: ``df_normal`` followed by ``df_faulty``.
    """

    def __init__(self, dataset_name='hkayan/industrial-robotic-arm-imu-data-casper-1-and-2'):
        """
        Resolve the dataset directory, load all CSV files and combine them.

        Args:
            dataset_name: Kaggle dataset identifier; only used for the download
                when the ``DATASET_PATH`` environment variable is unset or empty.
        """
        self.dataset_name = dataset_name
        self.dataset_path = self._obter_caminho_dataset()
        self.df_normal = None
        self.df_faulty = None
        self.lista_dfs_anomaly = None
        self.df_combined = None
        self._carregar_dados()
        self._combinar_dados()

    def _obter_caminho_dataset(self):
        """Return the dataset directory (``DATASET_PATH`` or a kagglehub download).

        The returned string always ends with a path separator, so both
        ``os.path.join(path, name)`` and plain ``path + name`` work.
        """
        caminho = os.environ.get("DATASET_PATH")
        if not caminho:
            caminho = kagglehub.dataset_download(self.dataset_name)
        # Joining with '' appends the separator only when it is missing.
        return os.path.join(caminho, '')

    def listar_arquivos(self, caminho='/kaggle/input'):
        """Print and return the full path of every file found under ``caminho``."""
        arquivos = []
        for dirname, _, filenames in os.walk(caminho):
            for filename in filenames:
                arquivo_completo = os.path.join(dirname, filename)
                print(arquivo_completo)
                arquivos.append(arquivo_completo)
        return arquivos

    def _carregar_dados(self, arquivo_normal='IMU_10Hz.csv'):
        """
        Read the normal CSV and the four fault CSVs and label them.

        Args:
            arquivo_normal: File name of the normal recording inside ``dataset_path``.

        Returns:
            tuple: (df_normal, df_faulty, lista_dfs_anomaly). ``df_faulty`` and
            ``lista_dfs_anomaly`` contain only the validation half of each
            fault file.
        """
        print("--- CARREGAMENTO MANUAL DE CENÁRIOS ---")

        # 1. Normal recording
        print("Lendo Base Normal...")
        self.df_normal = pd.read_csv(os.path.join(self.dataset_path, arquivo_normal))
        self.df_normal['label'] = 0
        self.df_normal['scenario'] = 'Normal'

        # 2. Fault recordings
        print("Lendo Base de Falhas...")
        arquivos_falha = [
            'IMU_hitting_platform.csv',   # collision with the platform
            'IMU_hitting_arm.csv',        # collision with the arm itself
            'IMU_extra_weigth.csv',       # extra payload (file name is spelled this way)
            'IMU_earthquake.csv',         # external vibration
        ]

        lista_dfs = []
        for arquivo in arquivos_falha:
            temp_df = pd.read_csv(os.path.join(self.dataset_path, arquivo))
            temp_df['label'] = 1
            temp_df['scenario'] = arquivo  # keep the file name as scenario id
            lista_dfs.append(temp_df)
            print(f"-> Adicionado: {arquivo} ({len(temp_df)} linhas)")

        # split_train_val_test() reads self.lista_dfs_anomaly, so the full
        # frames must be stored before the split is requested.
        self.lista_dfs_anomaly = lista_dfs
        _, df_val, _ = self.split_train_val_test()

        # Only the validation fault rows are exposed; the list is then rebuilt
        # from those rows, one frame per scenario.
        self.df_faulty = df_val[df_val['label'] == 1].copy()
        self.lista_dfs_anomaly = [df for _, df in self.df_faulty.groupby('scenario')]

        print("="*60)
        print("DATASET PRONTO:")
        print(f"-> Dados Normais: {len(self.df_normal)} linhas")
        print(f"-> Dados de Falha:  {len(self.df_faulty)} linhas (Total de {len(arquivos_falha)} tipos de defeito)")

        return self.df_normal, self.df_faulty, self.lista_dfs_anomaly

    def _combinar_dados(self):
        """
        Concatenate ``df_normal`` and ``df_faulty`` into ``df_combined``.

        Returns:
            pd.DataFrame: The combined frame with a fresh integer index.

        Raises:
            ValueError: If the frames have not been loaded yet.
        """
        if self.df_normal is None or self.df_faulty is None:
            raise ValueError("Carregue os dados primeiro usando _carregar_dados()")

        self.df_combined = pd.concat([self.df_normal, self.df_faulty],
                                      ignore_index=True)
        return self.df_combined

    def obter_info(self):
        """Return a dict with shape/columns of the loaded frames and the label counts."""
        info = {}
        if self.df_normal is not None:
            info['normal'] = {
                'shape': self.df_normal.shape,
                'colunas': list(self.df_normal.columns)
            }
        if self.df_faulty is not None:
            info['faulty'] = {
                'shape': self.df_faulty.shape,
                'colunas': list(self.df_faulty.columns)
            }
        if self.df_combined is not None:
            info['combined'] = {
                'shape': self.df_combined.shape,
                'distribuicao_labels': self.df_combined['label'].value_counts().to_dict()
            }
        return info

    @staticmethod
    def split_sequencial(df, p_train=0.7, p_val=0.1, p_test=0.2):
        """Cut ``df`` into three consecutive pieces (train, val, test).

        Args:
            df: Frame to split; row order is preserved.
            p_train: Fraction of rows for the first piece.
            p_val: Fraction of rows for the second piece.
            p_test: Kept for readability of call sites; the last piece simply
                receives every remaining row.

        Returns:
            tuple: (train, val, test) as independent copies.
        """
        size = len(df)

        def _corte(fracao):
            # Round before truncating: 0.7 + 0.1 is 0.7999... in floating
            # point and would otherwise lose one row at the boundary.
            return int(round(size * fracao, 6))

        end_train = _corte(p_train)
        end_val = _corte(p_train + p_val)

        train = df.iloc[:end_train].copy()
        val = df.iloc[end_train:end_val].copy()
        test = df.iloc[end_val:].copy()

        return train, val, test

    def split_train_val_test(self):
        """Build the train/val/test frames.

        Normal data is split 70/10/20. Fault data never enters the training
        set: each fault frame is split 50/50 between validation and test.

        Returns:
            tuple: (df_train, df_val, df_test), each with a fresh integer index.
        """
        norm_train, norm_val, norm_test = self.split_sequencial(self.df_normal)
        final_train_list = [norm_train]
        final_val_list   = [norm_val]
        final_test_list  = [norm_test]

        print(f"1. Normal processado: {len(self.df_normal)} linhas divididas.")

        for df_falhas in self.lista_dfs_anomaly:
            # p_train=0 -> the (empty) train piece is discarded
            _, f_val, f_test = self.split_sequencial(df_falhas, 0, 0.5, 0.5)
            print(f"Falha {df_falhas.iloc[0]['scenario']} processado" )
            final_val_list.append(f_val)
            final_test_list.append(f_test)

        df_train_final = pd.concat(final_train_list, ignore_index=True)
        df_val_final = pd.concat(final_val_list, ignore_index=True)
        df_test_final = pd.concat(final_test_list, ignore_index=True)

        return df_train_final, df_val_final, df_test_final




In [3]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import ks_2samp
from scipy.signal import savgol_filter
from sklearn.preprocessing import RobustScaler, StandardScaler

class EDA:
    """Classe para análise exploratória de dados de IMU de braço robótico"""

    def __init__(self, dataset_name='hkayan/industrial-robotic-arm-imu-data-casper-1-and-2'):
        """
        Create the EDA helper and its underlying Dataset.

        Every derived frame and scaler starts as ``None``; the containers for
        the per-scenario data start empty. They are filled by the other
        methods of this class.

        Args:
            dataset_name: Kaggle dataset identifier forwarded to ``Dataset``.
        """
        self.dataset = Dataset(dataset_name)

        # Raw frames, then their resampled versions
        self.df_normal = self.df_faulty = None
        self.df_normal_resampled = self.df_faulty_resampled = None

        # Scaled outputs, one pair per scaler
        self.df_normal_robust_scaled = self.df_faulty_robust_scaled = None
        self.df_normal_standard_scaled = self.df_faulty_standard_scaled = None

        # Fitted scaler objects
        self.scaler_robust = self.scaler_standard = None

        # Per-scenario containers (each one is its own fresh object)
        self.lista_dfs_anomaly = list()
        self.dict_scenarios = dict()
        self.faultydfs_resampled = dict()

    def carregar_e_preparar_dados(self, arquivo_normal='IMU_10Hz.csv'):
        """
        Carrega e prepara os dados para análise.
        Carrega dados normais e todos os 4 tipos de anomalias disponíveis.

        Args:
            arquivo_normal: Nome do arquivo com dados normais

        Returns:
            tuple: (df_normal, df_faulty)
        """
        # Os dados já foram carregados no __init__ do Dataset
        # Apenas referencia os DataFrames já existentes
        self.df_normal = self.dataset.df_normal
        self.df_faulty = self.dataset.df_faulty # não será usado agora, ele será o concat do resampling de cada df_anomaly
        self.lista_dfs_anomaly = self.dataset.lista_dfs_anomaly
        self.dict_scenarios = {df['scenario'].iloc[0] : df for df in self.lista_dfs_anomaly}

        # Conversões e limpezas iniciais
        print("\nConvertendo `time` de nanossegundos para milissegundos")
        self.df_normal['time'] = self.df_normal['time'].map(lambda x: x/1e6)
        self.df_faulty['time'] = self.df_faulty['time'].map(lambda x: x/1e6)

        print("\nRemovendo a coluna `name` e `scenario` em df_normal pois não agrega valor preditivo")
        cols_to_drop = ['name', 'scenario']
        self.df_normal = self.df_normal.drop(columns=cols_to_drop)

        for scenario_name in self.dict_scenarios:
            self.dict_scenarios[scenario_name] = self.dict_scenarios[scenario_name].drop(columns=cols_to_drop)
            self.dict_scenarios[scenario_name]['time'] = (self.dict_scenarios[scenario_name]['time'].map(lambda x: x/1e6))

        print("\nIdentificando duplicatas")
        print(f"[Normal] Duplicatas de Tempo: {self.df_normal['time'].duplicated().any()}")
        for scenario_name in self.dict_scenarios:
            print(f"[{scenario_name}] duplicatas de tempo: {self.dict_scenarios[scenario_name]['time'].duplicated().any()}")

        return self.df_normal, self.df_faulty

    def executar_analise_completa(self):
        """Executa a análise exploratória completa"""
        if self.df_normal is None or self.dict_scenarios is {}:
            raise ValueError("Carregue os dados primeiro usando carregar_e_preparar_dados()")

        print("""## Análise Exploratória Estrutural [Classe Normal]
        - Informações básicas do dataset
        - Tipos de dados
        - Informações detalhadas
        - Estatísticas descritivas
        - Análise de valores únicos""")

        # Análise estrutural
        self.analise_estrutural_sensores(self.df_normal)

        # Visualizações
        sensores = ['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'magX', 'magY', 'magZ']
        print("\n[NORMAL] Visualizando os Acelerômetros:")
        for sensor in sensores[0:3]:
            self.plot_sensor(self.df_normal, col_sensor=sensor)
        print("\n[NORMAL] Visualizando os Giroscópios:")
        for sensor in sensores[3:6]:
            self.plot_sensor(self.df_normal, col_sensor=sensor)
        print("\n[NORMAL] Visualizando os Magnetômetros:")
        for sensor in sensores[6:9]:
            self.plot_sensor(self.df_normal, col_sensor=sensor)

        print("""## Análise Exploratória Estrutural [Classe Anômala]
        - Informações básicas do dataset
        - Tipos de dados
        - Informações detalhadas
        - Estatísticas descritivas
        - Análise de valores únicos""")

        print(self.df_faulty['scenario'].value_counts())

        print("\nAnalisando por tipo de anomalia")
        for scenario_name in self.dict_scenarios:
            print(f"\n-------Análise Estrutural dos Sensores em {scenario_name}-------")
            self.analise_estrutural_sensores(self.dict_scenarios[scenario_name])

        print("\nVisualizando `accZ` em diferentes anomalias")
        for scenario_name in self.dict_scenarios:
            print(f"\n-----------{scenario_name}-----------")
            self.plot_sensor(self.dict_scenarios[scenario_name], col_sensor='accZ')

        # Comparação Normal vs Falha
        self.comparar_normal_vs_falha(self.df_normal, self.dict_scenarios['IMU_hitting_arm.csv'], col_sensor='accZ', anomalia='IMU_hitting_arm.csv')
        self.comparar_normal_vs_falha(self.df_normal, self.dict_scenarios['IMU_earthquake.csv'], col_sensor='accZ', anomalia='IMU_earthquake.csv')
        self.comparar_normal_vs_falha(self.df_normal, self.dict_scenarios['IMU_hitting_arm.csv'], col_sensor='gyroY', anomalia='IMU_hitting_arm.csv')
        self.comparar_normal_vs_falha(self.df_normal, self.dict_scenarios['IMU_extra_weigth.csv'], col_sensor='accX', anomalia='IMU_extra_weigth.csv')
        self.comparar_normal_vs_falha(self.df_normal, self.dict_scenarios['IMU_extra_weigth.csv'], col_sensor='accY', anomalia='IMU_extra_weigth.csv')

        # Análise de valores faltantes
        print("""\n## Análise de Valores Faltantes e Outliers
        - Identificação de valores faltantes e outliers
        - Visualizações de apoio, caso necessário
        - Análise dos mecanismos\n""")

        print(f"Quantidade de valores faltantes em df_normal: {self.df_normal.isnull().sum().sum()}")
        for scenario_name in self.dict_scenarios:
            print(f"[{scenario_name}] quantidade de Valores faltantes: {self.dict_scenarios[scenario_name].isnull().sum().sum()}")
        print(f"\nNão serão removidos os outliers estatísticos pois são movimentos reais")

    def analise_estrutural_sensores(self, df, time_col='time', label_col='label'):
        """
        Realiza uma análise exploratória estrutural focada em dados de sensores (Séries Temporais).

        Args:
            df: DataFrame com os dados.
            time_col: Nome da coluna de tempo.
            label_col: Nome da coluna de target (anomalia/normal).
        """

        print("="*60)
        print("1. INFORMAÇÕES BÁSICAS E TIPOS DE DADOS")
        print("="*60)
        print(f"Dimensões do Dataset: {df.shape[0]} linhas, {df.shape[1]} colunas")
        print("\nTipos de Dados (Dtypes):")
        print(df.dtypes)

        # Verificação de memória
        memoria = df.memory_usage(deep=True).sum() / 1024**2
        print(f"\nUso de Memória: {memoria:.2f} MB")

        print("\n" + "="*60)
        print("2. ANÁLISE DE INTEGRIDADE TEMPORAL (CRÍTICO PARA IMU)")
        print("="*60)

        # Ordenar por tempo para garantir análise correta
        df = df.sort_values(by=time_col)

        # Calcular diferenças de tempo (Delta T)
        delta_t = df[time_col].diff().dropna()

        mean_dt = delta_t.mean()
        std_dt = delta_t.std()
        min_dt = delta_t.min()
        max_dt = delta_t.max()

        print(f"- Intervalo de Amostragem Médio (Sampling Rate): {mean_dt:.6f} ms")
        print(f"- Frequência de coleta de dados: {1/(df['time'].diff().mean()/1000):.2f} Hz")
        print(f"- Jitter (Desvio Padrão do tempo): {std_dt:.6f} ms")
        print(f"- Gap Mínimo: {min_dt:.6f} ms | Gap Máximo: {max_dt:.6f} ms")

        # Verificar se há gaps significativos (perda de pacotes)
        # Exemplo: Se o gap for maior que 2x a média, é uma quebra de continuidade
        gaps = (delta_t > 2 * mean_dt).sum()
        print(f"- Qtd. de Gaps Temporais Significativos (> 2x média): {gaps}")

        print("\n" + "="*60)
        print("3. ANÁLISE DE VALORES ÚNICOS E CONSTANTES (SENSOR FREEZE)")
        print("="*60)

        # Separa colunas de sensores (excluindo tempo e label)
        cols_sensores = [c for c in df.columns if c not in [time_col, label_col]]

        resumo_unicos = pd.DataFrame({
            'Tipo': df[cols_sensores].dtypes,
            'Qtd_Unicos': df[cols_sensores].nunique(),
            'Unicos (%)': (df[cols_sensores].nunique() / len(df)) * 100,
            'Qtd_Zeros': (df[cols_sensores] == 0).sum(),
            'Zeros (%)': ((df[cols_sensores] == 0).sum() / len(df)) * 100
        })

        print(resumo_unicos.sort_values('Qtd_Unicos'))

        # Alerta para colunas com baixíssima variabilidade (Sensor travado ou irrelevante)
        cols_travadas = resumo_unicos[resumo_unicos['Qtd_Unicos'] == 1].index.tolist()
        if cols_travadas:
            print(f"\n[ALERTA] Colunas com valor constante (irrelevantes): {cols_travadas}")
        else:
            print("\n[OK] Nenhuma coluna totalmente constante detectada.")

        print("\n" + "="*60)
        print("4. ESTATÍSTICAS DESCRITIVAS DETALHADAS (MOMENTOS)")
        print("="*60)
        # Inclui Skewness e Kurtosis que são vitais para detectar desvios de normalidade em sinais
        desc = df[cols_sensores].describe().T
        desc['skewness'] = df[cols_sensores].skew()
        desc['kurtosis'] = df[cols_sensores].kurt()

        print(desc[['mean', 'std', 'min', '50%', 'max', 'skewness', 'kurtosis']])

        print("\n" + "="*60)
        print("5. BALANCEAMENTO DAS CLASSES (TARGET)")
        print("="*60)
        if label_col in df.columns:
            contagem = df[label_col].value_counts()
            percentual = df[label_col].value_counts(normalize=True) * 100

            balanceamento = pd.DataFrame({'Contagem': contagem, 'Percentual (%)': percentual})
            print(balanceamento)

            ratio = contagem.max() / contagem.min() if len(contagem) > 1 else 0
            print(f"\nRazão de Desbalanceamento: 1 : {ratio:.1f}")
        else:
            print(f"Coluna de target '{label_col}' não encontrada.")

    def plot_sensor(self, df, col_sensor, col_time='time'):
        """
        Show a three-panel diagnostic figure for one sensor column.

        Top row: the time series against elapsed seconds, with a dashed line
        at zero. Bottom-left: histogram with KDE on a log y-axis, annotated
        with the kurtosis. Bottom-right: box plot.

        Args:
            df: DataFrame holding ``col_time`` and ``col_sensor``; a copy is
                used, the input is not modified.
            col_sensor: Name of the sensor column to plot.
            col_time: Name of the time column; values are divided by 1e3 to
                label the axis in seconds (assumes milliseconds).
        """
        estilo_titulo = dict(fontsize=14, fontweight='bold')

        # Work on a copy so the helper column never leaks to the caller
        dados = df.copy()
        instante_inicial = dados[col_time].iloc[0]
        dados['time_sec'] = (dados[col_time] - instante_inicial) / 1e3

        figura = plt.figure(figsize=(18, 10))
        grade = figura.add_gridspec(2, 2)

        # --- Panel 1: time series across the full top row ---
        ax_serie = figura.add_subplot(grade[0, :])
        ax_serie.plot(dados['time_sec'], dados[col_sensor],
                      color='#1f77b4', linewidth=0.5, alpha=0.8)
        ax_serie.set_title(f'Série Temporal: {col_sensor}', **estilo_titulo)
        ax_serie.set_xlabel('Tempo (segundos)', fontsize=12)
        ax_serie.set_ylabel('Valor do Sensor', fontsize=12)
        ax_serie.grid(True, linestyle='--', alpha=0.5)
        ax_serie.axhline(0, color='red', linestyle='--', linewidth=1, alpha=0.7,
                         label='Zero (Repouso)')
        ax_serie.legend()

        # --- Panel 2: histogram + KDE, log y-axis to expose the tails ---
        ax_hist = figura.add_subplot(grade[1, 0])
        sns.histplot(data=dados, x=col_sensor, bins=100, kde=True, ax=ax_hist, color='#2ca02c')
        ax_hist.set_yscale('log')
        ax_hist.set_title(f'Distribuição (Escala Log): {col_sensor}', **estilo_titulo)
        ax_hist.set_xlabel('Valor do Sensor')
        ax_hist.set_ylabel('Frequência (Log)')

        curtose = dados[col_sensor].kurt()
        caixa = dict(boxstyle='round', facecolor='white', alpha=0.5)
        ax_hist.text(0.95, 0.95, f'Kurtosis: {curtose:.2f}', transform=ax_hist.transAxes,
                     horizontalalignment='right', verticalalignment='top', bbox=caixa)

        # --- Panel 3: box plot ---
        ax_box = figura.add_subplot(grade[1, 1])
        sns.boxplot(x=dados[col_sensor], ax=ax_box, color='#ff7f0e', fliersize=3)
        ax_box.set_title(f'Boxplot de Outliers: {col_sensor}', **estilo_titulo)
        ax_box.set_xlabel('Valor do Sensor')

        plt.tight_layout()
        plt.show()

    def comparar_normal_vs_falha(self, df_normal, df_falha, col_sensor, anomalia):
        """
        Plot one sensor for normal operation against one fault scenario.

        Upper panel: the first 10 seconds of both recordings on a shared axis,
        each one shifted to start at t = 0. Lower panel: KDE of the full
        column for both recordings on a log y-axis.

        Args:
            df_normal: Frame with the normal recording (needs ``time`` and ``col_sensor``).
            df_falha: Frame with the fault recording (same columns).
            col_sensor: Sensor column to compare.
            anomalia: Scenario name, used only in the titles.
        """
        cor_normal, cor_falha = '#1f77b4', '#d62728'
        negrito = dict(fontsize=14, fontweight='bold')

        # Elapsed seconds since the first sample of each recording
        seg_normal = (df_normal['time'] - df_normal['time'].iloc[0]) / 1e3
        seg_falha = (df_falha['time'] - df_falha['time'].iloc[0]) / 1e3

        # Keep only the first 10 seconds for the time-domain panel
        janela_normal = seg_normal <= 10
        janela_falha = seg_falha <= 10

        fig, (ax_tempo, ax_dens) = plt.subplots(2, 1, figsize=(15, 10), sharex=False)

        # --- Panel 1: time domain ---
        ax_tempo.plot(seg_normal[janela_normal], df_normal.loc[janela_normal, col_sensor],
                      color=cor_normal, label='Normal', alpha=0.7, linewidth=1)
        ax_tempo.set_title(f'Padrão Normal vs. Falha - {anomalia} ({col_sensor})', **negrito)
        ax_tempo.set_ylabel('Valor do Sensor')
        ax_tempo.set_xlabel('Tempo [s]')
        ax_tempo.legend(loc='upper left')
        ax_tempo.grid(True, alpha=0.3)

        # Fault trace on the same axis so magnitudes are directly comparable
        ax_tempo.plot(seg_falha[janela_falha], df_falha.loc[janela_falha, col_sensor],
                      color=cor_falha, label='Falha', alpha=0.7, linewidth=1)
        ax_tempo.legend()  # rebuilt so the fault trace is listed too

        # --- Panel 2: density of the whole column ---
        sns.kdeplot(df_normal[col_sensor], ax=ax_dens, color=cor_normal, fill=True, label='Normal')
        sns.kdeplot(df_falha[col_sensor], ax=ax_dens, color=cor_falha, fill=True, label='Falha')
        ax_dens.set_title(f'Mudança na Distribuição de Probabilidade - {anomalia}', **negrito)
        ax_dens.set_yscale('log')
        ax_dens.legend()
        ax_dens.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def resampling_and_interpolate(self, df, df_name):
        """
        Faz resampling e interpolação dos dados para frequência fixa de 100ms

        Args:
            df: DataFrame com os dados
            df_name: Nome do dataset (para logging)

        Returns:
            DataFrame reamostrado e interpolado
        """
        print(f"\nFazendo Resampling e Interpolate de {df_name}")
        # 1. Converter tempo para Datetime (necessário para resampling)
        df = df.copy()
        df['datetime'] = pd.to_datetime(df['time'], unit='ms')
        df = df.set_index('datetime')

        # 2. Resampling para 100ms (10Hz) - Ajuste conforme a média que vimos
        # .mean() pega todos os pontos que caíram naquele 0.1s e tira a média (reduz ruído)
        df_resampled = df.resample('100ms').mean()

        # 3. Verificar onde ficaram os buracos (NaNs gerados pelo resampling)
        print(f"Buracos gerados pelo alinhamento: {df_resampled['accX'].isnull().sum()}")

        # 4. Preencher buracos com Interpolação Linear
        # 'time' garante que a interpolação respeite a distância temporal
        df_final = df_resampled.interpolate(method='time')

        # 5. Drop nas colunas que não fazem sentido interpolar (ex: label)
        if 'label' in df_final.columns:
            df_final['label'] = df_resampled['label'].ffill().astype(int)

        print("Resampling concluído. Novo shape:", df_final.shape)

        return df_final

    def aplicar_resampling(self):
        """Aplica resampling nos datasets normal e faulty"""
        if self.df_normal is None or self.df_faulty is None:
            raise ValueError("Carregue os dados primeiro usando carregar_e_preparar_dados()")

        self.df_normal_resampled = self.resampling_and_interpolate(self.df_normal, "df_normal")
        for name, df in self.dict_scenarios.items():
            self.faultydfs_resampled[name] = self.resampling_and_interpolate(df, name)

        self.df_faulty_resampled = pd.concat(self.faultydfs_resampled.values(), ignore_index=True)

        return self.df_normal_resampled, self.df_faulty_resampled

    def _normalizar_df(self,df,fit_scaler=True,scaler=None):
        """Normaliza e retorna apenas um df"""
        if fit_scaler:
            scaler = RobustScaler()
            scaler.fit(df)
            df_scaled = scaler.transform(df)
        else:
            df_scaled = scaler.transform(df) #type:ignore
        return df_scaled,scaler

    def aplicar_normalizacao(self):
        """Aplica normalização RobustScaler e StandardScaler nos dados reamostrados"""
        if self.df_normal_resampled is None or self.df_faulty_resampled is None:
            raise ValueError("Execute o resampling primeiro usando aplicar_resampling()")

        # RobustScaler
        self.scaler_robust = RobustScaler()
        self.scaler_robust.fit(self.df_normal_resampled)

        self.df_normal_robust_scaled = self.scaler_robust.transform(self.df_normal_resampled)
        self.df_faulty_robust_scaled = self.scaler_robust.transform(self.df_faulty_resampled)

        # StandardScaler
        self.scaler_standard = StandardScaler()
        self.scaler_standard.fit(self.df_normal_resampled)

        self.df_normal_standard_scaled = self.scaler_standard.transform(self.df_normal_resampled)
        self.df_faulty_standard_scaled = self.scaler_standard.transform(self.df_faulty_resampled)

        print("\nNormalização concluída com RobustScaler e StandardScaler")

        return (self.df_normal_robust_scaled, self.df_faulty_robust_scaled,
                self.df_normal_standard_scaled, self.df_faulty_standard_scaled)

    def diagnostico_preprocessamento(self, df_raw, df_resampled, df_scaled,
                                     col_sensor='accZ',
                                     window_sec=1.0,
                                     scaler_name='RobustScaler'):
        """
        Draw a visual report comparing the pre-processing stages.

        Top panel: raw points against the resampled line for the first
        ``window_sec`` seconds. Bottom panels: distribution of ``col_sensor``
        before and after scaling, each annotated with mean/std/min/max.

        Args:
            df_raw: Original DataFrame. If it has a ``time`` column the values
                are divided by 1e3 to get seconds (i.e. milliseconds are
                expected); otherwise the row number is used as x value.
            df_resampled: DataFrame after resampling, indexed by datetime.
            df_scaled: DataFrame or numpy array after scaling. For an array the
                column is located through ``df_resampled.columns``.
            col_sensor: Sensor column to inspect.
            window_sec: Length, in seconds, of the zoom window of the top panel.
            scaler_name: Scaler name, used only in the title.
        """
        import warnings

        fig = plt.figure(figsize=(18, 12))
        gs = fig.add_gridspec(2, 2, height_ratios=[1, 1])

        # ====================================================================
        # PART 1: EFFECT OF RESAMPLING (ZOOM IN TIME)
        # ====================================================================
        ax1 = fig.add_subplot(gs[0, :])

        # Elapsed seconds for both series
        if 'time' in df_raw.columns:
            t_raw = (df_raw['time'] - df_raw['time'].iloc[0]) / 1e3
        else:
            # No time column: fall back to the sample number
            t_raw = np.arange(len(df_raw))

        t_res = (df_resampled.index - df_resampled.index[0]).total_seconds()

        # Zoom: only the first `window_sec` seconds
        mask_raw = t_raw <= window_sec
        mask_res = t_res <= window_sec

        # Raw samples as points, so irregular spacing is visible
        ax1.scatter(t_raw[mask_raw], df_raw.loc[mask_raw, col_sensor],
                    color='black', alpha=0.6, s=30, label='Original (Raw Points)', zorder=3)

        # Resampled series as line with markers on the fixed grid
        ax1.plot(t_res[mask_res], df_resampled.loc[mask_res, col_sensor],
                 color='#1f77b4', linewidth=2, marker='x', markersize=8,
                 label='Resampled (10Hz Grid)', alpha=0.8, zorder=2)

        ax1.set_title(f'1. Efeito do Resampling: Regularização do Tempo ({col_sensor}) - Zoom de {window_sec}s',
                      fontsize=14, fontweight='bold')
        ax1.set_xlabel('Tempo (segundos)')
        ax1.set_ylabel('Valor do Sensor (Físico)')
        ax1.legend()
        ax1.grid(True, linestyle='--', alpha=0.5)

        # ====================================================================
        # PART 2: EFFECT OF THE SCALER (DISTRIBUTION)
        # ====================================================================

        # sklearn scalers return plain arrays, so the column has to be found by position
        if isinstance(df_scaled, np.ndarray):
            try:
                col_idx = df_resampled.columns.get_loc(col_sensor)
                data_scaled = df_scaled[:, col_idx]
            except (KeyError, IndexError):
                # Best-effort fallback kept from the original behaviour, but no
                # longer silent: the plot would otherwise show another column
                # under the requested sensor's name.
                warnings.warn(
                    f"Column '{col_sensor}' could not be located in the scaled array; "
                    "falling back to column 0."
                )
                data_scaled = df_scaled[:, 0]
        else:
            data_scaled = df_scaled[col_sensor]

        # Plot A: distribution before scaling
        ax2 = fig.add_subplot(gs[1, 0])
        sns.histplot(df_resampled[col_sensor], kde=True, ax=ax2, color='#1f77b4', bins=50)
        ax2.set_title('2a. Distribuição ANTES do Scaler\n(Unidades Físicas Reais)', fontsize=12, fontweight='bold')
        ax2.set_xlabel(f'{col_sensor} Original')

        # Plot B: distribution after scaling
        ax3 = fig.add_subplot(gs[1, 1])
        sns.histplot(data_scaled, kde=True, ax=ax3, color='#2ca02c', bins=50)
        ax3.set_title(f'2b. Distribuição DEPOIS do {scaler_name}\n(Unidades Relativas)', fontsize=12, fontweight='bold')
        ax3.set_xlabel(f'{col_sensor} Scaled')

        # Summary statistics written inside each panel
        orig_mean, orig_std = df_resampled[col_sensor].mean(), df_resampled[col_sensor].std()
        scale_mean, scale_std = np.mean(data_scaled), np.std(data_scaled)

        txt = (f"Original:\nMédia={orig_mean:.2f}\nStd={orig_std:.2f}\nMin={df_resampled[col_sensor].min():.2f}\nMax={df_resampled[col_sensor].max():.2f}")
        ax2.text(0.95, 0.95, txt, transform=ax2.transAxes, ha='right', va='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

        txt_sc = (f"Scaled:\nMédia={scale_mean:.2f}\nStd={scale_std:.2f}\nMin={np.min(data_scaled):.2f}\nMax={np.max(data_scaled):.2f}")
        ax3.text(0.95, 0.95, txt_sc, transform=ax3.transAxes, ha='right', va='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

        plt.tight_layout()
        plt.show()

    def analise_univariada_alvo(self, df_normal=None, df_falha=None, anomalia=''):
        """
        Realiza a análise univariada comparando atributos vs. alvo (Normal vs Falha).
        Gera estatísticas de separação e visualizações.

        Args:
            df_normal: DataFrame com dados normais (usa self.df_normal_resampled se None)
            df_falha: DataFrame com dados de falha (usa self.df_faulty_resampled se None)

        Returns:
            DataFrame com estatísticas de ranking dos sensores
        """
        if df_normal is None:
            df_normal = self.df_normal_resampled
        if df_falha is None:
            df_falha = self.df_faulty_resampled

        if df_normal is None or df_falha is None:
            raise ValueError("Execute o resampling primeiro ou forneça os DataFrames")

        # 1. Preparar dados
        # Remover colunas não-sensor (time, label, datetime, name)
        cols_ignore = ['time', 'label', 'datetime', 'name']
        sensores = [c for c in df_normal.columns if c not in cols_ignore]

        stats_list = []

        # Configuração dos Plots
        # Vamos fazer um grid de 3 colunas
        n_cols = 3
        n_rows = (len(sensores) + n_cols - 1) // n_cols
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(18, 4 * n_rows))
        axes = axes.flatten()

        print(f"Analisando {len(sensores)} sensores...\n")

        i = -1  # Initialize i to handle empty sensores list
        for i, sensor in enumerate(sensores):
            # Dados das duas classes
            data_norm = df_normal[sensor].dropna()
            data_fail = df_falha[sensor].dropna()

            # --- A. ESTATÍSTICA ---
            # 1. Diferença de Médias
            mean_diff = abs(data_fail.mean() - data_norm.mean())

            # 2. Razão de Variância (Quantas vezes a vibração aumentou?)
            # Adicionamos um epsilon pequeno para evitar divisão por zero
            var_ratio = data_fail.var() / (data_norm.var() + 1e-9)

            # 3. Teste Kolmogorov-Smirnov (Poder de Separação Geral)
            # statistic: 0 a 1 (quanto maior, melhor separa as classes)
            ks_stat, p_value = ks_2samp(data_norm, data_fail)

            stats_list.append({
                'Sensor': sensor,
                'KS_Statistic': ks_stat, # Principal métrica de separação
                'Variance_Ratio': var_ratio,
                'Mean_Diff': mean_diff,
                'P_Value': p_value
            })

            # --- B. VISUALIZAÇÃO (Violin Plot) ---
            # Criamos um mini-df temporário para o seaborn
            df_temp = pd.DataFrame({
                'Valor': np.concatenate([data_norm, data_fail]),
                'Estado': ['Normal'] * len(data_norm) + ['Falha'] * len(data_fail)
            })

            sns.violinplot(data=df_temp, x='Estado', y='Valor', ax=axes[i],
                           palette={'Normal': '#1f77b4', 'Falha': '#d62728'}, split=False, hue='Estado')

            axes[i].set_title(f'{sensor}\nKS Stat: {ks_stat:.3f}', fontsize=10, fontweight='bold')
            axes[i].set_xlabel('')
            axes[i].set_ylabel('')
            axes[i].grid(True, alpha=0.3)

        # Remover axes vazios se houver
        for j in range(i + 1, len(axes)):
            fig.delaxes(axes[j])

        plt.tight_layout()
        plt.show()

        # --- C. TABELA DE RANKING ---
        df_stats = pd.DataFrame(stats_list)
        # Ordenar pelo KS Statistic (Melhor separador primeiro)
        df_stats = df_stats.sort_values(by='KS_Statistic', ascending=False).reset_index(drop=True)

        print("="*80)
        print(f"{anomalia} - RANKING DE IMPORTÂNCIA DOS SENSORES (Baseado em KS-Test)")
        print("="*80)
        print("KS_Statistic: 1.0 = Separação Perfeita | 0.0 = Indistinguível")
        print("Variance_Ratio: > 1.0 = Falha aumentou a variabilidade")
        print("-" * 60)
        print(df_stats[['Sensor', 'KS_Statistic', 'Variance_Ratio', 'Mean_Diff']])

        return df_stats

    def plotSensors(self, dfPlot, step=1, suptitle="SENSORES AO LONGO DE 1 MINUTO\n",
                    startTimeIdx=None, endTimeIdx=None):
        """
        Plot accelerometer, gyroscope and magnetometer channels against ``time``.

        The frame is first thinned with ``dfPlot.iloc[::step]``. ``startTimeIdx`` and
        ``endTimeIdx`` are *positions in that thinned frame*; they are converted to
        the ``time`` values found at those positions and the plot is restricted to
        ``start_time <= time < end_time``.

        Args:
            dfPlot: DataFrame with a ``time`` column and the nine sensor columns
                (accX/Y/Z, gyroX/Y/Z, magX/Y/Z).
            step: Keep one row out of every ``step`` rows.
            suptitle: Figure title.
            startTimeIdx: Position of the first sample to plot, or None for no
                lower bound.
            endTimeIdx: Position of the (exclusive) last sample, or None for no
                upper bound.
        """
        df = dfPlot.iloc[::step]
        fig = plt.figure(figsize=(25, 15))

        # Each bound is resolved independently, so passing only one of them works
        # and an explicit position 0 is not mistaken for "not provided".
        times = df['time']
        mask = np.ones(len(df), dtype=bool)
        if startTimeIdx is not None:
            mask &= (times >= times.iloc[startTimeIdx]).to_numpy()
        if endTimeIdx is not None:
            mask &= (times < times.iloc[endTimeIdx]).to_numpy()
        window = df[mask]  # computed once instead of once per plotted column

        # Helper to plot three axes in the same subplot
        def plotSensorsSameGraph(ax, cols, title, x="time"):
            for col in cols:
                ax.plot(window[x], window[col], label=col)

            ax.set_title(title, fontsize=18)
            ax.set_xlabel(x)
            ax.set_ylabel("value")
            ax.legend(loc='lower left')

        # === Subplots ===
        ax1 = fig.add_subplot(3, 1, 1)
        plotSensorsSameGraph(ax1,
                             cols=["accX", "accY", "accZ"],
                             title="Accelerometer (X, Y, Z)")

        ax2 = fig.add_subplot(3, 1, 2)
        plotSensorsSameGraph(ax2,
                             cols=["gyroZ", "gyroX", "gyroY"],
                             title="Gyroscope (X, Y, Z)")

        ax3 = fig.add_subplot(3, 1, 3)
        plotSensorsSameGraph(ax3,
                             cols=["magZ", "magY", "magX"],
                             title="Magnetometer (X, Y, Z)")
        plt.suptitle(suptitle, fontsize=18)
        plt.tight_layout()
        plt.show()

    def aplicar_filtro_savgol(self, df, cols=None, window_length=8, polyorder=2):
        """
        Smooth sensor signals with a Savitzky-Golay filter.

        The input frame is not modified: a copy is returned in which every
        requested column that exists gains a sibling column named
        ``<column>_smooth``. Requested columns that are absent are skipped.

        Args:
            df: DataFrame holding the signals.
            cols: Columns to filter. None selects the nine IMU channels
                (accX/Y/Z, gyroX/Y/Z, magX/Y/Z).
            window_length: Filter window length, forwarded to ``savgol_filter``.
            polyorder: Polynomial order, forwarded to ``savgol_filter``.

        Returns:
            Copy of ``df`` with the additional ``*_smooth`` columns.
        """
        df = df.copy()

        if cols is None:
            cols = ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ", "magX", "magY", "magZ"]

        # Count what was actually filtered so the log line is accurate when some
        # of the requested columns are missing from the frame.
        n_filtered = 0
        for col in cols:
            if col in df.columns:
                df[col + "_smooth"] = savgol_filter(df[col], window_length=window_length, polyorder=polyorder)
                n_filtered += 1

        print(f"Filtro Savitzky-Golay aplicado em {n_filtered} colunas")
        return df

    def plot_raw_vs_smooth(self, df, step=10):
        """
        Draw a 3x3 grid comparing each raw channel with its smoothed version.

        Rows are sensor families (accelerometer, gyroscope, magnetometer) and
        columns are the X/Y/Z axes. A channel missing from ``df`` leaves its
        panel empty; a channel without a ``*_smooth`` companion is drawn raw only.

        Args:
            df: DataFrame with ``time``, the raw channels and, optionally, the
                matching ``*_smooth`` columns.
            step: Keep one row out of every ``step`` rows before plotting.
        """
        layout = (
            ("Accelerometer", ("accX", "accY", "accZ")),
            ("Gyroscope", ("gyroX", "gyroY", "gyroZ")),
            ("Magnetometer", ("magX", "magY", "magZ")),
        )

        fig, axes = plt.subplots(3, 3, figsize=(32, 24))
        sampled = df.iloc[::step]

        for row_idx, (family, channels) in enumerate(layout):
            for col_idx, channel in enumerate(channels):
                if channel not in sampled.columns:
                    continue

                panel = axes[row_idx][col_idx]
                smooth_name = channel + "_smooth"

                if smooth_name not in sampled.columns:
                    panel.plot(sampled["time"], sampled[channel], label="raw")
                else:
                    # Raw trace is faded so the smoothed line stands out.
                    panel.plot(sampled["time"], sampled[channel], label="raw", alpha=0.35)
                    panel.plot(sampled["time"], sampled[smooth_name], label="smooth", linewidth=2)

                panel.set_title(f"{family} — {channel}", fontsize=16)
                panel.set_xlabel("time")
                panel.set_ylabel("value")
                panel.legend()

        plt.tight_layout()
        plt.show()

    def executar_pipeline_completo(self, mostrar_diagnostico=False, mostrar_analise_univariada=True,
                                   mostrar_sensores=False, mostrar_savgol=False, aplicar_filtro=False):
        """
        Run the whole pre-processing and analysis pipeline.

        Steps 1 (resampling) and 2 (scaling) always run; the remaining steps are
        controlled by the flags.

        Args:
            mostrar_diagnostico: If True, show the pre-processing diagnostics for
                both scalers.
            mostrar_analise_univariada: If True, run the univariate analysis for
                every entry of ``self.faultydfs_resampled``.
            mostrar_sensores: If True, plot the raw sensor channels.
            mostrar_savgol: If True, plot raw vs. smoothed signals. Only has an
                effect when ``aplicar_filtro`` is also True.
            aplicar_filtro: If True, apply the Savitzky-Golay filter to the
                normal data.

        Returns:
            dict with the keys ``df_normal_resampled``, ``df_faulty_resampled`` and
            ``scaled_data``; plus ``analise_univariada`` (anomaly name -> ranking
            table) when the univariate analysis runs and ``df_filtered`` when the
            filter is applied.

        Raises:
            ValueError: if the normal or faulty data have not been loaded.
        """
        if self.df_normal is None or self.df_faulty is None:
            raise ValueError("Carregue os dados primeiro usando carregar_e_preparar_dados()")

        def _banner(titulo):
            # Section header used by every pipeline step.
            print("\n" + "="*60)
            print(titulo)
            print("="*60)

        resultados = {}

        # 1. Resampling
        _banner("ETAPA 1: RESAMPLING")
        self.aplicar_resampling()
        resultados['df_normal_resampled'] = self.df_normal_resampled
        resultados['df_faulty_resampled'] = self.df_faulty_resampled

        # 2. Scaling (the scaled frames are read back from the instance attributes)
        _banner("ETAPA 2: NORMALIZAÇÃO")
        self.aplicar_normalizacao()
        resultados['scaled_data'] = {
            'robust': (self.df_normal_robust_scaled, self.df_faulty_robust_scaled),
            'standard': (self.df_normal_standard_scaled, self.df_faulty_standard_scaled)
        }

        # 3. Pre-processing diagnostics, once per scaler
        if mostrar_diagnostico:
            _banner("ETAPA 3: DIAGNÓSTICO DE PRÉ-PROCESSAMENTO")
            diagnosticos = (
                ('RobustScaler', self.df_normal_robust_scaled),
                ('StandardScaler', self.df_normal_standard_scaled),
            )
            for scaler_name, df_scaled in diagnosticos:
                print(f"\nDiagnóstico com {scaler_name}:")
                self.diagnostico_preprocessamento(
                    self.df_normal,
                    self.df_normal_resampled,
                    df_scaled,
                    col_sensor='accZ',
                    scaler_name=scaler_name
                )

        # 4. Univariate analysis; the ranking tables are kept instead of discarded
        if mostrar_analise_univariada:
            _banner("ETAPA 4: ANÁLISE UNIVARIADA")

            analises = {}
            for name, anomaly_resampled in self.faultydfs_resampled.items():
                _banner(f'ANOMALIA: {name}')
                analises[name] = self.analise_univariada_alvo(self.df_normal_resampled, anomaly_resampled, name)
            resultados['analise_univariada'] = analises

        # 5. Sensor visualisation
        if mostrar_sensores:
            _banner("ETAPA 5: VISUALIZAÇÃO DE SENSORES")
            self.plotSensors(self.df_normal, startTimeIdx=20050, endTimeIdx=20050 + 60*10)

        # 6. Savitzky-Golay filter
        if aplicar_filtro:
            _banner("ETAPA 6: APLICAÇÃO DE FILTRO SAVITZKY-GOLAY")
            df_filtered = self.aplicar_filtro_savgol(self.df_normal)
            if mostrar_savgol:
                self.plot_raw_vs_smooth(df_filtered)
            resultados['df_filtered'] = df_filtered

        _banner("PIPELINE COMPLETO FINALIZADO!")

        return resultados


# Exemplo de uso
# if __name__ == "__main__":
#     # Instancia a classe EDA
#     eda = EDA()

#     # Carrega e prepara os dados
#     df_normal, df_faulty = eda.carregar_e_preparar_dados()

    # Executa a análise exploratória completa
    # eda.executar_analise_completa()

    # Pipeline completo de pré-processamento (método integrado na classe)
    # resultados = eda.executar_pipeline_completo(
    #     mostrar_diagnostico=True,
    #     mostrar_analise_univariada=True,
    #     mostrar_sensores=True,
    #     aplicar_filtro=True
    # )

    # print(resultados)

    # OU executar etapas individuais:
    # eda.aplicar_resampling()
    # eda.aplicar_normalizacao()
    # eda.diagnostico_preprocessamento(eda.df_normal, eda.df_normal_resampled,
    #                                   eda.df_normal_robust_scaled, col_sensor='accZ')
    # df_stats = eda.analise_univariada_alvo()
    # eda.plotSensors(eda.df_normal, startTimeIdx=20050, endTimeIdx=20050 + 60*10)
    # df_filtered = eda.aplicar_filtro_savgol(eda.df_normal)
    # eda.plot_raw_vs_smooth(df_filtered)


In [4]:
from matplotlib.pylab import normal
import pandas as pd
import numpy as np
from pandas.core import apply
from scipy.stats import kurtosis, skew
import sklearn
from sklearn.preprocessing import RobustScaler
import time
df=None


class Preprocessing:
    eda= EDA()

    def __init__(self, arquivo_normal="IMU_10Hz.csv", arquivos_anomalos=["IMU_hitting_platform.csv"], filtro_savgol=True):
        """Load the normal and anomalous recordings and apply the fixed preparation.

        Only steps without tunable hyper-parameters run here: dropping the
        ``label``/``name`` columns, dividing ``time`` by 1e6, optional
        Savitzky-Golay smoothing (smoothed values replace the raw ones) and
        resampling/interpolation through ``EDA.resampling_and_interpolate``.

        Args:
            arquivo_normal: CSV file name of the normal recording, relative to
                ``eda.dataset.dataset_path``.
            arquivos_anomalos: CSV file names of the anomalous recordings.
            filtro_savgol: If True, smooth the sensor columns before resampling.

        Methods:
            preprocessar_todos_deepLearning: scaled, windowed and flattened
                splits for deep-learning models.
            preprocessar_todos_non_deepLearning: windowed statistical features
                followed by scaling and PCA for classical models.
        """
        self.normal: pd.DataFrame = None #type: ignore
        self.anomalos: list[pd.DataFrame] = []
        self.anomalo_nomes: list[str] = []

        self.normal_splits: list[np.ndarray] = []
        self.anomalo_splits: list[list[np.ndarray]] = []

        eda = Preprocessing.eda
        # Copy so the shared default list is never aliased by the instance.
        arquivos_anomalos = list(arquivos_anomalos)
        dropped_cols = ("label", "name")

        def merge_smooth_columns(df):
            # Overwrite every raw column with its "<col>_smooth" twin, then drop the twins.
            smooth_cols = [c for c in df.columns if c.endswith('_smooth')]
            for col in smooth_cols:
                df[col[:-len('_smooth')]] = df[col]
            return df.drop(columns=smooth_cols)

        def load(fname):
            raw = pd.read_csv(os.path.join(eda.dataset.dataset_path, fname))
            # Keep the file's own column order (a set() would reorder the columns
            # differently on each kernel start) and work on an explicit copy.
            df = raw[[c for c in raw.columns if c not in dropped_cols]].copy()
            df['time'] = df['time'] / 1e6
            if filtro_savgol:
                df = eda.aplicar_filtro_savgol(df)
            return merge_smooth_columns(df)

        normal = load(arquivo_normal)
        anomalos = [load(fname) for fname in arquivos_anomalos]

        normal = eda.resampling_and_interpolate(normal, arquivo_normal)
        anomalos = [eda.resampling_and_interpolate(anomalo, nome) for anomalo, nome in zip(anomalos, arquivos_anomalos)]

        # Align every anomalous frame to the column order of the normal frame.
        normal_cols = list(normal.columns)
        anomalos = [anomalo[normal_cols] for anomalo in anomalos]

        self.normal = normal
        self.anomalos = anomalos #type:ignore
        self.anomalo_nomes = arquivos_anomalos



    @staticmethod
    def _getFixedWindows(df:pd.DataFrame|np.ndarray, length, overlap):
        #drop incomplete implícito
        arr: np.ndarray = df.values if isinstance(df, pd.DataFrame) else df
        step = length - overlap
        n = arr.shape[0]

        starts = range(0, n - length + 1, step)
        windows = np.stack([arr[s:s+length] for s in starts], axis=0) #semelhante a np.array

        return windows


    @staticmethod
    def __preprocessar_DL__(df:pd.DataFrame, test_splits=[0.0,0.1,0.9], window_size=60, window_overlap=10, scaler=None, fit_scaler=False, stratifyCol=None) -> list[np.ndarray]:
        """Executa o preprocessamento completo de um df , sem exibir nada
        Args:
            window_size: tamanho da janela em samples continuas
            window_overlap: interseção entre uma janela e a seguinte ou à antecessora

        Returns:
            df_train, df_val, df_test, df_val, df_test: Windows já achatadas,
                com dados já normalizados, normalizados e limpos, com feature engineering pronto.
        """


        df_train, df_val,df_test = Preprocessing._train_test_split(df, *test_splits,stratifyCol=stratifyCol)
        if stratifyCol:
            #assume that we want to remove thje label (causa bug?)
            df_train = df_train.drop(columns=[stratifyCol]) if df_train is not None and len(df_train) else None
            df_val = df_val.drop(columns=[stratifyCol])
            df_test = df_test.drop(columns=[stratifyCol])
        if scaler is None:
            scaler = RobustScaler()
            fit_scaler = True
        dftrain_exists =  (df_train is not None and len(df_train))
        if fit_scaler and dftrain_exists:
            scaler.fit(df_train)


        df_train =  pd.DataFrame(
            scaler.transform(df_train),
            columns=df_train.columns) if dftrain_exists > 0 else df_train
        df_val = pd.DataFrame(scaler.transform(df_val), columns=df_val.columns)
        df_test = pd.DataFrame(scaler.transform(df_test), columns=df_test.columns)

        if dftrain_exists: df_train_w = Preprocessing._getFixedWindows(df_train, window_size, window_overlap)
        df_val_w = Preprocessing._getFixedWindows(df_val,  window_size, window_overlap)
        df_test_w  = Preprocessing._getFixedWindows(df_test,  window_size, window_overlap)


        if dftrain_exists: df_train_flat = df_train_w.reshape(df_train_w.shape[0], -1) #type:ignore
        df_val_flat = df_val_w.reshape(df_val_w.shape[0], -1)
        df_test_flat  = df_test_w.reshape(df_test_w.shape[0], -1)

        return [df_train_flat  if dftrain_exists else None, df_val_flat, df_test_flat] #type:ignore

    @staticmethod
    def _train_test_split(
        df: pd.DataFrame,
        trainPer=0.6,
        valPer=0.05,
        testPer=0.35,
        stratifyCol=None
    ) -> tuple[pd.DataFrame,pd.DataFrame,pd.DataFrame]:
        assert trainPer + valPer + testPer <= 1.0
        n = len(df)

        if stratifyCol is None:
            train_end = int(n * trainPer)
            val_end   = train_end + int(n * valPer)

            traindf = df.iloc[:train_end].copy() if trainPer else None
            valdf   = df.iloc[train_end:val_end].copy() if valPer else None
            testdf  = df.iloc[val_end:val_end + int(n * testPer)].copy()

            return (traindf, valdf, testdf) #type:ignore

        train_idx = []
        val_idx = []
        test_idx = []

        for label, group in df.groupby(stratifyCol, sort=False):
            idx = group.index.to_numpy()
            m = len(idx)

            t_end = int(m * trainPer)
            v_end = t_end + int(m * valPer)

            train_idx.append(idx[:t_end])
            val_idx.append(idx[t_end:v_end])
            test_idx.append(idx[v_end:v_end + int(m * testPer)])

        # Merge and restore temporal order
        train_idx = np.sort(np.concatenate(train_idx)) if trainPer else None
        val_idx   = np.sort(np.concatenate(val_idx)) if valPer else None
        test_idx  = np.sort(np.concatenate(test_idx))

        traindf = df.loc[train_idx].copy() if trainPer else None
        valdf   = df.loc[val_idx].copy() if valPer else None
        testdf  = df.loc[test_idx].copy()

        return traindf, valdf, testdf #type:ignore

    from scipy.stats import kurtosis, skew
    @staticmethod
    def __add_point_engineered_features__(df:pd.DataFrame) -> pd.DataFrame:
        #add point only features
        # sens_jerk e sens_norm
        sensors = "acc gyro mag".split(' ')
        df = df.copy()
        # for sensAxis in base_features:
        #     df[f"{sensAxis}_jerk"] = df[sensAxis].diff()
        #jerk removido por variância irrisória
        for sens in sensors:
            df[f"{sens}_norm"] = np.sqrt(df[f"{sens}X"]**2 + df[f"{sens}Y"]**2 + df[f"{sens}Z"]**2)

        return df
    @staticmethod
    def _make_windows(df, length, overlap, drop_incomplete=True):
        arr = df.values  # (T, C)
        step = length - overlap
        n = arr.shape[0]

        stops = n - length + 1 if drop_incomplete else n
        starts = range(0, stops, step)

        windows = []
        for s in starts:
            w = arr[s:s + length]
            if w.shape[0] == length:
                windows.append(w)

        return np.stack(windows, axis=0)  # (N_windows, L, C)

    @staticmethod
    def __window_feature_engineering__(windows, feature_names, stats=("mean", "std", "ptp", "kurtosis", "crest", "dom_freq")):
        """
        Extracts statistical features from time-series windows for traditional ML models.

        Args:
            windows (np.array): Shape (N_samples, Window_Size, N_sensors)
            feature_names (list): List of sensor names strings (e.g., ['accX', 'accY'...])
            stats (tuple): List of stats to compute.

        Returns:
            X_feat (np.array): Shape (N_samples, N_features)
            names (list): List of feature names
        """
        feats = []
        names = []

        # 1. Mean (General Position/Bias)
        if "mean" in stats:
            feats.append(np.mean(windows, axis=1))
            names += [f"mean_{c}" for c in feature_names]

        # 2. Standard Deviation (Vibration Energy)
        if "std" in stats:
            feats.append(np.std(windows, axis=1))
            names += [f"std_{c}" for c in feature_names]

        # 3. RMS (Total Energy - redundancy with Mean/Std, but useful for physics)
        if "rms" in stats:
            rms = np.sqrt(np.mean(windows**2, axis=1))
            feats.append(rms)
            names += [f"rms_{c}" for c in feature_names]

        # 4. Peak-to-Peak (Amplitude of Shocks - critical for Bumps)
        if "ptp" in stats:
            feats.append(np.ptp(windows, axis=1))
            names += [f"ptp_{c}" for c in feature_names]

        # 5. Kurtosis (Impulsiveness - critical for Hits/Earthquakes)
        if "kurtosis" in stats:
            # Fisher=False makes normal distribution = 3.0.
            # Often easier to use Fisher=True (normal = 0.0) for ML centering.
            k = kurtosis(windows, axis=1, fisher=True)
            feats.append(k)
            names += [f"kurt_{c}" for c in feature_names]

        # 6. Skewness (Asymmetry - useful for directional crashes)
        if "skew" in stats:
            s = skew(windows, axis=1)
            feats.append(s)
            names += [f"skew_{c}" for c in feature_names]

        # 7. Crest Factor (Impact Indicator - Peak / RMS)
        if "crest" in stats:
            peak = np.max(np.abs(windows), axis=1)
            rms = np.sqrt(np.mean(windows**2, axis=1))
            # Add small epsilon to avoid division by zero
            crest = peak / (rms + 1e-9)
            feats.append(crest)
            names += [f"crest_{c}" for c in feature_names]

        # 8. Dominant Frequency (Resonance - useful for Heavy Weight detection)
        if "dom_freq" in stats:
            # Perform FFT along the time axis (axis 1)
            fft_vals = np.fft.rfft(windows, axis=1)
            fft_freq = np.fft.rfftfreq(windows.shape[1])

            # Find index of max magnitude (ignoring DC component at index 0)
            magnitudes = np.abs(fft_vals)
            magnitudes[:, 0, :] = 0  # Zero out DC component

            dom_indices = np.argmax(magnitudes, axis=1) # Shape (N, C)

            # Map indices to actual frequencies
            dom_freqs = fft_freq[dom_indices]

            feats.append(dom_freqs)
            names += [f"domfreq_{c}" for c in feature_names]

        X_feat = np.concatenate(feats, axis=1)  # (N, Total_Features)
        return X_feat, names

    @staticmethod
    def __non_DL_feature_engineering_pipeline__(ds, WINDOW_SIZE=40, WINDOW_OVERLAP=10):
        #somente do ponto
        ds_pe = Preprocessing.__add_point_engineered_features__(ds)

        ds_w = Preprocessing._make_windows(ds_pe, WINDOW_SIZE, WINDOW_OVERLAP)

        #somente da janela
        ds_feat, feat_names = Preprocessing.__window_feature_engineering__(
            ds_w,
            feature_names=ds_pe.columns,
            )
        return ds_feat, feat_names

    @staticmethod
    def __PCA_normalize__(X_train_masked, X_val_masked=None, X_test_masked=None, variance_threshold=0.95):
        """
        Scale the splits with a RobustScaler and project them with PCA.

        Both the scaler and the PCA are fitted on the training split only; the
        optional validation and test splits are transformed with the fitted
        objects. The inputs must already contain only the features to be used.

        Args:
            X_train_masked: Training feature matrix.
            X_val_masked: Optional validation feature matrix.
            X_test_masked: Optional test feature matrix.
            variance_threshold: Passed to ``PCA(n_components=...)``.

        Returns:
            ``(pca, scaler, X_train_pca, X_val_pca, X_test_pca)``; the val/test
            entries are None when the corresponding input is None.
        """
        from sklearn.decomposition import PCA

        # 1. Fit the scaler on the training split
        scaler = RobustScaler()
        X_train_scaled = scaler.fit_transform(X_train_masked)

        # 2. Fit PCA on the scaled training split
        pca = PCA(n_components=variance_threshold)
        X_train_pca = pca.fit_transform(X_train_scaled)

        print("PCA:")
        print(f"  - Features antes : {X_train_masked.shape[1]}")
        print(f"  - Features após: {X_train_pca.shape[1]}")
        print(f"  - Variância explicada: {np.sum(pca.explained_variance_ratio_):.4f}")

        def _project(X):
            # 3. Reuse the fitted scaler and PCA on a held-out split
            if X is None:
                return None
            return pca.transform(scaler.transform(X))

        X_val_pca = _project(X_val_masked)
        X_test_pca = _project(X_test_masked)

        return pca, scaler, X_train_pca, X_val_pca, X_test_pca

    @staticmethod
    def __preprocessar_non_DL__(train:pd.DataFrame|np.ndarray, val, test, pca=None, scaler=None,mask=None,WINDOW_SIZE=60,WINDOW_OVERLAP=10):
        """Build windowed features for three splits, select features, scale and apply PCA.

        Two modes, chosen by ``pca``:
          * ``pca`` falsy: ``scaler`` must be falsy too; a new scaler and PCA are
            fitted on the masked training features via ``__PCA_normalize__``.
          * ``pca`` given: ``scaler`` must be given too; both are only applied.

        When ``mask`` is None a boolean feature mask is derived from ``train``:
        features with variance <= 1e-5 are removed, then, among the survivors,
        the later feature of every pair with |correlation| > 0.95 is removed.

        NOTE(review): despite the ``np.ndarray`` annotation, the inputs are passed
        to ``__non_DL_feature_engineering_pipeline__``, which accesses named
        columns, so they appear to need to be DataFrames with the sensor columns.

        Returns:
            ``(pca, scaler, mask, train_pca, val_pca, test_pca)``
        """

        # Each split becomes a 2-D feature matrix; `names` is overwritten and not returned.
        train,names = Preprocessing.__non_DL_feature_engineering_pipeline__(train, WINDOW_SIZE,WINDOW_OVERLAP)

        val,names = Preprocessing.__non_DL_feature_engineering_pipeline__(val, WINDOW_SIZE,WINDOW_OVERLAP)
        test,names = Preprocessing.__non_DL_feature_engineering_pipeline__(test, WINDOW_SIZE,WINDOW_OVERLAP)
        assert len(train.shape) == 2 and  len(val.shape) == 2 and len(test.shape) == 2

        if mask is None:
            var = train.var(axis=0)
            VAR_THRESHOLD = 1e-5
            keep_var = var > VAR_THRESHOLD

            # Apply variance mask first
            train_var = train[:, keep_var]

            # Correlation between the surviving feature columns
            corr = np.corrcoef(train_var, rowvar=False)
            CORR_THRESHOLD = 0.95
            to_drop = set()
            # For every highly correlated pair (i, j) with i < j, drop j
            for i in range(corr.shape[0]):
                for j in range(i+1, corr.shape[0]):
                    if abs(corr[i,j]) > CORR_THRESHOLD:
                        to_drop.add(j)

            keep_corr = [i not in to_drop for i in range(corr.shape[0])]

            # Final mask: first variance, then correlation
            # (keep_corr indexes the variance survivors; map it back to original columns)
            mask = np.zeros(train.shape[1], dtype=bool)
            mask[np.where(keep_var)[0][keep_corr]] = True
        def apply_mask(arr):
            # Keep only the selected feature columns
            return arr[:, mask]

        if not pca:
            # Fit mode: a scaler without a PCA is treated as a caller error
            assert not scaler
            a,b,c = apply_mask(train), apply_mask(val),apply_mask(test)
            pca, scaler, train_pca,val_pca,test_pca = Preprocessing.__PCA_normalize__(
                a,b,c,variance_threshold=0.95
            )
        else:
            # Apply mode: reuse the supplied scaler and PCA without refitting
            assert pca and scaler
            a,b,c = apply_mask(train), apply_mask(val),apply_mask(test)
            train_pca,val_pca,test_pca = (pca.transform(scaler.transform(a)),
                pca.transform(scaler.transform(b)),
                pca.transform(scaler.transform(c)))

        return pca,scaler,mask,train_pca,val_pca,test_pca




    def preprocessar_todos_non_deepLearning(self, aplicar_savgol=True, train_splits = [0.6,0.2,0.2], test_splits=[0.0,0.5,0.5], window_size=60, window_overlap=10):
        """Prepare every loaded dataset for classical (non deep-learning) models.

        The normal data is split with ``train_splits``; the feature mask, scaler
        and PCA are fitted on its training part. All anomalous recordings are
        concatenated, split with ``test_splits`` stratified by recording (so the
        validation and test parts both contain every anomaly type) and
        transformed with the fitted objects.

        Results are stored on the instance:
            self.normal_splits  = [train, val, test]
            self.anomalo_splits = [val, test]

        Args:
            aplicar_savgol: Must be truthy; kept only for API compatibility.
            train_splits: ``[train, val, test]`` fractions for the normal data.
            test_splits: ``[train, val, test]`` fractions for the anomalous data;
                the train part is discarded.
            window_size: Window length in consecutive samples.
            window_overlap: Samples shared by one window and the next.
        """
        assert aplicar_savgol # API compatibility only
        anomalos = [ano.copy() for ano in self.anomalos]
        for i,ano in enumerate(anomalos):
            ano['label'] = i
        # ignore_index: give every row a unique label so the stratified split,
        # which may select rows by index label, cannot pick rows twice.
        anomalo_total = pd.concat(anomalos, ignore_index=True)

        xtrain,xval,xtest = Preprocessing._train_test_split(self.normal,*train_splits)

        # Honour `test_splits` and stratify on the label that was just added.
        _,anom_val, anom_test = Preprocessing._train_test_split(anomalo_total,*test_splits,stratifyCol='label')#type:ignore
        anom_val = anom_val.drop(columns=['label'])
        anom_test = anom_test.drop(columns=['label'])

        # Feature engineering and windowing; fit mask/scaler/PCA on the normal training data
        pca,scaler,mask,xtrain,xval,xtest = Preprocessing.__preprocessar_non_DL__(xtrain,xval,
            xtest,pca=None,scaler=None,mask=None,
            WINDOW_SIZE=window_size,WINDOW_OVERLAP=window_overlap)
        # Apply the fitted objects to the anomalies. There is no anomalous train
        # split, so the validation part fills the `train` slot and is discarded.
        pca,scaler,mask,_,anom_val,anom_test = Preprocessing.__preprocessar_non_DL__(anom_val,anom_val,
            anom_test,pca=pca,scaler=scaler,mask=mask,
            WINDOW_SIZE=window_size,WINDOW_OVERLAP=window_overlap)

        self.normal_splits = [xtrain,xval,xtest] #type:ignore
        self.anomalo_splits=[anom_val,anom_test]#type:ignore


    def preprocessar_todos_deepLearning(self, aplicar_savgol=True, train_splits = [0.6,0.2,0.2], test_splits=[0.0,0.5,0.5], window_size=60, window_overlap=10):
        """Prepare every loaded dataset for deep-learning models.

        A RobustScaler is fitted on the training part of the normal data and
        reused for the anomalous data. All anomalous recordings are concatenated
        and split with ``test_splits``, stratified by recording.

        Results are stored on the instance (nothing is returned):
            self.normal_splits  = [train, val, test]
            self.anomalo_splits = [val, test]

        Args:
            aplicar_savgol: Must be truthy; kept only for API compatibility.
            train_splits: ``[train, val, test]`` fractions for the normal data.
            test_splits: ``[train, val, test]`` fractions for the anomalous data;
                the train part is discarded.
            window_size: Window length in consecutive samples.
            window_overlap: Samples shared by one window and the next.

        Returns:
            None
        """
        assert aplicar_savgol # API compatibility only
        anomalos = [ano.copy() for ano in self.anomalos]
        for i,ano in enumerate(anomalos):
            ano['label'] = i
        # ignore_index: give every row a unique label so the stratified split,
        # which may select rows by index label, cannot pick rows twice.
        anomalo_total = pd.concat(anomalos, ignore_index=True)

        scaler = RobustScaler()
        self.normal_splits = Preprocessing.__preprocessar_DL__(self.normal, train_splits,
                                                         window_size,
                                                         window_overlap,
                                                         scaler=scaler,
                                                         fit_scaler=True)

        # [1:] discards the train entry: anomalies are only used for val/test.
        self.anomalo_splits = Preprocessing.__preprocessar_DL__(anomalo_total,  test_splits, window_size,
                                             window_overlap,scaler=scaler,
                                             fit_scaler=False, stratifyCol='label')[1:]

        return None
    @staticmethod
    def resizeFlattenedWindow( flattened_windows:np.ndarray, new_window_size:int, window_overlap:int, dimensionsPerSample=9) -> np.ndarray:
        """Redimensiona  as janelas (já) achatadas para um novo tamanho"""
        assert (len(flattened_windows.shape) == 2)
        if window_overlap >= new_window_size:
            window_overlap = new_window_size//2

        samplesPerWindow = flattened_windows.shape[1]//dimensionsPerSample
        samples = flattened_windows.reshape(-1, dimensionsPerSample)

        newWindows = Preprocessing._getFixedWindows(samples, new_window_size, window_overlap)
        flattenedNewWindows = newWindows.reshape(newWindows.shape[0], -1)
        return flattenedNewWindows



# pp = Preprocessing()
# pp.preprocessar_todos_non_deepLearning()
# print(pp.anomalo_splits[1].shape)


Using Colab cache for faster access to the 'industrial-robotic-arm-imu-data-casper-1-and-2' dataset.
--- CARREGAMENTO MANUAL DE CENÁRIOS ---
Lendo Base Normal...
Lendo Base de Falhas...
-> Adicionado: IMU_hitting_platform.csv (14967 linhas)
-> Adicionado: IMU_hitting_arm.csv (11924 linhas)
-> Adicionado: IMU_extra_weigth.csv (10885 linhas)
-> Adicionado: IMU_earthquake.csv (11409 linhas)
1. Normal processado: 874937 linhas divididas.
Falha IMU_hitting_platform.csv processado
Falha IMU_hitting_arm.csv processado
Falha IMU_extra_weigth.csv processado
Falha IMU_earthquake.csv processado
DATASET PRONTO:
-> Dados Normais: 874937 linhas
-> Dados de Falha:  24591 linhas (Total de 4 tipos de defeito)


In [12]:
from abc import ABCMeta, abstractmethod
from typing import Callable, Optional
import numpy as np
import itertools

class Trainer(metaclass=ABCMeta):
    """Interface every model wrapper used by the tuners must implement.

    The class uses ``ABCMeta`` so that the ``@abstractmethod`` markers are
    enforced: ``Trainer`` itself, or a subclass that does not implement both
    ``TrainerFit`` and ``TrainerPred``, cannot be instantiated.
    """

    def __init__(self, **kwargs) -> None:
        # Left concrete so subclasses without their own __init__ stay instantiable;
        # hyper-parameters arrive as keyword arguments.
        ...

    @abstractmethod
    def TrainerFit(self, X_train:np.ndarray, Y_train:Optional[np.ndarray]) -> None:
        """Fit the model.

        Args:
            X_train: Training samples.
            Y_train: Optional targets; you probably should not use this parameter.
        """
        ...

    @abstractmethod
    def TrainerPred(self, X:np.ndarray) -> np.ndarray:
        """Return the model's predictions for ``X``."""
        ...

class Evaluator:
    """Scores binary predictions with standard classification metrics."""

    def __init__(self, **kwargs) -> None:
        ...

    def evaluate(self, Y: np.ndarray, Y_pred: np.ndarray, threshold=None, *args, **kwargs) -> tuple[float, dict]:
        """Evaluates the model's predictions, returning a main metric and other auxiliary metrics.

        Args:
            Y: Ground-truth labels.
            Y_pred: Predicted labels.
            threshold: Not implemented yet; must be None.

        Returns:
            ``(auc, metrics)`` where ``metrics`` holds ``accuracy``, ``precision``,
            ``recall``, ``f1`` and ``auc``.
        """
        # Threshold not implemented yet
        assert threshold is None

        accuracy = accuracy_score(Y, Y_pred)
        precision = precision_score(Y, Y_pred, zero_division=0)
        recall = recall_score(Y, Y_pred, zero_division=0)
        f1 = f1_score(Y, Y_pred, zero_division=0)
        auc = roc_auc_score(Y, Y_pred)

        return auc, {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "auc": auc
        }

from abc import ABC, abstractmethod
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

class BaseHyperParamTuner(ABC):
    """Grid-search driver shared by the concrete tuners.

    The search is a nested loop: the outer loop walks the window
    configurations supplied by ``_get_window_grid`` and prepares the data once
    per configuration via ``_prepare_data``; the inner loop walks every
    combination of model hyperparameters, fitting and scoring one trainer each.
    """

    def __init__(self,
                 modelTrainerParams: dict[str, list],
                 X_train: np.ndarray,
                 X_val: np.ndarray,
                 X_test: np.ndarray,
                 anom_val: np.ndarray,
                 anom_test: np.ndarray):
        """Store the data splits and expand the model hyperparameter grid.

        Args:
            modelTrainerParams: Maps each trainer keyword to the values to try.
            X_train: Normal training data.
            X_val: Normal validation data.
            X_test: Normal test data (stored, not used by ``tune``).
            anom_val: Anomalous validation data.
            anom_test: Anomalous test data (stored, not used by ``tune``).
        """
        self.param_grid = modelTrainerParams

        # NOTE: For StaticFeaturesTuner, these should be RAW data (or point-features).
        # For DeepLearningTuner, these are likely already flattened windows.
        self.X_train = X_train
        self.X_val = X_val
        self.X_test = X_test
        self.anom_val = anom_val
        self.anom_test = anom_test

        # best_params holds the whole winning record (score, metrics,
        # window_params, model_params, tuner_type), not only the parameters.
        self.best_params: dict | None = None
        self.best_score: float = -np.inf
        self.best_model: Trainer | None = None
        self.results: list[dict] = []
        # Placeholder shown until tune() finds a first result.
        self._bestResult = {"EMPTY":0}

        self._expandedModelGrid = self._expand_grid(self.param_grid)

    @staticmethod
    def _expand_grid(grid: dict[str, list]) -> list[dict]:
        """Return every combination of the grid's values as a list of dicts.

        Values that are not a list, tuple or ndarray are treated as a single
        candidate. An empty grid yields one empty configuration.
        """
        if not grid:
            return [{}]
        keys = list(grid.keys())
        values = [v if isinstance(v, (list, tuple, np.ndarray)) else [v] for v in grid.values()]
        return [dict(zip(keys, combo)) for combo in itertools.product(*values)]

    @abstractmethod
    def _get_window_grid(self) -> list[dict]:
        """Define iteration over window parameters."""
        pass

    @abstractmethod
    def _prepare_data(self, window_params: dict) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Transform data based on window params.
        This is called inside the OUTER loop, ensuring expensive operations run only once per config.
        """
        pass

    def tune(self, Trainer_factory: Callable[..., Trainer], evaluator: Evaluator) -> None:
        """Run the full grid search and remember the best-scoring trainer.

        Args:
            Trainer_factory: Callable building a trainer from model hyperparameters.
            evaluator: Object whose ``evaluate(labels, predictions)`` returns
                ``(score, metrics)``; a higher score is better.

        Every evaluated combination is appended to ``self.results``; the best
        one is exposed through ``best_score``, ``best_params``, ``best_model``
        and ``_bestResult``.
        """
        window_grid = self._get_window_grid()

        # The outer loop is the expensive one: data is rebuilt per window config.
        for window_params in window_grid:
            print(f"Processing Window Config: {window_params}")

            Xtr, validationX, validationLabels = self._prepare_data(window_params)

            for modelParams in self._expandedModelGrid:
                trainer = Trainer_factory(**modelParams)

                # Fit
                trainer.TrainerFit(Xtr, None)

                # Predict
                anom_val_pred = trainer.TrainerPred(validationX)

                # Evaluate
                score, metrics = evaluator.evaluate(validationLabels, anom_val_pred)

                record = {
                    "score": score,
                    "metrics": metrics,
                    "window_params": window_params,
                    "model_params": modelParams,
                    "tuner_type": self.__class__.__name__
                }
                self.results.append(record)

                # Strict comparison: on ties the earliest configuration wins.
                if score > self.best_score:
                    print(record["model_params"])
                    self.best_score = score
                    self.best_params = record
                    self.best_model = trainer
                    self._bestResult = record


class DeepLearningTuner(BaseHyperParamTuner):
    """Tuner whose window step only resizes already-flattened windows."""

    def __init__(self, window_params: dict[str, list], **kwargs):
        """Store the window grid; remaining keywords go to the base tuner."""
        super().__init__(**kwargs)
        self.window_params = window_params
        self._expandedWindowGrid = self._expand_grid(window_params)

    def _get_window_grid(self) -> list[dict]:
        """Return the pre-expanded list of window configurations."""
        return self._expandedWindowGrid

    def _prepare_data(self, window_params: dict):
        """Resize the stored splits and build the labelled validation set.

        Returns:
            ``(train, validationX, validationLabels)`` where the labels are 1
            for the normal validation rows and 0 for the anomalous ones.
        """
        # Only resizes the windows; no feature handling happens here.
        def _resize(flat_windows):
            return Preprocessing.resizeFlattenedWindow(flat_windows, **window_params)

        train_windows = _resize(self.X_train)
        normal_val = _resize(self.X_val)
        anomalous_val = _resize(self.anom_val)

        validationX = np.concatenate((normal_val, anomalous_val), axis=0)
        normal_labels = np.ones(normal_val.shape[0])
        anomalous_labels = np.zeros(anomalous_val.shape[0])
        validationLabels = np.concatenate((normal_labels, anomalous_labels))

        return train_windows, validationX, validationLabels

def recompute_preprocessing(pp):
    """Build a feature-engineering callback bound to the preprocessing object ``pp``.

    The returned callable ignores any positional arguments (the tuner passes
    its stored X_train, X_val and anom_val there) and ``dimensionsPerSample``.
    It re-runs ``pp.preprocessar_todos_deepLearning`` with the requested window
    settings and returns the first two entries of ``pp.normal_splits``
    followed by the first entry of ``pp.anomalo_splits``.
    """
    def _recompute(*args, new_window_size, window_overlap, dimensionsPerSample=None):
        pp.preprocessar_todos_deepLearning(
            window_size=new_window_size,
            window_overlap=window_overlap,
        )
        leading_normal_splits = pp.normal_splits[:2]
        first_anomalous_split = pp.anomalo_splits[0]
        return (*leading_normal_splits, first_anomalous_split)

    return _recompute



class StaticFeaturesTuner(BaseHyperParamTuner):
    """
    Tuner for Traditional ML models where windowing involves expensive Feature Engineering.
    """
    def __init__(self,
                 window_params: dict[str, list],
                 feature_engineering_fn: Callable[..., tuple[np.ndarray, np.ndarray, np.ndarray]]=recompute_preprocessing,  #type:ignore
                 **kwargs):
        """
        Args:
            feature_engineering_fn: Function that accepts (X_train, X_val, anom_val, **window_params)
                                    and returns (X_train_feats, X_val_feats, anom_val_feats).
                                    The inputs X_train etc. will be the RAW data stored in this class.
                                    Pass the callable returned by ``recompute_preprocessing(pp)``,
                                    not ``recompute_preprocessing`` itself.
            window_params: Grid of parameters to pass to the function (e.g. {'window_size': [10, 20]}).
            **kwargs: Forwarded to ``BaseHyperParamTuner.__init__``.

        Raises:
            TypeError: If ``feature_engineering_fn`` is the unbound
                ``recompute_preprocessing`` factory (which is also the default).
        """
        # The factory only takes `pp`, so calling it with the tuner's arguments
        # would fail inside tune() with an obscure TypeError. Fail here instead.
        if feature_engineering_fn is recompute_preprocessing:
            raise TypeError(
                "feature_engineering_fn must be the callable returned by "
                "recompute_preprocessing(pp), not recompute_preprocessing itself"
            )
        super().__init__(**kwargs)
        self.feature_engineering_fn = feature_engineering_fn
        self.window_params = window_params
        self._expandedWindowGrid = self._expand_grid(self.window_params)

    def _get_window_grid(self) -> list[dict]:
        """Return the pre-expanded list of window configurations."""
        return self._expandedWindowGrid

    def _prepare_data(self, window_params: dict):
        """Generate features for one window configuration and label the validation set.

        Returns:
            ``(X_train_feats, validationX, validationLabels)`` where the labels
            are 1 for normal validation rows and 0 for anomalous ones.
        """
        # 1. Call the user-provided function to generate features from RAW data
        # This is the "expensive" step, executed only once per window config via the Base class loop.
        X_train_feats, X_val_feats, anom_val_feats = self.feature_engineering_fn(
            self.X_train,
            self.X_val,
            self.anom_val,
            **window_params
        )

        # 2. Prepare validation concatenation (Standard logic)
        validationX = np.concatenate((X_val_feats, anom_val_feats), axis=0)
        validationLabels = np.concatenate((
            np.ones(X_val_feats.shape[0]),
            np.zeros(anom_val_feats.shape[0])
        ))

        return X_train_feats, validationX, validationLabels



In [42]:
from sklearn.mixture import GaussianMixture
import numpy as np

class GMMTrainer(Trainer):
    """Anomaly detector that thresholds Gaussian Mixture log-likelihoods."""

    def __init__(
        self,
        n_components: int = 2,
        covariance_type: str = "full",
        reg_covar: float = 1e-06,
        threshold_percentile: float = 5,
        **kwargs
    ) -> None:
        """
        Create the (unfitted) mixture model and remember the threshold percentile.

        Args:
            n_components: Number of mixture components.
            covariance_type: Covariance parameterisation passed to GaussianMixture.
            reg_covar: Regularization added to the diagonal of the covariances.
            threshold_percentile: Percentile of the training log-likelihoods used
                as the anomaly threshold.
            **kwargs: Extra keyword arguments forwarded to GaussianMixture.
        """
        self.threshold_percentile = threshold_percentile
        # Stays None until TrainerFit has run.
        self.threshold: float | None = None
        self.gmm = GaussianMixture(
            n_components=n_components,
            covariance_type=covariance_type,
            reg_covar=reg_covar,
            **kwargs
        )

    def TrainerFit(self, X_train: np.ndarray, Y_train: np.ndarray | None = None) -> None:
        """Fit the mixture and derive the anomaly threshold from the training data.

        Args:
            X_train: Normal training data.
            Y_train: Ignored; the method is unsupervised.
        """
        self.gmm.fit(X_train)
        train_scores = self.gmm.score_samples(X_train)
        self.threshold = np.percentile(train_scores, self.threshold_percentile)
        print(f"Anomaly threshold set at {self.threshold:.4f} (based on {self.threshold_percentile}th percentile).")

    def TrainerPred(self, X: np.ndarray) -> np.ndarray:
        """
        Label each sample as normal (1) or anomalous (0).

        Args:
            X: Data to classify.

        Returns:
            np.ndarray: 1 where the log-likelihood is at or above the threshold, else 0.

        Raises:
            ValueError: If called before TrainerFit.
        """
        if self.threshold is None:
            raise ValueError("Model not fitted. Call TrainerFit first.")

        is_normal = self.gmm.score_samples(X) >= self.threshold
        return is_normal.astype(int)

print("GMMTrainer class defined successfully.")

GMMTrainer class defined successfully.


In [9]:
# Build the preprocessing object and run its non-deep-learning pipeline.
# NOTE(review): the split lists presumably are fractions per split for the
# normal and the faulty data respectively — confirm against Preprocessing.
pp = Preprocessing()
pp.preprocessar_todos_non_deepLearning(
    aplicar_savgol=True,
    train_splits=[0.7, 0.1, 0.2],
    test_splits=[0.0, 0.5, 0.5],
)



Filtro Savitzky-Golay aplicado em 9 colunas
Filtro Savitzky-Golay aplicado em 9 colunas

Fazendo Resampling e Interpolate de IMU_10Hz.csv
Buracos gerados pelo alinhamento: 44766
Resampling concluído. Novo shape: (876347, 10)

Fazendo Resampling e Interpolate de IMU_hitting_platform.csv
Buracos gerados pelo alinhamento: 900
Resampling concluído. Novo shape: (14992, 10)
PCA:
  - Features antes : 65
  - Features após: 32
  - Variância explicada: 0.9543


In [10]:
# Name the splits produced by the preprocessing step.
# Normal data: entries 0, 1 and 2 are used as train, validation and test.
X_train, X_val, X_test = pp.normal_splits[0], pp.normal_splits[1], pp.normal_splits[2]

# Anomalous data: entries 0 and 1 are used as validation and test.
anom_val, anom_test = pp.anomalo_splits[0], pp.anomalo_splits[1]

In [49]:
# Window configurations the tuner iterates over (outer loop).
window_hyperparameters = dict(
    new_window_size=[30, 60],
    window_overlap=[0, 15],
    dimensionsPerSample=[X_train.shape[1] // pp.normal.shape[1]],
)

# GMM hyperparameters tried for every window configuration (inner loop).
model_hyperparameters = dict(
    n_components=[1, 2, 3, 4],
    covariance_type=["diag", "full"],
    reg_covar=[1e-05],
    threshold_percentile=[1, 2.5, 5],
)

evaluator = Evaluator()

tuner = StaticFeaturesTuner(
    X_train=X_train,
    X_val=X_val,
    X_test=X_test,
    anom_val=anom_val,
    anom_test=anom_test,
    modelTrainerParams=model_hyperparameters,
    window_params=window_hyperparameters,
    # The callback re-runs the preprocessing on `pp` for each window config.
    feature_engineering_fn=recompute_preprocessing(pp),
)

tuner.tune(GMMTrainer, evaluator)

print("fim")

# best_params stays None when no configuration was evaluated.
if not tuner.best_params:
    print("No best parameters found.")
else:
    print("\nBest Hyperparameters Found:")
    print(f"  Window Params: {tuner.best_params['window_params']}")
    print(f"  Model Params: {tuner.best_params['model_params']}")
    print(f"Best Validation Score: {tuner.best_score:.4f}")
    print(f"Best scores: {tuner._bestResult}")

Processing Window Config: {'new_window_size': 30, 'window_overlap': 0, 'dimensionsPerSample': 60}
Anomaly threshold set at -985.9116 (based on 1th percentile).
{'n_components': 1, 'covariance_type': 'diag', 'reg_covar': 1e-05, 'threshold_percentile': 1}
Anomaly threshold set at -970.3180 (based on 2.5th percentile).
Anomaly threshold set at -949.9687 (based on 5th percentile).
Anomaly threshold set at -813.8804 (based on 1th percentile).
Anomaly threshold set at -800.9970 (based on 2.5th percentile).
Anomaly threshold set at -787.5919 (based on 5th percentile).
Anomaly threshold set at -803.1392 (based on 1th percentile).
{'n_components': 3, 'covariance_type': 'diag', 'reg_covar': 1e-05, 'threshold_percentile': 1}
Anomaly threshold set at -797.4322 (based on 2.5th percentile).
Anomaly threshold set at -783.9876 (based on 5th percentile).
Anomaly threshold set at -802.7577 (based on 1th percentile).
{'n_components': 4, 'covariance_type': 'diag', 'reg_covar': 1e-05, 'threshold_percentile



Anomaly threshold set at -1574.9371 (based on 1th percentile).




Anomaly threshold set at -1555.0939 (based on 2.5th percentile).




Anomaly threshold set at -1517.0496 (based on 5th percentile).
Processing Window Config: {'new_window_size': 60, 'window_overlap': 15, 'dimensionsPerSample': 60}
Anomaly threshold set at -1913.4202 (based on 1th percentile).
Anomaly threshold set at -1871.2401 (based on 2.5th percentile).
Anomaly threshold set at -1768.8959 (based on 5th percentile).
Anomaly threshold set at -1622.1022 (based on 1th percentile).
Anomaly threshold set at -1587.6730 (based on 2.5th percentile).
Anomaly threshold set at -1547.4193 (based on 5th percentile).
Anomaly threshold set at -1593.5859 (based on 1th percentile).
Anomaly threshold set at -1562.4958 (based on 2.5th percentile).
Anomaly threshold set at -1524.6926 (based on 5th percentile).




Anomaly threshold set at -1588.7050 (based on 1th percentile).




Anomaly threshold set at -1555.4326 (based on 2.5th percentile).
Anomaly threshold set at -1516.5860 (based on 5th percentile).
fim

Best Hyperparameters Found:
  Window Params: {'new_window_size': 60, 'window_overlap': 0, 'dimensionsPerSample': 60}
  Model Params: {'n_components': 1, 'covariance_type': 'diag', 'reg_covar': 1e-05, 'threshold_percentile': 1}
Best Validation Score: 0.9765
Best scores: {'score': np.float64(0.9765491270112975), 'metrics': {'accuracy': 0.955008210180624, 'precision': 1.0, 'recall': 0.953098254022595, 'f1': 0.9759859772129711, 'auc': np.float64(0.9765491270112975)}, 'window_params': {'new_window_size': 60, 'window_overlap': 0, 'dimensionsPerSample': 60}, 'model_params': {'n_components': 1, 'covariance_type': 'diag', 'reg_covar': 1e-05, 'threshold_percentile': 1}, 'tuner_type': 'StaticFeaturesTuner'}




In [52]:
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix, classification_report
)

# Evaluate the best GMM found by the tuner on the normal and anomalous test splits.
# NOTE(review): X_test / anom_test are whatever the kernel currently holds; they
# presumably must share the window layout the best model was trained with — confirm.
best_gmm = tuner.best_model

# Hard labels from the trainer: 1 = normal, 0 = anomalous.
y_pred_test = best_gmm.TrainerPred(X_test)
y_pred_anom  = best_gmm.TrainerPred(anom_test)

# Ground truth: every X_test row is normal (1), every anom_test row is anomalous (0).
y_pred = np.concatenate((y_pred_test, y_pred_anom))
y_true = np.concatenate((np.ones_like(y_pred_test, dtype=int), np.zeros_like(y_pred_anom, dtype=int)))

# pos_label=0 makes the anomaly class the "positive" one for these metrics.
acc = accuracy_score(y_true, y_pred)
prec_anom = precision_score(y_true, y_pred, pos_label=0, zero_division=0)
rec_anom = recall_score(y_true, y_pred, pos_label=0, zero_division=0)
f1_anom = f1_score(y_true, y_pred, pos_label=0, zero_division=0)
cm = confusion_matrix(y_true, y_pred)

print("Confusion matrix (rows=true, cols=pred):\n", cm)
print(f"Accuracy: {acc:.4f}")
print(f"Precision (anomaly as +): {prec_anom:.4f}")
print(f"Recall    (anomaly as +): {rec_anom:.4f}")
print(f"F1-score  (anomaly as +): {f1_anom:.4f}")
print("\nClassification report:\n", classification_report(y_true, y_pred, target_names=['anomaly','normal'], zero_division=0))

# Threshold-free metrics from the continuous log-likelihoods.
# NOTE(review): the broad `except Exception` below hides any failure in this
# section behind a printed message; narrow it if the error should surface.
try:
    ll_test = best_gmm.gmm.score_samples(X_test)
    ll_anom = best_gmm.gmm.score_samples(anom_test)
    print(f"\nMean log-likelihood (normal test): {np.mean(ll_test):.4f}")
    print(f"Mean log-likelihood (anomalous test): {np.mean(ll_anom):.4f}")

    # Negate so that a larger score means "more anomalous".
    scores_test = -ll_test
    scores_anom = -ll_anom
    scores = np.concatenate((scores_test, scores_anom))
    y_true_anom = (y_true == 0).astype(int)  # 1 = anomaly for roc/ap

    # A ValueError from the metric is reported as NaN instead of aborting the cell.
    try:
        roc_auc = roc_auc_score(y_true_anom, scores)
    except ValueError:
        roc_auc = float("nan")
    try:
        ap = average_precision_score(y_true_anom, scores)
    except ValueError:
        ap = float("nan")

    print(f"ROC AUC (anomaly positive): {roc_auc:.4f}")
    print(f"Average Precision (PR AUC): {ap:.4f}")

except Exception as e:
    print("Não foi possível obter log-likelihoods do GMM para calcular scores contínuos:", e)
    print("Verifique se o best_gmm tem o atributo .gmm (uma GaussianMixture treinada).")


Confusion matrix (rows=true, cols=pred):
 [[ 166    0]
 [ 708 3186]]
Accuracy: 0.8256
Precision (anomaly as +): 0.1899
Recall    (anomaly as +): 1.0000
F1-score  (anomaly as +): 0.3192

Classification report:
               precision    recall  f1-score   support

     anomaly       0.19      1.00      0.32       166
      normal       1.00      0.82      0.90      3894

    accuracy                           0.83      4060
   macro avg       0.59      0.91      0.61      4060
weighted avg       0.97      0.83      0.88      4060


Mean log-likelihood (normal test): -1672.9433
Mean log-likelihood (anomalous test): -12554031.9700
ROC AUC (anomaly positive): 1.0000
Average Precision (PR AUC): 1.0000


In [13]:
from sklearn.ensemble import IsolationForest
import numpy as np

class IsolationForestTrainer(Trainer):
    """Anomaly detector backed by scikit-learn's IsolationForest."""

    def __init__(
        self,
        n_estimators: int = 100,
        max_samples: str | float = "auto",
        contamination: float | str = "auto",
        random_state: int | None = None,
        **kwargs
    ) -> None:
        """
        Create the (unfitted) Isolation Forest.

        Args:
            n_estimators: Number of base estimators in the ensemble.
            max_samples: Number of samples drawn to train each base estimator.
            contamination: Expected proportion of outliers in the data set.
            random_state: Seed controlling the estimator's pseudo-randomness.
            **kwargs: Extra keyword arguments forwarded to IsolationForest.
        """
        forest_options = dict(
            n_estimators=n_estimators,
            max_samples=max_samples,
            contamination=contamination,
            random_state=random_state,
        )
        self.model = IsolationForest(**forest_options, **kwargs)

    def TrainerFit(self, X_train: np.ndarray, Y_train: np.ndarray | None = None) -> None:
        """
        Fit the forest on the training data.

        Args:
            X_train: Normal training data.
            Y_train: Ignored; the method is unsupervised.
        """
        print("Training IsolationForest model...")
        self.model.fit(X_train)
        print("IsolationForest model trained successfully.")

    def TrainerPred(self, X: np.ndarray) -> np.ndarray:
        """
        Label each sample as normal (1) or anomalous (0).

        Args:
            X: Data to classify.

        Returns:
            np.ndarray: 1 where the forest predicts an inlier, 0 otherwise.
        """
        # IsolationForest.predict yields 1 for inliers and -1 for outliers;
        # map that onto the 1 / 0 convention the Evaluator is fed with.
        forest_labels = self.model.predict(X)
        return (forest_labels == 1).astype(int)

print("IsolationForestTrainer class defined successfully.")

IsolationForestTrainer class defined successfully.


In [39]:
# Window configurations the tuner iterates over (outer loop).
window_hyperparameters = dict(
    new_window_size=[30, 60],
    window_overlap=[0, 15],
    dimensionsPerSample=[X_train.shape[1] // pp.normal.shape[1]],
)

# Isolation Forest hyperparameters tried for every window configuration.
model_hyperparameters = dict(
    n_estimators=[50, 100, 150, 200, 250, 300],
    max_samples=["auto", 0.25, 0.5, 0.75],
    contamination=["auto", 0.01, 0.05, 0.1],
    random_state=[42],
)

evaluator = Evaluator()

tuner = StaticFeaturesTuner(
    X_train=X_train,
    X_val=X_val,
    X_test=X_test,
    anom_val=anom_val,
    anom_test=anom_test,
    modelTrainerParams=model_hyperparameters,
    window_params=window_hyperparameters,
    # The callback re-runs the preprocessing on `pp` for each window config.
    feature_engineering_fn=recompute_preprocessing(pp),
)

tuner.tune(IsolationForestTrainer, evaluator)

# best_params is the full winning record, hence the nested keys.
print(f"  Window Params: {tuner.best_params['window_params']}")
print(f"  Model Params: {tuner.best_params['model_params']}")
print(f"Best Validation Score: {tuner.best_score:.4f}")

Processing Window Config: {'new_window_size': 30, 'window_overlap': 0, 'dimensionsPerSample': 60}
Training IsolationForest model...
IsolationForest model trained successfully.
{'n_estimators': 50, 'max_samples': 'auto', 'contamination': 'auto', 'random_state': 42}
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
{'n_estimators': 50, 'max_samples': 0.25, 'contamination': 'auto', 'random_state': 42}
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
Training IsolationForest model...
IsolationForest model trained successfully.
{'n_estimators': 50, 'ma

In [40]:
# Show the winning trainer object, then display the full winning record
# (score, metrics, window_params, model_params, tuner_type) as the cell output.
print(tuner.best_model)
tuner.best_params

<__main__.IsolationForestTrainer object at 0x7fb59136f200>


{'score': np.float64(0.701500918929957),
 'metrics': {'accuracy': 0.8588669950738916,
  'precision': 0.9775668679896462,
  'recall': 0.8728813559322034,
  'f1': 0.9222629222629223,
  'auc': np.float64(0.701500918929957)},
 'window_params': {'new_window_size': 60,
  'window_overlap': 15,
  'dimensionsPerSample': 60},
 'model_params': {'n_estimators': 50,
  'max_samples': 0.75,
  'contamination': 'auto',
  'random_state': 42},
 'tuner_type': 'StaticFeaturesTuner'}

In [41]:
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix, classification_report
)

# Evaluate the best Isolation Forest found by the tuner on the test splits.
# NOTE(review): X_test / anom_test are whatever the kernel currently holds; they
# presumably must share the window layout the best model was trained with — confirm.
best_iso = tuner.best_model

# Hard labels from the trainer: 1 = normal, 0 = anomalous.
y_pred_test = best_iso.TrainerPred(X_test)
y_pred_anom = best_iso.TrainerPred(anom_test)

# Ground truth: every X_test row is normal (1), every anom_test row is anomalous (0).
y_pred = np.concatenate((y_pred_test, y_pred_anom))
y_true = np.concatenate((np.ones_like(y_pred_test, dtype=int), np.zeros_like(y_pred_anom, dtype=int)))

# pos_label=0 makes the anomaly class the "positive" one for these metrics.
acc = accuracy_score(y_true, y_pred)
prec_anom = precision_score(y_true, y_pred, pos_label=0, zero_division=0)
rec_anom = recall_score(y_true, y_pred, pos_label=0, zero_division=0)
f1_anom = f1_score(y_true, y_pred, pos_label=0, zero_division=0)
cm = confusion_matrix(y_true, y_pred)

print("Confusion matrix (rows=true, cols=pred):\n", cm)
print(f"Accuracy: {acc:.4f}")
print(f"Precision (anomaly as +): {prec_anom:.4f}")
print(f"Recall    (anomaly as +): {rec_anom:.4f}")
print(f"F1-score  (anomaly as +): {f1_anom:.4f}")
print("\nClassification report:\n", classification_report(y_true, y_pred, target_names=['anomaly','normal'], zero_division=0))


# Threshold-free metrics, only when the wrapped model exposes a continuous score.
if hasattr(best_iso, "model") and hasattr(best_iso.model, "decision_function"):
    # decision_function is lower for more abnormal samples, so negate it to
    # obtain a score where larger means "more anomalous".
    scores_test = -best_iso.model.decision_function(X_test)
    scores_anom = -best_iso.model.decision_function(anom_test)
    scores = np.concatenate((scores_test, scores_anom))
    y_true_anom = (y_true == 0).astype(int)  # 1 = anomaly for sklearn metrics

    # A ValueError from the metric is reported as NaN instead of aborting the cell.
    try:
        roc_auc = roc_auc_score(y_true_anom, scores)
    except ValueError:
        roc_auc = float("nan")
    try:
        ap = average_precision_score(y_true_anom, scores)
    except ValueError:
        ap = float("nan")

    print(f"ROC AUC (anomaly positive): {roc_auc:.4f}")
    print(f"Average Precision (PR AUC): {ap:.4f}")
else:
    print("Modelo não expõe decision_function; não foi possível calcular ROC/PR AUC.")


Confusion matrix (rows=true, cols=pred):
 [[  87   79]
 [ 498 3396]]
Accuracy: 0.8579
Precision (anomaly as +): 0.1487
Recall    (anomaly as +): 0.5241
F1-score  (anomaly as +): 0.2317

Classification report:
               precision    recall  f1-score   support

     anomaly       0.15      0.52      0.23       166
      normal       0.98      0.87      0.92      3894

    accuracy                           0.86      4060
   macro avg       0.56      0.70      0.58      4060
weighted avg       0.94      0.86      0.89      4060

ROC AUC (anomaly positive): 0.7745
Average Precision (PR AUC): 0.3212


In [20]:
import tensorflow as tf
from tensorflow.keras import layers, models

def build_cnn_autoencoder(
    window_size: int,
    n_sensors: int,
    filters: list | tuple = (32, 16),
    kernel_sizes: list | tuple = (3, 3),
    pool_size: int = 2,
    latent_channels: int = 8,
    use_batchnorm: bool = True,
):
    """Build and compile a 1D convolutional autoencoder for windowed sensor data.

    Args:
        window_size: Number of timesteps per input window.
        n_sensors: Number of channels per timestep.
        filters: Filters of each encoder Conv1D layer; the decoder mirrors them.
        kernel_sizes: Kernel size of each encoder Conv1D layer (same length as ``filters``).
        pool_size: Temporal down/up-sampling factor.
        latent_channels: Channels of the 1x1 bottleneck convolution.
        use_batchnorm: Whether to add BatchNormalization after each Conv1D.

    Returns:
        A compiled Keras model (Adam, lr=1e-3, MSE loss) mapping
        ``(window_size, n_sensors)`` inputs to outputs of the same shape.

    Raises:
        AssertionError: If ``filters`` and ``kernel_sizes`` differ in length.
    """
    assert len(filters) == len(kernel_sizes), "filters and kernel_sizes must have the same length"
    inp = layers.Input(shape=(window_size, n_sensors))

    # Encoder
    x = inp
    for f, k in zip(filters, kernel_sizes):
        x = layers.Conv1D(f, kernel_size=k, padding='same', activation='relu')(x)
        if use_batchnorm:
            x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(pool_size=pool_size, padding='same')(x)

    # Latent
    x = layers.Conv1D(latent_channels, kernel_size=1, padding='same', activation='relu')(x)

    # Decoder
    x = layers.UpSampling1D(size=pool_size)(x)
    # With padding='same' the pooled length is ceil(window_size / pool_size), so
    # upsampling overshoots whenever window_size is not a multiple of pool_size.
    # Crop the surplus so the reconstruction matches the input length.
    pooled_steps = -(-window_size // pool_size)
    surplus_steps = pooled_steps * pool_size - window_size
    if surplus_steps > 0:
        x = layers.Cropping1D(cropping=(0, surplus_steps))(x)
    for f, k in zip(filters[::-1], kernel_sizes[::-1]):
        x = layers.Conv1D(f, kernel_size=k, padding='same', activation='relu')(x)
        if use_batchnorm:
            x = layers.BatchNormalization()(x)

    # Linear 1x1 projection back to the original number of channels.
    decoded = layers.Conv1D(n_sensors, kernel_size=1, padding='same', activation=None)(x)

    model = models.Model(inp, decoded)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss='mse')
    return model


In [56]:
# Window settings for the deep-learning pipeline.
WINDOW_SIZE_DL = 60
WINDOW_OVERLAP_DL = 15

pp.preprocessar_todos_deepLearning(
    window_size=WINDOW_SIZE_DL,
    window_overlap=WINDOW_OVERLAP_DL,
)

# Flattened windows: normal train / validation and anomalous validation.
X_train, X_val = pp.normal_splits[0], pp.normal_splits[1]
anom_val = pp.anomalo_splits[0]

window_length = WINDOW_SIZE_DL
n_features_per_timestep = pp.normal.shape[1]

# Turn the flattened windows back into (num_samples, window_length, n_features_per_timestep).
window_shape = (-1, window_length, n_features_per_timestep)
X_train_reshaped = X_train.reshape(window_shape)
X_val_reshaped = X_val.reshape(window_shape)
anom_val_reshaped = anom_val.reshape(window_shape)

print(f"Number of features per timestep (n_sensors): {n_features_per_timestep}")
print(f"Window length: {window_length}")
print(f"Shape of X_train_reshaped: {X_train_reshaped.shape}")
print(f"Shape of X_val_reshaped: {X_val_reshaped.shape}")
print(f"Shape of anom_val_reshaped: {anom_val_reshaped.shape}")

Number of features per timestep (n_sensors): 10
Window length: 60
Shape of X_train_reshaped: (11684, 60, 10)
Shape of X_val_reshaped: (3894, 60, 10)
Shape of anom_val_reshaped: (166, 60, 10)


In [61]:
import tensorflow as tf

# Small autoencoder: two encoder convolutions and a 4-channel bottleneck.
cnn_autoencoder_model = build_cnn_autoencoder(
    window_size=window_length,
    n_sensors=n_features_per_timestep,
    filters=[8, 6],
    kernel_sizes=[5, 3],
    pool_size=2,
    latent_channels=4,
    use_batchnorm=True,
)

# Stop once val_loss has not improved for 5 epochs and keep the best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Reconstruction task: inputs double as targets, on normal data only.
cnn_autoencoder_model.fit(
    x=X_train_reshaped,
    y=X_train_reshaped,
    validation_data=(X_val_reshaped, X_val_reshaped),
    epochs=60,
    batch_size=32,
    callbacks=[early_stopping],
    verbose=1,
)


Epoch 1/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 21ms/step - loss: 19.5279 - val_loss: 10.9729
Epoch 2/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 12ms/step - loss: 7.9083 - val_loss: 4.7716
Epoch 3/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 10ms/step - loss: 4.0962 - val_loss: 2.7320
Epoch 4/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 10ms/step - loss: 2.9565 - val_loss: 2.2188
Epoch 5/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 12ms/step - loss: 2.5028 - val_loss: 1.9380
Epoch 6/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 10ms/step - loss: 2.2636 - val_loss: 1.7598
Epoch 7/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 10ms/step - loss: 2.1119 - val_loss: 1.6416
Epoch 8/60
[1m366/366[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 13ms/step - loss: 1.9847 - val_loss: 1.5399
Epoch 9/60
[1m366/366[0m [

<keras.src.callbacks.history.History at 0x7fb56ffce690>

In [62]:
# Reconstruct the validation windows and measure the per-window error.
print("\nEvaluating Autoencoder Reconstruction Error...")
X_val_pred = cnn_autoencoder_model.predict(X_val_reshaped)
# Mean squared error over axes 1 and 2, leaving one value per window.
mse_normal = np.mean(np.square(X_val_reshaped - X_val_pred), axis=(1, 2))

anom_val_pred = cnn_autoencoder_model.predict(anom_val_reshaped)
mse_anomalous = np.mean(np.square(anom_val_reshaped - anom_val_pred), axis=(1, 2))

print(f"Mean Reconstruction Error (MSE) for normal validation data: {np.mean(mse_normal):.4f}")
print(f"Mean Reconstruction Error (MSE) for anomalous validation data: {np.mean(mse_anomalous):.4f}")


Evaluating Autoencoder Reconstruction Error...
[1m122/122[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step 
Mean Reconstruction Error (MSE) for normal validation data: 0.8685
Mean Reconstruction Error (MSE) for anomalous validation data: 14014.8165


In [63]:
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix, classification_report
)

# Threshold = 99th percentile of the normal validation reconstruction errors.
# The same validation errors are scored below; the test cell reuses this threshold.
anomaly_threshold = np.percentile(mse_normal, 99)  # NOT 100

y_pred_normal = (mse_normal <= anomaly_threshold).astype(int)   # normal if <= threshold
y_pred_anomalous = (mse_anomalous <= anomaly_threshold).astype(int)

y_pred = np.concatenate((y_pred_normal, y_pred_anomalous))
y_true = np.concatenate((
    np.ones_like(mse_normal, dtype=int),  # 1 = normal
    np.zeros_like(mse_anomalous, dtype=int)  # 0 = anomaly
))

# pos_label=0 makes the anomaly class the "positive" one for these metrics.
acc = accuracy_score(y_true, y_pred)
prec_anom = precision_score(y_true, y_pred, pos_label=0, zero_division=0)
rec_anom = recall_score(y_true, y_pred, pos_label=0, zero_division=0)
f1_anom  = f1_score(y_true, y_pred, pos_label=0, zero_division=0)
cm = confusion_matrix(y_true, y_pred)

print("Confusion matrix:\n", cm)
print(f"Accuracy: {acc:.4f}")
print(f"Precision (anomaly as +): {prec_anom:.4f}")
print(f"Recall    (anomaly as +): {rec_anom:.4f}")
print(f"F1-score  (anomaly as +): {f1_anom:.4f}")
print("\nClassification report:\n", classification_report(y_true, y_pred, target_names=['anomaly','normal'], zero_division=0))

# Threshold-free metrics computed directly on the reconstruction errors.
scores = np.concatenate((mse_normal, mse_anomalous))  # larger => more anomalous
y_true_anom = (y_true == 0).astype(int)               # 1 = anomaly

roc_auc = roc_auc_score(y_true_anom, scores)
ap = average_precision_score(y_true_anom, scores)    # PR AUC

print(f"ROC AUC (anomaly positive): {roc_auc:.4f}")
print(f"Average Precision (PR AUC): {ap:.4f}")


Confusion matrix:
 [[ 166    0]
 [  39 3855]]
Accuracy: 0.9904
Precision (anomaly as +): 0.8098
Recall    (anomaly as +): 1.0000
F1-score  (anomaly as +): 0.8949

Classification report:
               precision    recall  f1-score   support

     anomaly       0.81      1.00      0.89       166
      normal       1.00      0.99      0.99      3894

    accuracy                           0.99      4060
   macro avg       0.90      0.99      0.94      4060
weighted avg       0.99      0.99      0.99      4060

ROC AUC (anomaly positive): 1.0000
Average Precision (PR AUC): 1.0000


In [64]:
# Test-set evaluation of the autoencoder, reusing `anomaly_threshold`,
# `window_length` and `n_features_per_timestep` defined by earlier cells.
X_test = pp.normal_splits[2]
anom_test = pp.anomalo_splits[1]

# Turn the flattened windows back into (num_samples, window_length, n_features_per_timestep).
X_test_reshaped = X_test.reshape(-1, window_length, n_features_per_timestep)
anom_test_reshaped = anom_test.reshape(-1, window_length, n_features_per_timestep)

print(f"Shape of X_test_reshaped: {X_test_reshaped.shape}")
print(f"Shape of anom_test_reshaped: {anom_test_reshaped.shape}")

print("\nEvaluating Autoencoder Reconstruction Error on Test Data...")
X_test_pred = cnn_autoencoder_model.predict(X_test_reshaped)
# Mean squared error over axes 1 and 2, leaving one value per window.
mse_test_normal = np.mean(np.square(X_test_reshaped - X_test_pred), axis=(1, 2))

anom_test_pred = cnn_autoencoder_model.predict(anom_test_reshaped)
mse_test_anomalous = np.mean(np.square(anom_test_reshaped - anom_test_pred), axis=(1, 2))

print(f"Mean Reconstruction Error (MSE) for normal test data: {np.mean(mse_test_normal):.4f}")
print(f"Mean Reconstruction Error (MSE) for anomalous test data: {np.mean(mse_test_anomalous):.4f}")

# A window is predicted normal (1) when its error does not exceed the threshold.
y_pred_test_normal = (mse_test_normal <= anomaly_threshold).astype(int)
y_pred_test_anomalous = (mse_test_anomalous <= anomaly_threshold).astype(int)

y_pred_test = np.concatenate((y_pred_test_normal, y_pred_test_anomalous))
y_true_test = np.concatenate((
    np.ones_like(mse_test_normal, dtype=int),  # 1 = normal
    np.zeros_like(mse_test_anomalous, dtype=int)  # 0 = anomaly
))

# pos_label=0 makes the anomaly class the "positive" one for these metrics.
acc_test = accuracy_score(y_true_test, y_pred_test)
prec_anom_test = precision_score(y_true_test, y_pred_test, pos_label=0, zero_division=0)
rec_anom_test = recall_score(y_true_test, y_pred_test, pos_label=0, zero_division=0)
f1_anom_test  = f1_score(y_true_test, y_pred_test, pos_label=0, zero_division=0)
cm_test = confusion_matrix(y_true_test, y_pred_test)

print("\n--- Test Set Performance ---")
print("Confusion matrix:\n", cm_test)
print(f"Accuracy: {acc_test:.4f}")
print(f"Precision (anomaly as +): {prec_anom_test:.4f}")
print(f"Recall    (anomaly as +): {rec_anom_test:.4f}")
print(f"F1-score  (anomaly as +): {f1_anom_test:.4f}")
print("\nClassification report:\n", classification_report(y_true_test, y_pred_test, target_names=['anomaly','normal'], zero_division=0))

# Threshold-free metrics computed directly on the reconstruction errors.
scores_test = np.concatenate((mse_test_normal, mse_test_anomalous))  # larger => more anomalous
y_true_anom_test = (y_true_test == 0).astype(int)               # 1 = anomaly

roc_auc_test = roc_auc_score(y_true_anom_test, scores_test)
ap_test = average_precision_score(y_true_anom_test, scores_test)    # PR AUC

print(f"ROC AUC (anomaly positive): {roc_auc_test:.4f}")
print(f"Average Precision (PR AUC): {ap_test:.4f}")

Shape of X_test_reshaped: (3894, 60, 10)
Shape of anom_test_reshaped: (166, 60, 10)

Evaluating Autoencoder Reconstruction Error on Test Data...
[1m122/122[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step 
Mean Reconstruction Error (MSE) for normal test data: 1.0947
Mean Reconstruction Error (MSE) for anomalous test data: 14016.3169

--- Test Set Performance ---
Confusion matrix:
 [[ 166    0]
 [ 176 3718]]
Accuracy: 0.9567
Precision (anomaly as +): 0.4854
Recall    (anomaly as +): 1.0000
F1-score  (anomaly as +): 0.6535

Classification report:
               precision    recall  f1-score   support

     anomaly       0.49      1.00      0.65       166
      normal       1.00      0.95      0.98      3894

    accuracy                           0.96      4060
   macro avg       0.74      0.98      0.82      4060
weighted avg       0.98      0.96      0.96      4060

ROC AUC (anomaly positive): 1.0000


In [55]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

class AETrainer(Trainer):
    """1D-convolutional autoencoder used as an anomaly detector over sensor windows.

    The network is trained to reconstruct its own input. The per-window mean
    squared reconstruction error is the anomaly score, and the decision
    threshold is a percentile of the scores measured on the training windows.

    Predictions follow the convention 1 = normal, 0 = anomaly.
    """

    def __init__(
        self,
        # architecture
        filters=(32,16),
        kernel_sizes=(3,3),
        pool_size=2,
        latent_channels=8,
        use_batchnorm=True,
        # training
        batch_size=64,
        epochs=50,
        lr=1e-3,
        patience=5,
        threshold_percentile=95.0,
        verbose=0,
        window_length: int | None = None,
        n_sensors: int | None = None,
        random_seed: int | None = 42,
        **kwargs
    ):
        """
        Store the hyperparameters; the Keras model itself is built in TrainerFit.

        Args:
            filters: Number of filters of each encoder Conv1D layer (mirrored in the decoder).
            kernel_sizes: Kernel size of each encoder Conv1D layer, paired with `filters` by zip.
            pool_size: Temporal down/up-sampling factor around the bottleneck.
            latent_channels: Number of channels of the bottleneck convolution.
            use_batchnorm: Whether a BatchNormalization layer follows each Conv1D block.
            batch_size, epochs, lr, patience, verbose: Training settings passed to Keras.
            threshold_percentile: Percentile of the training reconstruction errors used as threshold.
            window_length: Time steps per window; inferred in TrainerFit when None and n_sensors is set.
            n_sensors: Features per time step; inferred in TrainerFit when None and window_length is set.
            random_seed: When not None, seeds NumPy's and TensorFlow's global generators.
            **kwargs: Accepted and ignored by this class.
        """
        # NOTE(review): Trainer.__init__ is not invoked here - confirm the base class
        # needs no initialisation of its own.

        # architecture
        self.filters = list(filters)
        self.kernel_sizes = list(kernel_sizes)
        self.pool_size = pool_size
        self.latent_channels = latent_channels
        self.use_batchnorm = use_batchnorm

        # training
        self.batch_size = int(batch_size)
        self.epochs = int(epochs)
        self.lr = float(lr)
        self.patience = int(patience)
        self.threshold_percentile = float(threshold_percentile)
        self.verbose = int(verbose)

        self.window_length = window_length
        self.n_sensors = n_sensors

        # Populated by TrainerFit.
        self.model: tf.keras.Model | None = None
        self.threshold: float | None = None
        self.train_recon_err_ = None

        if random_seed is not None:
            # Global side effect: reseeds NumPy and TensorFlow for the whole process.
            np.random.seed(random_seed)
            tf.random.set_seed(random_seed)

    def _build_model(self, window_size, n_sensors):
        """
        Build and compile the autoencoder for inputs of shape (window_size, n_sensors).

        Encoder: Conv1D blocks -> one MaxPooling1D -> 1x1 bottleneck convolution.
        Decoder: UpSampling1D -> mirrored Conv1D blocks -> linear 1x1 convolution
        back to n_sensors channels. Compiled with Adam and MSE loss.
        """
        inp = layers.Input(shape=(window_size, n_sensors))
        x = inp
        for f, k in zip(self.filters, self.kernel_sizes):
            x = layers.Conv1D(f, kernel_size=k, padding='same', activation='relu')(x)
            if self.use_batchnorm:
                x = layers.BatchNormalization()(x)
        x = layers.MaxPooling1D(pool_size=self.pool_size, padding='same')(x)
        x = layers.Conv1D(self.latent_channels, kernel_size=1, padding='same', activation='relu')(x)
        x = layers.UpSampling1D(size=self.pool_size)(x)

        # With padding='same' the pooled length is ceil(window_size / pool_size), so the
        # up-sampled sequence is longer than the input whenever window_size is not a
        # multiple of pool_size. Crop the surplus so output and target shapes match.
        surplus = (-window_size) % self.pool_size
        if surplus:
            x = layers.Cropping1D(cropping=(0, surplus))(x)

        for f, k in zip(self.filters[::-1], self.kernel_sizes[::-1]):
            x = layers.Conv1D(f, kernel_size=k, padding='same', activation='relu')(x)
            if self.use_batchnorm:
                x = layers.BatchNormalization()(x)
        decoded = layers.Conv1D(n_sensors, kernel_size=1, padding='same', activation=None)(x)
        model = models.Model(inp, decoded)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.lr), loss='mse')
        return model

    def _infer_window_shape(self, total_feats):
        """Fill in whichever of window_length / n_sensors is missing from the flat feature count.

        Raises:
            ValueError: if both are missing, or the known one does not divide total_feats.
        """
        if self.window_length is not None and self.n_sensors is not None:
            return
        # The None checks must come before the modulo: `x % None` raises TypeError.
        if self.n_sensors is not None and self.n_sensors > 0 and total_feats % self.n_sensors == 0:
            self.window_length = total_feats // self.n_sensors
        elif self.window_length is not None and self.window_length > 0 and total_feats % self.window_length == 0:
            self.n_sensors = total_feats // self.window_length
        else:
            raise ValueError("window_length and n_sensors must be set in AETrainer constructor.")

    def _as_windows(self, X_flat):
        """Reshape flattened windows to (n_windows, window_length, n_sensors).

        Raises:
            ValueError: if a 2D input does not have window_length * n_sensors columns.
                Without this check reshape(-1, ...) could silently regroup rows into
                a different number of windows.
        """
        X = np.asarray(X_flat)
        expected = self.window_length * self.n_sensors
        if X.ndim == 2 and X.shape[1] != expected:
            raise ValueError(
                f"Expected {expected} features per window "
                f"(window_length={self.window_length} * n_sensors={self.n_sensors}), "
                f"got {X.shape[1]}."
            )
        return X.reshape(-1, self.window_length, self.n_sensors)

    def TrainerFit(self, X_train_flat: np.ndarray, Y_train: np.ndarray | None = None) -> None:
        """
        Train the autoencoder on flattened windows and derive the anomaly threshold.

        Args:
            X_train_flat: 2D array (n_windows, window_length * n_sensors).
            Y_train: Unused; the autoencoder is trained to reconstruct its input.

        Raises:
            ValueError: if X_train_flat is not 2D, if the window shape cannot be
                determined, or if the column count does not match the window shape.
        """
        print("Training Autoencoder")
        if X_train_flat.ndim != 2:
            raise ValueError("X_train must be 2D flattened windows (n_windows, window_length * n_sensors)")

        self._infer_window_shape(X_train_flat.shape[1])

        # reshape to 3D (n_windows, T, S)
        X_train = self._as_windows(X_train_flat)

        self.model = self._build_model(self.window_length, self.n_sensors)

        # Early stopping watches the loss on the 10% validation split taken by Keras.
        es = callbacks.EarlyStopping(monitor='val_loss', patience=self.patience, restore_best_weights=True, verbose=0)
        self.model.fit(
            X_train, X_train,
            epochs=self.epochs,
            batch_size=self.batch_size,
            validation_split=0.1,
            callbacks=[es],
            verbose=self.verbose
        )

        # compute reconstruction errors on train (to set threshold)
        X_recon = self.model.predict(X_train, batch_size=self.batch_size, verbose=0)
        recon_err = np.mean((X_train - X_recon) ** 2, axis=(1,2))
        self.train_recon_err_ = recon_err
        self.threshold = float(np.percentile(recon_err, self.threshold_percentile))
        self._train_shape = X_train.shape
        print("Training complete")

    def anomaly_score(self, X_flat: np.ndarray) -> np.ndarray:
        """
        Return the per-window mean squared reconstruction error (greater => more anomalous).

        Raises:
            ValueError: if the model has not been fitted, or a 2D input has the wrong column count.
        """
        if self.model is None:
            raise ValueError("Model not fitted.")
        X = self._as_windows(X_flat)
        X_recon = self.model.predict(X, batch_size=self.batch_size, verbose=0)
        recon_err = np.mean((X - X_recon) ** 2, axis=(1,2))  # greater => more anomalous
        return recon_err

    def TrainerPred(self, X_flat: np.ndarray) -> np.ndarray:
        """
        Classify windows against the fitted threshold.

        Returns:
            Integer array with 1 = normal (score <= threshold) and 0 = anomaly.

        Raises:
            ValueError: if no threshold has been fitted yet.
        """
        if self.threshold is None:
            raise ValueError("Model not fitted (threshold missing).")
        scores = self.anomaly_score(X_flat)
        preds = (scores <= self.threshold).astype(int)
        return preds


# Windowing used for the deep-learning models.
WINDOW_SIZE_DL = 60
WINDOW_OVERLAP_DL = 15
n_features_per_timestep = pp.normal.shape[1]

# Search space: every key maps to the list of candidate values handed to the tuner.
window_hyperparameters = dict(
    new_window_size=[WINDOW_SIZE_DL],
    window_overlap=[WINDOW_OVERLAP_DL],
    dimensionsPerSample=[n_features_per_timestep],
)

model_hyperparameters = dict(
    filters=[[8, 6], [12, 8]],           # a light variant and a higher-capacity variant
    kernel_sizes=[[3,3], [5,3]],
    latent_channels=[4, 8],
    batch_size=[64],
    epochs=[60],
    lr=[1e-3],
    patience=[5],
    threshold_percentile=[97.5, 99],
    # static (single-valued) entries
    window_length=[WINDOW_SIZE_DL],
    n_sensors=[n_features_per_timestep],
    random_seed=[42],
    verbose=[0],
)

evaluator = Evaluator()

# Per the original note, X_train may be raw or flattened windows:
# DeepLearningTuner is expected to call resizeFlattenedWindow on it.
tuner = DeepLearningTuner(
    window_params=window_hyperparameters,
    modelTrainerParams=model_hyperparameters,
    X_train=X_train,
    X_val=X_val,
    X_test=X_test,
    anom_val=anom_val,
    anom_test=anom_test,
)

tuner.tune(Trainer_factory=AETrainer, evaluator=evaluator)

# Report the winner; best_ae is only bound when the tuner recorded a configuration.
if tuner.best_params is None:
    print("Nenhum modelo válido encontrado.")
else:
    print("\nBest configuration:")
    print("Window Params:", tuner.best_params["window_params"])
    print("Model Params:", tuner.best_params["model_params"])
    print(f"Best validation score: {tuner.best_score:.4f}")
    best_ae = tuner.best_model


Processing Window Config: {'new_window_size': 60, 'window_overlap': 15, 'dimensionsPerSample': 10}
Training Autoencoder
Training complete
{'filters': [8, 6], 'kernel_sizes': [3, 3], 'latent_channels': 4, 'batch_size': 64, 'epochs': 60, 'lr': 0.001, 'patience': 5, 'threshold_percentile': 97.5, 'window_length': 60, 'n_sensors': 10, 'random_seed': 42, 'verbose': 0}
Training Autoencoder
Training complete
{'filters': [8, 6], 'kernel_sizes': [3, 3], 'latent_channels': 4, 'batch_size': 64, 'epochs': 60, 'lr': 0.001, 'patience': 5, 'threshold_percentile': 99, 'window_length': 60, 'n_sensors': 10, 'random_seed': 42, 'verbose': 0}
Training Autoencoder
Training complete
Training Autoencoder
Training complete
Training Autoencoder
Training complete
Training Autoencoder
Training complete
{'filters': [8, 6], 'kernel_sizes': [5, 3], 'latent_channels': 4, 'batch_size': 64, 'epochs': 60, 'lr': 0.001, 'patience': 5, 'threshold_percentile': 99, 'window_length': 60, 'n_sensors': 10, 'random_seed': 42, 'ver

KeyboardInterrupt: 

In [None]:
# Display the configuration the tuner recorded as best (the tuning cell treats None as "no valid model found").
tuner.best_params