## Create distance matrices

In [1]:
# Create distance matrix (excluding distance from each point to itself)
import random
import copy
import numpy as np
import xml.etree.ElementTree as ET
import ipyparallel as ipp
from ipyparallel.util import interactive

# Parse both XML instances up front; only one of them is used below.
burma_tree = ET.parse('burma14.xml')
burma_root = burma_tree.getroot()
brazil_tree = ET.parse('brazil58.xml')
brazil_root = brazil_tree.getroot()

country = "burma" #brazil, burma

# Select the instance to solve; anything other than "brazil" means burma.
if country == "brazil":
    edge_num = 58
    selected_root = brazil_root
else:
    edge_num = 14
    selected_root = burma_root

# One row per <vertex>. Every <edge> child carries the destination index as
# its text and the travel cost in its "cost" attribute; cells with no
# matching <edge> keep the initial 0.
d = []
for vertex in selected_root.iter('vertex'):
    temp = [0] * edge_num
    for edge in vertex.iter('edge'):
        temp[int(edge.text)] = float(edge.get('cost'))
    d.append(temp)

d = np.array(d)

# Methods

In [2]:
def remove_current_city(city, H):
    """
    Blank out the current node in the heuristic matrix

    Works on H in place: every entry in column ``city`` and every entry that
    is already 0 is set to 0; each remaining entry is recomputed from the
    global distance matrix d as round(1/d[row][col], 4).

    :param city: the current node (column index to clear)
    :param H: heuristic matrix (modified in place)
    :return: the same H object after modification
    """

    for position in np.ndindex(H.shape):
        row, col = position[0], position[1]
        if col != city and H[position] != 0:
            # Still reachable: refresh the value from the distance matrix
            H[position] = round(1/d[row][col], 4)
        else:
            # Column of the current city, or an entry removed earlier
            H[position] = 0
    return H

def create_probabilities(city, H, T, alpha, beta):
    """
    Create the transition probabilities out of the current node

    Candidate j gets the weight T[city, j]**alpha * H[city, j]**beta and the
    weights are normalised by their sum. The size of the result follows the
    matrices that are passed in; no global state is consulted.

    :param city: the current node (row index into H and T)
    :param H: heuristic matrix
    :param T: pheromone matrix
    :param alpha: alpha hyper parameter (exponent applied to the pheromone)
    :param beta: beta hyper parameter (exponent applied to the heuristic)
    :return: 1-D array of probabilities, one per column of the matrices
    """

    # Numerators: pheromone^alpha * heuristic^beta for every candidate
    weights = T[city, :] ** alpha * H[city, :] ** beta

    # Denominator is the sum of all numerators. If every weight is 0 the
    # division yields NaN entries rather than raising.
    return weights / np.sum(weights)

def find_next_city(P, city):
    """
    Determines the next node for an ant to go to (roulette-wheel selection)

    A single uniform random number is drawn and the probabilities are
    accumulated until the running total reaches it. Entries that are not
    strictly positive are never selected. If rounding leaves the total
    slightly below the drawn number, the last selectable node is returned
    instead of falling off the end of the loop.

    :param P: sequence of probabilities, one per node
    :param city: current node (not used by the selection itself)
    :return: index of the next node, or None if no entry of P is positive
    """
    rand = random.random()
    cumulative = 0
    last_selectable = None
    for i, probability in enumerate(P):
        # Skip nodes with zero probability so they cannot be picked when the
        # random draw is exactly 0.0
        if probability > 0:
            cumulative += probability
            last_selectable = i
            if cumulative >= rand:
                return i

    # Only reached when floating-point error keeps the total below rand (or
    # when nothing is selectable at all)
    return last_selectable
        
