# Libs

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, ConfusionMatrixDisplay

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed

# Silenciar logs menos importantes do TensorFlow
tf.get_logger().setLevel('ERROR')

# Data generation

In [None]:
# 1. GERAÇÃO DE DADOS SINTÉTICOS
# -----------------------------------------------------------------------------
def generate_data():
    """Gera um DataFrame com séries temporais multivariadas e anomalias injetadas."""
    np.random.seed(42)
    n_timesteps = 1000
    timestamps = pd.to_datetime(pd.date_range('2023-01-01', periods=n_timesteps, freq='H'))

    # Comportamento normal (ondas senoidais com ruído)
    time = np.arange(n_timesteps)
    v1 = np.sin(time / 20) + np.random.normal(0, 0.1, n_timesteps)
    v2 = np.cos(time / 30) + np.random.normal(0, 0.1, n_timesteps)
    v3 = np.sin(time / 15) * np.cos(time / 25) + np.random.normal(0, 0.1, n_timesteps)

    df = pd.DataFrame({'V1': v1, 'V2': v2, 'V3': v3}, index=timestamps)
    
    # Injetar anomalias no período de teste
    is_anomaly = pd.Series(np.zeros(n_timesteps, dtype=int), index=timestamps)
    
    # Anomalia 1: Pico súbito
    df.loc['2023-02-10 00:00:00':'2023-02-10 02:00:00', 'V1'] += 3.5
    is_anomaly.loc['2023-02-10 00:00:00':'2023-02-10 02:00:00'] = 1

    # Anomalia 2: Mudança de nível
    df.loc['2023-02-20 12:00:00':'2023-02-20 18:00:00', 'V2'] -= 2.0
    is_anomaly.loc['2023-02-20 12:00:00':'2023-02-20 18:00:00'] = 1
    
    # Anomalia 3: Ruído excessivo
    noise_period = df.loc['2023-02-01 06:00:00':'2023-02-01 10:00:00']
    df.loc[noise_period.index, 'V3'] += np.random.normal(0, 1.0, len(noise_period))
    is_anomaly.loc[noise_period.index] = 1

    return df, is_anomaly

# Pre-process and create windows

In [None]:
# 2. PRÉ-PROCESSAMENTO E CRIAÇÃO DE JANELAS
# -----------------------------------------------------------------------------
def preprocess_and_window(data, window_size):
    """Normaliza e cria janelas deslizantes (sequências) a partir dos dados."""
    # Normalização
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    
    # Criação das janelas
    X = []
    for i in range(len(data_scaled) - window_size + 1):
        X.append(data_scaled[i:i + window_size])
    
    return np.array(X), scaler

# Build autoencoder

In [None]:
# 3. CONSTRUÇÃO DO MODELO AUTOENCODER LSTM
# -----------------------------------------------------------------------------
def create_lstm_autoencoder(input_shape):
    """Cria e compila um modelo de Autoencoder LSTM."""
    # Encoder
    inputs = Input(shape=input_shape)
    # A dimensionalidade latente é 32
    encoded = LSTM(128, activation='relu', return_sequences=False)(inputs)
    encoded = RepeatVector(input_shape[0])(encoded) # Repete o vetor latente para a entrada do decoder
    
    # Decoder
    decoded = LSTM(128, activation='relu', return_sequences=True)(encoded)
    decoded = TimeDistributed(Dense(input_shape[1]))(decoded)
    
    # Autoencoder
    autoencoder = Model(inputs, decoded)
    autoencoder.compile(optimizer='adam', loss='mae') # Mean Absolute Error é robusto a outliers
    
    autoencoder.summary()
    return autoencoder

# Plot and evaluate

In [None]:
# 4. FUNÇÕES DE PLOTAGEM E AVALIAÇÃO
# -----------------------------------------------------------------------------
def plot_reconstruction_error(error_df, threshold):
    """Plota o erro de reconstrução e o limiar de anomalia."""
    plt.figure(figsize=(15, 6))
    plt.plot(error_df.index, error_df['error'], label='Erro de Reconstrução')
    plt.axhline(y=threshold, color='r', linestyle='--', label='Limiar de Anomalia')
    plt.title('Erro de Reconstrução ao Longo do Tempo')
    plt.xlabel('Data')
    plt.ylabel('Erro (MAE)')
    plt.legend()
    plt.grid(True)
    plt.show()

