## Network X functions

In [1]:
## Import libraries
import osmnx as ox
import networkx as nx
import math
import dash_leaflet as dl
from jupyter_dash import JupyterDash
from dash import html
import dash
from collections import deque

Test a Tolworth 04 route

In [2]:
start_node = 23779844
G_tolworth = ox.graph_from_point((51.3829463, -0.2933327), dist=5000, network_type='drive')  # Adjust lat/lon to roughly London
node_id = 10282513496
centre_lat = 51.3829463
centre_lon = -0.2933327

In [3]:
type(G_tolworth)

networkx.classes.multidigraph.MultiDiGraph

In [None]:
# function to get nodes on a road
def get_nodes_on_road(G, road_name: str) -> set:
    """_summary_
    Given a collection of nodes via a networkx.classes.multidigraph.MultiDiGraph find
    all of the nodes on a given road

    Args:
        G (OSMNX digraph): Collection of nodes to investigate
        road_name (str): Road name 

    Returns:
        set: set of all osmnx nodes on a road
    """
    nodes = set()
    for u, v, data in G.edges(data=True): # u is the start node, v is the edge node, data is edge data
        if data.get('name') == road_name:
            nodes.add(u)
            nodes.add(v)
    return nodes

# Function to get the angle of turn nodes
def turn_angle(current_node: dict, shared_node: dict, neighbour_node: dict):
    """_summary_

    Args:
        current_node (dict): _description_
        shared_node (dict): _description_
        neighbour_node (dict): _description_

    Returns:
        _type_: _description_
    """
    x1 =  shared_node['x'] - current_node['x']
    y1 =  shared_node['y'] - current_node['y']
    x2 =  neighbour_node['x'] - shared_node['x']
    y2 =  neighbour_node['y'] - shared_node['y']
    
    # print(f'x1: {x1}, y1: {y1}' )
    # print(f'x2: {x2}, y2: {y2}' )

    angle1 = math.atan2(y1, x1)
    angle2 = math.atan2(y2, x2)
    
    # Signed angle difference
    angle_deg = math.degrees(angle1 - angle2)

    return angle_deg + 180

# Turn direction 
def get_turn_node(G, current_road_name: str, next_road_name: str, current_node: str, direction: str):
    """_summary_

    Args:
        G (_type_): _description_
        current_road_name (str): _description_
        next_road_name (str): _description_
        current_node (str): _description_
        direction (str): _description
    """
    # 1) Get the nodes on the current road
    curr_road_nodes = get_nodes_on_road(G, current_road_name)
    # 2) Get the nodes on the next road
    next_road_nodes = get_nodes_on_road(G, next_road_name)
    # 3) Find the nodes common to both to get junction nodes 
    junc_node = list(curr_road_nodes.intersection(next_road_nodes)) # get nodes in both roads
    if len(junc_node) == 1:
        junc_node = list(curr_road_nodes.intersection(next_road_nodes))[0]
    elif len(junc_node) > 1:
        # Check if the nodes have neighbours which are not shared
        next_node_not_shared = next_road_nodes.difference(set(junc_node)) # check for nodes on next road not in current
        junc_node = {node for node in junc_node 
         if len({n for n in G.neighbors(node)}.intersection(next_node_not_shared))}
            
        # Check if there is one node left, otherwise use networkx to get shortest distance
        if len(junc_node) == 1:
            junc_node = list(junc_node)[0]
        else:
            junc_node = min(
            junc_node,
            key=lambda node: nx.shortest_path_length(G, current_node, node, weight="length")
)

        
    else:
        raise ValueError("No shared nodes between the two roads!")
    # 4) Get neighbours of common node
    junc_node_neighbours = [neighbour for neighbour in G.neighbors(junc_node) 
                            if (neighbour in next_road_nodes 
                            and neighbour not in curr_road_nodes)]
    # 5) Get angles of the turns
    neigbour_angles = [turn_angle(G.nodes[current_node], G.nodes[junc_node], G.nodes[x]) for x in junc_node_neighbours]
    # 7) Select Neighbour node dependent on turn direction
    if direction == 'left':
        final_node = junc_node_neighbours[neigbour_angles.index(min(neigbour_angles))]
    elif direction == "right":
        final_node = junc_node_neighbours[neigbour_angles.index(max(neigbour_angles))]
    elif direction == "straight":
        junc_node_neighbours[min(enumerate([abs(180 - (n)) for n in neigbour_angles]), key=lambda x: x[1])[0]]
    else:
        pass # may need to add this??
    # 7) Get the route from current node to final node
    current_node_to_junction_pth = nx.shortest_path(G, source=current_node, target=junc_node)
    junction_to_turn_pth = nx.shortest_path(G, junc_node, final_node)
    
    return current_node_to_junction_pth[:-1] + junction_to_turn_pth

