In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
from scipy.special import gamma as gamma_funct

np.random.seed(42)

In [None]:
### Benchmark Functions ###

class BenchmarkFunction():
    def __init__(self, dim, low, high, target, name):
        self.dim = dim
        self.lower_bound = low
        self.upper_bound = high
        self.target = target
        self.funct_name = name

    def plot_function(self):
        global X, Y, Z
        x = np.linspace(self.lower_bound, self.upper_bound, 100)
        y = np.linspace(self.lower_bound, self.upper_bound, 100)
        X, Y = np.meshgrid(x, y)
        Z = np.zeros_like(X)
        for i in range(len(X)):
            for j in range(len(Y)):
                Z[i, j] = self.compute(np.array([X[i, j], Y[i, j]]))

        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        ax.plot_surface(X, Y, Z, cmap='coolwarm', alpha=0.8)

        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        plt.title(f'{self.funct_name} Function')
        plt.show()
    
    def __str__(self):
        return self.funct_name

class Michalewicz(BenchmarkFunction):
    def __init__(self):
        super(Michalewicz, self).__init__(5, 0, np.pi, -4.687658, 'Michalewicz')

    def compute(self, x, m=10):
        n = len(x)
        return -np.sum(np.sin(x) * np.sin((np.arange(1, self.dim+1) * x**2) / np.pi)**(2 * m))

class Rastrigin(BenchmarkFunction):
    def __init__(self):
        super(Rastrigin, self).__init__(30, -5.12, 5.12, 0, 'Rastrigin')

    def compute(self, x):
        return 10*self.dim + np.sum(x**2 - 10 * np.cos(2 * np.pi * x))
    
class Rosenbrock(BenchmarkFunction):
    def __init__(self):
        super(Rosenbrock, self).__init__(2, -100, 100, 0, 'Rosenbrock')

    def compute(self, x):
        return np.sum(100 * (x[1:] - x[:-1]**2)**2 + (1 - x[:-1])**2)
    
class Schwefel(BenchmarkFunction):
    def __init__(self):
        super(Schwefel, self).__init__(30, -500, 500, 0, 'Schwefel')

    def compute(self, x):
        return 418.9829 * self.dim - np.sum(x * np.sin(np.sqrt(np.abs(x))))

class Sphere(BenchmarkFunction):
    def __init__(self):
        super(Sphere, self).__init__(30, -100, 100, 0, 'Sphere')

    def compute(self, x):
        return np.sum(x**2)

In [None]:
### Helpful Plotting Functions ###

# TODO: Extend visualization capabilities to all optimization methods
def plot_pso_update(lower_bound, upper_bound, p_best_pos, p_pos, p_vel, g_best_pos, iter):
    fig, ax = plt.subplots(figsize=(8, 6))
    fig.set_tight_layout(True)

    img = ax.imshow(Z, extent=[lower_bound, upper_bound, lower_bound, upper_bound], origin='lower', cmap='viridis', alpha=0.5)
    fig.colorbar(img, ax=ax)
    ax.plot([0], [0], marker='x', markersize=5, color="white")
    contours = ax.contour(X, Y, Z, 10, colors='black', alpha=0.4)
    ax.clabel(contours, inline=True, fontsize=8, fmt="%.0f")

    for pos, vel, best_pos in zip(p_pos, p_vel, p_best_pos):
        # ax.scatter(best_pos[0], best_pos[1], marker='o', color='black', alpha=0.5)
        ax.scatter(pos[0], pos[1], marker='o', color='blue', alpha=0.5)
        ax.quiver(pos[0], pos[1], vel[0], vel[1], color='blue', width=0.005, angles='xy', scale_units='xy', scale=0.5)

    plt.scatter([g_best_pos[0]], [g_best_pos[1]], marker='*', s=100, color='green', alpha=0.4)
    ax.set_xlim([lower_bound, upper_bound])
    ax.set_ylim([lower_bound, upper_bound])
    plt.title(f'Iteration {iter} Plot')
    plt.show()
    