def plot_anomalies(data, anomaly_indices):
    """Plota a série temporal original destacando as anomalias detectadas."""
    plt.figure(figsize=(15, 8))
    for col in data.columns:
        plt.plot(data.index, data[col], label=col, alpha=0.7)
    
    # Destaca as anomalias detectadas
    anomalies = data.iloc[anomaly_indices]
    for col in anomalies.columns:
        plt.scatter(anomalies.index, anomalies[col], color='red', marker='x', s=50, label=f'Anomalia em {col}')
        
    plt.title('Detecção de Anomalias na Série Temporal Multivariada')
    plt.xlabel('Data')
    plt.ylabel('Valor Normalizado')
    
    # Lidar com legendas duplicadas
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys())
    
    plt.grid(True)
    plt.show()

# Run

In [None]:
# PARÂMETROS
WINDOW_SIZE = 24 # Usar 24 horas de dados para prever o próximo passo
TRAIN_RATIO = 0.7 # 70% dos dados para treino (período normal)

# 1. Carregar e dividir os dados
df, is_anomaly = generate_data()
n_train = int(len(df) * TRAIN_RATIO)

train_df = df.iloc[:n_train]
test_df = df.iloc[n_train:]
test_labels = is_anomaly.iloc[n_train:]

print(f"Tamanho do dataset de treino: {len(train_df)}")
print(f"Tamanho do dataset de teste: {len(test_df)}")

# 2. Pré-processar e criar janelas
# Treinamos o scaler APENAS com dados de treino para evitar data leakage
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

# Janelas de treino (somente dados normais)
X_train, _ = preprocess_and_window(train_df, WINDOW_SIZE)

# Janelas de teste (inclui anomalias)
X_test, _ = preprocess_and_window(test_df, WINDOW_SIZE)

print(f"Shape das janelas de treino: {X_train.shape}")
print(f"Shape das janelas de teste: {X_test.shape}")

# 3. Criar e treinar o modelo
input_shape = (X_train.shape[1], X_train.shape[2])
autoencoder = create_lstm_autoencoder(input_shape)

history = autoencoder.fit(
    X_train, X_train,
    epochs=50,
    batch_size=32,
    validation_split=0.1,
    shuffle=True,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, mode='min')]
)

# 4. Calcular erro de reconstrução
# Erro no treino
train_pred = autoencoder.predict(X_train)
train_mae_loss = np.mean(np.abs(train_pred - X_train), axis=(1, 2))

# Erro no teste
test_pred = autoencoder.predict(X_test)
test_mae_loss = np.mean(np.abs(test_pred - X_test), axis=(1, 2))

# 5. Definir o limiar de anomalia
# Usaremos o 95º percentil do erro de treino como nosso limiar
# Esta é uma abordagem estatística comum.
anomaly_threshold = np.percentile(train_mae_loss, 95)
print(f"\nLimiar de anomalia definido: {anomaly_threshold:.4f}")

# Criar um DataFrame com os erros de teste para visualização
# Os erros correspondem ao final de cada janela
test_error_df = pd.DataFrame(
    index=test_df.index[WINDOW_SIZE-1:],
    data={'error': test_mae_loss}
)

# 6. Visualizar o erro de reconstrução
plot_reconstruction_error(test_error_df, anomaly_threshold)

# 7. Identificar e visualizar anomalias
predicted_anomalies_mask = test_mae_loss > anomaly_threshold

# Ajustar o tamanho dos rótulos verdadeiros para corresponder às predições
true_anomalies_mask = test_labels.iloc[WINDOW_SIZE-1:].values.astype(bool)

print("\n--- Métricas de Avaliação ---")
precision, recall, f1, _ = precision_recall_fscore_support(
    true_anomalies_mask, predicted_anomalies_mask, average='binary'
)
print(f"Precisão: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}\n")

# Matriz de Confusão
cm = confusion_matrix(true_anomalies_mask, predicted_anomalies_mask)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Normal', 'Anomalia'])
disp.plot(cmap=plt.cm.Blues)
plt.title('Matriz de Confusão')
plt.show()

# Visualizar as anomalias nos dados originais
# Normalizamos os dados de teste para a plotagem
df_test_scaled = pd.DataFrame(test_scaled, index=test_df.index, columns=test_df.columns)

# Obter os índices onde as anomalias foram detectadas
anomaly_indices = np.where(predicted_anomalies_mask)[0]

# Como as predições são por janela, precisamos mapear de volta para o timestamp original
plot_anomalies(df_test_scaled, anomaly_indices)