## Roundabout functions ======================================================================= ##

## Get the potential roundabout nodes
def get_roundabout_path(G, current_road_name: str, next_road_name: str, current_node: str, exit: int):
    """_summary_

    Args:
        G (_type_): _description_
        current_road_name (str): _description_
        next_road_name (str): _description_
        current_node (str): _description_
    """
    current_road_nodes = get_nodes_on_road(G, current_road_name) # get the nodes on the current road
    potential_roundabout_current_road_nodes = set()
    
    for node in current_road_nodes: # loop through current road nodes
        for neighbour in G.successors(node): # get the neighbours of the current road nodes
            for _, edge_data in G.get_edge_data(node, neighbour).items(): # get the edges of the current road nodes
                if edge_data.get('junction') == 'roundabout': # check if any of the edges are roundabouts
                    potential_roundabout_current_road_nodes.add(node) # append these edges to a list
                    
    roundabout_nodes_dict = dict()
                    
    ## Look at the connected nodes to find the roundabout nodes
    for node in potential_roundabout_current_road_nodes:
        queue = [node] # intiate a queue
        visited = set([node])
        roundabout_nodes = [node]
        
        
        while queue:
            current_ra_node = queue.pop(0)
            for nd_ra in G.successors(current_ra_node):
                if nd_ra not in visited:
                    edge_ra_datas = G.get_edge_data(current_ra_node, nd_ra)
                    if edge_ra_datas:
                        for _, edge_ra_data in edge_ra_datas.items():
                            if edge_ra_data.get('junction') == 'roundabout':
                                queue.append(nd_ra)
                                visited.add(nd_ra)
                                roundabout_nodes.append(nd_ra)
                                
        roundabout_nodes_dict[node] = roundabout_nodes
                                
    ## Check for duplicated roundabouts
    unique_roundabouts = {}
    seen = set()
    for entry_node, nodes in roundabout_nodes_dict.items():
        frozen = frozenset(nodes)
        if frozen not in seen:
            seen.add(frozen)
            unique_roundabouts[entry_node] = nodes
            
    ## Get the road names for each exit node
    roundabout_exit_roads_dict = dict()
    for ra_key in unique_roundabouts:
        roundabout_exit_roads = dict()
        ra = unique_roundabouts[ra_key]
        for rb_nd in ra:
            #print(ra)
            for rb_nd_neighbour in G.successors(rb_nd):
                if rb_nd_neighbour not in ra:
                    rd_nb_edges = G.get_edge_data(rb_nd, rb_nd_neighbour)
                    for _, data in rd_nb_edges.items():
                        roundabout_exit_roads[rb_nd] = data.get('name')
                    
            roundabout_exit_roads_dict[ra_key] = roundabout_exit_roads
        
    ## Remove roundabouts that do not have any exits onto the road
    roundabout_exit_roads_dict = {
        k: v for k, v in roundabout_exit_roads_dict.items()
        if next_road_name in v.values()
    }
    
    ## check that there is only one roundabout still left after filtering
    if len(roundabout_exit_roads_dict) > 1:
        raise ValueError("Multiple roundabouts after filtering")    
 
    if len(roundabout_exit_roads_dict) == 0:
        raise ValueError("No roundabouts found")  
    
    ## Get the most likely entrance node using shortest distance (this will be the path to the roundabout)
    roundabout_entrance_curr_node_nodes = {k : ox.shortest_path(G, current_node, k,  weight='length') 
                                  for k, v in next(iter(roundabout_exit_roads_dict.values())).items() 
                                  if v in current_road_name}
    
    roundabout_entrance_node, roundabout_entrance_path = min(roundabout_entrance_curr_node_nodes.items(), key=lambda item: len(item[1]))
    
    ## Get the exit node from the exit number
    roundabout_nodes = list(next(iter(roundabout_exit_roads_dict.values())).keys())
    
    # Use deque to rotate the list so that entrance_node is first
    dq = deque(roundabout_nodes)
    
    while dq[0] != roundabout_entrance_node:
        dq.rotate(-1)  # Move elements left until entrance_node is at front
        
    roundabout_exit_node = dq[exit]
    
    # Get route between roundabout entrance and exit nodes
    roundabout_entrance_exit_path = ox.shortest_path(G, roundabout_entrance_node, roundabout_exit_node,  weight='length')
    
    # Get the final node on the road
    final_node = [nd for nd in G.successors(roundabout_exit_node) if nd not in roundabout_nodes]
    
    ## check if there aren't multiple routes off the exit node
    if len(final_node) != 1:
        raise ValueError(f"Multiple nodes off exit node found from node: {roundabout_exit_node}")

    final_path = roundabout_entrance_path[:-1] + roundabout_entrance_exit_path + final_node
    
    ## check if there are any nodes on the current road on roundabouts
    if not potential_roundabout_current_road_nodes:
        raise ValueError(f"Could not find any nodes connected to roundabouts on the current road: {current_road_name}")
                    
    return final_path
        

