In [None]:
import os
from pathlib import Path

# Name of the repository root directory; the helper packages are imported
# relative to it.
REPO_ROOT_NAME = 'runtime-monitoring'

base = Path.cwd()

# Switch to the repository root so the helper scripts can be imported.
if base.name != REPO_ROOT_NAME:
    parts = base.parts
    if REPO_ROOT_NAME in parts:
        # The dataset folder is the directory directly below the repository
        # root, wherever the repository happens to live on disk.
        DATASET = parts[parts.index(REPO_ROOT_NAME) + 1]
    else:
        # Repository directory has a different name: keep the original
        # fixed-depth lookup as a fallback.
        DATASET = str(base).split('/')[4]
    os.chdir('../../..')
    base = Path.cwd()
elif 'DATASET' not in globals():
    # Already in the repository root (e.g. fresh kernel started there, or the
    # cell is re-run after the chdir) and no dataset name is known yet.
    # Fail with an actionable message instead of a NameError further down.
    raise RuntimeError(
        f"Working directory is already '{REPO_ROOT_NAME}', so the dataset name "
        "cannot be derived from the path; start the notebook from its own "
        "folder or define DATASET before running this cell."
    )

print(DATASET)
print(base)

# Identifiers shared by the rest of the notebook.
PREFIX = 'Regularization'
FILENAME_POSTFIX = f"{DATASET}_{PREFIX}"
SEED = 42

In [None]:
# https://optuna.readthedocs.io/en/stable/reference/generated/optuna.trial.Trial.html#
import optuna
from optuna.trial import TrialState
from optuna.samplers import CmaEsSampler, RandomSampler, TPESampler

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
from torchvision import datasets
from torchvision import transforms

In [None]:
# Use the first GPU when CUDA is available, otherwise fall back to the CPU so
# the notebook still runs on machines without a GPU.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Number of training epochs per Optuna trial.
EPOCHS = 5

In [None]:
from utilities.utils import *
from utilities.pathManager import fetchPaths
from Cifar10.trainingModels.Cifar10_CNN import Cifar10_CNN

# Model class (not an instance); `define_model` calls it once per trial with
# the sampled `batchnorm` and `dropout` arguments.
model_ = Cifar10_CNN

In [None]:
# Look up the project directories for this dataset through the fetchPaths helper.
paths = fetchPaths(base, DATASET)

# Individual entries, addressed by string key (the dataset key is lower-cased).
# NOTE(review): the exact type/contents of each entry are defined by fetchPaths — confirm there.
path = paths[DATASET.lower()]
path_dataset = paths['dataset']
path_trainingModels = paths['trainingModels']
path_trainingModels_regularization = paths['trainingModels_regularization']

In [None]:
# Labels for the selected dataset, as returned by the get_labels helper.
feature_names = get_labels(DATASET)

# Training and test sets loaded from `path_dataset` (train=True / train=False).
# NOTE(review): get_dataset is presumably the helper pulled in by the star import
# from utilities.utils; a later cell defines another function with the same name.
train_full = get_dataset(DATASET, path_dataset, train=True)
test_data = get_dataset(DATASET, path_dataset, train=False)

In [None]:
# Subset sizes, both expressed as a fraction of the full training set:
# 20% is used for training during the search, 5% for validation.
val_split = int( len(train_full) * 0.05 )
train_split = int( len(train_full) * 0.2 )

# Take the training subset first, then draw the validation subset from what is
# left over. Drawing both independently from `train_full` (as before) gives no
# guarantee that they are disjoint, so validation samples could also be trained
# on and the reported accuracy would be inflated.
# NOTE(review): assumes split_data partitions its input and accepts the subset
# it returned as input again — confirm against utilities.utils.
train_data, remainder = split_data(train_full, [train_split, len(train_full) - train_split], SEED)
val_data, _ = split_data(remainder, [val_split, len(remainder) - val_split], SEED)

len(train_data), len(val_data)

In [None]:
def define_model(trial):
    """Build a model whose batch-norm flag and dropout rate are sampled from `trial`.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        A new instance of the notebook-level model class `model_`.
    """
    # 0 or 1, passed through unchanged as the `batchnorm` argument
    use_batchnorm = trial.suggest_int("batchnorm", 0, 1)

    # one of 0.0, 0.1, 0.2, 0.3
    dropout_rate = trial.suggest_float("dropout", 0.0, 0.3, step=0.1)

    return model_(batchnorm=use_batchnorm, dropout=dropout_rate)

In [None]:
def get_dataset(trial):
    """Create the train/validation data loaders with a batch size sampled from `trial`.

    NOTE(review): this definition reuses the name `get_dataset`, which the
    notebook calls earlier with `(DATASET, path_dataset, train=...)` to load
    the raw datasets. Once this cell has run, re-running that earlier cell
    resolves to this one-argument function instead.

    Args:
        trial: Optuna trial used to sample the batch size.

    Returns:
        Tuple `(train_dataloader, val_dataloader, batchsize)`.
    """
    batch_size = trial.suggest_categorical("batchsize", [32, 64, 128, 256])

    # The third positional argument is True for the training loader and False
    # for the validation loader.
    return (
        get_dataLoader(train_data, batch_size, True),
        get_dataLoader(val_data, batch_size, False),
        batch_size,
    )

