Bayesian Optimization from Scratch 🔝
--------------------------

We must create some functions in this notebook that will help us make Bayesian Optimization to search for the best hyperparameter that will give the best model on the validation set. Bayesian Optimization uses Gaussian Process to approximate the objective function given input data. In our case, the input data is the set of hyperparameter values we must tune. The Bayesian theorem is used to direct the search to find the minimum or maximum of our objective function. The objective function can be the `Accuracy,` the `Recall,` or whatever score function to evaluate the model performance. 

The Bayesian Theorem suggests identifying a prior distribution for the objective function and updating it with the likelihood obtained with data given the objective function to get a posterior distribution. The Gaussian Process (GP) is commonly used for noisy distribution, which is difficult to directly simple from. The GP doesn't need to optimize parameters to find the distribution and can give a standard derivation around the mean distribution, which defines the uncertainty about the approximation. 

The prior distribution is aMultivariate normal distribution with mean $m_0$ and variance $K_{x, x}$, which is commonly a kernel distribution like the Radial Basis function (RBF) and calculates the similarity between the input values. Some noise $\epsilon$ are added to the Prior Distribution with a mean of 0 and a variance $\sigma^2$. The posterior distribution is also Multivariate normal distribution for which we are searching the mean $m_y$ vector and the covariance matrix $\Sigma$. The `Cholesty iterative method` is commonly used to find the best posterior mean and covariance for a given new point. We will explain further the Gaussian Process. 

For now, let us focus on Bayesian Optimization: 

- After finding the posterior distribution, we can obtain the mean value of the objective function for any group of new hyperparameters. The new objective function values are used to find the best new samples for the subsequent evaluation trial since we make trials before finding the best hyperparameter values.

- The first trial finds the first score from the objective function given random hyperparameter values. 

- A `surrogate function` is used to give that score from the Posterior Multivariate Distribution of the objective function given the input data.

- The estimated score from the `surrogate function` is then used in a new function named `acquisition function` to generate new samples from the hyperparameter's search spaces. It exists many different `acquisition functions.`

- The new samples are concatenated to the previous ones and used to train a further Posterior distribution. 

- The process is repeated until finding the most satisfying score.

**Note**:  This idea is related to reinforcement learning methods to search for the following action(s) which maximize the value function (the reward of long term). We can consider the value function to be the cumulative distribution function of the approximate Posterior Multivariate Distribution function over the new samples and the new actions to be the ones sampled from the value function plus a variance rate to explore more states. The states are abstracted (not visible) in our case.

We will implement the Bayesian Optimization process from scratch since we want to customize it for the current project. We will not code the Gaussian Process Regression sub-process since it is already integrated into the `scikit-learn` library. We can also use the `GPytorch` library which can provide better result but for the purpose of that project we will not need it. Let us define nextly the objective function. 

Let us import the necessary libraries.

In [31]:
import warnings
warnings.filterwarnings('ignore')

from sklearn.gaussian_process import GaussianProcessRegressor
from functools import partial
from torch.nn import MSELoss
from scipy.stats import norm
from functools import partial
from typing import *
from torch import nn
import pandas as pd
import numpy as np
import random
import torch

### Objective function

We will use the `MSELoss` to calculate the value returned by the objective function, which is our training function. It will calculate the mean squared error between values predicted from a feed-forward neural network and pre-defined target values. The input and target values are randomly initialized from a Gaussian distribution. We will need input data of 8 variables and 100 samples (not very large, not fast up the training). The following parameters will be necessary for a sample example:

- The number of epochs -> ... [1, 10]
- The number of layers -> ... [1, 4]
- The number of features -> ... [40, 100]
- The learning rate -> ... [1e-1, 1e-4]

Let us initialize the input data and the targets.

In [2]:
X = torch.randn((100, 8))

y = torch.rand((100, 1))

Let us initialize the model and the objective function. Notice that we must add the noise we defined earlier to the final calculated score. The noise is sampled from a normal distribution with a mean of 0 and a scale that we must determine. Let us choose a $\sigma^2 = 0.1$ scale as the default.

In [3]:
# model
def set_model(input_size: int = 8, n_features: int = 1, n_layers: int = 1):
    
    layers = [nn.Sequential(nn.Linear(input_size, n_features), nn.ReLU())]
    
    layers.extend([nn.Sequential(nn.Linear(n_features, n_features), nn.ReLU()) for i in range(n_layers - 1)])
    
    layers.append(nn.Sequential(nn.Linear(n_features, 1)))
    
    sequence = nn.Sequential(*layers)
    
    return sequence

