In [1]:
import sys
from pathlib import Path
from datetime import timedelta
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
from itertools import chain
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, RepeatVector, TimeDistributed, Dense


def dataframe_from_csv(target):
    """Read one CSV file and strip surrounding whitespace from its column names.

    Args:
        target: Anything `pandas.read_csv` accepts (path or file-like object).

    Returns:
        pandas.DataFrame: the file contents with cleaned-up header names.
    """
    frame = pd.read_csv(target)
    return frame.rename(columns=str.strip)

def dataframe_from_csvs(targets):
    """Load every CSV in `targets` and stack the resulting frames row-wise.

    Args:
        targets: Iterable of CSV locations, each handed to `dataframe_from_csv`.

    Returns:
        pandas.DataFrame: the concatenation of all loaded frames. `pd.concat`
        is called with its defaults, so each file keeps its own row index.
    """
    frames = list(map(dataframe_from_csv, targets))
    return pd.concat(frames)

# Collect the CSV files of each split in sorted order so the concatenation
# order is deterministic, then load each split into a single raw DataFrame.
TEST_DATASET = sorted(Path("./test1/").glob("*.csv"))
TRAIN_DATASET = sorted(Path("./train/").glob("*.csv"))
TEST_DF_RAW = dataframe_from_csvs(TEST_DATASET)
TRAIN_DF_RAW = dataframe_from_csvs(TRAIN_DATASET)
# Label column of the *test* split.
ATTACK_DF = TEST_DF_RAW['attack']


In [2]:
# Columns that are excluded from normalization.
DROP_FIELD = ["time", "attack_P1", "attack_P2", "attack_P3","attack"]
VALID_COLUMNS_IN_TRAIN_DATASET = TRAIN_DF_RAW.columns.drop(DROP_FIELD) # Drop the DROP_FIELD columns, i.e. the variables not used for normalization.
# Per-column minimum and maximum of the training data; reused as the scaling
# range by `normalize`.
TAG_MIN = TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET].min()
TAG_MAX = TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET].max()

In [3]:
def normalize(df, TAG_MIN, TAG_MAX):
    """Min-max scale every column of `df` using externally supplied bounds.

    Args:
        df: DataFrame whose columns are all present in `TAG_MIN` / `TAG_MAX`.
        TAG_MIN: Per-column lower bound, indexable by column name.
        TAG_MAX: Per-column upper bound, indexable by column name.

    Returns:
        A new DataFrame (the input is not modified). Each column becomes
        `(value - min) / (max - min)`; a column whose bounds are equal is only
        shifted by its minimum, which avoids dividing by zero.
    """
    scaled = df.copy()
    for column in df.columns:
        low, high = TAG_MIN[column], TAG_MAX[column]
        shifted = df[column] - low
        # Constant column: the range is zero, so skip the division.
        scaled[column] = shifted if low == high else shifted / (high - low)
    return scaled

# Min-max normalize the training features with the training min/max, then
# smooth every column with an exponentially weighted moving average (alpha=0.9).
TRAIN_DF = normalize(TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET], TAG_MIN, TAG_MAX).ewm(alpha=0.9).mean()


In [4]:
def boundary_check(df):
    """Report whether the data leaves the [0, 1] range or contains NaN.

    Args:
        df: Array-like of numbers; it is converted to a float32 NumPy array.

    Returns:
        Tuple of three booleans: (any value > 1.0, any value < 0, any NaN).
    """
    values = np.array(df, dtype=np.float32)
    above_one = (values > 1.0).any()
    below_zero = (values < 0).any()
    has_nan = np.isnan(values).any()
    return above_one, below_zero, has_nan

# Boundary check: prints (any value > 1, any value < 0, any NaN) for the
# normalized training frame.
print(boundary_check(TRAIN_DF))

(False, False, False)


In [5]:
# (rows, columns) of the normalized training frame.
print(TRAIN_DF.shape)

(550800, 59)


In [6]:
# Insert a length-1 axis between rows and columns:
# (rows, columns) -> (rows, 1, columns).
train = np.array(TRAIN_DF)
x_train = train[:, np.newaxis, :]
x_train.shape

(550800, 1, 59)

In [None]:
# Build the normalized test frame with the same pipeline used for TRAIN_DF
# (training min/max scaling followed by EWM smoothing). Nothing earlier in the
# visible notebook assigns TEST_DF, so reading it here raised NameError on a
# fresh kernel.
# NOTE(review): assumes the test CSVs contain every column listed in
# VALID_COLUMNS_IN_TRAIN_DATASET — confirm.
TEST_DF = normalize(TEST_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET], TAG_MIN, TAG_MAX).ewm(alpha=0.9).mean()
print(len(TEST_DF))
# Rows removed here are not removed from ATTACK_DF, so positional alignment
# with the labels is only preserved when no row is dropped.
TEST_DF = TEST_DF.dropna()
print(len(TEST_DF))

