In [None]:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout
from pathlib import Path
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# Read the data: collect the CSV paths of each split, ordered by path
TRAIN_FILES = sorted(Path("/content/drive/MyDrive/Colab Notebooks/train/").glob("*.csv"))
TEST_FILES = sorted(Path("/content/drive/MyDrive/Colab Notebooks/test/").glob("*.csv"))

def dataframe_from_csv(target):
    """Read one CSV into a DataFrame whose column names have no surrounding whitespace.

    Args:
        target: Anything ``pandas.read_csv`` accepts (path or file-like object).

    Returns:
        The parsed DataFrame with every column label passed through ``strip()``.
    """
    raw_frame = pd.read_csv(target)
    return raw_frame.rename(columns=str.strip)

def dataframe_from_csvs(targets):
    """Read every CSV in ``targets`` and stack the results row-wise.

    Args:
        targets: Iterable of CSV locations, each handed to ``dataframe_from_csv``.

    Returns:
        A single DataFrame made by ``pandas.concat`` of the per-file frames,
        in the order given. The original per-file row index is kept as-is.
    """
    frames = list(map(dataframe_from_csv, targets))
    return pd.concat(frames)

# Load every CSV of each split into one raw DataFrame (train first, then test)
TRAIN_DF_RAW, TEST_DF_RAW = map(dataframe_from_csvs, (TRAIN_FILES, TEST_FILES))

# Remove the attack labels: keep the test labels aside for evaluation and
# exclude the timestamp / label columns from the model features
DROP_FIELD = ["time", "attack_P1", "attack_P2", "attack_P3", "attack"]
ATTACK_DF = TEST_DF_RAW['attack']
VALID_COLUMNS_IN_TRAIN_DATASET = TRAIN_DF_RAW.columns.drop(DROP_FIELD)

# Normalize the data: statistics are fitted on the training split only and
# then reused unchanged for the test split
scaler = StandardScaler()
TRAIN_DF = scaler.fit_transform(
    TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET]
)
TEST_DF = scaler.transform(
    TEST_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET]
)