# test functions section of cell ======================================================= ##
#get_turn_candiates(G_tolworth, 'Douglas Road')

# turn_angle(G_tolworth.nodes[start_node], G_tolworth.nodes[2578590287], G_tolworth.nodes[304437]), turn_angle(G_tolworth.nodes[start_node], G_tolworth.nodes[2578590287], G_tolworth.nodes[304438])
# turn_angle(G_tolworth.nodes[23780711], G_tolworth.nodes[start_node], G_tolworth.nodes[2578590287])
get_turn_node(G_tolworth, 'Douglas Road', 'Ewell Road', 23780711, 'left'), get_turn_node(G_tolworth, 'Douglas Road', 'Ewell Road', 23780711, 'right'),get_turn_node(G_tolworth, 'Douglas Road', 'Ewell Road', 2578590287, 'left')
#get_turn_node(G_tolworth, 'Ewell Road', 'Kingsdowne Road', 304437, 'straight')


#get_potential_roundabout_node(G_tolworth, 'Kingsdowne Road', 'Upper Brighton Road', 1736768194)
#get_potential_roundabout_node(G_tolworth, 'Langley Road','Upper Brighton Road',   304081)
#get_potential_roundabout_node(G_tolworth, 'Kingsdowne Road', 'Upper Brighton Road', 1736768194, exit = 3)

([23780711, 23779844, 2578590287, 304437],
 [23780711, 23779844, 2578590287, 304438],
 [2578590287, 304437])

In [21]:
def get_continuing_road_path(G, current_road_name: str, next_road_name: str, current_node: str):
    """Find a simple continuation path from current road to next road with a name change but no junction."""
    visited = set()
    path = [current_node]
    
    while True:
        node = path[-1]
        visited.add(node)
        
        successors = [n for n in G.successors(node) if n not in visited]
        if not successors:
            raise ValueError("No further successors from current node")
        
        next_node = None
        for succ in successors:
            edge_data = G.get_edge_data(node, succ)
            for _, data in edge_data.items():
                road_name = data.get('name')
                if road_name == current_road_name or road_name == next_road_name:
                    next_node = succ
                    break
            if next_node:
                break
        
        if not next_node:
            raise ValueError(f"Couldn't find continuation from {current_road_name} to {next_road_name}")
        
        path.append(next_node)
        
        # Check if we've arrived on the next road
        edge_data = G.get_edge_data(node, next_node)
        for _, data in edge_data.items():
            if data.get('name') == next_road_name:
                return path

get_continuing_road_path(G_tolworth, 'Upper Brighton Road', 'Brighton Road', 1736768193)

[1736768193, 242768242, 1685721459, 1652501763, 1736772747, 304097, 304098]

In [5]:
#G_tolworth.nodes[start_node]['x'] - G_tolworth.nodes[2578590287]['x']
# [x for x in G_tolworth.neighbors(2578590287)]
# G_tolworth.nodes[start_node], G_tolworth.nodes[2578590287], G_tolworth.nodes[304437]

# [x for x in G_tolworth.successors(2578590373)]
# d = {'x': 10, 'y': 12}

# [x for x in d.items()], [x for x in d.keys()]

# xx1 = get_potential_roundabout_node(G_tolworth, 'Kingsdowne Road', 'Kingsdowne Road', 1736768194)
# xx2 = get_potential_roundabout_node(G_tolworth, 'Kingsdowne Road', 'Upper Brighton Road', 1736768194)
# xx = get_potential_roundabout_node(G_tolworth, 'Upper Brighton Road', 'Kingsdowne Road',  8)
# xx1, xx2, 
# {k : ox.shortest_path(G_tolworth, 1736768193, k,  weight='length') for k, v in next(iter(xx.values())).items() if v in 'Upper Brighton Road'}#, xx

# Example dict of lists
my_dict = {
    "a": [1, 2, 3]
    #"b": [4, 5, 5],
    #"c": [6, 7, 8, 9]
}

# Get the key and list with the shortest list
shortest_key, shortest_list = min(my_dict.items(), key=lambda item: len(item[1]))

shortest_key, shortest_list


('a', [1, 2, 3])

