# Class for Grid Search

In [5]:
#TODO maybe change ValueError exceptions to show wrong value

# python libraries
import numpy as np
from typing import Iterator, Callable
import itertools
from numbers import Number

# local libraries
from estimator import Estimator
from util_classes import Dataset
from optimizer import Optimizer
from nn import NeuralNetwork, LinearLayer, ActivationFunction

from loss import LossFunction

# def filter_dict_by_key(dictionary: dict, keys: list[str]) -> dict:
#     filtered_dict = {}
#     for key in keys:
#         if key in dictionary.keys():
#             filtered_dict[key] = dictionary[key]
#     return filtered_dict


class GridSearch:
    _net_keys = ["layers"]
    _optimizer_keys = ["eta", "l2_coeff", "alpha"]
    _loss_keys = ["fname"]
    _estimator_keys = ["batchsize"]
    # _global_keys = _net_keys + _optimizer_keys + _loss_keys + _estimator_keys

    # dictionary containing translations from exposed names to names to pass to functions internally
    _param_name_translations = {
        "layers": "layers",
        "l2": "l2_coeff",
        "momentum": "alpha",
        "eta": "eta",
        "loss": "fname",
        "batchsize": "batchsize",
    }

    # check param_grid validity
    @staticmethod
    def _check_param_grid(param_dict: dict, hyper_grid: dict) -> None:
        """Checks that the given grid of hyperparameters is correct

        Parameters
        ----------
        param_dict : dict
            dictionary of accepted hyperparameters
        hyper_grid : dict
            dictionary containing the grid of hyperparameters on which to perform the validity check

        Returns
        -------
        None

        Raises
        ------
        ValueError
            when an hyperparameter is missing or one of its possible values is not valid
        TypeError
            when an hyperparameter has a value with the wrong type or is not a list of values
        """

        activation_functions = ["ReLU", 'logistic', 'tanh', "linear"]
        loss_functions = ["MSE"]

        for key in param_dict.keys():
            if not key in hyper_grid.keys():
                raise ValueError(
                    (
                        "All the following parameters must be present in the"
                        " hyperparameter grid: "
                    ),
                    list(param_dict.keys()),
                )
            if not isinstance(hyper_grid[key], list) or len(hyper_grid[key]) == 0:
                raise ValueError(
                    "Each parameter must have an associated not empty list of"
                    " parameters"
                )

        # check layers
        for net in hyper_grid["layers"]:
            for layer in net:
                if (
                    not isinstance(layer, tuple)
                    or not len(layer) == 2
                    or not isinstance(layer[0], int)
                    or not isinstance(layer[1], str)
                ):
                    raise TypeError(
                        "The layers parameter accepts a list of tuples of length 2 with"
                        " the first element being an integer that is the number of"
                        " units in that layer, and the second element is a string"
                        " containing the name of the activation function for that layer"
                    )
                if layer[0] <= 0:
                    raise ValueError("The number of units must be greater than 0")
                if not layer[1] in activation_functions:
                    raise ValueError(
                        (
                            "Only the following values are accepted for activation"
                            " function: "
                        ),
                        activation_functions,
                    )

        # check l2
        for l2_coeff in hyper_grid["l2"]:
            if not isinstance(l2_coeff, Number):
                raise TypeError("The L2 coefficient must be a number")
            if l2_coeff < 0:
                raise ValueError("The L2 coefficient must be at least 0")

        # check momentum
        for momentum in hyper_grid["momentum"]:
            if not isinstance(momentum, Number):
                raise TypeError("The momentum parameter must be a number")
            if momentum < 0:
                raise ValueError("The momentum parameter must be at least 0")

        # check eta
        for eta in hyper_grid["eta"]:
            if not isinstance(eta, Number):
                raise TypeError("The learning rate must be a number")
            if eta <= 0:
                raise ValueError("The learning rate must be greater than 0")

        # check loss functions
        for loss in hyper_grid["loss"]:
            if not isinstance(loss, str):
                raise TypeError(
                    "The loss must be a string corresponding to the required loss"
                )
            if loss not in loss_functions:
                raise ValueError(
                    "Only the following values are accepted for the loss: ",
                    loss_functions,
                )

        # check batchsize
        for batchsize in hyper_grid["batchsize"]:
            if not isinstance(batchsize, int):
                raise TypeError("The batch size must be an integer")
            if batchsize <= 0 and batchsize != -1:
                raise ValueError(
                    "The batch size must be greater than 0. If -1 is passed as a value"
                    " the size of the dataset will be used"
                )

    def __init__(self, estimator: Estimator, hyper_grid: dict):
        """Initializes a new instance

        Parameters
        ----------
        estimator : Estimator
            the estimator to use for training and evaluation
        hyper_grid : dict
            grid of hyperparameters

        Raises
        ------
        TypeError
            when parameter types are incorrect
        """
        if estimator == None or type(estimator) != Estimator:
            raise TypeError
        self._estimator = estimator
        if hyper_grid == None or type(hyper_grid) != dict:
            raise TypeError

        # check for wrong values
        GridSearch._check_param_grid(self._param_name_translations, hyper_grid)

        # translate names of parameters and sort by key for better efficiency
        new_grid = {}
        for key in self._param_name_translations.keys():
            if key in hyper_grid:
                new_grid[self._param_name_translations[key]] = hyper_grid[key]
        self._hyper_grid = new_grid

    # returns a list of data folds through indexes
    def _generate_folds(
        self, dataset: Dataset, n_folds: int
    ) -> list[tuple[Dataset, Dataset]]:
        """function to generate the folds to use during grid search

        Parameters
        ----------
        dataset : Dataset
            dataset to split in folds
        n_folds : int
            number of folds

        Returns
        -------
        list(tuple[Dataset, Dataset])
            returns a list containing tuples of Dataset classes. Each tuple is of the form (Training set, Test set)

        """

        data_size = dataset.ids.shape[0]
        indices = np.arange(data_size)

        # TODO maybe shuffle not needed if we assume dataset has already been shuffled
        np.random.shuffle(indices)

        folds = []

        for index_lists in np.array_split(indices, n_folds):
            # make mask to split test and training set indices
            mask = np.zeros(data_size, dtype=bool)
            mask[index_lists] = True
            test_indices = indices[mask]
            train_indices = indices[~mask]
            # initialize test set and training set
            test_set = Dataset(
                ids=dataset.ids[test_indices],
                labels=dataset.labels[test_indices],
                data=dataset.data[test_indices],
            )
            train_set = Dataset(
                ids=dataset.ids[train_indices],
                labels=dataset.labels[train_indices],
                data=dataset.data[train_indices],
            )
            folds.append((train_set, test_set))
        return folds

    def _create_estimator_params(self, combination: dict, input_dim: int) -> dict:
        """function to create the dictionary to pass to estimator for update

        Parameters
        ----------
        combination : dict
            combination of hyperparameters to use
        input_dim : int
            number of features of dataset

        Returns
        -------
        dict
            a dictionary to pass to the estimator's update function
        """
        # filter parameters for various classes
        loss_params = {key: combination[key] for key in self._loss_keys}
        estimator_params = {key: combination[key] for key in self._estimator_keys}
        optimizer_params = {key: combination[key] for key in self._optimizer_keys}
        net_params = {key: combination[key] for key in self._net_keys}

        # create dictionary of params to pass to constructors
        estimator_params["loss"] = LossFunction(**loss_params)
        estimator_params["optimizer"] = Optimizer(**optimizer_params)

        # create list of layers to create NN
        old_units = input_dim
        layer_list = []
        for layer in net_params["layers"][:-1]:
            layer_list.append(LinearLayer((old_units, layer[0])))
            # TODO maybe linear layers can be removed
            layer_list.append(ActivationFunction(fname=layer[1]))
            old_units = layer[0]
        last_layer = net_params["layers"][-1]
        layer_list.append(LinearLayer((old_units, last_layer[0])))
        if last_layer[1] != "linear":
            layer_list.append(ActivationFunction(fname=last_layer[1]))
        estimator_params["net"] = NeuralNetwork(layer_list)
        return estimator_params

    # returns the best set of hyperparameters
    def k_fold(
        self,
        dataset: Dataset,
        n_folds: int,
        n_epochs: int,
        callback: Callable[[dict], None] = print,
        loss_list: list[str] = ['MSE']
    ) -> list:
        """function to execute a k-fold cross-validation on the given dataset

        Parameters
        ----------
        dataset : Dataset
            dataset to use for k-fold cross-validation
        n_folds : int
            number of folds to use in cross-validation
        n_epochs : int
            number of epochs to run training
        callback : Callable[[dict], None], optional
            callback function to use during training, by default print
        loss_list: list[str]
            list of loss to functions to apply to test set

        Returns
        -------
        list
            list containing results of the cross-validation ordered by increasing average loss on the test set.
            Every element is a list of dictionaries containing the
            combination of hyperparameters, the average of the test loss and the standard deviation on the test loss

        Raises
        ------
        ValueError
            when values of some parameters are incorrect
        """

        hyper_grid = self._hyper_grid
        estimator = self._estimator

        data_size = dataset.shape[0]
        input_dim = dataset.shape[1][0]
        output_dim = dataset.shape[1][1]

        # check n_folds value
        if n_folds > data_size:
            raise ValueError(
                "The number of folds cannot be greater than the number of samples in"
                " the dataset"
            )
        # check if output layer is correct for all combinations
        for layers in hyper_grid["layers"]:
            if layers[-1][0] != output_dim:
                raise ValueError(
                    "Number of units in last layer must be equal to the output"
                    " dimension of the data"
                )
        # check values for batchsize
        for batchsize in hyper_grid["batchsize"]:
            if batchsize > data_size:
                raise ValueError(
                    "The batchsize cannot be greater than the number of samples"
                )

        folds = self._generate_folds(dataset=dataset, n_folds=n_folds)

        # generates all combinations of hyperparameters
        keys, values = zip(*hyper_grid.items())
        param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

        combination_loss_list = []

        # iterates all combinations of hyperparameters
        for combination in param_combinations:

            estimator_params = self._create_estimator_params(combination, input_dim)
            if estimator_params["batchsize"] == -1:
                estimator_params["batchsize"] = data_size
            estimator.update_params(**estimator_params)

            test_loss_list = []

            print(combination)

            # iterates folds of dataset
            for train_set, test_set in folds:
                estimator.train(dataset=train_set, n_epochs=n_epochs, callback=callback)
                test_loss_list.append(estimator.evaluate(losses = loss_list, dataset = test_set))
                estimator.reset()
            test_loss_avg = {}
            test_loss_std = {}
            for loss in loss_list:
                test_loss_avg[loss] = sum(d[loss] for d in test_loss_list) / len(test_loss_list)
                test_loss_std[loss] = np.std([d[loss] for d in test_loss_list])

            # test_loss_avg = sum(test_loss_list) / n_folds
            # test_loss_std = np.std(test_loss_list)

            combination_loss_list.append(
                {
                    "parameters": combination,
                    "test_loss_avg": test_loss_avg,
                    "test_loss_std": test_loss_std,
                }
            )
            print(combination_loss_list)
        if(loss_list[0] == 'binary_accuracy'):
            combination_loss_list.sort(key=lambda x: x["test_loss_avg"][loss_list[0]], reverse = True)
        else:
            combination_loss_list.sort(key=lambda x: x["test_loss_avg"][loss_list[0]])
        return combination_loss_list

    # returns an estimation of the risk for the model, average +- standard deviation
    def nested_k_fold(
        self,
        dataset: Dataset,
        inner_n_folds: int,
        outer_n_folds: int,
        n_epochs: int,
        inner_callback: Callable[[dict], None] = print,
        outer_callback: Callable[[dict], None] = print,
    ) -> dict:
        """function implementing nested k-fold cross validation

        Parameters
        ----------
        dataset : Dataset
            dataset to run cross validation on
        inner_n_folds : int
            number of folds for the inner cross validation
        outer_n_folds : int
            number of folds for the outer cross validation
        n_epochs : int
            number of epochs to run training for
        inner_callback : Callable[[dict], None], optional
            callback function to use during training for the inner cross validation, by default print
        outer_callback : Callable[[dict], None], optional
            callback function to use during training for the outer cross validation, by default print

        Returns
        -------
        dict
            a dictionary containing a list of tuples each containing the best combination of hyperparameters
            on that fold and the corresponding loss on the test set for that fold, the average loss on the test sets across the folds
            and their standard deviation

        Raises
        ------
        ValueError
            when values are incorrect
        """

        estimator = self._estimator
        data_size = dataset.shape[0]
        folds = self._generate_folds(dataset=dataset, n_folds=outer_n_folds)
        input_dim = dataset.shape[1][0]

        # check outer_n_folds value
        if outer_n_folds > data_size:
            raise ValueError(
                "The number of folds cannot be greater than the number of samples in"
                " the dataset"
            )

        # check inner_n_folds value
        if inner_n_folds > data_size:
            raise ValueError(
                "The number of folds cannot be greater than the number of samples in"
                " the dataset"
            )

        test_loss_list = []
        param_combination_list = []

        for train_set, test_set in folds:
            train_results = self.k_fold(
                dataset=train_set,
                n_folds=inner_n_folds,
                n_epochs=n_epochs,
                callback=inner_callback,
            )
            params = train_results[0]["parameters"]
            estimator_params = self._create_estimator_params(params, input_dim)
            if estimator_params["batchsize"] == -1:
                estimator_params["batchsize"] = data_size

            estimator.update_params(**estimator_params)
            estimator.train(
                dataset=train_set, n_epochs=n_epochs, callback=outer_callback
            )
            test_loss_list.append(estimator.evaluate(test_set))
            param_combination_list.append(params)

        test_loss_avg = sum(test_loss_list) / outer_n_folds
        test_loss_std = np.std(test_loss_list)

        results = {
            "test_loss_list": list(zip(param_combination_list, test_loss_list)),
            "test_loss_avg": test_loss_avg,
            "test_loss_std": test_loss_std,
        }
        return results

