# Ricola Robust Journey Planner
This notebook contains the data pipeline and the implementation of the Journey Planner of the group *Ricola*. 

We implement a *robust* journey planner for the Zurich area. Given a desidered arrival time, the route planner computes the fastet route between departure and arrival stations and the probability of success of journey. You can find a detailed explaination of our solution in each section. 



**Table of contents**

0. Set up environment
1. Data Wrangling
    - 1a. Extract Zurich stops
    - 1b. Extract Zurich delays
    - 1c. Timetables
    - 1d. Walking paths
    - 1e. Storing the data
2. Delay prediction
3. CSA: journey planning
4. Data loading
5. Validation
6. User interface

Note that if section 1 has been run already and the precomputed data has been successfully stored, it is possible to run only sections 0 and 2 to 6. 

## 0. Set up environment

Here we connect the notebook with the Spark cluster, on which we will carry out all the distributed computations. 

In [None]:
%load_ext sparkmagic.magics

In [None]:
import os
from IPython import get_ipython
username = os.environ['RENKU_USERNAME']
server = "http://iccluster029.iccluster.epfl.ch:8998"

# set the application name as "<your_gaspar_id>-homework3"
get_ipython().run_cell_magic(
    'spark',
    line='config', 
    cell="""{{ "name": "{0}-finalproject", "executorMemory": "4G", "executorCores": 8, "numExecutors": 10, "driverMemory": "8G"}}""".format(username)
)

In [None]:
get_ipython().run_line_magic(
    "spark", "add -s {0}-finalproject -l python -u {1} -k".format(username, server)
)

---

## 1. Data Wrangling

In this section we prepare the necessary data on which our application will operate. 
We will use the following datasets: 
- SBB Istdaten (to compute the probability of success of a journey)
- SBB Allstops (to extract the stops in the Zurich area)
- SBB GTFS Timetables (to compute the possible journeys) 

In [None]:
%%spark
import pandas as pd
import pyspark.sql.functions as F

In [None]:
%%spark
sbb_istdaten = spark.read.orc('/data/sbb/part_orc/istdaten/')
allstops = spark.read.orc('/data/sbb/orc/allstops')
sbb_istdaten.printSchema()

In [None]:
%%spark
sbb_istdaten.show(5)

In [None]:
%%spark
sbb_istdaten.count()

In [None]:
%%spark
from functools import reduce

oldColumns = sbb_istdaten.schema.names
newColumns = ["date", "trip_id", "operator_id", "operator_short", "operator_name", "ttype", "train_number",
              "train_type", "umlauf_id", "train_service", "not_in_regular_schedule", "trip_failed", "bpuic",
              "stop_name", "arrival_time", "actual_arrival", "arrival_measure", "departure_time", "actual_departure",
              "departure_measure", "transport_continues", "year", "month"]

sbb_istdaten = reduce(lambda sbb_istdaten, idx: sbb_istdaten.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sbb_istdaten)
sbb_istdaten.printSchema()

In [None]:
%%spark
allstops.printSchema()

---

### 1a. Extract Zurich Stops

We start by selecting only the stations which are within 20km of the center of Zurich. Even though we assume that the user will depart from and arrive to a station lcoated within a radius of 10km from Zurich, we keep all the stations within a radius of 20km since certains journeys can include intermediate stops outside of the 10km radius.

In [None]:
%%spark
ZURICH_LAT = 47.378177
ZURICH_LON = 8.540192
MAX_DISTANCE = 20 # kms
from math import sin, cos, sqrt, atan2, radians

@F.udf
def distance(lat1, lon1, lat2=ZURICH_LAT, lon2=ZURICH_LON): 
    # approximate radius of earth in km
    R = 6373.0

    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c

In [None]:
%%spark
zurich_stops = allstops.withColumn("distance_from_zurich", distance(allstops["stop_lat"],allstops["stop_lon"]))\
                                    .filter(F.col('distance_from_zurich')<=MAX_DISTANCE).cache()
zurich_stops.show(5)
print('We obtained {0} stops'.format(zurich_stops.count()))

---

### 1b. Extract delays in Zurich

Now that we filtered only stops in Zurich, we need to filter SBB `istdaten` in order to remove all the means of transport that don't go through one of those stops. The `istdaten` dataset only contains stop names. For this reason we will perform a JOIN operation on the distinct stop names obtained from the `zurich_stops` table.

In [None]:
%%spark
distinct_zurich_stop_names = zurich_stops.select('stop_name').distinct()
zurich_istdaten = sbb_istdaten.join(distinct_zurich_stop_names, 'stop_name', 'inner')                  
zurich_istdaten.show(5)

We further remove irregular connections from the `zurich_istdaten` dataset. Besides trips that failed or were not scheduled, we remove those observations for which the arrival information is either imprecise or missing. This is done because the historical arrival dealys will be key in computing the confidence interval of a journey. 

In [None]:
%%spark
# Filter out observations that do not have arrival time or had AN_PROGNOSE_STATUS differne to REAL or GESCHAETZT
zurich_istdaten_filtered = zurich_istdaten.filter((zurich_istdaten.arrival_measure == 'REAL') | (zurich_istdaten.arrival_measure == 'GESCHAETZT'))\
                                          .filter((zurich_istdaten.arrival_time.isNotNull()) & (zurich_istdaten.arrival_time != ''))\
                                          .filter(zurich_istdaten.not_in_regular_schedule == False)\
                                          .filter(zurich_istdaten.trip_failed == False)

Now for each connection we compute the arrival delay, the day of the week when it happened and the hour of the day.  

