# Level of Traffic Stress

based on https://github.com/mbonsma/LTS-OSM, inspired by https://muenchen.social/@scooooooott@urbanists.social/111745338643572229

This is in progress. Results are not accurate.

In [None]:
from pyrosm import OSM, get_data

region = "district-of-columbia"
fp = get_data(region)
osm = OSM(fp)


def create_node_tags_lookup():
    ids = osm._nodes["id"]
    tags = osm._nodes["tags"]
    return {ids[i]: tags[i] for i in range(0, len(ids))}


osm.keep_node_info = True

network = osm.get_network(
    "all",
    extra_attributes=[],
)

node_tags = create_node_tags_lookup()  # Used to find traffic signals, all-way stops

In [None]:
network.columns

In [None]:
import numpy as np


def biking_permitted(row):
    if row["bicycle"] == "no":
        return "p2"
    if row["access"] == "no":
        return "p6"
    if row["highway"] == "motorway":
        return "p3"
    if row["highway"] == "motorway_link":
        return "p4"
    if row["highway"] == "proposed":
        return "p7"
    if (
        (row["footway"] == "sidewalk")
        & ~(row["bicycle"] == "yes")
        & ((row["highway"] == "footway") | (row["highway"] == "path"))
    ):
        return "p5"
    return "p0"


def is_separated_path(row):
    # TODO
    if row["highway"] == "cycleway":
        return "s3"
    if row["highway"] == "path":
        return "s1"
    if (row["highway"] == "footway") & ~(row["footway"] == "crossing"):
        return "s2"
    return "s0"


def is_bike_lane(row):
    for column in row.index:
        if "cycleway" in column:
            if row[column] in [
                "crossing",
                "lane",
                "left",
                "opposite",
                "opposite_lane",
                "right",
                "yes",
            ]:
                return True
    return False


def parking_present(row):
    for column in row.index:
        if "parking" in column:
            if row[column] in [
                "yes",
                "parallel",
                "perpendicular",
                "diagonal",
                "marked",
            ]:
                return True
    return False


def get_max_speed(row, national=40, local=50, motorway=100, primary=80, secondary=80):
    if row["maxspeed"] is None:
        if row["maxspeed"] == "national":
            return national
        if row["highway"] == "motorway":
            return motorway
        if row["highway"] == "primary":
            return primary
        if row["highway"] == "secondary":
            return secondary
        return local
    if "mph" in row["maxspeed"]:
        return int(row["maxspeed"].split(" ")[0])
    return float(row["maxspeed"])


def level_of_traffic_stress_way_mixed_traffic(row):
    if row["motor_vehicle"] == "no":
        return 1  # m17
    if row["highway"] == "pedestrian":
        return 1  # m13
    if (row["highway"] == "footway") & (row["footway"] == "crossing"):
        return 2  # m14
    if (row["highway"] == "service") & (row["service"] == "alley"):
        return 2  # m2
    if row["highway"] == "track":
        return 2  # m15
    if (
        (row["maxspeed_assumed"] <= 50)
        & (row["highway"] == "service")
        & (row["service"] == "parking_aisle")
    ):
        return 2  # m3
    if (
        (row["maxspeed_assumed"] <= 50)
        & (row["highway"] == "service")
        & (row["service"] == "driveway")
    ):
        return 2  # m4
    if (row["maxspeed_assumed"] <= 35) & (row["highway"] == "service"):
        return 2  # m16
    if (
        (row["maxspeed_assumed"] <= 40)
        & (row["lanes_assumed"] <= 3)
        & (row["highway"] == "residential")
    ):
        return 2  # m5
    if (row["maxspeed_assumed"] <= 40) & (row["lanes_assumed"] <= 3):
        return 3  # m6
    if (row["maxspeed_assumed"] <= 40) & (row["lanes_assumed"] <= 5):
        return 3  # m7
    if (row["maxspeed_assumed"] <= 40) & (row["lanes_assumed"] > 5):
        return 4  # m8
    if (
        (row["maxspeed_assumed"] <= 50)
        & (row["lanes_assumed"] < 3)
        & (row["highway"] == "residential")
    ):
        return 2  # m9
    if (row["maxspeed_assumed"] <= 50) & (row["lanes_assumed"] <= 3):
        return 3  # m10
    if (row["maxspeed_assumed"] <= 50) & (row["lanes_assumed"] > 3):
        return 4  # m11
    if row["maxspeed_assumed"] > 50:
        return 4  # m12