In [7]:
# Unsupervised learning

window_size = 60  # number of consecutive rows in each sliding window
label_size = 100000  # number of leading rows handed to the windowing step
def sliding_window_unsupervised(df, window_size, feature_columns, answer_column):
    """Cut `df` into stride-1 sliding windows and attach a label to each window.

    Args:
        df: DataFrame holding at least the columns in `feature_columns`.
        window_size: Number of consecutive rows per window.
        feature_columns: Column names to extract from `df`.
        answer_column: Series of labels, read positionally alongside `df`.

    Returns:
        Tuple `(features, targets, answer_targets)` with
        `len(df) - window_size` entries each:
        - features: float32, shape (samples, window_size, n_columns); window
          `i` holds rows `i .. i + window_size - 1`.
        - targets: float32, same shape; window `i` holds the single row
          `i + window_size` repeated at every time step.
        - answer_targets: int, 1 when any label inside window `i` equals 1.
    """
    values = df[feature_columns].values
    labels = answer_column.values

    num_samples = len(df) - window_size
    window_shape = (num_samples, window_size, len(feature_columns))
    features = np.empty(window_shape, dtype=np.float32)
    targets = np.empty(window_shape, dtype=np.float32)
    answer_targets = np.empty(num_samples, dtype=int)

    for start in range(num_samples):
        stop = start + window_size
        features[start] = values[start:stop]
        # `values[stop]` is one row; the assignment broadcasts it across all
        # time steps of the target window.
        targets[start] = values[stop]
        answer_targets[start] = int(np.any(labels[start:stop] == 1))

    return features, targets, answer_targets

# Placeholder containers; all three names are reassigned by the windowing
# cell that calls sliding_window_unsupervised.
features = []
targets = []
answers = []

In [9]:
# Columns fed to the model: 30 tags, all carrying the `P1_` prefix.
feature_columns = ['P1_B2004', 'P1_B2016', 'P1_B3004', 'P1_B3005', 'P1_B4002', 'P1_B4005', 'P1_B400B',
                   'P1_B4022', 'P1_FCV01D', 'P1_FCV01Z', 'P1_FCV02D', 'P1_FCV02Z', 'P1_FCV03D',
                   'P1_FCV03Z', 'P1_FT01', 'P1_FT01Z', 'P1_FT02', 'P1_FT02Z', 'P1_FT03', 'P1_FT03Z',
                   'P1_LCV01D', 'P1_LIT01', 'P1_PCV01D', 'P1_PCV01Z', 'P1_PCV02D', 'P1_PCV02Z',
                   'P1_PIT01', 'P1_PIT02', 'P1_TIT01', 'P1_TIT02']

# Use the shared `window_size` constant rather than a literal 60 so the windows
# always match the sequence length the autoencoder is built with.
# NOTE(review): the features are taken from TRAIN_DF while the labels come from
# ATTACK_DF, which is the `attack` column of the *test* files (TEST_DF_RAW).
# The two are paired purely by row position — confirm this is intended.
features, targets, answers = sliding_window_unsupervised(TRAIN_DF[:label_size], window_size, feature_columns, ATTACK_DF[:label_size])
print(features.shape)
print(targets.shape)
print(answers.shape)

(99940, 60, 30)
(99940, 60, 30)
(99940,)


In [None]:
# Split the data: 80 % train / 20 % test with a fixed seed. Only the feature
# windows are kept for the training side; targets and labels are kept for the
# test side only.
# NOTE(review): train_test_split shuffles by default, and the windows overlap
# (stride 1), so near-identical windows can land on both sides of the split.
features_train, features_test, _, targets_test, _, labels_test = train_test_split(features, targets, answers, test_size=0.2,random_state=42)

