
# Tabu Search of the Eggholder Function



## Importing Libraries


In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import LinearLocator
from matplotlib import cm
from tqdm import tqdm
import matplotlib.gridspec as gridspec
from sys import path_importer_cache
import os
import eggholder as egg
import init_archive as inar
import random
import pickle


## Tabu Search Functions

In [None]:
def in_array(array, list_of_arrays):
    '''Check if an array (x) is in a list of arrays.'''
    return any(np.array_equal(array, a) for a in list_of_arrays)

def poss_steps(d, step):
    '''returns a vector of possible steps to make.'''
    poss_steps = []
    for i in range(d):
        new = np.zeros(d)
        new[i] = step
        poss_steps.append(new.copy())
        new[i] = -step
        poss_steps.append(new.copy())
    return poss_steps

def search(base, step, STM, x_best, fx_best, constrained, f_evals_count, w_val):
    '''evaluates a step in the given step direction.'''
    try:
        x_new = base + step
        if not in_array(x_new, STM):
            if constrained:
                fx_new = egg.eggholder(x_new)
            else:
                fx_new = egg.eggholder_unconstrained(x_new, w_val)
            f_evals_count += 1
            if fx_best == None:
                x_best = x_new.copy()
                fx_best = fx_new.copy() 
            if fx_new < fx_best:
                x_best = x_new.copy()
                fx_best = fx_new.copy()
    except AssertionError:
        print("not in bounds.")
        pass
    
    return x_best, fx_best, f_evals_count

def pattern(x_best, fx_best, base, constrained, w_val, f_evals_count):
    '''perform a pattern move in the specified direction.'''
    try:
        pattern_move = 2*x_best[1] - base
        if constrained:
            f_pattern = egg.eggholder(pattern_move)
        else:
            f_pattern = egg.eggholder_unconstrained(pattern_move, w_val)
        f_evals_count += 1
        if f_pattern < fx_best:
            fx_best  = f_pattern
            x_best = pattern_move
    except AssertionError:
        print("not in bounds.")
        pass

    return x_best, fx_best, f_evals_count

def local_search_basic(base, f_base, d, STM, steps, f_evals_count, constrained, w_val):
    '''Perform a local search step for the tabu search function.
    inputs
        base:           d-length vector     current point x
        f_base:         float               function evaluated at x
        d:              int                 dimension of x
        STM:            {x:fx}              dictionary of recently explored locations
        steps:          list                a list of +/- delta for each dimension of x.
        f_evals_count:  int                 count of total function evaluations
        constrained:    bool                controls if we use penalty function or not
        w_val:          int                 penalty value

    returns best step x0, fx0 from the local search 
    and the updated count of total cost function evaluations.
    '''

    x_best = None
    fx_best = None

    while len(steps) > 0:
        step = steps.pop()
        x_best, fx_best, f_evals_count = search(base, step, STM, x_best, fx_best, 
                                                            constrained, f_evals_count, w_val)    
    if fx_best != None:
        if fx_best < f_base:
            x_best, fx_best, f_evals_count = pattern(x_best, fx_best, base, constrained, w_val, f_evals_count)

    if fx_best == None:
        x_best = base
        fx_best = f_base
    return x_best, fx_best, f_evals_count

def random_subset(base, f_base, d, P, STM, steps, f_evals_count, constrained, w_val):
    '''Perform a local search  using the random subset method for the tabu search function.
    inputs
        base:           d-length vector     current point x
        f_base:         float               function evaluated at x
        d:              int                 dimension of x
        P:              int                 the max number of possible non-tabu moves to evaluate
        STM:            {x:fx}              dictionary of recently explored locations
        steps:          list                a list of +/- delta for each dimension of x.
        f_evals_count   int                 count of total function evaluations
        constrained:    bool                controls if we use penalty function or not
        w_val:          int                 penalty value

    returns best step x0, fx0 from the local search 
    and the updated count of total cost function evaluations.
    '''
    x_best = None
    fx_best = None

    for i in range(P):
        step = steps.pop()
        x_best, fx_best, f_evals_count = search(base, step, STM, x_best, fx_best, 
                                                            constrained, f_evals_count, w_val)    
    if fx_best != None:
        if fx_best < f_base:
            x_best, fx_best, f_evals_count = pattern(x_best, fx_best, base, constrained, w_val, f_evals_count)
    if fx_best == None:
        x_best = base
        fx_best = f_base
    return x_best, fx_best, f_evals_count


