# 05. Solucionador del TSP sobre el grafo incompleto del sistema metro de la CDMX

Carga de librerías

In [1]:
import pickle
import json
import networkx as nx
import pandas as pd

import plotly.graph_objects as go
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import cm

from random import shuffle, sample
from math import exp

Lectura de archivo pickle con infroamción del grafo incompleto de las estaciones de metro de CDMX

In [2]:
metro_graph = pickle.load(open("../../output_metro/metro_graph.pickle", "rb"))

Lectura de archivo json para extraer las localizaciones de los nodos (estaciones) del grafo


In [3]:
json_file = "../../output_metro/travel_times_metro.json"
with open(json_file) as input_json:
    dict_times_metro = json.load(input_json)

location_stations = dict()
for route_id in dict_times_metro.keys():
    stations_data = dict_times_metro[route_id]
    for station_data in stations_data:
        location_stations[station_data[0]] = tuple(station_data[2:])

Se realiza una implementación de la metaheurística __Recocido Simulado__ para hallar soluciones sobre el grafo completo en base a su matriz de distancias, utilizando el algoritmo de Dijkstra para hallar la ruta más corta entre nodos no vecinos.

In [4]:
class TSP_SimulatedAnnealing:
    def __init__(self, graph:nx.Graph, nodes_to_visit: list[str], init_temp: float, min_temp: float, cool_rate: float, max_iters: int):
        """
        Initializes the simulated annealing algorithm for solving the Traveling Salesman Problem (TSP).

        Parameters:
        graph (nx.Graph): A Graph containing nodes, edges and weights for each edge
        distance_matrix (pd.DataFrame): A DataFrame containing the distances between nodes.
        nodes_to_visit (list[str]): A list of nodes to be visited.
        init_temp (float): Initial temperature for the annealing process.
        min_temp (float): Minimum temperature for the annealing process.
        cool_rate (float): Cooling rate for the temperature.
        max_iters (int): Maximum number of iterations to perform.
        """
        self.__graph = graph
        self.__dist_matrix = self.__get_distances_matrix()
        self.__nodes_to_visit = nodes_to_visit
        self.__temperature = init_temp
        self.__min_temperature = min_temp
        self.__cool_rate = cool_rate
        self.__max_iters = max_iters

        self.__size_nodes = len(nodes_to_visit)


    def __get_distances_matrix(self):
        """
        Computes the distances matrix for all pairs of nodes in the graph.

        This private method generates a matrix where each element represents the shortest path 
        distance between a pair of nodes in the graph. The distances are computed using Dijkstra's 
        algorithm, assuming the graph is weighted.

        Returns:
            pd.DataFrame: A DataFrame where the rows and columns correspond to the graph's nodes, 
                          and each element [i, j] contains the shortest path distance from node i to node j.

        Notes:
            - The method utilizes NetworkX's shortest_path_length function with a weight parameter 
              to account for edge weights.
            - The resulting DataFrame is symmetric if the graph is undirected, with zeros on the diagonal.
        """
        matrix_distance = pd.DataFrame(index=self.__graph.nodes(), columns=self.__graph.nodes())
        for origin_node in self.__graph.nodes():
            length = nx.single_source_dijkstra_path_length(self.__graph, origin_node, weight='weight')
            for target_node, distance in length.items():
                matrix_distance.at[origin_node, target_node] = distance
        return matrix_distance.astype(float)

    def __generate_initial_path(self):
        """
        Generates the initial path by shuffling the list of nodes to visit.

        Returns:
        list: A shuffled list representing the initial path.
        """
        path = self.__nodes_to_visit[:]
        shuffle(path)
        return path
    
    def __generate_new_path(self, path):
        """
        Generates a new path by swapping two randomly selected nodes in the current path.

        Parameters:
        path (list): The current path.

        Returns:
        list: A new path with two nodes swapped.
        """
        new_path = path[:]
        i, j = sorted(sample(range(self.__size_nodes), 2))
        new_path[i:j] = reversed(new_path[i:j])
        return new_path
    
    def __compute_path_cost(self, path):
        """
        Computes the total cost of the given path based on the distance matrix.

        Parameters:
        path (list): The path for which the cost is to be computed.

        Returns:
        float: The total cost of the path.
        """
        total_cost = 0.0
        for k in range(len(path) - 1):
            total_cost += self.__dist_matrix.loc[path[k], path[k+1]]
        total_cost += self.__dist_matrix.loc[path[-1], path[0]]
        return total_cost

    def __acceptance_condition(self, new_cost, old_cost, temp):
        """
        Determines if the new path should be accepted based on the cost difference and the current temperature.

        Parameters:
        new_cost (float): The cost of the new path.
        old_cost (float): The cost of the current path.
        temp (float): The current temperature.

        Returns:
        float: The acceptance probability.
        """
        if new_cost < old_cost:
            return 1.0
        return exp((old_cost - new_cost) / temp)

    def __get_full_tsp_path(self):
        """
        Converts the best path found into a full path including all intermediate nodes.

        This method uses the shortest path between each pair of nodes in the best path to
        generate a complete route, ensuring all nodes are visited in sequence.
        """
        complete_best_route = []

        for i in range(self.__size_nodes - 1):
            complete_best_route += nx.shortest_path(self.__graph, self.best_path[i], self.best_path[i+1], weight="weight")
        complete_best_route += nx.shortest_path(self.__graph, self.best_path[-1], self.best_path[0], weight="weight")

        self.best_path = complete_best_route
        self.best_cost = self.__compute_path_cost(self.best_path) 
    
    def find_solution(self):
        """
        Executes the simulated annealing algorithm to find the best solution for the TSP.

        Returns:
        tuple: A tuple containing the best path and the best cost.
        """
        self.__current_path = self.__generate_initial_path()
        actual_cost = self.__compute_path_cost(self.__current_path)

        self.best_path = self.__current_path[:]
        self.best_cost = actual_cost

        for iteration in range(self.__max_iters):
            if self.__temperature >= self.__min_temperature:
                new_path = self.__generate_new_path(self.__current_path)
                new_cost = self.__compute_path_cost(new_path)

                if self.__acceptance_condition(new_cost, actual_cost, self.__temperature):
                    self.__current_path = new_path[:]
                    actual_cost = new_cost

                    if new_cost < self.best_cost:
                        self.best_path = self.__current_path[:]
                        self.best_cost = actual_cost

                self.__temperature *= self.__cool_rate
            else:
                break

        self.__get_full_tsp_path()

        print(f"Finished simulated annealing with...\n{iteration+1} iterations and temperature of {self.__temperature}\nBest Solution: {self.best_path}\nBest Cost: {self.best_cost}")

        return self.best_path, self.best_cost