In [None]:
def build_autoencoder(window_size, num_features):
    """Assemble and compile the LSTM encoder/decoder used for reconstruction.

    Args:
        window_size: Number of time steps per input sequence.
        num_features: Number of values per time step.

    Returns:
        A compiled Keras `Sequential` model that maps a
        (window_size, num_features) sequence to a sequence of the same shape,
        optimised with Adam (learning rate 0.001) on mean absolute error.
    """
    layer_stack = [
        # Encoder: collapse the whole sequence into a single vector.
        LSTM(64, activation='relu', input_shape=(window_size, num_features), return_sequences=False),
        Dense(64),  # latent-space representation
        RepeatVector(window_size),  # rebuild a sequence to feed the decoder
        Dense(64),
        # Decoder: expand back to one output vector per time step.
        LSTM(64, activation='relu', return_sequences=True),
        TimeDistributed(Dense(num_features)),
    ]
    model = Sequential(layer_stack)

    # Configure the optimizer with an explicit learning rate.
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mae')
    return model

In [None]:
# Create the model: sequence length `window_size`, one input per feature column.
autoencoder = build_autoencoder(window_size, len(feature_columns))

In [None]:
# Train the model to reconstruct its own input windows.
# The notebook configures early stopping on `val_loss`, but the callback was
# created after this call and never passed to `fit`, and no validation data was
# supplied, so training always ran the full 50 epochs. Hold out the last 10 %
# of the training windows for validation and wire the callback in here.
autoencoder.fit(
    features_train,
    features_train,
    epochs=50,
    batch_size=64,
    validation_split=0.1,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, verbose=1)],
    verbose=1,
)

In [None]:
# EarlyStopping configuration: stop once `val_loss` has not improved for 5 epochs.
# NOTE(review): nothing in the visible notebook passes this object to `fit`; a
# callback only takes effect when supplied via `fit(..., callbacks=[...])`, and
# monitoring `val_loss` requires validation data.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

In [None]:
def calculate_ma(scores, n):
    """Compute the simple moving average (MA) of a 1-D score array.

    Args:
        scores: NumPy array with at most one dimension.
        n: Length of the averaging window.

    Returns:
        The 'valid'-mode convolution of `scores` with a uniform kernel of
        length `n`, i.e. one average per fully covered window.

    Raises:
        ValueError: if `scores` has more than one dimension.
    """
    if scores.ndim > 1:
        raise ValueError("Scores array must be 1-dimensional for moving average calculation.")
    uniform_kernel = np.ones(n) / n
    return np.convolve(scores, uniform_kernel, mode='valid')

def calculate_as_t(predicted, targets, n):
    """Compare predictions with targets and compute one AS_t score per (sequence, feature).

    For every sequence and feature the score is the mean over time of
    `(MA + p) / 2`, where `MA` is the length-`n` moving average of the
    predicted values with the last time step excluded, and `p` is the absolute
    difference between prediction and target from time step `n` onward.

    Args:
        predicted: Array of shape (num_sequences, num_timesteps, num_features).
        targets: Array with exactly the same shape as `predicted`.
        n: Moving-average window length; presumably must be smaller than
            num_timesteps so both terms have the same length — confirm.

    Returns:
        Array of shape (num_sequences, num_features) holding the scores.

    Raises:
        ValueError: if `predicted` and `targets` differ in shape.
    """
    if predicted.shape != targets.shape:
        raise ValueError("Predicted and targets must have the same shape.")

    num_sequences, _, num_features = predicted.shape
    as_t_values = np.zeros((num_sequences, num_features))

    for seq_idx, feat_idx in np.ndindex(num_sequences, num_features):
        # Time series of one feature within one sequence.
        pred_series = predicted[seq_idx, :, feat_idx]
        true_series = targets[seq_idx, :, feat_idx]

        # MA_t: moving average of the predictions, last step excluded.
        moving_avg = calculate_ma(pred_series[:-1], n)
        # p_t: absolute prediction error from step n onward.
        abs_error = np.abs(pred_series[n:] - true_series[n:])

        # AS_t averaged over time for this (sequence, feature) pair.
        as_t_values[seq_idx, feat_idx] = np.mean((moving_avg + abs_error) / 2)

    return as_t_values


In [None]:
# Obtain the model's output for the held-out feature windows.
predicted = autoencoder.predict(features_test)

In [None]:
# calculate_as_t requires these two shapes to be identical.
print(predicted.shape)
print(targets_test.shape)

In [None]:
# NOTE(review): the third argument is the moving-average window `n`; it is set
# to the number of feature columns here — confirm that is the intended length.
as_t_values = calculate_as_t(predicted, targets_test, len(feature_columns))
print(as_t_values)

In [None]:
# One row per test window, one column per feature.
print(as_t_values.shape)

In [None]:
# Select every AS_t value of one feature column from as_t_values.
feature_index = 11  # change this value to plot a different feature
as_t_feature_values = as_t_values[:, feature_index]

# Build the x-axis values.
x_values = np.arange(len(as_t_feature_values))  # integers 0 .. len - 1

