# Actividad 3


|                Alumno                 | Matrícula |
| :-----------------------------------: | :-------: |
|    Juan Pablo Echeagaray González     | A00830646 |
| Verónica Victoria García De la Fuente | A00830383 |
|       Emily Rebeca Méndez Cruz        | A00830768 |
|        Erika Martínez Meneses         | A01028621 |

- Diseño de algoritmos matemáticos bio-inspirados
- Dra. Yadira Isabel Silva Soto
- 15 de octubre del 2022

## Dependencias

In [None]:
# Basic dependencies
import numpy as np
import pandas as pd
import scipy as sp
import matplotlib.pyplot as plt
import seaborn as sns

# Optimization libraries
from ortools.sat.python import cp_model
import pants

# Geocoding libraries
from geopy.geocoders import Bing
from geopy.extra.rate_limiter import RateLimiter
from geopy.distance import geodesic
from scipy.spatial.distance import cdist
import folium as fl

# Code typing and hinting
from typing import List, Tuple


## Lectura de `API` key de `Microsoft Bing Maps`

In [None]:
def get_key(file_path):
    with open('key.txt') as f:
        key = f.read().strip()

    return key

## Lectura de datos

In [None]:
df = pd.read_csv('data/addresses.csv')
df.drop(columns=['ID'], inplace=True)
df.head()

## Procesamiento de coordenadas

In [None]:
def get_coords(address_file_path: str):
    
    # Needs a function that handles errors
    test_data = pd.read_csv(address_file_path)
    locator = Bing(api_key=get_key('key.txt'), user_agent='Python TSP', timeout=10)

    # Create a RateLimiter object to limit requests to the API.
    geocode = RateLimiter(locator.geocode, min_delay_seconds=1)

    test_data['locations'] = test_data['Domicilio'].apply(geocode)
    print(f'Found {(1 - test_data.locations.isnull().sum() / len(test_data))*100:.4f} % of the addresses')

    test_data['point'] = test_data['locations'].apply(lambda loc: tuple(loc.point) if loc else None)
    # Drop rows with missing locations
    test_data = test_data[test_data.locations.notnull()]
    test_data[['latitude', 'longitude', 'altitude']] = pd.DataFrame(test_data['point'].tolist(), index=test_data.index)

    test_data.to_csv('data/address-with-coords.csv', index=False)
    print(f'Processed {len(test_data)} addresses')


## Cálculo de matriz de distancias

In [None]:
def calculate_distance_matrix(address_coords_file_path: str):
    df = pd.read_csv(address_coords_file_path)
    df = df[['latitude', 'longitude']]
    df = df.to_numpy()
    distance_matrix = cdist(df, df, lambda u, v: geodesic(u, v).km)
    np.save('data/distance_matrix.npy', distance_matrix)

In [None]:
get_coords('data/addresses.csv')

In [None]:
calculate_distance_matrix('data/address-with-coords.csv')

In [None]:
distance_matrix = np.load('data/distance_matrix.npy')

## Solución exacta con `solver`

In [None]:
def solver(distance_matrix: np.array, print_output: bool = False) -> Tuple[List[int], float]:

    DISTANCE_MATRIX = distance_matrix.tolist()
    num_nodes = len(DISTANCE_MATRIX)
    all_nodes = range(num_nodes)

    # Model creation
    model = cp_model.CpModel()
    
    # Decision variables
    obj_vars = []
    obj_coeffs = []

    # Circuit constraint
    arcs = []
    arc_literals = {}
    for i in all_nodes:
        for j in all_nodes:
            if i == j:
                continue
            lit = model.NewBoolVar(f'x_{i}_{j}')
            arcs.append((i, j, lit))
            arc_literals[(i, j)] = lit

            obj_vars.append(lit)
            obj_coeffs.append(DISTANCE_MATRIX[i][j])
    
    # Circuit Constraints
    model.AddCircuit(arcs)

    # Objective
    model.Minimize(sum([obj_vars[i] * obj_coeffs[i] for i in range(len(obj_vars))]))

    # Solver
    solver = cp_model.CpSolver()
    solver.parameters.log_search_progress = False
    # To benefit from the linearization of the circuit constraint. ?
    solver.parameters.linearization_level = 2

    solver.Solve(model)
    solution = [0]

    current_node = 0
    str_route = f'{current_node} -> '
    route_is_finished = False
    route_distance = 0
    while not route_is_finished:
        for i in all_nodes:
            if i == current_node:
                continue
            if solver.BooleanValue(arc_literals[current_node, i]):
                str_route += f' -> {i}'
                route_distance += DISTANCE_MATRIX[current_node][i]
                current_node = i
                solution.append(current_node)
                if current_node == 0:
                    route_is_finished = True
                break
    if print_output:
        print(f'Number of nodes: {num_nodes}')
        print(solver.ResponseStats())
        print('Route:', str_route)
        print('Travelled distance:', route_distance)

    return solution, route_distance


