In [59]:
import numpy as np
import vrplib
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_percentage_error as mape

In [141]:
def calculate_probability(ratio_distance, distance, ratio_pheromone, pheromone):
    individual_results = np.power(1 / (distance + 1e-6), ratio_distance) * np.power(pheromone, ratio_pheromone)
    return np.cumsum(individual_results / individual_results.sum())

In [154]:
def aco(
    n_ants: int,
    n_elite_ants: int,
    n_iterations: int,
    ratio_distance: float,
    ratio_pheromone: float,
    ratio_remainder: float,
    filenames_tests: list
) -> (list, list):
    
    result_distance = []
    result_route = []

    for filename_test in tqdm(filenames_tests):
        rng = np.random.default_rng(42)

        data = vrplib.read_instance(filename_test)
        
        pheromone = np.mean(data['edge_weight']) / (data['edge_weight'] + 1e-6)
        
        best_distance = np.inf
        best_route = None

        elite_route = [None] * n_elite_ants
        elite_distance = [None] * n_elite_ants
        max_elite_distance = np.inf
        
        for iteration in range(1, n_iterations + 1):
            all_routes = []
            all_distances = []
        
            for _ in range(n_ants):
                visited = set()
                remain_capacity = data['capacity']
                current_vertex = data['depot'][0]
                current_route = []
                current_distance = 0
        
                while len(visited) < data['dimension']:
                    visited.add(current_vertex)
                    able_visit = [data['depot'][0]] if current_vertex != data['depot'][0] else []
        
                    if current_vertex == data['depot'][0]:
                        remain_capacity = data['capacity']
        
                    able_visit.extend(
                        number for number, demand in enumerate(data['demand'])
                        if remain_capacity >= demand and number not in visited
                    )
                    
                    probabilities = calculate_probability(
                        ratio_distance,
                        data['edge_weight'][current_vertex, able_visit],
                        ratio_pheromone,
                        pheromone[current_vertex, able_visit]
                    )
        
                    next_vertex = able_visit[np.searchsorted(probabilities, rng.random())]
        
                    current_route.append((current_vertex, next_vertex))
                    current_distance += data['edge_weight'][current_vertex, next_vertex]
                    remain_capacity -= data['demand'][next_vertex]
                    current_vertex = next_vertex
        
                all_routes.append(current_route)
                all_distances.append(current_distance)
        
            pheromone *= ratio_remainder

            sorted_indices = sorted(range(len(all_distances)), key=lambda i: all_distances[i])

            sorted_distance = [all_distances[i] for i in sorted_indices]
            sorted_routes = [all_routes[i] for i in sorted_indices]

            for route, distance in zip(elite_route, elite_distance):
                if route and distance:
                    for start_vertex, end_vertex in route:
                        pheromone[start_vertex, end_vertex] += sorted_distance[0] / pow(distance + 1e-6, 2)
                        pheromone[end_vertex, start_vertex] = pheromone[start_vertex, end_vertex]

            if sorted_routes[0] not in elite_route:
                elite_route[iteration % n_elite_ants] = sorted_routes[0]
                elite_distance[iteration % n_elite_ants] = sorted_distance[0]
            
            for route, distance in zip(sorted_routes[:int(len(sorted_routes) / 2)], sorted_distance[:int(len(sorted_distance) / 2)]):
                for start_vertex, end_vertex in route:
                    pheromone[start_vertex, end_vertex] += sorted_distance[-1] / pow(distance + 1e-6, 2)
                    pheromone[end_vertex, start_vertex] = pheromone[start_vertex, end_vertex]
        
            if sorted_distance[0] < best_distance:
                best_distance = sorted_distance[0]
                best_route = sorted_routes[0]
        
        result_distance.append(best_distance)
        result_route.append(best_route)
    return result_distance, result_route

In [135]:
def write_solution(
    path: str,
    filenames: list,
    distance: list,
    route: list
) -> None:

    for filename, distance, route in zip(filenames, aco_distance, aco_route):
        result_filename = filename.split('/')[-1]
        if not os.path.exists(path):
            os.mkdir(path)
    
        result_route = []
        transport = []
        for pair in route:
            if pair[1] != 0:
                transport.append(pair[1])
            else:
                result_route.append(transport.copy())
                transport.clear()
    
        vrplib.write_solution(path=f'{path}{result_filename}', routes=result_route, data={'cost': distance})

In [342]:
def visualize_data(path_raw: str, path_route: str) -> None:
    raw_filenames = sorted([filename for filename in os.listdir(path_raw) if filename.endswith('.vrp')])
    optimal_filenames = sorted([filename for filename in os.listdir(path_raw) if filename.endswith('.sol')])
    route_filenames = sorted([filename for filename in os.listdir(path_route) if filename.endswith('.sol')])
    
    for raw_filename, route_filename, optimal_filename in zip(raw_filenames, route_filenames, optimal_filenames):
        raw = vrplib.read_instance(f'{path_raw}{raw_filename}')
        optimal = vrplib.read_solution(f'{path_raw}{optimal_filename}')
        routes = vrplib.read_solution(f'{path_route}{route_filename}')
        
        score = abs(optimal['cost'] - routes['cost']) / optimal['cost']
        
        routes = routes['routes']
        
        plt.scatter(raw['node_coord'][:, 0], raw['node_coord'][:, 1], c='r')
        plt.scatter(raw['node_coord'][0, 0], raw['node_coord'][0, 1], c='g')

        plt.title(f'{raw_filename.split('.')[-2]}: {round(score, 3)}')

        for route in routes:
            route.insert(0, 0)
            route.append(0)

            for index in range(len(route) - 1):
                start_point = raw['node_coord'][route[index]]
                end_point = raw['node_coord'][route[index + 1]]
                plt.plot([start_point[0], end_point[0]], [start_point[1], end_point[1]], c='b')

        plt.savefig(f'{path_route}{route_filename.split('.')[-2]}.jpg')
        plt.close()

In [177]:
tests = []
solutions = []
for dirname, _, filenames in os.walk('./set_m/'):
    for filename in filenames:
        if filename.endswith('.vrp'):
            tests.append(f'{dirname}{filename}')
        if filename.endswith('.sol'):
            solutions.append(f'{dirname}{filename}')
tests.sort()
solutions.sort()

In [179]:
optimal_distance = []
for filename in solutions:
    solution = vrplib.read_solution(filename)
    optimal_distance.append(solution['cost'])

In [181]:
%%time

aco_distance, aco_route = aco(80, 20, 1000, 1, 13, 0.97, tests)
write_solution('./results_m/', solutions, aco_distance, aco_route)
visualize_data('./set_m/', './results_m/')

print(mape(y_pred=aco_distance, y_true=optimal_distance))

100%|████████████████████████████████████████████| 5/5 [43:19<00:00, 519.84s/it]

0.1494241276636954
CPU times: user 42min 58s, sys: 16.4 s, total: 43min 14s
Wall time: 43min 19s