# Only one iteration will be sufficient
def objective(optimizer: nn.Module, loss_fn: nn.Module, input: torch.Tensor, target: torch.Tensor, params: dict, scale: float = 0.1):
    
    noise = torch.distributions.normal.Normal(0.0, scale).sample().item()
    
    model = set_model(n_features=params['n_features'], n_layers=params['n_layers'])
    
    optimizer_ = optimizer(model.parameters(), lr = params['lr'])
    
    losses = []
    
    for _ in range(params['epochs']):
        
        outputs = model(input)
        
        loss = loss_fn(outputs, target)
        
        loss.backward()
        
        optimizer_.step()
        
        optimizer_.zero_grad()
        
        losses.append(loss.item())
    
    return 1 / np.mean(losses) + noise

We must also define simple functions which generate random samples from search spaces.

In [4]:
%%writefile fake-face-detection/fake_face_detection/utils/sampling.py
from typing import *
import numpy as np
import random

def get_random_sample(search_space: dict, p: Union[List[float], None] = None):
    """Recuperate a random sample

    Args:
        search_space (dict): A dictionary defining the search space

    Raises:
        ValueError: 'min' and 'max' can only be numbers
        KeyError: Only the following keys can be provided {'min', 'max'}, {'value'}, {'values'} or {'values', 'p'} 

    Returns:
        Union[int, float, str]: The random sample 
    """
    
    keys = set(search_space)
    
    if keys == set(['min', 'max']):
        
        assert search_space['min'] < search_space['max']
        
        if isinstance(search_space['min'], int) and isinstance(search_space['max'], int):
            
            return random.randint(search_space['min'], search_space['max'])
        
        elif isinstance(search_space['min'], float) or isinstance(search_space, float):
            
            return random.uniform(search_space['min'], search_space['max'])
        
        else:
            
            raise ValueError("You can only provide int or float values with min max!")
    
    elif keys == set(['value']):
        
        return search_space['value']
    
    elif keys.issubset(set(['values'])):
        
        p = None
        
        if 'p' in keys: p = search_space['p']
        
        return np.random.choice(search_space['values'], size = (1), p = p)[0]
    
    else:
        
        raise KeyError("You didn't provide right keys! Try between: {'min', 'max'}, {'value'}, {'values'} or {'values', 'p'}")
        

def get_random_samples(search_spaces: dict):
    """Recuperate random samples from a dictionary of search spaces

    Args:
        search_spaces (dict): A dictionary where the keys are the hyperparameter names and the values are the search spaces

    Returns:
        dict: A dictionary where the keys are the hyperparameter names and the values are the sampled values from the search spaces
    """
    
    samples = {}
    
    for search_space in search_spaces:
        
        samples[search_space] = get_random_sample(search_spaces[search_space])
    
    return samples

Overwriting fake-face-detection/fake_face_detection/utils/sampling.py


In [5]:
%run fake-face-detection/fake_face_detection/utils/sampling.py

Let us test the sampling functions and do training to obtain a first value.

In [6]:
# define the search spaces
search_spaces = {'epochs': {
    'min': 1,
    'max': 10
    },
    'n_layers': {
        'values': [1, 2, 3, 4]
    },
    'n_features': {
        'value': 50
    },
    'lr': {
        'min': 1e-4,
        'max': 1e-1
    }
}

# recuperate random samples
samples = get_random_samples(search_spaces)

We obtain the following samples for each hyperparameter.

In [7]:
samples

{'epochs': 7, 'n_layers': 4, 'n_features': 50, 'lr': 0.02310174766808769}

Let us train the model with them and recuperate the loss.

In [8]:
loss = objective(torch.optim.Adam, nn.MSELoss(), X, y, samples)

We obtain the following loss from the sampled hyperparameter values.

In [9]:
loss

5.45991277150153

Let us now implement the surrogate function

### The surrogate function

The surrogate tries to estimate the objective function using the Bayes theorem probabilistically. We want to find the probability of obtaining such a score $f$ conditionally to input data $D$. The score is the loss calculated after training, and the input data is the set of parameters. To simulate the posterior distribution $P(f/D)$, we decided to use the Gaussian Process (GP) Regression. The kernel used to calculate the similarity between the input data sample can be the `Radial Basis Function` kernel which commonly provides excellent results. The GP Regression is already implemented in sklearn. We can use it directly for our implementation.

In [10]:
# define the distribution
gp_model = GaussianProcessRegressor()

The only data we have is the sample hyperparameter value which instantiates the input data.

In [11]:
# instantiate the input data
data = [list(samples.values())]

And the only score we currently have is the loss we calculated earlier.

In [12]:
# instantiate the scores
scores = [[loss]]

Let us fit the model with the data and scores.

In [13]:
gp_model.fit(data, scores)

We can estimate the value of the target using the input data (and the standard deviation).

