**Authors: Ali Alami-Idrissi, Atul Kumar Sinha, Prakhar Srivastava**
# Uncertainty-aware Route Planner

Classically, journey planners have always provided the user with itineraries that are efficient in terms of both time taken and distance traveled. Gradually they have become more robust with the inclusion of flexible departure/arrival times, preferred number of changes and suggestion of short walk paths in between. However, a key element which still seems missing is taking into account the delays. Despite the numerous technological advances, delays remain a ubiquitous feature for public transportation services and incorporating them into the route-planner would definitely increase their robustness by giving the user a rich array of routes and associated risks to choose from.

# Structure

The overall layout for the report is as follows:

1. <a href='#Data Processing'>Data Processing</a>

    a. <a href='#Fetching Data'>Fetching Data</a>
    
    b. <a href='#Renaming columns'>Renaming columns</a>
    
    c. <a href='#Keeping only Zurich stations'>Keeping only Zurich stations</a>
    
    d. <a href='#Computing walking distances'>Computing walking distances</a>
    
    e. <a href='#Sanitizing input dataframe'>Sanitizing input dataframe</a>
    
    f. <a href='#Computing delays'>Computing delays</a>
    
    g. <a href='#Grouping the dataframe'>Grouping the dataframe</a>
    
    
2. <a href='#Creating nodes and edges'>Creating nodes and edges</a>

    a. <a href='#Making edges'>Making edges</a>
    
    b. <a href='#Exploding the dataframe'>Exploding the dataframe</a>
    
    c. <a href='#Merging walk data and public transport data'>Merging walk data and public transport data</a>
    
    d. <a href='#Computing statistics'>Computing statistics</a>
    
    
3. <a href='#Building the graph'>Building the graph</a>


4. <a href='#Itinerary Generator'>Itinerary Generator</a>


5. <a href='#Mapv'>Map visualization</a>


6. <a href='#isochrone'>Isochrone map</a>


7. <a href='#improv'>Possible improvements</a>

## Imports 

In [57]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [60]:
import routing_algo as route
import  pyspark.sql.functions as F
import pandas as pd
from operator import itemgetter
import numpy as np
from datetime import *
import ast
from pyspark.sql.types import *
import itertools
import os
import getpass
import pyspark
from pyspark.sql import SparkSession
import math
from geopy import distance
from  dateutil import parser
import re
from pyspark.sql.types import Row
import copy
from pyspark.sql import Window
import networkx as nx
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource,Range1d,Circle,HoverTool, LabelSet,Select, TextInput, ColumnDataSource, CustomJS
from bokeh.models.widgets import Button,Select,TextInput,AutocompleteInput,Div
from bokeh.layouts import row as row_l,column as column_l
from IPython.display import display, HTML
from geopy.geocoders import Nominatim
from functools import reduce
from pyproj import Proj, transform
from bokeh.embed import components
from bokeh.io import curdoc
import random
import string
from bokeh.io import show
from bokeh.layouts import widgetbox
import pyspark.sql.types as t
import math
import pickle
import branca.colormap as cm
import folium
from constants import *
from bokeh.tile_providers import get_provider, Vendors
from helpers import *
output_notebook()

## Helper functions 

In [49]:
def find_path(dummy):
    """ Function called when parameters are changed, handles plotting """

    global lines
    groups, groups_display, durations, global_confidence, total_time = find_path_sub(G,selected_src, selected_dst,
                                                                                     selected_time, float(
                                                                                         proba) / 100,
                                                                                     n_iter=4, cutoff=selected_cutoff)
    if groups is not None:

        for line in lines:
            p.renderers.remove(line)

        lines = []

        for idx, group in enumerate(groups):
            lon = interleave(group.lon_x, group.lon_y)
            lat = interleave(group.lat_x, group.lat_y)
            lines.append(p.line(lon, lat, line_alpha=0.6,
                                line_color=colors[idx % len(colors)], line_width=6))

        div_path.text = default_text_div + \
            show_plan(groups_display, durations, global_confidence, total_time)

    else:
        html_returned = '<div id=path>'
        html_returned += '<div class=path_header><center> No valid path for the current selection !<center></div><br></div>'
        div_path.text = default_text_div + html_returned

    push_notebook(handle=bokeh_handle)


def reset(dummy):
    """ Reset map """

    global lines
    for line in lines:
        p.renderers.remove(line)

    lines = []
    div_path.text = default_text_div
    push_notebook(handle=bokeh_handle)


In [145]:
##################################################  Data Preprocessing ##############################################################################


