### 1. Import Library

In [1]:
# !pip install -r requirements.txt

In [12]:
# 필요한 라이브러리 임포트
import numpy as np
import pandas as pd
import random
from pathlib import Path
import matplotlib.pyplot as plt
import plotly.graph_objs as go

In [13]:
# 난수 생성 고정
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

### 2. Load Train Data & Test Data

In [14]:
# 데이터 경로 설정
DATA_PATH = Path("./data")

In [15]:
train_data = pd.read_csv(DATA_PATH / "train.csv")
test_data = pd.read_csv(DATA_PATH / "test.csv")

In [16]:
# 필요 피처 선택
non_numeric_cols = ["faultNumber", "simulationRun", "sample"]
use_cols = train_data.columns.difference(non_numeric_cols)

X_train = train_data[use_cols]
X_test = test_data[use_cols]

In [17]:
print(len(X_train.columns))
print(len(X_test.columns))

52
52


In [18]:
# 필요한 라이브러리 임포트 
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from torch.utils.data import DataLoader, TensorDataset 
import torch

# 데이터 스케일러 인스턴스 생성(데이터 표준화)
## 어떤 스케일러를 적용해야하는가?
scaler = MinMaxScaler() 
# 학습 데이터셋에 대해 fit과 transform 수행: train 기준 정보 계산 및 데이터 변환
X_train_scaled = scaler.fit_transform(X_train) 
# 테스트 데이터셋에 대해서는 transform만 수행: 학습 데이터셋의 기준 정보를 사용하여 데이터 변환
X_test_scaled = scaler.transform(X_test)  

# PyTorch Tensor로 변환 
X_train_tensor = torch.FloatTensor(X_train_scaled) 
X_test_tensor = torch.FloatTensor(X_test_scaled)

In [19]:
# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### 3. Autoencoder 구현 및 학습, 추론

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
import os

# 오토인코더 정의
class Autoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dims):
        super(Autoencoder, self).__init__()
        layers = []
        # 인코더 레이어 추가
        in_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(in_dim, hidden_dim))
            layers.append(nn.ReLU())
            in_dim = hidden_dim
        
        self.encoder = nn.Sequential(*layers)  # 인코더 레이어
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dims[-1], input_dim),  # 마지막 인코더 레이어의 출력 크기와 같아야 함
            nn.Sigmoid()
        )
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# 손실 함수 정의
def loss_function(recon_x, x):
    return nn.MSELoss()(recon_x, x)  # 재구성 손실

# 체크포인트 저장 함수 (수정 필요)
def save_checkpoint(model, optimizer, trial, file_path):
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'trial_number': trial.number
    }, file_path)

In [None]:
# 목적 함수 정의 (Optuna 최적화 대상)
def objective(trial):
    # 하이퍼파라미터 샘플링
    num_layers = trial.suggest_int('num_layers', 1, 3)  # 레이어 수를 1에서 4까지 샘플링
    hidden_dims = [trial.suggest_int(f'hidden_dim_{i}', 8, 32) for i in range(num_layers)]  # 각 레이어의 노드 수 샘플링
    # learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-2)  # learning_rate 샘플링
    
    # 데이터 로드
    input_dim = X_train_tensor.shape[1]
    
    # 학습 데이터만 사용하여 학습 (정상 데이터)
    train_loader = DataLoader(TensorDataset(X_train_tensor), batch_size=64, shuffle=True)

    # 모델 초기화
    model = Autoencoder(input_dim=input_dim, hidden_dims=hidden_dims).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    # 체크포인트 저장 경로 설정
    checkpoint_dir = 'checkpoints'
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_trial_{trial.number}.pt')

    # 학습 과정
    model.train()
    num_epochs = 10
    for epoch in range(num_epochs):
        total_loss = 0
        for data in train_loader:
            batch_data = data[0].to(device)
            optimizer.zero_grad()
            output = model(batch_data)
            loss = loss_function(output, batch_data)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        # 에포크마다 체크포인트 저장
        save_checkpoint(model, optimizer, trial, checkpoint_path)

    # 재구성 오차를 사용하여 모델 평가
    recon_errors = []
    model.eval()
    with torch.no_grad():
        for data in X_train_tensor:
            data = data.unsqueeze(0).to(device)  # 배치 형태로 변경
            recon = model(data)
            error = torch.mean((recon - data) ** 2).item()  # MSE 계산
            recon_errors.append(error)

    # 최종 재구성 오차의 평균을 반환 (최소화할 목표)
    return torch.tensor(recon_errors).mean().item()  # 평균 재구성 오차

# Optuna 스터디 실행
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

# 최적의 하이퍼파라미터 출력
print("Best trial:")
trial = study.best_trial
print(f"  Value: {trial.value}")
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

