In [None]:
# import libraries
import multiprocessing as mp
import geopandas as gpd
import osmnx as ox
import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from shapely import wkt
from warnings import warn
from heapq import heappop, heappush
from itertools import count

In [None]:
# generate G of LA County
G = ox.graph_from_place('Los Angeles County, CA, USA', network_type='drive')

In [None]:
# add attributes
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)
G = ox.bearing.add_edge_bearings(G)

In [None]:
# create gdfs
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G)

In [None]:
# import O,D
origins_df = pd.read_csv("../Data/origins_gdf.csv")
destinations_df = pd.read_csv("../Data/destinations_gdf.csv")

origins_df['geometry'] = origins_df['geometry'].apply(wkt.loads)
destinations_df['geometry'] = destinations_df['geometry'].apply(wkt.loads)

# geodataframe of O, D
origins_gdf = gpd.GeoDataFrame(origins_df, geometry='geometry')
destinations_gdf = gpd.GeoDataFrame(destinations_df, geometry='geometry')

# Add attributes to G edges

Use add_edge_traffic_times function to add extra time based on traffic controls

In [None]:
from functions import add_edge_traffic_times

In [None]:
G = add_edge_traffic_times(G, traffic_signals_time = 30, stop_time = 15, turning_circle_time = 5, crossing_time = 5, give_way_time = 5, mini_roundabout_time = 5)

In [None]:
# create gdfs
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G)

In [None]:
edges_gdf

In [None]:
edges_gdf[['travel_time', 'traffic_time', 'total_time']]

# Penalty functions

Use get_turn_penalty_dict function to get turn penalty dictionary

In [None]:
from functions import get_turn_penalty_dict

In [None]:
penalty = get_turn_penalty_dict(G, left_turn_penalty = 30, right_turn_penalty = 10, u_turn_penalty = 90)

In [None]:
penalty

# shortest_path_turn_penalty

https://github.com/maxtmng/shortest_path_turn_penalty/

In [None]:
"""
    Uses Dijkstra's algorithm to find the shortest weighted paths to one or multiple targets with turn penalty.
    This function is adapted from networkx.algorithms.shortest_paths.weighted._dijkstra_multisource.
    The turn penalty implementation is based on:
    Ziliaskopoulos, A.K., Mahmassani, H.S., 1996. A note on least time path computation considering delays and prohibitions for intersection movements. Transportation Research Part B: Methodological 30, 359–367. https://doi.org/10.1016/0191-2615(96)00001-X
    Parameters
    ----------
    G : NetworkX graph
    source : non-empty iterable of nodes
        Starting nodes for paths. If this is just an iterable containing
        a single node, then all paths computed by this function will
        start from that node. If there are two or more nodes in this
        iterable, the computed paths may begin from any one of the start
        nodes.
    target : node label, single node or a list
        Ending node (or a list of ending nodes) for path. Search is halted when any target is found.
    weight: function
        Function with (u, v, data) input that returns that edge's weight
        or None to indicate a hidden edge
    penalty : dict, optional (default={})
        Dictionary containing turn penalties. The key is a tuple (u, v, m) where
        u, v are the nodes of the current edge and m is the next node.
    next_node : node, optional (default=None)
        Next node to consider from the source.
    Returns
    -------
    list of nodes
        Path from source to target.
    Raises
    ------
    NodeNotFound
        If the source or target is not in `G`.
    ValueError
        If contradictory paths are found due to negative weights.
    """

In [None]:
## used the function without any modification

def shortest_path_turn_penalty(G, source, target, weight="weight", penalty={}, next_node = None):

    G_succ = G._adj  # For speed-up (and works for both directed and undirected graphs)
    weight = nx.algorithms.shortest_paths.weighted._weight_function(G, weight)
    push = heappush
    pop = heappop
    dist = {}  # dictionary of final distances
    paths = {source: [source]}
    target_list = [target] if not isinstance(target, list) else target
    reached_target = None
    seen = {}
    c = count()
    fringe = []
    seen[source] = {}
    if next_node is None:
        for m,_ in G_succ[source].items():
            seen[source][m] = 0
            push(fringe, (0, next(c), source, m))
    else:
        push(fringe, (0, next(c), source, next_node))
    while fringe:
        (d, _, v, m) = pop(fringe)
        u = m
        if v in dist:
            if u in dist[v]:
                continue  # already searched this node.
        else:
            dist[v] = {}
        dist[v][u] = d
        if v in target_list:
            reached_target = v
            break
        e = G[v][u]
        for m in G_succ[u]:
            cost = weight(v, u, e)
            if (v,u,m) in penalty:
                cost += penalty[v,u,m]

            if cost is None:
                continue
            vu_dist = dist[v][u] + cost
            if u in dist:
                if m in dist[u]:
                    u_dist = dist[u][m]
                    if vu_dist < u_dist:
                        raise ValueError("Contradictory paths found:", "negative weights?")
            elif u not in seen or m not in seen[u] or vu_dist < seen[u][m]:
                if u not in seen:
                    seen[u] = {}
                seen[u][m] = vu_dist
                push(fringe, (vu_dist, next(c), u, m))
                if paths is not None:
                    paths[u] = paths[v] + [u]
    # The optional predecessor and path dictionaries can be accessed
    # by the caller via the pred and paths objects passed as arguments.
    return paths[reached_target]

