In [7]:
import osmnx as ox
import networkx as nx
import folium
import math
import logging
from typing import Dict, List, Tuple
import time as tm  # Import time module with an alias to avoid conflicts

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_bangalore_map_with_times() -> nx.Graph:
    """
    Load the road network for Bangalore from OSM and add travel times.
    """
    place_name = "Bangalore, India"
    graph = ox.graph_from_place(place_name, network_type='drive')
    ox.add_edge_speeds(graph)
    ox.add_edge_travel_times(graph)
    return graph

def calculate_aerial_distance(graph: nx.Graph, node1: int, node2: int) -> float:
    """
    Calculate the aerial distance between two nodes.
    """
    lat1, lon1 = graph.nodes[node1].get('y'), graph.nodes[node1].get('x')
    lat2, lon2 = graph.nodes[node2].get('y'), graph.nodes[node2].get('x')
    
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return float('inf')
    
    return get_distance_from_lat_lon_in_km(lat1, lon1, lat2, lon2)

def get_distance_from_lat_lon_in_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the distance between two latitude-longitude points in kilometers.
    """
    R = 6371  # Radius of the Earth in kilometers
    dLat = deg2rad(lat2 - lat1)
    dLon = deg2rad(lon2 - lon1)
    a = math.sin(dLat / 2) ** 2 + math.cos(deg2rad(lat1)) * math.cos(deg2rad(lat2)) * math.sin(dLon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def deg2rad(deg: float) -> float:
    """
    Convert degrees to radians.
    """
    return deg * (math.pi / 180)

def find_best_paths(graph: nx.Graph, locations: Dict[str, Tuple[float, float]]) -> Dict[str, List[int]]:
    """
    Compute the shortest paths from drivers to the office based on travel time.
    """
    office_location = locations['office']
    office_node = ox.distance.nearest_nodes(graph, office_location[1], office_location[0])
    
    paths = {}
    
    for label, (lat, lon) in locations.items():
        if label.startswith('driver'):
            driver_node = ox.distance.nearest_nodes(graph, lon, lat)
            try:
                path = nx.shortest_path(graph, source=driver_node, target=office_node, weight='travel_time')
                paths[label] = path
            except nx.NetworkXNoPath:
                paths[label] = []  # No path found
    
    return paths

def calculate_driver_companion_distances(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int]
) -> Dict[Tuple[str, int], List[Tuple[int, float]]]:
    """
    Calculate the top 5 closest nodes for each driver-companion pair based on aerial distance.
    """
    aerial_distances = {}
    
    for driver_label, path in driver_paths.items():
        if not path:
            continue
        for companion_node in companion_nodes:
            distances = []
            for node in path:
                if node in graph.nodes and companion_node in graph.nodes:
                    distance = calculate_aerial_distance(graph, node, companion_node)
                    distances.append((node, distance))
            
            top_5_nodes = sorted(distances, key=lambda x: x[1])[:5]
            aerial_distances[(driver_label, companion_node)] = top_5_nodes
    
    return aerial_distances

def find_best_intersection_node(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int],
    aerial_distances: Dict[Tuple[str, int], List[Tuple[int, float]]]
) -> Dict[Tuple[str, int], Tuple[float, float, int]]:
    """
    Find the best intersection node among the top 5 nodes for each driver-companion pair.
    """
    road_distances = {}
    
    buffer_time = 5  # Buffer time in minutes
    
    for (driver_label, companion_node), top_5_nodes in aerial_distances.items():
        shortest_road_distance = float('inf')
        shortest_road_time = float('inf')
        best_intersection_node = None
        
        for node, _ in top_5_nodes:
            try:
                # Calculate road distance and travel time from the driver to the intersection node
                path_to_intersection = nx.shortest_path(graph, source=driver_paths[driver_label][0], target=node, weight='travel_time')
                road_distance_to_intersection = nx.shortest_path_length(graph, source=driver_paths[driver_label][0], target=node, weight='length')
                travel_time_to_intersection = sum(graph[u][v][0].get('travel_time', 0) for u, v in zip(path_to_intersection[:-1], path_to_intersection[1:]))
                
                # Calculate road distance and travel time from the intersection node to the companion
                path_from_intersection = nx.shortest_path(graph, source=node, target=companion_node, weight='length')
                road_distance_from_intersection = nx.shortest_path_length(graph, source=node, target=companion_node, weight='length')
                travel_time_from_intersection = sum(graph[u][v][0].get('travel_time', 0) for u, v in zip(path_from_intersection[:-1], path_from_intersection[1:]))
                
                if road_distance_from_intersection < shortest_road_distance and travel_time_from_intersection <= (travel_time_to_intersection + buffer_time):
                    shortest_road_distance = road_distance_from_intersection
                    shortest_road_time = travel_time_from_intersection
                    best_intersection_node = node
            
            except nx.NetworkXNoPath:
                continue
        
        road_distances[(driver_label, companion_node)] = (shortest_road_distance, shortest_road_time, best_intersection_node)
    
    return road_distances

def plot_paths_on_map(
    paths: Dict[str, List[int]],
    graph: nx.Graph,
    locations: Dict[str, Tuple[float, float]],
    best_intersections: Dict[Tuple[str, int], Tuple[float, float, int]],
    lbl: str
) -> None:
    """
    Plot paths and intersections on a Folium map.
    """
    # Create a Folium map centered around the average of all provided locations
    center = [sum(lat for lat, lon in locations.values()) / len(locations),
              sum(lon for lat, lon in locations.values()) / len(locations)]
    
    m = folium.Map(location=center, zoom_start=12)
    
    # Add markers for each location
    for label, (lat, lon) in locations.items():
        color = 'blue' if label.startswith('driver') else 'red' if label == 'office' else 'green'
        folium.Marker(
            location=[lat, lon],
            popup=label,
            icon=folium.Icon(color=color, icon='flag')
        ).add_to(m)
    
    office_node = 0
    # Plot paths from drivers to office
    for driver_label, path in paths.items():
        if not path:
            continue
        office_node = path[-1]
        # Get the coordinates for the path
        path_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in path]
        if driver_label==lbl:
            color="red"
        else:
            color="blue"
        # Add path to the map
        folium.PolyLine(
            locations=path_coords,
            color=color,
            weight=2.5,
            opacity=0.8,
            popup=driver_label
        ).add_to(m)
    
    # Plot paths from companions to intersection points and add circular markers
    for (driver_label, companion_node), (distance, time, intersection) in best_intersections.items():
        if intersection and driver_label == lbl:
            # Companion's path to intersection
            companion_path = nx.shortest_path(graph, source=intersection, target=companion_node, weight='travel_time')
            companion_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in companion_path]
            cd = (graph.nodes[companion_node]['y'], graph.nodes[companion_node]['x'])  
            folium.CircleMarker(
                location=cd,
                radius=5,
                color='black',
                fill=True,
                fill_color='black',
                fill_opacity=0.8,
                popup='Companion start'
            ).add_to(m)
            
            # Plot the companion's path
            folium.PolyLine(
                locations=companion_coords,
                color='red',
                weight=3,
                opacity=1,
                popup=f'Companion Path for {driver_label}'
            ).add_to(m)
            
            # Intersection point
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            folium.CircleMarker(
                location=intersection_coords,
                radius=7,
                color='green',
                fill=True,
                fill_color='green',
                fill_opacity=0.6,
                popup=f'Intersection Point for {driver_label}'
            ).add_to(m)
    
    for _, lst in paths.items():
        coords = (graph.nodes[lst[0]]['y'], graph.nodes[lst[0]]['x'])   
        folium.CircleMarker(
            location=coords,
            radius=5,
            color='black',
            fill=True,
            fill_color='black',
            fill_opacity=0.8,
            popup=f'{_} start node'
        ).add_to(m)
    
    # Add legend
    legend_html = '''
    <div style="
    position: fixed; 
    bottom: 50px; left: 50px; width: 160px; height: 100px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:14px;
    ">&nbsp; <b>Legend</b><br>
    &nbsp; <i class="fa fa-map-marker" style="color:blue"></i>&nbsp; Driver Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:red"></i>&nbsp; Companion Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:green"></i>&nbsp; Intersection<br>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(legend_html))
    
    # Add title
    title_html = '''
    <div style="
    position: fixed; 
    top: 10px; left: 50%; transform: translateX(-50%); 
    width: 100%; height: 40px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:18px;
    text-align: center;
    ">&nbsp; <b>Drivers and Companions Path Mapping</b>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(title_html))
    
    # Save the map to an HTML file
    m.save('/home/manab/projects/poc_car_pool/paths_map.html')
    logging.info("Map with paths has been saved to paths_map.html")

def main():
    start1 = tm.time()  # Use time module with alias
    # Load the graph and add travel times
    graph = load_bangalore_map_with_times()
    end1 = tm.time()
    
    print(f"Time to load the graph: {end1 - start1:.2f} seconds")
    
    start2 = tm.time()
    # Define locations with updated coordinates
    locations = {
        'driver1': (12.9716, 77.5946),
        'driver2': (12.9654, 77.6101),
        'driver3': (12.9522, 77.6204),
        'driver4': (12.9718, 77.5671),
        'office': (12.9340, 77.6200),
        'companion': (12.9352, 77.6240)
    }

    # Compute the best paths from drivers to the office
    paths = find_best_paths(graph, locations)

    # Calculate distances and find best intersection node for the companion
    companion_nodes = [ox.distance.nearest_nodes(graph, lon, lat) for lat, lon in [locations['companion']]]
    aerial_distances = calculate_driver_companion_distances(graph, paths, companion_nodes)
    driver_companion_distances = find_best_intersection_node(graph, paths, companion_nodes, aerial_distances)

    # Print best drivers for the companion
    best_drivers = sorted(driver_companion_distances.items(), key=lambda x: x[1][0])  # Sort by distance
    for (driver_label, companion_node), (distance, time, intersection) in best_drivers:
        print(f"Driver {driver_label} is {distance:.2f} km away from the companion with a total travel time of {time:.2f} minutes.")
        if intersection:
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            print(f"Best intersection point is at coordinates: {intersection_coords}")

    # Initialize variables to track the best result
    best_driver = None
    best_distance = float('inf')

    # Iterate over the dictionary to find the minimum distance
    for (driver_label, companion_node), (distance, time, intersection) in driver_companion_distances.items():
        if distance < best_distance:
            best_distance = distance
            best_driver = driver_label
    end2 = tm.time()
    print(f"Time to run the script: {end2 - start2:.2f} seconds")
    
    start3 = tm.time()
    plot_paths_on_map(paths, graph, locations, driver_companion_distances, best_driver)
    end3 = tm.time()
    print(f"Time to plot the map: {end3 - start3:.2f} seconds")

if __name__ == "__main__":
    main()


Time to load the graph: 67.96 seconds


2024-09-13 01:47:12,082 - INFO - Map with paths has been saved to paths_map.html


Driver driver3 is 385.22 km away from the companion with a total travel time of 44.80 minutes.
Best intersection point is at coordinates: (12.9363405, 77.6209764)
Driver driver1 is 508.97 km away from the companion with a total travel time of 60.60 minutes.
Best intersection point is at coordinates: (12.9353523, 77.6204512)
Driver driver2 is 508.97 km away from the companion with a total travel time of 60.60 minutes.
Best intersection point is at coordinates: (12.9353523, 77.6204512)
Driver driver4 is 508.97 km away from the companion with a total travel time of 60.60 minutes.
Best intersection point is at coordinates: (12.9353523, 77.6204512)
Time to run the script: 7.46 seconds
Time to plot the map: 0.02 seconds


## Randomisation

In [2]:
# Define the bounding box for Bangalore
BANGALORE_BOUNDS = (12.85, 13.1, 77.5, 77.7)  # (lat_min, lat_max, lon_min, lon_max)


In [3]:
import random

def get_random_locations(num_drivers: int, bounds: Tuple[float, float, float, float]) -> Dict[str, Tuple[float, float]]:
    """
    Generate random locations within the given bounds.
    """
    locations = {}
    lat_min, lat_max, lon_min, lon_max = bounds
    
    for i in range(num_drivers):
        lat = random.uniform(lat_min, lat_max)
        lon = random.uniform(lon_min, lon_max)
        locations[f'driver{i+1}'] = (lat, lon)
    
    return locations


In [17]:
import osmnx as ox
import networkx as nx
import folium
import math
import logging
import random
from typing import Dict, List, Tuple
import time as tm  # Import time module with an alias to avoid conflicts

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_bangalore_map_with_times() -> nx.Graph:
    """
    Load the road network for Bangalore from OSM and add travel times.
    """
    place_name = "Bangalore, India"
    graph = ox.graph_from_place(place_name, network_type='drive')
    ox.add_edge_speeds(graph)
    ox.add_edge_travel_times(graph)
    return graph

def calculate_aerial_distance(graph: nx.Graph, node1: int, node2: int) -> float:
    """
    Calculate the aerial distance between two nodes.
    """
    lat1, lon1 = graph.nodes[node1].get('y'), graph.nodes[node1].get('x')
    lat2, lon2 = graph.nodes[node2].get('y'), graph.nodes[node2].get('x')
    
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return float('inf')
    
    return get_distance_from_lat_lon_in_km(lat1, lon1, lat2, lon2)

def get_distance_from_lat_lon_in_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the distance between two latitude-longitude points in kilometers.
    """
    R = 6371  # Radius of the Earth in kilometers
    dLat = deg2rad(lat2 - lat1)
    dLon = deg2rad(lon2 - lon1)
    a = math.sin(dLat / 2) ** 2 + math.cos(deg2rad(lat1)) * math.cos(deg2rad(lat2)) * math.sin(dLon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def deg2rad(deg: float) -> float:
    """
    Convert degrees to radians.
    """
    return deg * (math.pi / 180)

def find_best_paths(graph: nx.Graph, locations: Dict[str, Tuple[float, float]]) -> Dict[str, List[int]]:
    """
    Compute the shortest paths from drivers to the office based on travel time.
    """
    office_location = locations['office']
    office_node = ox.distance.nearest_nodes(graph, office_location[1], office_location[0])
    
    paths = {}
    
    for label, (lat, lon) in locations.items():
        if label.startswith('driver'):
            driver_node = ox.distance.nearest_nodes(graph, lon, lat)
            try:
                path = nx.shortest_path(graph, source=driver_node, target=office_node, weight='travel_time')
                paths[label] = path
            except nx.NetworkXNoPath:
                paths[label] = []  # No path found
    
    return paths

def calculate_driver_companion_distances(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int]
) -> Dict[Tuple[str, int], List[Tuple[int, float]]]:
    """
    Calculate the top 5 closest nodes for each driver-companion pair based on aerial distance.
    """
    aerial_distances = {}
    
    for driver_label, path in driver_paths.items():
        if not path:
            continue
        for companion_node in companion_nodes:
            distances = []
            for node in path:
                if node in graph.nodes and companion_node in graph.nodes:
                    distance = calculate_aerial_distance(graph, node, companion_node)
                    distances.append((node, distance))
            
            top_5_nodes = sorted(distances, key=lambda x: x[1])[:5]
            aerial_distances[(driver_label, companion_node)] = top_5_nodes
    
    return aerial_distances

