In [2]:
!pip install kora -q
from kora import drive
drive.link_nbs()

In [3]:
!pip -q install vrplib

In [4]:
import numpy as np
import random
import vrplib
from tqdm.notebook import tqdm

#### 0) Loading VRP data

In [5]:
DATA_PATH = "/content/drive/MyDrive/AI_Notebooks/Python_HSE/MS_mentor_workshop/VRP_data/"

set_A_names = [name for name in vrplib.list_names() if name.startswith('A-')]
set_M_names = [name for name in vrplib.list_names() if name.startswith('M-')]
set_P_names = [name for name in vrplib.list_names() if name.startswith('P-')] + ["P-n55-k8"]

print("Number of task names in sets:")
print("Set A:", len(set_A_names))    # -> 27 - correct
print("Set M:", len(set_M_names))    # -> 5 - correct
print("Set P:", len(set_P_names))    # -> 24 - correct

Number of task names in sets:
Set A: 27
Set M: 5
Set P: 24


In [6]:
# storing num of vehicles for each task
k_vehicles_A = [int(name.split('k')[1]) for name in set_A_names]
k_vehicles_M = [int(name.split('k')[1]) for name in set_M_names]
k_vehicles_P = [int(name.split('k')[1]) for name in set_P_names]

In [7]:
inst_A, sol_A = [], []

for name in tqdm(set_A_names):
    inst_data = vrplib.read_instance(DATA_PATH + 'A/' + name + ".vrp")
    sol_data = vrplib.read_solution(DATA_PATH + 'A/' + name + ".sol")
    inst_A.append(inst_data)
    sol_A.append(sol_data)

  0%|          | 0/27 [00:00<?, ?it/s]

In [8]:
inst_M, sol_M = [], []

for name in tqdm(set_M_names):
    inst_data = vrplib.read_instance(DATA_PATH + 'M/' + name + ".vrp")
    sol_data = vrplib.read_solution(DATA_PATH + 'M/' + name + ".sol")
    inst_M.append(inst_data)
    sol_M.append(sol_data)

  0%|          | 0/5 [00:00<?, ?it/s]

In [9]:
inst_P, sol_P = [], []

for name in tqdm(set_P_names):
    inst_data = vrplib.read_instance(DATA_PATH + 'P/' + name + ".vrp")
    sol_data = vrplib.read_solution(DATA_PATH + 'P/' + name + ".sol")
    inst_P.append(inst_data)
    sol_P.append(sol_data)

  0%|          | 0/24 [00:00<?, ?it/s]

#### 2) Global stage -> Ant Colony Optimization

In [10]:
import numpy as np
import random

class AntColonyCVRP:
    def __init__(self, distance_matrix, demands, vehicle_capacity, num_ants, num_iterations, alpha=1, beta=2, evaporation_rate=0.5):
        self.distance_matrix = distance_matrix
        self.demands = demands
        self.vehicle_capacity = vehicle_capacity
        self.num_ants = num_ants
        self.num_iterations = num_iterations
        self.alpha = alpha  # pheromone importance
        self.beta = beta    # heuristic importance
        self.evaporation_rate = evaporation_rate
        self.num_locations = len(distance_matrix)
        self.pheromones = np.ones((self.num_locations, self.num_locations))
        self.depot = 0

    def run(self):
        best_solution = None
        best_cost = float('inf')

        for _ in range(self.num_iterations):
            all_solutions = []
            for _ in range(self.num_ants):
                solution, cost = self.construct_solution()
                all_solutions.append((solution, cost))
                if cost < best_cost:
                    best_cost = cost
                    best_solution = solution

            self.update_pheromones(all_solutions)

        return best_solution, best_cost

    def construct_solution(self):
        routes = []
        visited = [False] * self.num_locations
        visited[self.depot] = True  # depot is always visited

        while not all(visited):
            route, route_cost = self.construct_route(visited)
            routes.append(route)

        total_cost = sum(self.calculate_route_cost(route) for route in routes)
        return routes, total_cost

    def construct_route(self, visited):
        route = [self.depot]  # start at the depot
        current_location = self.depot
        current_capacity = self.vehicle_capacity

        while True:
            next_location = self.select_next_location(current_location, visited, current_capacity)
            if next_location is None:  # no valid next location (return to depot)
                break

            route.append(next_location)
            visited[next_location] = True
            current_capacity -= self.demands[next_location]
            current_location = next_location

        route.append(self.depot)  # return to depot to complete the route

        return route, self.calculate_route_cost(route)

    def select_next_location(self, current_location, visited, remaining_capacity):
        probabilities = []

        for j in range(self.num_locations):
            if not visited[j] and j != self.depot and self.demands[j] <= remaining_capacity:
                pheromone_level = self.pheromones[current_location][j]
                heuristic_value = 1 / (self.distance_matrix[current_location][j] + 1e-10)  # avoid division by zero
                probabilities.append((j, pheromone_level ** self.alpha * heuristic_value ** self.beta))

        if not probabilities:
            return None

        total_probability = sum(p[1] for p in probabilities)
        probabilities = [(p[0], p[1] / total_probability) for p in probabilities]

        rand_val = random.random()
        cumulative_probability = 0.0

        for location, prob in probabilities:
            cumulative_probability += prob
            if rand_val <= cumulative_probability:
                return location

        return None

    def calculate_route_cost(self, route):
        cost = 0.0
        for i in range(len(route) - 1):
            cost += self.distance_matrix[route[i]][route[i + 1]]
        return cost

    def update_pheromones(self, all_solutions):
        self.pheromones *= (1 - self.evaporation_rate)

        for solution, cost in all_solutions:
            pheromone_increase = 1 / cost  # inverse of the cost as pheromone increase factor
            for route in solution:
                for i in range(len(route) - 1):
                    start_loc = route[i]
                    end_loc = route[i + 1]
                    self.pheromones[start_loc][end_loc] += pheromone_increase

