In [1]:
import osmnx as ox
import random
import folium 
import jax.numpy as jnp
from abc import ABC, abstractmethod
ox.__version__

In [2]:
class SimulatedAnnealing(ABC):
    def __init__(self, initial_temp):
        self.initial_temp = initial_temp
        
    @abstractmethod
    def cost(self, X):
        raise NotImplementedError
        
    @abstractmethod
    def mutate(self, X):
        raise NotImplementedError
        
    def acceptance_probability(self, old_cost, new_cost, iteration):
        return jnp.exp(-(new_cost - old_cost)/self.schedule(iteration))
    
    def schedule(self, iteration, cooling_rate = 0.8):
        return self.initial_temp * self.cooling_rate ** iteration
    
    @abstractmethod
    def __call__(self):
        returnNotImplementedError

In [9]:
class TripOptimizer(SimulatedAnnealing):
    def __init__(self,  
                 loc_graph, 
                 destinations, 
                 initial_temp, 
                 cooling_rate = 0.8,
                 transporation_mode = 'drive'):
        self.initial_temp = initial_temp
        self.cooling_rate = cooling_rate

        self.loc_graph = loc_graph
        # self.loc_graph = ox.graph_from_place(location, network_type = transporation_mode)
        self.destinations = destinations

        X = [destination[0] for destination in destinations]
        Y = [destination[1] for destination in destinations]
        
        self.destination_nodes = ox.distance.nearest_nodes(self.loc_graph, Y, X)
        
    def cost(self, X):
        """
        The cost defined as the distance traveled in this particular path.

        Args:
            X: A list containing the destionation nodes to be visited, in order. 
        """
        prev_distances = dict()
        distance_traveled = 0
        for i in range(1, len(X)):
            if (X[i - 1], X[i]) in prev_distances.keys(): 
                distance_traveled += prev_distances[(X[i - 1], X[i])]
                continue
            prev_distances[(X[i - 1], X[i])] = 0
            path = next(ox.routing.k_shortest_paths(self.loc_graph, X[i - 1], X[i], 1))
            for j in range(1, len(path)):
                road = self.loc_graph.edges[(path[j - 1], path[j], 0)]
                if 'travel_time' in road.keys():
                    distance_traveled += road['travel_time']
                    prev_distances[(X[i - 1], X[i])] += road['travel_time']
                    
        return distance_traveled

    def mutate(self, X):
        """Mutates the problem solution."""
        new_solution = X.copy()
        lower_limit = random.randint(0, len(X) - 1)
        upper_limit = random.randint(0, len(X) - 1)
        new_solution[lower_limit:upper_limit] = new_solution[lower_limit:upper_limit][::-1]
        return new_solution
        
    def __call__(self, limit_iter=50):
        """Runs the method."""
        current_path = self.destination_nodes
        current_cost = self.cost(current_path)
    
        print(current_path)
        print(current_cost)
        for k in range(limit_iter):
            print("Iteration:", k)
            
            new_path = self.mutate(current_path)
            new_cost = self.cost(new_path)

            if new_cost < current_cost:
                current_path = new_path
                current_cost = new_cost
            else:
                prob = self.acceptance_probability(current_cost, new_cost, k)
                if random.uniform(0, 1) <= prob:
                    current_path = new_path
                    current_cost = new_cost
    
            print("Route:", current_path)
            print("Cost:", current_cost)

        return current_path

In [4]:
recife = ox.graph_from_place('Recife, Brazil', network_type='drive')

recife = ox.speed.add_edge_speeds(recife)
recife = ox.speed.add_edge_travel_times(recife)

In [6]:
X = [-8.086164, -8.063707, -8.067305, -8.123594, -8.108837, -8.071182]
Y = [-34.893904, -34.874522, -34.962828, -34.899516, -34.890279, -34.881360]

destinations = list(zip(X, Y))

In [12]:
recife_trip = TripOptimizer(recife, destinations, 1000)

In [13]:
best_path = recife_trip(limit_iter=100)

[1639770158, 661046206, 2563332550, 652791938, 324206112, 5729319652]
3112.1000000000013
Iteration: 0
[1639770158, 661046206, 2563332550, 652791938, 324206112, 5729319652]
3112.1000000000013
Iteration: 1
[1639770158, 661046206, 2563332550, 652791938, 324206112, 5729319652]
3112.1000000000013
Iteration: 2
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 3
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 4
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 5
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 6
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 7
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999999995
Iteration: 8
[1639770158, 661046206, 2563332550, 324206112, 652791938, 5729319652]
3449.9999999

In [14]:
print(destinations)
print(recife_trip.destination_nodes)
best_path

[(-8.086164, -34.893904), (-8.063707, -34.874522), (-8.067305, -34.962828), (-8.123594, -34.899516), (-8.108837, -34.890279), (-8.071182, -34.88136)]
[1639770158, 661046206, 2563332550, 652791938, 324206112, 5729319652]


[2563332550, 652791938, 324206112, 1639770158, 661046206, 5729319652]

In [18]:
# optimal cost
optimal_path = [2563332550, 652791938, 324206112, 1639770158, 5729319652, 661046206]
recife_trip.cost(optimal_path)

2123.7999999999997

In [16]:
def plot_trip(place, graph, trip, style=None):
    kept_nodes = set(trip)
    
    for i in range(1, len(trip)):
        path = next(ox.routing.k_shortest_paths(graph, trip[i - 1], trip[i], 1))
        kept_nodes.update(path)
    
    route_graph = graph.subgraph(kept_nodes)
    nodes, streets = ox.utils_graph.graph_to_gdfs(route_graph)
    destinations = nodes[nodes.index.isin(trip)]

    if style == None:
        style = {'color': '#1A19AC', 'weight':'3'}

    m = folium.Map(location=ox.geocode(place))
    folium.GeoJson(streets, style_function=lambda x: style).add_to(m)
    folium.GeoJson(destinations, style_function=lambda x: style).add_to(m)
    
    return m

In [17]:
plot_trip('Recife, Brazil', recife, best_path)

In [19]:
plot_trip('Recife, Brazil', recife, optimal_path)