In [1]:
import os
import random
import math
import pandas as pd
import numpy as np
import pickle
import h5py
from csv import writer
from datetime import datetime
import pytz
tzInfo = pytz.timezone('Europe/Paris')

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer, Dense, Conv1D, Flatten, Dropout, \
                                    MaxPooling1D, Bidirectional, LSTM, Input, \
                                    Activation
from tensorflow.keras import optimizers
import tensorflow.keras.backend as K
import keras_tuner as kt

import matplotlib.pyplot as plt

from Models.HyperDACDBLSTM import HyperDACDBLSTM
from Models.KerasTunerHyperModel_Phase2 import KerasTunerHyperModel

from utils.Logger import Logger
from utils.DataPreparation import walk_forward, prepare_data
from utils.Evaluation import evaluate, plot_training, plot_accuracy, plot_loss
from utils.Experiments import Data, DataDomainwise, Settings, TrainOnce, PretrainingFinetuning, DomainAdversarialLearning, set_seed

tf.version.VERSION

'2.6.0'

In [None]:
project_dir = "../../3_Results/Hyperparametertuning"
project_name = "Phase_2"
project_path = project_dir + "/" + project_name + "/"
if not os.path.exists(project_path):
    os.mkdir(project_path)
    os.mkdir(project_path + "tensorboard/")  
    os.mkdir(project_path + "keras_tuner/")  
    os.mkdir(project_path + "evaluations/")  
else:
    print("Existing directory found.")
if os.path.exists(project_path + "logfile.log"):
    os.rename(project_path + "logfile.log", project_path + "logfile_{}.log".format(datetime.now().strftime("%Y-%m-%d_%H-%M-%S")))
logger = Logger(project_dir, project_name, "logfile.log")
logger.activate_logging()

In [None]:
# Load data samples from pickle file
with open('../../1_Data/data_samples_for_hyperparametertuning.pkl', 'rb') as file:
    data_samples = pickle.load(file)

data_samples.keys()

In [5]:
def build_model(hp, oracle):

    #seed = len(oracle._tried_so_far) + 1
    seed = len([t for t in os.listdir(project_path + 'keras_tuner') if 'trial_' in t])
    print("seed:", seed)
    max_number_of_branched_dense_layers = 2
    number_of_branched_dense_layers = hp.Int("number_of_branched_dense_layers", min_value=1, 
                                             max_value=max_number_of_branched_dense_layers, step=1)
    
    print("number of branched dense layers: {}".format(number_of_branched_dense_layers))
    
    def next_layer(n_previous, rule, step, minimum):
        """To reduce complexity this function returns n, the number of neurons/cells etc. in a subsequent layer 
            based on the previous layer and a rule. There are three alternatives: 
            Subsequent layers are allowed to have the same, half or minimum n."""
        if n_previous == minimum:
            return minimum
        if rule == "same":
            return n_previous
        if rule == "half":
            return math.ceil(n_previous/2/step)*step
        if rule == "min":
            return minimum
    
    branched_dropout_rates = []
    branched_dense_neurons = []
    # first layers' values are selected from the range of defined values
    if number_of_branched_dense_layers > 0:
        branched_dense_neurons.append(hp.Int(f"branched_dense_neurons_1",  min_value=50, max_value=500, step=50, parent_name="number_of_branched_dense_layers", parent_values=[j for j in range(1, max_number_of_branched_dense_layers+1)]))
        branched_dropout_rates.append(hp.Float(f"branched_dropout_rate_1", min_value=0.1, max_value=0.5, step=0.2, parent_name="number_of_branched_dense_layers", parent_values=[j for j in range(1, max_number_of_branched_dense_layers+1)]))
    # further layers are assigned half/same as their preceeding layer or the minimum
    for i in range(2, number_of_branched_dense_layers + 1):
        choice_neurons = hp.Choice(f"branched_dense_neurons_{i}", ["same", "half", "min"], parent_name="number_of_branched_dense_layers", parent_values=[j for j in range(i, max_number_of_branched_dense_layers+1)])
        choice_dropout = hp.Choice(f"branched_dropout_rate_{i}", ["same", "half", "min"], parent_name="number_of_branched_dense_layers", parent_values=[j for j in range(i, max_number_of_branched_dense_layers+1)])
        if i <= number_of_branched_dense_layers:
            branched_dense_neurons.append(next_layer(branched_dense_neurons[-1], choice_neurons, 100, 100))
            branched_dropout_rates.append(next_layer(branched_dropout_rates[-1], choice_dropout, 0.3, 0.1))
    branched_dropout_rates = [round(r, 1) for r in branched_dropout_rates]
    print("branched dense neurons", branched_dense_neurons)
    print("branched dropout rates:", branched_dropout_rates)
    
    print("building model...")
    model = HyperDACDBLSTM(classes=2, features=1, domains=2, seed=seed, 
                           domain_clf_neurons=branched_dense_neurons, domain_clf_dropout_rates=branched_dropout_rates)
    print("model built")
    print(model.model.summary())
    return model

In [6]:
oracle=kt.oracles.BayesianOptimizationOracle(
    objective=kt.Objective("Cohens Kappa", "max"),
    max_trials=100
)

In [None]:
tuner = KerasTunerHyperModel(
    data_samples = data_samples,
    hypermodel=(lambda hp: build_model(hp, oracle)),
    oracle=oracle,
    overwrite=False,
    directory=project_path,
    project_name='keras_tuner'
)
tuner.objective=kt.Objective("Cohens Kappa", direction="max"),
tuner.directory

In [8]:
tuner.search_space_summary()

Search space summary
Default search space size: 3
number_of_branched_dense_layers (Int)
{'default': None, 'conditions': [], 'min_value': 1, 'max_value': 2, 'step': 1, 'sampling': None}
branched_dense_neurons_1 (Int)
{'default': None, 'conditions': [{'class_name': 'Parent', 'config': {'name': 'number_of_branched_dense_layers', 'values': [1, 2]}}], 'min_value': 50, 'max_value': 500, 'step': 50, 'sampling': None}
branched_dropout_rate_1 (Float)
{'default': 0.1, 'conditions': [{'class_name': 'Parent', 'config': {'name': 'number_of_branched_dense_layers', 'values': [1, 2]}}], 'min_value': 0.1, 'max_value': 0.5, 'step': 0.2, 'sampling': None}


In [None]:
print(datetime.now(tz=tzInfo))
tuner.search()