def parse(str_):
    """ Metadata parsing """
    columns = ['id', 'lon', 'lat', '_', 'name']
    regex_pattern = r'(\d+)\s+([-+]*\d+|[-+]*\d+\.\d+)\s+([-+]*\d+|[-+]*\d+\.\d+)\s+([-+]*\d+)\s+\%\s+(.*)'

    try:
        ret = Row(**dict(zip(columns, re.search(regex_pattern, str_).groups())))

    except:
        raise RuntimeError('String "{}" generated error'.format(str_))

    return ret


def rename_cols(orig_cols, new_names):

    list_ret = [F.col(orig_col).alias(new_col)
                for orig_col, new_col in zip(orig_cols, new_names)]

    return list_ret


def replace_null_dates(df, cols):
    """ Replace null dates with a specific token """

    for col_ in cols:
        df = df.withColumn(col_, F.when(
            df[col_].isNull(), null_date).otherwise(df[col_]))
        df = df.withColumn(col_, to_timestamp(col_))

    return df


def remove_unvalid(remove_top=False):
    """ Sanitize data so that inner arrays are now corresponding to nodes """

    def inner(struct):
        nb_stops = struct[0]
        array = struct[1]

        if remove_top:
            top = 1
            bottom = 0

        else:
            top = 0
            bottom = 1

        return [y for x in range(0, len(array), nb_stops) for y in array[x + top:x + nb_stops - bottom]]

    return inner


def make_udf(type_, func):
    "create a UDF"

    return F.udf(func, type_)


def chunk_by_edge(struct_):
    """ Group data of same node in one array """

    ret_array = []
    array = struct_[0]
    chunk_size = struct_[1] - 1

    for i in range(chunk_size):
        ret_array.append([array[n] for n in range(i, len(array), chunk_size)])

    return ret_array


def combine_columns(*cols):

    return list(zip(*cols))


def combine_and_explode(cols, schema_combine, df):
    """ Group data for a specific edge """

    combine_columns_udf = F.udf(combine_columns, schema_combine)

    return df.withColumn('new', combine_columns_udf(*cols)).withColumn("new", F.explode("new"))


def get_sec(time_str):

    try:
        h, m, s = time_str.split(':')

    except:

        return 0

    return float(h) * 3600 + float(m) * 60 + float(s)


##################################################  Compute probas ##############################################################################

def pxy(_lambda, duration):
    """ Compute P(x<y) """

    if duration < 0:

        return 0.

    if _lambda == math.inf:

        return 1.

    return 1 - math.exp(-_lambda * duration)


def time_diff(departure_time, arrival_time):
    """ Time difference between two timestamps """

    h, m, s = [int(x) for x in departure_time.split(':')]
    t1 = time(h, m, s)

    try:
        h, m, s = [int(x) for x in arrival_time.split(':')]
        t2 = time(h, m, s)

    except:
        t2 = t1

    dateTimeA = datetime.combine(date.today(), t1)
    dateTimeB = datetime.combine(date.today(), t2)

    dateTimeDifference = dateTimeA - dateTimeB

    return dateTimeDifference.total_seconds()


def add_time(given_time, num_sec):
    """ Add timedelta to a given timestamp """

    h, m, s = [int(x) for x in given_time.split(':')]
    t = time(h, m, s)

    dateTimeOld = datetime.combine(date.today(), t)
    dateTimeNew = dateTimeOld + timedelta(seconds=num_sec)

    h, m, s = dateTimeNew.hour, dateTimeNew.minute, dateTimeNew.second

    return "%02d:%02d:%02d" % (h, m, s)



##################################################  Building nx graph ##############################################################################


def add_to_graph_edge(edge_data):
    """ For building multigraph edges """

    cols = edge_data.index
    edge_attribs = {x: edge_data[x]
                    for x in cols if type(edge_data[x]) != list}
    edge_attribs['weight'] = 0
    G.add_edge(edge_data.departure_station,
               edge_data.arrival_station, **edge_attribs)


def add_to_graph_node(node_data):
    """ For building multigraph nodes """
    cols = node_data.index
    node_attribs = {x: node_data[x] for x in cols}
    G.add_node(node_attribs['name'], **node_attribs)


## Udfs

In [98]:
@F.udf
def filter_coords(row):
    """filter out all station > 10 km to ZH HB"""
    lon = float(row[0])
    lat = float(row[1])
    distance_to_hb = distance.distance(
        (hb_coords.lon, hb_coords.lat), (lon, lat)).km <= 10

    return distance_to_hb


@F.udf
def filter_walking(row):
    """Find distance between two stations"""

    coords_1 = (row.lon, row.lat)
    coords_2 = (row.lon_2, row.lat_2)
    distance_walk = distance.distance(coords_1, coords_2).km

    return distance_walk