In [15]:
# checling the algorithm
distance_matrix = inst_A[0]['edge_weight']
demands = inst_A[0]['demand']
vehicle_capacity = inst_A[0]['capacity']

aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                demands=demands,
                                vehicle_capacity=vehicle_capacity,
                                num_ants=5,
                                num_iterations=500)

best_routes, best_cost = aco_solver_cvrp.run()
print(f"Ants' total cost: {best_cost}")
print("Solution Cost:", sol_A[0]['cost'])

Ants' total cost: 981.5636456571016
Solution Cost: 784


#### 3) Local stage -> stack of methods:
-  2-opt + 3-opt for each route separately
-  (swap of nodes) + (simulated annealing) for all the routes

In [11]:
import random
import numpy as np
import math

def calculate_route_cost(route, distance_matrix):
    cost = 0
    for i in range(len(route) - 1):
        cost += distance_matrix[route[i]][route[i + 1]]

    return cost

def two_opt(route, distance_matrix):
    best_route = route
    best_cost = calculate_route_cost(route, distance_matrix)

    for i in range(1, len(route) - 1):
        for j in range(i + 1, len(route) - 1):
            # reversing the interval between (i) and (j)
            new_route = best_route[:i] + best_route[i:j + 1][::-1] + best_route[j + 1:]
            new_cost = calculate_route_cost(new_route, distance_matrix)

            if new_cost < best_cost:
                best_route = new_route
                best_cost = new_cost

    return best_route, best_cost

def three_opt(route, distance_matrix):
    best_route = route
    best_cost = calculate_route_cost(route, distance_matrix)

    for i in range(1, len(route) - 2):
        for j in range(i + 1, len(route) - 1):
            for k in range(j + 1, len(route)):
                # all possible combinations
                new_routes = [
                    best_route[:i] + best_route[i:j + 1][::-1] + best_route[j + 1:k + 1][::-1] + best_route[k + 1:],
                    best_route[:i] + best_route[i:j + 1] + best_route[j + 1:k + 1][::-1] + best_route[k + 1:],
                    best_route[:i] + best_route[i:j + 1] + best_route[j + 1:k + 1] + best_route[k + 1:],
                ]

                for new_route in new_routes:
                    new_cost = calculate_route_cost(new_route, distance_matrix)
                    if new_cost < best_cost:
                        best_route = new_route
                        best_cost = new_cost

    return best_route, best_cost

