# Implement Cholesky Decomposition

In [None]:
%%writefile cholesky.py
import numpy as np
from scipy.linalg import cholesky as cho, cho_solve

def backwardSubstituition(U:np.array, b:np.array) -> np.array:
    '''
    For upper triangular matrix
    '''
    N = U.shape[0]
    b_ = np.copy(b)
    b_[N - 1] /= U[N - 1, N - 1]
    for idx in range(N - 2, -1, -1):
        b_[idx] = (b_[idx] - (U[idx, idx + 1:N] @ b_[idx + 1:N])) / U[idx, idx]
    return b_

def forwardSubstituition(L:np.array, b:np.array) -> np.array:
    '''
    For lower triangular matrix
    '''
    N = L.shape[0]
    b_ = np.copy(b)
    b_[0] /= L[0, 0]
    for idx in range(1, N):
        b_[idx] = (b_[idx] - (L[idx, :idx] @ b_[:idx])) / L[idx, idx]
    return b_



def cholesky(A:np.array) -> np.array:
    A_ = np.copy(A)
    N = A_.shape[0]
    for idx in range(N):
        A_[idx:N, idx] -= (A_[idx:N, :idx] @ A_[idx, :idx].T)
        A_[idx:N, idx] /= -np.sqrt(-A_[idx, idx]) if A_[idx, idx] < 0 else np.sqrt(A_[idx, idx])

    return np.tril(A_)

def cholesky_solve(L:np.array, b:np.array) -> np.array:
    return backwardSubstituition(L.T, forwardSubstituition(L, b))

# Implement parameter wrapper classes

In [None]:
%%writefile paramsSpace.py

from typing import Iterable
import numpy as np



'''
Class to wrap Real, Integer, Categorical hyperparameter
'''
class Real:
    def __init__(self, low, high) -> None:
        self.__low = low
        self.__high = high
    
    def sample(self, shape):
        s = np.random.uniform(self.__low, self.__high, size=shape)
        return s
    
    def interval(self):
        return np.array([self.__low, self.__high])
    
class Int:
    def __init__(self, low, high) -> None:
        self.__low = low
        self.__high = high

    def sample(self, shape):
        s = np.random.randint(self.__low, self.__high + 1, size=shape)
        return s
    
    def interval(self):
        return np.array([self.__low, self.__high])

class Cat:
    def __init__(self, category:Iterable) -> None:
        self.category = category

    def sample(self, shape):
        s = np.random.randint(0, len(self.category), size=shape)
        return s
    
    def interval(self):
        return np.array([0, len(self.category) - 1])

class Choice:
    def __init__(self, choice:Iterable) -> None:
        self.choice = choice

    def sample(self, shape):
        s = np.random.choice(self.choice, size=shape, replace=True)
        return s
    
    def interval(self):
        return np.array([0, len(self.choice) - 1])

class RealKernels:
    '''
    Class to wrap hyperparameter for Gaussian Process Regressor kernel
    '''
    def __init__(self, bounds:np.array) -> None:
        self.__bounds = bounds

    def sample(self, shape):
        s = np.random.uniform(self.__bounds[:, 0], self.__bounds[:, 1], size=shape)
        return s

    def interval(self):
        return self.__bounds

class AcqParams:
    '''
    Class to wrap hyperparameter so that 
    it can be passed into acquisition function
    '''
    def __init__(self, params) -> None:
        self.__params = params

    def sample(self, shape):
        s = np.zeros(shape)
        for idx, p in enumerate(self.__params):
            s[:, idx] = self.__params[p].sample(shape[0])
        return s

    def interval(self):
        bounds = np.zeros((len(self.__params), 2))
        for idx, p in enumerate(self.__params):
            bounds[idx] = self.__params[p].interval()
        return bounds
    
    def Int_idxs(self):
        mask_int = []
        for idx, p in enumerate(self.__params):
            if type(self.__params[p]) == Int: 
                mask_int.append(idx)
        mask_int = np.array(mask_int)
        return mask_int
    
    def Cat_idxs(self):
        mask_cat = []
        for idx, p in enumerate(self.__params):
            if type(self.__params[p]) == Cat: 
                mask_cat.append(idx)
        mask_cat = np.array(mask_cat)
        return mask_cat




def convert_to_bound(params:dict) -> np.array:
    '''
    Convert hyperparameter dictionary into numpy array bounds
    '''
    if "theta" in params:
        bound = params["theta"].interval()
    elif "x" in params:
        bound = params["x"].interval()
    else:
        bound = np.zeros(shape=(len(params), 2))
        for idx, p in enumerate(params):
            bound[idx] = params[p].interval()
    return bound

def decode_acq_params(encoded, params):
    '''
    Decode the encoded acquisition data back to 
    hyperparameter dictionary
    '''
    params_ = {}
    for idx, p in enumerate(params):
        if type(params[p]) == Int:
            params_[p] = encoded["x"][idx].astype(int)
        elif type(params[p]) == Cat:
            params_[p] = params[p].category[encoded["x"][idx].astype(int)]
        elif type(params[p]) == Real:
            params_[p] = encoded["x"][idx]
    return params_

def convert_to_params(encoded, params):
    '''
    Convert the numpy array (encoded hyperparameter) to
    hyperparameter dictionary
    '''
    params_ = {}
    # print(encoded)
    if "theta" in params:
        params_["theta"] = encoded
        return params_
    elif "x" in params:
        params_["x"] = encoded
        return params_
    for idx, p in enumerate(params):
        if type(params[p]) == Real:
            params_[p] = encoded[idx]
        elif type(params[p]) == Int:
            params_[p] = encoded[idx].astype(int)
        elif type(params[p]) == Cat:
            params_[p] = params[p].category[encoded[idx].astype(int)]
    return params_
            
    

if __name__ == "__main__":
    from keras.layers import Dense, Conv2D
    params = {
        "Layer1":Cat([Dense, Conv2D]),
        "Layer1_filters":Cat([8, 16, 32]),
        "Layer1_kernel_size":Cat([3, 5, 7]),
        "Layer1_units":Int(12, 64),
        "Layer1_activation":Cat(["relu", "sigmoid"]),

        "Layer2_units":Int(32, 128),
        "Layer2_activation":Cat(["relu", "sigmoid"]),
        "optimizer":Cat(["adam", "sgd", "rmsprop"]),
        "epochs":Int(5, 20),
        "batch_size":Int(64, 256),
        "learning_rate":Real(0.0001, 0.01)
    }

# Utils

In [None]:
%%writefile util.py

import matplotlib.pyplot as plt
import numpy as np
# import seaborn as sns
import pandas as pd
from paramsSpace import convert_to_params

