## 0. 라이브러리 가져오기 및 하이퍼파라미터 설정

### 라이브러리 불러오기

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import json

import numpy as np
import pandas as pd
import joblib

import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import MinMaxScaler

from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score


In [None]:
# Use the first CUDA device when PyTorch reports one; otherwise run on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")

### 하이퍼파라미터 설정

In [None]:
# Data-preparation settings.
train_ratio = 0.9  # passed to train_test_split as train_size
num_class = 2      # number of classes (one-hot width and model output size)
length = 33        # sliding-window length handed to data_divide

# Baseline training settings.
batch_size = 16
GRU_hidden_size = 10
learning_rate = 0.001
num_epochs = 100

## 1. 데이터 불러오기

In [None]:
# Folder scanned for the training workbooks.
mypath="./training_data/"
from os import listdir
from os.path import isfile, join

# Keep regular files only, in sorted order so every run sees the same sequence.
# Hidden files (e.g. ".DS_Store") and Excel lock files ("~$name.xlsx") are not
# workbooks and would break the Excel reader, so they are skipped.
onlyfiles = sorted(
    f
    for f in listdir(mypath)
    if isfile(join(mypath, f)) and not f.startswith((".", "~$"))
)

In [None]:
# Read the "current" column of every listed workbook into a NumPy array.
# The key is the file name with its last five characters (".xlsx") removed.
data_dict = {
    name[:-5]: pd.read_excel(mypath + name)["current"].to_numpy()
    for name in onlyfiles
}

## 2. 데이터 전처리

In [None]:
# Min-max scale every recording to [0, 1] on its own range.
for key, value in data_dict.items():
    # Compute the extremes once; each builtin call walks the whole array.
    lowest = min(value)
    span = max(value) - lowest
    if span == 0:
        # A constant recording has no range: dividing by zero would turn the
        # whole series into NaN, so map it to zeros instead.
        x_normalized = np.zeros(value.shape, dtype="float")
    else:
        x_normalized = (value - lowest) / span
    data_dict[key] = x_normalized

In [None]:
def data_divide(data_dict, length=33):
    """Replace every series in ``data_dict`` with its stride-1 sliding windows.

    A series with ``n`` entries along its first axis becomes a float array of
    shape ``(n - length + 1, length)`` whose row ``k`` is ``series[k:k + length]``.

    Args:
        data_dict: Mapping from a name to an array exposing ``shape`` and
            slicing. It is modified in place.
        length: Number of consecutive samples in each window.

    Returns:
        The same ``data_dict`` object, now holding the window arrays.
    """
    for name in data_dict:
        series = data_dict[name]
        window_count = series.shape[0] - length + 1
        windows = np.zeros((window_count, length), dtype="float")
        # Each row is a view into `windows`, so writing through it fills the array.
        for start, row in enumerate(windows):
            row[:] = series[start:start + length]
        data_dict[name] = windows
    return data_dict

In [None]:
# Cut every recording into overlapping windows of `length` samples.
data_dict = data_divide(data_dict=data_dict, length=length)

In [None]:
# Recordings (workbook names without the extension) labelled as class 1;
# every other recording is labelled 0. A set gives O(1) membership tests.
positive_keys = frozenset([
    "LG gram - LG gram (1차, 35퍼)", "LG gram - LG gram (2차, 35퍼)", "LG gram - LG gram (3차, 35퍼)",
    "삼성 25w 충전기 - 갤럭시 s22+ (1차, 37퍼)", "삼성 25w 충전기 - 갤럭시 s22+ (2차, 37퍼)", "삼성 25w 충전기 - 갤럭시 s22+ (3차, 37퍼)",
    "삼성 45w 충전기 - LG gram (1차, 32퍼)", "삼성 45w 충전기 - LG gram (2차, 32퍼)", "삼성 45w 충전기 - LG gram (3차, 32퍼)",
    "삼성 45w 충전기 - 갤럭시 s22+ (1차, 27퍼)", "삼성 45w 충전기 - 갤럭시 s22+ (2차, 27퍼)", "삼성 45w 충전기 - 갤럭시 s22+ (3차, 27퍼)",
    "삼성 45w 충전기 - 갤럭시 탭 s6 lite (1차, 6퍼)", "삼성 45w 충전기 - 갤럭시 탭 s6 lite (2차, 6퍼)", "삼성 45w 충전기 - 갤럭시 탭 s6 lite (3차, 6퍼)",
])