def create_ants(num, start, H, T, alpha, beta):
    """
    Create set of ants

    Every ant is a dictionary with its own copies of the heuristic matrix
    ("H"), the pheromone matrix ("T") and the initial probabilities ("P"), a
    "Length" of 0 and a "Path" that contains only the starting node.

    One ant more than requested is created: run_ants drops the first element
    of the array, so the extra ant acts as a placeholder.

    :param num: number of ants to create (not counting the placeholder)
    :param start: starting node, used for the initial probabilities and path
    :param H: heuristic matrix
    :param T: pheromone matrix
    :param alpha: alpha hyper parameter
    :param beta: beta hyper parameter
    :return: numpy object array holding num + 1 ant dictionaries
    """

    # Initial probabilities out of the starting node (inputs are only read)
    P = create_probabilities(start, H, T, alpha, beta)

    # Each ant owns independent copies so that one ant's bookkeeping never
    # leaks into another ant or into the caller's matrices
    Ants = np.array([
        {
            "H": copy.deepcopy(H),
            "T": copy.deepcopy(T),
            "P": copy.deepcopy(P),
            "Length": 0,
            "Path": [start],
        }
        for _ in range(num + 1)
    ])

    return Ants

def run_ants(start, Ants, alpha, beta):
    """
    Run the ants over the nodes storing a path and length for each

    Every ant in Ants performs exactly one tour: starting at ``start`` it
    visits edge_num - 1 further nodes and the distance back to ``start`` is
    added at the end. The ant dictionaries are updated in place ("Path",
    "Length", "P"), and each ant's heuristic matrix "H" is rebuilt from the
    distance matrix once its tour is finished.

    :param start: starting node
    :param Ants: list of ant objects (the first one is the placeholder ant)
    :param alpha: alpha hyper parameter
    :param beta: beta hyper parameter
    :return: Ants without its first element
    """

    def run_singular_ant(ant):

        # Start city
        current_city = start

        # Go to every remaining node
        for _ in range(edge_num - 1):

            # Remove this city from heuristics
            ant["H"] = remove_current_city(current_city, ant["H"])

            # Create probabilities
            ant["P"] = create_probabilities(current_city, ant["H"], ant["T"], alpha, beta)

            # Get next city to go to
            previous_city = current_city
            current_city = find_next_city(ant["P"], previous_city)

            # Update length
            ant["Length"] += d[previous_city][current_city]

            # Update path
            ant["Path"].append(current_city)

        # Add the length to get back to starting node
        ant["Length"] += d[current_city][start]

        # Restore the full heuristic matrix that the tour zeroed out
        ant["H"] = np.array([[ round(1/d[i][j],4) if i != j else 0 for j in range(edge_num)] for i in range(edge_num)])

    # A plain loop is used on purpose: np.vectorize without otypes calls the
    # function an extra time on the first element to infer the output type,
    # which would send the first ant on two tours and corrupt its path.
    for ant in Ants:
        run_singular_ant(ant)

    # The first ant is the placeholder added by create_ants; callers expect
    # it to be left out of the result.
    return Ants[1 :]


def evapourate(T, e):
    """
    Evapourate pheromones

    :param T: pheromone matrix
    :param e: evapouration rate
    :return: new array holding every entry of T scaled by (1 - e)
    """

    retention = 1 - e
    return np.multiply(T, retention)
    

def pheromone_update(Ants, Q, T):
    """
    Deposit pheromone along the path of every ant

    Each ant adds Q / Length to T[a, b] for every consecutive pair (a, b) of
    its "Path". T is modified in place.

    :param Ants: list of ant objects
    :param Q: hyper parameter to scale pheromones
    :param T: pheromone matrix (updated in place)
    :return: the updated pheromone matrix T
    """

    for ant in Ants:
        # Amount this ant deposits on every edge it used
        deposit = Q/ant["Length"]

        # Consecutive nodes of the path form the travelled edges
        route = ant["Path"]
        departures = route[:-1]
        arrivals = route[1::]

        # add.at accumulates correctly even if an edge occurs more than once
        np.add.at(T, (departures, arrivals), deposit)

    return T

