In [29]:
import numpy as np
import scipy
import scipy.linalg
import logging
import plotly.graph_objs as go
import plotly.express as px
import tensorflow as tf

In [30]:
class CMA_ES:
    def __init__(self, x0, sigma, maxfevals = 10000, popsize = None, weights = None):
        N = x0.shape[0]
        self.dimension = N
        self.chiN = N**0.5 * (1 - 1. / (4 * N) + 1. / (21 * N**2))
        self.lam = 4 + int(3 * np.log(N)) if not popsize else popsize
        print(f"Popsize: {self.lam}")
        self.mu = int(self.lam / 2)
        self.shape = tf.cast((self.lam, N), tf.int32)
        
        if weights:
            self.weights = weights
        else:
            self.weights = np.array([np.log(self.lam / 2 + 0.5) - np.log(i + 1) if i < self.mu else 0
                        for i in range(self.lam)])
            self.weights /= np.sum(self.weights)
        if(self.weights.shape == (self.lam,)):
            self.weights = self.weights[:, np.newaxis]
        self.mueff = np.sum(self.weights)**2 / np.sum(self.weights**2)
        
        self.cc = (4 + self.mueff/N) / (N+4 + 2 * self.mueff/N)
        self.cs = (self.mueff + 2) / (N + self.mueff + 5)
        self.c1 = 2 / ((N + 1.3)**2 + self.mueff) 
        # self.cmu = min([1 - self.c1, 2 * (self.mueff - 2 + 1/self.mueff) / ((N + 2)**2 + self.mueff)])
        self.cmu = 2 * (self.mueff - 2 + 1 / self.mueff) / ((N + 2)**2 + 2 * self.mueff / 2)
        # self.damps = 2 * self.mueff/self.lam + 0.3 + self.cs
        self.damps = 1 + 2 * max(0, np.sqrt((self.mueff - 1)/(N + 1)) - 1) + self.cs

        self.xmean = np.array(x0[:])
        self.sigma = sigma
        self.pc = np.zeros(N) 
        self.ps =np.zeros(N) 
        self.lazy_gap_evals = 0.5 * N * self.lam * (self.c1 + self.cmu)**-1 / N**2
        self.maxfevals = maxfevals
        self.C = np.identity(N)
        self.counteval = 0 
        self.fitvals = []   
        self.best = (x0, None)
        self.condition_number = 1
        self.eigen_values = np.ones(N)
        self.eigen_vectors = np.identity(N)
        self.updated_eval = 0
        self.inv_sqrt = np.ones(N)
        self.B = np.eye(self.dimension)
        self.D = np.eye(self.dimension)


        #---------------
        self.acov = np.random.rand() # [0-1]
        self.ccov = float(self.acov) * (2. / (N + np.sqrt(2.))**2 + (1 - self.acov) * min(1., (2.*self.mu - 1.)/(N+2.)**2 + self.mu))

    # def _update_eigensystem(self, current_eval, lazy_gap_evals):
    #     if current_eval <= self.updated_eval + lazy_gap_evals:
    #         return self
    #     self.eigen_values, self.eigen_vectors = np.linalg.eig(self.C)
    #     self.inv_sqrt = self.eigen_vectors @ np.diag(self.eigen_values**-0.5) @ self.eigen_vectors.T
    #     self.condition_number = self.eigen_values.max() / self.eigen_values.min()
         

    #yi = (lam, N) + 1, * (N, N) * (N, N) * (N, 1) = (lam, N) + 1, * (N, 1) = (lam, N)

    def sample(self, old_x):
        z = tf.random.normal(self.shape, dtype=tf.float64) 
        z = np.array(z)
        y = z @ self.B @ self.D # lam, N
        x = old_x + self.sigma * y  # old_x ?
        return x, z
    
    def update(self, x, fitvals, z):
        """Zaktualizuj wartoĹci uzyskanych parametrĂłw"""
        self.counteval += fitvals.shape[0] 
        
        #---------- z avg
        zavg = 1 / self.mu * np.sum(z) #matmul or multiplication?
        print(self.B.shape) # (N, N)
        print(self.D.shape) # (N, N)
        print(x[0].reshape(-1, 1).shape) #(N, 1) 
        print(x.shape) #lam, N
        # x = x + self.sigma * self.B @ self.D * zavg
        for i in range(len(x)):
            precalculate = self.sigma * self.B @ self.D * zavg
            print("precalc", precalculate.shape)
            x[i] = x[i] + self.sigma * self.B@self.D*zavg # 
        #----------

        idx = np.argsort(fitvals)
        x_sorted = x[idx]
        self.fitvals = fitvals[idx] 
        self.best = (x[0], self.fitvals[0])
        taken = x_sorted[:self.mu]
        #--------------------------------

        xdiff = x_sorted - self.xmean
        x_mean = np.sum(xdiff * self.weights, axis=0)
        m = self.xmean + x_mean
        #------------------------------------------------------------------------------------------------------
        # y_mean = x_mean / self.sigma 
        pc = (1 - self.cc) * self.pc + np.sqrt(self.cc * (2 - self.cc) * self.mueff) * self.B @ self.D * zavg
        # pcmatrix = pc[:, np.newaxis]
        ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mueff) * self.B * zavg   
        psnorm = np.linalg.norm(ps)

        # C_m = np.array([e[:, np.newaxis] * e.T for e in (xdiff / self.sigma)])
        # y_s = np.sum(C_m * self.weights[:, np.newaxis], axis=0)

        # C = (1 - self.c1 - self.cmu) * self.C + self.c1 * pcmatrix * pcmatrix.T + self.cmu * y_s
        Z = B @ D * (1/self.mu * np.sum(z*z.T)) * (self.B@self.D).T
        C = (1 - self.ccov) * self.C + self.ccov * (self.acov * self.pc * self.pc.T + (1 - self.acov) @ Z)

        # C = (C + C.T)/2.0 # ????????
        
        #--------------------------------------------------------------------------------------------------------

        # D_inv = np.diag(np.reciprocal(np.diag(self.D)))
        # C_inv_squared = (self.B @ D_inv) @ (self.B.T)
        # C_inv_squared_y = np.squeeze(C_inv_squared @ y_mean[:, np.newaxis])  
        # sigma = self.sigma * np.exp((self.cs / self.damps) * ((np.linalg.norm(ps) / self.chiN) - 1))
        sigma = self.sigma * np.exp((psnorm - self.chiN) / self.damps * self.chiN)

        #-------------------------------------------------------------------------------------------------------
        eigvalues, eigvectors = tf.linalg.ein(C)
        eigenvalues = tf.math.real(eigenvalues)

        B = tf.math.real(eigenvectors)

        D = tf.linalg.diag(tf.sqrt(eigenvalues))

        B = np.array(B)
        D = np.array(D)
        # u, B, _ = tf.linalg.svd(C)
        # u = np.array(u)
        # B = np.array(B)
        # diag_D = np.sqrt(u)
        # D = np.diag(diag_D)

        #--------------------------------------------------------------------------------------------------------

        self.pc = pc
        self.ps = ps
        self.C = C
        self.sigma = sigma
        self.B = B
        self.D = D
        self.xmean = m
        return taken
        
    def terminate(self):
        """ZakoĹcz algorytm"""
        if self.counteval <= 0:
            return False
        if self.counteval >= self.maxfevals:
            return True
        if self.condition_number > 1e13:
            return True
        if self.sigma * np.max(self.eigen_values)**0.5 < 1e-13:
            return True
        return False

