# Ant Colony Optimization (ACO) Demo

### Imports

In [45]:
import numpy as np
import pandas as pd
from time import time, sleep
import datetime

### Define global constants and variables

In [82]:
N_DIM = 2                     # Number of dimension
MAX_ITERATION = 1000          # Max. number of iterations
N_SIZE = 50                   # Solution size
N_ANTS = 10                   # Number of ants
RHO = 0.1                     # Decay factor
SIGMA = 0.1                   #Local pheromone coefficient
BETA = 2.5                    # Heuristic coefficient
Q0 = 0.9                      # Greediness factor
XI = 0.65                     # Pheromone evaporation
DIVIDER = "-----" * 10
CARRIAGE_RETURN = "\n"
N_EPOCHS_FOR_DISPLAY = 50

### Objective functions

In [83]:

def dp(x, axis=1):
    """
    Diagonal Plane
    Range: l_bound, u_bound = 0.5, 1.5
    n_dims = 10
    """
    y = np.mean(x, axis=axis)
    return y

def sphere(x):
    """
    Sphere
    Range: l_bound, u_bound = -3, 7
    n_dims = 10
    """
    y = np.sum(np.power(x,2))
    return y
    
    

### Objective Function and Range map

In [89]:
cost_function_map = {
    "case_1": {
        "range": [1.0, 1.5],
        "n_dim": 10,
        "func": dp
    },
    "case_2": {
        "range": [-3.0, 7.0],
        "n_dim": 10,
        "func": sphere
    }
}

### ACO helper functions

In [90]:
def formatTD(td):
    """ Format time output for report"""
    days = td.days
    hours = td.seconds // 3600
    minutes = (td.seconds % 3600) // 60
    seconds = td.seconds % 60
    result = f"{minutes} mins {seconds} secs"
    #return '%s days %s h %s m %s s' % (days, hours, minutes, seconds)
    return result

def getProblemBounds():
    """
    gets the problem lower and upper bounds
    """
    problem_range = cost_function_map["case_1"]["range"]
    l_bound = np.array([problem_range[0]]*N_DIM)
    u_bound = np.array([problem_range[1]]*N_DIM)
    return l_bound, u_bound


def valueFunction(alpha, beta, lambda_value, x):
    """
    Computation of the value function
    """
    n = len(x)
    value = np.zeros((n))
    for i in range(n):
        if x[i] >= 0:
            value[i] = np.power(x[i], alpha)
        else:
            value[i] = -lambda_value*np.power(x[i], beta)
    return value

def weightProbability(x, gamma):
    """
    Computes the weight probability
    """
    n = len(x)
    prob = np.zeros((n))
    for i in range(n):
        if x[i] < 1.0:
            prob[i] = np.power(x[i], gamma)/(np.power(np.power(x[i], gamma) + np.power(1 - x[i], gamma), (1/gamma)))
        else:
            prob[i] = 1.0
    return prob
    
def computeTotalRunTime(start_time):
    """
    Computes the total run time for the ACO main loop
    """
    total_time_s = time() - start_time
    total_time = datetime.timedelta(seconds=total_time_s)
    total_time = formatTD(total_time)
    return total_time

def initializeSolution():
    """
    Initializes the heuristic solution space/population
    """
    solution = np.zeros((N_SIZE, N_DIM + 1))
    func = cost_function_map["case_1"]["func"]
    l_bound, u_bound = getProblemBounds()
    print(f"lower and upper bound values are: {l_bound} and {u_bound}")
    for k in range(N_SIZE):
        rand_values = np.random.rand(N_ANTS, N_DIM)
        s_rand = np.multiply(u_bound - l_bound, rand_values) + l_bound
        f_x = func(s_rand, axis=1)
        f_best = np.amin(f_x)
        k_best = np.argmin(f_x)
        solution[k,:N_DIM] = s_rand[k_best,:]
        solution[k,N_DIM] = f_best
    solution_sorted = solution[solution[:,N_DIM].argsort()]
    mu = 1.0
    sigma = Q0*N_SIZE
    w = np.random.normal(mu, sigma, size=(N_SIZE))
    return solution_sorted, w

def cost():
    """
    Cost function
    """
    pass

def initiaalizePheromone(pheromone_init, cost_function_map):
    """
    Initializes the pheromone solution
    """
    
    
    
    
def constructSolution(pheromone, 
                      problem_size, 
                      heuristic_coef, 
                      greediness_factor):
    """
    Constructs the ACO solution
    """
    pass

def updateAndDecayLocalPheromone(pheromone, 
                                 soln, 
                                 soln_cost, 
                                 local_pheromone_factor):
    """
    Update and decay local pheromone generation
    """
    pass

def updateAndDecayGlobalPheromone(pheromone, 
                                 prob_soln, 
                                 prob_soln_cost, 
                                 decay_factor):
    """
    Update and decay local pheromone generation
    """
    pass