# Sliding-window helper
def create_sliding_windows(data, window_size):
    """Return every contiguous run of ``window_size`` rows of ``data``.

    Windows advance one row at a time along the first axis, so window ``i``
    equals ``data[i:i + window_size]``.

    Args:
        data: Array-like with at least one dimension; the first axis is the
            one being windowed.
        window_size: Number of consecutive rows per window (positive integer).

    Returns:
        A new, writable array of shape
        ``(len(data) - window_size + 1, window_size, *data.shape[1:])``.
        If ``data`` has fewer than ``window_size`` rows the result has zero
        windows but keeps the trailing dimensions, so callers can still read
        ``result.shape[1]`` and beyond.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    data = np.asarray(data)
    if window_size < 1:
        raise ValueError(f"window_size must be a positive integer, got {window_size}")
    if len(data) < window_size:
        # Not enough rows for a single window: return an empty stack with the
        # same rank a non-empty result would have.
        return np.empty((0, window_size) + data.shape[1:], dtype=data.dtype)
    # sliding_window_view appends the window axis last; move it next to the
    # window index so each entry is laid out like data[i:i + window_size].
    view = np.lib.stride_tricks.sliding_window_view(data, window_size, axis=0)
    # The view is read-only and aliases `data`; copy to get an independent array.
    return np.moveaxis(view, -1, 1).copy()

# Window size: number of consecutive samples per window
window_size = 50

# Apply the sliding window to both scaled splits (train first, then test)
TRAIN_WINDOWS, TEST_WINDOWS = (
    create_sliding_windows(scaled_split, window_size)
    for scaled_split in (TRAIN_DF, TEST_DF)
)

# Autoencoder model definition
# Each window is flattened to one vector of window_size * n_features values.
input_dim = TRAIN_WINDOWS.shape[1] * TRAIN_WINDOWS.shape[2]
hidden_dim_1 = 128
hidden_dim_2 = 64
code_dim = 32

# Encoder: input -> 128 -> 64 -> 32, with dropout after each hidden layer
input_layer = Input(shape=(input_dim,))
hidden_1 = Dense(hidden_dim_1, activation='relu')(input_layer)
dropout_1 = Dropout(0.5)(hidden_1)
hidden_2 = Dense(hidden_dim_2, activation='relu')(dropout_1)
dropout_2 = Dropout(0.5)(hidden_2)
code = Dense(code_dim, activation='relu')(dropout_2)

# Decoder: 32 -> 64 -> 128 -> input
hidden_3 = Dense(hidden_dim_2, activation='relu')(code)
dropout_3 = Dropout(0.5)(hidden_3)
hidden_4 = Dense(hidden_dim_1, activation='relu')(dropout_3)
# The targets are StandardScaler outputs (zero-mean, unbounded, frequently
# negative). A sigmoid output is confined to (0, 1) and could never reproduce
# them, so the reconstruction layer must be linear.
output_layer = Dense(input_dim, activation='linear')(hidden_4)

autoencoder = Model(inputs=input_layer, outputs=output_layer)
optimizer = Adam(learning_rate=0.001)
autoencoder.compile(optimizer=optimizer, loss='mse')

# Split the flattened training windows into training and validation sets
train_data, val_data = train_test_split(
    TRAIN_WINDOWS.reshape(-1, input_dim),
    test_size=0.2,
    random_state=42,
)

# Train the autoencoder to reproduce its own input; stop once the validation
# loss has not improved for 10 epochs and roll back to the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
)
history = autoencoder.fit(
    x=train_data,
    y=train_data,
    validation_data=(val_data, val_data),
    epochs=100,
    batch_size=32,
    shuffle=True,
    callbacks=[early_stopping],
    verbose=1,
)

# Visualize the training process (optional)
def _plot_training_history(fit_history):
    """Draw the per-epoch training and validation loss curves of a fit run."""
    curves = (('loss', 'Train Loss'), ('val_loss', 'Validation Loss'))
    for metric_key, curve_label in curves:
        plt.plot(fit_history.history[metric_key], label=curve_label)
    plt.legend()
    plt.show()


_plot_training_history(history)

# Predict on the test set: flatten each window to a vector and reconstruct it
TEST_WINDOWS_FLATTENED = TEST_WINDOWS.reshape((-1, input_dim))
reconstructed = autoencoder.predict(TEST_WINDOWS_FLATTENED)

# Reconstruction error: mean squared difference per window
mse = np.power(TEST_WINDOWS_FLATTENED - reconstructed, 2).mean(axis=1)

# Threshold: 95th percentile of the reconstruction error on the held-out
# validation split of the training windows. Deriving it from the test errors
# would leak test data and flag exactly 5% of the test windows no matter how
# many attacks they contain.
# NOTE(review): assumes the training CSVs contain normal (attack-free)
# operation only — confirm against the dataset.
val_reconstructed = autoencoder.predict(val_data)
val_mse = np.mean(np.power(val_data - val_reconstructed, 2), axis=1)
threshold = np.percentile(val_mse, 95)
print(f'Threshold: {threshold}')

# Anomaly detection: 1 where the test error exceeds the threshold, else 0
anomalies = (mse > threshold).astype(int)

# Visualize the result
def _plot_reconstruction_error(errors, cutoff):
    """Plot per-window reconstruction errors with the threshold as a red line."""
    plt.figure(figsize=(10, 6))
    plt.plot(errors, label='Reconstruction Error')
    plt.hlines(cutoff, xmin=0, xmax=len(errors), colors='r', label='Threshold')
    plt.legend()
    plt.show()


_plot_reconstruction_error(mse, threshold)

# Compare with the attack labels
from sklearn.metrics import classification_report, confusion_matrix

# Label each window with the attack flag of its centre sample: window i covers
# rows i .. i + window_size - 1, so its label is row i + window_size // 2.
# Slicing by start offset and number of windows keeps labels and predictions
# the same length for every window size; a negative stop index turns into 0
# (an empty slice) for very small windows.
label_start = window_size // 2
attack_labels = ATTACK_DF.values[label_start:label_start + len(anomalies)]

print(confusion_matrix(attack_labels, anomalies))
print(classification_report(attack_labels, anomalies))