def find_best_intersection_node(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int],
    aerial_distances: Dict[Tuple[str, int], List[Tuple[int, float]]]
) -> Dict[Tuple[str, int], Tuple[float, float, int]]:
    """
    Find the best intersection node among the top 5 nodes for each driver-companion pair.
    """
    road_distances = {}
    
    buffer_time = 5  # Buffer time in minutes
    
    for (driver_label, companion_node), top_5_nodes in aerial_distances.items():
        shortest_road_distance = float('inf')
        shortest_road_time = float('inf')
        best_intersection_node = None
        
        for node, _ in top_5_nodes:
            try:
                # Calculate road distance and travel time from the driver to the intersection node
                path_to_intersection = nx.shortest_path(graph, source=driver_paths[driver_label][0], target=node, weight='travel_time')
                road_distance_to_intersection = nx.shortest_path_length(graph, source=driver_paths[driver_label][0], target=node, weight='length')
                travel_time_to_intersection = sum(graph[u][v][0].get('travel_time', 0) for u, v in zip(path_to_intersection[:-1], path_to_intersection[1:]))
                
                # Calculate road distance and travel time from the intersection node to the companion
                path_from_intersection = nx.shortest_path(graph, source=node, target=companion_node, weight='length')
                road_distance_from_intersection = nx.shortest_path_length(graph, source=node, target=companion_node, weight='length')
                travel_time_from_intersection = sum(graph[u][v][0].get('travel_time', 0) for u, v in zip(path_from_intersection[:-1], path_from_intersection[1:]))
                
                if road_distance_from_intersection < shortest_road_distance and travel_time_from_intersection <= (travel_time_to_intersection + buffer_time):
                    shortest_road_distance = road_distance_from_intersection
                    shortest_road_time = travel_time_from_intersection
                    best_intersection_node = node
            
            except nx.NetworkXNoPath:
                continue
        
        road_distances[(driver_label, companion_node)] = (shortest_road_distance, shortest_road_time, best_intersection_node)
    
    return road_distances