In [6]:
# Look at some nodes
[dx for dx in G_tolworth.neighbors(23780711)], 


all_keys = set()
for _, attrs in G_tolworth.nodes(data=True):
    all_keys.update(attrs.keys())

print(sorted(all_keys))

print(G_tolworth.nodes[start_node])

[dx for dx in G_tolworth.successors(start_node)], [dx for dx in G_tolworth.predecessors(start_node)]

['highway', 'street_count', 'x', 'y']
{'y': 51.381894, 'x': -0.2938243, 'street_count': 3}


([23780702, 23780711, 2578590287], [23780702, 23780711, 2578590287])

In [7]:
## Explore edges
edge1 = G_tolworth.get_edge_data(23779844, 2578590287)#
print(edge1)
edge2 = G_tolworth.get_edge_data(2578590287, 23779844)#
print(edge2)

{0: {'osmid': 251673770, 'highway': 'residential', 'maxspeed': '20 mph', 'name': 'Douglas Road', 'oneway': False, 'reversed': True, 'length': np.float64(294.1048198309258), 'geometry': <LINESTRING (-0.294 51.382, -0.294 51.382, -0.294 51.382, -0.294 51.382, -0....>}}
{0: {'osmid': 251673770, 'highway': 'residential', 'maxspeed': '20 mph', 'name': 'Douglas Road', 'oneway': False, 'reversed': False, 'length': np.float64(294.1048198309258), 'geometry': <LINESTRING (-0.293 51.384, -0.293 51.384, -0.293 51.384, -0.293 51.383, -0....>}}


In [8]:
## Node 1 to node 2
node1_to_node2 = get_turn_node(G_tolworth, 'Douglas Road', 'Ewell Road', 23780711, 'left')


def get_markers_and_polylines(G, nodes, node_color = 'red', edge_color = 'blue'):
    ## Get markers
    node_dict = {node: G_tolworth.nodes[node] for node in nodes}
    markers = [
    dl.Marker(position=(node_dict[dx]['y'], node_dict[dx]['x']), 
              children=dl.Tooltip(f'ID: {dx}, x: {node_dict[dx]['x']}, y: {node_dict[dx]['y']}'))
    for dx in node_dict.keys()
    ]
    
    ## Get polylines
    edges = [G_tolworth.get_edge_data(nodes[u], nodes[u + 1])[0] for u in range(len(nodes) - 1)]

    edge_lines = []

    for idx, data in  zip(range(len(edges)), edges):
        # Some edges have multiple geometries (from OSM), handle those first
        if 'geometry' in data:
            # If geometry is a LineString, extract lat/lon pairs
            coords = [(point[1], point[0]) for point in data['geometry'].coords]
        else:
            #Otherwise use straight line between nodes
            coords = [
                markers[idx].position, 
                markers[idx + 1].position
            ]
        
        edge_lines.append(dl.Polyline(positions=coords, color=edge_color, weight=2))

    return markers, edge_lines

get_markers_and_polylines(G_tolworth, node1_to_node2)

([Marker(children=Tooltip('ID: 23780711, x: -0.2944253, y: 51.3806375'), position=(51.3806375, -0.2944253)),
  Marker(children=Tooltip('ID: 23779844, x: -0.2938243, y: 51.381894'), position=(51.381894, -0.2938243)),
  Marker(children=Tooltip('ID: 2578590287, x: -0.2926397, y: 51.3844335'), position=(51.3844335, -0.2926397)),
  Marker(children=Tooltip('ID: 304437, x: -0.2926802, y: 51.3844458'), position=(51.3844458, -0.2926802))],
 [Polyline(color='blue', positions=[(51.3806375, -0.2944253), (51.3809071, -0.2942964), (51.3815349, -0.2939961), (51.381894, -0.2938243)], weight=2),
  Polyline(color='blue', positions=[(51.381894, -0.2938243), (51.3819982, -0.2937758), (51.3820815, -0.293737), (51.3821779, -0.2936921), (51.3826096, -0.2934912), (51.3829463, -0.2933327), (51.383964, -0.2928535), (51.3843965, -0.2926565), (51.3844335, -0.2926397)], weight=2),
  Polyline(color='blue', positions=[(51.3844335, -0.2926397), (51.3844458, -0.2926802)], weight=2)])

In [9]:

node1_to_node2_nodes = {node: G_tolworth.nodes[node] for node in node1_to_node2}

node1_to_node2_markers = [
    dl.Marker(position=(node1_to_node2_nodes[dx]['y'], node1_to_node2_nodes[dx]['x']), 
              children=dl.Tooltip(f'ID: {dx}, x: {node1_to_node2_nodes[dx]['x']}, y: {node1_to_node2_nodes[dx]['y']}'))
    for dx in node1_to_node2_nodes.keys()
    ]



## Get the edges
node1_to_node2_edges = [G_tolworth.get_edge_data(node1_to_node2[u], node1_to_node2[u + 1])[0] for u in range(len(node1_to_node2) - 1)]

node1_to_node2_edge_lines = []

for idx, data in  zip(range(len(node1_to_node2_edges)), node1_to_node2_edges):
    # Some edges have multiple geometries (from OSM), handle those first
    if 'geometry' in data:
        # If geometry is a LineString, extract lat/lon pairs
        coords = [(point[1], point[0]) for point in data['geometry'].coords]
    else:
        print('arrg')
        #Otherwise use straight line between nodes
        coords = [
            node1_to_node2_markers[idx].position, 
            node1_to_node2_markers[idx + 1].position
        ]
    
    node1_to_node2_edge_lines.append(dl.Polyline(positions=coords, color='blue', weight=2))
    
node1_to_node2_edge_lines

arrg


[Polyline(color='blue', positions=[(51.3806375, -0.2944253), (51.3809071, -0.2942964), (51.3815349, -0.2939961), (51.381894, -0.2938243)], weight=2),
 Polyline(color='blue', positions=[(51.381894, -0.2938243), (51.3819982, -0.2937758), (51.3820815, -0.293737), (51.3821779, -0.2936921), (51.3826096, -0.2934912), (51.3829463, -0.2933327), (51.383964, -0.2928535), (51.3843965, -0.2926565), (51.3844335, -0.2926397)], weight=2),
 Polyline(color='blue', positions=[(51.3844335, -0.2926397), (51.3844458, -0.2926802)], weight=2)]

In [10]:
## Node 2 to Node 3
node2_to_node3 = get_turn_node(G_tolworth, 'Ewell Road', 'Kingsdowne Road', 304437, 'left')

node2_3_mks, node2_3_edges = get_markers_and_polylines(G_tolworth, node2_to_node3, edge_color='orange')

## Node 3 to 4
node3_to_node4 = get_potential_roundabout_node(G_tolworth, 'Kingsdowne Road', 'Upper Brighton Road', 1736768194, exit = 3)
node3_4_mks, node3_4_edges = get_markers_and_polylines(G_tolworth, node3_to_node4, edge_color='black')

node3_4_edges

[Polyline(color='black', positions=[(51.3854084, -0.2959625), (51.3854184, -0.2960024), (51.3855504, -0.296483)], weight=2),
 Polyline(color='black', positions=[(51.3855504, -0.296483), (51.3855876, -0.2966153)], weight=2),
 Polyline(color='black', positions=[(51.3855876, -0.2966153), (51.3856261, -0.2967495), (51.386197, -0.298791), (51.386346, -0.2993174), (51.3863653, -0.2993864), (51.3863885, -0.2994688), (51.386673, -0.3004315), (51.3867114, -0.3005719), (51.3867659, -0.3008119)], weight=2),
 Polyline(color='black', positions=[(51.3867659, -0.3008119), (51.3867422, -0.3008155), (51.3867181, -0.3008344), (51.3867009, -0.3008643), (51.3866926, -0.3008888)], weight=2),
 Polyline(color='black', positions=[(51.3866926, -0.3008888), (51.3866909, -0.3009438), (51.3866969, -0.3009741), (51.386711, -0.3010057), (51.3867265, -0.3010243)], weight=2),
 Polyline(color='black', positions=[(51.3867265, -0.3010243), (51.3867462, -0.301036), (51.3867722, -0.3010369), (51.3867961, -0.3010232), (51.

In [11]:
## Plot some stuffffffffffffffffffffff
app = JupyterDash(__name__)
app.layout = html.Div([
    dl.Map(center=(centre_lat, centre_lon), zoom=18, children=[
        dl.TileLayer(),
        dl.LayerGroup(node1_to_node2_markers + 
                      node1_to_node2_edge_lines + 
                        node2_3_mks + node2_3_edges + 
                        node3_4_mks +  node3_4_edges)
    ], style={'width': '100%', 'height': '4200px'})
])

app.run(mode='inline', port=8051)



JupyterDash is deprecated, use Dash instead.
See https://dash.plotly.com/dash-in-jupyter for more details.



In [20]:
G_tolworth.get_edge_data(1652501763, 1736772747)[0]['name'], G_tolworth.get_edge_data(1736772747, 304097)[0]['name'], G_tolworth.get_edge_data(304097, 304098)[0]['name']

('Upper Brighton Road', 'Upper Brighton Road', 'Brighton Road')