In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def cross_entropy_optimization(opt_type, pop_size, elite_size, stopping_criteria, n_max_iter, objective_function, search_space):
    
    # ========================== start -> For ploting purposes only
    populations, elites = [], []
    # ========================== end
    
    cache = {}
    i = 0
    stopping_criteria_reached = False
    while not stopping_criteria_reached:
        
        if i > 0:
            elite = sample_elite(opt_type, population, elite_size, cache)
            search_params = update_search_space(elite, search_space) # To be implemented: sampling distribution/gaussian mixture fitting
            
        else:
            best_y = np.inf if opt_type == 'min' else -np.inf
            best_iteration_y = best_y
            search_params = None
        
        population = generate_population(search_params, search_space, pop_size)
        for decision in population:
            decision_key = tuple(decision)
            if decision_key not in cache.keys():
                cache[decision_key] = objective_function(decision)
        
        best_iteration_y = min(cache.values()) if opt_type == 'min' else max(cache.values())
            
        if ((opt_type == 'min') and (best_iteration_y < best_y)) or ((opt_type == 'max') and (best_iteration_y > best_y)):
            best_x = decodes_key(cache, opt_type)
            best_y = best_iteration_y
            best_i = i
            
            print(best_i, best_x, best_y)

        # ========================== start -> For ploting purposes only
        populations.append(population)
        if i > 0:
            elites.append(elite)
        else:
            elites.append([])
        # ========================== end
            
        stopping_criteria_reached = stopping_criteria_update(stopping_criteria, n_max_iter, i, best_i)
        i = i + 1
        
    return best_x, best_y, populations, elites # 'populations' and 'elites' for printing purposes only

In [None]:
def decodes_key(cache, opt_type): # Considers that the cache key is a tuple
    
    best_x = min(cache, key = cache.get) if opt_type == 'min' else max(cache, key = cache.get)
    
    return best_x

In [None]:
def stopping_criteria_update(stopping_criteria, n_max_iter, i, best_i):
    
    if (stopping_criteria == 'iters_without_improvement') and (i - best_i > n_max_iter):
        stopping_criteria_reached = True
    elif (stopping_criteria == 'n_max_iters') and (i > n_max_iter):
        stopping_criteria_reached = True
    else:
        stopping_criteria_reached = False
    
    return stopping_criteria_reached

In [None]:
def sample_elite(opt_type, population, elite_size, cache):
    
    elite_size = int(elite_size * len(population)) if elite_size < 1 else elite_size
    
    population_args = list(cache.keys())
    population_values = list(cache.values())
    population_idxs = range(0, len(population_values))
        
    if opt_type == 'min':
        sorted_population_values, sorted_population_idxs = zip(*sorted(zip(population_values, population_idxs)))
    else:
        sorted_population_values, sorted_population_idxs = zip(*reversed(sorted(zip(population_values, population_idxs))))
    
    elite_idxs = sorted_population_idxs[:elite_size]
    elite = [population_args[idx] for idx in elite_idxs]
    
    return elite

In [None]:
def update_search_space(elite, search_space):
    
    search_params = []
    
    for variable_idx, variable_data in enumerate(search_space):
        variable_type = variable_data[1]
        elite_values = np.asarray(elite)[:, variable_idx]
        if variable_type == bool:
            positive_proba = np.sum(elite_values) / len(np.asarray(elite)[:, variable_idx])
            search_params.append([positive_proba])
            
        else: # float or int
            search_params.append([np.mean(elite_values), np.std(elite_values)])
    
    return search_params

In [None]:
def generate_population(search_params, search_space, pop_size):
    
    initialization = True if search_params is None else False
    
    transposed_population = []
    
    for variable_idx, variable_data in enumerate(search_space):
        
        variable_type = variable_data[1]
        variable_min = variable_data[0][0]
        variable_max = variable_data[0][1]
        
        if variable_type == bool:
            
            if initialization:
#                 np.random.seed(variable_idx)
                variable = np.random.choice([variable_min, variable_max], pop_size)
            else:
                positive_proba = search_params[variable_idx][0]
#                 np.random.seed(int(positive_proba * variable_idx * 100))
                variable = np.random.choice([variable_min, variable_max], pop_size, p = [1 - positive_proba, positive_proba])
            
        else:
            
            if initialization:
                
#                 np.random.seed(variable_idx)
                if variable_type == int:
                    variable = np.random.randint(variable_min, variable_max + 1, pop_size)
                else:
                    variable = np.random.uniform(variable_min, variable_max, pop_size)
                
            else:
                mean = search_params[variable_idx][0]
                std = search_params[variable_idx][1]