def plot_paths_on_map(
    paths: Dict[str, List[int]],
    graph: nx.Graph,
    locations: Dict[str, Tuple[float, float]],
    best_intersections: Dict[Tuple[str, int], Tuple[float, float, int]],
    lbl: str
) -> None:
    """
    Plot paths and intersections on a Folium map.
    """
    # Create a Folium map centered around the average of all provided locations
    center = [sum(lat for lat, lon in locations.values()) / len(locations),
              sum(lon for lat, lon in locations.values()) / len(locations)]
    
    m = folium.Map(location=center, zoom_start=12)
    
    # Add markers for each location
    for label, (lat, lon) in locations.items():
        color = 'blue' if label.startswith('driver') else 'red' if label == 'office' else 'green'
        folium.Marker(
            location=[lat, lon],
            popup=label,
            icon=folium.Icon(color=color, icon='flag')
        ).add_to(m)
    
    office_node = 0
    # Plot paths from drivers to office
    for driver_label, path in paths.items():
        if not path:
            continue
        office_node = path[-1]
        # Get the coordinates for the path
        path_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in path]
        color = "red" if driver_label == lbl else "blue"
        # Add path to the map
        folium.PolyLine(
            locations=path_coords,
            color=color,
            weight=2.5,
            opacity=0.8,
            popup=driver_label
        ).add_to(m)
    office_node=    (graph.nodes[office_node]['y'], graph.nodes[office_node]['x'])  
    folium.CircleMarker(
                location=office_node,
                radius=7,
                color='yellow',
                fill=True,
                fill_color='brown',
                fill_opacity=0.8,
                popup='Office'
            ).add_to(m)
    
    # Plot paths from companions to intersection points and add circular markers
    for (driver_label, companion_node), (distance, time, intersection) in best_intersections.items():
        if intersection and driver_label == lbl:
            # Companion's path to intersection
            companion_path = nx.shortest_path(graph, source=intersection, target=companion_node, weight='travel_time')
            companion_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in companion_path]
            cd = (graph.nodes[companion_node]['y'], graph.nodes[companion_node]['x'])  
            folium.CircleMarker(
                location=cd,
                radius=5,
                color='black',
                fill=True,
                fill_color='black',
                fill_opacity=0.8,
                popup='Companion start'
            ).add_to(m)
            
            # Plot the companion's path
            folium.PolyLine(
                locations=companion_coords,
                color='red',
                weight=3,
                opacity=1,
                popup=f'Companion Path for {driver_label}'
            ).add_to(m)
            
            # Intersection point
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            folium.CircleMarker(
                location=intersection_coords,
                radius=7,
                color='blue',
                fill=True,
                fill_color='blue',
                fill_opacity=0.8,
                popup='Best Intersection'
            ).add_to(m)
            
            # Add marker for the intersection point
        # Add legend
    legend_html = '''
    <div style="
    position: fixed; 
    bottom: 50px; left: 50px; width: 160px; height: 100px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:14px;
    ">&nbsp; <b>Legend</b><br>
    &nbsp; <i class="fa fa-map-marker" style="color:blue"></i>&nbsp; Driver Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:red"></i>&nbsp; Companion Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:green"></i>&nbsp; Intersection<br>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(legend_html))
    
    # Add title
    title_html = '''
    <div style="
    position: fixed; 
    top: 10px; left: 50%; transform: translateX(-50%); 
    width: 100%; height: 40px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:18px;
    text-align: center;
    ">&nbsp; <b>Drivers and Companions Path Mapping</b>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(title_html))        
    
    m.save('/home/manab/projects/poc_car_pool/Random_map2.html')
    print("Map has been saved as 'Random_map1.html'.")

