In [1]:
!pip install osmnx -q

In [2]:
from collections.abc import Callable

from typing import Tuple
from typing import Union
from typing import List
from typing import Any
from typing import Dict

from networkx import MultiDiGraph
from networkx import DiGraph

from matplotlib import pyplot

import networkx
import warnings
import random
import osmnx
import json
import time
import copy
import ast
import os

warnings.filterwarnings('ignore')

In [3]:
osmnx.config(use_cache = True, log_console = True)

ROOT_FOLDER = os.getcwd()
OUTPUT_FOLDER = os.path.join(ROOT_FOLDER, 'osm-network')
ZIP_FOLDER = os.path.join(ROOT_FOLDER, 'osm-network.zip')

## 1. Downloading and saving networks with OSMNX

In [4]:
#
# Networks (generated by this code)
# https://mega.nz/file/4gBT3BLA#EYhBJmAyZFAFHsunvo_4pSFgBIuq5dGlwvQknzbYy1s
#

def generate_osm (output_folder : str, country : str) -> Union[MultiDiGraph, DiGraph] :
    if not os.path.exists(output_folder) :
        os.makedirs(output_folder)

    file_basic = os.path.join(output_folder, f'{country.lower()}-basic.json')
    file_exten = os.path.join(output_folder, f'{country.lower()}-extended.json')
    file_image = os.path.join(output_folder, f'{country.lower()}.png')
    file_graph = os.path.join(output_folder, f'{country.lower()}.osm')

    timer = time.perf_counter()

    MAJOR_ROAD_FILTER = (
        f'["highway"]["area"!~"yes"]'
        f'["highway"!~"residential|escape|secondary_link|tertiary_link|living_street|crossing|speed_camera|traffic_signals|trailhead|stop|bus_stop|busway|toll_gantry|abandoned|traffic_mirror|bridleway|street_lamp|bus_guideway|construction|corridor|cycleway|elevator|escalator|footway|milestone|path|pedestrian|planned|platform|proposed|raceway|service|steps|track|emergency_bay|give_way"]'
        f'["service"!~"alley|driveway|emergency_access|parking|parking_aisle|private"]'
    )

    g = osmnx.graph_from_place(country, custom_filter = MAJOR_ROAD_FILTER)

    osmnx.plot_graph(g, filepath = file_image, show = False, save = True, dpi = 300)

    g = osmnx.distance.add_edge_lengths(g)
    g = osmnx.add_edge_speeds(g)
    g = osmnx.add_edge_travel_times(g)

    with open(file_basic, 'w') as file :
        json.dump(osmnx.basic_stats(g), file, indent = 4)

    with open(file_exten, 'w') as file :
        json.dump(osmnx.extended_stats(g), file, indent = 4)

    osmnx.save_graphml(g, file_graph)

    print(country)
    print(f'Nodes : {len(list(g.nodes))}')
    print(f'Edges : {len(list(g.edges))}')
    print(f'Timer : {(time.perf_counter() - timer):.2f} seconds')
    print()

    return g

In [5]:
if not os.path.exists(ZIP_FOLDER) :
    graphs = {}

    for country in ['Luxembourg', 'Montenegro', 'Slovenia', 'Belgium', 'Netherlands'] :
        graphs[country] = generate_osm(output_folder = OUTPUT_FOLDER, country = country)

In [6]:
if not os.path.exists(ZIP_FOLDER) :
    !zip -q -r osm-network.zip osm-network/

if not os.path.exists(OUTPUT_FOLDER) :
    !unzip -q osm-network.zip

## 2. Reading the network with networkx

