***Author**: Alexander Telle, DASU - Transferzentrum für Digitalisierung, Analytics & Data Science Ulm*

***Data Source**: [Kaggle](https://www.kaggle.com/datasets/andrewmvd/fetal-health-classification)*

***Code Repository**: [GitHub](https://github.com/dasudigital/optuna_workshop)*

***Licence**: [MIT License](https://github.com/dasudigital/optuna_workshop/blob/master/LICENSE)*

***Optuna Documentation**: [https://optuna.readthedocs.io/en/stable/](https://optuna.readthedocs.io/en/stable/)*

# [III. Innovationskongress Data Science](https://studium.hs-ulm.de/de/research/Seiten/Innokongress.aspx)
#### Workshop: Hyperparameter-Tuning (HPO) mit Optuna
---

> **Import der Bibliotheken & allgemeine Einstellungen**:

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
import warnings
warnings.filterwarnings('ignore')

from sklearn.preprocessing import OrdinalEncoder
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.optim.lr_scheduler import CosineAnnealingLR

import torch
import torch.nn as nn

###
import optuna
import torch.optim as optim
###

In [None]:
EPOCHS = 50
CLASSES = 3

torch.manual_seed(42)
random.seed(42)
RANDOM_SEED = 42

> **Definition der Hyperparameter (Auswahl)**:

> **Vorbereitung der Daten**:

In [None]:
data = pd.read_csv('Data.csv')
data.head()

In [None]:
len(data)

In [None]:
data.isnull().any().any()

In [None]:
data.fetal_health.value_counts().plot(kind='bar')
data.fetal_health.value_counts()

In [None]:
null_accuracy = data.fetal_health.value_counts()[1.0]/len(data)
null_accuracy

In [None]:
encoder = OrdinalEncoder()
oe_columns = ['fetal_health']
encoder.fit(data[oe_columns])
data[oe_columns] = encoder.transform(data[oe_columns])

In [None]:
data.fetal_health.value_counts()

> **Erstellung von Helfer-Klassen & -Methoden**:

In [None]:
class FetalHealthData(torch.utils.data.Dataset):
    def __init__(self, data):
        self.labels = data.fetal_health.tolist()
        self.features = data.drop(columns=['fetal_health'], axis=1).values.tolist()
    
    def __getitem__(self, index):
        sample = np.array(self.features[index]), np.array(self.labels[index])
        return sample
        
    def __len__(self):
        return len(self.labels)

In [None]:
def get_model(params, trial):
    # Suggesting Numbers of Hidden Layers
    num_layers = trial.suggest_int('num_layers', 2, 4)
    
    layers = list()
    
    # 21 Input Features
    in_features = len(data.drop(columns=['fetal_health'], axis=1).columns)
    
    # Suggest Numbers of Neurons for each Hidden Layer
    for layer in range(num_layers):
        out_features = trial.suggest_int(f'layer_{layer}', 8, 128)
        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.LeakyReLU())
        
        in_features = out_features
    
    layers.append(nn.Dropout())
    layers.append(nn.Linear(in_features, CLASSES))

    return nn.Sequential(*layers)

In [None]:
# HP: Batch Size
def get_data(params):
    
    ###
    batch_size = params['batch_size']
    ###
    
    training_data, testing_data = train_test_split(data, test_size=0.2, random_state=RANDOM_SEED, stratify=data.fetal_health)
    training_data, testing_data = FetalHealthData(training_data), FetalHealthData(testing_data)
    return torch.utils.data.DataLoader(training_data, batch_size=batch_size, shuffle=True), torch.utils.data.DataLoader(testing_data, batch_size=batch_size, shuffle=False)

In [None]:
# HP: Optimizer, Learning Rate, Weight Decay
def get_optimizer(model, params):
    
    ###
    optimizer = params['optimizer']
    learning_rate = params['learning_rate']
    weight_decay = params['weight_decay']
    ###
    
    if optimizer == 'Adam':
        return torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    elif optimizer == 'SGD':
        return torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    elif optimizer == 'RMSprop':
        return torch.optim.RMSprop(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    
    # return getattr(optim, optimizer)(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

> **Erstellung des Trainings-Loops**:

In [None]:
# HP: Scheduler
def train(model, training_batches, testing_batches, params):
    ###
    scheduler = params['scheduler']
    ###
    
    accuracy = list()
    criterion = nn.CrossEntropyLoss()
    
    ###
    optimizer = get_optimizer(model, params)
    ###
    
    if scheduler:
        scheduler = CosineAnnealingLR(optimizer, EPOCHS-1, verbose=False)
    
    for epoch in range(EPOCHS):
        ### Training
        model.train()
        for samples, labels in training_batches:
            optimizer.zero_grad()
            outputs = model(samples.float())
            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()
        
        num_samples = 0
        correct_predictions = 0
        ### Testing
        model.eval()
        with torch.no_grad():
            for samples, labels in testing_batches:
                output = model(samples.float())
                correct_predictions += (output.argmax(dim=1) == labels).sum().item()
                num_samples += labels.size(0)
        
        ###
        accuracy.append(100.0 * correct_predictions / num_samples)
        ###
    
    return accuracy

> **Erstellung einer Optuna-Studie**:

In [None]:
###
study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=RANDOM_SEED))
###

> **Definition der Zielfunktion**:

In [None]:
def objective(trial):
    params = {
        'batch_size': trial.suggest_int('batch_size', 8, 128),
        'optimizer': trial.suggest_categorical('optimizer', ['SGD', 'Adam', 'RMSprop']),
        'learning_rate': trial.suggest_loguniform('learning_rate', 1e-5, 1e-1),
        'weight_decay': trial.suggest_loguniform('weight_decay', 1e-5, 1e-1),
        'scheduler': True if trial.suggest_int('scheduler', 0, 1) == 1 else False
    }
    
    ###
    model = get_model(params, trial)
    ###
    
    training_batches, testing_batches = get_data(params)
    
    ###
    history = train(model, training_batches, testing_batches, params)
    ###
    
    # Fitness-Value
    return history[-1]

> **Optimierung der Hyperparameter**:

In [None]:
study.optimize(objective, n_trials=5)

In [None]:
study.best_params

In [None]:
study.optimize(objective, n_trials=2)

In [None]:
study.best_params