def find_best(Ants):
    """
    Find the best solution

    The ant with the smallest "Length" wins; on a tie the earliest ant in
    Ants is kept.

    :param Ants: list of ant objects
    :return: best solution (dict with "Length" and "Path")
    """

    # Start with the first ant as the one to beat
    champion = Ants[0]

    # Replace it only when a strictly shorter tour shows up
    for challenger in Ants:
        if challenger["Length"] < champion["Length"]:
            champion = challenger

    return {"Length": champion["Length"], "Path": champion["Path"]}

# Tests

In [3]:
def remove_current_city_test():
    """Check that removing a city zeroes its column in the heuristic matrix."""
    # Initial heuristic matrix: rounded inverse distance, 0 on the diagonal
    H = np.array([[0 if i == j else round(1/d[i][j],4) for j in range(edge_num)] for i in range(edge_num)])

    current_city = 2
    H = remove_current_city(current_city, H)

    # The first five rows of the removed city's column must all be 0
    for row in range(5):
        assert(H[row, 2] == 0)

def create_probabilities_test():
    """Check create_probabilities against hand-computed values for a 2x3 example."""
    # Initial Heuristic Matrix
    H = np.array([[1, 2, 3], [2, 2, 2]])

    # Initial Pheromones Matrix
    T = np.array([[2, 2, 2], [5, 2, 1]])

    # Other parameters
    current_city = 0
    alpha = 1
    beta = 2

    P = create_probabilities(current_city, H, T, alpha, beta)

    # Row 0 weights are 2*1^2, 2*2^2, 2*3^2 = 2, 8, 18 (sum 28)
    assert(round(P[0], 4) == 0.0714)
    assert(round(P[1], 4) == 0.2857)
    assert(round(P[2], 4) == 0.6429)

    # Probabilities out of a node must add up to 1
    np.testing.assert_allclose(np.sum(P), 1.0)
    
def create_ants_test():
    """Check the number of ants created and the initial state of an ant."""
    # Initial Heuristic Matrix
    H = np.array([[ round(1/d[i][j],4) if i != j else 0 for j in range(edge_num)] for i in range(edge_num)])

    # Initial Pheromones Matrix
    T = np.array([[random.random() for j in range(edge_num)] for i in range(edge_num)])

    num = 3
    start = 0
    alpha = 1
    beta = 2

    Ants = create_ants(num, start, H, T, alpha, beta)

    # create_ants returns one extra placeholder ant in front of the num
    # requested ones, so the array holds num + 1 entries
    assert(len(Ants) == num + 1)
    assert(Ants[0]["Length"] == 0)
    assert(Ants[0]["Path"] == [start])
    np.testing.assert_array_equal(Ants[0]["H"], H)
    np.testing.assert_array_equal(Ants[0]["T"], T)
    
    
def run_ants_test():
    """Run a small colony and check that every returned ant made a valid tour."""
    # Initial Heuristic Matrix
    H = np.array([[ round(1/d[i][j],4) if i != j else 0 for j in range(edge_num)] for i in range(edge_num)])

    # Initial Pheromones Matrix
    T = np.array([[random.random() for j in range(edge_num)] for i in range(edge_num)])

    num = 2
    start = 0
    alpha = 1
    beta = 2

    Ants = create_ants(num, start, H, T, alpha, beta)
    Ants = run_ants(start, Ants, alpha, beta)

    # run_ants drops the placeholder ant, leaving exactly num ants
    assert(len(Ants) == num)

    for ant in Ants:
        print(ant["Length"])

        # A tour starts at the starting node and visits every node once
        assert(ant["Path"][0] == start)
        assert(sorted(ant["Path"]) == list(range(edge_num)))

        # Distances are accumulated along the way, so the tour has a length
        assert(ant["Length"] > 0)
    
    
def evapourate_test():
    """Check that evapourate scales every pheromone by (1 - e)."""

    # Evapouration rate
    e = 0.5

    # Pheromones before evapouration: all ones
    T = [[1] * edge_num for _ in range(edge_num)]

    # Expected pheromones afterwards: all halves
    T_test = [[0.5] * edge_num for _ in range(edge_num)]

    T = evapourate(T, e)

    # Assertion
    np.testing.assert_array_equal(T, T_test)
    