In [7]:
def load_networkx (filepath : str, single_graph : bool = True) -> Union[MultiDiGraph, DiGraph] :
    g = networkx.read_graphml(filepath)

    dtypes = {
        "elevation": float,
        "elevation_res": float,
        "lat": float,
        "lon": float,
        "osmid": int,
        "street_count": int,
        "x": float,
        "y": float,
    }

    for _, data in g.nodes(data = True):
            for attr in data.keys() & dtypes.keys():
                data[attr] = dtypes[attr](data[attr])

    dtypes = {
        "bearing": float,
        "grade": float,
        "grade_abs": float,
        "length": float,
        "osmid": int,
        "speed_kph": float,
        "travel_time": float,
    }

    for _, _, data in g.edges(data = True):
        data.pop("id", None)

        for attr, value in data.items():
            if value.startswith("[") and value.endswith("]"):
                try:
                    data[attr] = ast.literal_eval(value)
                except (SyntaxError, ValueError):
                    pass

        for attr in data.keys() & dtypes.keys():
            if isinstance(data[attr], list):
                data[attr] = [dtypes[attr](item) for item in data[attr]]
            else:
                data[attr] = dtypes[attr](data[attr])

    if single_graph :
        return DiGraph(g)

    return g

In [8]:
g = load_networkx(os.path.join(OUTPUT_FOLDER, 'luxembourg.osm'))

print(g)

for _, _, data in g.edges.data() :
    print(json.dumps(data, indent = 4))
    break

for _, data in g.nodes.data() :
    print(json.dumps(data, indent = 4))
    break