In [14]:
pred, stds = gp_model.predict(data, return_std=True)

We obtain the following prediction, which is very close to the actual loss.

In [15]:
pred, stds

(array([5.45991277]), array([1.00000004e-05]))

It is time to choose an acquisition function to generate new samples.

### Acquisition Function

The acquisition function will use the surrogate to examine which of many random samples is the best suited for the next generation:

1. First, we must generate random samples from the search spaces.
2. Second, since we already have an approximation of the objective function's distribution conditionally to input data via the surrogate function, we can estimate the vector means and their corresponding standard deviations:
$$
\mu_e, \sigma_e \sim P(f/samples)
$$ 
3. Third, we must compare the values in the vector of the estimated means are compared to the best mean calculated as follows:
$$
\max(\mu), \space where \space \mu,. \sim P(f/input\_data)$$ 

The probability of improvement is the cumulative normal distribution of the normalized distance between the estimated means and the best one (this distance represents the regret in RL). The probability of improvement is calculated as follows:

$$
PI = P(f < \frac{\mu_e - \mu_{best}}{\sigma_e + \epsilon})
$$

Where $\epsilon$ is added to avoid division by zero.

**Remark**: it exists different acquisition functions like the UCB function. But for the purpose of our project we will focus on the probability of improvement.

**Note**: In our example we want to minimize the loss so we must take $-\mu_e$ and $-\mu$ to find the best solution.


Let us implement bellow the acquisition function.

In [16]:
%%writefile fake-face-detection/fake_face_detection/utils/acquisitions.py
from sklearn.gaussian_process import GaussianProcessRegressor
from scipy.stats import norm
from typing import *

def PI_acquisition(X: List, X_prime: List, model: GaussianProcessRegressor, maximize: bool = True):
    """Acquisition function for bayesian optimization using probability of improvement

    Args:
        X (List): A list containing the input data
        X_prime (List): A list containing the generate samples
        model (GaussianProcessRegressor): The gaussian model to use
        maximize (bool, optional): A boolean value indicating the optimization objective. Defaults to True.

    Returns:
        List: A list containing the probabilities
    """
    
    # let us predict the means for the input data
    mu = model.predict(X)
    
    # let us calculate the means and standard deviation for the random samples
    mu_e, std_e = model.predict(X_prime, return_std=True)
    
    if not maximize:
        
        mu = -mu
        
        mu_e = -mu_e
    
    # let us take the best mean
    mu_best = max(mu)
    
    # let us calculate the probability of improvement
    probs = norm.cdf((mu_e - mu_best) / std_e)
    
    return probs

Overwriting fake-face-detection/fake_face_detection/utils/acquisitions.py


In [17]:
%run fake-face-detection/fake_face_detection/utils/acquisitions.py

The next generated sample is the one which have the best probability of being chosen.

In [18]:
%%writefile fake-face-detection/fake_face_detection/utils/generation.py
from fake_face_detection.utils.acquisitions import PI_acquisition
from fake_face_detection.utils.sampling import get_random_samples
from sklearn.gaussian_process import GaussianProcessRegressor
from typing import *
import numpy as np

def PI_generate_sample(X: Iterable, model: GaussianProcessRegressor, search_spaces: dict, n_tests: int = 100, maximize: bool = True):
    """Generate new samples with the probability of improvement

    Args:
        X (Iterable): The list of input data
        model (GaussianProcessRegressor): The model to train
        search_spaces (dict): The search spaces
        n_tests (int, optional): The number of random samples to test. Defaults to 100.
        maximize (bool, optional): The optimization strategy. If maximize == True -> maximize, else -> minimize. Defaults to True.

    Returns:
        List: The new sample
    """
    
    # let us create random samples
    X_prime = [list(get_random_samples(search_spaces).values()) for i in range(n_tests)]
    
    # let us recuperate the probabilities from the acquisition function
    probs = PI_acquisition(X, X_prime, model, maximize = maximize)
    
    # let us return the best sample
    return X_prime[np.argmax(probs)]

Overwriting fake-face-detection/fake_face_detection/utils/generation.py


In [19]:
%run fake-face-detection/fake_face_detection/utils/generation.py

Let us generate the next sample with the above function.

In [20]:
new_samples = PI_generate_sample(data, gp_model, search_spaces, maximize = False)

We obtained the following new values for the next training.

In [21]:
new_samples

[1, 1, 50, 0.03131119507929577]

Let us train again the model and recuperate the new score.

In [22]:
# initialize the new dictionary of samples
params = {key: new_samples[i] for i, key in enumerate(search_spaces)}

# calculate the new score
new_score = objective(torch.optim.Adam, nn.MSELoss(), X, y, params)


In [23]:
new_score