def swap_customers_opt(routes, distance_matrix):
    improved = True

    while improved:
        improved = False
        for i in range(len(routes)):
            for j in range(1, len(routes[i]) - 1):
                for k in range(j + 1, len(routes[i]) - 1):
                    # swap customers j and k in route i
                    new_route = routes[i][:]  # copy current route
                    new_route[j], new_route[k] = new_route[k], new_route[j]

                    # Calculate new cost
                    new_cost = calculate_route_cost(new_route, distance_matrix)
                    current_cost = calculate_route_cost(routes[i], distance_matrix)

                    # If new cost is better, update the route
                    if new_cost < current_cost:
                        routes[i] = new_route
                        improved = True

    new_total_cost = sum(calculate_route_cost(route, distance_matrix) for route in routes)

    return routes, new_total_cost


def swap_customers(route, idx1, idx2):
    new_route = route[:]
    new_route[idx1], new_route[idx2] = new_route[idx2], new_route[idx1]

    return new_route

def simulated_annealing(routes, distance_matrix, initial_temp=1000, cooling_rate=0.95, stopping_temp=1):
    current_routes = routes
    current_cost = sum(calculate_route_cost(route, distance_matrix) for route in current_routes)

    best_routes = current_routes
    best_cost = current_cost

    temperature = initial_temp

    while temperature > stopping_temp:
        # Randomly select two routes and two customer indices to swap
        route_idx = random.randint(0, len(current_routes) - 1)
        route = current_routes[route_idx]

        if len(route) > 3:  # Ensure there are customers to swap (excluding depot)
            idx1 = random.randint(1, len(route) - 2)
            idx2 = random.randint(1, len(route) - 2)

            # Swap customers and calculate new cost
            new_route = swap_customers(route, idx1, idx2)
            new_routes = current_routes[:]
            new_routes[route_idx] = new_route

            new_cost = sum(calculate_route_cost(r, distance_matrix) for r in new_routes)

            # Determine if we should accept the new solution
            if (new_cost < current_cost or
                random.uniform(0, 1) < math.exp((current_cost - new_cost) / temperature)):
                current_routes = new_routes
                current_cost = new_cost

                # Update best solution found so far
                if current_cost < best_cost:
                    best_routes = current_routes
                    best_cost = current_cost

        # Cool down the temperature
        temperature *= cooling_rate

    return best_routes, best_cost

def optimize_routes(routes, distance_matrix):
    optimized_routes = []
    total_cost = 0

    # apply 2-opt -> 3-opt for each route
    for route in routes:
        optimized_route, _ = two_opt(route, distance_matrix)
        optimized_route, _ = three_opt(optimized_route, distance_matrix)

        optimized_routes.append(optimized_route)
        total_cost += calculate_route_cost(optimized_route, distance_matrix)

    # apply swap optimization for all the routes
    optimized_routes, total_cost = swap_customers_opt(optimized_routes, distance_matrix)
    # apply simulated annealing optimization for all the routes
    optimized_routes, total_cost = simulated_annealing(optimized_routes, distance_matrix)

    return optimized_routes, total_cost

In [50]:
# checking all the pipeline

distance_matrix = inst_A[-1]['edge_weight']
demands = inst_A[-1]['demand']
vehicle_capacity = inst_A[-1]['capacity']

aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                demands=demands,
                                vehicle_capacity=vehicle_capacity,
                                num_ants=10,
                                num_iterations=500)

ant_routes, ant_cost = aco_solver_cvrp.run()
optimized_routes, opt_cost = optimize_routes(ant_routes, distance_matrix)

print("Ants Cost:", ant_cost)
print("Optimized Cost:", opt_cost)
print("Solution Cost:", sol_A[-1]['cost'])

Ants Cost: 2199.5839944162763
Optimized Cost: 1914.0615928678762
Solution Cost: 1763


In [37]:
# check that all nodes are visited
sum([len(route) - 2 for route in optimized_routes])

31

In [51]:
# checking all the pipeline #2

distance_matrix = inst_M[-1]['edge_weight']
demands = inst_M[-1]['demand']                # demand at each location (depot has no demand)
vehicle_capacity = inst_M[-1]['capacity']     # capacity of each vehicle

aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                demands=demands,
                                vehicle_capacity=vehicle_capacity,
                                num_ants=17,
                                num_iterations=500)

ant_routes, ant_cost = aco_solver_cvrp.run()
optimized_routes, opt_cost = optimize_routes(ant_routes, distance_matrix)

print("Ants Cost:", ant_cost)
print("Optimized Cost:", opt_cost)
print("Solution Cost:", sol_M[-1]['cost'])

