In [1]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.layers import LSTM, Dropout, Dense
from keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
import glob
import os

dataset_path = './datasets/Classical/'

npz파일로 된 dataset을 pd로 바꿔서 전처리하기 위해 만듬.
현재 코드에서는 자동화되었기 때문에 큰 쓸모는 없는듯

In [2]:
# npz data to pd.
def npz_to_csv(file_path):
    file = np.load(file_path, allow_pickle=True)
    X_df = pd.DataFrame(file['X'])
    y_df = pd.DataFrame(file['y'])
    return X_df, y_df

Time series 데이터에대하여 Pandas dataframe을 입력으로 받으면 lstm모델로 학습 후, 검증 loss값들과 accuracy값들을 return해주는 함수

In [3]:
def train_and_evaluate_lstm(X, y, n_splits=5, epochs=30, batch_size=32):
    # 데이터 정규화
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X)
    
    tscv = TimeSeriesSplit(n_splits=n_splits)
    val_losses = []
    val_accs = []
    
    fold = 1
    for train_idx, test_idx in tscv.split(X_scaled):
        X_train, X_test = X_scaled[train_idx], X_scaled[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        
        # Input shape 맞추기 위함
        X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
        X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))
        
        model = Sequential()
        model.add(LSTM(units=50, input_shape=(X_train.shape[1], 1), return_sequences=True))
        model.add(Dropout(0.2))
        model.add(LSTM(units=50, return_sequences=False))
        model.add(Dropout(0.2))
        model.add(Dense(units=1, activation='sigmoid'))

        model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
        
        history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_test, y_test), verbose=1)
        
        # 가장 좋은 검증 성능을 기록
        best_val_loss = min(history.history['val_loss'])
        best_val_acc = max(history.history['val_accuracy'])
        val_losses.append(best_val_loss)
        val_accs.append(best_val_acc)
        
        print(f"Fold {fold}, Best Validation Loss: {best_val_loss}, Best Validation Accuracy: {best_val_acc}")
        
        fold += 1

    mean_val_loss = np.mean(val_losses)
    mean_val_acc = np.mean(val_accs)
    
    print(f"Mean Best Validation Loss: {mean_val_loss}")
    print(f"Mean Best Validation Accuracy: {mean_val_acc}")
    
    return val_losses, val_accs

val losses와 val acc를 한 번에 plotting하는 함수

In [4]:
def plot_validation_metrics(val_losses, val_accs):
    num_folds = len(val_losses)
    
    folds = range(1, num_folds + 1)
    
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6), sharex=True)
    
    ax1.plot(folds, val_losses, marker='o', linestyle='-', color='b')
    ax1.set_ylabel('Validation Loss')
    ax1.set_title('Validation Loss and Accuracy')
    ax1.grid(True)
    
    ax2.plot(folds, val_accs, marker='o', linestyle='-', color='g')
    ax2.set_xlabel('Fold')
    ax2.set_ylabel('Validation Accuracy')
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()

Anomaly data 비율을 나타내는 함수.
혹시 너무 accuracy가 정확하게 나오는 경우, 모든 데이터셋에 대하여 0이라고 판단했는데 알고보니 레이블이 0인 데이터의 개수가 엄청나게 많을 수 있음.
따라서 항상 의심할 것

In [5]:
def get_anomaly_rate(y):
    return y.value_counts()[1] / (y.value_counts()[0] + y.value_counts()[1])

모든 val_losses와 val_accs를 데이터셋 인덱스에 따라 dictionary로 저장하기 위한 변수

In [6]:
val_dict = {}
def append_to_val_dict(index, val_loss, val_acc):
    val_dict[index] = (val_loss, val_acc)

학습 완료된 경우 스프레드시트에 적어주세요!
한 데이터셋에 너무 시간을 많이 쓰실 필요는 없습니다. 학습할 내용도 많고 다른 모델도 많이 테스트해봐야 해요.
그리고 학습은 각 데이터셋에 대해 시간이 어느정도 소요되기 때문에 학습 과정에서는 다른 공부 하시는걸 추천드립니다.

In [7]:
# file_path = '/path/to/Classical' 각자가 저장한 폴더로 연결해주세요
file_path = './datasets/Classical/*.npz'
file_list = glob.glob(file_path)

In [8]:
file_list

['./datasets/Classical\\1_wafer.npz',
 './datasets/Classical\\2_pdm_anreal.npz',
 './datasets/Classical\\3_tempSF.npz']

폴더안에 자동으로 학습하고 기록하고싶은 데이터만 넣어두고, 이 밑에 코드 돌려두세요.

In [9]:
anomaly_rates = {}
file_idx = 1
for file in file_list:
    file = np.load(file_path, allow_pickle=True)
    X = file(['X'])
    y = file(['y'])
    
    anomaly_rates[file_idx] = get_anomaly_rate(y)
    val_losses, val_accs = train_and_evaluate_lstm(X, y)
    append_to_val_dict(file_idx, val_losses, val_accs)
    file_idx += 1
    

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Fold 1, Best Validation Loss: 0.25505590438842773, Best Validation Accuracy: 0.9283276200294495
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Fold 2, Best Validation Loss: 0.31448590755462646, Best Validation Accuracy: 0.9044368863105774
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 1

In [37]:
# plot_validation_metrics(val_losses, val_accs)
# 기록된 dict를 토대로 각 데이터셋에 대해 for문을 돌리던 indexing을 통해(val_dict[idx]~~)
# 접근하든 해서 plotting하는 함수입니다. loss 값과 accuracy값을 검증 fold마다 출력해줘요.

In [None]:
# plotting 예시
# plot_validation_metrics(val_dict[1][0], val_dict[1][1])