# Size the arrays from the real window count of every recording. Multiplying
# the first recording's count by the number of recordings is only correct when
# all recordings are equally long: shorter ones would leave zero-filled rows
# labelled 0, and longer ones would overflow the arrays.
num_of_data = sum(value.shape[0] for value in data_dict.values())
x_data = np.zeros((num_of_data, length), dtype="float")
y_data = np.zeros((num_of_data), dtype="int")

# Copy each recording's windows as one slice and label the whole slice at once.
i = 0
for key, value in data_dict.items():
    stop = i + value.shape[0]
    x_data[i:stop] = value
    y_data[i:stop] = 1 if key in positive_keys else 0
    i = stop

## 3. 훈련/테스트 데이터 분리

In [None]:
# Hold out (1 - train_ratio) of the windows for testing. The split is
# stratified so both classes keep their proportions on each side, and seeded so
# the reported metrics can be reproduced.
# NOTE(review): windows are cut with stride 1, so neighbouring windows of one
# recording overlap almost entirely; a random window-level split places
# near-duplicates on both sides. Consider splitting by recording instead.
x_train, x_test, y_train, y_test = train_test_split(
    x_data,
    y_data,
    train_size=train_ratio,
    stratify=y_data,
    random_state=42,
)

## 4. 모델 구성

### GRU 모델

In [None]:
class GRUModel(nn.Module):
    """Stacked GRU followed by a linear layer applied to the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the GRU hidden state.
        layer_dim: Number of stacked GRU layers.
        num_classes: Size of the output produced by the linear layer.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, num_classes):
        super().__init__()

        # Kept as attributes because forward() needs them to shape h0.
        self.layer_dim = layer_dim
        self.hidden_dim = hidden_dim
        # Short tag that callers use when building output file names.
        self.name = "GRU"

        # batch_first=True: inputs and outputs are (batch, seq_len, features).
        self.gru = nn.GRU(
            input_dim, hidden_dim, layer_dim, batch_first=True
        )

        # Maps the final hidden output to one score per class.
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return raw class scores of shape ``(batch, num_classes)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_dim)``.
        """
        # Zero initial hidden state created on x's device and in x's dtype, so
        # the model also works after .double()/.half(). It is a constant, so it
        # needs no gradient tracking.
        h0 = torch.zeros(
            self.layer_dim, x.size(0), self.hidden_dim,
            device=x.device, dtype=x.dtype,
        )

        out, _ = self.gru(x, h0)

        # Keep only the output at the last time step: (batch, hidden_dim).
        out = out[:, -1, :]

        return self.fc(out)

## 5. 모델용 데이터셋 준비

In [None]:
# Features become float32 tensors with a singleton axis inserted at position 1,
# i.e. (samples, 1, length); labels become int64 class indices.
# NOTE(review): the GRU is built with batch_first=True, so axis 1 is read as
# time and each window is fed as ONE time step with `length` features. Confirm
# this is intended rather than (samples, length, 1).
x_train_tensor = torch.tensor(x_train, dtype=torch.float32).unsqueeze(1).to(DEVICE)
x_test_tensor = torch.tensor(x_test, dtype=torch.float32).unsqueeze(1).to(DEVICE)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(DEVICE)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(DEVICE)

# One-hot encode the labels into (samples, num_class) integer tensors.
y_train_tensor = F.one_hot(y_train_tensor, num_classes=num_class)
y_test_tensor = F.one_hot(y_test_tensor, num_classes=num_class)

In [None]:
# Pair each feature tensor with its one-hot label tensor so both are indexed together.
train_dataset, test_dataset = (
    TensorDataset(features, labels)
    for features, labels in (
        (x_train_tensor, y_train_tensor),
        (x_test_tensor, y_test_tensor),
    )
)

In [None]:
# Mini-batch iterators: training batches are reshuffled every epoch, while the
# test batches keep their order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=batch_size)

