In [7]:
import random
import os

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

from sklearn import preprocessing
from sklearn.neighbors import LocalOutlierFactor

In [8]:
# Helper that pins every source of randomness used in this notebook
def seed_everything(seed):
  """Seed Python, NumPy and PyTorch RNGs and request deterministic cuDNN.

  Args:
    seed: Integer seed applied to ``random``, ``numpy.random``, ``torch``
      (CPU and all CUDA devices) and exported as ``PYTHONHASHSEED``.

  Side effects:
    Mutates global RNG state, the ``PYTHONHASHSEED`` environment variable
    and the global cuDNN backend flags.
  """
  random.seed(seed)
  # Only affects interpreters started after this point (e.g. worker processes).
  os.environ['PYTHONHASHSEED'] = str(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  # Safe to call without CUDA; covers every visible GPU, not just the current one.
  torch.cuda.manual_seed_all(seed)
  torch.backends.cudnn.deterministic = True
  # benchmark=True lets cuDNN auto-tune and pick different kernels between
  # runs, which defeats the deterministic flag above, so it must be off.
  torch.backends.cudnn.benchmark = False

In [9]:
# PyTorch custom dataset definition
class CustomDataset(Dataset):
  """Map-style dataset that serves the entries of ``x`` as tensors.

  Args:
    x: Indexable container of samples; it must support ``len()`` and
      integer indexing. It is stored by reference, not copied.
  """

  def __init__(self, x):
    self.x = x

  def __len__(self):
    """Return the number of samples, i.e. ``len(x)``."""
    return len(self.x)

  def __getitem__(self, index):
    """Return sample ``index`` passed through the ``torch.Tensor`` constructor."""
    sample = self.x[index]
    return torch.Tensor(sample)

In [10]:
# AutoEncoder model definition used for pre-training on normal data
class AE(nn.Module):
  """Fully connected autoencoder: 7 -> 32 -> 16 -> 8 -> 3 -> 8 -> 16 -> 32 -> 7.

  Every ``Linear`` layer is bias-free. All but the last layer of each half
  are followed by a non-affine ``BatchNorm1d`` and a ``ReLU``.
  """

  @staticmethod
  def _stack(widths):
    """Build one half of the network from a sequence of layer widths.

    Layers are created strictly in order, so the ``nn.Sequential`` indices
    (and therefore the ``state_dict`` keys) follow the order of ``widths``.
    """
    layers = []
    final = len(widths) - 2
    for pos, (fan_in, fan_out) in enumerate(zip(widths[:-1], widths[1:])):
      layers.append(nn.Linear(fan_in, fan_out, bias=False))
      if pos != final:
        # No normalisation or activation after the last projection.
        layers.append(nn.BatchNorm1d(fan_out, affine=False))
        layers.append(nn.ReLU())
    return nn.Sequential(*layers)

  def __init__(self):
    super().__init__()
    self.encoder = self._stack((7, 32, 16, 8, 3))
    self.decoder = self._stack((3, 8, 16, 32, 7))

  def encode(self, x):
    """Run ``x`` through the encoder; the last dimension becomes 3."""
    return self.encoder(x)

  def decode(self, x):
    """Run a latent code through the decoder; the last dimension becomes 7."""
    return self.decoder(x)

  def forward(self, x):
    """Return the reconstruction ``decode(encode(x))``."""
    return self.decode(self.encode(x))

In [11]:
class Predictor:
  """Anomaly detector: AE latent codes scored with Local Outlier Factor.

  Typical use is ``setup(path)`` -> ``predict()`` -> ``plot_data()``.

  Attributes set by ``__init__``:
    seed: Random seed handed to ``seed_everything`` in ``setup``.
    param_path: Checkpoint file; expected to hold the state dict under "param".
    device: CUDA device when available, otherwise CPU.
  """

  def __init__(self):
    self.seed = 777
    self.param_path = "./AE_model.pth"
    self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


  def setup(self, test_data_path):
    """Load the test CSV, build the data loader and restore the AE weights.

    Args:
      test_data_path: Path to a CSV that contains a "type" column (dropped)
        plus the feature columns fed to the model.

    Sets ``self.test_dataloader`` and ``self.model``.
    """
    seed_everything(self.seed)  # fix the random seed
    test_df = pd.read_csv(test_data_path).drop(columns=["type"])
    # NOTE(review): the scaler is fitted on the test data itself. If the AE was
    # trained on features scaled with training statistics, that fitted scaler
    # should be persisted and reused here instead - confirm against training code.
    scaler = preprocessing.StandardScaler()
    scaled_test = scaler.fit_transform(test_df)

    test_dataset = CustomDataset(x=scaled_test)
    self.test_dataloader = DataLoader(
      dataset=test_dataset,
      batch_size=32,
      # Must stay False: predict() writes labels back by row position, so the
      # latent codes have to come out in the same order as the CSV rows.
      shuffle=False,
      num_workers=0
    )

    self.model = AE().to(self.device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    checkpoint = torch.load(self.param_path, map_location=self.device)
    self.model.load_state_dict(checkpoint["param"])


  def plot_data(self):
    """Show a 3D scatter of the latent codes, split into normal / abnormal.

    Requires ``predict()`` to have been called first (reads ``self.pred_df``).
    """
    traces = []
    for flag, label in ((0, "normal"), (1, "abnormal")):
      # Filter once per class instead of once per axis.
      subset = self.pred_df[self.pred_df["lof_abnormal"] == flag]
      traces.append(
        go.Scatter3d(
          x=subset["x"],
          y=subset["y"],
          z=subset["z"],
          mode="markers",
          name=label
        )
      )

    layout = go.Layout(
      title="abnomal & normal"
    )
    fig = go.Figure(data=traces, layout=layout)
    fig.show()


  def predict(self, sample_path="./dataset/answer_sample.csv", output_path="submission_ae_lof.csv"):
    """Encode the test set, flag outliers with LOF and write the submission.

    Args:
      sample_path: CSV template that receives the "label" column. The default
        uses forward slashes so it resolves on both Windows and POSIX.
      output_path: Destination CSV for the labelled submission.

    Raises:
      ValueError: If the template row count differs from the number of
        predictions (previously this silently produced NaN or dropped labels).

    Sets ``self.pred_df`` with columns x, y, z, lof_abnormal (1 = outlier)
    and color.
    """
    self.model.eval()

    latents = []
    with torch.no_grad():
      for batch in self.test_dataloader:
        codes = self.model.encode(batch.to(self.device))
        latents.append(codes.detach().cpu())

    clf = LocalOutlierFactor(
      n_neighbors=20,
      p=1,
      contamination="auto"
    )

    self.pred_df = pd.DataFrame(torch.cat(latents).numpy(), columns=['x', 'y', 'z'])
    lof_pred = clf.fit_predict(self.pred_df)
    # LOF marks inliers with 1; map that to 0 and every other value to 1.
    self.pred_df["lof_abnormal"] = np.where(lof_pred == 1, 0, 1)
    self.pred_df["color"] = np.where(self.pred_df["lof_abnormal"] == 0, "blue", "red")

    submit = pd.read_csv(sample_path)
    if len(submit) != len(self.pred_df):
      raise ValueError(
        f"{sample_path} has {len(submit)} rows but {len(self.pred_df)} predictions were produced"
      )
    # Positional assignment: independent of whatever index the template carries.
    submit["label"] = self.pred_df["lof_abnormal"].to_numpy()
    submit.to_csv(output_path, index=False)


  # Predict for a single value passed in as the argument
  def predict_one(self, param):
    """Placeholder for single-sample prediction; not implemented, returns None."""
    pass

In [12]:
# Build the predictor, load the test CSV and the saved AE weights, then run
# prediction (predict() also writes the submission CSV).
predictor = Predictor()
predictor.setup(test_data_path='./dataset/test_data.csv')
predictor.predict()

In [13]:
# Show the 3D scatter of the latent codes; relies on predictor.pred_df,
# which the predict() call in the previous cell populates.
predictor.plot_data()