In [None]:
%%spark
import pyspark.sql.functions as F
zurich_istdaten_delays = zurich_istdaten_filtered.withColumn('delay_arrival', F.round((F.unix_timestamp('actual_arrival', "dd.MM.yyyy HH:mm:ss") - F.unix_timestamp('arrival_time', "dd.MM.yyyy HH:mm"))/60))\
                                                 .withColumn('day_of_week', F.dayofweek(F.from_unixtime(F.unix_timestamp(zurich_istdaten_filtered.date, "dd.MM.yyyy"), "yyyy-MM-dd")))\
                                                 .withColumn('hour', F.hour(F.from_unixtime(F.unix_timestamp(zurich_istdaten_filtered.arrival_time, "dd.MM.yyyy HH:mm"), "yyyy-MM-dd HH:mm")))\
                                                 .drop("operator_id", "operator_short", "operator_name", "umlauf_id",
                                                       "train_service", "not_in_regular_schedule", "trip_failed", "bpuic",
                                                       "arrival_measure",  "transport_continues", "date", "trip_id", "train_number", "arrival_time",
                                                      "actual_arrival", "departure_time", "actial_departure", "departure_measure", "year", "month").cache()
zurich_istdaten_delays.show(5)

In [None]:
%%spark
zurich_istdaten_delays.groupBy('ttype').count().show(5)

We notice that there are two separate entries both representing buses. We thus transform BUS into Bus to have all the transportation types in the same format.

In [None]:
%%spark
zurich_istdaten_delays = zurich_istdaten_delays.withColumn('ttype', F.when(zurich_istdaten_delays.ttype == "BUS","Bus").otherwise(zurich_istdaten_delays.ttype))

In [None]:
%%spark
zurich_istdaten_delays.groupBy('ttype').count().show(5)

Furthermore, we notice that there are 27709 observations where the type of transportation is not specified. Since in the delay-prediction model we will be using historical data regarding the transportation line, we check if for these 22619 observations the `train_type` information are available. If this is not the case, we will remove these observations as they cannot be safely attributed to any transportation type.

In [None]:
%%spark
zurich_istdaten_delays.filter((zurich_istdaten_delays.ttype == '') | (zurich_istdaten_delays.ttype.isNull())).groupBy('train_type').count().show(5)

It seems that all these observation for which the type of transportation is not well defined, the transportation line is well defined. We double check this conclusion in the following cell.

In [None]:
%%spark
zurich_istdaten_delays.filter((zurich_istdaten_delays.ttype == '') | (zurich_istdaten_delays.ttype.isNull()))\
                      .filter((zurich_istdaten_delays.train_type == '') | (zurich_istdaten_delays.train_type.isNull()))\
                      .count()

Thus, no further cleaning is needed in the type of transportation column. Now we explore more in details the `train_type` column.

In [None]:
%%spark
zurich_istdaten_delays.filter((zurich_istdaten_delays.train_type == '') | (zurich_istdaten_delays.train_type.isNull())).count()

Each observation in the dataset is assigned to a line. We now analyze the information available with respect to the day and hour. Firstly, we check if there are `null` values and we have a look at unique values for each column. 

In [None]:
%%spark
zurich_istdaten_delays.groupBy('day_of_week').count().show(7)

Note: 1-Sunday, 2-Monday, 3-Tuesday, 4-Wednesday, 5-Thursday, 6-Friday, 7-Saturday

As expected, Sunday and Saturday display the lowest number of observations. As simplyfying assumption, we build the journey planner only for days of the week. Let's filter out Saturday's and and Sunday's observations 

In [None]:
%%spark
zurich_istdaten_delays = zurich_istdaten_delays.filter((F.col("day_of_week") != 1)  & (F.col("day_of_week") != 7)).cache()
zurich_istdaten_delays.groupBy('day_of_week').count().show(7)

In [None]:
%%spark
zurich_istdaten_delays.groupBy('hour').count().orderBy('hour').show(24)

All the observations have the hour properly assigned. As expected, we notice that the number of observations between midnight and 5am is significantly smaller since most of the public transport does not run during the night.

Finally, we check the information regarding the arrival delays to see the distribution. 

In [None]:
%%spark
zurich_istdaten_delays.groupBy('delay_arrival').count().orderBy('delay_arrival').show(5)
# the user can change the number of observations visaulized to display the full distribution.

It seems that there are many observations where the delay is negative. Let's understand why this might be the case

In [None]:
%%spark
zurich_istdaten_delays.filter(zurich_istdaten_delays.delay_arrival < -0).count()

A total of 1698880 observations are characterized by a negative arrival delay. 

In [None]:
%%spark
zurich_istdaten_delays.filter(zurich_istdaten_delays.delay_arrival < -40).count()

Only 599 connections arrived 40 minutes (or more) early. Let's try to understand why these few observations happened and decide whether it is safe to discard them as outliers.

In [None]:
%%spark
zurich_istdaten_delays.filter(zurich_istdaten_delays.delay_arrival < -500).groupBy('ttype').count().show(5)

In [None]:
%%spark
zurich_istdaten_delays.filter(zurich_istdaten_delays.delay_arrival < -500).groupBy('train_type').count().orderBy(F.col("count").desc()).show(5)

We noticed that all the very extreme values are bus rides and, specifically, the lines 231, 352, and 350. It thus seems reasonable to assume that these extreme values are probably due to changes in the schedule. 
To prevent affecting the estimation of the probability significantly with these absurd values, we decide to drop all the observations for which the arrival "delay" is smaller than -40 minutes. 

Similarly, some connections are characterised by very large delays. While this is more natural than very early arrivals (e.g. there could be an accident in the highway and then a bus can be stuck in traffic for multiple hours), we believe that observations displaying more than 11 hours of delay are most likely due to external factors. Thus we remove delays larger than 700 minutes.

In [None]:
%%spark
zurich_istdaten_delays = zurich_istdaten_delays.filter((F.col("delay_arrival") > -40)  & (F.col("delay_arrival") < 700)).cache()
zurich_istdaten_delays.count()

In [None]:
%%spark
zurich_istdaten_delays.select(F.mean('delay_arrival')).collect()

In [None]:
%%spark
zurich_istdaten_delays.groupBy('ttype').agg(F.mean('delay_arrival')).show()

As expected, busses seam to display the largest average delay, probably due to the stronger dependency with traffic and external factors out of the control of the service provider. 