In [None]:
def objective(trial):
    """Optuna objective: train one sampled configuration and return its validation accuracy.

    Sampled hyper-parameters: model options (via `define_model`), optimizer
    type, learning rate, batch size (via `get_dataset`) and the L1/L2
    regularization strengths.

    Args:
        trial: Optuna trial used to sample hyper-parameters and to report
            intermediate results.

    Returns:
        Validation accuracy after the last epoch, as a Python float in [0, 1].

    Raises:
        optuna.exceptions.TrialPruned: if the pruner decides to stop the trial
            after an epoch.
    """
    # Generate the model.
    model = define_model(trial).to(DEVICE)

    # Generate the optimizer.
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "SGD"])
    lr = trial.suggest_float("lr", 1e-6, 1e-2)
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

    # Build the data loaders; the sampled batch size itself is not needed here.
    train_loader, valid_loader, _ = get_dataset(trial)

    # Regularization strengths.
    l2_ = trial.suggest_float("L2", 0, 1e-2)
    l1_ = trial.suggest_float("L1", 0, 1e-2)

    # Training of the model.
    for epoch in range(EPOCHS):
        model.train()
        for data, target in train_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)

            optimizer.zero_grad()
            output = model(data)

            # Penalty terms computed by the model for the sampled strengths.
            l2_loss = model._l2_regularization(l2_)
            l1_loss = model._l1_regularization(l1_)

            # Total loss = classification loss + both penalties.
            loss = F.cross_entropy(output, target) + l1_loss + l2_loss
            loss.backward()
            optimizer.step()

        # Validation of the model.
        model.eval()
        correct = 0
        with torch.no_grad():
            for data, target in valid_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)

                # Predicted class = index of the largest output. Reduce on the
                # tensor and convert to a Python int so `correct` (and hence
                # `accuracy`) is a plain number, not a device tensor.
                correct += (output.argmax(dim=1) == target).sum().item()

        accuracy = correct / len(valid_loader.dataset)

        trial.report(accuracy, epoch)

        # Handle pruning based on the intermediate value.
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return accuracy

In [None]:
# Hyper-parameter search driven by the TPE sampler, seeded with SEED.
sampler = TPESampler(seed=SEED) # Default

# Maximize the value returned by `objective`; the search is capped at
# 50 trials and a timeout of 60*60 (one hour, in seconds).
study = optuna.create_study(direction="maximize", sampler=sampler)
study.optimize(objective, n_trials=50, timeout=60*60)

# Trials grouped by final state, for the summary below.
pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

# Best trial of the study and the hyper-parameters it used.
print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
# Plot hyper-parameter importances for the current study, measured against the
# first objective value of each trial and labelled "accuracy".
optuna.visualization.plot_param_importances(study, target=lambda t: t.values[0], target_name="accuracy")

In [None]:
# All trials of the current study as a DataFrame, best value first.
df_studys = study.trials_dataframe().sort_values('value', ascending=False)

# `base` is a pathlib.Path, and `Path + str` raises TypeError; join with `/`
# instead. The file name carries the sampler class name and has no leading
# slash, so the CSV is written inside `base`.
results_csv = base / f'optunaResultsRegularization_{type(sampler).__name__}.csv'
df_studys.to_csv(results_csv, index=False)
df_studys = df_studys.set_index('number')

df_studys.head()

In [None]:
# Same search as before, this time driven by the random sampler, seeded with SEED.
sampler = RandomSampler(seed=SEED)

# Maximize the value returned by `objective`; the search is capped at
# 50 trials and a timeout of 60*60 (one hour, in seconds).
study = optuna.create_study(direction="maximize", sampler=sampler)
study.optimize(objective, n_trials=50, timeout=60*60)

# Trials grouped by final state, for the summary below.
pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

# Best trial of the study and the hyper-parameters it used.
print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
# Plot hyper-parameter importances for the current study, measured against the
# first objective value of each trial and labelled "accuracy".
optuna.visualization.plot_param_importances(study, target=lambda t: t.values[0], target_name="accuracy")

In [None]:
# All trials of the current study as a DataFrame, best value first.
df_studys = study.trials_dataframe().sort_values('value', ascending=False)

# `base` is a pathlib.Path, and `Path + str` raises TypeError; join with `/`
# instead. The file name carries the sampler class name and has no leading
# slash, so the CSV is written inside `base`.
results_csv = base / f'optunaResultsRegularization_{type(sampler).__name__}.csv'
df_studys.to_csv(results_csv, index=False)
df_studys = df_studys.set_index('number')

df_studys.head()

In [None]:
# Same search as before, this time driven by the CMA-ES sampler, seeded with SEED.
sampler = CmaEsSampler(seed=SEED)

# Maximize the value returned by `objective`; the search is capped at
# 50 trials and a timeout of 60*60 (one hour, in seconds).
study = optuna.create_study(direction="maximize", sampler=sampler)
study.optimize(objective, n_trials=50, timeout=60*60)

# Trials grouped by final state, for the summary below.
pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

# Best trial of the study and the hyper-parameters it used.
print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
# Plot hyper-parameter importances for the current study, measured against the
# first objective value of each trial and labelled "accuracy".
optuna.visualization.plot_param_importances(study, target=lambda t: t.values[0], target_name="accuracy")

In [None]:
# All trials of the current study as a DataFrame, best value first.
df_studys = study.trials_dataframe().sort_values('value', ascending=False)

# `base` is a pathlib.Path, and `Path + str` raises TypeError; join with `/`
# instead. The file name carries the sampler class name and has no leading
# slash, so the CSV is written inside `base`.
results_csv = base / f'optunaResultsRegularization_{type(sampler).__name__}.csv'
df_studys.to_csv(results_csv, index=False)
df_studys = df_studys.set_index('number')

df_studys.head()