In [None]:
# IMPORTS
from tensorflow.keras import layers, models
import time
import ast
import tensorflow as tf
from predictor import Predictor
from copy import deepcopy
import random
from numpy import exp

In [None]:
# GLOBAL VARIABLES
show = True
predictor_data_filename = "data.txt"

In [None]:
# PREDICTOR MODEL
class Predictor:
    def __init__(self, n_layers, keras_loss_fun, hyperparameters, show_summary=False):
        # Hyperparameters:
        # List of dictionaries (one for each layer)
        model = models.Sequential()
        # Create input and hidden layers
        for i in range(n_layers):
            model.add(layers.Dense(hyperparameters[i]['size'], activation=hyperparameters[i]['activation']))

        # Create output layer (accuracy, test_latency)
        model.add(layers.Dense(2))  # No activation function, since it is a regression problem

        model.compile(optimizer='adam',
                      loss=keras_loss_fun,
                      metrics=['accuracy'])

        if show_summary:
            model.summary()

        self.model = model

    def train(self, train_data, train_labels, test_data, test_labels):

        history = self.model.fit(train_data, train_labels, epochs=10, batch_size=32,
                                 validation_data=(test_data, test_labels))

        start_time = time.time()
        test_loss, test_acc = self.model.evaluate(test_data, test_labels)
        end_time = time.time()
        test_time = end_time - start_time
        return test_acc, test_time

In [None]:
# PREDICTOR HELPER

# TRAIN AND TEST DATA
def _get_predictor_data(filename):
    hyperparameters, outputs = _parse_predictor_data_file(filename)
    hyperparameters, outputs = _preprocess_predictor_data(hyperparameters, outputs)

    dataset_size = len(hyperparameters)
    train_percentage = 0.7
    train_size = round(dataset_size * train_percentage)

    dataset_indexes = range(0, dataset_size)
    train_sample_indexes = random.sample(dataset_indexes, train_size)
    test_sample_indexes = [index for index in dataset_indexes if index not in train_sample_indexes]

    train_data, train_labels = [(hyperparameters[i], outputs[i]) for i in train_sample_indexes]
    test_data, test_labels = [(hyperparameters[i], outputs[i]) for i in test_sample_indexes]

    return train_data, train_labels, test_data, test_labels


activation_mappings = {
    'linear': 0,
    'relu': 1,
    'sigmoid': 5,
    'tanh': 6,
    'softmax': 8,
    'exponential': 10,
}

padding_mappings = {
    'valid': 0,
    'same': 1,
}


def _preprocess_predictor_data(inputs, outputs):
    processed_inputs = []
    for model in inputs:
        processed_model = []
        for (layer_type, params) in model:  # Iterate over all layers
            if 'conv' in layer_type:
                processed_model.append(activation_mappings[params['activation']])
                processed_model.append(padding_mappings[params['padding']])
                processed_model.append(params['kernel_size'])
                processed_model.append(params['strides'])
                processed_model.append(params['filters'] / 10)  #  Make it around the same size as the others
                processed_model.append(params['pool_size'])
            else:
                processed_model.append(padding_mappings[params['padding']])
                processed_model.append(params['strides'])
                processed_model.append(params['pool_size'])
        if show:
            print("\nModel = {}".format(model))
            print("\n--> Processed model = {}\n".format(processed_model))
        processed_inputs.append(processed_model)
    return processed_inputs, outputs


def _parse_predictor_data_file(filename):
    with open(filename, 'r') as f:
        lines = f.readlines()

    hyperparameters = []
    outputs = []  # Output  = (accuracy, latency)
    for line in lines:
        line = line.strip()
        result = ast.literal_eval(line)
        hyperparameters.append(result['HP'])
        outputs.append((result['Accuracy'], result['Latency']))

    return hyperparameters, outputs

predictor_loss_set = [
    'mean_squared_error',
    'mean_absolute_error',
    tf.keras.losses.Huber(),
    tf.keras.losses.LogCosh()
]