As better explained in the `Delay Prediction` section, the probability of succes of a journey is computed by multiplying the probability of success of each connection, where connection is defined as the quadruple (day_of_week, hour, location, train_type). To do so, we already precompute the empirical comulative distribuion of the arrival delays for each quadruple. 

In [None]:
%%spark
zurich_istdaten_delays = zurich_istdaten_delays.groupBy("stop_name", "ttype", "day_of_week", "hour", "delay_arrival", "train_type").count()

Finally, for future purposes, we encode the `stop_name` using the utf-8 encoding.

In [None]:
%%spark
zurich_istdaten_delays = zurich_istdaten_delays.withColumn('stop_name', F.encode(F.col('stop_name'), "utf-8"))

---

### 1c. Timetables
We craete 5 tables, one for each weekday, of the week between Monday, May 13th 2019 to Friday, May 17th 2019. The journey planner will provide trips using this timetable as reference. 

In [None]:
%%spark
# Load stop times for the corresponding week
stop_times = spark.read.csv('/data/sbb/csv/stop_times/2019/05/15/stop_times.txt', header=True, encoding='utf8')\
    .drop('pickup_type','drop_off_type')\
    .filter((F.col('departure_time') < '24') & (F.col('arrival_time') < '24'))
# Only take data for the city of Zurich
zurich_stop_times = stop_times.join(zurich_stops.select('stop_id', 'stop_name'), 'stop_id').drop('stop_id')

# Duplicate the table to separate departures and arrivals
departures = zurich_stop_times\
    .withColumn('connection_id', F.concat('trip_id',F.lit('_'),'stop_sequence'))\
    .withColumnRenamed('stop_name', 'departure_stop')\
    .drop('arrival_time', 'stop_sequence')
arrivals = zurich_stop_times\
    .withColumn('connection_id', F.concat('trip_id',F.lit('_'),(F.col('stop_sequence')-1).cast('integer')))\
    .withColumnRenamed('stop_name', 'arrival_stop')\
    .drop('departure_time', 'stop_sequence', 'trip_id')

# Join departures and arrivals to obtain rows of connections between stopsb
connections = departures.join(arrivals, 'connection_id')\
    .drop('connection_id')\
    .cache()

connections.show(5, truncate=False)

The obtained table does not discriminate between days of the week, such information can be found in the `calendar.txt` dataset. 
To join our `connections` table to `calendar` we need to perform an intermediate join with `trips.txt`.

In [None]:
%%spark
trips = spark.read.csv('/data/sbb/csv/trips/2019/05/15/trips.txt', header=True, encoding='utf8').select('trip_id', 'service_id', 'route_id')
connections_extended = connections.join(trips, 'trip_id')

calendar = spark.read.csv('/data/sbb/csv/calendar/2019/05/15/calendar.txt', header=True, encoding='utf8').drop('start_date','end_date')
connections_with_days = connections_extended.join(calendar, 'service_id')

We also need to include information about the means of transport. Let's retrieve the name and the type for each connection.

We will adopt some simplifying assumptions in order to avoid irregularities in data. Let's only take the most frequent means of transport.

In [None]:
%%spark
routes = spark.read.csv('/data/sbb/csv/routes/2019/05/15/routes.txt', header=True, encoding='utf8').select('route_id', 'route_short_name', 'route_desc')
connections_with_means_names = connections_with_days\
    .join(routes, 'route_id')

connections_with_means_names.groupBy('route_desc')\
    .count()\
    .orderBy(F.col('count').desc())\
    .show()

The most frequent means of transport are Bus, Tram and two different type of trains. We will feed only these connections to our route-finding algorithm.

We also remove fabricated rows.

In [None]:
%%spark
connections_filtered = connections_with_means_names\
    .filter(F.col('route_desc').isin(['Bus', 'Tram', 'S-Bahn', 'InterRegio']))\
    .filter(~F.col('route_short_name').contains('Y'))

To allow interactions between `timetables` and `istdaten` later down the pipeline, we need to retrieve short names and types of the 4 selected means of transport.

In [None]:
%%spark
@F.udf
def retrieve_transportation_name(route_desc, route_short_name):
    def train_name(train_name, train_number):
        long_to_short_name = {
            'S-Bahn':'S',
            'InterRegio':'IR'
        }
        return long_to_short_name[train_name]+str(train_number)
    
    return route_short_name if route_desc in ['Bus', 'Tram'] else train_name(route_desc, route_short_name)

@F.udf
def retrieve_transportation_type(route_desc, route_short_name):    
    return route_desc if route_desc in ['Bus', 'Tram'] else 'Zug'

weekly_timetable = connections_filtered\
    .withColumn('route_short_name',F.col('route_short_name').cast('int'))\
    .withColumn('transportation_name', retrieve_transportation_name(F.col('route_desc'), F.col('route_short_name')))\
    .withColumn('transportation_type', retrieve_transportation_type(F.col('route_desc'), F.col('route_short_name')))\
    .drop('route_id', 'service_id', 'route_desc', 'route_short_name')

weekly_timetable.show(5)

In [None]:
%%spark
weekly_timetable.select('transportation_name','transportation_type').distinct()

---

### 1d. Walking paths

We allow short (max 500m) walking distances for transfers between two stops, and assume a walking speed of 50m/1min on a straight line, regardless of obstacles, human-built or natural, such as building, highways, rivers, or lakes.

In [None]:
%%spark
stops_relevant_info = zurich_stops.select('stop_name', 'stop_lat', 'stop_lon')
walking_departure_stops = stops_relevant_info.alias('departure')\
    .withColumnRenamed('stop_name', 'dep_stop_name')
walking_arrival_stops = stops_relevant_info.alias('arrival')\
    .withColumnRenamed('stop_name', 'arr_stop_name')
all_walks = walking_departure_stops.crossJoin(walking_arrival_stops)\
    .withColumn('dep_arr', F.concat('dep_stop_name','arr_stop_name'))\
    .dropDuplicates(['dep_arr'])\
    .drop('dep_arr')\
    .filter(F.col('dep_stop_name') != F.col('arr_stop_name'))