## 6. 모델 생성

In [None]:
# Build the baseline network (three stacked GRU layers) and move it to DEVICE.
prediction_model = GRUModel(
    input_dim=x_data.shape[1],
    hidden_dim=GRU_hidden_size,
    layer_dim=3,
    num_classes=num_class,
).to(DEVICE)

## 7. 모델 손실함수 및 Optimizer 설정

In [None]:
# Cross-entropy loss on the model's class scores, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    params=prediction_model.parameters(),
    lr=learning_rate,
)

## 8. 모델 훈련

In [None]:
# Per-epoch loss histories; train_and_test appends to both lists on every epoch.
train_loss_record, test_loss_record = [], []

In [None]:
def train_and_test(model, train_loader, test_loader, optimizer, criterion, num_epochs, verbose=True):
    """Train ``model`` for ``num_epochs`` epochs, evaluating after each one.

    Every epoch performs one optimisation pass over ``train_loader`` and one
    gradient-free pass over ``test_loader``. The mean of the per-batch losses
    of each pass is appended to the module-level lists ``train_loss_record``
    and ``test_loss_record``. Nothing is returned.

    Args:
        model: Module called on each batch of inputs.
        train_loader: Iterable of ``(inputs, targets)`` batches used for updates.
        test_loader: Iterable of ``(inputs, targets)`` batches used for evaluation.
        optimizer: Optimiser stepped once per training batch.
        criterion: Loss called as ``criterion(outputs, targets)`` after both
            arguments are cast to float32.
        num_epochs: Number of train/evaluate rounds.
        verbose: When true, print both losses for every epoch.
    """
    separator = "-----------------------------------------------------------------------------"

    for epoch in range(num_epochs):
        # Optimisation pass.
        model.train()
        running = 0.0
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            scores = model(batch_x)
            batch_loss = criterion(scores.to(torch.float32), batch_y.to(torch.float32))
            batch_loss.backward()
            optimizer.step()
            running += batch_loss.item()
        train_loss = running / len(train_loader)
        if verbose:
            print(separator)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss}")
        train_loss_record.append(train_loss)

        # Evaluation pass: no parameter updates and no autograd bookkeeping.
        model.eval()
        running = 0.0
        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                scores = model(batch_x)
                batch_loss = criterion(scores.to(torch.float32), batch_y.to(torch.float32))
                running += batch_loss.item()
        test_loss = running / len(test_loader)
        if verbose:
            print(f"Test Loss: {test_loss}")
            print(separator)
            print("")
        test_loss_record.append(test_loss)

In [None]:
# Report whether the model's first parameter lives on a CUDA device, then train
# the baseline model.
first_parameter = next(prediction_model.parameters())
print("Cuda : ", first_parameter.is_cuda)
train_and_test(
    model=prediction_model,
    train_loader=train_loader,
    test_loader=test_loader,
    optimizer=optimizer,
    criterion=criterion,
    num_epochs=num_epochs,
)

## 9. 모델 평가 및 수정

In [None]:
# Score the baseline model on the held-out windows.
prediction_model.eval()
with torch.no_grad():  # inference only, so no autograd graph is needed
    test_logits = prediction_model(x_test_tensor)

# Hard class decisions, used for accuracy.
predict_y = test_logits.argmax(dim=1).cpu().numpy()

# AUROC ranks samples by a continuous score. Feeding it the 0/1 argmax labels
# evaluates a single threshold only, so use the class-1 probability instead.
positive_prob = F.softmax(test_logits, dim=1)[:, 1].cpu().numpy()

print("Accuracy: ", accuracy_score(y_test, predict_y))
print("AUROC Score: ", np.round(roc_auc_score(y_test, positive_prob), 3))

In [None]:
# Shell out to pip to install Optuna, which the hyperparameter search imports.
!pip install optuna

In [None]:
import optuna

In [None]:
# Record of the best trial so far; `objective` replaces it whenever a trial
# scores a higher AUROC than the stored "ROC" value.
best_dict = dict(ROC=0.0)

