### 라이브러리 불러오기
여러 라이브러리


In [62]:
import torch
from pathlib import Path
import pandas as pd
import numpy as np
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from pandas.plotting import register_matplotlib_converters
from torch import nn, optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, f1_score

%matplotlib inline
%config InlineBackend.figure_format='retina'

sns.set(style='whitegrid', palette='muted', font_scale=1.2)
rcParams['figure.figsize'] = 14, 10
register_matplotlib_converters()
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

<torch._C.Generator at 0x1e8d221f190>

In [46]:
def dataframe_from_csv(target):
    return pd.read_csv(target).rename(columns=lambda x: x.strip())

def dataframe_from_csvs(targets):
    return pd.concat([dataframe_from_csv(x) for x in targets])

### 데이터 불러오기

In [47]:
TEST_DATASET = sorted([x for x in Path("./test/").glob("*.csv")])
TRAIN_DATASET = sorted([x for x in Path("./train/").glob("*.csv")])
TEST_DF_RAW = dataframe_from_csvs(TEST_DATASET)
TRAIN_DF_RAW = dataframe_from_csvs(TRAIN_DATASET)
ATTACK_DF = TEST_DF_RAW['attack']

### 유효필드 정리

In [48]:
DROP_FIELD = ["time", "attack_P1", "attack_P2", "attack_P3","attack"]
VALID_COLUMNS_IN_TRAIN_DATASET = TRAIN_DF_RAW.columns.drop(DROP_FIELD) # DROP_FIELD를 통해 normalization에 사용하지 않을 변수를 제거함.
TAG_MIN = TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET].min()
TAG_MAX = TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET].max()

### 전처리

In [49]:
def normalize(df, TAG_MIN, TAG_MAX):
    ndf = df.copy()
    for c in df.columns:
        if TAG_MIN[c] == TAG_MAX[c]:
            ndf[c] = df[c] - TAG_MIN[c]
        else:
            ndf[c] = (df[c] - TAG_MIN[c]) / (TAG_MAX[c] - TAG_MIN[c])
    return ndf

# Min-Max Normalize
TRAIN_DF = normalize(TRAIN_DF_RAW[VALID_COLUMNS_IN_TRAIN_DATASET], TAG_MIN, TAG_MAX).ewm(alpha=0.9).mean()

### 정규화 체크

In [50]:
def boundary_check(df):
    x = np.array(df, dtype=np.float32)
    return np.any(x > 1.0), np.any(x < 0), np.any(np.isnan(x))

# Boundary Check
print(boundary_check(TRAIN_DF))

(False, False, False)


### 슬라이딩 윈도우 적용

In [51]:
# 비지도 학습을 위한 데이터셋 구성
window_size = 60
label_size = 30000
def sliding_window_unsupervised(df, window_size, feature_columns, answer_column):
    data = df[feature_columns].values
    answers = answer_column.values

    num_samples = len(df) - window_size
    features = np.empty((num_samples, window_size, len(feature_columns)), dtype=np.float32)
    answer_targets = np.empty(num_samples, dtype=int)

    for i in range(num_samples):
        features[i] = data[i:i+window_size]
        answer_targets[i] = 1 if np.any(answers[i:i+window_size] == 1) else 0

    return features, answer_targets

### 특정 컬럼 추출

In [52]:
feature_columns = ['P1_B2004', 'P1_B2016', 'P1_B3004', 'P1_B3005', 'P1_B4002', 'P1_B4005', 'P1_B400B',
                   'P1_B4022', 'P1_FCV01D', 'P1_FCV01Z', 'P1_FCV02D', 'P1_FCV02Z', 'P1_FCV03D',
                   'P1_FCV03Z', 'P1_FT01', 'P1_FT01Z', 'P1_FT02', 'P1_FT02Z', 'P1_FT03', 'P1_FT03Z',
                   'P1_LCV01D', 'P1_LIT01', 'P1_PCV01D', 'P1_PCV01Z', 'P1_PCV02D', 'P1_PCV02Z',
                   'P1_PIT01', 'P1_PIT02', 'P1_TIT01', 'P1_TIT02']