In [None]:
%%spark
short_walks = all_walks.withColumn('distance', distance('departure.stop_lat', 'departure.stop_lon', 'arrival.stop_lat', 'arrival.stop_lon'))\
    .filter('distance < 0.5')\
    .withColumn('duration', F.ceil(F.col('distance') / 0.05))\
    .select('dep_stop_name', 'arr_stop_name', 'duration')\
    .withColumnRenamed('dep_stop_name', 'departure')\
    .withColumnRenamed('arr_stop_name', 'arrival')
short_walks.show(5, truncate=False)

---

### 1e. Store the data

We store all the data in our folder on the IC cluster. This way we can avoid re-computing everything for each query.  

##### Zurich stops

In [None]:
%%spark
zurich_stops.write.mode('overwrite').option("header","true").format("csv").save('/group/ricola/zurich_stops.csv')

##### Zurich delays

In [None]:
%%spark
zurich_istdaten_delays.write.mode('overwrite').option("header","true").partitionBy("day_of_week","hour").format("orc").save('/group/ricola/zurich_istdaten_delays.orc')

##### Timetable

In [None]:
%%spark 
import pickle
def create_weekday_dataframe(weekday_string):
    return (weekly_timetable
            .filter(F.col(weekday_string) == '1')
            .select('trip_id','departure_time','departure_stop','arrival_time','arrival_stop','transportation_name','transportation_type'))

for day in ['monday', 'tuesday', 'wednesday', 'thursday', 'friday']:
    create_weekday_dataframe(day).write.mode('overwrite').option("header","true").format("csv").save('/group/ricola/'+day+'.csv')

##### Walking paths

In [None]:
%%spark
short_walks.write.mode("overwrite").option("header","true").format("csv").save('/group/ricola/short_walks.csv')

---

## 2. Delay prediction

Now that the historical data have been properly processed, we can develop our predictive model. 

Given a specific journey plan, we estimate the probability that the user will arrive at destination by multiplying the probability of arriving on time to catch each connection and at the destination. That is:

$P(\text{journey is successful}) = \prod_{j=1}^{C} P(\text{connection j is successful}) \\
\qquad \qquad \qquad \qquad \! \; \; \,= \prod_{j=1}^{C} P(\text{arrival } c_{j} \leq \text{time available until next departure})$

To do so, we will group the historical data according to the following criteria:
- `stop_name`: the bus/train/metro station where the user is taking the means of transport
- `day_of_week`: which day the user is travelling
- `hour`: at which hour of the day the next public transport is scheduled
- `train_type`: the line on which the user is travelling to arrive at the station being studied

Then, we can calculate the probability of succesfully catching the next connection as the percentage of times that the arriving connection arrived within `time_buffer` minutes of delay, where `time_buffer` is the difference between the scheduled arrival time and the next departure time, including the required time to change which we assume to be 1 minute. 

In [None]:
%%spark
import pandas as pd
import pyspark.sql.functions as F

In [None]:
%%spark
def compute_probability(stop_name, day_of_week, train_type, hour, ttype, time_buffer):
    total = zurich_istdaten_delays.filter((zurich_istdaten_delays.stop_name == stop_name) & 
                                                    (zurich_istdaten_delays.day_of_week == day_of_week) &
                                                    (zurich_istdaten_delays.train_type == train_type) &
                                                    (zurich_istdaten_delays.hour == hour) &
                                                    (zurich_istdaten_delays.ttype == ttype)).select(F.sum('count')).collect()[0][0]
    # If not enough information available
    if total <= 10:
        return 1.0
    filtered = zurich_istdaten_delays.filter((zurich_istdaten_delays.stop_name == stop_name) & 
                                                    (zurich_istdaten_delays.day_of_week == day_of_week) &
                                                    (zurich_istdaten_delays.train_type == train_type) &
                                                    (zurich_istdaten_delays.hour == hour) &
                                                    (zurich_istdaten_delays.delay_arrival <= time_buffer) &
                                                    (zurich_istdaten_delays.ttype == ttype)).select(F.sum('count')).collect()[0][0]

    confidence = float(filtered) / float(total)
    
    return confidence

---

## 3. CSA: journey planning



To provide the best possible journey between two stop we implement a modified version of the *Connection Scan Algorithm*. 

Our algorithm computes the path backwards starting from the arrival stop to the departure one. The pseudocode of our implementation is shown below. 

<img src="../notebooks/CSA-Ricola-implementation.jpg">

In [None]:
%%spark
from datetime import time, datetime, date
import pandas as pd
def add_time(t, d):
    return (datetime.combine(date(1,1,1), t) + d).time()

In [None]:
%%spark
def find_path(S, starting_stop):
    path = []
    # step is the current connection
    step = S[starting_stop]
    old_trip_id = step["trip_id"]
    # to add is the route we're covering
    to_add = step
    # while we have not reached the end of the path
    while step["trip_id"]!= None:
        # if the current trip_id is different than the previous one
        if step["trip_id"] != old_trip_id:
            path.append(to_add)
            to_add = step
            old_trip_id = step["trip_id"]
        else:
            to_add["arrival_time"] = step["arrival_time"]
            to_add["arrival_stop"] = step["arrival_stop"]
        step = S[step["arrival_stop"]]
    path.append(to_add)
    return path

In [None]:
%%spark
from collections import defaultdict

delta = pd.to_timedelta(2, unit='m')

