## Anomaly Detection Advanced - 개선된 오토인코더 실험

In [1]:
# 라이브러리 로드
import pandas as pd
import os
import warnings
warnings.filterwarnings('ignore')

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import KNNImputer

import tensorflow as tf
import numpy as np

# 사용자 정의 함수 import
from lib.vis import autoencoder_report, visualize_reconstruction
from lib.ae import (
    build_deep_dropout_ae, 
    build_ae_with_loss, 
    EnsembleAutoencoder, 
    build_vae, 
    feature_importance_by_reconstruction_error
)

In [None]:
# Load and preprocess the data (same as before): make sure a local copy
# of the ECG CSV exists before the later cells read it.
dataset_dir = 'dataset'
ecg_file = os.path.join(dataset_dir, 'ecg.csv')

# Create the local dataset directory on first run.
if not os.path.exists(dataset_dir):
    os.makedirs(dataset_dir, exist_ok=True)

if os.path.exists(ecg_file):
    # A cached copy is already on disk; skip the download.
    print("ECG 데이터가 이미 존재합니다.")
else:
    # No cached copy yet: read the CSV from the remote URL and save it locally.
    print("ECG 데이터 다운로드 중...")
    url = "https://storage.googleapis.com/download.tensorflow.org/data/ecg.csv"
    data = pd.read_csv(url)
    data.to_csv(ecg_file, index=False)
    print("다운로드 완료!")

In [3]:
# Load the cached ECG table and binarize the label stored in the last column:
# values equal to 1 stay 1, everything else becomes 0.
# NOTE(review): read_csv treats the first line of the file as a header row.
# If ecg.csv has no header, one record is consumed as column names -- confirm
# against the download step before changing this.
data = pd.read_csv(ecg_file)
data.iloc[:, -1] = data.iloc[:, -1].apply(lambda label: 1 if label == 1 else 0)

# Separate features (all columns but the last) from the label (last column).
x = data.iloc[:, :-1]
y = data.iloc[:, -1]

# Split into training and validation sets BEFORE any fitted preprocessing so
# that no statistic learned below can see validation rows.
x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.2, random_state=2025)

# Missing-value imputation. The imputer is fitted on the training features
# only (never on the label column, never on validation rows) to avoid data
# leakage; the validation features are transformed with the fitted imputer.
knn_imputer = KNNImputer(n_neighbors=5)
x_train = knn_imputer.fit_transform(x_train)
x_val = knn_imputer.transform(x_val)

# Scaling: fit the min-max range on the training features, reuse it for validation.
scaler = MinMaxScaler()
x_train = scaler.fit_transform(x_train)
x_val = scaler.transform(x_val)

# Train the autoencoders only on rows whose label is 1 (treated as the
# normal class in this notebook).
x_train_normal = x_train[y_train == 1]

In [None]:
# 1. Deep autoencoder with dropout
print("\n[1] 깊은 층+드롭아웃 오토인코더")

# Build the model for the feature width of the training matrix and compile
# it with Adam and an MSE reconstruction loss.
deep_ae = build_deep_dropout_ae(
    x_train_normal.shape[1],
    dropout_rate=0.3,
)
deep_ae.compile(optimizer='adam', loss='mse')

# Input and target are the same matrix: the model learns to reconstruct
# the label-1 training rows; the validation set is used for monitoring.
deep_ae.fit(
    x_train_normal,
    x_train_normal,
    epochs=50,
    batch_size=64,
    validation_data=(x_val, x_val),
    verbose=1,
)

# Reconstruct the validation set and report against the true labels.
# The 0.01 argument is presumably the reconstruction-error threshold --
# confirm in lib.vis.autoencoder_report.
pred_deep = deep_ae.predict(x_val)
autoencoder_report(x_val, pred_deep, y_val, 0.01)

In [None]:
# 2. Autoencoder with MAE loss
print("\n[2] MAE 손실 오토인코더")

# No compile() call here: build_ae_with_loss is presumably responsible for
# compiling the model with the requested loss -- confirm in lib.ae.
mae_ae = build_ae_with_loss(
    x_train_normal.shape[1],
    loss_fn='mae',
)

# Input and target are the same matrix (reconstruction objective).
mae_ae.fit(
    x_train_normal,
    x_train_normal,
    epochs=50,
    batch_size=64,
    validation_data=(x_val, x_val),
    verbose=1,
)

# Reconstruct the validation set and report against the true labels.
# The 0.01 argument is presumably the reconstruction-error threshold --
# confirm in lib.vis.autoencoder_report.
pred_mae = mae_ae.predict(x_val)
autoencoder_report(x_val, pred_mae, y_val, 0.01)

In [None]:
# 3. Ensemble autoencoder
print("\n[3] 앙상블 오토인코더")

# Three members, each created through build_deep_dropout_ae.
ensemble_ae = EnsembleAutoencoder(
    n_models=3,
    input_dim=x_train_normal.shape[1],
    build_fn=build_deep_dropout_ae,
)

# The ensemble's fit takes only the input matrix (no separate target or
# validation data is passed here).
ensemble_ae.fit(
    x_train_normal,
    epochs=30,
    batch_size=64,
    verbose=1,
)

# Reconstruct the validation set and report against the true labels.
# The 0.01 argument is presumably the reconstruction-error threshold --
# confirm in lib.vis.autoencoder_report.
pred_ensemble = ensemble_ae.predict(x_val)
autoencoder_report(x_val, pred_ensemble, y_val, 0.01)

In [None]:
# 4. Variational autoencoder (VAE)
print("\n[4] 변분 오토인코더(VAE)")

# No compile() call here: build_vae presumably returns a model that is
# ready to train -- confirm in lib.ae.
vae = build_vae(
    x_train_normal.shape[1],
    latent_dim=4,
)

# Input and target are the same matrix (reconstruction objective).
vae.fit(
    x_train_normal,
    x_train_normal,
    epochs=50,
    batch_size=64,
    validation_data=(x_val, x_val),
    verbose=1,
)

# Reconstruct the validation set and report against the true labels.
# The 0.01 argument is presumably the reconstruction-error threshold --
# confirm in lib.vis.autoencoder_report.
pred_vae = vae.predict(x_val)
autoencoder_report(x_val, pred_vae, y_val, 0.01)

In [None]:
# 5. Feature importance analysis (using the deep dropout autoencoder)
print("\n[5] 특성 중요도 분석 (재구성 오차 기반)")
importance = feature_importance_by_reconstruction_error(x_val, deep_ae)

# Positional feature labels X_0, X_1, ... -- one per feature column of x.
feature_names = [f"X_{idx}" for idx in range(len(x.columns))]

# Rank features from the highest score to the lowest and print each one.
ranked = sorted(zip(feature_names, importance), key=lambda pair: -pair[1])
for name, score in ranked:
    print(f"{name}: {score:.4f}")