def get_random_locations(num_drivers: int, bounds: Tuple[float, float, float, float]) -> Dict[str, Tuple[float, float]]:
    """
    Generate random locations within the given bounds.
    """
    locations = {}
    lat_min, lat_max, lon_min, lon_max = bounds
    
    for i in range(num_drivers):
        lat = random.uniform(lat_min, lat_max)
        lon = random.uniform(lon_min, lon_max)
        locations[f'driver{i+1}'] = (lat, lon)
    
    return locations

def main():
    start1 = tm.time()  # Use time module with alias
    # Load the graph and add travel times
    graph = load_bangalore_map_with_times()
    end1 = tm.time()
    
    print(f"Time to load the graph: {end1 - start1:.2f} seconds")

    # Prompt the user for the number of drivers
    num_drivers = int(input("Enter the number of drivers (1-10): "))
    if not (1 <= num_drivers <= 10):
        print("Number of drivers must be between 1 and 10.")
        return

    start2 = tm.time()
    # Generate random locations for drivers
    driver_locations = get_random_locations(num_drivers, BANGALORE_BOUNDS)
    print(driver_locations)
    # Define fixed locations
    locations = driver_locations
    locations['office'] = (12.9340, 77.6200)  # Fixed office location
    locations['companion'] = (12.9370, 77.6280)  # Fixed companion location
    
    print(locations)

    # Compute the best paths from drivers to the office
    paths = find_best_paths(graph, locations)

    # Calculate distances and find best intersection node for the companion
    companion_nodes = [ox.distance.nearest_nodes(graph, lon, lat) for lat, lon in [locations['companion']]]
    aerial_distances = calculate_driver_companion_distances(graph, paths, companion_nodes)
    driver_companion_distances = find_best_intersection_node(graph, paths, companion_nodes, aerial_distances)

    # Print best drivers for the companion
    best_drivers = sorted(driver_companion_distances.items(), key=lambda x: x[1][0])  # Sort by distance
    for (driver_label, companion_node), (distance, time, intersection) in best_drivers:
        print(f"Driver {driver_label} is {distance:.2f} km away from the companion with a total travel time of {time:.2f} minutes.")
        if intersection:
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            print(f"Best intersection point is at coordinates: {intersection_coords}")

    # Initialize variables to track the best result
    best_driver = None
    best_distance = float('inf')

    # Iterate over the dictionary to find the minimum distance
    for (driver_label, companion_node), (distance, time, intersection) in driver_companion_distances.items():
        if distance < best_distance:
            best_distance = distance
            best_driver = driver_label
            best_int=intersection
            print(best_driver)
            
    print(best_int)
    end2 = tm.time()
    print(f"Time to run the script: {end2 - start2:.2f} seconds")
    
    start3 = tm.time()
    plot_paths_on_map(paths, graph, locations, driver_companion_distances, best_driver)
    end3 = tm.time()
    print(f"Time to plot the map: {end3 - start3:.2f} seconds")