def plot_convergence(histories, title='Convergence Plot', save=True, save_path=None):
    fig = plt.figure(figsize=(8, 6))
    fig.set_tight_layout(True)
    for i, history in enumerate(histories):
        plt.plot(np.arange(len(history)), history, label=f'Run {i+1}')
        plt.xlabel('Iteration')
        plt.ylabel('Fitness Value')
        plt.title(title)
        plt.legend()

        if save:
            plt.savefig(save_path)
        else:
            plt.show()

In [None]:
# TODO: Revise functional structure into class/object structure?

def particle_swarm(f, num_particles=50, max_iter=5000, w=0.7298, c1=2.05, c2=2.05, visualize=False, verbose=True):      
    # Initialize particles  
    p_pos = np.random.uniform(low=f.lower_bound, high=f.upper_bound, size=(num_particles, f.dim))
    p_vel = np.zeros((num_particles, f.dim))
    p_best_pos = p_pos.copy()
    p_best_val = np.array([f.compute(pos) for pos in p_pos])

    g_best_idx = np.argmin(p_best_val)
    g_best_pos = p_best_pos[g_best_idx]
    g_best_val = p_best_val[g_best_idx]

    g_best_val_history = []
    converge_val = 0
    converge_idx = 0
    converge_count = 0
    for i in range(max_iter):
        r1 = np.random.uniform(size=(num_particles, f.dim))
        r2 = np.random.uniform(size=(num_particles, f.dim))
        
        # Update particle velocity and position
        p_vel = (w * p_vel) + (c1 * r1 * (p_best_pos - p_pos)) + (c2 * r2 * (g_best_pos - p_pos))
        p_pos = p_pos + p_vel
        
        # Restrict position to set range
        p_pos = np.clip(p_pos, f.lower_bound, f.upper_bound)
        
        # Update pbest
        update_p_best_val = np.array([f.compute(pos) for pos in p_pos])
        update_mask = update_p_best_val < p_best_val
        p_best_pos[update_mask] = p_pos[update_mask]
        p_best_val[update_mask] = update_p_best_val[update_mask]
        
        # Update gbest
        g_best_idx = np.argmin(p_best_val)
        # Check convergence
        if p_best_val[g_best_idx] < g_best_val:
            g_best_pos = p_best_pos[g_best_idx]
            g_best_val = p_best_val[g_best_idx]
            converge_count = 0
        else:
            converge_count += 1
            if converge_count == (max_iter * 0.2):
                converge_val = g_best_val
                converge_idx = (i+1) - converge_count
        g_best_val_history.append(g_best_val)

        if visualize and i % (max_iter * 0.1) == 0:
            plot_pso_update(f.lower_bound, f.upper_bound, p_best_pos, p_pos, p_vel, g_best_pos, i)
        
        if verbose:
            print(f"Iteration {i+1}: Global best value = {round(g_best_val, 6)} at position {np.round(g_best_pos, 4)}")

    if round(converge_val, 6) == round(g_best_val, 6):
        return g_best_val, g_best_val_history, converge_idx
    else:
        return g_best_val, g_best_val_history, i+1

In [None]:
def select_parents(population, fitness, tourney_size=3):
    parent_idxs = []
    for _ in range(2):
        # Select 3 individuals at random 
        tourney_idxs = np.random.choice(population.shape[0], size=tourney_size, replace=False)
        tourney_vals = fitness[tourney_idxs]
        # Select fittest of 3 as parent
        winner_idx = tourney_idxs[np.argmin(tourney_vals)]
        parent_idxs.append(winner_idx)

    return population[parent_idxs]

