In [4]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [5]:
import osmnx as ox
import networkx as nx
import folium
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
import pandas as pd
from typing import List, Tuple, Dict


In [6]:
class DeliveryRouter:
    def __init__(self, city: str = "Berlin, Germany"):
        """Initialize the router with a specific city's road network."""
        # Initialize Nominatim geocoder
        self.geocoder = Nominatim(user_agent="delivery_router_rinteln", timeout=6)
        
        # Download and create the road network for the specified city
        self.G = ox.graph_from_place(city, network_type="drive", simplify=False)
        
        # Convert to projected graph for accurate distance calculations; UTM zone 32N
        self.G_proj = ox.project_graph(self.G, to_crs="EPSG:25832")
        
        # Store restaurants data
        self.restaurants: Dict[str, Dict] = {}
        
    def add_restaurant(self, name: str, address: str) -> bool:
        """Add a restaurant to the system with its location."""
        try:
            location = self.geocoder.geocode(f"{name}, {address}, Germany")
            if location:
                self.restaurants[name] = {
                    "address": address,
                    "lat": location.latitude,
                    "lon": location.longitude,
                    "node": ox.nearest_nodes(self.G, location.longitude, location.latitude)
                }
                return True
            return False
        except Exception as e:
            print(f"Error adding restaurant: {e}")
            return False
    
    def calculate_route(self, start_coords: Tuple[float, float], 
                       end_coords: Tuple[float, float]) -> List[Tuple[float, float]]:
        """Calculate route between two points."""
        start_node = ox.nearest_nodes(self.G, start_coords[1], start_coords[0])
        end_node = ox.nearest_nodes(self.G, end_coords[1], end_coords[0])
        path = nx.shortest_path(self.G, start_node, end_node, weight="length")
        print(f"path {path}")
        return [(self.G.nodes[node]['y'], self.G.nodes[node]['x']) for node in path]

    def calculate_route_advanced(self, start_coords: Tuple[float, float], 
                        end_coords: Tuple[float, float]) -> List[Tuple[float, float]]:
        """Calculate a detailed route between two points."""
        start_node = ox.nearest_nodes(self.G, start_coords[1], start_coords[0])
        end_node = ox.nearest_nodes(self.G, end_coords[1], end_coords[0])
        
        # Find the shortest path by nodes
        path = nx.shortest_path(self.G, start_node, end_node, weight="length")
        
        # Retrieve the detailed geometry of the route
        route_coords = []
        for u, v in zip(path[:-1], path[1:]):  # Iterate over edges in the path
            edge_data = self.G[u][v][0]  # Get the edge data
            if "geometry" in edge_data:  # Use geometry if available
                route_coords.extend([(point[1], point[0]) for point in edge_data["geometry"].coords])
            else:  # Fall back to straight line if no geometry is present
                route_coords.extend([(self.G.nodes[u]['y'], self.G.nodes[u]['x']),
                                    (self.G.nodes[v]['y'], self.G.nodes[v]['x'])])
        
        return route_coords


    def find_optimal_route(self, 
                          driver_location: Tuple[float, float], 
                          delivery_locations: List[Tuple[float, float]]) -> List[List[Tuple[float, float]]]:
        """Find the optimal route for deliveries using nearest neighbor algorithm."""
        # Get nearest node for driver's location
        driver_node = ox.nearest_nodes(self.G, driver_location[1], driver_location[0])
        
        # Convert delivery locations to nodes
        delivery_nodes = [ox.nearest_nodes(self.G, lon, lat) 
                         for lat, lon in delivery_locations]
        
        # Simple nearest neighbor algorithm
        current_node = driver_node
        route = []
        unvisited = delivery_nodes.copy()
        
        while unvisited:
            # Find nearest unvisited node
            nearest = min(unvisited, 
                         key=lambda x: nx.shortest_path_length(self.G, 
                                                             current_node, 
                                                             x, 
                                                             weight="length"))
            # Get the shortest path to this node
            path = nx.shortest_path(self.G, current_node, nearest, weight="length")
            route.append(path)
            current_node = nearest
            unvisited.remove(nearest)
            
        return [[(self.G.nodes[node]['y'], self.G.nodes[node]['x']) 
                 for node in path] for path in route]
    
    def visualize_map(self, driver_position: Tuple[float, float] = None, 
                     route: List[Tuple[float, float]] = None) -> folium.Map:
        """Create a visualization of restaurants, driver, and route."""
        # Create base map centered on Rinteln
        center_location = self.geocoder.geocode("Rinteln, Germany")
        m = folium.Map(location=[center_location.latitude, center_location.longitude], 
                      zoom_start=14)
        
        # Add restaurants with custom icons
        for name, data in self.restaurants.items():
            folium.Marker(
                [data["lat"], data["lon"]],
                popup=f"{name}\n{data['address']}",
                icon=folium.Icon(color='red', icon='cutlery')
            ).add_to(m)
        
        # Add driver position if provided
        if driver_position:
            folium.Marker(
                driver_position,
                popup="Driver Location",
                icon=folium.Icon(color='green', icon='user')
            ).add_to(m)
        
        # Add delivery route if provided
        if route:
            folium.PolyLine(
                route,
                weight=3,
                color='blue',
                opacity=0.8
            ).add_to(m)
        
        return m

    def get_estimated_delivery_time(self, route: List[Tuple[float, float]], 
                                  avg_speed_kmh: float = 30) -> float:
        """Calculate estimated delivery time in minutes for a route."""
        total_distance = 0
        for i in range(len(route) - 1):
            total_distance += geodesic(route[i], route[i + 1]).kilometers
        
        # Convert to hours and then to minutes
        return (total_distance / avg_speed_kmh) * 60