def plot1D(x, f, opt, n_samples, label):
    X = x.reshape(-1, 1)
    y = [f(x_) for x_ in x]
    if hasattr(opt, "_gp"):
        model = opt._gp
    elif hasattr(opt, "gp"):
        model = opt.gp

    if hasattr(opt, "x_history") and hasattr(opt, "y_history"):
        plt.scatter(opt.x_history, opt.y_history, c="blue", s=100, marker="+")

    mean, std = model.predict(X, return_std=True)

    y_samples = model.sample_y(X, n_samples)
    for idx, single_prior in enumerate(y_samples.T):
        # print("Single Prior : ", single_prior)
        plt.plot(
            x,
            single_prior.reshape(x.shape[0]),
            linestyle="--",
            alpha=0.7,
            label=f"Sampled function #{idx + 1}",
        )
    plt.plot(x, mean, label=label)
    plt.plot(x, y, label="Actual")
    plt.fill_between(
        x,
        mean.reshape(x.shape[0], ) - std,
        mean.reshape(x.shape[0], ) + std,
        alpha=0.1,
        label=fr"{label} mean $\pm$ 1 std. dev.",
    )
    if hasattr(opt, "x_history") and hasattr(opt, "y_history"):
        plt.scatter(opt.x_history, opt.y_history, c="blue", s=100, marker="+")

    plt.legend()
    # plt.show()

def plot2D(x, y, f, opt):
    x_, y_ = np.meshgrid(x, y)
    # z = f(x_, y_)
    z = np.zeros_like(x_)
    for idx in range(x_.shape[0]):
        for jdx in range(x_.shape[1]):
            z[idx, jdx] = f(x_[idx][jdx], y_[idx][jdx])
    print(z.max())
    fig, axs = plt.subplots(ncols=3)
    
    c_actual = axs[0].contourf(x_, y_, z, levels=100, cmap="viridis")
    axs[0].set_title("Actual")
    axs[0].grid()
    axs[0].plot()


    input_mesh = np.column_stack((x_.ravel(), y_.ravel()))
    mean, std = opt.gp.predict(input_mesh, return_std=True)

    z_predict = mean.reshape(x_.shape)
    c_predict = axs[1].contourf(x_, y_, z_predict, levels=100, cmap="viridis")
    axs[1].set_title("Predict")
    axs[1].grid()
    axs[1].plot()

    if hasattr(opt, "x_history") and hasattr(opt, "y_history"):
        axs[1].scatter(opt.x_history[:opt.init, 0], opt.x_history[:opt.init, 1], c="black", s=75, marker="+")
        axs[1].scatter(opt.x_history[opt.init:, 0], opt.x_history[opt.init:, 1], c="blue", s=75, marker="+")


    diff = np.abs(z - z_predict)
    contour_diff = axs[2].contourf(x_, y_, diff, levels=100, cmap='coolwarm')
    axs[2].set_title("Actual - Predicted")
    axs[2].grid()
    cbar_diff = fig.colorbar(contour_diff, ax=axs[2], orientation='vertical')
    # cbar_diff.set_label('')

    fig.colorbar(c_actual, ax=[axs[0], axs[1]], orientation="horizontal") 
    plt.show()

def kdeplot(y_true, y_pred):
    plt.figure(figsize=(24, 8))
    ax1 = sns.kdeplot(data=y_true, color="r", label="Actual")
    sns.kdeplot(data=y_pred, color="b", label="Predicted")
    ax1.set_title("Predicted VS Actual")
    plt.legend()
    plt.show()

def regplot(y_true, y_pred):
    plt.figure(figsize=(40, 25))
    ax = sns.regplot(x=y_true, y=y_pred)
    plt.title("Best Fit Line")
    ax.set_xlabel("Actual")
    ax.set_ylabel("Predicted")
    plt.show()



from matplotlib.animation import FuncAnimation

class Result:
    def __init__(self, algo_name, func_name, best_sol, 
                best_sol_decoded, best_fitness, params,
                fitness_history, pop_history) -> None:
        self.__algo_name = algo_name
        self.__func_namae = func_name
        self.x = best_sol
        self.x_decoded = best_sol_decoded
        self.fun = best_fitness
        self.__fitness_history = fitness_history
        self.__pop_history = pop_history
        self.__params = params

    def res(self) -> pd.DataFrame:

        # if n_iter is None:
        #     pop = self.__pop_history[-1]
        #     fitness = self.__fitness_history[-1]
        # else:
        #     pop = self.__pop_history[n_iter]
        #     fitness = self.__fitness_history[n_iter]
        n_iter = self.__pop_history.shape[0]
        pop_decoded = []
        for idx in range(n_iter):
            for p in self.__pop_history[idx]:
                pop_decoded.append(convert_to_params(p, self.__params))
        
        fitness = self.__fitness_history.flatten()
        # print(n_iter)
        # print(pop_decoded)
        # print(fitness)
        # print({k:[p[k] for p in pop_decoded] for k in self.__params})
        # print(np.repeat(np.arange(0, n_iter + 1), self.__pop_history.shape[1]))
        return pd.DataFrame(
            {
                **{k:[p[k] for p in pop_decoded] for k in self.__params},
                "fitness":fitness,
                "iteration":np.repeat(np.arange(0, n_iter), self.__pop_history.shape[1])
            }
        )

    def func_name(self):
        return self.__func_namae
    
    def algo_name(self):
        return self.__algo_name

    def fitness_history(self):
        return self.__fitness_history
    
    def population_history(self):
        return self.__pop_history
    
    def plot_fitness(self):
        fitness = np.min(self.__fitness_history, axis=1)
        plt.scatter([x for x in range(self.__fitness_history.shape[0])], fitness, marker="+", c="black", s=50)
        plt.plot([x for x in range(self.__fitness_history.shape[0])], fitness)
        plt.title(f"Fitness Function : {self.__func_namae}\nAlgorithm : {self.__algo_name}")
        plt.xlabel("Iteration")
        plt.ylabel("Fitness")
        plt.grid()
        plt.show()

    def plot_fitness_mean_bar(self):
        # for idx in range((self.__fitness_history.shape[0] )):
        #     plt.scatter([x for x in range(self.__fitness_history.shape[1])], self.__fitness_history[idx], marker="+")
        mean = self.__fitness_history.mean(axis=1)
        plt.bar(x=[x for x in range(self.__fitness_history.shape[0])], height=mean)
        plt.title("Fitness Mean")
        plt.style.use("seaborn-darkgrid")
        plt.show()


# Implement Acquisition Function

In [None]:
%%writefile acquisition.py

from scipy.stats import norm
import numpy as np

