In [1]:
from library.optimiser import *
from library.post_analysis import *
from library.experiments import *
%matplotlib inline
SEED = 23191

In [2]:
class objective_func(ABC):
    @abstractmethod
    def func(self, x):
        pass
    @abstractmethod
    def dfunc(self, x):
        pass
    @abstractmethod
    def get_optimal(self):
        pass
    @abstractmethod
    def get_optimum(self):
        pass
    def visualise2d(self, lim, n):
        x, y = np.linspace(-lim, lim, n), np.linspace(-lim, lim, n)
        xx, yy = np.meshgrid(x, y)
        zz = np.zeros(xx.shape)
        for j in range(n):
            for i in range(n):
                zz[j, i] = self.func((x[i], y[j]))
        fig = plt.figure(figsize=(4,4))
        ax = fig.add_subplot(111)
        sc = ax.scatter(x=xx.ravel(), y=yy.ravel(), c=zz.ravel())
        ax.scatter(x=[self.optimal[0]], y=[self.optimal[1]], c='red', marker='x')
        plt.colorbar(sc)
        fig.show()
        return ax
    def visualise3d(self, lim, n):
        x, y = np.linspace(-lim, lim, n), np.linspace(-lim, lim, n)
        z = []
        for i in y:
            z_line = []
            for j in x:
                z_line.append(self.func([j,i]))
            z.append(z_line)
        fig = go.Figure(data=[go.Surface(z=z, x=x, y=y),  \
                              go.Scatter3d(x=[self.optimal[0]], y=[self.optimal[1]], z=[self.optimum])])
        fig.update_layout(autosize=False,
                          scene_camera_eye=dict(x=1.87, y=0.88, z=-0.64),
                          width=500, height=500,
                          margin=dict(l=65, r=50, b=65, t=90))
        fig.show()
    def visualise_gradient(self, lim, n):
        x, y = np.linspace(-lim, lim, n), np.linspace(-lim, lim, n)
        xx, yy = np.meshgrid(x, y)
        zz = np.zeros((n, n, 2))
        for j in range(len(y)):
            for i in range(len(x)):
                zz[j, i, :] = self.dfunc([x[i], y[j]])
        fig = plt.figure(figsize=(8,8))
        ax = fig.add_subplot(111)
        ax.quiver(xx,yy,zz[:,:,0],zz[:,:,1])
        ax.scatter(x=[self.optimal[0]], y=[self.optimal[1]], c='red', marker='x')
        fig.show()
        return ax
    def visualise2d_section(self, pos, dire):
        fig = plt.figure(figsize=(4,4))
        xs = np.linspace(-self.lim, self.lim, 301)
        fs = []
        if dire == 'x':
            for x in xs:
                fs.append(self.func([x, pos]))
        else:
            for x in xs:
                fs.append(self.func([pos, x]))
        plt.plot(xs, fs)
        fig.show()
    def visualise2d_section_gradient(self, pos, dire):
        fig = plt.figure(figsize=(4,4))
        xs = np.linspace(-self.lim, self.lim, 300)
        dfs = []
        if dire == 'x':
            for x in xs:
                dfs.append(self.dfunc([x, pos]))
        else:
            for x in xs:
                dfs.append(self.dfunc([pos, x]))
        dfs = np.array(dfs)
        plt.plot(xs, dfs[:,0])
        plt.plot(xs, dfs[:,1])
        fig.show()
        