def successive_rs(base, f_base, d, P, STM, steps, f_evals_count, constrained, w_val):
    '''Perform a local search  using the successive random subset method for the tabu search function.
    inputs
        base:           d-length vector     current point x
        f_base:         float               function evaluated at x
        d:              int                 dimension of x
        P:              int                 the max number of possible non-tabu moves to evaluate
        STM:            {x:fx}              dictionary of recently explored locations
        steps:          list                a list of +/- delta for each dimension of x.
        f_evals_count   int                 count of total function evaluations
        constrained:    bool                controls if we use penalty function or not
        w_val:          int                 penalty value

    returns best step x0, fx0 from the local search 
    and the updated count of total cost function evaluations.
    '''
    x_best = None
    fx_best =  None
    improved = False

    while not improved and len(steps) > 0:
        if len(steps) > P:
            subset = [steps.pop() for _ in range(P)]
        else:
            subset = steps.copy()
            steps = []
        
        for step in subset:
            x_best, fx_best, f_evals_count = search(base, step, STM, x_best, fx_best, 
                                                            constrained, f_evals_count, w_val)
            if fx_best != None:  
                if fx_best < f_base:
                    improved = True    

    if improved:
        x_best, fx_best, f_evals_count = pattern(x_best, fx_best, base, constrained, w_val, f_evals_count)
    if fx_best == None:
        x_best = base
        fx_best = f_base
    return x_best, fx_best, f_evals_count

def sensitivity(base, d, delta, f_evals_count, w_val):
    '''Find the d/2 most sensitive directions.'''
    s = np.zeros(d)
    for i in range(d):
        f_plus = egg.eggholder_unconstrained(base+delta, w_val)
        f_minus = egg.eggholder_unconstrained(base-delta, w_val)
        s[i] = np.abs((f_plus - f_minus)/(2*delta))
        f_evals_count += 2
    j = d//2
    most_sensitive_directions = sorted(range(len(s)), key=lambda i: s[i])[-j:]
    return most_sensitive_directions, f_evals_count




### MTM and STM Functions

In [None]:

def update_MTM(x0, fx0, M, MTM):
    '''Updates the Medium-Term Memory (MTM) if Necessary.
    inputs
        MTM:        {fx:x}              current set of points in MTM
        x0:         d-length vector     new trial point
        fx0:        float               evaluation at new point
        M:          int                 max size of MTM
    
    returns updated MTM.
    '''
    if len(MTM) < M:
        MTM[fx0] = x0
    else:
        MTM_list = [*MTM]
        worst = min(MTM_list)
        if fx0 < worst:
            del MTM[worst]
            MTM[fx0] = x0
    
    return MTM

def update_counter(x0, fx0, best_sln, counter):
    '''update the counter and best solution.'''
    if fx0 < best_sln[0]:
        best_sln = [fx0,x0]
        counter = 0
    else:
        counter += 1
    
    return counter, best_sln

def update_STM(x0, N, STM):
    '''Update the Short-Term Memory (STM).'''
    STM.append(x0)
    while len(STM) > N:
        STM.pop()
    return STM


## Long-Term Memory (LTM) Functions


In [None]:
def index_to_increase(sector, d, split):
    '''Used in the initialisation of the LTM. Essentially for counting up in base-n, where
    n=split, as this is how all the sectors are divided.
    
    inputs
        sector:     d-length list       each element is in range(0,split). This is the most
                                        recently appended sector to the LTM initialisation.
        d:          int                 dimension of x
        split:      int                 number of sections that each variable is split into
    
    returns the index that should be increased for the next sector to append to the LTM. 
    If all sectors have been appended (all values in sector = (split-1)), then return -1.
    '''
    for i in range(d):
        if sector[i] != split-1:
            return i
    return -1

def init_LTM(d, split, sector, LTM):
    '''Creates an LTM with a key for each sector according to the specified dimension 
    and split.

    inputs
        d:          int             dimension of x
        split:      int             number of sections that each variable is split into.
        sector:     d-length list   The most recently appended sector
        LTM         {sector: count} All counts initialised to 0. Once filled with all
                                    sectors this is returned.

    returns the initialised LTM with a key for every sector needed.
    '''
    i = index_to_increase(sector,d,split)
    if i == -1:
        return(LTM)
    elif i > 0:
        for j in range(i):
            sector[j] = 0
    sector[i] += 1
    LTM[tuple(sector)] = 0
    return init_LTM(d, split, sector, LTM)

