In [101]:
# import modules
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
from optuna.samplers import GPSampler


In [102]:
# load data
class RawDataLoader():
    """Read the training spreadsheet and build standardized K-fold tensor splits.

    Args:
        path: Excel workbook to read.
        sheet_name: Worksheet inside the workbook that holds the data table.
    """

    # Spreadsheet columns used as model inputs and as regression targets.
    FEATURE_COLUMNS = ['T', 'V', 'E', 'CF_LA', 'CF_K', 'CA_LA', 'CB_K']
    TARGET_COLUMNS = ['dNLA', 'dNK', 'dVF', 'dVA', 'dVB']

    def __init__(self, path='BMED_train_data_v2.xlsx', sheet_name='Sheet2'):
        self.path = path
        self.sheet_name = sheet_name
        # Raw (unscaled) feature and target arrays, loaded once at construction.
        self.X_data, self.Y_data = self.RawData()

    def RawData(self):
        """Load the worksheet and return ``(X_data, Y_data)`` as NumPy arrays."""
        df = pd.read_excel(self.path, sheet_name=self.sheet_name)
        X_data = df[self.FEATURE_COLUMNS].values
        Y_data = df[self.TARGET_COLUMNS].values
        return X_data, Y_data

    @staticmethod
    def _to_tensor(array):
        """Convert a NumPy array to a float32 tensor."""
        return torch.tensor(array, dtype=torch.float32)

    def FoldData(self, n_splits=5, random_state=42):
        """Split the data into shuffled K folds and standardize each fold.

        The scalers are fitted on the training part of each fold only and then
        applied to the held-out part, so no test statistics leak into training.

        Args:
            n_splits: Number of folds passed to ``KFold``.
            random_state: Seed for the ``KFold`` shuffle.

        Returns:
            A list with one tuple per fold:
            ``(X_train, X_test, Y_train, Y_test, scaler_X, scaler_Y)`` where the
            first four entries are float32 tensors and the last two are the
            fitted ``StandardScaler`` objects for that fold.
        """
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
        folds = []
        for train_index, test_index in kf.split(self.X_data):
            scaler_X = StandardScaler()
            scaler_Y = StandardScaler()

            # Fit on the training rows, then reuse the same statistics for the test rows.
            X_train_scaled = scaler_X.fit_transform(self.X_data[train_index])
            X_test_scaled = scaler_X.transform(self.X_data[test_index])
            Y_train_scaled = scaler_Y.fit_transform(self.Y_data[train_index])
            Y_test_scaled = scaler_Y.transform(self.Y_data[test_index])

            folds.append((
                self._to_tensor(X_train_scaled),
                self._to_tensor(X_test_scaled),
                self._to_tensor(Y_train_scaled),
                self._to_tensor(Y_test_scaled),
                scaler_X,
                scaler_Y,
            ))
        return folds
    