def crossover(parent1, parent2, dim):
    # Select dimension crossover point at random
    crossover_pt = np.random.randint(0, dim)
    child1 = np.concatenate((parent1[:crossover_pt], parent2[crossover_pt:]))
    child2 = np.concatenate((parent2[:crossover_pt], parent1[crossover_pt:]))
    return child1, child2

def mutate(individual, dim, mutation_rate):
    # Introduce slight perturbation as mutation
    mask = np.random.rand(dim) < mutation_rate
    individual[mask] += np.random.uniform(-0.5, 0.5, size=np.sum(mask))
    return individual

def genetic(f, num_individuals=50, max_iter=5000, cross_rate=0.9, mut_rate=0.05, verbose=True):
    # Initialize individuals
    p_pos = np.random.uniform(f.lower_bound, f.upper_bound, size=(num_individuals, f.dim))
    p_vals = np.array([f.compute(pos) for pos in p_pos])

    g_best_idx = np.argmin(p_vals)
    g_best_pos = p_pos[g_best_idx]
    g_best_val = p_vals[g_best_idx]
    
    g_best_val_history = []
    converge_val = 0
    converge_idx = 0
    converge_count = 0
    for i in range(max_iter):
        next_gen = []
        for j in range(num_individuals//2):
            # Select parents
            parent1, parent2 = select_parents(p_pos, p_vals)

            if np.random.rand() < cross_rate:
                # Generate offspring with crossover and potential mutation
                child1, child2 = crossover(parent1, parent2, f.dim)
                next_gen.extend([mutate(child1, f.dim, mut_rate), mutate(child2, f.dim, mut_rate)])
            else:
                # Treat parents as next generaiton with potential mutation
                next_gen.extend([mutate(parent1, f.dim, mut_rate), mutate(parent2, f.dim, mut_rate)])

        # Update position
        p_pos = np.array(next_gen)
        # Restrict position to set range
        p_pos = np.clip(p_pos, f.lower_bound, f.upper_bound)
        p_vals = np.array([f.compute(pos) for pos in p_pos])

        # Update gbest
        g_best_idx = np.argmin(p_vals)
        # Check convergence
        if p_vals[g_best_idx] < g_best_val:
            g_best_pos = p_pos[g_best_idx]
            g_best_val = p_vals[g_best_idx]
            converge_count = 0
        else:
            converge_count += 1
            if converge_count == (max_iter * 0.2):
                converge_val = g_best_val
                converge_idx = (i+1) - converge_count
        g_best_val_history.append(g_best_val)
            
        if verbose:
            print(f"Iteration {i+1}: Global best value = {round(g_best_val, 6)} at position {np.round(g_best_pos, 4)}")

    if round(converge_val, 6) == round(g_best_val, 6):
        return g_best_val, g_best_val_history, converge_idx
    else:
        return g_best_val, g_best_val_history, i+1

In [None]:
def simulate_employed_bees(f, population, fitness):
    update_population = population.copy()
    for i in range(len(population)):
        # Investigate neighbors
        j = np.random.randint(0, population.shape[0])
        k = np.random.randint(0, population.shape[1])

        # Update position
        update_population[k] = population[i][k] + np.random.uniform(-1, 1) * (population[i][k] - population[j][k])
        
    update_population = np.clip(update_population, f.lower_bound, f.upper_bound)
    update_fitness = np.array([f.compute(individual) for individual in update_population])
    update_mask = update_fitness < fitness
    population[update_mask] = update_population[update_mask]
    fitness[update_mask] = update_fitness[update_mask]
    
    return population, fitness

def simulate_onlooker_bees(f, population, fitness):
    # Determine probability of investigating solution further
    sel_prob = fitness / np.sum(fitness)
    sel_idxs = np.random.choice(population.shape[0], population.shape[0], p=sel_prob)
    update_population = population[sel_idxs]
    # Follow employed bees
    return simulate_employed_bees(f, update_population, np.array([f.compute(individual) for individual in update_population]))

def simulate_scout_bees(f, population, fitness, best_fitness, stagnation_count, stagnation_lim):
    for i in range(len(population)):
        if fitness[i] >= best_fitness:
            stagnation_count[i] += 1 
            # Check for excessive stagnation
            if stagnation_count[i] >= stagnation_lim:
                # Select new position at random upon stagnation
                population[i] = np.random.uniform(f.lower_bound, f.upper_bound, population.shape[1])
                fitness[i] = f.compute(population[i])
                stagnation_count[i] = 0
        
    return population, fitness, stagnation_count 

def artificial_bee_colony(f, num_bees=50, max_iter=2000, stagnation_lim=50, verbose=True):
    # Initialize bees
    p_pos = np.random.uniform(f.lower_bound, f.upper_bound, size=(num_bees, f.dim))
    p_vals = np.array([f.compute(pos) for pos in p_pos])
    stagnation_count = np.zeros(num_bees, dtype=int)

    g_best_idx = np.argmin(p_vals)
    g_best_pos = p_pos[g_best_idx]
    g_best_val = p_vals[g_best_idx]

    g_best_val_history = []
    converge_val = 0
    converge_idx = 0
    converge_count = 0
    for i in range(max_iter):
        # Run employed, onlooker, and scout phases
        p_pos, p_vals = simulate_employed_bees(f, p_pos, p_vals)
        p_pos, p_vals = simulate_onlooker_bees(f, p_pos, p_vals)
        p_pos, p_vals, stagnation_count = simulate_scout_bees(f, p_pos, p_vals, g_best_val, stagnation_count, stagnation_lim)

        # Update gbest
        g_best_idx = np.argmin(p_vals)
        # Check for convergence
        if p_vals[g_best_idx] < g_best_val:
            g_best_pos = p_pos[g_best_idx]
            g_best_val = p_vals[g_best_idx]
            converge_count = 0
        else:
            converge_count += 1
            if converge_count == (max_iter * 0.2):
                converge_val = g_best_val
                converge_idx = (i+1) - converge_count
        g_best_val_history.append(g_best_val)
            
        if verbose:
            print(f"Iteration {i+1}: Global best value = {round(g_best_val, 6)} at position {np.round(g_best_pos, 4)}")

    if round(converge_val, 6) == round(g_best_val, 6):
        return g_best_val, g_best_val_history, converge_idx
    else:
        return g_best_val, g_best_val_history, i+1

In [None]:
def simulate_levy_flights(f, population, fitness, beta):
    update_population = population.copy()
    # Apply Levy flight (random walk)
    sigma_u = (gamma_funct(1 + beta) * np.sin(np.pi * beta / 2) / (gamma_funct((1 + beta) / 2) * beta * 2**((beta - 1) / 2)))**(1 / beta)
    sigma_v = 1
    u = np.random.normal(0, sigma_u, (population.shape[0], population.shape[1]))
    v = np.random.normal(0, sigma_v, (population.shape[0], population.shape[1]))
    step = u / (np.abs(v) ** (1 / beta))
    
    # Update position
    update_population += step
    update_population = np.clip(update_population, f.lower_bound, f.upper_bound)
    update_fitness = np.array([f.compute(individual) for individual in update_population])
    update_mask = update_fitness < fitness
    population[update_mask] = update_population[update_mask]
    fitness[update_mask] = update_fitness[update_mask]

    return population, fitness

def cuckoo(f, num_nests=50, max_iter=5000, beta=1.5, verbose=True):
    # Initialize nests
    p_pos = np.random.uniform(f.lower_bound, f.upper_bound, size=(num_nests, f.dim))
    p_vals = np.array([f.compute(pos) for pos in p_pos])

    g_best_idx = np.argmin(p_vals)
    g_best_pos = p_pos[g_best_idx]
    g_best_val = p_vals[g_best_idx]

    g_best_val_history = []
    converge_val = 0
    converge_idx = 0
    converge_count = 0
    for i in range(max_iter):
        # Update position based on random walk
        p_pos, p_vals = simulate_levy_flights(f, p_pos, p_vals, beta)
        
        # Update gbest
        g_best_idx = np.argmin(p_vals)
        # Check convergence
        if p_vals[g_best_idx] < g_best_val:
            g_best_pos = p_pos[g_best_idx]
            g_best_val = p_vals[g_best_idx]
            converge_count = 0
        else:
            converge_count += 1
            if converge_count == (max_iter * 0.2):
                converge_val = g_best_val
                converge_idx = (i+1) - converge_count
        g_best_val_history.append(g_best_val)
            
        if verbose:
            print(f"Iteration {i+1}: Global best value = {round(g_best_val, 6)} at position {np.round(g_best_pos, 4)}")

    if round(converge_val, 6) == round(g_best_val, 6):
        return g_best_val, g_best_val_history, converge_idx
    else:
        return g_best_val, g_best_val_history, i+1

In [None]:
def firefly(f, num_fireflies=50, max_iter=5000, alpha=0.2, beta=1.0, gamma=1.0, verbose=True):
    # Initialize fireflies
    p_pos = np.random.uniform(f.lower_bound, f.upper_bound, size=(num_fireflies, f.dim))
    p_vals = np.array([f.compute(pos) for pos in p_pos])

    g_best_idx = np.argmin(p_vals)
    g_best_pos = p_pos[g_best_idx]
    g_best_val = p_vals[g_best_idx]

    g_best_val_history = []
    converge_val = 0
    converge_idx = 0
    converge_count = 0
    for i in range(max_iter):
        # Cycle through population
        for j in range(len(p_pos)):
            for k in range(len(p_pos)):
                if p_vals[j] > p_vals[k]:
                    # Determine attractiveness value
                    attractiveness = np.exp(-gamma * np.linalg.norm(p_pos[k] - p_pos[j])**2)
                    # Update position
                    p_pos[j] += alpha * attractiveness + beta * (np.random.rand(p_pos.shape[1]) - 0.5)
        
        # Restrict position to set range
        p_pos = np.clip(p_pos, f.lower_bound, f.upper_bound)
        p_vals = np.array([f.compute(pos) for pos in p_pos])

        # Update gbest
        g_best_idx = np.argmin(p_vals)
        # Check for convergence
        if p_vals[g_best_idx] < g_best_val:
            g_best_pos = p_pos[g_best_idx]
            g_best_val = p_vals[g_best_idx]
            converge_count = 0
        else:
            converge_count += 1
            if converge_count == (max_iter * 0.2):
                converge_val = g_best_val
                converge_idx = (i+1) - converge_count
        g_best_val_history.append(g_best_val)
            
        if verbose:
            print(f"Iteration {i+1}: Global best value = {round(g_best_val, 6)} at position {np.round(g_best_pos, 4)}")

    if round(converge_val, 6) == round(g_best_val, 6):
        return g_best_val, g_best_val_history, converge_idx
    else:
        return g_best_val, g_best_val_history, i+1

In [None]:
### Experimentation ###
EXPERIMENT_COUNT = 10
POPULATION_SIZE = 50
MAX_ITER = 5000

benchmarks = [Michalewicz(), Rastrigin(), Rosenbrock(), Schwefel(), Sphere()]
optimizers = [particle_swarm, genetic, artificial_bee_colony, cuckoo]
# optimizers = [firefly]
optimizer_names = ['PSO', 'GA', 'ABC', 'CSO']
# optimizer_names = ['FA']

index = pd.MultiIndex.from_product([[str(funct) for funct in benchmarks], optimizer_names], names=['Benchmark', 'Optimizer'])
cols = ['Avg Convergence Value', 'Min Error', 'Max Error', 'Avg Error', 'Min Convergence Time', 'Max Convergence Time', 
        'Avg Convergence Time', 'Avg Runtime']
df = pd.DataFrame(index=index, columns=cols)

for benchmark in benchmarks:
    for optimizer, name in zip(optimizers, optimizer_names):
        converge_values = []
        converge_iters = []
        errors = []
        histories = []
        runtimes = []
        for i in range(EXPERIMENT_COUNT):
            start = time.time()
            global_best_value, history, iters = optimizer(benchmark, POPULATION_SIZE, MAX_ITER, verbose=False)
            end = time.time()

            converge_values.append(global_best_value)
            converge_iters.append(iters)
            errors.append(abs(benchmark.target - global_best_value))
            histories.append(history)
            runtimes.append(end - start)
        
        # Save benchmark results
        df.loc[(str(benchmark), name), :] = [np.mean(converge_values), np.min(errors), np.max(errors), np.mean(errors), 
                                             np.min(converge_iters), np.max(converge_iters), np.mean(converge_iters), np.mean(runtimes)]
        
        # Save convergence plot
        plot_convergence(histories, title=f'{benchmark}-{name} Convergence Plot', save=True, save_path=f'{benchmark}-{name}.png')

    print(f"Finished {benchmark} experiment")

In [None]:
df.to_csv('exp_results.csv')

In [3]:
# Read results file back in for further analysis
df = pd.read_csv('exp_results.csv', index_col=['Benchmark', 'Optimizer'])
df

Unnamed: 0_level_0,Unnamed: 1_level_0,Avg Convergence Value,Min Error,Max Error,Avg Error,Min Convergence Time,Max Convergence Time,Avg Convergence Time,Avg Runtime
Benchmark,Optimizer,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
Michalewicz,PSO,-4.687658,1.790881e-07,1.790882e-07,1.790881e-07,1371,2234,1799.9,9.321846
Michalewicz,GA,-4.598422,9.264506e-07,0.1917681,0.089236,1481,5000,3550.2,34.971397
Michalewicz,ABC,-3.085652,0.9318841,1.954052,1.602006,250,5000,2700.1,51.863567
Michalewicz,CSO,-3.996531,0.4283008,0.973389,0.6911268,191,5000,2917.8,11.218465
Rastrigin,PSO,231.440862,178.9065,260.6672,231.4409,1612,5000,3422.8,7.666779
Rastrigin,GA,70.776184,57.20876,90.50374,70.77618,3054,5000,4544.3,39.019371
Rastrigin,ABC,4e-06,7.711651e-08,1.081968e-05,3.934633e-06,73,5000,2106.4,30.682508
Rastrigin,CSO,274.662314,246.3991,291.2405,274.6623,2624,5000,3373.2,7.032457
Rosenbrock,PSO,0.0,0.0,0.0,0.0,0,3929,2188.6,3.972424
Rosenbrock,GA,41.840955,0.0002069714,79.69632,41.84095,1992,5000,4208.7,18.070384


In [5]:
# Read FA results file back in for further analysis
fa_df = pd.read_csv('FA_results.csv', index_col=['Benchmark', 'Optimizer'])
fa_df

Unnamed: 0_level_0,Unnamed: 1_level_0,Avg Convergence Value,Min Error,Max Error,Avg Error,Min Convergence Time,Max Convergence Time,Avg Convergence Time,Avg Runtime
Benchmark,Optimizer,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
Michalewicz,FA,-3.626973,0.567622,1.335079,1.060685,1138,5000,3257.2,95.856404
Rastrigin,FA,307.259748,276.394949,325.77031,307.259748,211,5000,2258.9,102.167338
Rosenbrock,FA,0.172053,0.026045,0.585758,0.172053,1374,5000,3185.0,104.038228
Schwefel,FA,9747.783627,9302.479995,9930.131878,9747.783627,3291,5000,4722.0,104.940011
Sphere,FA,55114.593019,50168.390113,59963.004589,55114.593019,2293,5000,4384.3,103.333427