def acoMainLoop():
    """
    Main loop for the Ant Colony Optimization routine
    """
    start_time = time()
    best_solution_tally = []
    func = cost_function_map["case_1"]["func"]
    l_bound, u_bound = getProblemBounds()
    solution_0, w = initializeSolution()
    solution_table_best = solution_0[:]
    print(f"Start of the ACO main loop...{CARRIAGE_RETURN}")
    for epoch in range(MAX_ITERATION):        
        if epoch % N_EPOCHS_FOR_DISPLAY == 0:
            print(f"{DIVIDER}")
            print(f"Current epoch: {epoch}")
            print(f"{CARRIAGE_RETURN}{CARRIAGE_RETURN}")
            
        
        # choose Gaussian function to compose Gaussian kernel
        p = w/sum(w)

        # find best and index of best
        max_prospect = np.amax(p)
        ix_prospect = np.argmax(p)
        selection = ix_prospect
        if epoch % N_EPOCHS_FOR_DISPLAY == 0:
            print(f"Selected probability and index are: {max_prospect} and {selection}")
        
        # calculation of G_i - find standard deviation, sigma first
        sigma_s = np.zeros((N_DIM,1))
        sigma = np.zeros((N_DIM,1))
        for i in range(N_DIM):
            for j in range(N_SIZE):
                sigma_s[i] = sigma_s[i] + abs(solution_table_best[j][i] - solution_table_best[selection][i])
            sigma[i] = XI / (N_SIZE - 1) * sigma_s[i]
            
        # Update the ACO solution
        solution_temp = np.zeros((N_ANTS,N_DIM))
        solution_sample = np.zeros((N_ANTS,N_DIM + 1))
        for k in range(N_ANTS):
            for i in range(N_DIM):
                solution_temp[k][i] = sigma[i] * np.random.random_sample() + solution_table_best[selection][i]
                if solution_temp[k][i] > u_bound[i]:
                    solution_temp[k][i] = u_bound[i]
                elif solution_temp[k][i] < l_bound[i]:
                    solution_temp[k][i] = l_bound[i]                    
        f_solution = func(solution_temp, axis=1)        
        solution_sample[:,:N_DIM] = solution_temp
        solution_sample[:,N_DIM] = f_solution
        
        # add new solutions in the solutions table
        solution_table_current = np.vstack((solution_0, solution_sample))
        
        # sort according to "fitness"
        solution_table_current_sorted = solution_table_current[solution_table_current[:,N_DIM].argsort()]
        
        # keep best solutions
        solution_table_best = solution_table_current_sorted[:N_SIZE][:]
        if epoch % N_EPOCHS_FOR_DISPLAY == 0:
            print(f"solution_table_best[:5]: {solution_table_best[:5]}")
    
        # keep best after each iteration
        best_solution_tally.append(solution_table_best[0])        
        
    run_time = computeTotalRunTime(start_time)
    best_solution = sorted(best_solution_tally, key=lambda row: row[-1],reverse = False)
    best_parameters = best_solution[0][0:N_DIM]
    best_fitness = best_solution[0][N_DIM] 
    print(f"End of the ACO main loop..{CARRIAGE_RETURN}")
    print(f"best parameters are: {best_parameters}")
    print(f"best response is: {best_fitness}")
    print(f"ACO compute time is: {run_time}")


In [91]:
# solution_sorted, w = initializeSolution()
# start_time = time()
# sleep(73.0)
# computeTotalRunTime(start_time)
acoMainLoop()

lower and upper bound values are: [1. 1.] and [1.5 1.5]
Start of the ACO main loop...

--------------------------------------------------
Current epoch: 0



Selected probability and index are: 0.19845228293435455 and 47
solution_table_best[:5]: [[1.02235484 1.01723499 1.01979491]
 [1.01614561 1.02659123 1.02136842]
 [1.04038448 1.00949577 1.02494013]
 [1.02150872 1.03308711 1.02729792]
 [1.00223056 1.05351648 1.02787352]]
--------------------------------------------------
Current epoch: 50



Selected probability and index are: 0.19845228293435455 and 47
solution_table_best[:5]: [[1.02235484 1.01723499 1.01979491]
 [1.01614561 1.02659123 1.02136842]
 [1.04038448 1.00949577 1.02494013]
 [1.02150872 1.03308711 1.02729792]
 [1.00223056 1.05351648 1.02787352]]
--------------------------------------------------
Current epoch: 100



Selected probability and index are: 0.19845228293435455 and 47
solution_table_best[:5]: [[1.02235484 1.01723499 1.01979491]
 [1.01614561 1.02659123 1.02136842]

In [92]:
print(f"solution_sorted[0][2] = {solution_sorted[0][2]} solution_sorted[0,2] = {solution_sorted[0,2]}")

solution_sorted[0][2] = 0.5305641847045905 solution_sorted[0,2] = 0.5305641847045905


In [None]:
l_bound.shape

In [None]:
rand_values.shape

In [None]:
s_rand.shape

In [21]:
for i in range(1000):
    if i % 100 == 0:
        print(i)

0
100
200
300
400
500
600
700
800
900