# Customize the NN architecture
class CustomModel(nn.Module):
    """Fully connected ReLU regression network.

    The network is ``hidden_layers`` repetitions of ``Linear -> ReLU`` followed
    by one linear output layer, stored as a single ``nn.Sequential`` in
    ``self.hidden``.

    Args:
        hidden_layers: Number of hidden ``Linear -> ReLU`` pairs. ``0`` yields a
            single linear map from inputs to outputs.
        hidden_nodes: Width of every hidden layer.
        in_features: Number of input features (defaults to 7).
        out_features: Number of regression outputs (defaults to 5).
    """

    def __init__(self, hidden_layers=2, hidden_nodes=8, in_features=7, out_features=5):
        super().__init__()
        layers = []
        nodes = in_features
        for _ in range(hidden_layers):
            layers.append(nn.Linear(nodes, hidden_nodes))
            layers.append(nn.ReLU())
            nodes = hidden_nodes
        # Feed the output layer from the tracked width, not from hidden_nodes:
        # with zero hidden layers the incoming width is still in_features.
        layers.append(nn.Linear(nodes, out_features))
        self.hidden = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the stacked layers to ``x`` and return the raw outputs."""
        return self.hidden(x)

# Hyperparameter optimization
class NNOpt():
    """Train and evaluate one ``CustomModel`` configuration.

    Construction builds the network, an MSE loss and an Adam optimizer from the
    given hyperparameters and places the network on CUDA when available,
    otherwise on the CPU.

    Args:
        hidden_layers: Forwarded to ``CustomModel``.
        hidden_nodes: Forwarded to ``CustomModel``.
        learning_rate: Adam learning rate.
        num_epochs: Number of full passes over the training data.
        batch_size: Mini-batch size used by the training ``DataLoader``.
        weight_decay: Adam weight decay.
    """

    def __init__(self, hidden_layers=2, hidden_nodes = 8, learning_rate=0.001, num_epochs=500, batch_size=256, weight_decay=1e-5):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.hidden_layers= hidden_layers
        self.hidden_nodes = hidden_nodes
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.weight_decay = weight_decay
        self.model = CustomModel(hidden_layers=self.hidden_layers, hidden_nodes=self.hidden_nodes).to(self.device)
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay)

    def train(self, X_train, Y_train, X_test, Y_test):
        """Fit the model on the training split and score it on the test split.

        Args:
            X_train: Training inputs (tensor).
            Y_train: Training targets (tensor).
            X_test: Held-out inputs (tensor).
            Y_test: Held-out targets (tensor).

        Returns:
            The loss (``self.criterion``) on the held-out split after the last
            epoch, as a Python float.
        """
        X_train_gpu = X_train.to(self.device)
        Y_train_gpu = Y_train.to(self.device)
        X_test_gpu = X_test.to(self.device)
        Y_test_gpu = Y_test.to(self.device)

        dataset = TensorDataset(X_train_gpu, Y_train_gpu)
        dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        # Training mode is loop-invariant, so it is set once before the epochs.
        self.model.train()
        for _ in range(self.num_epochs):
            for X_batch, Y_batch in dataloader:
                self.optimizer.zero_grad()
                train_outputs = self.model(X_batch)
                train_loss = self.criterion(train_outputs, Y_batch)
                train_loss.backward()
                self.optimizer.step()

        # Switch to evaluation mode before scoring so that mode-dependent layers
        # (Dropout, BatchNorm, ...) behave deterministically if the model has any.
        self.model.eval()
        with torch.no_grad():
            test_outputs = self.model(X_test_gpu)
            test_loss = self.criterion(test_outputs, Y_test_gpu)

        return test_loss.item()

# Objective function
def objective(trial):
    """Optuna objective: mean held-out loss of one hyperparameter set over all folds.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The average of the per-fold test losses returned by ``NNOpt.train``.
    """
    hidden_layers = trial.suggest_int('hidden_layers', 1, 10)
    hidden_nodes = trial.suggest_int('hidden_nodes', 8, 128)
    learning_rate = trial.suggest_float('learning_rate', 0.0001, 0.1, log=True)
    num_epochs = trial.suggest_int('num_epochs', 100, 10000)
    batch_size = trial.suggest_int('batch_size', 16, 1024)
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

    test_losses = []
    for X_train, X_test, Y_train, Y_test, _, _ in _get_folds():
        # A fresh model and optimizer per fold so no weights carry over between folds.
        model = NNOpt(
            hidden_layers=hidden_layers,
            hidden_nodes=hidden_nodes,
            learning_rate=learning_rate,
            num_epochs=num_epochs,
            batch_size=batch_size,
            weight_decay=weight_decay)
        test_losses.append(model.train(X_train, Y_train, X_test, Y_test))

    return sum(test_losses) / len(test_losses)


# Fold cache shared by all trials. It is reset whenever this cell is re-run.
_FOLDS_CACHE = None


def _get_folds():
    """Return the K-fold splits, reading the spreadsheet only on first use.

    The splits do not depend on the trial, so building them once avoids
    re-reading the Excel file and re-fitting the scalers for every trial.
    """
    global _FOLDS_CACHE
    if _FOLDS_CACHE is None:
        _FOLDS_CACHE = RawDataLoader().FoldData()
    return _FOLDS_CACHE

In [103]:
# Optuna study configuration
N_TRIALS = 1000  # number of trials the study runs

# MedianPruner: no pruning during the first 20 reported steps of a trial, then a
# pruning check every 5 steps.
# NOTE(review): a pruner only acts on intermediate values reported through
# trial.report() / trial.should_prune(); confirm the objective reports them,
# otherwise this setting has no effect.
pruner = optuna.pruners.MedianPruner(
    n_warmup_steps=20,
    interval_steps=5,
)

# Gaussian-process sampler: the surrogate model that proposes the next hyperparameters.
sampler = GPSampler()

study = optuna.create_study(
    direction="minimize",
    pruner=pruner,
    sampler=sampler,
)

# Run the optimization for N_TRIALS trials.
study.optimize(objective, n_trials=N_TRIALS)

Fold 1 Test Loss: 0.2547
Fold 2 Test Loss: 0.5839
Fold 3 Test Loss: 0.3486
Fold 4 Test Loss: 0.2622
Fold 5 Test Loss: 0.3354
Average Test Loss: 0.3570