def CSA(starting_stop, arrival_stop, arrival_time, connections, avoid=[]):
    '''
    Connections Scan Algorithm to maximize the starting time
    
    Return: List of trips, where each trip is a list of 5 elements: 
                [means of transport (trip ID or 'walking'), starting time (datetime), starting_stop (name), arriving_time (datetime), arriving stop (name)]
                
    '''   
    
    # Best possible connection
    latest_departure_from = defaultdict(lambda: pd.Series(data = [None, time.min, None, time.max, None], index = ['trip_id', 'departure_time', 'departure_stop', 'arrival_time', 'arrival_stop']))
    latest_departure_from[arrival_stop] = pd.Series(data = [None, arrival_time, None, time.max, None], index = ['trip_id', 'departure_time', 'departure_stop', 'arrival_time', 'arrival_stop'])
    
    # For all footpaths where footpath.arrival is arrival_stop
    for i, footpath in footpaths[ footpaths["arrival"] == arrival_stop ].iterrows():
        # The latest departure is given by  #### arrival_time - footpath["duration"]
        latest_departure_from[footpath['departure']] = pd.Series(data = ['Walking', add_time(arrival_time, -footpath['duration']), footpath['departure'], arrival_time,arrival_stop, 'Walking', ''], index = ['trip_id', 'departure_time', 'departure_stop', 'arrival_time', 'arrival_stop', 'transportation_type', 'transportation_name'])
       
    # Take all connections via means of transport which arrive before wanted arrival time
    connections = connections[connections['arrival_time']<arrival_time]
    # We search among the connections whose arrival time is smaller than our arrival_time 
    for i, connection in connections.iterrows():
        # If the connection is not to be avoided (due to previously examined paths)
        if not (connection["trip_id"] in avoid): 
            # If the latest departure time from the starting station is greater than the arrival
            # time of the current connection, we have already found the optimal path
            if latest_departure_from[starting_stop]["departure_time"] >= connection["arrival_time"]:
                break
            #If the best trip for the next station is reachable in time 
            if latest_departure_from[connection["arrival_stop"]]["trip_id"] == connection["trip_id"] or (latest_departure_from[connection["arrival_stop"]]["departure_time"] >= add_time(connection["arrival_time"], delta)):
                # Check if the connection that allows the latest departure time is the current one
                if latest_departure_from[connection["departure_stop"]]["departure_time"] < connection["departure_time"]:
                    latest_departure_from[connection["departure_stop"]] = connection
                    # We add footpath information for nearby stations
                    for i, footpath in footpaths[ footpaths["arrival"] == connection["departure_stop"] ].iterrows():
                        if latest_departure_from[footpath["departure"]]["departure_time"] < (add_time(connection["departure_time"], - footpath["duration"])):
                            latest_departure_from[footpath["departure"]] = pd.Series(data = ['Walking', add_time(connection["departure_time"],-footpath["duration"]), footpath['departure'], connection["departure_time"], footpath["arrival"], 'Walking', ''], index = ['trip_id', 'departure_time', 'departure_stop', 'arrival_time', 'arrival_stop', 'transportation_type', 'transportation_name'])
            
    if latest_departure_from[starting_stop]["trip_id"] is not None:
        return find_path(latest_departure_from, starting_stop)
    else:
        return []



In [None]:
%%spark
week = {'Monday':2, 'Tuesday':3, 'Wednesday':4, 'Thursday':5, 'Friday':6}

def difference(t1, t2):
    return (datetime.combine(date(1,1,1), t1)-datetime.combine(date(1,1,1), t2)).seconds//60

def confidence_interval(path, week_day, arrival_time):
    p = 1.0
    if len(path) < 1:
        return 1
    
    for i,trip in enumerate(path): 
        if trip["trip_id"] != "Walking":
            # if it is not the last connection, we compare its arrival delay with the departure of the next connection minus the change time (1 minute)
            if i < len(path)-1:
                input_prob = (trip["arrival_stop"], week[week_day], trip["transportation_name"], trip["arrival_time"].hour, trip["transportation_type"], difference(path[i+1]["departure_time"], trip["arrival_time"]) - 1)
                p = p * compute_probability(input_prob[0],input_prob[1],input_prob[2],input_prob[3],input_prob[4],input_prob[5])
            # if it is the last connection, then we compare its arrival delay with the desired arrival time of the user    
            if i == len(path)-1:
                input_prob = (trip["arrival_stop"], week[week_day], trip["transportation_name"], trip["arrival_time"].hour, trip["transportation_type"], difference(arrival_time, trip["arrival_time"]))
                p = p * compute_probability(input_prob[0],input_prob[1],input_prob[2],input_prob[3],input_prob[4],input_prob[5])                
    return p

def fix_walking(path):
    if len(path)<2:
        return path
    for i,trip in enumerate(path): 
        if trip["trip_id"] == "Walking" and i>0:
            diff = difference(trip["departure_time"], path[i-1]["arrival_time"])
            trip["departure_time"] = add_time(trip["departure_time"], pd.to_timedelta(- diff, unit='m'))
            trip["arrival_time"] = add_time(trip["departure_time"], pd.to_timedelta(- diff, unit='m'))
    return path


def query(from_, to_, arrival_time, week_day, min_prob):
    
    df = timetables[week_day.lower()]
    
    avoid=[]
    alternatives = []
    probabilities = []
    i = 0
    prob = 0
    
    # Check if stop is in dataset
    if ((from_ not in df["departure_stop"].values) | (to_ not in df["arrival_stop"].values)) :
        return [], ["missing"]
    
    for i in range(3):
    
        path = CSA(from_, to_, arrival_time, df, avoid)
        if (len(path) > 0) :
            for c in reversed(path):
                if c["trip_id"] != "Walking":
                    avoid.append(c["trip_id"])
                    break
            
            prob = confidence_interval(path, week_day, arrival_time)
            path = fix_walking(path)

            if prob >= min_prob:
                alternatives.append(path)
                probabilities.append(prob)

            if prob == 1:
                break
    
    return alternatives, probabilities


import json
from pyspark.sql.types import StringType
def queryFromLocalContext(from_, to_, arrival_time, week_day, min_prob) : 
    """
    In order to send the data to the local context, we need to embedded the output in a Spark DataFrame, which can be serialized and sent through the network. 
    In particular, we use the python library `json` to convert the output of the algorithm (a list of journeys) into a string which can be embedded in a Spark DataFrame. 
    """
    journeys, probabilities = query(from_, to_, arrival_time, week_day, min_prob)
    return spark.createDataFrame([json.dumps([[{k: str(v) for k, v in x.to_dict().items()} for x in list_] for list_ in journeys]), json.dumps(probabilities)], StringType())