In [31]:
class Active_CMA_ES:
    def __init__(self, x0, sigma, maxfevals = 10000, popsize = None, weights = None, domain = None):
        N = x0.shape[0]
        self.dimension = N
        self.chiN = N**0.5 * (1 - 1. / (4 * N) + 1. / (21 * N**2))
        self.lam = 4 + int(3 * np.log(N)) if not popsize else popsize
        print(f"Popsize: {self.lam}")
        self.mu = int(self.lam / 2)
        self.shape = tf.cast((self.lam, N), tf.int32)
        
        if weights:
            self.weights = weights
        else:
            self.weights = np.array([np.log(self.lam / 2 + 0.5) - np.log(i + 1) if i < self.mu 
                          else np.log(i + 1 - self.lam / 2) - np.log(self.lam / 2) 
                          for i in range(self.lam)])
            self.weights /= np.sum(np.abs(self.weights))
        if(self.weights.shape == (self.lam,)):
            self.weights = self.weights[:, np.newaxis]
        if domain:
            self.domain = domain
        self.mueff = np.sum(self.weights)**2 / np.sum(self.weights**2)
        
        self.cc = (4 + self.mueff/N) / (N+4 + 2 * self.mueff/N)
        self.cs = (self.mueff + 2) / (N + self.mueff + 5)
        self.c1 = 2 / ((N + 1.3)**2 + self.mueff) 
        # self.cmu = min([1 - self.c1, 2 * (self.mueff - 2 + 1/self.mueff) / ((N + 2)**2 + self.mueff)])
        self.cmu = 2 * (self.mueff - 2 + 1 / self.mueff) / ((N + 2)**2 + 2 * self.mueff / 2)
        # self.damps = 2 * self.mueff/self.lam + 0.3 + self.cs
        self.damps = 1 + 2 * max(0, np.sqrt((self.mueff - 1)/(N + 1)) - 1) + self.cs

        self.xmean = np.array(x0[:])
        self.sigma = sigma
        self.pc = np.zeros(N) 
        self.ps =np.zeros(N) 
        self.lazy_gap_evals = 0.5 * N * self.lam * (self.c1 + self.cmu)**-1 / N**2
        self.maxfevals = maxfevals
        self.C = np.identity(N)
        self.counteval = 0 
        self.fitvals = []   
        self.best = (x0, None)
        self.condition_number = 1
        self.eigen_values = np.ones(N)
        self.eigen_vectors = np.identity(N)
        self.updated_eval = 0
        self.inv_sqrt = np.ones(N)
        self.B = np.eye(self.dimension)
        self.D = np.eye(self.dimension)

    def _update_eigensystem(self, current_eval, lazy_gap_evals):
        if current_eval <= self.updated_eval + lazy_gap_evals:
            return self
        self.eigen_values, self.eigen_vectors = np.linalg.eig(self.C)
        self.inv_sqrt = self.eigen_vectors @ np.diag(self.eigen_values**-0.5) @ self.eigen_vectors.T
        self.condition_number = self.eigen_values.max() / self.eigen_values.min()
         
    def sample(self):
        z = tf.random.normal(self.shape, dtype=tf.float64)
        z = np.array(z)
        y = z @ (self.B @ self.D)
        x = self.xmean + self.sigma * y
        return x
    
    def update(self, x, fitvals):
        """Zaktualizuj wartoĹci uzyskanych parametrĂłw"""
        self.counteval += fitvals.shape[0] 
        #check if x in domain, if its not add 1e8 to fitval of this x
        if self.domain:
            for i, xi in enumerate(x):
                if np.any(xi < self.domain[0]) or np.any(xi > self.domain[1]):
                    fitvals[i] += 1e8

        
        #------------------------------------------------------------------------------------------------------
        idx = np.argsort(fitvals)
        x_sorted = x[idx]
        self.fitvals = fitvals[idx] 
        self.best = (x[0], self.fitvals[0])

        xdiff = x_sorted - self.xmean
        x_mean = np.sum(xdiff * self.weights, axis=0)
        m = self.xmean + x_mean
        #------------------------------------------------------------------------------------------------------
        y_mean = x_mean / self.sigma 
        pc = (1 - self.cc) * self.pc + np.sqrt(self.cc * (2 - self.cc) * self.mueff) * y_mean
        pcmatrix = pc[:, np.newaxis]

        C_m = np.array([e[:, np.newaxis] * e.T for e in (xdiff / self.sigma)])
        y_s = np.sum(C_m * self.weights[:, np.newaxis], axis=0)

        C = (1 - self.c1 - self.cmu) * self.C + self.c1 * pcmatrix * pcmatrix.T + self.cmu * y_s

        C = (C + C.T)/2.0
        
        #--------------------------------------------------------------------------------------------------------

        D_inv = np.diag(np.reciprocal(np.diag(self.D)))
        C_inv_squared = (self.B @ D_inv) @ (self.B.T)
        C_inv_squared_y = np.squeeze(C_inv_squared @ y_mean[:, np.newaxis])  
        ps = (1 - self.cs) * self.ps + np.sqrt(self.cs * (2 - self.cs) * self.mueff) * C_inv_squared_y  

        sigma = self.sigma * np.exp((self.cs / self.damps) * ((np.linalg.norm(ps) / self.chiN) - 1))

        #--------------------------------------------------------------------------------------------------------
        u, B, _ = tf.linalg.svd(C)
        u = np.array(u)
        B = np.array(B)
        diag_D = np.sqrt(u)
        D = np.diag(diag_D)

        #--------------------------------------------------------------------------------------------------------

        self.pc = pc
        self.ps = ps
        self.C = C
        self.sigma = sigma
        self.B = B
        self.D = D
        self.xmean = m
        
    def terminate(self):
        """ZakoĹcz algorytm"""
        if self.counteval <= 0:
            return False
        if self.counteval >= self.maxfevals:
            return True
        if self.condition_number > 1e13:
            return True
        if self.sigma * np.max(self.eigen_values)**0.5 < 1e-13:
            return True
        return False

