## Development Notepad for "get_cycle_network" function

Function description:
For a given bounding box returns a network of cyclable roads with some measure of cyclability against each.

User inputs
- Bounding box
- Impute method
- Self learn LTS
- User defined cyclist

Output
- G, edge/node attributes


Funcion flow
1. Use bounding box to go to osmnx and pull network
2. Impute missing data using given method
3. Run model of LTS (either Ottawa or self learn)
4. Append any other features
5. Compute access costs for pre-defined user types
6. If additional user types also compute access cost

In [2]:
import osmnx as ox
import csv
import shapely
import numpy as np
import pandas as pd
from Functions.imputation_missing_data import knn_dist_impute,get_impute_masks, feature_learning_train_sets, mode_rule,knn_feats,mlp_impute, ottawa_impute_speed
from Functions.LTS import lts_ottawa
from Functions.self_learning import self_learn
from Functions.helper_functions import dedupe_var_replace

In [3]:
tag_file = 'tags.txt'
with open (tag_file, 'r') as f:
    tags_to_add = [row[0] for row in csv.reader(f,delimiter=',')]

Part 1 - using bounding box pull osmnx network

In [4]:
bounding_box = tuple([-0.097933,51.457054,-0.041285,51.486933])

In [5]:
network_type = 'bike'

utw = ox.settings.useful_tags_way + tags_to_add
ox.config(use_cache=True, log_console=True, useful_tags_way=utw)

#Get data from OSMNX
G = ox.graph_from_bbox(bounding_box[3],bounding_box[1], bounding_box[0], bounding_box[2],network_type = network_type, retain_all=True, simplify=False)

  ox.config(use_cache=True, log_console=True, useful_tags_way=utw)


In [6]:
#Get edge attributes
edge_attributes = ox.graph_to_gdfs(G, nodes=True)[1]
#Get edge centroids
edge_attributes['cent_x'] = edge_attributes['geometry'].centroid.x
edge_attributes['cent_y'] = edge_attributes['geometry'].centroid.y
#Add edge index
edge_attributes['edge_index'] = range(len(edge_attributes))
#Get boundary
graph_boundary=shapely.geometry.box(edge_attributes.geometry.total_bounds[0],edge_attributes.geometry.total_bounds[1],edge_attributes.geometry.total_bounds[2],edge_attributes.geometry.total_bounds[3])


  edge_attributes['cent_x'] = edge_attributes['geometry'].centroid.x

  edge_attributes['cent_y'] = edge_attributes['geometry'].centroid.y


Part 2 - Impute Missing Data

Fields to imput: maxspeed, lanes, surface

Methods: knn-dist, knn-feats, mode-rule, mlp, ottawa-rules

In [7]:
#Function params
impute_method = 'knn-feats'
mlp_train_params =  {
    'hidden_layer' : 100,
    'n_epochs' : 50,
    'batch_size' : 10
}