features, answers = sliding_window_unsupervised(TRAIN_DF[:label_size], 60, feature_columns, ATTACK_DF[:label_size])
print(features.shape)
print(answers.shape)


(29940, 60, 30)
(29940,)


### 데이터 분할

In [53]:
# 데이터셋 분할
features_train, features_test, labels_train, labels_test = train_test_split(features, answers, test_size=0.2, random_state=42)

### tensor로 변환

In [54]:
# numpy 배열을 torch Tensor로 변환
features_train = torch.tensor(features_train, dtype=torch.float32)
features_test = torch.tensor(features_test, dtype=torch.float32)
labels_train = torch.tensor(labels_train, dtype=torch.float32)
labels_test = torch.tensor(labels_test, dtype=torch.float32)

### CNN-LSTM 모델

In [70]:
import torch
import torch.nn as nn

class AnomalyDetector(nn.Module):
    # 레이어 초기화 생성자
    def __init__(self, n_features, n_hidden, seq_len, n_layers):
        super(AnomalyDetector, self).__init__()
        self.n_hidden = n_hidden
        self.seq_len = seq_len
        self.n_layers = n_layers

        # CNN 레이어
        self.c1 = nn.Conv1d(in_channels=n_features, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()

        # LSTM 레이어
        self.lstm = nn.LSTM(
            input_size=64,
            hidden_size=n_hidden,
            num_layers=n_layers,
            batch_first=True
        )

        # 선형 레이어
        self.linear = nn.Linear(in_features=n_hidden, out_features=1)

    # 히든 상태 초기화
    def reset_hidden_state(self, batch_size):
        self.hidden = (
            torch.zeros(self.n_layers, batch_size, self.n_hidden).to(next(self.parameters()).device),
            torch.zeros(self.n_layers, batch_size, self.n_hidden).to(next(self.parameters()).device)
        )

    # 순전파 메서드
    def forward(self, sequences):
        # (batch_size, seq_len, n_features) -> (batch_size, n_features, seq_len)
        sequences = sequences.permute(0, 2, 1)

        # CNN 적용
        sequences = self.c1(sequences)
        sequences = self.relu(sequences)

        # (batch_size, n_features, seq_len) -> (batch_size, seq_len, n_features)
        sequences = sequences.permute(0, 2, 1)

        # LSTM 적용
        lstm_out, self.hidden = self.lstm(sequences, self.hidden)

        # 마지막 타임스텝 출력
        last_time_step = lstm_out[:, -1, :]

        # 선형 레이어 적용
        y_pred = self.linear(last_time_step)
        return y_pred


### 모델 학습 함수

In [71]:
# GPU 사용 설정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

def train_model(model, train_data, train_labels, val_data=None, val_labels=None, num_epochs=100, verbose=10, patience=20):
    model.to(device)
    loss_fn = torch.nn.BCEWithLogitsLoss()
    optimiser = torch.optim.Adam(model.parameters(), lr=0.0005)
    train_hist = []
    val_hist = []
    f1_hist = []
    for t in range(num_epochs):
        epoch_loss = 0
        model.train()
        for idx, seq in enumerate(train_data):
            model.reset_hidden_state(batch_size=1)
            seq = torch.unsqueeze(seq, 0).to(device)
            y_pred = model(seq)
            loss = loss_fn(y_pred[0], train_labels[idx].unsqueeze(0).to(device))
            optimiser.zero_grad()
            loss.backward()
            optimiser.step()
            epoch_loss += loss.item()
        train_hist.append(epoch_loss / len(train_data))
        if val_data is not None:
            with torch.no_grad():
                val_loss = 0
                model.eval()
                predictions = []
                for val_idx, val_seq in enumerate(val_data):
                    model.reset_hidden_state(batch_size=1)
                    val_seq = torch.unsqueeze(val_seq, 0).to(device)
                    y_val_pred = model(val_seq)
                    val_step_loss = loss_fn(y_val_pred[0], val_labels[val_idx].unsqueeze(0).to(device))
                    val_loss += val_step_loss.item()
                    predictions.append(y_val_pred[0].item())
            val_hist.append(val_loss / len(val_data))
            predictions = (np.array(predictions) > 0.5).astype(int)
            f1 = f1_score(val_labels.cpu().numpy(), predictions)
            f1_hist.append(f1)
            if t % verbose == 0:
                print(f'Epoch {t} train loss: {epoch_loss / len(train_data)} val loss: {val_loss / len(val_data)} F1 score: {f1:.2f}')
            if (t % patience == 0) & (t != 0):
                if val_hist[t - patience] < val_hist[t]:
                    print('\n Early Stopping')
                    break
        elif t % verbose == 0:
            print(f'Epoch {t} train loss: {epoch_loss / len(train_data)}')
    return model, train_hist, val_hist, f1_hist


Using device: cuda:0


### 모델 초기화 및 학습

In [73]:
# 모델 초기화 및 학습
model = AnomalyDetector(
    n_features=features_train.shape[2],
    n_hidden=4,
    seq_len=features_train.shape[1],
    n_layers=1
)


model, train_hist, val_hist, accuracy_hist = train_model(
    model,
    features_train,
    labels_train,
    features_test,
    labels_test,
    num_epochs=100,
    verbose=1,
    patience=20
)


Epoch 0 train loss: 0.2302138715952575 val loss: 0.2066825633556268 F1 score: 0.00
Epoch 1 train loss: 0.19968160146656996 val loss: 0.18891543116104467 F1 score: 0.00
Epoch 2 train loss: 0.18786303882419306 val loss: 0.17848948177271234 F1 score: 0.00
Epoch 3 train loss: 0.18199419955497156 val loss: 0.17400689594293325 F1 score: 0.00
Epoch 4 train loss: 0.17813856395816277 val loss: 0.17082122846711215 F1 score: 0.00
Epoch 5 train loss: 0.17536116174298402 val loss: 0.16845578144372267 F1 score: 0.00
Epoch 6 train loss: 0.17310897827163368 val loss: 0.1663303856072859 F1 score: 0.00
Epoch 7 train loss: 0.1709434811633053 val loss: 0.16505839618429313 F1 score: 0.00
Epoch 8 train loss: 0.16898440151072855 val loss: 0.16230196360954063 F1 score: 0.00
Epoch 9 train loss: 0.16656861249071284 val loss: 0.16102615930513295 F1 score: 0.00
Epoch 10 train loss: 0.16470754911666444 val loss: 0.15846925390586905 F1 score: 0.00
Epoch 11 train loss: 0.1625974107304751 val loss: 0.1570166136146228

### 모델 저장

In [None]:
# 모델 저장
torch.save(model.state_dict(), 'anomaly_detector_model.pth')

### 예측 및 성능 평가

In [None]:
# 학습 및 검증 손실 시각화
plt.figure(figsize=(12, 6))
plt.plot(train_hist, label='Train Loss')
plt.plot(val_hist, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# F1 스코어 시각화
plt.figure(figsize=(12, 6))
plt.plot(f1_hist, label='Validation F1 Score')
plt.xlabel('Epoch')
plt.ylabel('F1 Score')
plt.legend()
plt.show()

# 예측 및 성능 평가
model.eval()
with torch.no_grad():
    predictions = []
    for seq in features_test:
        seq = torch.unsqueeze(seq, 0).to(device)
        y_pred = model(seq)
        predictions.append(y_pred[0].item())
predictions = np.array(predictions)
predictions = (predictions > 0.5).astype(int)  # 0.5를 기준으로 이진 분류
labels_test = np.array(labels_test)

# F1 스코어 계산
f1 = f1_score(labels_test, predictions)
print(f'Final F1 Score: {f1:.2f}')

# 혼동 행렬 그리기
cm = confusion_matrix(labels_test, predictions, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1])
disp.plot()
plt.show()