## 4. Data loading

In this section we load all the precomputed data of section 1. 
Note that the Ricola Journey planner can be ran by simply executing sections 2 to 6, skipping section 1, provided that the precomputed data is already stored on the cluster. 
In particular, the filtered `istdaten` dataset, used to compute the probabilities is loaded using PySpark, while the other datasets are loaded into the memory of the master node, as pandas dataframes. 

##### Timetable

In [None]:
%%spark
from pyspark.sql import functions as F
def getDatasetOfDay(week_day) : 
    df = spark.read.option("header",True).option("encoding", "UTF-8").csv("/group/ricola/"+week_day.lower()+".csv").toPandas()
    df['arrival_time'] = pd.to_datetime(df['arrival_time'], format= '%H:%M:%S' ).dt.time
    df['departure_time'] = pd.to_datetime(df['departure_time'], format= '%H:%M:%S' ).dt.time
    
    for column in ["trip_id", "departure_stop", "arrival_stop"] : 
        df[column] = df[column].apply(lambda x: x.encode("utf-8"))

    df = df.sort_values(by=['arrival_time'], ascending=False)
    return df

timetables = {}
for day in ['monday', 'tuesday', 'wednesday'] : 
    timetables[day] = getDatasetOfDay(day)

In [None]:
%%spark
for day in ['thursday', 'friday'] : 
    timetables[day] = getDatasetOfDay(day)

##### Zurich delays

In [None]:
%%spark
zurich_istdaten_delays = spark.read.option("header",True).option("encoding", "UTF-8").orc("/group/ricola/zurich_istdaten_delays.orc").cache()

##### Walking paths

In [None]:
%%spark
short_walks = spark.read.option("header",True).option("encoding", "UTF-8").csv("/group/ricola/short_walks.csv").toPandas()
short_walks["duration"] = short_walks["duration"].apply(lambda x: pd.to_timedelta(int(x), unit='m'))

for column in ["departure", "arrival"] : 
    short_walks[column] = short_walks[column].apply(lambda x: x.encode("utf-8"))
footpaths = short_walks
journey_planning_output = spark.createDataFrame([], StringType())

##### Zurich stops
We need to move locally the dataset containing all the stops in Zurich. 

In [None]:
data_filepath = os.path.join('../data')
if 'zurich_stops' not in os.listdir(data_filepath):
    !/usr/hdp/current/hadoop-3.1.1/bin/hdfs dfs -copyToLocal /group/ricola/zurich_stops.csv ../data/zurich_stops

In [None]:
import os
import pandas as pd
import glob
import pandas as pd
csv_file = glob.glob("../data/zurich_stops/*.csv")[0]
zurich_stops = pd.read_csv(csv_file)
zurich_stops = zurich_stops.drop(columns=["stop_id", "location_type", "parent_station", "distance_from_zurich"]).sort_values(by=["stop_name"])
#zurich_stops["stop_name"] = zurich_stops["stop_name"].apply(lambda x: x.encode("utf-8"))
# We convert it to a dictionary with key `stop_name` and value latitute and longitude
zurich_stops = dict(zip(zurich_stops["stop_name"], zip(zurich_stops["stop_lat"], zurich_stops["stop_lon"])))

# 5. Validation

#### Validation CSA algorithm

We first validate the output of the CSA algorithm by comparing the suggested trips to those indicated on SBB and Google Maps. The trips we will try are the following:
- From "Zürich Oerlikon" to "Zürich Wollishofen" on a Wednesday with arrival within 11:20am
- From "Ringlikon" to "Rümlang" on a Monday with arrival within 7:20pm (This trip is within the city of Zurich and thus shoudl test Trams and Buses)
- From "Dübendorf" to "Zürich Altstetten" on a Tuesday with arrival within 3pm 


In [None]:
%%spark
result = CSA('Zürich Oerlikon', 'Zürich Wollishofen', time(11, 20, 0), timetables['wednesday'], avoid = [])
result

We notice that the selected journey corresponds to what is suggested on the SBB app. This is a relatively easy journey as the two selected stations are directly connected by public transports, however it works well as a simple proof of concept. 

In the next cell we test two stations that are not directly connected and thus require to change the mean of transportation.

In [None]:
%%spark
result = CSA('Dübendorf', 'Zürich Altstetten', time(14, 55, 0), timetables['monday'], avoid = [])
result

Once again, the outcome of our implementation of the CSA algorithm corresponds to the journey suggested on the SBB app.

As a final test we experiment with two locations that are hardly reacheable with public transport and involve multiple changes. As can be seen if tried on Google Maps, the fact that many changes are need adds many dergees of freedom to the algorithm and thus the outcome is likely to be different compared to that suggested on the SBB app.

In [None]:
%%spark
result = CSA('Wildpark-Höfli', 'Küsnacht Goldbach', time(19, 20, 0), timetables['tuesday'], avoid = [])
result

We notice that the solution of our algorithm corresponds to that from the SBB app. 

#### Validation Robust Journey Planner

Using the same journeys as above, we validate now the overall architecture of our Journey Planner, including the computation of the probability of success of a trip. For now, we set the minimum probability to be 0.5.

In [None]:
%%spark
result = query('Zürich Oerlikon', 'Zürich Wollishofen', time(11, 20, 0), 'Wednesday', 0.5)
result

We notice that the Planner returns three trips and theirs probabilities of success are estimated to be relatively high. 

The first suggestion is the direct train and its high proabbility of success is reasonable as the arrival time is 6 minutes before the required arrival time. The second trip includes a change at the Zuirch station with 3 miuntes to change. We acknowledge that, given the dimensions of Zurich HB, this might be an overestimation of the probability of success. For the next version of the planner, we could include information about the platform of arrival-departure to improve the precision of the probability estimation.