def pheromone_update_test():
    """Check the pheromone deposited by three ants that share one fixed path."""
    # Initial heuristic matrix: rounded inverse distance, 0 on the diagonal
    H = np.array([[0 if i == j else round(1/d[i][j],4) for j in range(edge_num)] for i in range(edge_num)])

    # Initial pheromones matrix: integer ones everywhere
    T = np.ones((edge_num, edge_num), dtype=int)

    num = 3
    start = 0
    alpha = 1
    beta = 2

    # Drop the placeholder ant, keeping the num real ones
    Ants = create_ants(num, start, H, T, alpha, beta)[1 :]
    Q = 10

    # Give every ant the same unit-length path through nodes 0..13
    for ant in Ants:
        ant["Length"] = 1
        ant["Path"] = list(range(14))

    pheromone_update(Ants, Q, T)

    # Each sampled edge started at 1 and received 10 from each of the 3 ants
    for origin, destination in [(0, 1), (2, 3), (5, 6), (9, 10), (12, 13)]:
        assert(T[origin][destination] == 31)
    
def find_best_test():
    """Check that find_best reports the shortest of three prepared ants."""
    # Initial heuristic matrix: rounded inverse distance, 0 on the diagonal
    H = np.array([[0 if i == j else round(1/d[i][j],4) for j in range(edge_num)] for i in range(edge_num)])

    # Initial pheromones matrix: integer ones everywhere
    T = np.ones((edge_num, edge_num), dtype=int)

    num = 3
    start = 0
    alpha = 1
    beta = 2

    # Drop the placeholder ant, keeping the num real ones
    Ants = create_ants(num, start, H, T, alpha, beta)[1 :]

    # Lengths 0, 10 and 20 on an identical path
    for position in range(3):
        ant = Ants[position]
        ant["Length"] = position*10
        ant["Path"] = list(range(14))

    best = find_best(Ants)
    assert(best["Length"] == 0)

In [4]:
# Main loop
def create_and_run_ants(num_ants, num_iters, starting_city, alpha, beta, Q, e, T, H):
    """
    Run the colony for a number of iterations and return the best tour found

    Each iteration creates a fresh set of ants, sends them on a tour, deposits
    pheromone along their paths, evapourates the pheromone matrix and compares
    the iteration's best tour with the best one seen so far.

    :param num_ants: number of ants to create per iteration
    :param num_iters: number of iterations to run
    :param starting_city: starting node
    :param alpha: alpha hyper parameter
    :param beta: beta hyper parameter
    :param Q: hyper parameter to scale pheromones
    :param e: evapouration rate
    :param T: initial pheromone matrix (left unmodified)
    :param H: heuristic matrix
    :return: dict with the "Length" and "Path" of the best tour, or None when
             num_iters is 0
    """

    # Work on a private float copy: pheromone_update writes into the matrix
    # it is given, and the caller's matrix should not change behind its back
    T = np.array(T, dtype=float)

    global_best = None

    # Loop for number of iterations
    for i in range(num_iters):

        # Create and run ants. run_ants returns the ants without the
        # placeholder first element, so only real ants are used below.
        Ants = create_ants(num_ants, starting_city, H, T, alpha, beta)
        Ants = run_ants(starting_city, Ants, alpha, beta)

        # Update pheromones
        T = pheromone_update(Ants, Q, T)
        T = evapourate(T, e)

        # Keep track of global best
        local_best = find_best(Ants)
        if global_best is None or local_best["Length"] < global_best["Length"]:
            global_best = local_best

    return global_best

In [13]:
# Random initial pheromones and the rounded inverse-distance heuristic
T = np.array([[random.random() for j in range(edge_num)] for i in range(edge_num)])
H = np.array([[ round(1/d[i][j],4) if i != j else 0 for j in range(edge_num)] for i in range(edge_num)])

# Parameters
num_ants = 100
num_iterations = round(10_000 / num_ants)
starting_city = 0
alpha = 1
beta = 2
Q = 1
e = 0.5