In [32]:
def optimize(func, x0, sigma, maxfevals = 1000, popsize = None, weights = None, domain = None):
    cma_es = Active_CMA_ES(x0, sigma, maxfevals, popsize, weights, domain)
    res = []
    cntr = 0
    while not cma_es.terminate():
        cntr+=1
        x = cma_es.sample()
        f_eval = func(x)
        cma_es.update(x, f_eval)
        res.append(cma_es.best)
        if cntr % 100 == 0:
            print(f"Iteration {cntr:5d}: {res[-1][1]}")
    return res

def optimize_and_plot(f, sigma = 1, d = 10, popsize = None, maxfevals = 1000, domain = None):
    x0 = np.repeat(100.0, d)
    res = optimize(f, x0, sigma, popsize = popsize, maxfevals = maxfevals, domain = domain)
    print(f"Best: {res[-1][0]}, value: {res[-1][1]}")
    y = np.array([nd for st, nd in res])
    fig = px.line(x = np.arange(y.shape[0]) + 1, y = y)
    fig.show()

In [33]:
def plot_3d_function(f, a = 10, k = 100):
    x = np.linspace(-a, a, k)
    y = x.copy()
    xy = np.transpose([np.tile(x, len(y)), np.repeat(y, len(x))])
    z = f(xy)
    fig = go.Figure(data=[go.Surface(x = x,  y= y, z=z.reshape((x.shape[0], -1)))])
    fig.update_layout(title = f.__name__, margin=dict(l=65, r=50, b=65, t=90))
    fig.show()

In [34]:
def schwefel_function(X):
    return 418.9829 * X.shape[1] - np.sum(X * np.sin(np.sqrt(np.abs(X))), axis=1)

In [51]:
optimize_and_plot(schwefel_function, d = 10, domain = (-500, 500), popsize=100, maxfevals=10000, sigma=5)

Popsize: 100
Iteration   100: -4.561030212200252e+43
Best: [-5.64415151e+42 -1.80698424e+43  2.15710317e+43  5.06663426e+42
 -1.01607591e+43  6.19916495e+42 -6.29935573e+42  1.68707253e+41
  1.82813411e+42 -3.53901826e+41], value: -4.561030212200252e+43



invalid value encountered in sqrt