In [None]:
%%spark
result = query('Dübendorf', 'Zürich Altstetten', time(14, 55, 0), 'Monday', 0.5)
result

In [None]:
%%spark
result = query('Wildpark-Höfli', 'Küsnacht Goldbach', time(19, 20, 0), 'Tuesday', 0.5)
result

# 6. User interface

Our solution involves computation of journeys on the cluster. Although we possess limited computational resources in this project, we thought that such a project in a real-world situation would involve the computation of journeys and related probabilities in a distributed system, consequently we looked for a way to enable I/O communication of our GUI with the cluster. 

##### Connection with Spark
The GUI we provided communicates with the cluster sending HTTP requests and uses the jupyter magic `%spark` to retrieve the output. In order to make it work, we need to find out the Spark session id. 

In [None]:
%%spark -o application_id
from pyspark.sql.types import StringType
application_id = spark.sparkContext.applicationId
application_id = spark.createDataFrame([application_id], StringType())

In [None]:
import json, requests
def find_session_id(sessions_json, application_id) : 
    for session in sessions_json : 
        if session["appId"] == application_id : 
            return session['id']
    return None

host = server
headers = {'Content-Type': 'application/json'}
sessions_url = f"{host}/sessions"
sessions_request = requests.get(sessions_url, headers=headers)
session_id = find_session_id(sessions_request.json().get('sessions'), application_id["value"][0])
print("Your session id is", session_id)

In [None]:
import time
def send_request_to_spark(from_, to, arrival_hour, arrival_minute, weekday, min_prob) : 
    """
    This function forwards a request to the cluster. We use an HTTP request to execute some code in the cluster (we execute the code contained in the variable `pyspark_code`). 
    The global variable `session_id`, which corresponds to the Spark session id on the cluster, has to be defined prior to running this function. 
    
    Returns a list of pandas DataFrames
    """
    global journey_planning_output
    
    # Configure remote host
    host = server
    headers = {'Content-Type': 'application/json'}
    sessions_url = f"{host}/sessions"
    statements_url = f"{sessions_url}/{session_id}/statements"
    
    # Set function call on Spark and output variable
    function = """queryFromLocalContext("{0}", "{1}", time({2}, {3}, 0), "{4}", {5})""".format(from_, to, arrival_hour, arrival_minute, weekday, min_prob)
    var_name = "journey_planning_output"

    # Build pyspark code
    pyspark_code = u"""del {0}; {0} = {1}""".format(var_name, function)

    # Send request
    b1 = requests.post(statements_url, data=json.dumps({'code': pyspark_code}), headers=headers)
    b1.json()
    
    journey_planning_output = ""
    while (len(journey_planning_output) == 0) : 
        time.sleep(1)
        %spark -o journey_planning_output -n -1
    
    output_journeys = json.loads(journey_planning_output["value"][0])
    output_probabilities = json.loads(journey_planning_output["value"][1])

    # Construct a list of pandas DataFrames from the output
    return [pd.DataFrame([pd.Series(trip) for trip in journey]) for journey in output_journeys], output_probabilities

bubu = "asd";

In [None]:
import pandas as pd
days_of_week = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]

example_journey = pd.DataFrame([
    ["Train IC152", 47.37809, 8.54031, 47.38523, 8.54254, "Zurich HB", "Ottikerstrasse"], 
    ["Bus 421", 47.38523, 8.54254, 47.37777, 8.54829, "Ottikerstrasse", "ETH/Universitatsspital"], 
    ["Foot 500m", 47.37777, 8.54829, 47.36718, 8.54513, "ETH/Universitatsspital", "Bellevue"]
], columns=["means_of_transport", "start_latitude", "start_longitude", "arrival_latitude", "arrival_longiture", "start_station", "arrival_station"])
example_journeys = [example_journey, example_journey, example_journey]

##### Graphical user interface

In [None]:
import plotly.express as px
import plotly.graph_objects as go

MAPBOX_TOKEN_GORSI = "pk.eyJ1IjoiZ2lhY29tb29yc2kiLCJhIjoiY2pubTM0Nml6MW02MDNwcWY0ajc3ZHE3diJ9.fz0p1ZmseERTYVzXJPqS0Q"
GEOPS_API_KEY = "5cc87b12d7c5370001c1d655ec23ec0cdd5242b08c54811292516f60"
px.set_mapbox_access_token(MAPBOX_TOKEN_GORSI)

from ipywidgets import Accordion