# Names the engines need in their own namespace, in the order they are sent:
# the data plus every function that create_and_run_ants reaches by name.
engine_namespace = {
    'H': H,
    'create_probabilities': create_probabilities,
    'create_ants': create_ants,
    'T': T,
    'remove_current_city': remove_current_city,
    'find_next_city': find_next_city,
    'edge_num': edge_num,
    'run_ants': run_ants,
    'd': d,
    'create_and_run_ants': create_and_run_ants,
    'find_best': find_best,
    'pheromone_update': pheromone_update,
    'evapourate': evapourate,
}

# Start n clusters
with ipp.Cluster(n=10) as rc:
    view = rc[:]

    # Define variables in correct space
    view.execute("import copy, random")
    view.execute("import numpy as np")
    for name, value in engine_namespace.items():
        view[name] = value

    # Run async task to create and run ants, then wait for the results
    asyncresult = view.apply_async(create_and_run_ants, num_ants, num_iterations, starting_city, alpha, beta, Q, e, T, H)
    asyncresult.wait_interactive()
    result = asyncresult.get()

    # One best solution is printed per entry of the collected result
    for solution in result:
        print(f'Length: {solution["Length"]}, Path: {solution["Path"]}')

Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>


  0%|          | 0/10 [00:00<?, ?engine/s]

create_and_run_ants:   0%|          | 0/10 [00:00<?, ?tasks/s]

Length: 3346.0, Path: [0, 7, 1, 13, 2, 3, 4, 5, 11, 6, 12, 10, 8, 9]
Length: 3404.0, Path: [0, 7, 10, 8, 9, 12, 6, 5, 11, 4, 3, 2, 13, 1]
Length: 3381.0, Path: [0, 7, 10, 8, 9, 12, 6, 11, 5, 4, 3, 2, 13, 1]
Length: 3404.0, Path: [0, 7, 10, 8, 9, 12, 6, 5, 11, 4, 3, 2, 13, 1]
Length: 3382.0, Path: [0, 7, 8, 9, 10, 12, 6, 5, 11, 4, 3, 2, 13, 1]
Length: 3404.0, Path: [0, 7, 10, 8, 9, 12, 6, 5, 11, 4, 3, 2, 13, 1]
Length: 3381.0, Path: [0, 7, 10, 8, 9, 12, 6, 11, 5, 4, 3, 2, 13, 1]
Length: 3404.0, Path: [0, 7, 10, 8, 9, 12, 6, 5, 11, 4, 3, 2, 13, 1]
Length: 3381.0, Path: [0, 7, 10, 8, 9, 12, 6, 11, 5, 4, 3, 2, 13, 1]
Length: 3381.0, Path: [0, 7, 10, 8, 9, 12, 6, 11, 5, 4, 3, 2, 13, 1]
Stopping engine(s): 1702347761
engine set stopped 1702347761: {'engines': {'0': {'exit_code': 1, 'pid': 24700, 'identifier': '0'}, '1': {'exit_code': 1, 'pid': 64192, 'identifier': '1'}, '2': {'exit_code': 1, 'pid': 64388, 'identifier': '2'}, '3': {'exit_code': 1, 'pid': 29108, 'identifier': '3'}, '4': {'exit

In [9]:
# Random initial pheromones and the rounded inverse-distance heuristic
T = np.array([[random.random() for j in range(edge_num)] for i in range(edge_num)])
H = np.array([[ round(1/d[i][j],4) if i != j else 0 for j in range(edge_num)] for i in range(edge_num)])


# Parameters
num_ants = 100
num_iterations = round(10_000 / num_ants)
starting_city = 0
alpha = 1
beta = 2
Q = 1
e = 0.5

# Single run in the notebook kernel; the returned best tour is the cell output
create_and_run_ants(
    num_ants=num_ants,
    num_iters=num_iterations,
    starting_city=starting_city,
    alpha=alpha,
    beta=beta,
    Q=Q,
    e=e,
    T=T,
    H=H,
)

KeyboardInterrupt: 