class AcqFunc:

    def __init__(self, gp) -> None:
        self.__gp = gp

    def func(self, X, kind:["ucb", "lcb", "pi", "ei", "mix"], **acq_params) -> callable:

        if kind == "ucb":
            return self.GP_UCB(X, **acq_params)
        elif kind == "lcb":
            return self.GP_LCB(X, **acq_params)
        elif kind == "ei":
            return self.GP_EI(X, **acq_params)
        elif kind == "pi":
            return self.GP_PI(X, **acq_params)
        elif kind == "mix":
            pi = self.GP_PI(X, **acq_params)
            ei = self.GP_EI(X, **acq_params)
            ucb = self.GP_UCB(X, **acq_params)
            sum = pi + ei + ucb
            return (pi * pi / sum) + (ei * ei / sum) + (ucb * ucb / sum)
        
    def GP_EI(self, X, xi=0.01, kappa=0, y_opt:float=0):
        # print(y_opt)
        mean, std = self.__gp.predict(X, return_std=True)
        values = np.zeros_like(mean)
        mask = std > 0
        improve = y_opt - xi - mean[mask]
        scaled = improve / std[mask]
        cdf = norm.cdf(scaled)
        pdf = norm.pdf(scaled)
        exploit = improve * cdf
        explore = std[mask] * pdf
        values[mask] = exploit + explore
        return values

    def GP_PI(self, X, xi=0.01, kappa=0, y_opt:float=0):
        mean, std = self.__gp.predict(X, return_std=True)
        values = np.zeros_like(mean)
        mask = std > 0
        improve = y_opt - xi - mean[mask]
        scaled = improve / std[mask]
        values[mask] = norm.cdf(scaled)
        return values

    def GP_LCB(self, X, xi=0, kappa=1.96, y_opt=0):
        mean, std = self.__gp.predict(X, return_std=True)
        return mean - kappa * std

    def GP_UCB(self, X, xi=0, kappa=1.96, y_opt=0):
        # print("X : ", X)
        mean, std = self.__gp.predict(X, return_std=True)
        return mean + kappa * std

# Implement optimizer

In [None]:
%% writefile optimizer.py

import numpy as np
from abc import ABC, abstractmethod
from tqdm import tqdm
from util import Result
from paramsSpace import Real, Int, Cat, convert_to_bound, convert_to_params
import sys

class Optimizer(ABC):
    '''
    Abstract optimizer class
    '''
    def __init__(self, func, params,
                popsize, n_iter, ttl,
                verbose, random_state
                ) -> None:
        
        self.func = func
        self.params = params
        self.popsize = popsize
        self.n_iter = n_iter
        self.verbose = verbose
        self.ttl = ttl

        self.best_score = None
        self.best_params = None
        self.pbounds = None
        self.dimensions = len(self.params) if self.params is not None else None

        # Seed the numpy first
        self.verify_random_state(random_state)
        if self.func is None and self.params is None:
            self.init_pop_fitness()


    @abstractmethod
    def optim(self):
        pass
    
    def init_dimension(self):
        '''
        Get the dimensions of the search space
        '''
        if "theta" in self.params:
            self.dimensions = self.params["theta"].interval().shape[0]
        elif "x" in self.params:
            self.dimensions = self.params["x"].interval().shape[0]
        else:
            self.dimensions = len(self.params)

    def preprocess(self):
        '''
        Record the indices of Int and Cat types in the search space
        '''
        if "theta" in self.params:
            mask_int = np.array([])
            mask_cat = np.array([])
        elif "x" in self.params:
            mask_int = self.params["x"].Int_idxs()
            mask_cat = self.params["x"].Cat_idxs()
        else:
            mask_int = []
            mask_cat = []
            for idx, p in enumerate(self.params):
                if type(self.params[p]) == Int: 
                    mask_int.append(idx)
                elif type(self.params[p]) == Cat:
                    mask_cat.append(idx)
            mask_int = np.array(mask_int)
            mask_cat = np.array(mask_cat)
        return mask_int, mask_cat

    def init_pop_fitness(self):
        '''
        Initialize the population and calculate the fitness of individual
        '''
        self.pop = np.zeros(shape=(self.popsize, self.dimensions))
        if "theta" in self.params:
            self.pop = self.params["theta"].sample((self.popsize, self.dimensions))
        elif "x" in self.params:
            self.pop = self.params["x"].sample((self.popsize, self.dimensions))
        else:
            for idx, p in enumerate(self.params):
                self.pop[:, idx] = self.params[p].sample(shape=self.popsize)
        self.fitness = np.zeros((self.popsize, ))
        iteration = range(self.popsize)

        if self.verbose:
            iteration = tqdm(iteration, desc=f"Initializing {self.__class__.__name__}")
        for idx, ind in zip(iteration, self.pop):      
            f = self.func(**convert_to_params(ind, self.params))
            self.fitness[idx] = f


    def set_func(self, func:callable):
        '''
        Set the function after the instance was initialized
        '''
        self.func = func

    def set_params(self, params:dict):
        '''
        Set the search space after the instance was initialized
        '''
        self.params = params


    def verify_func_params(self):
        '''
        Verify that the target function and parameters range are provided
        '''
        if self.func is None:
            ValueError("Please provide your function")
        elif self.params is None:
            ValueError("Please provide your parameters")

    def verify_random_state(self, random_state):
        if isinstance(random_state, int):
            return np.random.seed(random_state)
        elif random_state is None:
            return np.random.seed()
        
class DifferentialEvol(Optimizer):
    def __init__(self, func:callable=None, params:dict=None, mut_1=0.9, mut_2=0.9, 
                crossp=0.95, popsize=10, ttl=np.inf,
                n_iter=20, verbose=0, random_state=None) -> None:
        super().__init__(func, params, popsize, n_iter, ttl, verbose, random_state)
        self.mut_1 = mut_1
        self.mut_2 = mut_2
        self.crossp = crossp

    def optim(self) -> Result:
        self.verify_func_params()
        self.init_dimension()
        self.init_pop_fitness()

        assert(self.popsize >= 3)
        
        age = np.ones((self.popsize, )) * np.inf
        if self.ttl > 0:
            age = np.ones((self.popsize, )) * self.ttl
            
        
        mask_int, mask_cat = self.preprocess()

        self.pbounds = convert_to_bound(self.params)
        min_b, max_b = self.pbounds.T

        pop_history = np.zeros((self.n_iter + 1, self.popsize, self.dimensions))
        fitness_history = np.zeros((self.n_iter + 1, self.popsize))
        pop_history[0] = self.pop.copy()
        fitness_history[0] = self.fitness.copy()

        best_idx = np.argmax(self.fitness)
        best_x = self.pop[best_idx]
        self.best_score = self.fitness[best_idx]

        for idx in range(self.n_iter):
            iteration = range(self.popsize)
            if self.verbose:
                iteration = tqdm(iteration, file=sys.stdout)

            for jdx in iteration:
                age[jdx] -= 1
                if self.verbose:
                    iteration.set_description_str(desc=f"Differential Evol {idx + 1}")
                    iteration.set_postfix_str(f"best_f : {round(self.best_score, 5)}")
                idxs = [kdx for kdx in range(self.popsize) if kdx != jdx]

                # Mutation strategy
                a, b, c = self.pop[np.random.choice(idxs, 3, replace = False)]
                mutant = a + self.mut_1 * (b - c) + self.mut_2 * (best_x - a)

                # Boundary checking
                mutant[mutant > max_b] = max_b[mutant > max_b]
                mutant[mutant < min_b] = min_b[mutant < min_b]

                # Cross over
                cross_points = np.random.rand(self.dimensions) < self.crossp
                if not np.any(cross_points):
                    cross_points[np.random.randint(0, self.dimensions)] = True

                trial = np.where(cross_points, mutant, self.pop[jdx])
                if mask_int.size:
                    trial[mask_int] = np.round(trial[mask_int])
                if mask_cat.size:
                    trial[mask_cat] = np.round(trial[mask_cat])
                
                trial_ = convert_to_params(trial, self.params)
                f = self.func(**trial_)

                if f > self.fitness[jdx] or (age[jdx] == -1):
                    self.fitness[jdx] = f
                    self.pop[jdx] = trial
                    age[jdx] = self.ttl
                    if f > self.best_score:
                        best_x = trial
                        self.best_score = f

            fitness_history[idx + 1] = self.fitness.copy()
            pop_history[idx + 1] = self.pop.copy()
        self.best_params = convert_to_params(best_x, self.params)

        return Result(self.__class__.__name__, self.func.__name__, 
                    best_x, self.best_params, self.best_score, 
                    self.params, fitness_history, pop_history)