class ackley(objective_func):
    def __init__(self, dim=2):
        self.optimal = np.array([0, 0])
        self.optimum = 0
        self.lim = 25
        self.dim = dim
    def func(self, x):
        '''
        the period of local minimum along each axis is 1, integer coordinate (1,1), (2,3)... 
        x and y is interchangeable
        global minimum is 0 with arguments x=y=0
        local minimums far away from orgin are 20
        supremum is 20 + e - 1/e = 22.35
        symmetric along x=0, y=0, y=x lines
        disappearing global gradient when far away from optimal
        '''
        arg1 = -0.2 * np.sqrt(np.power(x, 2).mean())
        arg2 = np.cos(2*np.pi*x).mean()
        return -20. * np.exp(arg1) - np.exp(arg2) + 20. + np.e
    def dfunc(self, x):
        if np.linalg.norm(x) == 0:
            return x
        arg1 = -0.2 * np.sqrt(np.power(x, 2).mean())
        arg2 = np.cos(2*np.pi*x).mean()
        g = lambda xx: -0.8 * xx / arg1 * np.exp(arg1) / self.dim + 2 * np.pi * np.sin(2 * np.pi * xx) * np.exp(arg2) / self.dim
        return g(x)
    def get_optimal(self):
        return self.optimal
    def get_optimum(self):
        return self.optimum
a = ackley()
print("check func value: expected:", a.get_optimum(), ", actual:", a.func(a.get_optimal()))
print("check gradient: expected:[0,0], actual:",a.dfunc(a.get_optimal()))

check func value: expected: 0 , actual: 4.440892098500626e-16
check gradient: expected:[0,0], actual: [0 0]


In [3]:
a.func(np.ones(12,))

3.6253849384403627

In [7]:
ad = adam()
optmizerParas = {'x0': np.array([12.34232, 34.3412, 34.3, 23.3434]),
         'alpha': 0.01,
         'beta_1': 0.9, 
         'beta_2': 0.999, 
         'epsilon': 1e-8, 
         'max_iter': 1000,
         'tol': 1e-5,              
         'verbose': True,
         'record': False}
ad.set_parameters(optmizerParas)
ad.optimise(a)



*******starting optimisation from intitial point:  [12.34232 34.3412  34.3     23.3434 ]


(array([11.99985268, 33.99972505, 33.99985151, 22.99979022]),
 19.915226349433443,
 {'status': None, 'evals': 163})

In [8]:
ls = line_search()
optmizerParas = {'x0': np.array([12.34232, 34.3412, 34.3, 23.3434]),
         'alpha': 1,
         'beta': 0.1, 
         'max_iter': 1000,
         'tol': 1e-5,              
         'verbose': True,
         'record': False}
ls.set_parameters(optmizerParas)
ls.optimise(a)


*******starting optimisation from intitial point:  [12.34232 34.3412  34.3     23.3434 ]


(array([11.99993067, 33.99980343, 33.99980345, 22.99986705]),
 19.91522607560959,
 {'status': None, 'evals': 27})

In [None]:
import numpy as np
import ABC

class optimizer(ABC):
    @abstractmethod
    def set_parameters(self, para):
        '''
        input: parameters, in dictionary
        '''
        pass
    @abstractmethod
    def optimise(self, objective_cls):
        '''
        input: objective function class
        output: empirical found optimal, optimum, and statistics of procedure information
        '''
        pass
    
class adjust_optimizer(optimizer):
    def adjust(self, x0, obj):
        self.x0 = x0
        arg, val, stats = self.optimise(obj)
        return arg, val, stats['evals']
    