# Results page generator 
def show_results(journeys, probabilities) : 
    
    if (len(journeys) == 0) : 
            return HTML("<h3>We are sorry, we couldn't find any journey with the given parameters. ")
    
    def generate_summary_journey(journey, i) : 
        last_trip = journey.shape[0]-1
        out = "{0} ({1}) -> {2} ({3})   ({4} changes, probability {5})".format(journey["departure_stop"][0], journey["departure_time"][0], journey["arrival_stop"][last_trip], journey["arrival_time"][last_trip], journey.shape[0]-1, round(probabilities[i], 2))
        
        return out
    
    def generate_summary_trip(trip) : 
        return "{0} {1} from {2} ({3}) to {4} ({5})".format(trip["transportation_type"], trip["transportation_name"], trip["departure_stop"], trip["departure_time"], trip["arrival_stop"], trip["arrival_time"])
        
    
    def generate_maps(journeys) : 
        
        line_colors = {'Zug':'red', 'Walking':'grey', 'Tram': 'brown', 'Bus': 'blue'}
        
        maps = []
        for journey in journeys : 
            fig = go.FigureWidget(layout={"autosize":False, "width":1000})
            for (i, row) in journey.iterrows() : 
                fig.add_trace(go.Scattermapbox(
                    mode = "markers+lines",
                    lat = [zurich_stops[row["departure_stop"]][0], zurich_stops[row["arrival_stop"]][0]],
                    lon = [zurich_stops[row["departure_stop"]][1], zurich_stops[row["arrival_stop"]][1]],
                    marker = {'size': 15},
                    line = go.scattermapbox.Line(width=(2 if row["transportation_type"] == "Walking" else 5), color=line_colors.get(row["transportation_type"], 'black')),
                    text = generate_summary_trip(row),
                    name = "{} {}".format(row["transportation_type"], row["transportation_name"])  
                )
                )
            fig.update_geos(
                projection_type="orthographic"
            )
            fig.update_layout(
                autosize=False,
                margin ={'l':0,'t':0,'b':0,'r':0},
                width=1000,
                mapbox = dict(
                    style="https://maps.geops.io/styles/base_bright_v2/style.json?key="+GEOPS_API_KEY,
                    accesstoken=MAPBOX_TOKEN_GORSI,
                    center=dict(lat=47.37809, lon=8.54031),
                    zoom=11
                ),
            )
            fig.update_layout(
                width=1000    
            )

            maps += [fig]
        return maps
    
    def on_click_accordion(e) :
        # Update widgets every time a user clicks on the accordion in order to reload the map
        output.clear_output(wait=True)
        with output:
            display(VBox([form, results_view]))
    
    maps = generate_maps(journeys)
    solutions = []
    for (i, journey) in enumerate(journeys) : 
        elements = []
        for (j, trip) in journey.iterrows() : 
            
            elements += [HBox([Label(generate_summary_trip(trip))])]

        elements += [maps[i]]#[[go.FigureWidget(maps[i].to_dict())]
        solutions += [VBox(elements)]

    accordion = widgets.Accordion(children=solutions, selected_index=None)

    for (i, journey) in enumerate(journeys) : 
        accordion.set_title(i, generate_summary_journey(journey, i))


    footer = HTML("<h3>Bubuxino</h3>")
    results_page = VBox([HTML("<h1>Results</h1>"), accordion, footer])
    
    accordion.observe(on_click_accordion, names="selected_index")
        
    return accordion

In [None]:
from ipywidgets import Layout, Button, Textarea, Dropdown, Label, IntSlider, Combobox, BoundedIntText, HTML, HBox, GridBox, VBox
import ipywidgets as widgets
from IPython.display import display, clear_output


### Graphical interface view controller


departure_station_dropdown = Dropdown(options=list(zurich_stops.keys()), ensure_option=True, disabled=False)
arrival_station_dropdown   = Dropdown(options=list(zurich_stops.keys()), ensure_option=True, disabled=False)
confidence_slider          = IntSlider(min=0, max=100, value=80)
day_of_week_dropdown       = Dropdown(options=days_of_week)
hour_text                  = BoundedIntText(value=12, min=0, max=23, disabled=False, layout=Layout(width="50px"))
minute_text                = BoundedIntText(value=30, min=0, max=59, disabled=False,  layout=Layout(width="50px"))
search_button              = Button(description='Search journey', disabled=False, button_style='info', tooltip='Click here to search for a journey', icon='search')

loader                     = HTML("""<style>.loader{border:5px solid #f3f3f3;border-radius:50%;border-top:5px solid #3498db;width:20px;height:20px;-webkit-animation:spin 2s linear infinite;animation:spin 2s linear infinite}@-webkit-keyframes spin{0%{-webkit-transform:rotate(0deg)}100%{-webkit-transform:rotate(360deg)}}@keyframes spin{0%{transform:rotate(0deg)}100%{transform:rotate(360deg)}}</style>
<div style="margin-top:20px">Request received. Please wait... <div class="loader" style="display:inline-block;vertical-align:middle"></div></div>""")
error_validation_view      = HTML("""<h4 style="margin-top: 20px">Please check the data you have inserted</h4>""")

form_items = [
    Label(value='Departure Station'), departure_station_dropdown,
    Label(value='Arrival Station'), arrival_station_dropdown,
    Label(value='Day of the week'), day_of_week_dropdown,
    Label(value='Time of arrival'), HBox([hour_text,Label(value=":"), minute_text]),
    Label(value='Minimum confidence'), confidence_slider
]

grid = GridBox(form_items, layout=Layout(grid_template_columns="repeat(2, 300px)"))
form = VBox([HTML("<h1>Ricola Journey Planner</h1>"), grid, search_button], layout=Layout(padding="20px", border="solid 2px", width="650px"))




In [None]:
output = widgets.Output()
results = None

def on_search(b):
    form_data = {"departure":departure_station_dropdown.value, "arrival":arrival_station_dropdown.value, "day": day_of_week_dropdown.value, "hour": hour_text.value, "minute": minute_text.value, "confidence" : float(confidence_slider.value)/100, "weekday": day_of_week_dropdown.value }
    if (check_gui_query(form_data)) : 
        execute_query(form_data)
    else :
        with output: 
            output.clear_output(wait=True)
            display(VBox([form, error_validation_view]))
            
def check_gui_query(q) : 
    stations = list(zurich_stops.keys())
    bubu =  (q["departure"] in stations) and (q["arrival"] in stations) and (q["day"] != "") and (q["hour"] != "") and (q["minute"] != "") and (q["confidence"] != "")
    return bubu


def execute_query(data) : 
    global results_view 
    output.clear_output(wait=True)
    with output: 
        display(VBox([form, loader]))

        journeys, probabilities = send_request_to_spark(data["departure"], data["arrival"], data["hour"], data["minute"], data["weekday"], data["confidence"])
        #print(journeys)

        results_view = show_results(journeys, probabilities)
        output.clear_output(wait=True)

        display(VBox([form, results_view]))

search_button.on_click(on_search)

In [None]:
# Visualize the interface
with output : 
    display(form)
    
output

**PS** Do not get scared of "Current status is busy", a suggested trip is underway!

#### Aknowledgements
The map shown above is the official SBB map that can be found on the SBB website and apps. We contacted geOps GmbH, that is the company that manages the map platform for SBB and they were so kind to provide us a free API key to use their map service.