Ants Cost: 1710.0321387566412
Optimized Cost: 1582.2021588456992
Solution Cost: 1275


In [52]:
# check that all the nodes are covered
sum([len(route) - 2 for route in optimized_routes])

199

#### 4) Total inference

In [12]:
ant_costs_A = []
res_costs_A = []
res_percentage_errors_A = []


for i in tqdm(range(len(set_A_names))):
    distance_matrix = inst_A[i]['edge_weight']
    demands = inst_A[i]['demand']                # demand at each location (depot has no demand)
    vehicle_capacity = inst_A[i]['capacity']     # capacity of each vehicle

    aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                    demands=demands,
                                    vehicle_capacity=vehicle_capacity,
                                    num_ants=k_vehicles_A[i],
                                    num_iterations=500)

    ant_routes, ant_cost = aco_solver_cvrp.run()
    optimized_routes, opt_cost = optimize_routes(ant_routes, distance_matrix)


    ant_costs_A.append(ant_cost)
    res_costs_A.append(opt_cost)
    percent_error = ((opt_cost - sol_A[i]['cost']) / sol_A[i]['cost']) * 100
    res_percentage_errors_A.append(percent_error)

  0%|          | 0/27 [00:00<?, ?it/s]

In [16]:
mean_error_A = np.mean(res_percentage_errors_A)
mean_error_A

11.547833940619629

In [17]:
ant_costs_M = []
res_costs_M = []
res_percentage_errors_M = []


for i in tqdm(range(len(set_M_names))):
    distance_matrix = inst_M[i]['edge_weight']
    demands = inst_M[i]['demand']                # demand at each location (depot has no demand)
    vehicle_capacity = inst_M[i]['capacity']     # capacity of each vehicle

    aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                    demands=demands,
                                    vehicle_capacity=vehicle_capacity,
                                    num_ants=k_vehicles_M[i],
                                    num_iterations=500)

    ant_routes, ant_cost = aco_solver_cvrp.run()
    optimized_routes, opt_cost = optimize_routes(ant_routes, distance_matrix)


    ant_costs_M.append(ant_cost)
    res_costs_M.append(opt_cost)
    percent_error = ((opt_cost - sol_M[i]['cost']) / sol_M[i]['cost']) * 100
    res_percentage_errors_M.append(percent_error)

  0%|          | 0/5 [00:00<?, ?it/s]

In [20]:
mean_error_M = np.mean(res_percentage_errors_M)
print(res_percentage_errors_M)
print(mean_error_M)

[19.367728877431748, 14.24471406236822, 29.66310566484579, 27.911213176523052, 24.22387951767379]
23.08212825976852


In [21]:
ant_costs_P = []
res_costs_P = []
res_percentage_errors_P = []


for i in tqdm(range(len(set_P_names))):
    distance_matrix = inst_P[i]['edge_weight']
    demands = inst_P[i]['demand']                # demand at each location (depot has no demand)
    vehicle_capacity = inst_P[i]['capacity']     # capacity of each vehicle

    aco_solver_cvrp = AntColonyCVRP(distance_matrix=distance_matrix,
                                    demands=demands,
                                    vehicle_capacity=vehicle_capacity,
                                    num_ants=k_vehicles_P[i],
                                    num_iterations=500)

    ant_routes, ant_cost = aco_solver_cvrp.run()
    optimized_routes, opt_cost = optimize_routes(ant_routes, distance_matrix)


    ant_costs_P.append(ant_cost)
    res_costs_P.append(opt_cost)
    percent_error = ((opt_cost - sol_P[i]['cost']) / sol_P[i]['cost']) * 100
    res_percentage_errors_P.append(percent_error)

  0%|          | 0/24 [00:00<?, ?it/s]

In [22]:
mean_error_P = np.mean(res_percentage_errors_P)
mean_error_P

12.087710458746423

In [26]:
# global percentage error
mean_error_all = np.mean(res_percentage_errors_A +
                         res_percentage_errors_M +
                         res_percentage_errors_P)
mean_error_all

12.809057298312265

#### Вывод:
- Среднее отклонение от оптимального решения по всем задачам составило 12.8 %
- Видим, что довольно большое отклонение от оптимального решения на задачах из Set-A  --> есть простор для доработок