@F.udf(IntegerType())
def number_of_stops(x):
    """Compute the number of stop for every trip"""

    nb_stops = len(x)

    try:
        nb_stops = x[1:].index(x[0]) + 1

    except:
        pass

    return nb_stops


udf_shorten = F.udf(lambda x: x[0][:x.nb_stops], ArrayType(StringType()))


@F.udf(ArrayType(ArrayType(StringType())))
def edge_split(stops):
    """Create individual edges from stop names"""

    return [(stops[i], stops[i + 1]) for i in range(len(stops) - 1)]


compute_trip_time = F.udf(lambda struct: [
                          x - y for x, y in zip(struct[0], struct[1])], ArrayType(DoubleType()))


@F.udf(ArrayType(StringType()))
def extract_time(array):
    """Extract the time from a date"""

    ret = []

    for elem in array:
        date_obj = datetime.fromtimestamp(elem)
        ret.append('{:02d}:{:02d}:{:02d}'.format(
            date_obj.hour, date_obj.minute, date_obj.second))

    return ret


types = StructType([StructField('S_departure_time', ArrayType(StringType())),
                    StructField('S_arrival_time', ArrayType(StringType()))])


@F.udf(types)
def get_unique_time(struct_):
    """Build multiedges (one edge per departure time)"""

    # Sort unique departure times
    departure_times = sorted(list(set(struct_.S_departure_time)))

    # Sort unique arrival times
    arrival_times = sorted(list(set(struct_.S_arrival_time)))
    arrival_times_ordered = []

    # Match pairs of Departure/arrival time
    for idx, departure_time in enumerate(departure_times):
        stop = False
        idx_arrival = idx

        while(not stop and idx_arrival < len(arrival_times)):
            arrival_time = arrival_times[idx_arrival]

            if departure_time < arrival_time:
                arrival_times_ordered.append(arrival_time)
                stop = True

            idx_arrival += 1

    return departure_times, arrival_times_ordered


udf_sec = F.udf(get_sec, DoubleType())


@F.udf(DoubleType())
def get_lambda_mle(values_list):
    """compute statistics for exponential distribution"""

    list_true_samples = [x for x in values_list if x != -1]

    if len(list_true_samples) > 0:
        mean = sum(list_true_samples) / len(list_true_samples)

    else:

        return math.inf

    if mean == 0:

        return math.inf

    return 1. / mean


pxy_udf = F.udf(lambda x: pxy_2(x[0], x[1], x[2]), DoubleType())


## Setting up a spark session

