In [1]:
# Imports
%load_ext autoreload
%autoreload 2

import re
import yaml
import numpy as np
import pandas as pd

import lts_functions as lts
import LTS_OSM

# Using jupyter notebooks with a virtual environment
# https://anbasile.github.io/posts/2017-06-25-jupyter-venv/
# https://stackoverflow.com/questions/58119823/jupyter-notebooks-in-visual-studio-code-does-not-use-the-active-virtual-environm
# ipython kernel install --user --name=.venv

In [2]:
# Settings
region = 'Cambridge'
unit = 'english'
# yaml.safe_load() parses the text (or stream) it is given; handing it the
# path string would just return that string, so open the file first.
with open('config/tables.yml', 'r') as tables_file:
    tables = yaml.safe_load(tables_file)

def read_yaml(path='config/rating_dict.yml'):
    """
    Load a YAML file and return its parsed contents.

    Parameters
    ----------
    path : str, optional
        File to read. Defaults to the rating rules file used by this notebook.

    Returns
    -------
    The object produced by ``yaml.safe_load`` for that file.
    """
    with open(path, 'r', encoding='utf-8') as yml_file:
        return yaml.safe_load(yml_file)

rating_dict = read_yaml()

In [3]:
# Load Data
# download_data returns two tables for the region: nodes first, edges second.
gdfNodes, gdfEdges = LTS_OSM.download_data(region)

Loading saved graph for Cambridge
gdf_edges.shape=(105548, 193)
gdf_nodes.shape=(48501, 6)