if __name__ == "__main__":
    main()


Time to load the graph: 62.09 seconds
{'driver1': (13.064165569984327, 77.69139096635745), 'driver2': (12.9417027837777, 77.6047024498736), 'driver3': (12.954032807906207, 77.65954983678637), 'driver4': (13.058517489154386, 77.60478468841292), 'driver5': (12.880261914881979, 77.53660170901598), 'driver6': (12.940454648155722, 77.53464148361742), 'driver7': (12.853139039629268, 77.53113161431367), 'driver8': (13.019213642830296, 77.64707293682815), 'driver9': (12.924086544235553, 77.6240676997266), 'driver10': (12.890032282392307, 77.63744525906849)}
{'driver1': (13.064165569984327, 77.69139096635745), 'driver2': (12.9417027837777, 77.6047024498736), 'driver3': (12.954032807906207, 77.65954983678637), 'driver4': (13.058517489154386, 77.60478468841292), 'driver5': (12.880261914881979, 77.53660170901598), 'driver6': (12.940454648155722, 77.53464148361742), 'driver7': (12.853139039629268, 77.53113161431367), 'driver8': (13.019213642830296, 77.64707293682815), 'driver9': (12.924086544235553

In [11]:
values = (7, 8)
var1, var2 = values


In [13]:
var1

7

In [34]:
import osmnx as ox
import networkx as nx
import folium
import math
import logging
import random
from typing import Dict, List, Tuple
import time as tm  # Import time module with an alias to avoid conflicts

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_bangalore_map_with_times() -> nx.Graph:
    """
    Load the road network for Bangalore from OSM and add travel times.
    """
    place_name = "Bangalore, India"
    graph = ox.graph_from_place(place_name, network_type='drive')
    ox.add_edge_speeds(graph)
    ox.add_edge_travel_times(graph)
    return graph

def calculate_aerial_distance(graph: nx.Graph, node1: int, node2: int) -> float:
    """
    Calculate the aerial distance between two nodes.
    """
    lat1, lon1 = graph.nodes[node1].get('y'), graph.nodes[node1].get('x')
    lat2, lon2 = graph.nodes[node2].get('y'), graph.nodes[node2].get('x')
    
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return float('inf')
    
    return get_distance_from_lat_lon_in_km(lat1, lon1, lat2, lon2)

def get_distance_from_lat_lon_in_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the distance between two latitude-longitude points in kilometers.
    """
    R = 6371  # Radius of the Earth in kilometers
    dLat = deg2rad(lat2 - lat1)
    dLon = deg2rad(lon2 - lon1)
    a = math.sin(dLat / 2) ** 2 + math.cos(deg2rad(lat1)) * math.cos(deg2rad(lat2)) * math.sin(dLon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def deg2rad(deg: float) -> float:
    """
    Convert degrees to radians.
    """
    return deg * (math.pi / 180)

def find_best_paths(graph: nx.Graph, locations: Dict[str, Tuple[float, float]]) -> Dict[str, List[int]]:
    """
    Compute the shortest paths from drivers to the office based on travel time.
    """
    office_location = locations['office']
    office_node = ox.distance.nearest_nodes(graph, office_location[1], office_location[0])
    
    paths = {}
    
    for label, (lat, lon) in locations.items():
        if label.startswith('driver'):
            driver_node = ox.distance.nearest_nodes(graph, lon, lat)
            try:
                path = nx.shortest_path(graph, source=driver_node, target=office_node, weight='travel_time')
                paths[label] = path
            except nx.NetworkXNoPath:
                paths[label] = []  # No path found
    
    return paths

def calculate_driver_companion_distances(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int]
) -> Dict[Tuple[str, int], List[Tuple[int, float]]]:
    """
    Calculate the top 5 closest nodes for each driver-companion pair based on aerial distance.
    """
    aerial_distances = {}
    
    for driver_label, path in driver_paths.items():
        if not path:
            continue
        for companion_node in companion_nodes:
            distances = []
            for node in path:
                if node in graph.nodes and companion_node in graph.nodes:
                    distance = calculate_aerial_distance(graph, node, companion_node)
                    distances.append((node, distance))
            
            top_5_nodes = sorted(distances, key=lambda x: x[1])[:5]
            aerial_distances[(driver_label, companion_node)] = top_5_nodes
    
    return aerial_distances

def find_best_intersection_node(
    graph: nx.Graph,
    driver_paths: Dict[str, List[int]],
    companion_nodes: List[int],
    aerial_distances: Dict[Tuple[str, int], List[Tuple[int, float]]]
) -> Dict[Tuple[str, int], Tuple[float, float, int]]:
    """
    Find the best intersection node among the top 5 nodes for each driver-companion pair.
    """
    road_distances = {}
    
    buffer_time = 5  # Buffer time in minutes
    
    for (driver_label, companion_node), top_5_nodes in aerial_distances.items():
        shortest_road_distance = float('inf')
        shortest_road_time = float('inf')
        best_intersection_node = None
        # print(top_5_nodes)
        for node, _ in top_5_nodes:
            count=0
            
            # Calculate road distance and travel time from the driver to the intersection node
            # path_to_intersection = nx.shortest_path(graph, source=driver_paths[driver_label][0], target=node, weight='travel_time')
            road_distance_to_intersection = nx.shortest_path_length(graph, source=driver_paths[driver_label][0], target=node, weight='length')
            travel_time_to_intersection = nx.shortest_path_length(graph, source=driver_paths[driver_label][0], target=node, weight='travel_time')
            
            # Calculate road distance and travel time from the intersection node to the companion
            path_from_intersection = nx.shortest_path(graph, source=node, target=companion_node, weight='length')
            road_distance_from_intersection = nx.shortest_path_length(graph, source=node, target=companion_node, weight='length')
            travel_time_from_intersection = sum(graph[u][v][0].get('travel_time', 0) for u, v in zip(path_from_intersection[:-1], path_from_intersection[1:]))
            

            
            if road_distance_from_intersection < shortest_road_distance and travel_time_from_intersection <= (travel_time_to_intersection + buffer_time):
                shortest_road_distance = road_distance_from_intersection
                shortest_road_time = travel_time_from_intersection
                best_intersection_node = node
                
                print("Atleaset entering inside")
                
            else :
                print(driver_label)
                print(f"all details of {node} are {road_distance_to_intersection , travel_time_to_intersection, road_distance_from_intersection, travel_time_from_intersection }")
            
            # except nx.NetworkXNoPath:
            #     print("No path found")
            #     return
        
        road_distances[(driver_label, companion_node)] = (shortest_road_distance, shortest_road_time, best_intersection_node)
        print(road_distances)
    return road_distances

def plot_paths_on_map(
    paths: Dict[str, List[int]],
    graph: nx.Graph,
    locations: Dict[str, Tuple[float, float]],
    best_intersections: Dict[Tuple[str, int], Tuple[float, float, int]],
    lbl: str
) -> None:
    """
    Plot paths and intersections on a Folium map.
    """
    # Create a Folium map centered around the average of all provided locations
    center = [sum(lat for lat, lon in locations.values()) / len(locations),
              sum(lon for lat, lon in locations.values()) / len(locations)]
    
    m = folium.Map(location=center, zoom_start=12)
    
    # Add markers for each location
    for label, (lat, lon) in locations.items():
        color = 'blue' if label.startswith('driver') else 'red' if label == 'office' else 'green'
        folium.Marker(
            location=[lat, lon],
            popup=label,
            icon=folium.Icon(color=color, icon='flag')
        ).add_to(m)
    
    office_node = ox.distance.nearest_nodes(graph, locations['office'][1], locations['office'][0])
    
    # Plot paths from drivers to office
    for driver_label, path in paths.items():
        if not path:
            continue
        office_node = path[-1]
        # Get the coordinates for the path
        path_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in path]
        color = "red" if driver_label == lbl else "blue"
        # Add path to the map
        folium.PolyLine(
            locations=path_coords,
            color=color,
            # smooth_factor=1000,
            weight=2.5,
            opacity=0.8,
            popup=driver_label
        ).add_to(m)
        
    # Add marker for office
    office_coords = (graph.nodes[office_node]['y'], graph.nodes[office_node]['x'])
    folium.CircleMarker(
        location=office_coords,
        radius=7,
        color='yellow',
        fill=True,
        fill_color='brown',
        fill_opacity=0.8,
        popup='Office'
    ).add_to(m)
    
    # Plot paths from companions to intersection points and add circular markers
    for (driver_label, companion_node), (distance, time, intersection) in best_intersections.items():
        if intersection and driver_label == lbl:
            # Companion's path to intersection
            companion_path = nx.shortest_path(graph, source=intersection, target=companion_node, weight='travel_time')
            companion_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in companion_path]
            cd = (graph.nodes[companion_node]['y'], graph.nodes[companion_node]['x'])  
            folium.CircleMarker(
                location=cd,
                radius=5,
                color='black',
                fill=True,
                fill_color='black',
                fill_opacity=0.8,
                popup='Companion start'
            ).add_to(m)
            
            # Plot the companion's path
            folium.PolyLine(
                locations=companion_coords,
                color='red',
                weight=3,
                opacity=1,
                popup=f'Companion Path for {driver_label}'
            ).add_to(m)
            
            # Intersection point
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            folium.CircleMarker(
                location=intersection_coords,
                radius=7,
                color='blue',
                fill=True,
                fill_color='blue',
                fill_opacity=0.8,
                popup='Best Intersection'
            ).add_to(m)
        # Add legend
    legend_html = '''
    <div style="
    position: fixed; 
    bottom: 50px; left: 50px; width: 160px; height: 100px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:14px;
    ">&nbsp; <b>Legend</b><br>
    &nbsp; <i class="fa fa-map-marker" style="color:blue"></i>&nbsp; Driver Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:red"></i>&nbsp; Companion Path<br>
    &nbsp; <i class="fa fa-map-marker" style="color:green"></i>&nbsp; Intersection<br>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(legend_html))
    
    # Add title
    title_html = '''
    <div style="
    position: fixed; 
    top: 10px; left: 50%; transform: translateX(-50%); 
    width: 100%; height: 40px; 
    background-color: white; 
    border:2px solid grey; 
    border-radius: 5px; 
    z-index:9999; 
    font-size:18px;
    text-align: center;
    ">&nbsp; <b>Drivers and Companions Path Mapping</b>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(title_html))
    
    m.save('/home/manab/projects/poc_car_pool/Random_map.html')
    print("Map has been saved as 'Random_map.html'.")

def get_random_locations(num_drivers: int, bounds: Tuple[float, float, float, float]) -> Dict[str, Tuple[float, float]]:
    """
    Generate random locations within the given bounds.
    """
    locations = {}
    lat_min, lat_max, lon_min, lon_max = bounds
    
    for i in range(num_drivers):
        lat = random.uniform(lat_min, lat_max)
        lon = random.uniform(lon_min, lon_max)
        locations[f'driver{i+1}'] = (lat, lon)
    
    return locations

def get_random_companion_location(bounds: Tuple[float, float, float, float]) -> Tuple[float, float]:
    """
    Generate a random location for the companion within the given bounds.
    """
    lat_min, lat_max, lon_min, lon_max = bounds
    lat = random.uniform(lat_min, lat_max)
    lon = random.uniform(lon_min, lon_max)
    return (lat, lon)

def main():
    start1 = tm.time()  # Use time module with alias
    # Load the graph and add travel times
    graph = load_bangalore_map_with_times()
    end1 = tm.time()
    
    print(f"Time to load the graph: {end1 - start1:.2f} seconds")

    # Prompt the user for the number of drivers
    num_drivers = int(input("Enter the number of drivers (1-10): "))
    if not (1 <= num_drivers <= 10):
        print("Number of drivers must be between 1 and 10.")
        return

    start2 = tm.time()
    # Define bounds for random locations
    BANGALORE_BOUNDS = (12.8500, 13.0000, 77.5000, 77.7000)  

    # Generate random locations for drivers
    driver_locations = get_random_locations(num_drivers, BANGALORE_BOUNDS)
    
    # Generate random location for companion
    companion_location = get_random_companion_location(BANGALORE_BOUNDS)
    
    # Define fixed locations
    locations = driver_locations
    locations['office'] = (12.9340, 77.6200)  # Fixed office location
    locations['companion'] = companion_location 
    print(locations)
    # Compute the best paths from drivers to the office
    paths = find_best_paths(graph, locations)
    print(paths)
    # Calculate distances and find best intersection node for the companion
    companion_nodes = [ox.distance.nearest_nodes(graph, locations['companion'][1], locations['companion'][0])]
    print(companion_nodes)
    aerial_distances = calculate_driver_companion_distances(graph, paths, companion_nodes)
    print(aerial_distances)
    driver_companion_distances = find_best_intersection_node(graph, paths, companion_nodes, aerial_distances)

    # Print best drivers for the companion
    best_drivers = sorted(driver_companion_distances.items(), key=lambda x: x[1][0])  # Sort by distance
    for (driver_label, companion_node), (distance, time, intersection) in best_drivers:
        print(f"Driver {driver_label} is {distance:.2f} km away from the companion with a total travel time of {time:.2f} minutes.")
        if intersection:
            intersection_coords = (graph.nodes[intersection]['y'], graph.nodes[intersection]['x'])
            print(f"Best intersection point is at coordinates: {intersection_coords}")

    # Initialize variables to track the best result
    best_driver = None
    best_distance = float('inf')

    # Iterate over the dictionary to find the minimum distance
    for (driver_label, companion_node), (distance, time, intersection) in driver_companion_distances.items():
        if distance < best_distance:
            best_distance = distance
            best_driver = driver_label
    end2 = tm.time()
    print(f"Time to run the script: {end2 - start2:.2f} seconds")
    
    start3 = tm.time()
    plot_paths_on_map(paths, graph, locations, driver_companion_distances, best_driver)
    end3 = tm.time()
    print(f"Time to plot the map: {end3 - start3:.2f} seconds")

if __name__ == "__main__":
    main()


Time to load the graph: 54.02 seconds
{'driver1': (12.96661505694562, 77.57902070732611), 'driver2': (12.962189276518025, 77.62617297994474), 'driver3': (12.85937709258359, 77.51136414835274), 'driver4': (12.942517363358647, 77.66854360539931), 'driver5': (12.857691321021255, 77.63397174416224), 'driver6': (12.924994302195318, 77.60039682194459), 'driver7': (12.90134389318655, 77.62200853577146), 'driver8': (12.970819509977625, 77.6149200774564), 'driver9': (12.892026613458894, 77.65263849899738), 'driver10': (12.888969579878804, 77.5599495109895), 'office': (12.934, 77.62), 'companion': (12.856847212818039, 77.57988429283634)}
{'driver1': [4126165192, 429231078, 429231028, 10026593812, 429231013, 10026593801, 429231031, 429231026, 10026593794, 10026593798, 10026593777, 429231025, 254661639, 429231006, 429231082, 249083792, 4126163687, 429001176, 1410617126, 1837339316, 1837339315, 429001168, 429001173, 11889038534, 320644590, 12077162815, 12128921371, 12128921370, 459598606, 121289213

In [33]:
('driver3', 3145211060): [(1607770096, 1.6335151083524535), (307408561, 1.6798744175478495), (306850818, 1.6956578461182952), (3302507661, 1.7111719742522957), (1723827219, 1.741329541414862)]

'companion': (12.91786359836342, 77.60406242919393)

SyntaxError: only single target (not tuple) can be annotated (1190263176.py, line 1)

In [1]:
import os

num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")

Number of CPU cores: 32