In [85]:
conf = pyspark.conf.SparkConf()
conf.setMaster('yarn')
conf.setAppName('scheduler-{0}'.format(getpass.getuser()))
conf.set('spark.executor.memory', '4g')
conf.set('spark.executor.instances', '3')
conf.set('spark.port.maxRetries', '100')
conf.set('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
sc = pyspark.SparkContext.getOrCreate(conf)
conf = sc.getConf()
sc

In [86]:
sc.addPyFile('geopy.zip')
sc.addPyFile('geographiclib.zip')
sc.addPyFile('helpers.py')
sc.addPyFile('routing_algo.py')
sc.addPyFile('constants.py')
spark = SparkSession(sc)

In [87]:
spark

<a id='Data Processing'></a>
## I. Data Processing
<a id='Fetching Data'></a>
### I. a. Fetching Data

We begin by simply fetching the schedule dataset and the associated metadata. We only use 2018 weekdays data for computational constraints purposes. However the same pipeline can be used for the whole dataset

In [153]:
df= spark.read.option("header", "true").csv("/datasets/sbb/2018/*/*", sep= ';')



In [101]:
## Read metadata
metadata = sc.textFile("/homes/alami/metadata.txt")
metadata = metadata.map(parse).toDF()

<a id='Renaming columns'></a>
### I. b. Renaming columns

Columns are given appropriate English names for ease of use later.

In [102]:
original_df_names = ['BETRIEBSTAG',
 'FAHRT_BEZEICHNER',
 'PRODUKT_ID',
 'FAELLT_AUS_TF',
 'HALTESTELLEN_NAME',
 'ANKUNFTSZEIT',
 'AN_PROGNOSE',
 'AN_PROGNOSE_STATUS',
 'ABFAHRTSZEIT',
 'AB_PROGNOSE',
 'AB_PROGNOSE_STATUS']

new_df_names = ['date','trip_id','type','irregular_trip','stop_name','S_arrival_time',
                'A_arrival_time','delay_arrival','S_departure_time','A_departure_time','delay_departure']

df = df.select(*rename_cols(original_df_names,new_df_names))

In [103]:
# Keep only weekdays data 
funcWeekDay =  F.udf(lambda x: int(datetime.strptime(x, '%d.%m.%Y').strftime('%w')),IntegerType())
df = df.withColumn('day',funcWeekDay('date'))
df = df.where(F.col('day') <= 5)
df = df.drop('day')

<a id='Keeping only Zurich stations'></a>
### I. c.  Keeping only Zurich stations

The algorithm we present for uncertainty awareness works as a proof-of-concept for the Zurich Area. There is definitely a lot of scope for optimization in a big-data setting which will be discussed in the end.

In [104]:
hb_coords = metadata.where(F.col('name') == 'Zürich HB').collect()[0]
metadata = metadata.withColumn('keep',filter_coords(F.struct([metadata[x] for x in ['lon','lat']])).cast('boolean'))
metadata = metadata[metadata.keep]

In [105]:
## Filter Far stops in df

df = df.join(metadata,df.stop_name == metadata.name).select(df.columns)
metadata = metadata.drop('keep').cache()

<a id='Computing walking distances'></a>
### I. d. Computing walking distances

In order to incorporate short walks in the suggested route planner, we need to explicitly create a dataframe of pairwise distances between all possible stops which will be later used to create "WALK" edges in the final graph that we build. Of course, we will filter these pairwise distances under an appropriate threshold in order to keep reasonable walking distances.

In [106]:
## Generate all pairs of stations

metadata_renamed = metadata.select(rename_cols(metadata.columns,[x+'_2'for x in metadata.columns]))
pairwise_distances = metadata.crossJoin(metadata_renamed)

## Filter out self-loops

pairwise_distances = pairwise_distances.where(pairwise_distances.id != pairwise_distances.id_2)

## Compute distances

struct_instance = F.struct([pairwise_distances[x] for x in pairwise_distances.columns])
pairwise_distances = pairwise_distances.withColumn('distance',filter_walking(struct_instance))

## Keep only reasonable walkable paths

pairwise_distances = pairwise_distances.where(pairwise_distances.distance <= distance_walk_thresh).cache()

<a id='Sanitizing input dataframe'></a>
### I. e. Sanitizing input dataframe

Now that we have all the relavant information with us, let's begin data-cleaning. Specifically, we need to take care of null dates and cases when actual arrival times haven't been noted.

In order to obtain correct delay we need to take care of missing delay values by marking every field with a delay value different than "GESCHAETZT" as a  field without evaluated delay.

In [107]:
## Removing non_regular trips 

df = df[df.irregular_trip == 'false']

## Sanitizing boolean fields

columns_to_keep = []

for col_ in new_df_names:
    
    if col_ !=  'irregular_trip':
        col_obj = F.col(col_)

        if 'delay' in col_:
            col_obj = (col_obj == 'GESCHAETZT').cast('boolean').alias(col_)
        
        columns_to_keep.append(col_obj)

df = df.select(columns_to_keep)

In [108]:
## Sanitizing dates

df = replace_null_dates(df,['S_arrival_time','A_arrival_time','S_departure_time','A_departure_time'])

<a id='Computing delays'></a>
### I. f. Computing delays

We make the following assumptions :
- Negative departure delays are a mistake on the end of the public transportation service and hence will not be considered.
- Positive departure delays and negative arrival delays will always result in 100% probability of transition.
- Hence only positive arrival delays need to be computed.

In [109]:
list_selection = [('A_arrival_time','delay_arrival','S_arrival_time'),('A_departure_time','delay_departure','S_departure_time')]

In [110]:
for col_ in list_selection:
    df = df.withColumn('delay_in_secs',(F.col(col_[0])-F.col(col_[2])))
    
    #keep only those delays which are strictly positive, this also filters all rows where PROGNOSE_STATUS was False
    
    df = df.withColumn('delay_in_secs',(F.when(F.col('delay_in_secs') > 0,F.col('delay_in_secs')).otherwise(0)))
    df = df.withColumn('delay_in_secs',F.when(~df[col_[1]], F.lit(-1)).otherwise(F.col('delay_in_secs')))
    df = df.withColumn(col_[1], F.col('delay_in_secs'))
    df = df.drop('delay_in_secs')

<a id='Grouping the dataframe'></a>
### I. g. Grouping the dataframe

We will group all records of the dataframe by trip-id so that we can form relevant edges for our graph later.

In [111]:
## Kept columns

columns_df = [ col_ for col_ in df.columns if col_ != 'trip_id' ]

In [112]:
## Swap columns order

cp = columns_df[0]
idx = columns_df.index('S_departure_time')
columns_df[0] = columns_df[idx]
columns_df[idx] = cp

In [113]:
## Create a window to keep trips ordered by time inside every group

w = Window.partitionBy('trip_id').orderBy('S_departure_time')

## Sort by departure time inside every group

df_groupped = df.withColumn(
            'sorted_col',F.collect_list(F.struct(*columns_df)).over(w))

## Groupby trip_id and explode columns after sort

df_groupped = df_groupped.groupBy('trip_id').agg(F.max('sorted_col').alias('sorted_col'))
df_groupped = df_groupped.select('trip_id',*[F.col('sorted_col')[col_].alias(col_) for col_ in columns_df]).cache()

In [114]:
## Compute the number of stops in every trip

df_groupped_ = df_groupped.withColumn('nb_stops',number_of_stops('stop_name'))

## Keep only unique stop names

selection = ['stop_name']

for col_ in selection:
    df_groupped_ = df_groupped_.withColumn(col_,udf_shorten(F.struct(col_,'nb_stops')))

In [115]:
## Keep only one type for all groups (since its similar)

df_groupped_ = df_groupped_.withColumn('type',F.udf(lambda x: x[0])('type'))

<a id='Creating nodes and edges'></a>
## II. Creating nodes and edges

<a id='Making edges'></a>
### II. a. Making edges

Making use of the dataframe that had been grouped by trip-id, we further pair and group relevant columns to make an edge dataframe which basically consists of arrival/departure times/delays and stops for every trip-id as a list.

In [116]:
## Create edges for every trip

df_groupped_ = df_groupped_.withColumn('edges',edge_split('stop_name'))

In [117]:
## Move from node to edge representation

## Create a schema that will be used later to regroup trips informations

dep = ['S_departure_time','A_departure_time','delay_departure']
arr = ['S_arrival_time','A_arrival_time','delay_arrival']

dep_tupples= list(zip(dep,
               [df_groupped_.schema.fields[i].dataType for i in [1,8,9]],
               [False,False,False]))

arr_tupples =  list(zip(arr,
               [df_groupped_.schema.fields[i].dataType for i in [4,5,6]],
               [True,True,True]))

schemas = dep_tupples+arr_tupples

for tupple in schemas:
    df_groupped_=  df_groupped_.withColumn(tupple[0],
                                           F.udf(remove_unvalid(tupple[2]),tupple[1])(F.struct('nb_stops',tupple[0])))

In [118]:
## Pair actual and scheduled times

time_pairing = list(zip(dep[:-1],arr[:-1],['sheduled_trip_time','actual_trip_time']))


In [119]:
## Compute actual and scheduled trip times

tupple = time_pairing[1]
df_groupped_ = df_groupped_.withColumn(tupple[2],compute_trip_time(F.struct(F.col(tupple[1]),F.col(tupple[0]))))

In [120]:
## Drop date field 

df_groupped_ = df_groupped_.drop('date','stop_name')

In [121]:
## Keep only hours:minutes:seconds

list_dates = ['S_departure_time','A_departure_time','S_arrival_time','A_arrival_time']

for col_ in list_dates:
    df_groupped_ = df_groupped_.withColumn(col_,extract_time(col_))    

In [122]:
## Handles different types

apply_to = [(['S_arrival_time','S_departure_time','A_departure_time','A_arrival_time'],StringType()),
(['delay_arrival','delay_departure','actual_trip_time'],DoubleType())]

## Create an array of arrays at every entry where subarrays are nodes data

for (cols,type_) in apply_to:
    udf_func = make_udf(ArrayType(ArrayType(type_)),chunk_by_edge)
    
    for col_ in cols:
        df_groupped_ = df_groupped_.withColumn(col_,udf_func(F.struct(col_,'nb_stops')))

<a id='Exploding the dataframe'></a>
### II. b. Exploding the dataframe

Now we will explode our dataframe so that every row contains only information about a unique edge which defined by a trip_id, a departure station and an arrival station.

In [123]:
s = copy.deepcopy(df_groupped_.schema.fields)

In [124]:
## Trip_id, nb_stops and type

remaining_names = [s[i].name for i in [0,2,8]]

## Columns exploded schema

s_reduced = [s[i] for i in [1,3,4,5,6,7,9,10]]
col_names = [elem.name for i,elem in enumerate(s_reduced)]

for i,elem in enumerate(s_reduced):
    s_reduced[i].dataType = elem.dataType.elementType

s_reduced =  ArrayType(StructType(s_reduced))

In [125]:
## Group edge data for every row

df_groupped_ = combine_and_explode(col_names,s_reduced,df_groupped_)

In [126]:
## Drop the grouping column

col_names = ['new.'+x for x in col_names]
selection = [F.col(x).alias(x.split('.')[1]) for x in col_names] +[F.col(x) for x in remaining_names]
df_groupped_ = df_groupped_.select(*selection)    

In [127]:
tmp = df_groupped_.columns

In [128]:
## Split the edge column into two separate columns (source,target)

type_ = StructType([StructField("departure_station", StringType(), True),
                   StructField("arrival_station", StringType(), True)])
col_splitter = F.udf(lambda x: x,type_ )
df_groupped_ = df_groupped_.withColumn('struct_edges',col_splitter('edges'))
df_groupped_ = df_groupped_.select(*tmp,'struct_edges.*')

In [129]:
tmp = [
 'A_arrival_time',
 'delay_arrival',
 'A_departure_time',
 'delay_departure',
 'edges',
 'actual_trip_time',
 'trip_id',
 'type',
 'nb_stops',
 'departure_station',
 'arrival_station']

In [130]:
## Explode trip_id to multiple trips (1 per departure time)

df_groupped_ = df_groupped_.withColumn('new_col',get_unique_time(F.struct(*list_dates)))
df_groupped_ = df_groupped_.select(*tmp,'new_col.*')

In [131]:
## Explode 

list_dates = ['S_departure_time','S_arrival_time']
types = StructType([StructField('S_departure_time',StringType()),
                   StructField('S_arrival_time',StringType())])
df_groupped_ = combine_and_explode(list_dates,ArrayType(types),df_groupped_)
selection = ['new.*'] + tmp
df_groupped_ = df_groupped_.select(*selection)

In [132]:
df_groupped_ = df_groupped_.withColumn('scheduled_trip_time',udf_sec('S_arrival_time')-udf_sec('S_departure_time'))

<a id='Merging walk data and public transport data'></a>
### II. c. Merging walk data and public transport data

We modify the dataframe containing information about walkable paths so that it fits the one we built for the remaining edges and merge them.

In [133]:
## Normalize walk data

list_null_elem = ((['S_departure_time','S_arrival_time'],'-1'),
                 (['A_arrival_time','A_departure_time'],['-1']),(['delay_arrival','delay_departure'],[float(0)]),
                 (['type'],'walk'),(['nb_stops'],1))

for list_cols in list_null_elem:
    
    for col_ in list_cols[0]:
        
        if type(list_cols[1]) == list:
            cst = F.array(F.lit(list_cols[1][0]))
        
        else:
            cst = F.lit(list_cols[1])
        
        pairwise_distances = pairwise_distances.withColumn(col_,cst)

concat_columns = F.udf(lambda x: [x[i] for i in len(x)],ArrayType(StringType()))
pairwise_distances = pairwise_distances.withColumn('edges',F.array('name','name_2')).\
                     withColumn('departure_station',F.col('name')).\
                     withColumn('arrival_station',F.col('name_2')).\
                     withColumn('scheduled_trip_time',F.col('distance')/speed_walk).\
                     withColumn('actual_trip_time',F.array(F.col('distance')/speed_walk)).\
                     withColumn('trip_id',F.monotonically_increasing_id().cast('string'))
                    

In [134]:
pairwise_distances =pairwise_distances.select(*df_groupped_.columns,F.col('id').alias('src'),F.col('id_2').alias('dst')) 

In [135]:
## Nodes df

df_nodes = metadata.select('lon','lat','name','id')

In [136]:
## Add ids to edges

cols_tmp = df_groupped_.columns
df_groupped_ = df_groupped_.join(df_nodes,on=df_nodes.name == df_groupped_.departure_station)
df_groupped_ = df_groupped_.select(*cols_tmp,F.col('id').alias('src'))
df_groupped_ = df_groupped_.join(df_nodes,on=df_nodes.name == df_groupped_.arrival_station)
df_groupped_ = df_groupped_.select(*cols_tmp,'src',F.col('id').alias('dst')).cache()

In [137]:
## Edges df
df_edges = df_groupped_.union(pairwise_distances.select(*df_groupped_.columns)).cache()

<a id='Computing statistics'></a>
### II. d. Computing statistics


Since we ignore negative delays in the preprocessing steps, we can now build our model for non-negative delays. 

We assume that the non-negative arrival delays are exponentially distributed. So we begin by estimating the lambda parameter associated with a typical exponential distribution : $$f(x; \lambda) = \begin{cases}\lambda e^{-\lambda x}, & \text{for } x \geq 0 \\ 0, & \text{for } x < 0 \end{cases}$$

The maximum likelihood estimate for the lambda parameter is given by $\hat{\lambda} = \frac{1}{\mathbb{E}(X)}$.

Now, the way we evaluate goodness-of-path is by computing the probability of transition success at each transfer point. Next, we assume each transfer is independent of the other for a given trip and hence the probability of success is given by cumulatively multiplying all these probabilities. Note that we assume that walking paths have no uncertainty (i.e the probability of transition success from a train/bus/tram to a walking path is 1).

Let's say the scheduled arrival time at a stop is given by $S_a$, non-negative arrival delay by $d_a$ (a random variable, modelled via an exponential distribution) and scheduled departure by $S_d$. Mathematically, under the said assumptions, calculating probability of transition success implies finding $$\mathbb{P}(S_a + d_a < S_d) = \mathbb{P}(d_a < S_d - S_a)$$.


This is equivalent to evaluating the CDF (cumulative distribution function) of the given exponential at $S_d - S_a$. The CDF of an exponential is given by $$F(x; \lambda) = \int_{-\infty}^x f(t; \lambda) dt = \begin{cases} 1 - e^{-\lambda x}, & \text{for } x \geq 0 \\ 0, & \text{for } x < 0\end{cases}$$

In [138]:
## Exponential Distribution

df_edges = df_edges.withColumn("lambda_arrival_delay", get_lambda_mle(df_edges.delay_arrival))
df_edges = df_edges.withColumn("lambda_departure_delay", get_lambda_mle(df_edges.delay_departure))

## Saving preprocessed data

In [21]:
df_edges.write.parquet("/homes/alami/edges_2018_wd.parquet")
df_nodes.write.parquet("/homes/alami/nodes_2018_wd.parquet")



## Loading the dataframes

In [None]:
df_edges = spark.read.parquet("/homes/alami/edges_2018_wd.parquet")
df_nodes = spark.read.parquet("/homes/alami/nodes_2018_wd.parquet")

<a id='Building the graph'></a>
## III. Building the graph

We finally collect and pickle the node and edge dataframes and use NetworkX to create a Directed Multigraph, where nodes are stations and edges are trips between consecutive stations at a given departure time. Note that walking edges have no specific departure time and can be taken at every point in time. <br><br>

<figure class="image">
  <img height="75%" width="75%" src="files/Trip Graph.png" alt="Example excerpt of multi-digraph">
  <figcaption>Figure 1: Here, we show an example subgraph. Note that it is a multi-digraph, meaning that multiple directed edges can exist between a pair of nodes corresponding to the trip id, direction of trip and different schedule times. </figcaption>
</figure>

In [141]:
kept_cols = ['S_departure_time',
 'S_arrival_time',
 'trip_id',
 'type',
 'nb_stops',
 'departure_station',
 'arrival_station',
 'scheduled_trip_time',
 'src',
 'dst',
 'lambda_arrival_delay',
 'lambda_departure_delay']

In [142]:
df_edges_query = df_edges.select(*kept_cols)
final_query_matrix = df_edges_query.toPandas()
final_nodes_matrix = df_nodes.toPandas()

In [143]:
# This operation could have been done in pyspark however we noticed that it runs faster on pandas
final_query_matrix = final_query_matrix[final_query_matrix.src != final_query_matrix.dst].\
    drop_duplicates(subset=['S_departure_time','S_arrival_time','departure_station','arrival_station',
                           'trip_id'])

In [146]:
G = nx.MultiDiGraph()

_= final_nodes_matrix.apply(add_to_graph_node,axis=1)
_= final_query_matrix.apply(add_to_graph_edge,axis=1)

The following line of codes allows to load the prebuilt graph from the cluster in order to run the visualization map without any further preprocessing

In [8]:
with open('/home/alami/graph_nx.pkl','rb') as f:
    G = pickle.load(f)

In [7]:
with open('/home/alami/data_graph_2018.pkl','rb') as f:
    final_nodes_matrix,final_query_matrix = pickle.load(f)

<a id='Itinerary Generator'></a>
## IV. Itinerary Generator

To finally create relevant routes, we use a modified version of the Djikstra algorithm. For finding the shortest path from a source station to a target station at a specified departure time, we run the Djisktra algorithm and ignore all edges before the specified departure time. We also ignore edges if the time interval between connections (changing trip_id) is less than one minute. Moreover, our search algorithm also allows us to prune paths that arrive after a specified threshold time. 

Now in order to take into account uncertainty, we run the Djikstra algorithm to find the shortest path. If the latter does not meet our uncertainty threshold (i.e cumulative product of transition success probabilities in the path); we remove from the graph the edge with lowest lambda parameter in the path and rerun the search algorithm on the modified graph. This procedure is repeated iteratively until we find a path satisfying the uncertainty condition or we reach a maximum numbber of iterations. In our example we set the maximum number of iterations to 5.

The routing functions are written in a separate python file "routing_algo.py".

<a id='Mapv'></a>

## V. Map visualisation

In [147]:
CARTODBPOSITRON = get_provider(Vendors.CARTODBPOSITRON)

In [148]:
lines = []
p = figure(x_range=(8.8e5, 10e5), y_range=(59.8e5, 60.1e5),width=600)
p.add_tile(CARTODBPOSITRON)
p.axis.visible = False

source = ColumnDataSource({'lon':[6.05e5],'lat':[68e5]})

selected_time,selected_src,selected_dst,selected_cutoff,proba = None,None,None,None,None
format_str = """
 function callback(msg){
            console.log("Python callback returned unexpected message:", msg)
        }
var callbacks = {iopub: {output: callback}};
console.log(cb_obj.value)
if (IPython.notebook.kernel !== undefined) {
    var kernel = IPython.notebook.kernel;
    cmd = "%s('" + cb_obj.value + "')";
    kernel.execute(cmd , callbacks, {silent : false})
}
"""

options = [(int(i),"{}%".format(int(i))) for i in np.arange(0,101,10)]
text_input_time_departure = TextInput(value="", title="Departure time:",sizing_mode='scale_both')
text_input_time_departure.js_on_change('value', CustomJS(code=format_str%"selected_time="))
text_input_time_cutoff = TextInput(value="", title="Maximum arrival time:",sizing_mode='scale_both')
text_input_time_cutoff.js_on_change('value', CustomJS(code=format_str%"selected_cutoff="))
thresh_selector = Select(value="", title="Probability of completion:",
                                   options=options,sizing_mode='scale_both')
thresh_selector.js_on_change('value',CustomJS(code=format_str%"proba="))

text_src_station = AutocompleteInput(completions=final_nodes_matrix.name.tolist(), 
                                     title="From:",sizing_mode='scale_both')
text_src_station.js_on_change('value',CustomJS(code=format_str%"selected_src="))
text_dst_station = AutocompleteInput(completions=final_nodes_matrix.name.tolist(), title="To:",
                                     sizing_mode='scale_both')
text_dst_station.js_on_change('value',CustomJS(code=format_str%"selected_dst="))

div_path = Div(width=350,text=default_text_div,style={ 'resize': 'horizontal'})
button_validate =  Button(label="Search", button_type="success",sizing_mode='scale_both')
button_reset =  Button(label="Reset", button_type="success")

button_validate.js_on_click(CustomJS(code=format_str%"find_path"))
button_reset.js_on_click(CustomJS(code=format_str%"reset"))

## Text input as a title

## Layout widgets next to the plot                     

controls = row_l(text_input_time_departure,text_input_time_cutoff,text_src_station,text_dst_station,thresh_selector,button_validate)
layout = column_l(controls,row_l(p,div_path,width=950),button_reset,width=950,height=800)
display(HTML('''<style>
    .bk.bk-menu.bk-below {
    white-space: pre;
    text-overflow: ellipsis;
    max-height: 200px;
    scroll-behavior: smooth;
    overflow: auto;
}
</style>
'''))

with open('style.html') as f:
    display(HTML(f.read()))
    
bokeh_handle = show(layout, notebook_handle=True)

<a id='isochrone'></a>
## VI. Isochrone

An isochrone is defined as "a line drawn on a map connecting points at which something occurs or arrives at the same time". In this part, we visualize the regions on a map which are reachable  within the given cutoff time from a given source station when a passenger starts at a given start time with atleast threshold certainty.

Let us visualize the isochrone map for source=`Zürich HB`, when we start at `start time = 08:00 am`, with `cutoff time = 08:15 am` and `threshold=20%` 

In [None]:
source = 'Zürich HB'
source_coords = (G.node[source]['lat'], G.node[source]['lon'])
start_time = '08:00:00'
cutoff_time = '08:20:00'
threshold = 0.20

get_isochrone_map(source=source, source_coords=source_coords, 
               start_time=start_time, cutoff_time=cutoff_time, threshold=threshold, G=G)

The same plot with a higher threshold. We would expect that the reachable region will be a subset of the above isochrone.

In [None]:
source = 'Zürich HB'
source_coords = (G.node['Zürich HB']['lat'], G.node['Zürich HB']['lon'])
start_time = '08:00:00'
cutoff_time = '08:20:00'
threshold = 0.95

get_isochrone_map(source=source, source_coords=source_coords, 
               start_time=start_time, cutoff_time=cutoff_time, threshold=threshold, G = G)

The resulting maps can be found in the "maps/" folder

In [154]:
sc.stop()

<a id='improv'></a>
## VII. Possible optimizations

Due to time constraint,we could not implement all the features we wanted. Below are some improvement that could be added to the current pipeline:
    - Limiting the number of walks in a path by pruning consecutive walk edges
    - Impose a maximum number of hops in a trip
    - Optimize the current djiskra algorithm by sorting the adjacency lists of every node by departure time. 
    - Impute missing delay measurments