In [None]:
def objective(trial):
    """Optuna objective: train one GRU configuration and return its test AUROC.

    Samples batch size, hidden size, layer count, learning rate and epoch count
    from ``trial``, trains a fresh ``GRUModel`` with ``train_and_test``, and
    scores it on ``x_test_tensor`` / ``y_test``. When the score beats
    ``best_dict["ROC"]``, the model, score, hyperparameters and this trial's
    loss curves are stored in the module-level ``best_dict``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The test AUROC rounded to three decimals.
    """
    param = {
        "batch_size" : trial.suggest_categorical('batch_size', [8, 16, 32, 64, 128]),
        "GRU_hidden_size" : trial.suggest_int('GRU_hidden_size', 4, 20),
        "layer_dim" : trial.suggest_int('layer_dim', 1, 5),
        "learning_rate" : trial.suggest_float('learning_rate', 1e-4, 2e-3),
        "num_epochs" : trial.suggest_int('num_epochs', 10, 60)
    }

    # Reshuffle the training batches every epoch, exactly like the baseline
    # training loader; the evaluation order stays fixed.
    train_loader = DataLoader(train_dataset, batch_size=param["batch_size"], shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=param["batch_size"], shuffle=False)

    prediction_model = GRUModel(length, param["GRU_hidden_size"], param["layer_dim"], num_class).to(DEVICE)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(prediction_model.parameters(), lr=param["learning_rate"])

    train_and_test(prediction_model, train_loader, test_loader, optimizer, criterion, param["num_epochs"], verbose=True)

    # Inference only: no autograd graph is needed for scoring.
    prediction_model.eval()
    with torch.no_grad():
        test_logits = prediction_model(x_test_tensor)

    # AUROC needs a continuous score per sample. Hard argmax labels would
    # evaluate a single threshold only, so rank by the class-1 probability.
    positive_prob = F.softmax(test_logits, dim=1)[:, 1].cpu().numpy()
    roc = np.round(roc_auc_score(y_test, positive_prob), 3)

    print("Final ROC:", roc)

    if roc > best_dict["ROC"]:
        best_dict["Model"] = prediction_model
        best_dict["ROC"] = roc
        best_dict["Hyperparam"] = param
        # train_and_test appended one entry per epoch, so the tail of each
        # history list belongs to this trial.
        best_dict["train_loss"] = train_loss_record[-param["num_epochs"]:]
        best_dict["test_loss"] = test_loss_record[-param["num_epochs"]:]
        print("Best Model Updated")

    print("")
    return roc

In [None]:
# Hand-picked configuration queued on the study before optimisation starts.
seed_params = dict(
    batch_size=8,
    GRU_hidden_size=13,
    layer_dim=4,
    learning_rate=0.00035698599755058444,
    num_epochs=45,
)

# Maximise the value returned by `objective` over 40 trials.
study = optuna.create_study(direction="maximize")
study.enqueue_trial(seed_params)
study.optimize(objective, n_trials=40)

In [None]:
# Show whatever `objective` stored for the best-scoring trial.
print(best_dict)

## 10. 모델 저장 및 시각화

In [None]:
# Plot the per-epoch train and test losses stored for the best trial.
plt.figure(figsize=(10, 6))
for record_key, legend_label in (("train_loss", "train"), ("test_loss", "test")):
    plt.plot(best_dict[record_key], label=legend_label)
plt.title("Loss Results")
plt.xlabel("Epoch")
plt.ylabel("Cross Entropy Loss")
plt.legend()
plt.show()

In [None]:
# Prefix prepended to every output file name.
PATH = ""

# Save the best model found by the search, i.e. the one whose hyperparameters
# are written to the JSON file. The module-level `prediction_model` is only the
# baseline trained before the search, so it is used solely as a fallback when
# no trial has stored a model.
best_model = best_dict.get("Model", prediction_model)
torch.save(best_model, PATH + "_" + best_model.name + "_Model.pt")

In [None]:
# Write the best trial's hyperparameters to "<PATH><model name>_hyperparam.json".
hyperparam_path = f"{PATH}{prediction_model.name}_hyperparam.json"
with open(hyperparam_path, "w") as json_file:
    json.dump(obj=best_dict["Hyperparam"], fp=json_file)