def update_LTM(x0, d, LTM, split):
    '''Find the sector that the new point is in and increment the count
    for that sector. returns the updated LTM.'''
    location = [0]*d
    sector_width = 1024/split
    for i in range(d):
        for j in range(split):
            if x0[i] >= -512 + j*sector_width and x0[i] < (-512 + (j+1)*sector_width):
                location[i] = j
    location = tuple(location)
    LTM[location] += 1
    return LTM

def diversify_x(LTM, d, split):
    '''Carries out a diversification step and returns a randomly found
    x within the least populated sector.'''
    sector_width = 1024/split
    values = [*LTM.values()]
    min_index = values.index(min(values))
    keys = [*LTM.keys()]
    sector = keys[min_index]
    new_x = np.zeros(d)
    for i, j in enumerate(sector):
        range_min = -512 + j*sector_width
        range_max = -512 + (j+1)*sector_width
        new_x[i] = np.random.uniform(range_min, range_max)
        
    return new_x


## Main TS Code

In [None]:
def TS_run(d, seed = 1, search_method = 0, init_step_size = 500, P = 6, split = 2, N=7, M=4, run_name = 'initial', 
            intensify = 10, diversify = 15, reduce = 25, prioritise= 6, constrained=True, w_val = 1e6, alpha = 0.9):
    '''Conduct a run of Tabu Search under specified conditions.
    inputs
        d:              int     dimension of datapoints.
        seed:           float   random seed for different runs
        search_method:  int     choose which local search method to use.
        init_step_size: int     starting step size to take
        P:              int     the number of steps to evaluate in each cycle for rs.
        split:          int     the number of splits per variable for creating sectors.
        N:              int     size of STM
        M:              int     size of MTM
        run_name:       str     identifier for the run if required
        intensify:      int     value of counter at which to intensify search
        diversify:      int     value of counter at which to diversify search
        reduce:         int     value of counter at which to reduce search
        prioritise:     int     number of iterations in variable prioritisation before reevaluating
        constrained:    bool    controls whether we are using a constrained formulation or not
        w_val           int     weights on the penalty function for the unconstrained formulation
        alpha           float   the ratio to reduce step size by on reduce steps.

    returns the progress in best solution over iterations for the specified setting.
    '''
    np.random.seed(seed)
    x0 = inar.init_x(1, d)
    fx0 = egg.eggholder(x0)
    counter = 0
    step_size = init_step_size
    f_evals_count = 0
    STM = []
    MTM = {fx0: x0.copy()}
    first_sector = [0]*d
    LTM = init_LTM(d, split, first_sector, {tuple(first_sector):0})
    prioritisation_count = 0
    directions = [0]

    path = {fx0:x0.copy()}
    archive = {fx0:x0.copy()}
    fx_progress = []
    f_evals_count_list = []

    best_sln = [fx0, x0]
    while step_size > 1e-4 and f_evals_count < 15000:
        steps = poss_steps(d, step_size)
        if search_method == 0:
            random.shuffle(steps)
            x0, fx0, f_evals_count = local_search_basic(x0.copy(), fx0.copy(), d, STM, steps, 
                                                        f_evals_count, constrained, w_val)
        elif search_method == 1:
            random.shuffle(steps)
            x0, fx0, f_evals_count = random_subset(x0.copy(), fx0.copy(), d, P, STM, steps, 
                                                    f_evals_count, constrained, w_val)
        elif search_method == 2:
            random.shuffle(steps)
            x0, fx0, f_evals_count = successive_rs(x0.copy(), fx0.copy(), d, P, STM, steps, 
                                                    f_evals_count, constrained, w_val)
        else:
            if prioritisation_count >= prioritise or f_evals_count == 0:
                directions, f_evals_count = sensitivity(x0.copy(), d, step_size, 
                                                        f_evals_count, w_val)
                prioritisation_count = 0
            prioritisation_count += 1
            chosen_steps = []
            for direction in directions:
                chosen_steps.append(steps[2*direction])
                chosen_steps.append(steps[2*direction + 1])
            random.shuffle(chosen_steps)
            x0, fx0, f_evals_count = local_search_basic(x0.copy(), fx0.copy(), d, STM, chosen_steps, 
                                                        f_evals_count, constrained, w_val)
                    
        archive, updated = inar.update_archive(x0, fx0, archive)
        path[fx0] = x0.copy()
        counter, best_sln = update_counter(x0, fx0, best_sln, counter)
        fx_progress.append(best_sln[0])
        f_evals_count_list.append(f_evals_count)
        STM = update_STM(x0.copy(), N, STM)
        MTM = update_MTM(x0.copy(), fx0, M, MTM)
        LTM = update_LTM(x0.copy(), d, LTM, split)
        
        if counter == intensify:
            vals = np.array([*MTM.values()])
            x0 = np.mean(vals, axis=0)
        elif counter == diversify:
            xnew = diversify_x(LTM, d, split)
            x0 = xnew
            
        elif counter == reduce:
            step_size *= alpha
            counter = 0
            fx0 = best_sln[0]
            x0 = best_sln[1]
            continue
        
        if constrained:
            fx0 = egg.eggholder(x0)
        else:
            fx0 = egg.eggholder_unconstrained(x0, w_val)
        f_evals_count += 1

    fx_star = min([*archive])
    print("\nBest solution: \n", fx_star, archive[fx_star])  
    print(len(path))
    print(len(archive))

    z = []
    if d==2:
        # Draw contour plot and evolution of best solution found across iterations
        z = egg.plot_eggholder2D(archive, path, run_name)
    egg.plot_progress(fx_progress, z, d, run_name, f_evals_count_list)
    # return fx_progress, f_evals_count_list