class HarmonySearch(Optimizer):
    def __init__(self, func:callable=None, params:dict=None, HMCR=0.7, PAR=0.3, BW:np.array=None, 
                popsize=10, n_iter=20, ttl=np.inf, verbose=0, random_state=None) -> None:
        super().__init__(func, params, popsize, n_iter, ttl, verbose, random_state)
        self.HMCR = HMCR
        self.PAR = PAR
        self.BW = BW
    
    def optim(self):
        self.verify_func_params()
        self.init_dimension()
        self.init_pop_fitness()
        
        age = np.ones((self.popsize, )) * np.inf
        if self.ttl > 0:
            age = np.ones((self.popsize, )) * self.ttl
        
        mask_int, mask_cat = self.preprocess()

        harmony_history = np.zeros((self.n_iter + 1, self.popsize, self.dimensions))
        fitness_history = np.zeros((self.n_iter + 1,self. popsize))

        self.pbounds = convert_to_bound(self.params)
        min_b, max_b = self.pbounds.T
        diff = max_b - min_b
        if self.BW is not None:
            assert(self.BW.shape[0] == self.dimensions)
            diff = self.BW

        harmony_history[0] = self.pop.copy()
        fitness_history[0] = self.fitness.copy()

        best_idx = np.argmax(self.fitness)
        worst_idx = np.argmin(self.fitness)
        best_harmony = self.pop[best_idx]
        self.best_score = self.fitness[best_idx]

        for idx in range(self.n_iter):
            r1_ = np.random.rand(self.popsize, self.dimensions)
            r2_ = np.random.rand(self.popsize, self.dimensions)
            r3_ = np.random.uniform(low=-1, high=1.001, size=(self.popsize, self.dimensions))
            iteration = range(self.popsize)
            if self.verbose:
                iteration = tqdm(iteration, file=sys.stdout)

            for jdx in iteration:
                age[jdx] -= 1
                if self.verbose:
                    iteration.set_description_str(desc=f"Harmony Search {idx + 1}")
                    iteration.set_postfix_str(f"best_f : {round(self.best_score, 5)}")
                trial = np.zeros(self.dimensions)
                for kdx in range(self.dimensions):
                    if r1_[jdx][kdx] < self.HMCR:
                        trial[kdx] = self.pop[np.random.randint(0, self.popsize)][kdx]
                    else:
                        trial[kdx] = np.random.uniform(min_b[kdx], max_b[kdx] + 0.001)
                    if r2_[jdx][kdx] < self.PAR:
                        trial[kdx] += r3_[jdx][kdx] * diff[kdx]
                trial[trial > max_b] = max_b[trial > max_b]
                trial[trial < min_b] = min_b[trial < min_b]
                if mask_int.size:
                    trial[mask_int] = np.round(trial[mask_int])
                if mask_cat.size:
                    trial[mask_cat] = np.round(trial[mask_cat])

                trial_ = convert_to_params(trial, self.params)
                f = self.func(**trial_)

                # Update worst_idx first and followed by best_idx
                # Ignore the trial harmony if worse than the worst harmony
                if f > self.fitness[worst_idx] or (age[jdx] == -1):
                    self.pop[worst_idx] = trial
                    self.fitness[worst_idx] = f
                    age[jdx] = self.ttl
                    if f > self.best_score:
                        best_harmony = trial
                        self.best_score = f
                    worst_idx = np.argmin(self.fitness)

            self.best_params = convert_to_params(best_harmony, self.params)
            fitness_history[idx + 1] = self.fitness.copy()
            harmony_history[idx + 1] = self.pop.copy()

        return Result(self.__class__.__name__, self.func.__name__, 
                    best_harmony, self.best_params, self.best_score, 
                    self.params, fitness_history, harmony_history)

class ParticleSwarm(Optimizer):
    def __init__(self, func:callable=None, params:dict=None, inertia=.5, cognitive=1.5, social=1.5, 
                popsize=10, n_iter=20, ttl=np.inf,
                verbose=0, random_state=None) -> None:
        super().__init__(func, params, popsize, n_iter, ttl, verbose, random_state)
        self.inertia = inertia
        self.cognitive = cognitive
        self.social = social

    def optim(self) -> Result:

        self.verify_func_params()
        self.init_dimension()
        self.init_pop_fitness()
        
        age = np.ones((self.popsize, )) * np.inf
        if self.ttl > 0:
            age = np.ones((self.popsize, )) * self.ttl
        
        mask_int, mask_cat = self.preprocess()

        swarm_history = np.zeros((self.n_iter + 1, self.popsize, self.dimensions))
        fitness_history = np.zeros((self.n_iter + 1, self.popsize))

        self.pbounds = convert_to_bound(self.params)
        min_b, max_b = self.pbounds.T
        diff = np.fabs(min_b - max_b)

        swarm_history[0] = self.pop.copy()
        fitness_history[0] = self.fitness.copy()
        velocity = np.random.uniform(-np.abs(diff), np.abs(diff), (self.popsize, self.dimensions))
        swarm_best = self.pop.copy()

        best_idx = np.argmax(self.fitness)
        best_position = swarm_best[best_idx]
        self.best_score = self.fitness[best_idx]

        for idx in range(self.n_iter):
            r_p = np.random.rand(self.popsize, self.dimensions)
            r_s = np.random.rand(self.popsize, self.dimensions)
            iteration = range(self.popsize)
            if self.verbose:
                iteration = tqdm(iteration, file=sys.stdout)

            for jdx in iteration:
                age[jdx] -= 1
                if self.verbose:
                    iteration.set_description_str(f"Swarm Particle {idx + 1}")
                    iteration.set_postfix_str(f"best_f : {round(self.best_score, 5)}")
                velocity[jdx, :] = self.inertia * velocity[jdx, :] + self.cognitive * r_p[jdx, :] * (swarm_best[jdx, :] - self.pop[jdx, :]) + \
                                            self.social * r_s[jdx, :] * (best_position - self.pop[jdx, :])
                self.pop[jdx, :] += velocity[jdx, :]

                self.pop[jdx, self.pop[jdx, :] > max_b] = max_b[self.pop[jdx, :] > max_b]
                self.pop[jdx, self.pop[jdx, :] < min_b] = min_b[self.pop[jdx, :] < min_b]
                if mask_int.size:
                    self.pop[jdx][mask_int] = np.round(self.pop[jdx][mask_int])
                if mask_cat.size:
                    self.pop[jdx][mask_cat] = np.round(self.pop[jdx][mask_cat])
                trial_ = convert_to_params(self.pop[jdx], self.params)
                
                f = self.func(**trial_)
                if f > self.fitness[jdx] or (age[jdx] == -1):
                    swarm_best[jdx] = self.pop[jdx]
                    self.fitness[jdx] = f
                    age[jdx] = self.ttl
                    if f > self.best_score:
                        best_position = swarm_best[jdx]
                        self.best_score = f

            fitness_history[idx + 1] = self.fitness.copy()
            swarm_history[idx + 1] = self.pop.copy()


        self.best_params = convert_to_params(best_position, self.params)

        return Result(self.__class__.__name__, self.func.__name__, 
                    best_position, self.best_params, self.best_score, 
                    self.params, fitness_history, swarm_history)