In [9]:
net = NeuralNetwork(
    [
        LinearLayer((8, 16)),
        ActivationFunction(),
        LinearLayer((16, 16)),
        ActivationFunction(),
        LinearLayer((16, 2)),
    ]
)
estimator = Estimator(net)
grid = {}
grid["eta"] = [10e-3]
grid["momentum"] = [0.2]
grid["layers"] = [[(8, "ReLU"), (1, "linear")], [(4, "ReLU"), (1, "linear")]]
grid["loss"] = ["MSE"]
grid["l2"] = [10e-2]
grid["batchsize"] = [-1]

In [10]:
from datasets import read_monks, read_ML_cup

# data = read_ML_cup("train")
data = read_monks(1, "train")

In [11]:
selector = GridSearch(estimator, grid)
selector.k_fold(data, 5, n_epochs=8, loss_list=['binary_accuracy'])

{'layers': [(8, 'ReLU'), (1, 'linear')], 'l2_coeff': 0.1, 'alpha': 0.2, 'eta': 0.01, 'fname': 'MSE', 'batchsize': -1}
{'epoch': 1, 'loss': 533.6682410725465}
{'epoch': 2, 'loss': 0.2089354648232199}
{'epoch': 3, 'loss': 0.25254477147778764}
{'epoch': 4, 'loss': 0.25760474397842303}
{'epoch': 5, 'loss': 0.2528866073285979}
{'epoch': 6, 'loss': 0.24731637364048095}
{'epoch': 7, 'loss': 0.2428197579235229}
{'epoch': 8, 'loss': 0.2395144762576019}
[[0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]
 [0.01099011]] [1 0 1 1 1 1 1 0 1 0 0 1 1 0 1 1 0 0 0 1 0 1 0 1 0]
{'epoch': 1, 'loss': 208.8883950141158}
{'epoch': 2, 'loss': 0.1858129763458394}
{'epoch': 3, 'loss': 0.22373846607185963}
{'epoch': 4, 'loss': 0.239

[{'parameters': {'layers': [(8, 'ReLU'), (1, 'linear')],
   'l2_coeff': 0.1,
   'alpha': 0.2,
   'eta': 0.01,
   'fname': 'MSE',
   'batchsize': -1},
  'test_loss_avg': {'binary_accuracy': 0.499},
  'test_loss_std': {'binary_accuracy': 0.10239140588936164}},
 {'parameters': {'layers': [(4, 'ReLU'), (1, 'linear')],
   'l2_coeff': 0.1,
   'alpha': 0.2,
   'eta': 0.01,
   'fname': 'MSE',
   'batchsize': -1},
  'test_loss_avg': {'binary_accuracy': 0.499},
  'test_loss_std': {'binary_accuracy': 0.10239140588936164}}]

In [15]:
selector.nested_k_fold(data, 5, 5, 5)

{'layers': [(8, 'ReLU'), (1, 'tanh')], 'l2_coeff': 0.1, 'alpha': 0.2, 'eta': 0.01, 'fname': 'MSE', 'batchsize': -1}
{'epoch': 1, 'loss': 0.26582278481007754}
{'epoch': 2, 'loss': 0.26582278382269425}
{'epoch': 3, 'loss': 0.26581924099146403}
{'epoch': 4, 'loss': 0.2650335010837216}
{'epoch': 5, 'loss': 0.24710167857476026}
{'epoch': 1, 'loss': 0.23417721518987342}
{'epoch': 2, 'loss': 0.2341772151763254}
{'epoch': 3, 'loss': 0.234176894596665}
{'epoch': 4, 'loss': 0.2339988574694747}
{'epoch': 5, 'loss': 0.2267940311848102}
{'epoch': 1, 'loss': 0.25316455696202533}
{'epoch': 2, 'loss': 0.25316455696062107}
{'epoch': 3, 'loss': 0.2531644413394418}
{'epoch': 4, 'loss': 0.2530513578439216}
{'epoch': 5, 'loss': 0.24715413665359956}
{'epoch': 1, 'loss': 0.22151898734177203}
{'epoch': 2, 'loss': 0.2215189872888348}
{'epoch': 3, 'loss': 0.22151831881371536}
{'epoch': 4, 'loss': 0.22125811410805812}
{'epoch': 5, 'loss': 0.2129357656738729}
{'epoch': 1, 'loss': 0.23749999999999197}
{'epoch': 2,

TypeError: Estimator.evaluate() missing 1 required positional argument: 'dataset'

In [None]:
data.shape

In [8]:
tmp = np.array([1,2,3,4,5])
tmp.reshape(1,5).ndim

2