def level_of_traffic_stress_way_bike_lane_parking(row):
    if (row["lanes_assumed"] >= 3) & (row["maxspeed_assumed"] <= 55):
        return 3  # b2
    if row["width"] <= 4.1:
        return 3  # b3
    if row["width"] <= 4.25:
        return 2  # b4
    if (row["width"] <= 4.5) & (
        (row["maxspeed_assumed"] <= 40) & (row["highway"] == "residential")
    ):
        return 2  # b5
    if (row["maxspeed_assumed"] > 40) & (row["maxspeed_assumed"] <= 50):
        return 2  # b6
    if (row["maxspeed_assumed"] > 50) & (row["maxspeed_assumed"] <= 55):
        return 3  # b7
    if ["maxspeed_assumed"] > 55:
        return 4  # b8
    if ["highway"] != "residential":
        return 3  # b9
    return 1  # b1


def level_of_traffic_stress_way_bike_lane_no_parking(row):
    if (row["lanes_assumed"] >= 3) & (row["maxspeed_assumed"] <= 65):
        return 3  # c3
    if row["width"] <= 1.7:
        return 2  # c4
    if (row["maxspeed_assumed"] > 50) & (row["maxspeed_assumed"] <= 65):
        return 3  # c5
    if row["maxspeed_assumed"] > 65:
        return 4  # c6
    if row["highway"] != "residential":
        return 3  # c7
    return 1  # c1


def level_of_traffic_stress_way(row):
    if row["biking_permitted"] != "p0":
        return 0
    if row["is_separated_path"] != "s0":
        return 1
    if row["is_bike_lane"]:
        if row["parking_present"]:
            return level_of_traffic_stress_way_bike_lane_parking(row)
        return level_of_traffic_stress_way_bike_lane_no_parking(row)
    return level_of_traffic_stress_way_mixed_traffic(row)


network["is_separated_path"] = network.apply(is_separated_path, axis=1)
network["biking_permitted"] = network.apply(biking_permitted, axis=1)
network["is_bike_lane"] = network.apply(is_bike_lane, axis=1)
network["parking_present"] = network.apply(parking_present, axis=1)
network["lanes_assumed"] = (
    network["lanes"]
    .fillna(2)
    .apply(lambda x: np.array(x, dtype="int"))
    .apply(lambda x: np.max(x))
)
network["maxspeed_assumed"] = network.apply(get_max_speed, axis=1)
# network['maxspeed_assumed'] = network['maxspeed_assumed'].apply(lambda x: np.array(x, dtype = 'int')).apply(lambda x: np.max(x))
network["width"].fillna(np.nan, inplace=True)
network["width"] = 5  # TODO
network["level_of_traffic_stress"] = network.apply(level_of_traffic_stress_way, axis=1)
print(network["level_of_traffic_stress"].value_counts())

In [None]:
from lonboard import Map, PathLayer
from lonboard.colormap import apply_categorical_cmap

layer = PathLayer.from_geopandas(
    gdf=network[
        [
            "geometry",
            "level_of_traffic_stress",
            "name",
            "is_separated_path",
            "biking_permitted",
            "is_bike_lane",
            "parking_present",
            "lanes_assumed",
            "maxspeed_assumed",
            "length",
        ]
    ],
    width_scale=10,
)
layer.get_color = apply_categorical_cmap(
    values=network["level_of_traffic_stress"],
    cmap={
        0: [0, 0, 0],  # black
        1: [0, 128, 0],  # green
        2: [255, 255, 0],  # yellow
        3: [255, 165, 0],  # orange
        4: [255, 0, 0],  # red
    },
)

Map(layers=[layer])