# Lab 9: Design of Experiment
In this lab, we will observe the effect of the DoE in the Bayesian optimization and Bio-inspired approaches.

In the following (hidden) block, the utilities used for running the experiments are implemented.

The list of available benchmark functions can be found at this [link](https://gitlab.com/luca.baronti/python_benchmark_functions)

**NOTE**: When studying the effect of the parameters is *extremely* important to vary just one parameter at a time. Therefore, you are suggested to study one parameter by fixing all the others, and then moving to the next.

Moreover, when comparing different algorithms, is *very* important to run each of them several times (e.g., 30) by using different initial random seeds.


You will use the sampling technique seen in the lecture. In particular, you will have to implement:


*   Random sampling
*   The Halton sequence
*   The full factorial sampling
*   The Latin Hypercube sampling



Code for the Bayesian Optimization, you have to reuse the code from Lab. 6 for the acquisition and the objective functions.

In [None]:
import itertools as it
import shutil
from pathlib import Path

import more_itertools
import pipe as p

from typing import List, Tuple, Iterator, Callable
from scipy.stats import norm

from math import sin
from math import pi
from numpy import arange, ndarray
from numpy import vstack
from numpy import argmax
from numpy import asarray
from numpy.random import normal
from numpy.random import random
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from warnings import catch_warnings
from warnings import simplefilter
from matplotlib import pyplot

from numpy import mean
from sklearn.datasets import make_blobs
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier
from skopt.space import Integer
from skopt.utils import use_named_args
from warnings import catch_warnings
from skopt import gp_minimize
from warnings import simplefilter

import cma
import inspyred
import importlib
import functools
import numpy as np
from pylab import *
from inspyred import ec
from copy import deepcopy
from functools import reduce
import benchmark_functions as bf
from matplotlib import pyplot as plt
from inspyred.ec import EvolutionaryComputation
from inspyred.ec import selectors, replacers, terminators

In [None]:
def objective(x: float, noise: float = 0.05) -> float:
    return sin(3 * 2 * pi * x) + np.random.uniform(-noise, noise)

In [None]:
XS = arange(0, 1, 0.001).reshape(-1, 1)
YS = objective(XS, 0)


def surrogate(model: GaussianProcessRegressor, X: ndarray[float]) -> Tuple[ndarray[float], ndarray[float]]:
    """
    surrogate or approximation for the objective function
    """

    # catch any warning generated when making a prediction
    with catch_warnings():
        # ignore generated warnings
        simplefilter("ignore")
        return model.predict(X, return_std=True)


def opt_acquisition(
        xs_known: ndarray[float],
        ys_known: ndarray[float],
        model: GaussianProcessRegressor,
        acquisition: Callable[[ndarray[float], ndarray[float], GaussianProcessRegressor], ndarray[float]]
) -> float:
    """
    optimize the acquisition function
    """

    # random search, generate random samples
    xs_unknown: ndarray[float] = random(50)
    xs_unknown = xs_unknown.reshape(-1, 1)

    # calculate the acquisition function for each sample
    scores = acquisition(xs_known, xs_unknown, model)

    # locate the index of the largest scores
    ix = argmax(scores)
    return xs_unknown[ix, 0]


def bayesian_optimization(
        generation: int,
        model: GaussianProcessRegressor,
        acquisition: Callable[[ndarray[float], ndarray[float], GaussianProcessRegressor], ndarray[float]],
        initial_points: List[float],
        file: str
) -> Tuple[ndarray[float], ndarray[float], GaussianProcessRegressor]:
    # reshape into rows and cols
    xs_known = asarray(initial_points).reshape(-1, 1)
    ys_known = asarray([objective(x) for x in xs_known]).reshape(-1, 1)

    # fit the model
    model.fit(xs_known, ys_known)

    # perform the optimization process
    for i in range(generation):
        # select the next point
        # and sample it
        x_next = opt_acquisition(xs_known, ys_known, model, acquisition)
        y_next = objective(x_next)

        # region plot
        fig, (ax1, ax2) = pyplot.subplots(2, sharex=True, height_ratios=[3, 1], gridspec_kw={'hspace': 0})
        plot_approximation(ax1, model, xs_known, ys_known, x_next)
        plot_acquisition(ax2, model, xs_known, acquisition, x_next)
        if i == 0:
            lines_labels = [ax.get_legend_handles_labels() for ax in fig.axes]
            lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
            fig.legend(lines, labels, loc='upper left')
        pyplot.savefig(f'{file} {i}.svg')
        # endregion

        # add the data to the dataset
        xs_known = vstack((xs_known, [[x_next]]))
        ys_known = vstack((ys_known, [[y_next]]))

        # update the model
        model.fit(xs_known, ys_known)

    pyplot.close()
    return xs_known, ys_known, model

In [None]:
from typing import Callable


def plot_approximation(
        ax,
        model,
        xs_known,
        ys_known,
        x_next,
):
    mu, std = model.predict(XS, return_std=True)
    ax.fill_between(
        XS.ravel(),
        mu.ravel() + 1.96 * std,
        mu.ravel() - 1.96 * std,
        alpha=0.1
    )
    ax.fill_between(
        XS.ravel(),
        YS.ravel() + 0.05,
        YS.ravel() - 0.05,
        alpha=0.1
    )
    ax.plot(XS, YS, 'y--', lw=1, label='objective')
    ax.plot(XS, mu, 'b-', lw=1, label='surrogate function')
    ax.plot(xs_known, ys_known, 'kx', mew=3, label='noisy samples')
    ax.axvline(x=x_next, ls='--', c='k', lw=1)


def plot_acquisition(
        ax,
        model,
        xs_known,
        acquisition: Callable[[ndarray[float], ndarray[float], GaussianProcessRegressor], ndarray[float]],
        x_next,
):
    ax.plot(XS, acquisition(xs_known, XS, model), 'r-', lw=1, label='Acquisition function')
    ax.axvline(x=x_next, ls='--', c='k', lw=1, label='Next sampling location')

In [None]:
def probability_of_improvement(
        xs_known: ndarray[float],
        xs_unknown: ndarray[float],
        model: GaussianProcessRegressor
) -> ndarray[float]:
    y_hat, _ = surrogate(model, xs_known)
    best = max(y_hat)

    mu, sd = surrogate(model, xs_unknown)
    z = (best - mu) / sd

    return norm.cdf(-z)

In [None]:
def expected_improvement(
        xs_known: ndarray[float],
        xs_unknown: ndarray[float],
        model: GaussianProcessRegressor,
) -> ndarray[float]:
    y_hat, _ = surrogate(model, xs_known)
    best = max(y_hat)

    mu, sd = surrogate(model, xs_unknown)
    z = (best - mu) / sd

    return (mu - best) * norm.cdf(-z) + sd * norm.pdf(-z)

Code for the Genetic algorithm from lab. 7

In [None]:
GLOBAL = 'Global'
INDIVIDUAL = 'Individual'
CORRELATED = 'Correlated'
STAR = 'star'
RING = 'ring'


class OptFun():
    def __init__(self, wf):
        self.f = wf
        self.history = []
        self.__name__ = f'OptFun({wf.__class__})'

    def __call__(self, candidates, *args, **kwargs):
        y = []
        for x0 in candidates:
            self.history.append(deepcopy(x0))
            y.append(self.f(x0))
        return y

    def minima(self):
        return self.f.minima()

    def bounder(self):
        def fcn(candidate, *args):
            bounds = self.f.suggested_bounds()

            for i, (m, M) in enumerate(zip(*bounds)):
                if candidate[i] < m:
                    candidate[i] = m
                if candidate[i] > M:
                    candidate[i] = M
            return candidate

        return fcn

    def bounds(self):
        return self._convert_bounds(self.f.suggested_bounds())

    def heatmap(self, file=None):
        plt.clf()
        resolution = 50
        fig = plt.figure()
        bounds_lower, bounds_upper = self.f.suggested_bounds()
        x = np.linspace(bounds_lower[0], bounds_upper[0], resolution)
        y = np.linspace(bounds_lower[1], bounds_upper[1], resolution)
        X, Y = np.meshgrid(x, y)
        Z = np.asarray([[self.f((X[i][j], Y[i][j])) for j in range(len(X[i]))] for i in range(len(X))])

        plt.contour(x, y, Z, 15, linewidths=0.5, colors='k')  # height lines
        plt.contourf(x, y, Z, 15, cmap='viridis', vmin=Z.min(), vmax=Z.max())  # heat map
        plt.xlabel('x')
        plt.ylabel('y')
        cbar = plt.colorbar()
        cbar.set_label('z')
        if len(self.history) > 0:  # plot points
            xdata = [x for [x, _] in self.history]
            ydata = [y for [_, y] in self.history]
            plt.plot(xdata, ydata, 'or', markersize=3)
        if file is None:
            plt.show()
        else:
            fig.savefig(file, dpi=400)
        plt.close()

    def heatmapP(self, window: int, init: int = 0, file=None):
        plt.clf()
        resolution = 50
        fig = plt.figure()
        bounds_lower, bounds_upper = self.f.suggested_bounds()
        x = np.linspace(bounds_lower[0], bounds_upper[0], resolution)

        y = np.linspace(bounds_lower[1], bounds_upper[1], resolution)
        X, Y = np.meshgrid(x, y)
        Z = np.asarray([[self.f((X[i][j], Y[i][j])) for j in range(len(X[i]))] for i in range(len(X))])

        plt.contour(x, y, Z, 15, linewidths=0.5, colors='k')  # height lines
        plt.contourf(x, y, Z, 15, cmap='viridis', vmin=Z.min(), vmax=Z.max())  # heat map
        plt.xlabel('x')
        plt.ylabel('y')
        cbar = plt.colorbar()
        cbar.set_label('z')
        if len(self.history) > 0:  # plot points

            plt.plot(
                [x for [x, _] in self.history[:init]],
                [y for [_, y] in self.history[:init]],
                'ow',
                markersize=3,
                alpha=0.5
            )

            list(self.history[init:]
                 | p.Pipe(lambda ps: more_itertools.windowed(ps, n=window, step=window))
                 | p.izip(it.count(1))
                 | p.map(lambda ps: plt.plot([x for [x, _] in ps[0]], [y for [_, y] in ps[0]], 'or', markersize=3,
                                             alpha=0.1 + 0.9 * (ps[1] / (len(self.history) / window))))
                 )

        if file is None:
            plt.show()
        else:
            fig.savefig(file, dpi=400)
        plt.close()

    def plot(self):
        plt.clf()
        values = [self.f(v) for v in self.history]
        min = func.minima()[0].score
        plt.plot(values)
        plt.axhline(min, color="r", label="optimum")
        plt.legend()
        plt.show()

    def _convert_bounds(self, bounds):
        new_bounds = []
        for i in range(len(bounds[0])):
            new_bounds.append((bounds[0][i], bounds[1][i]))
        return new_bounds

    def current_calls(self):
        return len(self.history)


def choice_without_replacement(rng, n, size):
    result = set()
    while len(result) < size:
        result.add(rng.randint(0, n))
    return result


class NumpyRandomWrapper(RandomState):
    def __init__(self, seed=None):
        super(NumpyRandomWrapper, self).__init__(seed)

    def sample(self, population, k):
        if isinstance(population, int):
            population = range(population)

        return asarray([population[i] for i in
                        choice_without_replacement(self, len(population), k)])
        #return #self.choice(population, k, replace=False)

    def random(self):
        return self.random_sample()

    def gauss(self, mu, sigma):
        return self.normal(mu, sigma)


def initial_pop_observer(population, num_generations, num_evaluations,
                         args):
    if num_generations == 0:
        args["initial_pop_storage"]["individuals"] = asarray([guy.candidate
                                                              for guy in population])
        args["initial_pop_storage"]["fitnesses"] = asarray([guy.fitness
                                                            for guy in population])


def generator(case):
    if case == "random":
        return random_generator
    if case == "LHS":
        return lhs_generator
    if case == "Halton":
        return Halton_generator
    if case == "FF":
        return ff_generator


def generator_wrapper(func):
    @functools.wraps(func)
    def _generator(random, args):
        return asarray(func(random, args))

    return _generator


def single_objective_evaluator(candidates, args):
    problem = args["problem"]
    return [CombinedObjectives(fit, args) for fit in
            problem.evaluator(candidates, args)]


def run_ga(random, generator_type, func, num_vars=0,
           maximize=False, **kwargs):
    #create dictionaries to store data about initial population, and lines
    initial_pop_storage = {}

    algorithm = ec.EvolutionaryComputation(random)
    algorithm.terminator = ec.terminators.generation_termination
    algorithm.replacer = ec.replacers.generational_replacement
    algorithm.variator = [ec.variators.uniform_crossover, ec.variators.gaussian_mutation]
    algorithm.selector = ec.selectors.tournament_selection

    algorithm.observer = initial_pop_observer

    kwargs["num_selected"] = kwargs["pop_size"]

    kwargs["bounder"] = func.bounder()
    kwargs["generator"] = generator(generator_type)

    final_pop = algorithm.evolve(evaluator=func,
                                 maximize=False,
                                 initial_pop_storage=initial_pop_storage,
                                 num_vars=num_vars,
                                 **kwargs)

    #best_guy = final_pop[0].candidate
    #best_fitness = final_pop[0].fitness
    final_pop_fitnesses = asarray([guy.fitness for guy in final_pop])
    final_pop_candidates = asarray([guy.candidate for guy in final_pop])

    sort_indexes = sorted(range(len(final_pop_fitnesses)), key=final_pop_fitnesses.__getitem__)
    final_pop_fitnesses = final_pop_fitnesses[sort_indexes]
    final_pop_candidates = final_pop_candidates[sort_indexes]

    best_guy = final_pop_candidates[0]
    best_fitness = final_pop_fitnesses[0]

    return best_guy, best_fitness, final_pop


Samplings methods to implement

In [None]:
def random_generator(
        n: int,
        boundaries: List[Tuple[int, int]],
        rnd: Generator,
) -> ndarray[float]:
    if len(boundaries) == 1:
        [(start, stop)] = boundaries
        return rnd.uniform(low=start, high=stop, size=n)

    return np.asarray([rnd.uniform(low=start, high=stop, size=n) for (start, stop) in boundaries]).T


def lhs_generator(
        n: int,
        boundaries: List[Tuple[int, int]],
        rnd: Generator,
) -> ndarray[float]:
    if len(boundaries) == 1:
        [(start, stop)] = boundaries
        return start + (4 + norm.ppf(
            [
                (i + 1 - rnd.uniform()) / n
                for i
                in rnd.permutation(n)
            ]
        )) * (stop - start) / 8

    return np.asarray(
        [
            start + (4 + norm.ppf(
                [
                    (i + 1 - rnd.uniform()) / n
                    for i
                    in rnd.permutation(n)
                ]
            )) * (stop - start) / 8
            for (start, stop)
            in boundaries
        ]
    )


def Halton_generator(
        n: int,
        boundaries: List[Tuple[int, int]],
        bases: List[int]
) -> ndarray[float]:
    def gen(start: int, stop: int, base: int) -> Iterator[float]:
        n, d = 0, 1
        for _ in range(n):
            x = d - n
            if x == 1:
                n = 1
                d *= base
            else:
                y = d // base
                while x <= y:
                    y //= base
                n = (base + 1) * y - x
            yield start + ((n / d) * (stop - start))

    if len(boundaries) == 1:
        [(start, stop)] = boundaries
        [base] = bases
        return np.fromiter(iter=gen(start, stop, base), dtype=float)

    return np.asarray(
        [np.fromiter(iter=gen(start, stop, base), dtype=float) for [(start, stop), base] in zip(boundaries, bases)]
    )


def ff_generator(
        boundaries: List[Tuple[int, int]],
        m: int
) -> ndarray[float]:
    if len(boundaries) == 1:
        [(start, stop)] = boundaries
        return np.linspace(start=start, stop=stop, num=m)

    return np.asarray(
        it.product(*[np.linspace(start=start, stop=stop, num=m) for (start, stop) in boundaries])
    )


# Exercise 1: Bayesian Optmization

In [None]:
def bo() -> None:
    xs: ndarray[float]
    ys: ndarray[float]
    model: GaussianProcessRegressor
    xs, ys, model = bayesian_optimization(
        generation=10,
        model=GaussianProcessRegressor(),
        acquisition=expected_improvement,
        initial_points=[1 / 3],
        file='ei/ei exp-sine-squared one-third'
    )

    ix: ndarray[int] = argmax(ys)
    print('Best Result: x=%.3f, y=%.3f' % (xs[ix], ys[ix]))


bo()

# Exercise 2: Genetic Algorithm

In [None]:
bfs = [
    ("ackley", bf.Ackley(), None),
    ("de jong 3", bf.DeJong3(), None),
    ("keane", bf.Keane(), -0.68),
    ("picheny goldstein and price", bf.PichenyGoldsteinAndPrice(), None),
    ("mc cormick", bf.McCormick(), None),
    ("michalewicz", bf.Michalewicz(), None),
    ("styblinski and tang", bf.StyblinskiTang(), None),
    ("easom", bf.Easom(), None),
    ("pits and holes", bf.PitsAndHoles(), None),
    ("egg holder", bf.EggHolder(), None),
]


def f(fni, history):
    x = OptFun(fni)
    x.history = history
    return x


shutil.rmtree("out")
Path("out").mkdir(parents=True, exist_ok=False)

In [None]:
def ga() -> None:
    for (name, fn, minima) in bfs:
        func = OptFun(fn)

        args = dict(
            num_vars=2,  # number of dimensions of the search space
            gaussian_stdev=1.0,  # standard deviation of the Gaussian mutations
            tournament_size=2,
            num_elites=1,  # number of elite individuals to maintain in each gen
            pop_size=20,  # population size
            pop_init_range=func.bounds()[0],  # range for the initial population
            max_generations=3 ** 4 - 1,  # number of generations of the GA
            crossover_rate=0.9,
            mutation_rate=0.1
        )

        run_ga(
            np.random.default_rng(seed=0),  # seeded random number generator
            func,
            **args
        )

        # list(
        #     func.history
        #     | p.Pipe(lambda xs: more_itertools.windowed(xs, n=args['pop_size'],   step=args['pop_size']))
        #     | p.map(lambda x: f(fn, x))
        #     | p.izip(it.count(0))
        #     | p.map(lambda x: x[0].heatmap(file=f'out/heatmap {name} {x[1]}.svg'))
        # )

        list(
            range(1, 5)
            | p.map(lambda x: args['pop_size'] * (3 ** x))
            | p.map(lambda x: f(fn, func.history[:x]))
            | p.izip(it.count(0))
            | p.map(lambda x: x[0].heatmapP(window=args['pop_size'], file=f'out/heatmap {name}   {x[1]}.svg'))
        )

        func.plot(minima=minima, file=f'out/trend {name}.svg')


ga()

In this lab, you will need to compare the algorithms studied in the previous lessons with their version enhanced with different DOE techniques.

1.   How do the performances increases? Are the algorithms faster to converge, or can they find better solutions? 
2.   Is there an approach better than the others in terms of performance?
3.   How much do the DOEs affect the search cost? 