predictor_n_layer_set = range(1, 10)

predictor_layer_hp_set = {
    'activation': [
        'relu',
        'selu',
        'tanh',
        'linear',
        'swish',
    ],
    'size': [20, 40, 60, 80, 100]
}


def get_predictor_layer_default_hyperparameters():
    return {
        'activation': random.sample(predictor_layer_hp_set['activation'], 1)[0],
        'size': random.sample(predictor_layer_hp_set['size'], 1)[0]
    }


def get_default_predictor(n_layers, loss):
    return {
        "keras_loss_fun": loss,
        "n_layers": n_layers,
        "hyperparameters": [get_predictor_layer_default_hyperparameters() for _ in range(0, n_layers)]
    }

# GET PREDICTOR DATA
training_data, training_labels, testing_data, testing_labels = _get_predictor_data(predictor_data_filename)

def predictor_objective(params):
    n_layers = params["n_layers"]
    keras_loss_fun = params["keras_loss_fun"]
    hyperparameters = params["hyperparameters"]
    predictor = Predictor(n_layers, keras_loss_fun, hyperparameters)

    test_acc, test_time = predictor.train(training_data, training_labels, testing_data, testing_labels)

    obj_value = test_acc**2 / test_time
    if show:
        print("\n", "-" * 8, "test_accuracy: {} #### test_latency: {} #### objective: {}"
                    .format(test_acc,test_time, obj_value), "-" * 8)

    return obj_value


def predictor_get_random_neighbouring_solution(old_solution):
    solution = deepcopy(old_solution)
    n_layers = solution["n_layers"]

    # Change the size of two layers
    change_size_layers = random.sample(range(0, n_layers), 2)
    for change_size_layer in change_size_layers:
        new_size = random.sample(predictor_layer_hp_set["size"], 1)[0]
        solution["hyperparameters"][change_size_layer]["size"] = new_size
    # Change activation of one layer
    change_activation_layer = random.sample(range(0, n_layers), 1)[0]
    new_activation = random.sample(predictor_layer_hp_set["activation"], 1)[0]
    solution["hyperparameters"][change_activation_layer]["activation"] = new_activation

    return solution


In [None]:
# METAHEURISTICS: SIMULATED ANNEALING
def simulated_annealing(initial_solution, neighbour_gen_fun, objective_fun, iterations, c, seed=-1):
    # Initialization phase
    if seed >= 0:
        random.seed = seed
    current_solution = initial_solution
    current_eval = objective_fun(current_solution, show)
    # Optimization phase
    for i in range(iterations):
        next_solution = neighbour_gen_fun(current_solution)
        next_eval = objective_fun(next_solution)
        if next_eval > current_eval or current_eval == 0 or _accept_worse_solution(c, i+1, current_eval, next_eval):
            current_solution = next_solution
            current_eval = next_eval
            if show:
                print("\n", "#", "Accepted new solution")
    return current_solution, current_eval


def _accept_worse_solution(c, interval, curr_val, next_val):
    return random.random() < exp(interval * (next_val - curr_val) / (c * curr_val))

In [None]:
# OPTIMIZER

def _predictor_sa_optimization(n_layers, loss_fun):
    c = 1
    iterations_per_conf = 50
    seed = -1
    return simulated_annealing(get_default_predictor(n_layers, loss_fun),
                               predictor_get_random_neighbouring_solution,
                               predictor_objective,
                               iterations_per_conf,
                               c,
                               seed)

def optimize_predictor(optimize_fun):
    best_results = {}

    for n_layers in predictor_n_layer_set:
        for loss_fun in predictor_loss_set:
            model, eval = optimize_fun(n_layers, loss_fun)
            print("#### LAYERS: {}, LOSS: {}: MODEL: {}, EVAL: {} ####\n".format(n_layers, loss_fun, model, eval))
            best_results[n_layers, loss_fun] = {
                "model": model,
                "eval": eval,
            }
            

In [None]:
# RUN
optimize_predictor(_predictor_sa_optimization)