In [7]:
router = DeliveryRouter(city="Rinteln, Germany")

# Add restaurants with full addresses and increased timeout
restaurants = {
    "Bodega": "31737 Rinteln",
    "Mykonos": "31737 Rinteln",
    "Bodega Beach Bar": "31737 Rinteln",
    "Kuddel's Grillstube": "31737 Rinteln",
    "Pizzeria Corallo": "31737 Rinteln",
    "Da Fabio": "31737 Rinteln"
}

# Add each restaurant and print the results
for name, address in restaurants.items():
    success = router.add_restaurant(name, address)
    print(f"Adding {name}: {'Success' if success else 'Failed'}")

# Add delivery address
delivery_address = "6, Bachweg, Rinteln, 31737"
try:
    delivery_location = router.geocoder.geocode(delivery_address)
    driver_position = (delivery_location.latitude, delivery_location.longitude)
    print(f"Delivery location found: {delivery_location.latitude}, {delivery_location.longitude}")
except Exception as e:
    print(f"Error geocoding delivery address: {e}")

Adding Bodega: Success
Adding Mykonos: Success
Adding Bodega Beach Bar: Success
Adding Kuddel's Grillstube: Success
Adding Pizzeria Corallo: Success
Adding Da Fabio: Success
Delivery location found: 52.2052102, 9.0932679


In [10]:
beach_bar_data = router.restaurants["Bodega Beach Bar"]
destination = (beach_bar_data["lat"], beach_bar_data["lon"])

route = router.calculate_route_advanced(
    start_coords=driver_position,
    end_coords=destination
)
# route_map = ox.plot.plot_graph(router.G)
# ox.plot.plot_graph_route(router.G, route, route_linewidth=6, node_size=0, bgcolor='k')
print(f"route {route}")
# Create and save map without route calculation for now
map_view = router.visualize_map(
    driver_position=driver_position,
    route=route
)
map_view.save('rinteln_delivery_map.html')

# Print all successfully geocoded locations
print("\nStored restaurant locations:")
for name, data in router.restaurants.items():
    print(f"{name}: {data['lat']}, {data['lon']}")

route [(52.2055089, 9.0932991), (52.2055164, 9.0933452), (52.2055164, 9.0933452), (52.2055312, 9.093381), (52.2055312, 9.093381), (52.2055545, 9.0934096), (52.2055545, 9.0934096), (52.2055839, 9.0934238), (52.2055839, 9.0934238), (52.205613, 9.0934204), (52.205613, 9.0934204), (52.2056473, 9.0933915), (52.2056473, 9.0933915), (52.2056679, 9.0933484), (52.2056679, 9.0933484), (52.2056761, 9.0933028), (52.2056761, 9.0933028), (52.2056762, 9.0932729), (52.2056762, 9.0932729), (52.2056681, 9.0932271), (52.2056681, 9.0932271), (52.2056514, 9.0931894), (52.2056514, 9.0931894), (52.2056189, 9.0931571), (52.2056189, 9.0931571), (52.2055762, 9.0931529), (52.2055762, 9.0931529), (52.2055412, 9.0931789), (52.2055412, 9.0931789), (52.2055148, 9.0932353), (52.2055148, 9.0932353), (52.204947, 9.0926448), (52.204947, 9.0926448), (52.2048459, 9.0925419), (52.2048459, 9.0925419), (52.2045738, 9.0923214), (52.2045738, 9.0923214), (52.2042395, 9.0920895), (52.2042395, 9.0920895), (52.204096, 9.0920015), 