In [12]:
def biking_permitted(gdf_edges, rating_dict):
    """
    Flag each edge with whether biking is permitted, using the 'p' rules.

    Every edge starts as permitted ('yes'). Each rule whose id is 'p' followed
    by a number is then evaluated with ``DataFrame.eval`` and applied to the
    matching rows; rules are applied in dictionary order, so a later rule
    overwrites an earlier one on rows that match both.

    Parameters
    ----------
    gdf_edges : DataFrame of edges. Modified in place and also returned.
    rating_dict : dict mapping rule ids to dicts with the keys 'condition',
        'biking_permitted', 'rule_message' and 'LTS'.

    Returns
    -------
    gdf_edges with the columns 'biking_permitted', 'biking_permitted_rule',
    'biking_permitted_condition' (categorical) and, when at least one rule
    exists, 'LTS' set on the matched rows.
    """
    prefix = 'p'
    # Match '<prefix><number>' exactly. A plain substring test (`prefix in k`)
    # would also pick up unrelated rule families such as 'parking1' or 'speed1'.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    bikingPermittedRules = {k: v for k, v in rating_dict.items()
                            if rule_id.fullmatch(str(k))}

    gdf_edges['biking_permitted'] = 'yes'
    gdf_edges['biking_permitted_rule'] = 'Assume biking permitted'
    gdf_edges['biking_permitted_condition'] = 'default'

    for value in bikingPermittedRules.values():
        # FIXME need to handle single sided tags so that can include both sides in outputs
        gdf_filter = gdf_edges.eval(value['condition'])
        gdf_edges.loc[gdf_filter, 'biking_permitted'] = value['biking_permitted']
        gdf_edges.loc[gdf_filter, 'biking_permitted_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'biking_permitted_condition'] = value['condition']
        gdf_edges.loc[gdf_filter, 'LTS'] = value['LTS']

    # Save memory by storing the repetitive text columns as categories
    for col in ['biking_permitted', 'biking_permitted_rule', 'biking_permitted_condition']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

# Re-read the rules file, then run on a copy because biking_permitted
# writes its result columns into the frame it is given.
rating_dict = read_yaml()
gdf_biking_permitted = biking_permitted(gdfEdges.copy(), rating_dict)

In [46]:
# Show the distinct values of every column whose name contains 'cycleway'
# (this is a substring match, so e.g. 'check_date:cycleway' is listed too)
cycleway_tags = gdfEdges.columns[gdfEdges.columns.str.contains('cycleway')]

for tag in sorted(cycleway_tags):
    print(tag, gdfEdges[tag].unique())

check_date:cycleway [nan '2022-02-18' '2021-09-25' '2023-06-09']
cycleway [nan 'shared_lane' 'no' 'lane' 'separate' 'crossing' 'shared' 'track'
 'construction']
cycleway:both [nan 'no' 'shared_lane' 'separate' 'lane' 'share_busway']
cycleway:both:buffer [nan 'no' 'yes']
cycleway:both:lane [nan 'pictogram' 'exclusive' 'advisory']
cycleway:buffer [nan "2'" 'no']
cycleway:lanes:backward [nan 'none|none|lane']
cycleway:left [nan 'shared_lane' 'no' 'separate' 'track' 'lane' 'share_busway']
cycleway:left:buffer [nan 'yes']
cycleway:left:lane [nan 'exclusive' 'pictogram']
cycleway:left:oneway [nan '-1' 'no']
cycleway:left:separation [nan 'buffer' 'flex_post']
cycleway:right [nan 'lane' 'separate' 'no' 'track' 'shared_lane' 'shoulder'
 'share_busway' 'buffered_lane']
cycleway:right:buffer [nan 'yes']
cycleway:right:lane [nan 'exclusive' 'pictogram' 'advisory']
cycleway:right:oneway [nan 'no']
cycleway:right:separation [nan 'flex_post']
cycleway:surface [nan 'asphalt']
cycleway:width [nan "5'" 

In [51]:
def is_separated_path(gdf_edges, rating_dict):
    """
    Flag each edge with whether it has a separated bike path, using the 's' rules.

    Every edge starts as 'no' under the default rule 's0'. Each rule whose id
    is 's' followed by a number is evaluated in dictionary order and applied
    only to rows still on the default, so the first matching rule wins.

    Parameters
    ----------
    gdf_edges : DataFrame of edges. Modified in place and also returned.
    rating_dict : dict mapping rule ids to dicts with the keys 'condition',
        'bike_lane_separation', 'rule_message' and 'LTS'.

    Returns
    -------
    gdf_edges with the categorical columns 'bike_lane_separation',
    'bike_lane_separation_rule_num', 'bike_lane_separation_rule' and
    'bike_lane_separation_condition', and 'LTS' set on the matched rows.
    """
    prefix = 's'
    # Match '<prefix><number>' exactly. A plain substring test (`prefix in k`)
    # would also pick up unrelated rule families such as 'speed1'.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    separatedPathRules = {k: v for k, v in rating_dict.items()
                          if rule_id.fullmatch(str(k))}
    defaultRule = f'{prefix}0'

    gdf_edges['bike_lane_separation'] = 'no'
    gdf_edges['bike_lane_separation_rule_num'] = defaultRule
    gdf_edges['bike_lane_separation_rule'] = 'Assume no bike lane separation'
    gdf_edges['bike_lane_separation_condition'] = 'default'

    for key, value in separatedPathRules.items():
        # Check rules in order, once something has been updated, leave it be
        # FIXME need to handle single sided tags so that can include both sides in outputs
        gdf_filter = gdf_edges.eval(f"{value['condition']} & (`bike_lane_separation_condition` == 'default')")
        gdf_edges.loc[gdf_filter, 'bike_lane_separation'] = value['bike_lane_separation']
        gdf_edges.loc[gdf_filter, 'bike_lane_separation_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'bike_lane_separation_rule_num'] = key
        gdf_edges.loc[gdf_filter, 'bike_lane_separation_condition'] = value['condition']
        gdf_edges.loc[gdf_filter, 'LTS'] = value['LTS']

    # Save memory by storing the repetitive text columns as categories
    for col in ['bike_lane_separation', 'bike_lane_separation_rule',
                'bike_lane_separation_condition', 'bike_lane_separation_rule_num']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

rating_dict = read_yaml()
gdf_separated_edges = is_separated_path(gdf_biking_permitted.copy(), rating_dict)

# Rule ids look like '<prefix><number>' (the default is 's0'), so read the
# trailing digits instead of slicing at a fixed offset: 's0'[7:] is '' and
# int('') raises ValueError.
rules_used = gdf_separated_edges['bike_lane_separation_rule_num'].unique().tolist()
rules_used = sorted(int(re.search(r'\d+$', s).group()) for s in rules_used)
print(rules_used)

In [48]:
# Show the distinct values of every column whose name contains 'cycleway'
# (substring match; same listing as the earlier cycleway cell)
cycleway_tags = gdfEdges.columns[gdfEdges.columns.str.contains('cycleway')]

for tag in sorted(cycleway_tags):
    print(tag, gdfEdges[tag].unique())

# print('shoulder:access:bicycle', gdfEdges['shoulder:access:bicycle'].unique())

check_date:cycleway [nan '2022-02-18' '2021-09-25' '2023-06-09']
cycleway [nan 'shared_lane' 'no' 'lane' 'separate' 'crossing' 'shared' 'track'
 'construction']
cycleway:both [nan 'no' 'shared_lane' 'separate' 'lane' 'share_busway']
cycleway:both:buffer [nan 'no' 'yes']
cycleway:both:lane [nan 'pictogram' 'exclusive' 'advisory']
cycleway:buffer [nan "2'" 'no']
cycleway:lanes:backward [nan 'none|none|lane']
cycleway:left [nan 'shared_lane' 'no' 'separate' 'track' 'lane' 'share_busway']
cycleway:left:buffer [nan 'yes']
cycleway:left:lane [nan 'exclusive' 'pictogram']
cycleway:left:oneway [nan '-1' 'no']
cycleway:left:separation [nan 'buffer' 'flex_post']
cycleway:right [nan 'lane' 'separate' 'no' 'track' 'shared_lane' 'shoulder'
 'share_busway' 'buffered_lane']
cycleway:right:buffer [nan 'yes']
cycleway:right:lane [nan 'exclusive' 'pictogram' 'advisory']
cycleway:right:oneway [nan 'no']
cycleway:right:separation [nan 'flex_post']
cycleway:surface [nan 'asphalt']
cycleway:width [nan "5'" 

In [55]:
def is_bike_lane(gdf_edges, rating_dict=None):
    """
    Check if there's a bike lane, using the 'a' rules.

    Every edge starts as 'no' under the default rule 'a0'. Each rule whose id
    is 'a' followed by a number is evaluated in dictionary order and applied
    only to rows still on the default, so the first matching rule wins.

    Parameters
    ----------
    gdf_edges : DataFrame of edges. Modified in place and also returned.
    rating_dict : dict, optional
        Rule ids mapped to dicts with the keys 'condition', 'bike_lane' and
        'rule_message'. When omitted, the module-level ``rating_dict`` is used
        (the behaviour before this parameter existed).

    Returns
    -------
    gdf_edges with the categorical columns 'bike_lane', 'bike_lane_rule_num',
    'bike_lane_rule' and 'bike_lane_condition'.
    """
    if rating_dict is None:
        # The parameter shadows the global of the same name, hence globals().
        rating_dict = globals()['rating_dict']

    prefix = 'a'
    # Match '<prefix><number>' exactly. A plain substring test (`prefix in k`)
    # would also pick up unrelated rule families such as 'parking1'.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    bikeLaneRules = {k: v for k, v in rating_dict.items()
                     if rule_id.fullmatch(str(k))}
    defaultRule = f'{prefix}0'

    gdf_edges['bike_lane'] = 'no'
    gdf_edges['bike_lane_rule_num'] = defaultRule
    gdf_edges['bike_lane_rule'] = 'Assume no bike lane'
    gdf_edges['bike_lane_condition'] = 'default'

    for key, value in bikeLaneRules.items():
        # Check rules in order, once something has been updated, leave it be
        # FIXME need to handle single sided tags so that can include both sides in outputs
        gdf_filter = gdf_edges.eval(f"{value['condition']} & (`bike_lane_condition` == 'default')")
        gdf_edges.loc[gdf_filter, 'bike_lane'] = value['bike_lane']
        gdf_edges.loc[gdf_filter, 'bike_lane_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'bike_lane_rule_num'] = key
        gdf_edges.loc[gdf_filter, 'bike_lane_condition'] = value['condition']

    # Save memory by storing the repetitive text columns as categories
    for col in ['bike_lane', 'bike_lane_rule',
                'bike_lane_condition', 'bike_lane_rule_num']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

rating_dict = read_yaml()
gdf_bike_lanes = is_bike_lane(gdf_separated_edges.copy())

# Rule ids look like '<prefix><number>' (the default is 'a0'), so read the
# trailing digits instead of slicing at a fixed offset: 'a0'[7:] is '' and
# int('') raises ValueError.
rules_used = gdf_bike_lanes['bike_lane_rule_num'].unique().tolist()
rules_used = sorted(int(re.search(r'\d+$', s).group()) for s in rules_used)
print(rules_used)


In [56]:
# Show the distinct values of every column whose name contains 'parking'
tags = gdfEdges.columns[gdfEdges.columns.str.contains('parking')]
for tag in sorted(tags):
    print(tag, gdfEdges[tag].unique())

parking:both [nan 'no' 'lane' 'separate' 'street_side']
parking:both:orientation [nan 'parallel']
parking:condition:both [nan 'no_parking' 'no_stopping' 'ticket;residents']
parking:condition:left [nan 'no_parking' 'residents' 'ticket;residents']
parking:condition:right [nan 'no_parking' 'residents']
parking:lane:both [nan 'parallel' 'no_stopping' 'no']
parking:lane:both:parallel [nan 'on_street']
parking:lane:left [nan 'parallel' 'no_stopping' 'no']
parking:lane:left:parallel [nan 'on_street']
parking:lane:right [nan 'no_stopping' 'parallel' 'no']
parking:lane:right:parallel [nan 'on_street' 'painted_area_only']
parking:left [nan 'no' 'lane' 'separate' 'street_side']
parking:left:orientation [nan 'parallel']
parking:left:restriction [nan 'no_stopping']
parking:right [nan 'lane' 'no' 'separate' 'street_side']
parking:right:access [nan 'permit']
parking:right:fee [nan 'yes']
parking:right:orientation [nan 'parallel']


In [81]:
def parking_present(gdf_edges, rating_dict=None):
    """
    Detect where parking is and isn't allowed, using the 'parking' rules.

    Every edge starts as 'yes' under the default rule 'parking0'. Each rule
    whose id is 'parking' followed by a number is evaluated in dictionary
    order and applied only to rows still on the default, so the first
    matching rule wins.

    Parameters
    ----------
    gdf_edges : DataFrame of edges. Modified in place and also returned.
    rating_dict : dict, optional
        Rule ids mapped to dicts with the keys 'condition', 'parking' and
        'rule_message'. When omitted, the module-level ``rating_dict`` is used
        (the behaviour before this parameter existed).

    Returns
    -------
    gdf_edges with the categorical columns 'parking', 'parking_rule_num',
    'parking_rule' and 'parking_condition'.
    """
    if rating_dict is None:
        # The parameter shadows the global of the same name, hence globals().
        rating_dict = globals()['rating_dict']

    prefix = 'parking'
    # Match '<prefix><number>' exactly rather than any key containing the prefix.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    parkingRules = {k: v for k, v in rating_dict.items()
                    if rule_id.fullmatch(str(k))}
    defaultRule = f'{prefix}0'

    gdf_edges['parking'] = 'yes'
    gdf_edges['parking_rule_num'] = defaultRule
    gdf_edges['parking_rule'] = 'Assume street parking is allowed on both sides'
    gdf_edges['parking_condition'] = 'default'

    for key, value in parkingRules.items():
        # Check rules in order, once something has been updated, leave it be
        # FIXME need to handle single sided tags so that can include both sides in outputs
        gdf_filter = gdf_edges.eval(f"{value['condition']} & (`parking_condition` == 'default')")
        gdf_edges.loc[gdf_filter, 'parking'] = value['parking']
        gdf_edges.loc[gdf_filter, 'parking_rule_num'] = key
        gdf_edges.loc[gdf_filter, 'parking_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'parking_condition'] = value['condition']

    # Save memory by storing the repetitive text columns as categories
    for col in ['parking', 'parking_rule_num',
                'parking_rule', 'parking_condition']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

# Reload the rules and classify parking on a copy of the bike-lane result
rating_dict = read_yaml()
gdf_parking = parking_present(gdf_bike_lanes.copy())

# List which rule numbers were actually applied ('parking' is 7 characters long)
rules_used = sorted(int(s[7:]) for s in gdf_parking['parking_rule_num'].unique().tolist())
print(rules_used)

[0, 1, 2, 3, 4, 6, 7, 8, 9, 10, 21, 22, 23, 24, 26, 28, 29, 30, 41, 42, 44, 50]


In [80]:
# Show the distinct values of the 'speed' columns, then the 'highway' columns,
# with a separator line between the two listings
for position, keyword in enumerate(('speed', 'highway')):
    if position:
        print('\n---\n')
    tags = gdfEdges.columns[gdfEdges.columns.str.contains(keyword)]
    for tag in sorted(tags):
        print(tag, gdfEdges[tag].unique())

maxspeed [nan '25 mph' '30 mph' '20 mph' '15 mph' '35 mph' '25' '40 mph' '20'
 '55 mph' '10 mph' '5 mph']
maxspeed:advisory [nan '5 mph']
maxspeed:type [nan 'US:urban' 'sign']
source:maxspeed [nan 'massgis']

---

area:highway [nan 'traffic_island' 'yes']
construction:highway [nan 'cycleway']
highway ['footway' 'residential' 'secondary' 'unclassified' 'trunk' 'service'
 'tertiary' 'primary' 'primary_link' 'cycleway' 'path' 'pedestrian'
 'trunk_link' 'tertiary_link' 'secondary_link' 'construction'
 'living_street' 'busway' 'track']
note:highway [nan 'busway']


In [109]:
# Distinct maxspeed values found on edges tagged highway == 'primary'
print(gdfEdges[gdfEdges['highway'] == 'primary']['maxspeed'].unique())

['25 mph' '35 mph' nan '25' '20 mph' '40 mph' '30 mph']


In [124]:
def get_prevailing_speed(gdf_edges, rating_dict=None):
    """
    Get the speed limit for ways.

    The 'maxspeed' tag is used where present. Where it is missing, each rule
    whose id is 'speed' followed by a number is evaluated in dictionary order
    and fills in its 'speed' for matching rows that still have no speed, so
    the first matching rule wins. Rows with no tag and no matching rule keep
    a speed of 0.

    Values such as '25 mph' are reduced to the leading number; values without
    a unit (e.g. '25') are used as-is. No unit conversion is performed.
    NOTE: multi-valued tags (e.g. '25 mph;30 mph') are not handled.

    Parameters
    ----------
    gdf_edges : DataFrame of edges with a 'maxspeed' column. Modified in place
        and also returned.
    rating_dict : dict, optional
        Rule ids mapped to dicts with the keys 'condition', 'speed' and
        'rule_message'. When omitted, the module-level ``rating_dict`` is used
        (the behaviour before this parameter existed).

    Returns
    -------
    gdf_edges with an integer 'speed' column and the categorical columns
    'speed_rule_num', 'speed_rule' and 'speed_condition'.
    """
    if rating_dict is None:
        # The parameter shadows the global of the same name, hence globals().
        rating_dict = globals()['rating_dict']

    prefix = 'speed'
    # Match '<prefix><number>' exactly rather than any key containing the prefix.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    speedRules = {k: v for k, v in rating_dict.items()
                  if rule_id.fullmatch(str(k))}
    defaultRule = f'{prefix}0'

    # Keep the column as generic objects: it temporarily mixes tag strings
    # with the integer 0 that marks "no speed known yet".
    gdf_edges['speed'] = gdf_edges['maxspeed'].astype(object).fillna(0)
    gdf_edges['speed_rule_num'] = defaultRule
    gdf_edges['speed_rule'] = 'Use signed speed limits.'
    gdf_edges['speed_condition'] = 'default'

    for key, value in speedRules.items():
        # Check rules in order, once something has been updated, leave it be
        gdf_filter = gdf_edges.eval(f"{value['condition']} & (`speed` == 0)")
        gdf_edges.loc[gdf_filter, 'speed'] = value['speed']
        gdf_edges.loc[gdf_filter, 'speed_rule_num'] = key
        gdf_edges.loc[gdf_filter, 'speed_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'speed_condition'] = value['condition']

    # Strip the unit from values such as '25 mph', keeping the leading number
    speed_text = gdf_edges['speed'].astype(str)
    mph = speed_text.str.contains('mph', na=False)
    if mph.any():
        gdf_edges.loc[mph, 'speed'] = speed_text[mph].str.split(
            ' ', expand=True)[0].astype(int)

    # Make sure all speeds are numbers
    gdf_edges['speed'] = gdf_edges['speed'].astype(int)
    # Save memory by storing the repetitive text columns as categories
    for col in ['speed_rule_num', 'speed_rule', 'speed_condition']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

# Reload the rules and derive speeds on a copy of the parking result
rating_dict = read_yaml()
gdf_speed = get_prevailing_speed(gdf_parking.copy())

# List which rule numbers were actually applied ('speed' is 5 characters long)
rules_used = sorted(int(s[5:]) for s in gdf_speed['speed_rule_num'].unique().tolist())
print(rules_used)

print(gdf_speed['speed'].unique())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
[25 30 20 35 40 50 15  0 55 10  5]


In [125]:
# Distinct raw values of the 'lanes' column
print(gdfEdges['lanes'].unique())

[nan '2' '1' '3' '4' '5' '7']


In [132]:
def _max_lane_count(raw_lanes):
    """Return the largest integer in a possibly multi-valued lanes value."""
    # Values may be joined by ';', ',', '*' or a newline, with or without
    # surrounding spaces (e.g. '2;3' as well as '2; 3').
    tokens = [t for t in re.split(r'\s*[;,*\n]\s*', str(raw_lanes).strip()) if t]
    # int(float(...)) also accepts numeric input that prints as '2.0'
    return max(int(float(t)) for t in tokens)


def get_lanes(gdf_edges, default_lanes = 2):
    """
    Add the lane count used in calculations, plus where it came from.

    Missing 'lanes' values are replaced by ``default_lanes``. If multiple lane
    values are present, the largest one is used; this usually happens if
    multiple adjacent ways are included in the edge and there's a turning lane.

    Parameters
    ----------
    gdf_edges : DataFrame of edges with a 'lanes' column. Modified in place
        and also returned.
    default_lanes : int, optional
        Lane count assumed where 'lanes' is missing.

    Returns
    -------
    gdf_edges with an integer 'lane_count' column and a 'lane_source' column
    that is 'OSM' where 'lanes' was present and 'Assumed lane count' where the
    default was used.
    """
    # NaN never compares equal to anything (including itself), so missing
    # values have to be found with isna() rather than `== np.nan`.
    assumed = gdf_edges['lanes'].isna()

    gdf_edges['lane_count'] = gdf_edges['lanes'].astype(object).fillna(
        default_lanes).map(_max_lane_count).astype(int)

    gdf_edges['lane_source'] = 'OSM'
    gdf_edges.loc[assumed, 'lane_source'] = 'Assumed lane count'

    return gdf_edges

# Re-read the rules file, then add lane counts to a copy of the speed result
rating_dict = read_yaml()
gdf_lanes = get_lanes(gdf_speed.copy())

In [133]:
# Distinct lane counts after filling in the default
print(gdf_lanes['lane_count'].unique())

[2 1 3 4 5 7]


In [137]:
# Show the distinct values of every column whose name contains 'lane_markings'
tags = gdfEdges.columns[gdfEdges.columns.str.contains('lane_markings')]
for tag in sorted(tags):
    print(tag, gdfEdges[tag].unique())

lane_markings [nan 'no' 'yes']


In [140]:
# This is new.
# The newer LTS rating takes into account whether there is a centerline striped or not.
def get_centerlines(gdf_edges, rating_dict=None):
    """
    Flag each edge with whether it has a centerline, using the 'center' rules.

    Every edge starts as 'yes' under the default rule 'center0'. Each rule
    whose id is 'center' followed by a number is evaluated in dictionary order
    and applied only to rows still on the default, so the first matching rule
    wins.

    Parameters
    ----------
    gdf_edges : DataFrame of edges. Modified in place and also returned.
    rating_dict : dict, optional
        Rule ids mapped to dicts with the keys 'condition', 'centerline' and
        'rule_message'. When omitted, the module-level ``rating_dict`` is used
        (the behaviour before this parameter existed).

    Returns
    -------
    gdf_edges with the categorical columns 'centerline', 'centerline_rule_num',
    'centerline_rule' and 'centerline_condition'.
    """
    if rating_dict is None:
        # The parameter shadows the global of the same name, hence globals().
        rating_dict = globals()['rating_dict']

    prefix = 'center'
    # Match '<prefix><number>' exactly rather than any key containing the prefix.
    rule_id = re.compile(rf'{re.escape(prefix)}\d+')
    centerlineRules = {k: v for k, v in rating_dict.items()
                       if rule_id.fullmatch(str(k))}
    defaultRule = f'{prefix}0'

    gdf_edges['centerline'] = 'yes'
    gdf_edges['centerline_rule_num'] = defaultRule
    gdf_edges['centerline_rule'] = 'Assume centerlines.'
    gdf_edges['centerline_condition'] = 'default'

    for key, value in centerlineRules.items():
        # Check rules in order, once something has been updated, leave it be
        gdf_filter = gdf_edges.eval(f"{value['condition']} & (`centerline_condition` == 'default')")
        gdf_edges.loc[gdf_filter, 'centerline'] = value['centerline']
        gdf_edges.loc[gdf_filter, 'centerline_rule_num'] = key
        gdf_edges.loc[gdf_filter, 'centerline_rule'] = value['rule_message']
        gdf_edges.loc[gdf_filter, 'centerline_condition'] = value['condition']

    # Save memory by storing the repetitive text columns as categories
    for col in ['centerline', 'centerline_rule_num',
                'centerline_rule', 'centerline_condition']:
        gdf_edges[col] = gdf_edges[col].astype('category')

    return gdf_edges

# Reload the rules and classify centerlines on a copy of the lanes result
rating_dict = read_yaml()
gdf_centerlines = get_centerlines(gdf_lanes.copy())

# List which rule numbers were actually applied ('center' is 6 characters long)
rules_used = sorted(int(s[6:]) for s in gdf_centerlines['centerline_rule_num'].unique().tolist())
print(rules_used)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]


In [141]:
# Show the distinct values of every column whose name contains 'width'
tags = gdfEdges.columns[gdfEdges.columns.str.contains('width')]
for tag in sorted(tags):
    print(tag, gdfEdges[tag].unique())

cycleway:width [nan "5'" '6\'0"' '3\'0"']
source:width [nan 'ARCore']
width [nan '7.6' '9.5' '15.2' '21.3' '14.9' '12.2' '11.6' '19.2' '30.5' '18.9'
 '20.7' '11.0' '7.3' '15.9' '9.1' '17\'0"' '11.9' '20\'0"' '17.1' '24.4'
 '14.0' '18.3' '24\'0"' '30.2' '14.6' '15.2;18.6' '13.4' '25.9' '6.7'
 '28.0' '36.6' '16.8' '7' '3.7' '19.8' '10.4;12.2' '11.7' '4.3' '4.6'
 '8.5' '12.8' '10.4' '15.5' '13.7' '6.1' '25.6' '9.8' '4.9' '26.8' '3.0'
 '11.3' '5.5' '10.7' '7.9' '5.8' '8.2' '10.1' '28\'0"' '20.1' '18.6' '6.4'
 '5.2' '16.5' '14.3' '17.7' '19.5' '38.1' '8.8' '22.9' '30\'0"' '16.2'
 '4.7' '16.8;15.2' '2' '12.5' '46\'0"' '50\'0"' '16.1' '27\'0"' '91.5'
 '18.0' '22.0' '11.6;10.4' '5' '27.4' '8\'0"' '3.3' '3' '3.5' '8\'8"'
 '5\'0"' '6' '21\'0"' '4' "10'" '1' '2.7' '1.5' '9\'0"' '8' '10\'0"'
 '12\'0"' '1.4' "8'" '7.0' '1.8' '6\'0"' '16\'0"']
width:carriageway [nan '9' '8']
width:lanes [nan '3|3']


In [171]:
def convert_feet_with_quotes(series):
    """
    Convert a column of width tags to decimal feet.

    Handling per value:
    - missing (or empty)      -> NaN, note 'No Width'
    - contains a ' (feet)     -> feet + inches / 12, e.g. 8'8" -> 8.667
    - anything else           -> assumed to be meters, multiplied by 3.28084
    - several values with ';' -> each part is converted, the largest is kept

    Parameters
    ----------
    series : pandas Series of width tag values (strings and NaN).
        The input is not modified.

    Returns
    -------
    (feet, notes) : two Series sharing the input index. ``feet`` holds floats,
    ``notes`` holds 'No Width', 'Converted ft-in to decimal feet' or
    'Converted m to feet' for each row.
    """
    meters_to_feet = 3.28084

    def _part_to_feet(part):
        """Convert one width token (feet-inches or meters) to decimal feet."""
        if "'" in part:
            feet_txt, _, inch_txt = part.rstrip('"').partition("'")
            return float(feet_txt or 0) + float(inch_txt or 0) / 12
        return float(part) * meters_to_feet

    def _convert(value):
        """Return (feet, note) for one raw tag value."""
        if pd.isna(value):
            return np.nan, 'No Width'
        parts = [p.strip() for p in str(value).split(';') if p.strip()]
        if not parts:
            return np.nan, 'No Width'
        if any("'" in p for p in parts):
            note = 'Converted ft-in to decimal feet'
        else:
            note = 'Converted m to feet'
        # Compare the parts as numbers; comparing the raw strings would rank
        # '9.5' above '10.4'.
        return max(_part_to_feet(p) for p in parts), note

    converted = [_convert(value) for value in series]
    feet = pd.Series([c[0] for c in converted], index=series.index,
                     dtype=float, name=series.name)
    notes = pd.Series([c[1] for c in converted], index=series.index, dtype=object)

    return feet, notes

def width_ft(gdf_edges):
    '''
    Add decimal-feet versions of the OSM width columns.

    For 'width' and 'cycleway:width', the converted values and the
    conversion notes returned by convert_feet_with_quotes are stored in new
    columns. The frame is modified in place and also returned.
    '''
    targets = {
        'width': ('width_ft', 'width_ft_notes'),
        'cycleway:width': ('cycleway:width_ft', 'cycleway:width_ft_notes'),
    }
    for source_col, (feet_col, notes_col) in targets.items():
        feet, notes = convert_feet_with_quotes(gdf_edges[source_col])
        gdf_edges[feet_col] = feet
        gdf_edges[notes_col] = notes

    return gdf_edges

# Add the feet columns to a copy of the centerline result
gdf_width_ft = width_ft(gdf_centerlines.copy())

In [172]:
# Distinct converted widths for the roadway and for the cycleway
print(gdf_width_ft['width_ft'].unique())
print(gdf_width_ft['cycleway:width_ft'].unique())

[         nan  24.934384    31.16798     49.868768    69.881892
  48.884516    40.026248    38.057744    62.992128   100.06562
  62.007876    67.913388    36.08924     23.950132    52.165356
  29.855644    17.          39.041996    20.          56.102364
  80.052496    45.93176     60.039372    24.          99.081368
  47.900264    61.023624    43.963256    84.973756    21.981628
  91.86352    120.078744    55.118112    22.96588     12.139108
  64.960632    38.385828    14.107612    15.091864    27.88714
  41.994752    34.120736    50.85302     44.947508    20.013124
  83.989504    32.152232    16.076116    87.926512     9.84252
  37.073492    18.04462     35.104988    25.918636    19.028872
  26.902888    33.136484    28.          65.944884    20.997376
  17.060368    54.13386     46.916012    58.070868    63.97638
 125.000004    28.871392    75.131236    30.          53.149608
  15.419948     6.56168     41.0105      46.          50.
  52.821524    27.         300.19686     59.05512 

In [175]:
# Show distinct values for the 'one' columns of the raw edges, then for the
# 'parking' columns of the processed edges
for frame, keyword in ((gdfEdges, 'one'), (gdf_width_ft, 'parking')):
    tags = frame.columns[frame.columns.str.contains(keyword)]
    for tag in sorted(tags):
        print(tag, frame[tag].unique())

cycleway:left:oneway [nan '-1' 'no']
cycleway:right:oneway [nan 'no']
oneway [False  True]
oneway:bicycle [nan 'no' 'yes']
oneway:conditional [nan '-1 @ (17:00-19:00); ' 'yes @ (07:00-09:00,15:00-19:00)']
parking ['yes', 'no', 'left:no', 'left:yes', 'right:no', 'right:yes']
Categories (6, object): ['left:no', 'left:yes', 'no', 'right:no', 'right:yes', 'yes']
parking:both [nan 'no' 'lane' 'separate' 'street_side']
parking:both:orientation [nan 'parallel']
parking:condition:both [nan 'no_parking' 'no_stopping' 'ticket;residents']
parking:condition:left [nan 'no_parking' 'residents' 'ticket;residents']
parking:condition:right [nan 'no_parking' 'residents']
parking:lane:both [nan 'parallel' 'no_stopping' 'no']
parking:lane:both:parallel [nan 'on_street']
parking:lane:left [nan 'parallel' 'no_stopping' 'no']
parking:lane:left:parallel [nan 'on_street']
parking:lane:right [nan 'no_stopping' 'parallel' 'no']
parking:lane:right:parallel [nan 'on_street' 'painted_area_only']
parking:left [nan '

In [176]:
# This is new.
# There are some conditions where the wideness of the street affects the rating in the new LTS ratings.
def define_narrow_wide(gdf_edges):
    """
    Label each edge as a 'narrow' or 'wide' street.

    Every edge starts as 'wide'. A one-way edge becomes 'narrow' when its
    'width_ft' is below a threshold that depends on the 'parking' value:
    30 for 'yes', 22 for 'left:yes' or 'right:yes', 15 for 'no'.
    Edges that are not one-way, or have no width, stay 'wide'.

    Parameters
    ----------
    gdf_edges : DataFrame of edges with 'oneway', 'width_ft' and 'parking'
        columns. Modified in place and also returned.

    Returns
    -------
    gdf_edges with a 'street_narrow_wide' column; no other column is changed.
    """
    gdf_edges['street_narrow_wide'] = 'wide'

    # 'oneway' holds booleans in this data; comparing the text form accepts
    # both True and the string 'True'.
    is_oneway = gdf_edges['oneway'].astype(str) == 'True'
    width = gdf_edges['width_ft']
    parking = gdf_edges['parking']

    narrow = is_oneway & (
        ((width < 30) & (parking == 'yes'))
        | ((width < 22) & parking.isin(['left:yes', 'right:yes']))
        | ((width < 15) & (parking == 'no'))
    )

    # Write only the label column; assigning to gdf_edges[mask] would
    # overwrite every column of the matching rows.
    gdf_edges.loc[narrow, 'street_narrow_wide'] = 'narrow'

    return gdf_edges

# Re-read the rules file, then label street width on a copy of the width result
rating_dict = read_yaml()
gdf_nw = define_narrow_wide(gdf_width_ft.copy())

In [None]:
# Functions to update


def bike_lane_analysis_with_parking(gdf_edges):
    """
    Assign a 'b' rule and its LTS to every edge.

    Lane count and speed come from get_lanes / get_prevailing_speed (columns
    'lane_count' and 'speed'). When ``unit`` is 'english' the 'width' column
    is replaced by its decimal-feet conversion. Thresholds are read from
    ``tables[<key>][unit]``.

    Returns gdf_edges with the columns 'rule' ('b1'..'b9', 'b1' being the
    default when no condition matches) and 'lts'.
    """
    # get lanes, width, speed
    gdf_edges = get_lanes(gdf_edges)
    gdf_edges = get_prevailing_speed(gdf_edges)
    if unit == 'english':
        # convert_feet_with_quotes returns (values, notes); only the values are needed
        gdf_edges['width'], _ = convert_feet_with_quotes(gdf_edges['width'])

    lanes = gdf_edges['lane_count']
    speed = gdf_edges['speed']
    width = gdf_edges['width']
    highway = gdf_edges['highway']

    # create a list of lts conditions
    # When multiple conditions are satisfied, the first one encountered in conditions is used
    conditions = [
        (lanes >= 3) & (speed <= tables['s4'][unit]),
        (width <= tables['w2'][unit]),
        (width <= tables['w3'][unit]),
        (width <= tables['w4'][unit]) &
            ((speed <= tables['s2'][unit]) & (highway == 'residential')),
        (speed > tables['s2'][unit]) & (speed <= tables['s3'][unit]),
        (speed > tables['s3'][unit]) & (speed <= tables['s4'][unit]),
        (speed > tables['s4'][unit]),
        (highway != 'residential')
        ]

    # create a list of the values we want to assign for each condition
    values = ['b2', 'b3', 'b4', 'b5', 'b6', 'b7', 'b8', 'b9']
    gdf_edges['rule'] = np.select(conditions, values, default='b1')
    rule_dict = {'b1':1, 'b2':3, 'b3':3, 'b4':2, 'b5':2, 'b6':2, 'b7':3, 'b8':4, 'b9':3}
    gdf_edges['lts'] = gdf_edges['rule'].map(rule_dict)

    return gdf_edges

def bike_lane_analysis_no_parking(gdf_edges):
    """
    Assign a 'c' rule and its LTS to every edge.

    LTS depends on presence of median, but this is not commonly tagged in OSM.
    Possibly check the 'dual_carriageway' tag.

    Lane count and speed come from get_lanes / get_prevailing_speed (columns
    'lane_count' and 'speed'). When ``unit`` is 'english' the 'width' column
    is replaced by its decimal-feet conversion. Thresholds are read from
    ``tables[<key>][unit]``.

    Returns gdf_edges with the columns 'rule' ('c1', 'c3'..'c7', 'c1' being
    the default when no condition matches) and 'lts'.
    """

    # get lanes, width, speed
    gdf_edges = get_lanes(gdf_edges)
    gdf_edges = get_prevailing_speed(gdf_edges)
    if unit == 'english':
        # convert_feet_with_quotes returns (values, notes); only the values are needed
        gdf_edges['width'], _ = convert_feet_with_quotes(gdf_edges['width'])

    # assign widths that are a string to nan
    width_is_str = gdf_edges['width'].map(lambda x: isinstance(x, str))
    gdf_edges.loc[width_is_str, 'width'] = np.nan

    lanes = gdf_edges['lane_count']
    speed = gdf_edges['speed']
    width = gdf_edges['width']
    width_is_float = width.map(lambda x: isinstance(x, float))

    # create a list of lts conditions
    # When multiple conditions are satisfied, the first one encountered in conditions is used
    conditions = [
        (lanes >= 3) & (speed <= tables['s5'][unit]),
        width_is_float & (width <= tables['w1'][unit]),
        (speed > tables['s3'][unit]) & (speed <= tables['s5'][unit]),
        (speed > tables['s5'][unit]),
        (gdf_edges['highway'] != 'residential')
        ]

    values = ['c3', 'c4', 'c5', 'c6', 'c7']
    gdf_edges['rule'] = np.select(conditions, values, default='c1')
    rule_dict = {'c1':1, 'c3':3, 'c4':2, 'c5':3, 'c6':4, 'c7':3}
    gdf_edges['lts'] = gdf_edges['rule'].map(rule_dict)

    return gdf_edges

def mixed_traffic(gdf_edges):
    """
    Assign an 'm' rule and its LTS to every edge.

    Lane count and speed come from get_lanes / get_prevailing_speed (columns
    'lane_count' and 'speed'). Thresholds are read from ``tables[<key>][unit]``.

    Returns gdf_edges with the columns 'rule' and 'lts'. Edges matching no
    condition get the rule 'm0', which has no entry in the LTS mapping, so
    their 'lts' is NaN.
    """
    # get lanes, width, speed
    gdf_edges = get_lanes(gdf_edges)
    gdf_edges = get_prevailing_speed(gdf_edges)

    lanes = gdf_edges['lane_count']
    speed = gdf_edges['speed']
    highway = gdf_edges['highway']
    service = gdf_edges['service']

    # create a list of lts conditions
    # When multiple conditions are satisfied, the first one encountered in conditions is used
    conditions = [
        (gdf_edges['motor_vehicle'] == 'no'),
        (highway == 'pedestrian'),
        (highway == 'footway') & (gdf_edges['footway'] == 'crossing'),
        (highway == 'service') & (service == 'alley'),
        (highway == 'track'),
        (speed <= tables['s3'][unit]) & (highway == 'service') &
            (service == 'parking_aisle'),
        (speed <= tables['s3'][unit]) & (highway == 'service') &
            (service == 'driveway'),
        (speed <= tables['s1'][unit]) & (highway == 'service'),
        (speed <= tables['s2'][unit]) & (lanes <= 3) &
            (highway == 'residential'),
        (speed <= tables['s2'][unit]) & (lanes <= 3),
        (speed <= tables['s2'][unit]) & (lanes <= 5),
        (speed <= tables['s2'][unit]) & (lanes > 5),
        (speed <= tables['s3'][unit]) & (lanes < 3) &
            (highway == 'residential'),
        (speed <= tables['s3'][unit]) & (lanes <= 3),
        (speed <= tables['s3'][unit]) & (lanes > 3),
        (speed > tables['s3'][unit])
        ]

    # create a list of the values we want to assign for each condition
    values = ['m17', 'm13', 'm14', 'm2', 'm15', 'm3',
              'm4', 'm16', 'm5', 'm6', 'm7',
              'm8', 'm9', 'm10', 'm11', 'm12']

    # create a new column and use np.select to assign values to it using our lists as arguments
    gdf_edges['rule'] = np.select(conditions, values, default='m0')

    rule_dict = {'m17':1, 'm13':1, 'm14':2, 'm2':2, 'm15':2, 'm3':2, 'm4':2, 'm16':2,
                 'm5':2, 'm6':3, 'm7':3, 'm8':4, 'm9':2, 'm10':3, 'm11':4, 'm12':4}

    gdf_edges['lts'] = gdf_edges['rule'].map(rule_dict)

    return gdf_edges