#                 np.random.seed(int(std * variable_idx))
                variable = np.random.normal(mean, std, pop_size)
                if variable_type == int:
                    variable = np.round(variable)
                variable = np.clip(variable, variable_min, variable_max)
            
        transposed_population.append(variable)
    
    population = np.array(transposed_population).T
    
    return population

In [None]:
def sphere(decision):
    return sum(np.square(decision))

In [None]:
import math

def griewank(decision):

    griewank = 0
    part_1 = 0
    part_2 = 1
    for idx, xi in enumerate(decision):
        part_1 = part_1 + (xi**2)/4000
        part_2 = part_2 * math.cos(xi/(math.sqrt(idx + 1)))

    griewank = 1 + part_1 - part_2
        
    return griewank

In [None]:
%%time

opt_type = 'min'
pop_size = 100
elite_size = .1
stopping_criteria = 'n_max_iters'
n_max_iter = 10
objective_function = griewank
search_space = [((-100, 100), float), ((-100, 100), float)]

best_x, best_y, populations, elites = cross_entropy_optimization(opt_type, pop_size, elite_size, stopping_criteria, n_max_iter, objective_function, search_space)

# Plotting

In [None]:
n_points = 1000
additional = 1

low = np.inf
high = -low

for item in search_space:
    if np.min(item[0]) < low:
        low = np.min(item[0])
    elif np.max(item[0]) > high:
        high = np.max(item[0])

x1 = np.linspace(low - additional, high + additional, n_points)
x2 = np.linspace(low - additional, high + additional, n_points)

X1, X2 = np.meshgrid(x1, x2)

In [None]:
y = []
for x1_element in list(x1):
    for x2_element in list(x2):
        y.append(objective_function([x1_element, x2_element]))
        
y = np.array(y)

In [None]:
X1, X2 = np.meshgrid(x1, x2)
# print(X1.shape, X2.shape)
Y = y.reshape(X1.shape)

In [None]:
fig = plt.figure()
ax1 = plt.contourf(X1, X2, Y,
                  cmap ='summer',
                  alpha = .7)

plt.colorbar(ax1)

# plt.scatter(best_x[0], best_x[1], c='0')
#plt.scatter(trials_x1, trials_x2, c='0')

plt.show()

In [None]:
for idx, population in enumerate(populations[:-1]):
    
    fig = plt.figure()
    ax1 = plt.contourf(X1, X2, Y,
                      cmap ='summer',
                      alpha = .7)

    plt.colorbar(ax1)

    non_elite = []
    for item in population:
        if tuple(item) not in elites[idx + 1]:
            non_elite.append(item)
    
    non_elite_x1 = np.array(non_elite)[:, 0]
    non_elite_x2 = np.array(non_elite)[:, 1]
    
    elite_x1 = np.array(elites[idx + 1])[:, 0]    
    elite_x2 = np.array(elites[idx + 1])[:, 1]
        
    plt.scatter(non_elite_x1, non_elite_x2, c='0', s=5)
    plt.scatter(elite_x1, elite_x2, c='r', s=40, marker='x')
    
    plt.xlim([low - additional, high + additional])
    plt.ylim([low - additional, high + additional])
    
# #     plt.plot(generate_plot_list(trials_y))
# #     plt.plot(generate_plot_list(acc_trials_y))
    
    plt.show()

# GIF

In [None]:
import imageio
plt.rcParams['figure.dpi'] = 300

In [None]:
%%time

filenames = [] #######

for idx, population in enumerate(populations[:-1]):
    
    fig = plt.figure()
    ax1 = plt.contourf(X1, X2, Y,
                      cmap ='summer',
                      alpha = .7)

    plt.colorbar(ax1)

    non_elite = []
    for item in population:
        if tuple(item) not in elites[idx + 1]:
            non_elite.append(item)
    
    non_elite_x1 = np.array(non_elite)[:, 0]
    non_elite_x2 = np.array(non_elite)[:, 1]
    
    elite_x1 = np.array(elites[idx + 1])[:, 0]    
    elite_x2 = np.array(elites[idx + 1])[:, 1]
        
    plt.scatter(non_elite_x1, non_elite_x2, c='0', s=5)
    plt.scatter(elite_x1, elite_x2, c='r', s=40, marker='x')
    
    plt.xlim([low - additional, high + additional])
    plt.ylim([low - additional, high + additional])
    
# #     plt.plot(generate_plot_list(trials_y))
# #     plt.plot(generate_plot_list(acc_trials_y))
    
#     plt.show()
    
######################################################################

    # create file name and append it to a list
    filename = f'{idx}.png'
    for i in range(7):
        filenames.append(filename)
    filenames.append(filename)
    
    # save frame
    plt.savefig(filename)
    plt.close()
    
# build gif
with imageio.get_writer('cross_entropy_optimization.gif', mode='I') as writer:
    for filename in filenames:
        image = imageio.imread(filename)
        writer.append_data(image)