# Implement Gaussian Process Regressor

In [None]:
%%writefile gaussRgr.py

from cholesky import cholesky, cholesky_solve, forwardSubstituition
import numpy as np
from scipy.linalg import cholesky as cho, cho_solve, solve_triangular
import sklearn.gaussian_process.kernels as kernels
import scipy.optimize
from operator import itemgetter
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import optimizer
from paramsSpace import RealKernels

class GPR:
    def __init__(
        self,          
        kernel=None, 
        epsilon=1e-10,
        random_state=None,
        n_restarts_optimizer=0,
        normalize_y=False,
        optimizer="de"
        ) -> None:
        self.y_mean: np.array
        self.epsilon: np.array
        self.alpha: np.array
        self.L: np.array
        self.kernel_ = kernel
        self.epsilon = epsilon
        self.random_state = random_state
        self.X_train, self.y_train = None, None
        self.n_restarts_optimizer= n_restarts_optimizer
        self.normalize_y = normalize_y
        self.optimizer=optimizer

    def fit(self, X, y):
        if self.random_state is None or self.random_state is np.random:
            self.random_state = np.random.mtrand._rand
        if isinstance(self.random_state, int):
            self.random_state = np.random.RandomState(self.random_state)

        if self.kernel_ is None:
            self.kernel = kernels.ConstantKernel(1.0, constant_value_bounds="fixed") * kernels.RBF(
                1.0, length_scale_bounds="fixed"
            )
        else:
            # Use self.kernel so that we get the unfitted kernel not keep 
            # recloning the fitted one since self.kernel_ holds the 
            # original unfitted kernel
            self.kernel = kernels.clone(self.kernel_)

        if self.normalize_y:
            scaler = StandardScaler()
            y = scaler.fit_transform(y)
            self._y_train_mean = scaler.mean_
            self._y_train_std = scaler.scale_

        else:
            self._y_train_mean = np.zeros(1)
            self._y_train_std = 1

        self.X_train = np.copy(X)
        self.y_train = np.copy(y)


        if self.kernel.n_dims > 0:
            def kernelFunc(theta, evaluate_grad=True if self.optimizer=="l-bfgs-b" else False):
                if evaluate_grad:
                    lml, grad = self.log_marginal_likelihood(theta, evaluate_grad)
                    return -lml, -grad
                else:
                    lml = self.log_marginal_likelihood(theta, evaluate_grad)
                    return lml
            if self.optimizer == "l-bfgs-b":
                optima = [
                    (
                        self.optimization(
                            kernelFunc, self.kernel.theta, self.kernel.bounds, self.optimizer
                        )
                    )
                ]
                if self.n_restarts_optimizer > 0:
                    bounds = self.kernel.bounds
                    for iteration in range(self.n_restarts_optimizer):
                        theta_initial = self.random_state.uniform(bounds[:, 0], bounds[:, 1])
                        optima.append(
                            self.optimization(kernelFunc, theta_initial, bounds, self.optimizer)
                        )

                # Get the log-marginal-likelihood value
                lml_values = list(map(itemgetter(1), optima))
                # Get the kernel theta with the smallest negative log-marginal-likelihood value(which means largest)
                self.kernel.theta = optima[np.argmin(lml_values)][0]
                self.kernel._check_bounds_params()
            elif self.optimizer == "de":
                res = optimizer.DifferentialEvol(kernelFunc, {"theta":RealKernels(self.kernel.bounds)}, 
                                        n_iter=self.n_restarts_optimizer * 20 if self.n_restarts_optimizer > 0 else 20,
                                        verbose=0).optim()
            elif self.optimizer == "hs":
                res = optimizer.HarmonySearch(kernelFunc, {"theta":RealKernels(self.kernel.bounds)}, 
                                        n_iter=self.n_restarts_optimizer * 20 if self.n_restarts_optimizer > 0 else 20,
                                        verbose=0).optim()
            elif self.optimizer == "pso":
                res = optimizer.ParticleSwarm(kernelFunc, {"theta":RealKernels(self.kernel.bounds)}, 
                                        n_iter=self.n_restarts_optimizer * 20 if self.n_restarts_optimizer > 0 else 20,
                                        verbose=0).optim()
                # res = Optimizer().optim(kernelFunc, {"theta":RealKernels(self.kernel.bounds)}, self.optimizer, 
                #                         n_iter=self.n_restarts_optimizer * 20 if self.n_restarts_optimizer > 0 else 20,
                #                         verbose=0)
                # print("Res : ", res)
            self.kernel.theta = res.x
                # res.plot_fitness()


        K = self.kernel(self.X_train)
        K[np.diag_indices_from(K)] += self.epsilon
        self.L = cholesky(K)
        self.alpha = cholesky_solve(self.L, self.y_train)

        return self
    
    def predict(self, X, return_std:bool=False, return_cov:bool=False):
        if self.X_train is None:
            self.y_mean = np.zeros(X.shape[0])
            if return_cov and return_std:
                y_cov = self.kernel(X)
                return self.y_mean, y_cov, np.diag(y_cov)
            elif return_cov:
                y_cov = self.kernel(X)
                return self.y_mean, y_cov
            elif return_std:
                y_var = np.diag(self.kernel(X))
                return self.y_mean, np.sqrt(y_var)
            else:
                return self.y_mean
        else:
            K_ = self.kernel(X, self.X_train)
            y_mean = K_ @ self.alpha
            y_mean = self._y_train_std * y_mean + self._y_train_mean

        V = forwardSubstituition(self.L, K_.T)
        if return_cov:
            y_cov = self.kernel(X) - V.T @ V
            y_cov = np.outer(y_cov, self._y_train_std ** 2).reshape(*y_cov.shape, -1)
            if y_cov.shape[2] == 1:
                y_cov = np.squeeze(y_cov, axis=2)
            return y_mean, y_cov
        elif return_std:
            y_var = self.kernel.diag(X)
            y_var -= np.einsum("ij,ji->i", V.T, V)
            y_var = np.outer(y_var, self._y_train_std ** 2).reshape(*y_var.shape, -1)
            if y_var.shape[1] == 1:
                y_var = np.squeeze(y_var, axis=1)
            return y_mean, np.sqrt(y_var)
        else:
            return y_mean


    def sample_y(self, X, n_samples=1, random_state=0):
        rng = np.random.RandomState(random_state)
        y_mean, y_cov = self.predict(X, return_cov=True)
        y_samples = rng.multivariate_normal(y_mean.reshape(y_mean.shape[0]), y_cov, n_samples).T
        return y_samples
    
    def score(self, X, y):
        y_pred = self.predict(X)
        return r2_score(y, y_pred)
    
    def log_marginal_likelihood(self, theta, evaluate_grad):
        kernel = self.kernel
        kernel.theta = theta
        if evaluate_grad:
            K, K_gradient = kernel(self.X_train, eval_gradient=True)
        else:
            K = kernel(self.X_train)

        K[np.diag_indices_from(K)] += self.epsilon

        L = cholesky(K)
        if self.y_train.ndim == 1:
            y_train = self.y_train[..., np.newaxis]
        else:
            y_train = self.y_train

        alpha = cholesky_solve(L, y_train)
        
        log_likelihood_dims = -0.5 * np.einsum("ik, ik->k", y_train, alpha)
        log_likelihood_dims -= np.log(np.diag(L)).sum()
        log_likelihood_dims -= K.shape[0] / 2 * np.log(2 * np.pi)

        log_likelihood = log_likelihood_dims.sum(axis=-1)

        if evaluate_grad:
            # Compute outer product
            # Equivalent to the following lines:
            # for idx in range(alpha.shape[0]):
            #     for jdx in range(alpha.shape[0]):
            #         E[idx, jdx, :] = alpha[idx, :] * alpha[jdx, :]
            inner_term = np.einsum("ik,jk->ijk", alpha, alpha)
            K_inv = cholesky_solve(L, np.eye(K.shape[0]))
            inner_term -= K_inv[..., np.newaxis]
            # Compute trace
            log_likelihood_gradient_dims = 0.5 * np.einsum(
                "ijl,jik->kl", inner_term, K_gradient
            )
            log_likelihood_gradient = log_likelihood_gradient_dims.sum(axis=-1)

            return log_likelihood, log_likelihood_gradient
        else:
            return log_likelihood


        
    def optimization(self, obj_func, initial_theta, bounds, optimizer):
        if optimizer == "l-bfgs-b":
            opt_res = scipy.optimize.minimize(
                obj_func,
                initial_theta,
                method="L-BFGS-B",
                jac=True,
                bounds=bounds,
            )

        return opt_res.x, opt_res.fun