Planteamos el problema de visitar las estaciones que tengan grado igual a 1; es decir, que solo tengan una estación vecina y esta misma sea la única por la cúal se puede acceder al respectivo nodo. Por propositos de mejor visualización, se retira la estación con etiqueta BUENAVISTA puesto que está se encuentra en el centro del sistema de metro y desfavorecería la tarea de hallar las diferencias entre las soluciones generadas

In [5]:
node_degrees = dict(metro_graph.degree())
nodes_to_visit = [node for node in metro_graph.nodes() if node_degrees[node] == 1 and node != "BUENAVISTA"]

found_paths = []
found_costs = []
for k in range(5):
    sa_tsp = TSP_SimulatedAnnealing(metro_graph, nodes_to_visit, 1000.0, 1e-12, 0.999, 100_000)
    best_solution, best_cost = sa_tsp.find_solution()
    found_paths.append(best_solution)
    found_costs.append(best_cost)

Finished simulated annealing with...
34523 iterations and temperature of 9.995040019876023e-13
Best Solution: ['CIUDADAZTECA', 'PLAZAARAGON', 'OLIMPICA', 'ECATEPEC', 'MUZQUIZ', 'RIOREMEDIOS', 'IMPULSORA', 'NEZAHUALCO', 'VILLAARAGON', 'BOSQUEARAGON', 'DPVOOCEANIA', 'OCEANIA', 'TNALAEREA', 'HANGARES', 'PANTITLAN', 'AGRICOLA', 'CANALSANJUAN', 'TEPALCATES', 'GUELATAO', 'PENONVIEJO', 'ACATITLA', 'STAMARTA', 'LOSREYES', 'LAPAZ', 'LAPAZ', 'LOSREYES', 'STAMARTA', 'ACATITLA', 'PENONVIEJO', 'GUELATAO', 'TEPALCATES', 'CANALSANJUAN', 'AGRICOLA', 'PANTITLAN', 'PUEBLA', 'CIUDADDVA', 'VELODROMO', 'MIXIUHCA', 'JAMAICA', 'SNTAANITA', 'COYUYA', 'IZTACALCO', 'APATLACO', 'ACULCO', 'ESCUADRON', 'ATLALILCO', 'CULHUACAN', 'SANANDRESTO', 'LOMASESTRELLA', 'CALLE11', 'PERIFERICOOTE', 'TEZONCO', 'OLIVOS', 'NOPALERA', 'ZAPOTITLAN', 'TLALTENCO', 'TLAHUAC', 'TLAHUAC', 'TLALTENCO', 'ZAPOTITLAN', 'NOPALERA', 'OLIVOS', 'TEZONCO', 'PERIFERICOOTE', 'CALLE11', 'LOMASESTRELLA', 'SANANDRESTO', 'CULHUACAN', 'ATLALILCO', 'IZ

Generamos y visualizamos una tabla de las 5 rutas generadas acompañadas de su costo

In [6]:
FoundRoutesCost_df = pd.DataFrame({"Costo": found_costs})
FoundRoutesCost_df

MinCostIndex = FoundRoutesCost_df.idxmin()["Costo"]
MinCost = FoundRoutesCost_df.loc[MinCostIndex]["Costo"]

print(f"The best route given by the solver is the #{MinCostIndex} with cost {MinCost}")

The best route given by the solver is the #4 with cost 5.69305555555555


Generamos una clase para hallar mapeos de color para denotar el orden de las estaciones visitadas

In [7]:
class ColorizerByMap:
  def __init__(self, cmap_name, start_val, stop_val):
    self.cmap_name = cmap_name
    self.cmap = plt.get_cmap(cmap_name)
    self.norm = mpl.colors.Normalize(vmin=start_val, vmax=stop_val)
    self.scalarMap = cm.ScalarMappable(norm=self.norm, cmap=self.cmap)

  def get_rgb(self, val):
    return self.scalarMap.to_rgba(val, bytes=False, norm=True)

Obtenemos el mapeo de color de los nodos en la solución obtenida

In [8]:
colors_found_solutions = []    
for path in found_paths:   
    size_tsp_path_SA = len(path)
    colorize_tsp_path_SA = ColorizerByMap("coolwarm", 0, size_tsp_path_SA)
    colors_tsp_path = {node:'rgba' + str(colorize_tsp_path_SA.get_rgb(i)) for i, node in enumerate(path)}

    colors_found_solutions.append(colors_tsp_path)

Visualizamos el grafo original del sistema de metro de la Ciudad de México con la ruta solución

In [9]:
edge_x = []
edge_y = []
for edge in metro_graph.edges():
    x0, y0 = location_stations[edge[0]]
    x1, y1 = location_stations[edge[1]]
    edge_x.append(x0)
    edge_x.append(x1)
    edge_x.append(None)
    edge_y.append(y0)
    edge_y.append(y1)
    edge_y.append(None)

node_x = []
node_y = []
node_text = []
node_degrees = dict(metro_graph.degree())
for node in metro_graph.nodes():
    x, y = location_stations[node]
    node_x.append(x)
    node_y.append(y)
    node_text.append(f"{node}\n# de estaciones: {node_degrees[node]}")

In [10]:
NodesNames = list(metro_graph.nodes())
ResultsOrder = [(k, cost_k) for k, cost_k in enumerate(found_costs)]
ResultsOrder.sort(key = lambda x: x[1])
IndexOrder, _ = zip(*ResultsOrder)

for k in IndexOrder:
    colors_nodes = len(NodesNames)*['rgba(0,0,0,1)']
    for node in found_paths[k]:
        colors_nodes[NodesNames.index(node)] = colors_found_solutions[k][node]

    edge_trace = go.Scatter(
        x=edge_x, y=edge_y,
        line=dict(width=2, color='gray'),
        hoverinfo='none',
        mode='lines')

    node_trace = go.Scatter(
        x=node_x, y=node_y,
        mode='markers',
        hoverinfo='text',
        text=node_text,
        textposition="top center",
        marker = dict(
            color = colors_nodes,
            size = 10,
            line = dict(
                color = "black",
                width = 1
            )        
        ),
        line_width=2)

    fig = go.Figure(data=[edge_trace, node_trace],
                    layout=go.Layout(
                        title=f'Ruta #{k+1} obtenida por Recocido Simulado',
                        titlefont_size=16,
                        showlegend=False,
                        hovermode='closest',
                        margin=dict(b=20, l=5, r=5, t=40),
                        annotations=[dict(
                            text="",
                            showarrow=False,
                            xref="paper", yref="paper")],
                        xaxis=dict(showgrid=False, zeroline=False),
                        yaxis=dict(showgrid=False, zeroline=False)))

    fig.update_layout(
        autosize=False,
        width=800,
        height=800,
    )

    fig.show()