In [None]:
class cma_es(adjust_optimizer):
    def __init__(self, dim=2):
        self.dim = dim
        paras = {'x0': np.zeros((dim,)),
                 'std': np.ones((dim,)) * 3, 
                 'tol': 1e-5, 
                 'adjust_func': do_nothing(), 
                 'record': False, 
                 'verbose': False}
        self.set_parameters(paras)
    def set_parameters(self, paras):
        self.paras = paras
        self.x0 = paras['x0'] 
        self.std = paras['std']
        self.tol = paras['tol']
        self.adjust_func = paras['adjust_func']
        self.max_iter = 400 if 'max_iter' not in paras.keys() else paras['max_iter']
        # set none to use default value 
        self.cluster_size = None if 'cluster_size' not in paras.keys() else paras['cluster_size']
        self.survival_size = None if 'survival_size' not in paras.keys() else paras['survival_size']
        self.record = True if 'record' not in paras.keys() else paras['record']
        self.verbose = True if 'verbose' not in paras.keys() else paras['verbose']
    def optimise(self, obj):
        '''
        @param obj: objective function class instance
        return arg: found minimum arguments
               val: found minimum value
               stats: collection of recorded statistics for post-analysis
        '''                  
        def update_mean(x):
            return (weights @ x).reshape(dim, 1)
        def update_ps(ps, sigma, C, mean, mean_old):
            return (1 - cs) * ps + np.sqrt(cs * (2 - cs) * mueff) * invsqrtC @ (mean - mean_old) / sigma 
        def update_pc(pc, sigma, ps, mean, mean_old):
            hsig = np.abs(ps) / np.sqrt(1 - (1 - cs)**(2 * iter_/lambda_)) / chiN < 1.4 + 2/(dim + 1)
            return (1 - cc) * pc + hsig * np.sqrt(cc * (2 - cc) * mueff) * (mean - mean_old) / sigma
        def update_C(C, pc, x, mean_old, sigma):
            hsig = np.abs(ps) / np.sqrt(1 - (1 - cs)**(2 * iter_/lambda_)) / chiN < 1.4 + 2/(dim + 1)
            artmp = (1 / sigma) * (x - mean_old.reshape(1, dim))
            return (1 - c1 - cmu) * C + c1 * (pc * pc.T + (1 - hsig) * cc * (2 - cc) * C) + cmu * artmp.T @ np.diag(weights) @ artmp
        def update_sigma(sigma, ps):
            return sigma * np.exp((cs / damps) * (np.linalg.norm(ps)/ chiN - 1))
        def is_not_moving(arg, val, pre_arg, pre_val, tol):
            dis_arg = np.linalg.norm(arg - pre_arg)
            dis_val = np.linalg.norm(val - pre_val)
            return (dis_arg < tol and dis_val < tol*1e5) or (dis_val < tol and dis_arg < tol*1e5) 

        if self.verbose:
            print("\n\n*******starting optimisation from intitial mean: ", self.x0.ravel())
        # User defined input parameters 
        dim = 2    
        sigma = 0.3
        D = self.std / sigma
        mean = self.x0

        # the size of solutions group
        lambda_ = 4 + int(3 * np.log(dim)) if self.cluster_size == None else self.cluster_size  
        # only best "mu" solutions are used to generate iterations
        mu = int(lambda_ / 2) if self.survival_size == None else self.survival_size
        # used to combine best "mu" solutions                                               
        weights = np.log(mu + 1/2) - np.log(np.arange(mu) + 1) 
        weights = weights / np.sum(weights)     
        mueff = np.sum(weights)**2 / np.sum(weights**2) 

        # Strategy parameter setting: Adaptation
        # time constant for cumulation for C
        cc = (4 + mueff / dim) / (dim + 4 + 2 * mueff / dim)  
        # t-const for cumulation for sigma control
        cs = (mueff + 2) / (dim + mueff + 5)  
        # learning rate for rank-one update of C
        c1 = 2 / ((dim + 1.3)**2 + mueff)    
        # and for rank-mu update
        cmu = min(1 - c1, 2 * (mueff - 2 + 1 / mueff) / ((dim + 2)**2 + mueff))  
        # damping for sigma, usually close to 1  
        damps = 1 + 2 * max(0, np.sqrt((mueff - 1)/( dim + 1)) - 1) + cs                                                                 

        # Initialize dynamic (internal) strategy parameters and constants
        # evolution paths for C and sigma
        pc = np.zeros((dim, 1))     
        ps = np.zeros((dim, 1)) 
        # B defines the coordinate system
        B = np.eye(dim)       
        # covariance matrix C
        C = B * np.diag(D**2) * B.T 
        # C^-1/2 
        invsqrtC = B * np.diag(D**-1) * B.T   
        # expectation of ||N(0,I)|| == norm(randn(N,1)) 
        chiN = dim**0.5 * (1 - 1/(4 * dim) + 1 / (21 * dim**2))  

        # --------------------  Initialization --------------------------------  
        x, x_old, f = np.zeros((lambda_, dim)), np.zeros((lambda_, dim)), np.zeros((lambda_,))
        stats = {}
        stats['val'], stats['arg'] = [], []
        stats['x_adjust'] = []
        iter_eval, stats['evals_per_iter'] = np.zeros((lambda_, )), []
        stats['mean'], stats['std'] = [], []
        stats['status'] = None
        iter_, eval_ = 0, 0

        # initial data in record
        for i in range(lambda_):
            x[i] = (mean + np.random.randn(dim, 1)).ravel()
            f[i] = obj.func(x[i])
        idx = np.argsort(f)
        x_ascending = x[idx]
        if self.record:
            stats['arg'].append(x_ascending)
            stats['val'].append(f[idx])
            stats['mean'].append(mean)
            stats['std'].append(sigma * B @ np.diag(D))
            stats['evals_per_iter'].append(np.ones((lambda_,)))
            stats['x_adjust'].append(np.vstack((x.T.copy(), x.T.copy())))
        arg = x_ascending
        val = f[idx]
        pre_arg = x_ascending
        pre_val = f[idx]
        
        # optimise by iterations
        try:
            while iter_ < self.max_iter:
                iter_ += 1
                
                # generate candidate solutions with some stochastic elements
                for i in range(lambda_):
                    x[i] = (mean + sigma * B @ np.diag(D) @ np.random.randn(dim, 1)).ravel() 
                    x_old[i] = x[i]
                    x[i], f[i], eval_cnt = self.adjust_func.adjust(x[i], obj)
                    eval_ += eval_cnt
                    iter_eval[i] = eval_cnt
                # sort the value and positions of solutions 
                idx = np.argsort(f)
                x_ascending = x[idx]

                # update the parameter for next iteration
                mean_old = mean
                mean = update_mean(x_ascending[:mu])
                ps =   update_ps(ps, sigma, C, mean, mean_old)
                pc =   update_pc(pc, sigma, ps, mean, mean_old)
                sigma = update_sigma(sigma, ps)
                C =    update_C(C, pc, x_ascending[:mu], mean_old, sigma)
                C = np.triu(C) + np.triu(C, 1).T
                D, B = np.linalg.eig(C)
                D = np.sqrt(D)
                invsqrtC = B @ np.diag(D**-1) @ B

                # record data during process for post analysis
                if self.record:
                    stats['arg'].append(x_ascending)
                    stats['val'].append(f[idx])
                    stats['mean'].append(mean)
                    stats['std'].append(sigma * B @ np.diag(D))
                    stats['evals_per_iter'].append(iter_eval.copy())
                    stats['x_adjust'].append(np.vstack((x_old.T.copy(), x.T.copy())))
                # stopping condition    
                arg = x_ascending
                val = f[idx]
                
                # check the stop condition
                if np.max(D) > (np.min(D) * 1e6):
                    stats['status'] = 'diverge'
                    print('diverge, concentrate in low dimension manifold')
                    break
                if is_not_moving(arg, val, pre_arg, pre_val, self.tol) :
                    break
                pre_arg = arg
                pre_val = val
        except np.linalg.LinAlgError as err:
            stats['status'] = 'diverge'
            print('diverge, raise LinAlgError!')
        finally:
            if self.verbose:
                print('eigenvalue of variance = {}'.format(D))
                print('total iterations = {}, total evaluatios = {}'.format(iter_, eval_))
                print('found minimum position = {}, found minimum = {}'.format(arg[0], val[0]))

        # carry statistics info before quit
        if self.record:
            stats['arg'] = np.array(stats['arg'])
            stats['val'] = np.array(stats['val'])
            stats['mean'] = np.array(stats['mean'])
            stats['std'] = np.array(stats['std'])
            stats['evals_per_iter'] = np.array(stats['evals_per_iter'])
            stats['x_adjust'] = np.array(stats['x_adjust'])
        stats['evals'] = eval_
        return arg[0], val[0], stats
 