In [None]:
%%timeit -n 10 -r 10
solver(distance_matrix)

## Algoritmo genético

In [None]:
class genetic_algorithm:

    def __init__(self, population_size: int, num_generations: int, mutation_rate: float):
        self.population_size = population_size
        self.population = []
        self.num_generations = num_generations
        self.mutation_rate = mutation_rate
        self.solution = None


    def generate_population(self):
        raise NotImplementedError

    def fitness(self):
        raise NotImplementedError

    def crossover(self, parent1, parent2):
        raise NotImplementedError

    def mutation(self, individual):
        raise NotImplementedError

    def selection(self):
        evaluation = np.array(self.fitness())
        reproduction_prob = evaluation / evaluation.sum()
        chosen = np.random.choice(len(self.population), p=reproduction_prob)

        return self.population[chosen]

    def run(self):
        evaluation = self.fitness()

        # Initial Best solution, best individual
        best_idx = np.argmax(evaluation)
        best_individual = self.population[best_idx]
        best_solution = evaluation[best_idx]

        best_solutions = []
        best_solutions.append((best_individual, best_solution))

        for generation in range(self.num_generations):
            new_population = []
            
            for i in range(self.population_size):
                parent_x = self.selection()
                parent_y = self.selection()
                offspring = self.crossover(parent_x, parent_y)

                if np.random.random() <= self.mutation_rate:
                    offspring = self.mutation(offspring)

                new_population.append(offspring)

            self.population = np.array(new_population)
            evaluation = self.fitness()
            best_idx = np.argmax(evaluation)
            best_individual = self.population[best_idx]
            best_solution = evaluation[best_idx]

            best_solutions.append((best_individual, best_solution))
        
        # Find the individual amongst the history record of best solutions
        best_solutions = np.array(best_solutions, dtype=object)
        best_found = np.argmax(best_solutions[:,1])

        optimal = best_solutions[best_found]

        self.solution = optimal

    def calculate_score(self):
        raise NotImplementedError

In [None]:
class genetic_tsp(genetic_algorithm):

    def __init__(self, population_size: int, num_generations: int, mutation_rate: float, distance_matrix: np.array):
        super().__init__(population_size, num_generations, mutation_rate)
        self.distance_matrix = distance_matrix.tolist()
        self.num_nodes = len(self.distance_matrix)
        self.generate_population()

    def generate_population(self):
        population = []
        start = 0
        
        for i in range(self.population_size):
            individual = np.random.permutation(range(1, self.num_nodes))
            individual = np.hstack((start, individual))
            individual = np.hstack((individual, start))
            population.append(individual)

        self.population = population

    def fitness(self):
        fitness = []
        for individual in self.population:
            distance = 0
            for i in range(len(individual) - 1):
                distance += self.distance_matrix[individual[i]][individual[i+1]]
            fitness.append(1 / distance)
        return fitness

    def crossover(self, parent1, parent2):

        # Remove the start and end cities
        parent1 = np.delete(parent1, 0)
        parent1 = np.delete(parent1, -1)
        parent2 = np.delete(parent2, 0)
        parent2 = np.delete(parent2, -1)
        
        child = [None] * len(parent1)

        # Choose a random allele from parent_1
        geneA = int(np.random.random() * len(parent1))
        geneB = int(np.random.random() * len(parent1))

        start = min(geneA, geneB)
        end = max(geneA, geneB)

        child[start:end] = parent1[start:end]

        mask = [i for i in parent2 if i not in child]

        for i in range(len(parent1)):
            # print(i)
            if child[i] == None:
                child[i] = mask.pop(0)

        child = np.array(child)
        # Add the start and end cities
        child = np.hstack((0, child))
        child = np.hstack((child, 0))

        return child

    def mutation(self, chromosome):

        if np.random.random() <= self.mutation_rate:
            # Make sure the mutation does not make the chromosome invalid
            chromosome = np.delete(chromosome, 0)
            chromosome = np.delete(chromosome, -1)

            # Randomly swap two cities
            i = np.random.randint(0, len(chromosome))
            j = np.random.randint(0, len(chromosome))
            chromosome[i], chromosome[j] = chromosome[j], chromosome[i]

            # Add the start and end cities
            chromosome = np.hstack((0, chromosome))
            chromosome = np.hstack((chromosome, 0))
            
        return chromosome
        

    def optimize(self, print_output: bool = False):
        self.run()
        route = self.solution[0]
        route_taken = f''
        real_cost = 0
        for idx, val in enumerate(route):
            if idx + 1 < len(route):
                real_cost += self.distance_matrix[route[idx]][route[idx + 1]]
                route_taken += f'{val} -> '
            else:
                route_taken += f'{val}'
        
        if print_output:
            print('Solution Found')
            print(f'Total Cost : {real_cost}')
            print(f'Route Taken: {route_taken}')

        return route, real_cost

