In [14]:
import abc
from typing import Dict, Callable, List

import torch
import torch.optim         as optim
from torch.distributions   import constraints, transform_to

# Pyro probabilistic language for Bayesian hyperparameter tuning
#!{sys.executable} -m pip install pyro-ppl
import pyro
import pyro.contrib.gp as gp

from sklearn import linear_model

Bayesian hyperparameter tuning
The below section adapts the tutorial from the Pyro docs at http://pyro.ai/examples/bo.html. I have used their example as a basis to alter for use with my random forest (and later neural network) models.

In [13]:
class ObjectiveFunction(Callable):
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def evaluate(self, param_dict: Dict) -> float:
        """Evaluate the objective function for a supplied set of parameters(param-dict).
        Args:
            param_dict (Dict): Dictionary of param_name (str): param_values (List[float]) that are used to evaluate 
            the objective function.
        Returns:
            float: The value of the objective function at the given point in the parameter space defined by 'param_dict'.
        """
        return

class BayesParameterOptimiser():
    def __init__(self, param_dict: Dict, objective: ObjectiveFunction.evaluate, n_initial_evals: int) -> None:
        self._param_dict = param_dict
        self._objective = objective
        self._n_initial_evals = n_initial_evals
        
        self._lower_bounds = [min(param_dict[hyperparam]) for hyperparam in param_dict]
        self._upper_bounds = [max(param_dict[hyperparam]) for hyperparam in param_dict]
        self._parameter_constraints = [constraints.interval(lower, upper) for lower, upper 
                                       in zip(lower_bounds, upper_bounds)]
    
        # Generating set of parameters with values randomly selected from the above ranges
        X_init = [[options[np.random.randint(len(options))] 
                   for param, options in self._param_dict.items()] 
                  for i in range(self._n_initial_evals)]

        # Converting to a tensor for compatibility with Pyro and PyTorch
        self._X_init = torch.tensor(X_init, dtype = torch.float)
        
        # Evaluate the objective funtion at each initial point in the hyperparameter space
        self._y = self._objective(self._X_init)
        print(f'Initial parameters sets: {self._X_init}')
        print(f'Objective scores for each initial parameters set: \n{y}')
    
        # Defining a Gaussian process prior that will serve as our surrogate objective function
        # It is a 'prior' at this stage as it is based on our prior beliefs (value of the objective function at the initial parameter values)
        # As each new set of parameters is chosen, the surrogate Gaussian process model will be updated to a posterior based on the new data (following Bayes' theorem)
        # The Gaussian process model approximates the distribution over y (the value of the objective function) for a given parameter vector (X)

        # Initialising the model
        self._gpmodel = gp.models.GPRegression(self._X_init,                       # Tensor of hyperparameter sets
                                               self._y,                            # Corresponding validations accuracies
                                               gp.kernels.Matern52(input_dim = 1), # Kernel type for the Gaussian process
                                               noise = torch.tensor(0.1),          # Variance of Gaussian noise in the GP model
                                               jitter = 1.0e-4)                    # Stabilises a key mathematical operation (Cholesky decomposition)
        
    # Defining a function to find the posterior distribution of the surrogate objective function (the GP model) in light of a new set of parameters
    def update_posterior(self, x_new):

        # Evaluate the objective function at the new point in parameter space
        y = self._objective(x_new)

        # Concatenate the new parameter vector (x_new) onto the existing (prior) X space of GP model
        X = torch.cat([self._gpmodel.X, x_new])

        # Concatenate the corresponding objective function evaluation onto the prior y space
        y = torch.cat([self._gpmodel.y, y])

        # Update GP model with new expanded parameter and objective spaces
        self._gpmodel.set_data(X, y)

        # Optimize the GP model's hyperparameters (different to the objective model's parameters!) using Adam with a learning rate of 0.001
        self._optimizer = torch.optim.Adam(self._gpmodel.parameters(), lr = 0.001)

        # Train the model on the new expanded data
        gp.util.train(self._gpmodel, self._optimizer)

    # Defining the acquisition function
    def lower_confidence_bound(self, x, kappa: int = 2):
        """This function finds the lower confidence bound of the '_gpmodel' distribution over y for a given x.
        The lower confidence bound balances exploitaion (points closer to the mean which are likely to be a maximum)
        with exploration (trying points further out in the distribution in the hope of finding a better maximum).
        The parameter 'kappa' is a number greater than 0 that sets how interested we are in exploration.
        """

        # Find the mean and variance of the surrogate model
        mu, variance = self._gpmodel(x, full_cov = False, noiseless = False)

        # Get standard deviation from the variance
        sigma = variance.sqrt()

        # Return the lower confidence bound, the mean minus kappa times the standard deviation
        return mu - (kappa * sigma)

    def vector_constrain(self, vector, constraint, inverse = False):

        vector = torch.cat(tuple(transform_to(constraint[i]).inv(vector[0][i].float()).view(1) if inverse 
                                 else transform_to(constraint[i])(vector[0][i].float()).view(1) 
                                 for i in range(len(vector[0])))).view((vector.shape)).requires_grad_(True)

        return vector

    # 
    def find_a_candidate(self, x_init, constraint):

        # Transform x to an unconstrained domain
        unconstrained_x_init = vector_constrain(x_init, constraint, inverse = True) #transform_to(constraint).inv(x_init)

        # Make a gradient enabled clone
        unconstrained_x = unconstrained_x_init.clone().detach().requires_grad_(True)

        # 
        minimizer = optim.LBFGS([unconstrained_x], line_search_fn = 'strong_wolfe')

        def closure():

            minimizer.zero_grad()
            x = vector_constrain(unconstrained_x, constraint)
            y = lower_confidence_bound(x)
            autograd.backward(unconstrained_x, autograd.grad(y, unconstrained_x))

            return y

        minimizer.step(closure)

        # After finding a candidate in the unconstrained domain convert it back to original domain.
        x = vector_constrain(unconstrained_x, constraint)

        return x.detach()

    # 
    def next_x(self, 
               constraint, 
               num_candidates = 5):

        candidates = []
        values = []

        x_init = self._gpmodel.X[-1:]

        for i in range(num_candidates):

            x = find_a_candidate(x_init, constraint)
            y = lower_confidence_bound(x)
            candidates.append(x)
            values.append(y)
            x_init = torch.tensor([[x[0][i].new_empty(1).uniform_(low, high) 
                                    for i, (low, high) in enumerate(zip(self._lower_bounds, self._upper_bounds))]])

        argmin = torch.min(torch.cat(values), dim = 0)[1].item()
        return candidates[argmin]
    
    def optimise(self, n_iterations: int):

        optimizer = torch.optim.Adam(self._gpmodel.parameters(), lr = 0.001)
        gp.util.train(self._gpmodel, optimizer)

        for i in range(n_iterations):

            xmin = next_x(self._parameter_constraints)
            update_posterior(xmin)

            print(f'{i}. Parameters: {self._gpmodel.X[-1:]}, Objective(Parameters): {-self._gpmodel.y[-1:]}')

        lin = linear_model.LinearRegression()
        lin.fit(np.arange(len(self._gpmodel.y)).reshape(-1, 1), (-self._gpmodel.y) * 100)
        plt.plot(np.arange(len(self._gpmodel.y)), (-self._gpmodel.y) * 100)
        plt.plot(np.arange(len(self._gpmodel.y)), lin.predict(np.arange(len(self._gpmodel.y)).reshape(-1, 1)), label = 'Linear Fit')
        plt.xlabel('Iteration Number')
        plt.ylabel('Objective(Parameters)')
        plt.legend()
        plt.show()