#Impute Max Speed
print('Imputing Max Speed')
tag_to_impute = 'maxspeed'
#Imputation masks
var_exists, var_to_impute = get_impute_masks(tag_to_impute,edge_attributes)
#Get ML training sets
target_to_num, num_to_target, target, y_int, y_onehot, x_hot = feature_learning_train_sets(edge_attributes, tag_to_impute, tags_to_add)
#Impute missing data
if impute_method == 'knn-dist':
    print('Imputing data using method - KNN Dist')
    imputed_vals = knn_dist_impute(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'knn-feats':
    print('Imputing data using method - KNN Feats')
    #todo: default value for k
    imputed_vals = knn_feats(x_hot,var_exists,target,var_to_impute,k = 3)
elif impute_method == 'mode-rule':
    print('Imputing data using method - KNN Feats')
    imputed_vals = mode_rule(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'mlp':
    print('Imputing data using method - MLP')
    imputed_vals = mlp_impute(y_onehot,x_hot,mlp_train_params['hidden_layer'],var_exists,var_to_impute,mlp_train_params['batch_size'],mlp_train_params['n_epochs'],num_to_target)
elif impute_method == 'ottawa':
    print('Imputing data using method - MLP')
    print('WARNING : This method has hardcoded values specific to a UK setting.')
    imputed_vals = ottawa_impute_speed(edge_attributes,var_to_impute)
    
#Add imputed values to edge_attributes
imp_index = 0
for i,r in edge_attributes[var_to_impute].iterrows():
    edge_attributes.loc[i,tag_to_impute] = imputed_vals[imp_index]
    imp_index += 1
speed_num = []
for i in list(edge_attributes[tag_to_impute].values):
    if type(i) != int:
        speed_num.append(int("".join(filter(str.isdigit, i))))
    else:
        speed_num.append(i)
edge_attributes[tag_to_impute] = speed_num

#Replace dupes on osmid with mode
edge_attributes = dedupe_var_replace(edge_attributes,tag_to_impute)

#Impute Lanes
print('Imputing Number of Lanes')

tag_to_impute = 'lanes'
#Imputation masks
var_exists, var_to_impute = get_impute_masks(tag_to_impute,edge_attributes)
#Get ML training sets
target_to_num, num_to_target, target, y_int, y_onehot, x_hot = feature_learning_train_sets(edge_attributes, tag_to_impute, tags_to_add)
#Impute missing data
if impute_method == 'knn-dist':
    print('Imputing data using method - KNN Dist')
    imputed_vals = knn_dist_impute(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'knn-feats':
    print('Imputing data using method - KNN Feats')
    #todo: default value for k
    imputed_vals = knn_feats(x_hot,var_exists,target,var_to_impute,k = 3)
elif impute_method == 'mode-rule':
    print('Imputing data using method - KNN Feats')
    imputed_vals = mode_rule(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'mlp':
    print('Imputing data using method - MLP')
    imputed_vals = mlp_impute(y_onehot,x_hot,mlp_train_params['hidden_layer'],var_exists,var_to_impute,mlp_train_params['batch_size'],mlp_train_params['n_epochs'],num_to_target)
    
if impute_method == 'ottawa':
    for i,r in edge_attributes[var_to_impute].iterrows():
        edge_attributes.loc[i,tag_to_impute] = 2
else:
    imp_index = 0
    for i,r in edge_attributes[var_to_impute].iterrows():
        edge_attributes.loc[i,tag_to_impute] = imputed_vals[imp_index]
        imp_index += 1

edge_attributes[tag_to_impute] = edge_attributes[tag_to_impute].values.astype(float)
edge_attributes = dedupe_var_replace(edge_attributes,tag_to_impute)

#Impute Surface
print('Imputing Surface')

tag_to_impute = 'surface'
#Imputation masks
var_exists, var_to_impute = get_impute_masks(tag_to_impute,edge_attributes)
#Get ML training sets
target_to_num, num_to_target, target, y_int, y_onehot, x_hot = feature_learning_train_sets(edge_attributes, tag_to_impute, tags_to_add)

#Impute missing data
if impute_method == 'knn-dist':
    print('Imputing data using method - KNN Dist')
    imputed_vals = knn_dist_impute(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'knn-feats':
    print('Imputing data using method - KNN Feats')
    #todo: default value for k
    imputed_vals = knn_feats(x_hot,var_exists,target,var_to_impute,k = 3)
elif impute_method == 'mode-rule':
    print('Imputing data using method - mode rule')
    imputed_vals = mode_rule(edge_attributes,var_exists,var_to_impute,tag_to_impute)
elif impute_method == 'mlp':
    print('Imputing data using method - MLP')
    imputed_vals = mlp_impute(y_onehot,x_hot,mlp_train_params['hidden_layer'],var_exists,var_to_impute,mlp_train_params['batch_size'],mlp_train_params['n_epochs'],num_to_target)

if impute_method != 'ottawa':
    imp_index = 0
    for i,r in edge_attributes[var_to_impute].iterrows():
        edge_attributes.loc[i,tag_to_impute] = imputed_vals[imp_index]
        imp_index += 1
    edge_attributes = dedupe_var_replace(edge_attributes,tag_to_impute)

Imputing Max Speed
Imputing data using method - KNN Feats
Imputing Number of Lanes
Imputing data using method - KNN Feats
Imputing Surface
Imputing data using method - KNN Feats


Part 3 - Implement LTS

Methods - Ottawa, self-learning

In [9]:
lts_method = 'self-learn'
k = 5

In [10]:
if lts_method == 'ottawa':
    print('Calculating LTS using Ottawa Advocacy Group method')
    lts = lts_ottawa(edge_attributes)
    edge_attributes['LTS'] = lts['LTS_ottawa']
    edge_attributes = pd.concat([edge_attributes, pd.get_dummies(lts['LTS_ottawa'])], axis=1)
elif lts_method == 'self-learn':
    print('Calculating LTS using Self-Learning Approach')
    print('WARNING : this approach is under development, please check your results carefully')
    lts = self_learn(edge_attributes,k)
    edge_attributes['LTS'] = lts['cluster']
    edge_attributes = pd.concat([edge_attributes, pd.get_dummies(lts['cluster'])], axis=1)

Calculating LTS using Self-Learning Approach


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cluster_data['{}_zscore'.format(col)] = zscore(cluster_data[col])


Part 4 - Additional Features

To Do

Part 5 - Compute Access per Edge for Pre-Specified Cyclist Types

- User 1 - Beginner
- User 2 - Eager but cautious
- User 3 - Experienced

In [15]:
class cyclist:
    def __init__(self, description, cycle_speed, risk_weights, risk_allowance, risk_decay):
        self.description = description
        self.cycle_speed = cycle_speed
        self.risk_weights = risk_weights
        self.risk_allowance = risk_allowance
        self.risk_decay = risk_decay
    def return_beta_linear(self,edge):
        beta = (edge[0] * self.risk_weights[0]) + (edge[1] * self.risk_weights[1]) + (edge[2] * self.risk_weights[2]) + (edge[3] * self.risk_weights[3]) + (edge[4] * self.risk_weights[4])
        return beta

In [16]:
#Define weight matrices for different users
weights_beginner = {0:0.1,1:0.2,2:2,3:4,4:10}
weights_eager = {0:0.1,1:0.2,2:1.2,3:2,4:5}
weights_experienced = {0:0.1,1:0.1,2:0.25,3:1,4:1.5}

#Define cyclist objects
beginner = cyclist(description = 'Beginner', cycle_speed=4.5, risk_weights=weights_beginner,risk_allowance = 3, risk_decay = 2)
eager = cyclist(description = 'Eager', cycle_speed=5.5, risk_weights=weights_eager,risk_allowance = 2, risk_decay = 2)
experienced = cyclist(description = 'Experienced', cycle_speed=6, risk_weights=weights_experienced,risk_allowance = 1.2, risk_decay = 2)

In [17]:
risk_vectors = np.zeros((len(edge_attributes),3))
it = 0
for i,r in edge_attributes.iterrows():
    risk_vectors[it,0] = beginner.return_beta_linear(r)
    risk_vectors[it,1] = eager.return_beta_linear(r)
    risk_vectors[it,2] = experienced.return_beta_linear(r)
    it += 1
    
normalized_risk_vectors = 1 + (risk_vectors - risk_vectors.min()) / (risk_vectors.max() - risk_vectors.min())

it = 0
for i,r in edge_attributes.iterrows():
    G[i[0]][i[1]][i[2]]['ac_beginner'] = normalized_risk_vectors[it,0] * r['length']
    G[i[0]][i[1]][i[2]]['ac_eager'] = normalized_risk_vectors[it,1] * r['length']
    G[i[0]][i[1]][i[2]]['ac_expert'] = normalized_risk_vectors[it,2] * r['length']

Part 6 - Compute access cost for user-specified cyclists

ToDo