# Implement Bayesian Optimizer

In [None]:
%%writefile bayesOpt.py

import acquisition as acqfunc, numpy as np
from GPR import GPR
from scipy.optimize import minimize
from sklearn.gaussian_process import GaussianProcessRegressor
import pandas as pd
from paramsSpace import convert_to_bound, convert_to_params, AcqParams, decode_acq_params
from tqdm import tqdm
import optimizer

class BayesianOptimizer:
    def __init__(self, f:callable=None, params=None, init_X=None, init_y=None, n_iter=10, n_init=5, 
                kernel=None, verbose=0, random_state=None,
                **gp_params) -> None:
        self.f = f
        self.__n_iter = n_iter
        self.init = n_init
        self.X_train = init_X
        self.y_train = init_y
        self.acq_history = []
        self.params = params
        self.random_state = random_state
        self.__verbose = verbose
        self.best_params:dict
        self.best_score = None

        if self.random_state is None:
            self.random_state = np.random.RandomState()
        elif isinstance(random_state, int):
            self.random_state = np.random.RandomState(random_state)
        else:
            assert isinstance(random_state, np.random.RandomState)

        if self.X_train is None and self.y_train is None and f is not None:
            self.X_train = np.zeros((n_init, len(params)))
            for idx, p in enumerate(params):
                self.X_train[:, idx] = params[p].sample(shape=n_init)

            self.y_train = np.zeros((n_init, ))

            iteration = range(self.X_train.shape[0])
            if verbose:
                iteration = tqdm(iteration, desc=f"Initializing Bayesian Opt")
            for idx in iteration:
                self.y_train[idx] = f(**(convert_to_params(self.X_train[idx], self.params)))                
            if self.y_train.ndim == 1:
                self.y_train = self.y_train.reshape(-1, 1)

        self.pbounds = convert_to_bound(params)

        # Set the default acquisition function
        self.acqFunc = lambda x: acqfunc.AcqFunc(gp=self.gp).func(x.reshape(1, -1), "ucb", kappa=2.576)
        self.gp = GPR(kernel=kernel, random_state=self.random_state, **gp_params)

    def set_func(self, f:callable, params:dict=None):
        if params is not None:
            self.params = params
            self.pbounds = convert_to_bound(params)     
        else:
            params = self.params

        if self.X_train is None and self.y_train is None:
            self.X_train = np.zeros((self.init, len(params)))
            for idx, p in enumerate(params):
                self.X_train[:, idx] = params[p].sample(shape=self.init)

            self.y_train = np.zeros((self.init, ))
            iteration = range(self.X_train.shape[0])
            if self.__verbose:
                iteration = tqdm(iteration, desc=f"Initializing Bayesian Opt")
            for idx in iteration:
                self.y_train[idx] = f(**(convert_to_params(self.X_train[idx], self.params)))                
            if self.y_train.ndim == 1:
                self.y_train = self.y_train.reshape(-1, 1)
        self.f = f

    def set_params(self, params:dict):
        self.params = params

    def set_acqfunc(self, acqfunction:["ucb", "lcb", "pi", "ei"]="ucb", **acq_params):
        '''
        Set acquisition function and pass in the parameters of acquisition function, for
        detail, refer to AcqFunc class
        '''
        self.__acq_type = acqfunction
        self.__acq_params = acq_params

    def optimize(self, opt="de",  n_samples=25, **opt_params):
        if (self.f is None or self.params is None):
            raise ValueError("Please ensure you provide target function and the search space")
        
        params = {
            "x":AcqParams(self.params)
        }

        iteration = range(self.__n_iter)
        if self.__verbose:
            iteration = tqdm(iteration, desc="Bayesian Optimization")

        for idx in iteration:
            if self.__verbose:
                iteration.set_postfix_str(f"best_f : {round(self.y_train.max(), 5)}")
            max_acq = -np.inf
            self.gp = self.gp.fit(self.X_train, self.y_train)
            self.acqFunc = lambda x: acqfunc.AcqFunc(gp=self.gp).func(x.reshape(1, -1), self.__acq_type, y_opt=self.y_train.max(), **self.__acq_params)
                        
            if opt == "de":
                res = optimizer.DifferentialEvol(self.acqFunc, params, 
                                        popsize=n_samples,
                                        random_state=self.random_state,
                                        **opt_params).optim()
            elif opt == "hs":
                res = optimizer.HarmonySearch(self.acqFunc, params,
                                        popsize=n_samples, 
                                        random_state=self.random_state,
                                        **opt_params).optim()
            elif opt == "pso":
                res = optimizer.ParticleSwarm(self.acqFunc, params, 
                                        popsize=n_samples,
                                        random_state=self.random_state,
                                        **opt_params).optim()
            x_max = res.x
            x_max_encoded = res.x_decoded
            max_acq = res.fun

            self.acq_history.append(max_acq.squeeze() if type(max_acq) == np.array else max_acq)
            self.X_train = np.vstack((self.X_train, [x_max]))
            self.y_train = np.vstack((self.y_train, [self.f(**decode_acq_params(x_max_encoded, self.params))]))

        best_idx = np.argmax(self.y_train)
        self.best_params = convert_to_params(self.X_train[best_idx], self.params)
        self.best_score = np.max(self.y_train)

    def res(self) -> pd.DataFrame:
        acq = ["/"] * self.init
        acq.extend(self.acq_history)
        decoded = []
        for x in self.X_train:
            decoded.append(convert_to_params(x, self.params))
        df = pd.DataFrame(
            {
                "Acq":acq,
                "func":self.y_train.flatten(),
                **{k:[p[k] for p in decoded] for k in self.params},
            }
        )
        return df