In [None]:
%%timeit -n 1 -r 10
GA = genetic_tsp(population_size=100, num_generations=100, mutation_rate=0.15, distance_matrix=distance_matrix)
GA.optimize(print_output=False)


## Colonia de Hormigas

In [None]:
def ACO(print_output: bool = False):
    coords = pd.read_csv('data/address-with-coords.csv')
    coords = coords[['latitude', 'longitude']]
    cities = coords.to_numpy()
    cities = cities.tolist()

    def distance_function(pointA, pointB):
        return geodesic(pointA, pointB).km

    world = pants.World(cities, distance_function)

    aco_solver = pants.Solver()
    solution = aco_solver.solve(world)

    # Route taken
    route_taken = f'{cities.index(solution.tour[0])} -> '
    route = [cities.index(solution.tour[0])]
    # Test if the return distance is correct
    real_cost = 0
    for city in range(1, len(solution.tour)):
        real_cost += distance_function(solution.tour[city - 1], solution.tour[city])
        route_taken += f'{cities.index(solution.tour[city])} -> '
        route.append(cities.index(solution.tour[city]))

    
    route_taken += f'{cities.index(solution.tour[0])}'
    real_cost += distance_function(solution.tour[-1], solution.tour[0])
    route.append(cities.index(solution.tour[0]))

    assert real_cost == solution.distance

    if print_output:
        print('Solution Found')
        print(f'Total Cost : {solution.distance}')
        print(f'Route Taken: {route_taken}')


    return route, solution.distance
    

## Comparación de resultados

In [None]:
optimal_solution, optimal_cost = solver(distance_matrix)
GA = genetic_tsp(population_size=100, num_generations=100, mutation_rate=0.15, distance_matrix=distance_matrix)
ga_solution, ga_cost = GA.optimize(print_output=False)
aco_route, aco_cost = ACO()


In [None]:
def plot_route(solution: list, name: str, cost: float) -> None:
    df = pd.read_csv('data/address-with-coords.csv')
    coordinates = df[['latitude', 'longitude']].to_numpy()

    fl_coords = [[coord[0], coord[1]] for coord in coordinates]
    names = df['Domicilio'].to_numpy().tolist()

    map = fl.Map(location=[fl_coords[0][0], fl_coords[0][1]], zoom_start=12, tiles='OpenStreetMap')

    for coordinate, location in zip(fl_coords, names):
        fl.Marker(location=coordinate, popup=location).add_to(map)

    route = [fl_coords[i] for i in solution]
    fl.PolyLine(route, color='red', weight=2.5, opacity=1, tooltip=f'{name}: {cost:.4f}').add_to(map)

    return map


### Solución exacta

In [None]:
plot_route(optimal_solution, 'Optimal', optimal_cost)

### Solución de algoritmo genético

In [None]:
plot_route(ga_solution, 'Genetic Algorithm', ga_cost)

### Solución de colonia de hormigas

In [None]:
plot_route(aco_route, 'Ant Colony Optimization', aco_cost)

In [None]:
%%timeit -n 10 -r 10
ACO()