# Draw the figure.
# NOTE(review): rows follow the order of features_test, which comes from a
# shuffled train_test_split, so the x-axis is a sample index rather than time.
plt.figure(figsize=(15, 5))  # figure size
plt.plot(x_values, as_t_feature_values, label=f'AS_t Values for Feature {feature_index}', marker='o', markersize=2, linestyle='-', color='blue')  # AS_t values as a line plot
plt.title(f'AS_t Values Over Time for Feature {feature_index}')  # figure title
plt.xlabel('Data Point Index')  # x-axis label
plt.ylabel('AS_t Value')  # y-axis label
plt.legend()  # show the legend
plt.grid(True)  # show the grid
plt.show()  # render the figure

In [None]:
from sklearn.metrics import classification_report
import seaborn as sns
def evaluate_model(as_t_values, labels_test, threshold=0.01):
    """Threshold the AS_t scores and report classification metrics against the labels.

    A window is flagged as anomalous when at least one of its per-feature
    scores exceeds `threshold`. The function prints the label/flag shapes, a
    classification report, accuracy and macro F1, then shows a confusion-matrix
    heatmap.

    Args:
        as_t_values: Array of shape (num_windows, num_features) with AS_t scores.
        labels_test: Array of 0/1 ground-truth labels, one per window.
        threshold: Score above which a feature counts as anomalous.

    Returns:
        None. All results are printed or plotted.
    """
    # Per-feature anomaly decision.
    anomalies = as_t_values > threshold

    # Collapse to one binary flag per window (anomalous if any feature is).
    anomaly_flags = np.any(anomalies, axis=1).astype(int)
    print(labels_test.shape)
    print(anomaly_flags.shape)

    # Compare against the actual attack labels.
    print("Classification Report:")
    print(classification_report(labels_test, anomaly_flags))

    print("Accuracy Score:")
    print(accuracy_score(labels_test, anomaly_flags))

    print("F1 Score:")
    print(f1_score(labels_test, anomaly_flags, average='macro'))  # 'macro', 'micro', 'weighted', ... adjust as needed

    # Build and visualise the confusion matrix. Pinning `labels=[0, 1]` keeps
    # the matrix 2x2 even when only one class occurs in both the labels and the
    # flags; otherwise it collapses to 1x1 and no longer matches the two tick
    # labels below.
    cm = confusion_matrix(labels_test, anomaly_flags, labels=[0, 1])
    class_names = ['Normal', 'Anomaly']
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.title('Confusion Matrix')
    plt.show()

In [None]:
# Sweep the threshold and keep the value that maximises the macro F1 score.
# NOTE(review): the threshold is tuned on the same labels it is scored on, so
# the reported best F1 is optimistic.
min_threshold = np.min(as_t_values)
max_threshold = np.max(as_t_values)

# A window is flagged when its largest per-feature score exceeds the threshold,
# so the flags only change when the threshold crosses one of those observed
# per-window maxima. Use them (plus the global minimum) as the candidate
# thresholds instead of a fixed 1e-8 grid, which produced
# (max - min) / 1e-8 candidates (10 million for a range of just 0.1), each
# costing one f1_score call. The candidate list is capped so the cell finishes
# in seconds; values equal to the global maximum are excluded to keep the
# half-open [min, max) range of the original sweep.
MAX_THRESHOLD_CANDIDATES = 2000
window_scores = as_t_values.max(axis=1)
threshold_values = np.union1d([min_threshold], window_scores[window_scores < max_threshold])
if len(threshold_values) > MAX_THRESHOLD_CANDIDATES:
    keep = np.linspace(0, len(threshold_values) - 1, MAX_THRESHOLD_CANDIDATES).astype(int)
    threshold_values = threshold_values[keep]
best_threshold = 0
best_f1 = 0

# Compute the F1 score for each candidate threshold.
for threshold in threshold_values:
    # Flag a window when any feature score exceeds the threshold.
    anomalies = as_t_values > threshold
    anomaly_flags = np.any(anomalies, axis=1).astype(int)

    # F1 score for the current threshold.
    current_f1 = f1_score(labels_test, anomaly_flags, average='macro')

    # Keep the best score seen so far.
    if current_f1 > best_f1:
        best_f1 = current_f1
        best_threshold = threshold

# Print the result.
print(f"Best Threshold: {best_threshold}")
print(f"Best F1 Score: {best_f1}")

In [None]:
# Largest and smallest AS_t value, i.e. the bounds of the threshold search.
print(max_threshold)
print(min_threshold)