2.981966343907849

We don't have enough data so we can obtain a worth loss. We must concatenate the generated samples and scores in order to obtain a more accurate prediction from the surrogate function.

In [24]:
data.append(new_samples)
scores.append([new_score])

In [25]:
%%writefile fake-face-detection/fake_face_detection/optimization/bayesian_optimization.py
from fake_face_detection.utils.generation import PI_generate_sample as generate_sample
from fake_face_detection.utils.acquisitions import PI_acquisition as acquisition
from fake_face_detection.utils.sampling import get_random_samples
from sklearn.gaussian_process import GaussianProcessRegressor
from typing import *
import pandas as pd
import numpy as np

class SimpleBayesianOptimization:
    
    def __init__(self, objective: Callable, search_spaces: dict, maximize: bool = True):
        
        # recuperate the optimization strategy
        self.maximize = maximize
        
        # recuperate random sample
        sample = get_random_samples(search_spaces)
        
        # initialize the search spaces
        self.search_spaces = search_spaces
        
        # initialize the objective function
        self.objective = objective
        
        # calculate the first score
        score = objective(sample)
        
        # initialize the model
        self.model = GaussianProcessRegressor()
        
        # initialize the input data
        self.data = [list(sample.values())]
        
        # initialize the scores
        self.scores = [[score]]
        
        # fit the model with the input data and the target
        self.model.fit(self.data, self.scores)
    
    def optimize(self, n_trials: int = 50, n_tests: int = 100):
        """Finding the best hyperparameters with the Bayesian Optimization

        Args:
            n_trials (int, optional): The number of trials. Defaults to 50.
            n_tests (int, optional): The number of random samples to test for each trial. Defaults to 100.
        """
        # let us make multiple trials in order to find the best params
        for _ in range(n_trials):
            
            # let us generate new samples with the acquisition and the surrogate functions
            new_sample = generate_sample(self.data, self.model, self.search_spaces, n_tests, maximize = self.maximize)
            sample = {key: new_sample[i] for i, key in enumerate(self.search_spaces)}
            
            # let us recuperate a new score from the new sample
            new_score = self.objective(sample)
            
            # let us add the new sample, target and score to their lists
            self.data.append(new_sample)
            
            self.scores.append([new_score])
            
            # let us train again the model
            self.model.fit(self.data, self.scores)
        
    def get_results(self):
        """Recuperate the generated samples and the scores

        Returns:
            pd.DataFrame: A data frame containing the results
        """
        # let us return the results as a data frame
        data = {key: np.array(self.data, dtype = object)[:, i] for i, key in enumerate(self.search_spaces)}
        
        data.update({'score': np.array(self.scores)[:, 0]})
        
        return pd.DataFrame(data)
        
        

Overwriting fake-face-detection/fake_face_detection/optimization/bayesian_optimization.py


In [26]:
%run fake-face-detection/fake_face_detection/optimization/bayesian_optimization.py

Let us make 50 trials in order to find the set of hyperparameters.

In [27]:
# initialize the attributes
simple_bayesian_optimization = SimpleBayesianOptimization(partial(objective, torch.optim.Adam, nn.MSELoss(), X, y), search_spaces, maximize=False)

# optimize to find the best hyperparameters
simple_bayesian_optimization.optimize(50)

# recuperate the results
results = simple_bayesian_optimization.get_results()

In [28]:
# display the results
pd.options.display.max_rows = 50
results.head(50)

Unnamed: 0,epochs,n_layers,n_features,lr,score
0,10,1,50,0.047436,6.404869
1,3,3,50,0.017284,5.858079
2,7,4,50,0.014734,6.854856
3,6,1,50,0.033547,5.269045
4,1,1,50,0.055477,2.092933
5,10,4,50,0.09402,-0.105739
6,10,4,50,0.076977,0.254223
7,1,4,50,0.059555,2.381605
8,10,3,50,0.095012,0.248136
9,9,3,50,0.096846,0.12756


The best loss and the corresponding hyperparameters are the followings:

In [29]:
results[results['score'] == results['score'].min()]

Unnamed: 0,epochs,n_layers,n_features,lr,score
23,8,3,50,0.09264,-0.224459


Let us get the loss obtained with best set of hyperparameters without adding a large noise.

In [30]:
# recuperate the parameters
params = results[results['score'] == results['score'].min()]

params = params.drop('score', axis = 1).to_dict('list')

params = {key: value[0] for key, value in params.items()}

# train and get the loss
objective(torch.optim.Adam, nn.MSELoss(), X, y, params, scale=1e-5)

0.06265220941131512

The final loss is equal to `0.063`. The first loss was equal to `6.40`. We highly progressed since the first random sample ✌️.