# Implement train, validation and test generator

In [None]:
def initiateGenerator(path, batchSize, imageSize):
    base_path = path
    print("\nTotal : ", end=" ")
    train_dataset = tf.keras.preprocessing.image_dataset_from_directory(batch_size=batchSize, 
                                                                        directory=base_path+"/"+"train")

    train_datagen = ImageDataGenerator(rescale=1./255)

    print("\nFor Training : ", end=" ")
    train_generator = train_datagen.flow_from_directory(
        base_path+"/"+"train",
        target_size=(imageSize, imageSize),
        batch_size=batchSize,
        class_mode='categorical', subset='training')

    print("\nFor Val : ", end=" ")
    valid_datagen = ImageDataGenerator(rescale=1./255)
    validation_generator = valid_datagen.flow_from_directory(
        base_path+"/"+"valid",
#                 base_path+"/"+"train",

#         base_path + "/" + "Training",
        target_size=(imageSize, imageSize),
        batch_size=batchSize,
        class_mode='categorical',shuffle=False)
    
    print("\nFor Test : ", end=" ")
    test_datagen = ImageDataGenerator(rescale=1./255)
    test_generator = test_datagen.flow_from_directory(
#         base_path+"/"+"Testing",
        base_path + "/" + "test",
        target_size=(imageSize, imageSize),
        batch_size=batchSize,
        class_mode='categorical', shuffle=False)
    class_names = train_dataset.class_names
    noOfClasses = len(class_names)
    print("\nNo of Classes : ", noOfClasses)
    print("Classes : ", class_names)

        
    return noOfClasses,class_names, train_generator, validation_generator, test_generator

In [None]:
from paramsSpace import Int, Real, Cat
from bayesOpt import BayesianOptimizer
import optimizer
from keras import backend as K
from sklearn.gaussian_process import kernels

import tensorflow as tf
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras import layers, models
from keras.datasets import cifar10
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Rescaling, Conv2D, MaxPool2D, Dropout, Dense, Flatten, BatchNormalization
import numpy as np
from sklearn.utils import class_weight
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def plot(model, model_name, labels, X_test, y_test):
    y_pred = model.predict(X_test)
    y_pred = np.argmax(y_pred, axis=1)

    recall = recall_score(y_test, y_pred,average='weighted')
    p = precision_score(y_test, y_pred,average='weighted')
    f1 = f1_score(y_test, y_pred,average='weighted')
    acc = accuracy_score(y_test, y_pred)

    conf_mat = confusion_matrix(y_test, y_pred)
    print(conf_mat)
    conf_df = pd.DataFrame(conf_mat, index=labels, columns=labels)
    plt.figure(figsize=(10, 8))
    plt.title(f"{model_name}_{acc:.3f}")
    sns.heatmap(conf_df, annot=True, fmt="g")
    plt.savefig( f"{model_name}_{acc:.3f}_confusionMatrix.png")
    plt.show()

    print(classification_report(y_test, y_pred))
    report = {
        c : 0 for c in labels
    }
    report.update(classification_report(y_test, y_pred, output_dict=True))
    for idx, _ in enumerate(labels):
        report[_] = report[f"{idx}"]
        del report[f"{idx}"]
    del report["accuracy"]
    df = pd.DataFrame(report).transpose()
    plt.figure(figsize=(10, 8))
    plt.title(f"{model_name}_{acc:.3f}")
    sns.heatmap(df, annot=True)
    plt.savefig(f"{model_name}_{acc:.3f}_classificationReport.png")
    plt.show()

mpath = "/kaggle/input/birds-20-species-image-classification"
imageSize = 64

# Change batch_size and epochs
batchSize = 16
epochs = 30

noOfClasses, class_names, train_generator, validation_generator, test_generator = initiateGenerator(mpath, batchSize=batchSize, imageSize=imageSize)
INPUT_SHAPE = (imageSize, imageSize, 3)
KERNEL_SIZE = (3, 3)

# Calculate class_weight to deal with classes imbalance problem
class_weight = class_weight.compute_class_weight(
                class_weight='balanced',
                classes=np.unique(train_generator.classes), 
                y=train_generator.classes)
class_weight = {x : class_weight[x] for x in range(len(class_weight))}
print("class weight: ", class_weight)

# Setup Base Model

In [None]:
record = []
y_best = 0