Used get_routes_from_gdfs function to find routes of O-D pairs.

In [None]:
def get_routes_from_gdfs(G, origins_gdf, destinations_gdf, weight='weight', penalty={}):
  routes = []
  for i in range(len(origins_gdf)):
    # find nearest nodes
    orig_node = ox.distance.nearest_nodes(G, origins_gdf.iloc[i]['geometry'].x, origins_gdf.iloc[i]['geometry'].y)
    dest_node = ox.distance.nearest_nodes(G, destinations_gdf.iloc[i]['geometry'].x, destinations_gdf.iloc[i]['geometry'].y)
        
    # find routes while considering penalties
    route = shortest_path_turn_penalty(G, orig_node, dest_node, weight='weight', penalty={})
    routes.append(route)

  return routes

In [None]:
routes = get_routes_from_gdfs(G, origins_gdf, destinations_gdf, weight='total_time', penalty=penalty)

In [None]:
# travel time of 10 O-D pair routes
time_result = []

for i in range(len(routes)):
    x = sum(ox.utils_graph.get_route_edge_attributes(G, routes[i], 'total_time'))
    time_result.append(x)

In [None]:
time_result

In [None]:
# format time_result
def format_travel_time(seconds):
    if seconds is None:
        return None

    rounded_seconds = round(seconds)
    minutes, remaining_seconds = divmod(rounded_seconds, 60)
    hours, minutes = divmod(minutes, 60)

    formatted_time = []

    if hours:
        formatted_time.append(f"{hours} hour")

    if minutes:
        formatted_time.append(f"{minutes} min")

    if remaining_seconds:
        formatted_time.append(f"{remaining_seconds} sec")

    return ' '.join(formatted_time)

formatted_times = [format_travel_time(t) for t in time_result]

In [None]:
formatted_times

### TEST

In [None]:
# sample
## for test, used 7th O-D pair
orig_node = ox.distance.nearest_nodes(G, origins_gdf.iloc[6]['geometry'].x, origins_gdf.iloc[6]['geometry'].y)
dest_node = ox.distance.nearest_nodes(G, destinations_gdf.iloc[6]['geometry'].x, destinations_gdf.iloc[6]['geometry'].y)

In [None]:
# with both traffic, turn penalties
shortest_path = shortest_path_turn_penalty(G, orig_node, dest_node, weight="total_time", penalty=penalty)

In [None]:
# without traffic or turn penalties
# weight = 'travel_time'
no_penalty = shortest_path_turn_penalty(G, orig_node, dest_node, weight='travel_time')

In [None]:
# only traffic penalties
only_traffic = shortest_path_turn_penalty(G, orig_node, dest_node, weight='total_time')

In [None]:
# only turn penalties
only_turn = shortest_path_turn_penalty(G, orig_node, dest_node, weight='travel_time', penalty=penalty)

In [None]:
# difference in the routes
# plot the routes on top of the network
ox.plot_graph_routes(G, routes = [shortest_path, no_penalty, only_traffic, only_turn], route_linewidth=2, route_colors = ['r', 'b', 'g', 'y'], bgcolor='k', node_size=0)

plt.show()


# Calculate time

In [None]:
# calculate the total travel time of each route
no_penalties = sum(ox.utils_graph.get_route_edge_attributes(G, no_penalty, 'travel_time'))
all_penalties = sum(ox.utils_graph.get_route_edge_attributes(G, shortest_path, 'total_time'))
traffic_penalties = sum(ox.utils_graph.get_route_edge_attributes(G, only_traffic, 'total_time'))
turn_penalites = sum(ox.utils_graph.get_route_edge_attributes(G, only_turn, 'travel_time'))

In [None]:
osmnx = ox.distance.shortest_path(G, orig_node, dest_node, weight='travel_time')
osmnx_result = sum(ox.utils_graph.get_route_edge_attributes(G,osmnx, 'travel_time'))

In [None]:
data = {
    'osmnx_result': [osmnx_result],
    'no_penalties': [no_penalties],
    'all_penalties': [all_penalties],
    'traffic_penalties': [traffic_penalties],
    'turn_penalites': [turn_penalites]
}

comparison = pd.DataFrame(data)

In [None]:
comparison