DiGraph with 6040 nodes and 13026 edges
{
    "osmid": [
        676494528,
        669225292,
        27911159,
        622769271,
        24477182,
        676491615
    ],
    "oneway": "True",
    "lanes": [
        "2",
        "3"
    ],
    "ref": "A 6",
    "name": "Autoroute d'Arlon",
    "highway": "motorway",
    "maxspeed": "130",
    "length": 3709.267,
    "bridge": "yes",
    "geometry": "LINESTRING (6.0136282 49.6387254, 6.0142127 49.6386874, 6.0151568 49.6386092, 6.0156678 49.6385571, 6.0165757 49.638452, 6.0175922 49.6383087, 6.0182708 49.638201, 6.0192825 49.6380163, 6.0203908 49.6377916, 6.0210527 49.6376396, 6.0217988 49.6374563, 6.0228882 49.6371573, 6.0240449 49.6368097, 6.0251503 49.6364362, 6.0267634 49.6358385, 6.0280162 49.6353654, 6.0306868 49.6343012, 6.0318534 49.6338548, 6.0326716 49.6335612, 6.0335048 49.6332732, 6.0345366 49.6329497, 6.0355873 49.63263, 6.0370415 49.632231, 6.0380332 49.6319834, 6.0392106 49.6317112, 6.0405706 49.6314348, 6.0414453 49.6

## 3. Random node attacks on directed network

In [9]:
def remove_random_nodes (graph : Union[MultiDiGraph, DiGraph], percentage : float = 0.01) -> Union[MultiDiGraph, DiGraph] :
    graph = copy.deepcopy(graph)

    threshold = round(percentage * graph.number_of_nodes())
    target = random.sample(list(graph.nodes), threshold)

    print(f'Removing {threshold} randomly selected nodes.')

    graph.remove_nodes_from(target)

    return graph

In [10]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    for i in [0.25, 0.50, 0.75] :
        timer = time.perf_counter()

        g = remove_random_nodes(graph = graph, percentage = i)

        print(g)
        print(f'{country[0].upper()}{country[1:]:<10s} - {i:4.2f} ~~ {(time.perf_counter() - timer):5.2f} seconds')
        print()

DiGraph with 4030 nodes and 9181 edges

Removing 1008 randomly selected nodes.
DiGraph with 3022 nodes and 5109 edges
Montenegro  - 0.25 ~~  0.20 seconds

Removing 2015 randomly selected nodes.
DiGraph with 2015 nodes and 2272 edges
Montenegro  - 0.50 ~~  0.23 seconds

Removing 3022 randomly selected nodes.
DiGraph with 1008 nodes and 594 edges
Montenegro  - 0.75 ~~  0.31 seconds

DiGraph with 252752 nodes and 559879 edges

Removing 63188 randomly selected nodes.
DiGraph with 189564 nodes and 315163 edges
Netherlands - 0.25 ~~ 17.54 seconds

Removing 126376 randomly selected nodes.
DiGraph with 126376 nodes and 140715 edges
Netherlands - 0.50 ~~ 17.50 seconds

Removing 189564 randomly selected nodes.
DiGraph with 63188 nodes and 34789 edges
Netherlands - 0.75 ~~ 20.89 seconds



## 4. Random edge attacks on directed networks

In [11]:
def remove_random_edges (graph : Union[MultiDiGraph, DiGraph], percentage : float = 0.01) -> Union[MultiDiGraph, DiGraph] :
    graph = copy.deepcopy(graph)

    threshold = round(percentage * graph.number_of_edges())
    target = random.sample(list(graph.edges), threshold)

    print(f'Removing {threshold} randomly selected edges.')

    graph.remove_edges_from(target)

    return graph

In [12]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    for i in [0.25, 0.50, 0.75] :
        timer = time.perf_counter()

        g = remove_random_edges(graph = graph, percentage = i)

        print(g)
        print(f'{country[0].upper()}{country[1:]:<10s} - {i:4.2f} ~~ {(time.perf_counter() - timer):5.2f} seconds')
        print()

DiGraph with 4030 nodes and 9181 edges

Removing 2295 randomly selected edges.
DiGraph with 4030 nodes and 6886 edges
Montenegro  - 0.25 ~~  2.02 seconds

Removing 4590 randomly selected edges.
DiGraph with 4030 nodes and 4591 edges
Montenegro  - 0.50 ~~  1.45 seconds

Removing 6886 randomly selected edges.
DiGraph with 4030 nodes and 2295 edges
Montenegro  - 0.75 ~~  0.25 seconds

DiGraph with 252752 nodes and 559879 edges

Removing 139970 randomly selected edges.
DiGraph with 252752 nodes and 419909 edges
Netherlands - 0.25 ~~ 17.05 seconds

Removing 279940 randomly selected edges.
DiGraph with 252752 nodes and 279939 edges
Netherlands - 0.50 ~~ 19.68 seconds

Removing 419909 randomly selected edges.
DiGraph with 252752 nodes and 139970 edges
Netherlands - 0.75 ~~ 20.78 seconds



## 5. Random walk attacks on directed networks

In [13]:
def remove_random_walks (graph : Union[MultiDiGraph, DiGraph], percentage : float = 0.05, max_walk_length : int = 5) -> Tuple[Union[MultiDiGraph, DiGraph], float] :
    graph = copy.deepcopy(graph)

    threshold = round(percentage * graph.number_of_edges())
    target = set()
    active = None

    sum_length = 0
    n_length = 0

    length = 0
    repeat = 0

    while len(target) < threshold :
        if repeat > 3 :
            active = None

        if active is None :
            active = random.choice(list(graph.nodes))

        edges = list(graph.out_edges(active))

        if len(edges) > 0 :
            edge = random.choice(edges)

            if edge not in target :
                target.add(edge)

                length = length + 1
                repeat = 0

                active = edge[1]
            else :
                repeat = repeat + 1

            if length >= max_walk_length :
                sum_length = sum_length + length
                n_length = n_length + 1

                length = 0
                repeat = 0
                active = None
        else :
            sum_length = sum_length + length
            n_length = n_length + 1

            length = 0
            repeat = 0
            active = None

    print(f'Removing {threshold} semi-randomly selected edges with max walk length of {max_walk_length}.')

    graph.remove_edges_from(list(target))

    return graph, sum_length / n_length if n_length > 0 else 0

In [14]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    for l in [5, 10, 25] :
        for i in [0.10, 0.50] :
            timer = time.perf_counter()

            g, avg_l = remove_random_walks(graph = graph, percentage = i, max_walk_length = l)

            print(g)
            print(f'{country[0].upper()}{country[1:]:<10s} - {i:4.2f} {avg_l:5.2f} ~~ {(time.perf_counter() - timer):7.2f} seconds')
            print()

DiGraph with 4030 nodes and 9181 edges

Removing 918 semi-randomly selected edges with max walk length of 5.
DiGraph with 4030 nodes and 8263 edges
Montenegro  - 0.10  4.97 ~~    0.23 seconds

Removing 4590 semi-randomly selected edges with max walk length of 5.
DiGraph with 4030 nodes and 4591 edges
Montenegro  - 0.50  4.98 ~~    2.61 seconds

Removing 918 semi-randomly selected edges with max walk length of 10.
DiGraph with 4030 nodes and 8263 edges
Montenegro  - 0.10  9.93 ~~    0.23 seconds

Removing 4590 semi-randomly selected edges with max walk length of 10.
DiGraph with 4030 nodes and 4591 edges
Montenegro  - 0.50  9.84 ~~    0.37 seconds

Removing 918 semi-randomly selected edges with max walk length of 25.
DiGraph with 4030 nodes and 8263 edges
Montenegro  - 0.10 23.92 ~~    0.24 seconds

Removing 4590 semi-randomly selected edges with max walk length of 25.
DiGraph with 4030 nodes and 4591 edges
Montenegro  - 0.50 24.61 ~~    0.34 seconds

DiGraph with 252752 nodes and 55987

## 6. Directed edge attack on directed networks

In [15]:
def increment_dict (table : Dict[str, int], key : str) -> None :
    if key in table.keys() :
        table[key] = table[key] + 1
    else :
        table[key] = 1

def edge_type_distribution (graph : Union[MultiDiGraph, DiGraph]) -> None :
    stats = dict()

    for x, y, data in graph.edges.data() :
        if 'highway' in data.keys() :
            highway = data['highway']

            if type(highway) == str :
                increment_dict(table = stats, key = highway)
            elif type(highway) == list :
                for name in highway :
                    increment_dict(table = stats, key = name)
            else :
                increment_dict(table = stats, key = 'error')
        else :
            increment_dict(table = stats, key = 'n/a')

    print(json.dumps(stats, indent = 4))

In [16]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    edge_type_distribution(graph)

    print()

DiGraph with 4030 nodes and 9181 edges

{
    "unclassified": 5220,
    "secondary": 1022,
    "tertiary": 1189,
    "primary": 1652,
    "primary_link": 112,
    "road": 6
}

DiGraph with 252752 nodes and 559879 edges

{
    "unclassified": 334027,
    "tertiary": 113024,
    "primary": 36840,
    "secondary": 61491,
    "trunk": 3301,
    "primary_link": 1838,
    "trunk_link": 1580,
    "motorway_link": 4880,
    "motorway": 3449,
    "road": 44,
    "rest_area": 63,
    "passing_place": 4
}



In [17]:
def remove_direct_edges (graph : Union[MultiDiGraph, DiGraph], edges : List[Any]) -> Union[MultiDiGraph, DiGraph] :
    graph = copy.deepcopy(graph)

    print(f'Removing {len(edges)} directly selected edges.')

    graph.remove_edges_from(edges)

    return graph

def filter_edges (graph : Union[MultiDiGraph, DiGraph], filter : str = 'motorway') -> List[Tuple[Any, Any]] :
    edges = list()

    for x, y, data in graph.edges.data() :
        if 'highway' in data.keys() :
            highway = data['highway']

            if type(highway) == str :
                if highway.replace('"', '').startswith(filter) :
                    edges.append((x, y))
            elif type(highway) == list :
                for name in highway :
                    if name.replace('"', '').startswith(filter) :
                        edges.append((x, y))

    return edges

In [18]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    for filter in ['motorway', 'trunk', 'primary'] :
        timer = time.perf_counter()

        edges = filter_edges(graph = graph, filter = filter)

        i = 100.0 * len(edges) / graph.number_of_edges()

        g = remove_direct_edges(graph = graph, edges = edges)

        print(g)
        print(f'{country[0].upper()}{country[1:]:<10s} - {filter:8s} {i:5.2f} % ~~ {(time.perf_counter() - timer):7.2f} seconds')
        print()

DiGraph with 4030 nodes and 9181 edges

Removing 0 directly selected edges.
DiGraph with 4030 nodes and 9181 edges
Montenegro  - motorway  0.00 % ~~    2.31 seconds

Removing 0 directly selected edges.
DiGraph with 4030 nodes and 9181 edges
Montenegro  - trunk     0.00 % ~~    2.64 seconds

Removing 1764 directly selected edges.
DiGraph with 4030 nodes and 7417 edges
Montenegro  - primary  19.21 % ~~    0.23 seconds

DiGraph with 252752 nodes and 559879 edges

Removing 8329 directly selected edges.
DiGraph with 252752 nodes and 551551 edges
Netherlands - motorway  1.49 % ~~   17.56 seconds

Removing 4881 directly selected edges.
DiGraph with 252752 nodes and 555002 edges
Netherlands - trunk     0.87 % ~~   18.85 seconds

Removing 38678 directly selected edges.
DiGraph with 252752 nodes and 521207 edges
Netherlands - primary   6.91 % ~~   17.53 seconds



## 7. Directed node attack on directed networks

In [19]:
def increment_dict (table : Dict[str, int], key : str) -> None :
    if key in table.keys() :
        table[key] = table[key] + 1
    else :
        table[key] = 1

def node_type_distribution (graph : Union[MultiDiGraph, DiGraph]) -> None :
    stats = dict()

    for x, data in graph.nodes.data() :
        if 'street_count' in data.keys() :
            degree = data['street_count']

            if type(degree) == str :
                increment_dict(table = stats, key = degree)
            elif type(degree) == int :
                increment_dict(table = stats, key = degree)
            else :
                increment_dict(table = stats, key = 'error')
        else :
            increment_dict(table = stats, key = 'n/a')

    print(json.dumps(stats, indent = 4))

In [20]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    node_type_distribution(graph)

    print()

DiGraph with 4030 nodes and 9181 edges

{
    "3": 2898,
    "1": 922,
    "4": 170,
    "2": 40
}

DiGraph with 252752 nodes and 559879 edges

{
    "1": 48729,
    "3": 167943,
    "4": 28900,
    "2": 6629,
    "5": 503,
    "6": 44,
    "8": 2,
    "7": 2
}



In [21]:
def remove_direct_nodes (graph : Union[MultiDiGraph, DiGraph], nodes : List[Any]) -> Union[MultiDiGraph, DiGraph] :
    graph = copy.deepcopy(graph)

    print(f'Removing {len(nodes)} directly selected nodes.')

    graph.remove_nodes_from(nodes)

    return graph

def filter_nodes (graph : Union[MultiDiGraph, DiGraph], min_degree : int = 5) -> List[Any] :
    nodes = list()

    for x, data in graph.nodes.data() :
        if 'street_count' in data.keys() and data['street_count'] >= min_degree :
            nodes.append(x)

    return nodes

In [22]:
for country in ['montenegro', 'netherlands'] :
    path = os.path.join(OUTPUT_FOLDER, f'{country}.osm')
    graph = load_networkx(filepath = path, single_graph = True)

    print(graph)
    print()

    for degree in [4, 5, 6] :
        timer = time.perf_counter()

        nodes = filter_nodes(graph = graph, min_degree = degree)

        i = 100.0 * len(nodes) / graph.number_of_nodes()

        g = remove_direct_nodes(graph = graph, nodes = nodes)

        print(g)
        print(f'{country[0].upper()}{country[1:]:<10s} - {degree} {i:5.2f} % ~~ {(time.perf_counter() - timer):7.2f} seconds')
        print()

DiGraph with 4030 nodes and 9181 edges

Removing 170 directly selected nodes.
DiGraph with 3860 nodes and 8258 edges
Montenegro  - 4  4.22 % ~~    2.10 seconds

Removing 0 directly selected nodes.
DiGraph with 4030 nodes and 9181 edges
Montenegro  - 5  0.00 % ~~    2.68 seconds

Removing 0 directly selected nodes.
DiGraph with 4030 nodes and 9181 edges
Montenegro  - 6  0.00 % ~~    0.23 seconds

DiGraph with 252752 nodes and 559879 edges

Removing 29451 directly selected nodes.
DiGraph with 223301 nodes and 414766 edges
Netherlands - 4 11.65 % ~~   15.21 seconds

Removing 551 directly selected nodes.
DiGraph with 252201 nodes and 556286 edges
Netherlands - 5  0.22 % ~~   16.69 seconds

Removing 48 directly selected nodes.
DiGraph with 252704 nodes and 559598 edges
Netherlands - 6  0.02 % ~~   16.64 seconds