In [21]:
import torch

# 최적 하이퍼파라미터로 모델 재학습
# best_num_layers = trial.params['num_layers']
# best_hidden_dims = [trial.params[f'hidden_dim_{i}'] for i in range(best_num_layers)]
best_num_layers = 1
best_hidden_dims = [31]

# # 체크포인트 로드하는 함수
# def load_best_checkpoint(filepath):
#     checkpoint = torch.load(filepath)
#     model_state_dict = checkpoint['model_state_dict']
#     optimizer_state_dict = checkpoint['optimizer_state_dict']
#     epoch = checkpoint['epoch']
#     loss = checkpoint['loss']
#     hyperparams = checkpoint['hyperparams']

#     print(f"Best checkpoint loaded from: {filepath}")
#     print(f"Loaded model from epoch: {epoch}, with validation loss: {loss:.4f}")
#     print("Hyperparameters:", hyperparams)
#     return epoch, loss, hyperparams

# # 체크포인트 로드
# start_epoch, best_loss, best_hyperparams = load_best_checkpoint('./checkpoints/checkpoint_trial_22.pt')
# print(f"Loaded from epoch: {start_epoch}, Best validation loss: {best_loss:.4f}")
# print("Hyperparameters: ", best_hyperparams)

In [22]:
# 최종 모델 초기화
final_model = Autoencoder(input_dim=X_train.shape[1], hidden_dims=best_hidden_dims).to(device)
final_optimizer = optim.Adam(final_model.parameters(), lr=1e-3)

# 최종 학습 데이터로 모델 학습
final_model.train()
num_epochs = 10
train_loader = DataLoader(TensorDataset(X_train_tensor), batch_size=64, shuffle=True)

for epoch in range(num_epochs):
    total_loss = 0
    for data in train_loader:
        batch_data = data[0].to(device)
        final_optimizer.zero_grad()
        output = final_model(batch_data)
        loss = loss_function(output, batch_data)
        loss.backward()
        final_optimizer.step()
        total_loss += loss.item()
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader)}')

# 테스트 데이터로 이상 탐지 수행
def detect_anomaly(data, model, threshold):
    model.eval()
    with torch.no_grad():
        recon = model(data.to(device))
        recon_error = torch.mean((recon - data.to(device)) ** 2, dim=1)  # MSE 계산
        return recon_error > threshold  # 재구성 오차가 임계값을 넘는 경우 이상으로 판단

# 정상 데이터에서 추정한 임계값 설정
recon_errors = []
final_model.eval()
with torch.no_grad():
    for data in X_train_tensor:
        data = data.unsqueeze(0).to(device)  # 배치 형태로 변경
        recon = final_model(data)
        error = torch.mean((recon - data) ** 2).item()  # MSE 계산
        recon_errors.append(error)

# 임계값 설정: 정상 데이터의 평균 재구성 오차 + 3 * 표준편차
threshold = torch.tensor(recon_errors).mean() + 3 * torch.tensor(recon_errors).std()

# 테스트 데이터로 이상 탐지 수행
test_loader = DataLoader(TensorDataset(X_test_tensor), batch_size=64, shuffle=False)
results = []

with torch.no_grad():
    for data in test_loader:
        batch_data = data[0].to(device)
        is_anomaly = detect_anomaly(batch_data, final_model, threshold)  # 이상 탐지
        results.extend(is_anomaly.cpu().numpy())  # 결과를 리스트에 추가

Epoch 1/10, Loss: 0.005548735258856225
Epoch 2/10, Loss: 0.0038746538866880546
Epoch 3/10, Loss: 0.0037469227829509937
Epoch 4/10, Loss: 0.0037295030929231153
Epoch 5/10, Loss: 0.00372040592376424
Epoch 6/10, Loss: 0.0037139145141223836
Epoch 7/10, Loss: 0.0037094439158303362
Epoch 8/10, Loss: 0.003706689888782111
Epoch 9/10, Loss: 0.00370377482355808
Epoch 10/10, Loss: 0.003702098612287655


In [23]:
from datetime import datetime

# 결과를 "faultNumber"로 변환 (정상이면 0, 비정상이면 1)
fault_numbers = [0 if not x else 1 for x in results]

# 결과를 DataFrame으로 변환 후 CSV 파일로 저장
results_df = pd.DataFrame({'faultNumber': fault_numbers})
print(results_df.value_counts())

current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"output_ae_optuna_{current_time}.csv"

results_df.to_csv(filename)

print("Anomaly detection results saved to '.csv'.")

faultNumber
0              578109
1              132291
Name: count, dtype: int64
Anomaly detection results saved to '.csv'.


In [24]:
print(threshold)

tensor(0.0072)