for idx in range(100):

    model = Sequential()

    # Change Layer1
    model.add(Conv2D(filters=32, kernel_size=KERNEL_SIZE, input_shape=INPUT_SHAPE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    # Change Layer2
    model.add(Conv2D(filters=32, kernel_size=KERNEL_SIZE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    model.add(MaxPool2D(pool_size=(2, 2)))
    
    # Change Drop1
    model.add(Dropout(0.25))

    # Change Layer3
    model.add(Conv2D(filters=64, kernel_size=KERNEL_SIZE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    # Change Layer4
    model.add(Conv2D(filters=64, kernel_size=KERNEL_SIZE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    model.add(MaxPool2D(pool_size=(2, 2)))
    
    # Change Drop2
    model.add(Dropout(0.25))

    # Change Layer5
    model.add(Conv2D(filters=128, kernel_size=KERNEL_SIZE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    # Change Layer6
    model.add(Conv2D(filters=128, kernel_size=KERNEL_SIZE, activation='relu', padding='same'))
    model.add(BatchNormalization())
    
    model.add(MaxPool2D(pool_size=(2, 2)))
    
    # Change Drop3
    model.add(Dropout(0.25))

    model.add(Flatten())

    # Change Layer7
    model.add(Dense(128, activation='relu'))
    
    # Change Drop4
    model.add(Dropout(0.25))
    
    model.add(Dense(noOfClasses, activation='softmax'))

    # Change optimizer
    model.compile(optimizer='adam',
                loss='categorical_crossentropy',
                metrics=['accuracy']
                )
    
    # Set learning_rate
    K.set_value(model.optimizer.learning_rate, 0.001)

    # Train the model
    model.fit(train_generator, epochs=epochs, 
            validation_data=validation_generator,
            class_weight=class_weight
            )
    test_loss, test_acc = model.evaluate(test_generator)
    record.append(test_acc)
    if test_acc > y_best:
        y_best = test_acc
        
        # Change the "Naive" to the category you choose
        plot(model, "Naive", class_names, test_generator, test_generator.classes)
        
# Change the csv name
pd.DataFrame({"acc":record}).to_csv(f"Naive.csv")

# Blackbox Function

In [None]:
annealer = ReduceLROnPlateau(monitor='val_accuracy', factor=0.5, patience=2, verbose=1, min_lr=1e-5, mode="max")
early = EarlyStopping(monitor="val_accuracy", patience=3, verbose=1, mode="max", baseline=0.2)


def hyperTune(y_best=None, filename="", **kwargs):
    train_generator.batch_size = kwargs["batch_size"]
    validation_generator.batch_size = kwargs["batch_size"]
    test_generator.batch_size = kwargs["batch_size"]

    model = Sequential()
    model.add(Conv2D(filters=kwargs["Layer1_filter"], kernel_size=KERNEL_SIZE, input_shape=INPUT_SHAPE, activation=kwargs["Layer1_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(Conv2D(filters=kwargs["Layer2_filter"], kernel_size=KERNEL_SIZE, activation=kwargs["Layer2_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPool2D(pool_size=(2, 2)))
    model.add(Dropout(kwargs["Drop1"]))
    
    model.add(Conv2D(filters=kwargs["Layer3_filter"], kernel_size=KERNEL_SIZE, activation=kwargs["Layer3_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(Conv2D(filters=kwargs["Layer4_filter"], kernel_size=KERNEL_SIZE, activation=kwargs["Layer4_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPool2D(pool_size=(2, 2)))
    model.add(Dropout(kwargs["Drop2"]))

    model.add(Conv2D(filters=kwargs["Layer5_filter"], kernel_size=KERNEL_SIZE, activation=kwargs["Layer5_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(Conv2D(filters=kwargs["Layer6_filter"], kernel_size=KERNEL_SIZE, activation=kwargs["Layer6_act"], padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPool2D(pool_size=(2, 2)))
    model.add(Dropout(kwargs["Drop3"]))

    model.add(Flatten())
    model.add(Dense(kwargs["Layer7_units"], activation=kwargs["Layer7_act"]))
    model.add(Dropout(kwargs["Drop4"]))
    model.add(Dense(noOfClasses, activation='softmax'))

    # Compile the model
    model.compile(optimizer=kwargs["optimizer"],
                loss='categorical_crossentropy',
                metrics=['accuracy'])

    K.set_value(model.optimizer.learning_rate, kwargs["learning_rate"])
    print(
        "optim : ", kwargs["optimizer"], 
        "lr : ", kwargs["learning_rate"], 
        "bs : ", kwargs["batch_size"]
        )
    print("optim : ", model.optimizer)
    print("lr : ", model.optimizer.learning_rate)
    print("bs : ", train_generator.batch_size)

    model.summary()

    # Train the model
    model.fit(train_generator, epochs=kwargs["epochs"], 
            validation_data=validation_generator,
            class_weight=class_weight,
            callbacks=[annealer, early])
    test_loss, test_acc = model.evaluate(test_generator)
    if y_best is not None and test_acc > y_best:
        plot(model, f"{filename}_Tuned", class_names, test_generator, test_generator.classes)
    return test_acc

# Setup Bayesian Optimizer

In [None]:
optim = BayesianOptimizer(
    kernel=kernels.Matern(nu=2.5),
    params=params,
    n_iter=100,
    n_init=10, 
    n_restarts_optimizer=5,
    epsilon=1e-6,
    normalize_y=True,
    optimizer="de",
    verbose=1
)
optim.set_func(lambda **x:hyperTune(optimizer.best_score(), "byopt de", **x))
optim.set_acqfunc(acqfunction="mix")
optim.optimize(n_samples=30, 
                opt="de",
                n_iter=10,
                )
print(optim.res())
optim.res().to_csv("byopt de tuned.csv")

optim = BayesianOptimizer(
    kernel=kernels.Matern(nu=2.5),
    params=params,
    n_iter=100,
    n_init=10, 
    n_restarts_optimizer=5,
    epsilon=1e-6,
    normalize_y=True,
    optimizer="pso",
    verbose=1
)
optim.set_func(lambda **x:hyperTune(optim.best_score(), "byopt pso", **x))
optim.set_acqfunc(acqfunction="mix")
optim.optimize(n_samples=30, 
                opt="pso",
                n_iter=10,
                )
print(optim.res())
optim.res().to_csv("byopt pso tuned.csv")

optim = BayesianOptimizer(
    kernel=kernels.Matern(nu=2.5),
    params=params,
    n_iter=100,
    n_init=10, 
    n_restarts_optimizer=5,
    epsilon=1e-6,
    normalize_y=True,
    optimizer="hs",
    verbose=1
)
optim.set_func(lambda **x:hyperTune(optim.best_score(), "byopt hs", **x))
optim.set_acqfunc(acqfunction="mix")
optim.optimize(n_samples=30, 
                opt="hs",
                n_iter=10,
                )
print(optim.res())
optim.res().to_csv("byopt hs tuned.csv")


# Setup Optimzier

In [None]:
import optimizer

de_optimizer = optimizer.DifferentialEvol(params=params, popsize=10, n_iter=10, verbose=1, 
#                                           random_state=42
                                        )
de_optimizer.set_func(lambda **x:hyperTune(de_optimizer.best_score, "DE", **x))
res = de_optimizer.optim()
print(res.res())
res.res().to_csv("de tuned.csv")
print(de_optimizer.best_params)
print(de_optimizer.best_score)

pso_optimizer = optimizer.ParticleSwarm(params=params, popsize=10, n_iter=10, verbose=1, 
#                                           random_state=42
                                        )
pso_optimizer.set_func(lambda **x:hyperTune(pso_optimizer.best_score, "PSO", **x))
res = pso_optimizer.optim()
print(res.res())
res.res().to_csv("pso tuned.csv")
print(pso_optimizer.best_params)
print(pso_optimizer.best_score)

hs_optimizer = optimizer.HarmonySearch(params=params, popsize=10, n_iter=10, verbose=1, 
#                                           random_state=42
                                        )
hs_optimizer.set_func(lambda **x:hyperTune(hs_optimizer.best_score, "HS", **x))
res = hs_optimizer.optim()
print(res.res())
res.res().to_csv("hs tuned.csv")
print(hs_optimizer.best_params)
print(hs_optimizer.best_score)