In [None]:
TS_run(2)


## Large Runs


### Changing Alpha

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for alpha in [0.85, 0.9, 0.95, 0.99]:
    graph_name = 'a: {}'.format(alpha)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 0, 100, constrained=True, alpha=alpha)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list


egg.plot_progress_multvar(fx_progress_dict, d, 'TS_alpha_step100', True, f_evals_count_dict)


### Changing Step Size

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for init_step in [50, 100, 200, 500]:
    graph_name = 'step: {}'.format(init_step)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 0, init_step, constrained=True)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'TS_step_size', True, f_evals_count_dict)



### Random Subset Method

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for p in [2, 4, 6, 9]:
    graph_name = 'P: {}'.format(p)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 1, P = p)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'RS', True, f_evals_count_dict)


### Successive RS

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for p in [4, 6, 8, 10]:
    graph_name = 'P: {}'.format(p)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 2, P = p)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list
    
egg.plot_progress_multvar(fx_progress_dict, d, 'successive_RS', True, f_evals_count_dict)


### Variable Prioritisation

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for count in [6, 10, 20, 40]:
    graph_name = 'count: {}'.format(count)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 3, prioritise=count)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'var_prioritisation', True, f_evals_count_dict)


### Constrained Analysis


In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
constrained = False
for w in [0, 1e3, 1e5, 1e6, 1e9]:
    graph_name = 'w: {}'.format(w)
    if w == 0:
        constrained = True
        graph_name = 'Constrained'
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 1, constrained=constrained, w_val = w)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'unconstrained', True, f_evals_count_dict)


### M and N

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

N_vals = [7,20,40, 80]
M_vals = [4,15,30, 60]

d=6
for index,M in enumerate(M_vals):
    N = N_vals[index]
    graph_name = 'N: {}, M: {}'.format(N, M)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 1, N=N, M=M)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'NM2', True, f_evals_count_dict)


### Parameter Variance

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

intensify_vals = [8, 10, 20, 40]
diversify_vals = [12, 15, 30, 60]
reduce_vals = [20, 25, 50, 100]

d=6
for index,I in enumerate(intensify_vals):
    D = diversify_vals[index]
    R = reduce_vals[index]
    graph_name = 'I: {}, D: {}, R: {}'.format(I, D, R)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 1, intensify=I, diversify=D, reduce=R)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'IDR', True, f_evals_count_dict)


### Split

In [None]:
fx_progress_dict = {}
f_evals_count_dict = {}

d=6
for split in [2,3]:
    graph_name = 'Split: {}'.format(split)
    print(graph_name)
    progress_list = []
    f_evals_count_list = []
    for i in tqdm(range(100)):
        f_progress, f_evals = TS_run(d, i, 1,split=split, intensify=20, diversify=30, reduce=50)
        progress_list.append(f_progress)
        f_evals_count_list.append(f_evals)
    fx_progress_dict[graph_name] = progress_list
    f_evals_count_dict[graph_name] = f_evals_count_list

egg.plot_progress_multvar(fx_progress_dict, d, 'Split', True, f_evals_count_dict)