# DATA SCIENCE LAB - FINAL PROJECT

### by David Salathé (262712), Marian Stoschitzky (305851), Arthur Vernet (245828)

### Table of Contents
1. [Preprocessing data](#un)
2. [Routing algorithm](#deux)
3. [Extracting historical data using Spark](#trois)
4. [Incertitude model](#quatre)
5. [Validation method](#cinq)
6. [Visualisation](#six)
7. [Demonstration](#sept)
8. [Model limitation and analysis](#huit)

### Video [here](https://youtu.be/0w992SYaxxE)

#### Basic libraries imports

In [1]:
import pandas as pd
from datetime import datetime, timedelta
from math import sqrt
import scipy.stats as st
from functools import reduce
import numpy as np
from array import array
import csv

import getpass
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql.types import BooleanType, IntegerType
from pyspark import sql

import folium

#### Spark import

In [2]:
gaspar = getpass.getuser()

spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName('final-{0}'.format(gaspar)) \
    .config('spark.executor.memory', '4g') \
    .config('spark.executor.instances', '5') \
    .config('spark.port.maxRetries', '100') \
    .getOrCreate()

In [3]:
spark

<a id='un'></a>
## 1. Preprocessing data


#### 1.1 Stations in the Zürich area  
Our project is assumed to use only stations 10 kilometers away of Zürich. Therefore, using a database listing every stations used by the CFF with their longitude and latitude, we filtered out stations outside of this 10 kilometers radius around Zürich main station (i.e. Zürich HB).  

To perform this part you need to upload [this file](https://dslab2019.github.io/final_project/BFKOORD_GEO) on the cluster in the HDFS file system. The following commands should work.
```bash
scp /local/path/to/BFKOORD_GEO.dms <GASPAR>@iccluster042.iccluster.epfl.ch:
hdfs dfs -copyFromLocal BFKOORD_GEO.dms /user/<GASPAR>
```

In [33]:
def load_gps_coordinates():
    """Load gps coordinates from a dms database
    
    returns: pandas dataframe of stations with some useful gps information, 
    such as longitude and lattitude.
    
    """
    
    gps = pd.read_csv('BFKOORD_GEO.dms', sep='%', header=None)
    
    def strip_column(x):
        numbers = []
        for number in x.split():
            try:
                numbers.append(float(number))
            except ValueError:
                pass
        series = pd.Series(numbers)
        return series

    gps[['StationID','Longitude','Latitude','Height']] = gps[0].apply(strip_column)
    gps.drop([0], axis=1, inplace=True)
    gps.rename(columns={1:'Station'}, inplace=True)
    gps['Station'] = gps['Station'].astype(str).apply(lambda x: x.strip())
    gps['StationID'] = gps['StationID'].astype(int)
    gps['Height'] = gps['Height'].astype(int)
    
    return gps

In [34]:
gps = load_gps_coordinates()

In [35]:
gps.head()

Unnamed: 0,Station,StationID,Longitude,Latitude,Height
0,Bucuresti,2,26.074412,44.44677,0
1,Calais,3,1.811446,50.901549,0
2,Canterbury,4,1.075329,51.284212,0
3,Exeter,5,-3.543547,50.729172,0
4,"Fideris, Bahnhof",7,9.733756,46.922368,744


In [36]:
def distance_between_gps(lon1, lat1, lon2, lat2):
    #https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude
        from math import sin, cos, sqrt, atan2, radians

        R = 6373.0

        lat1 = radians(lat1)
        lon1 = radians(lon1)
        lat2 = radians(lat2)
        lon2 = radians(lon2)

        dlon = lon2 - lon1
        dlat = lat2 - lat1

        a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
        c = 2 * atan2(sqrt(a), sqrt(1 - a))

        distance = R * c

        return distance

def filter_around_zurich(gps):
    """ Main function used to filter out only stations in a 10km radius from Zurich"""
    
    zurich_lon, zurich_lat = gps[gps['Station'] == 'Zürich HB'][['Longitude', 'Latitude']].values[0]
    radius = 10

    def inside_radius(row):
        return distance_between_gps(zurich_lon, zurich_lat, row['Longitude'], row['Latitude']) <= radius

    return gps[gps.apply(inside_radius, axis=1)]

In [37]:
zurich_gps = filter_around_zurich(gps)

In [38]:
zurich_stations = list(zurich_gps["Station"])

#### 1.2 Timetable generation
We genereated locally on another notebook the timetable, the code can be found in `timetable_preprocessing.ipynb`. This generated 3 files useful for the routing algorithm: `zurich_lean.cs`, `zurich_stops.cs` and `zurich_trips.csv`. They can be found on gitlab.

<a id='deux'></a>
## 2. Routing algorithm  
We used the [Connection Scan Algorithm](https://arxiv.org/abs/1703.05997) (CSA) for our routing algorithm. Unlike most routing algorithm, the CSA is not a graph-based algorith, but works directly on tabular data structures. It is comparably easy to understand, yet fast.
### How it works
Whenever a connection is reachable for us and brings us earlier to its arrival stop than any other connection, we consider it useful and keep it. The sequence of useful connections between our departure station and destination represents the path which lets us arrive at our destination as early as possible. 

A connection is defined *reachable* if there exists a path from the initial departure station to the connection's departure stop which lets us arrive there before the connection's departure time. Therefore, the first reachable connection must depart at the user's given departure station at a point in time later than the given departure time. The CSA iterates over all connections, sorted by departure time, to span a net of useful connections (starting at the user's departure station) until the user's destination is reached. Once no better connections can be found (when connection.departure_time > destination.arrival_time), the useful connections are tracked back from destination to departure to build the returned journey.

The CSA guarantees earliest possible arrival. However, the basic implementation does not take transfer times, footpaths or the number of connections into account.

### Extensions
We have used a python implementation of the basic CSA by Captain train as our starting point (see the notebook [here](https://github.com/trainline-eu/csa-challenge) and extended it by multiple features:
* **transfer times**: when switching from one trip to another, we have assumed a universal 2min duration to do so
* **footpaths**: The algorithm takes into account walking distances. Walking times are calculated as the Euclidian distances between stops, walked with 3.5km/h.
* **latest-departure routing**: While the basic algorithm works with a given departure time, we have implemented the possibility to set the desired arrival time and get the latest possible departure times.
* **alternative paths**: First, the best journey is found. Then, alternative routes are generated by blocking trips used in the best solution or by departing just after the time of the best solution. A sorted list of k best paths (according to arrival/departure time respectively) is returned. While the best single paths are obtained by calling `compute_best` (or `compute_best_ld` for latest-departure routing), the lists of k best paths are returned by `compute` or `compute_ld` respectively.

The implementation can be seen below.

In [24]:
from array import array
import math
import pandas as pd
from datetime import datetime
import csv

MAX_STATIONS = 1000000000
MAX_INT = 2**32 - 1
PATH_TO_FILE = 'zurich_lean.csv'
PATH_TO_STATIONS = 'zurich_stops.csv'
PATH_TO_TRIPS = 'zurich_trips.csv'

class Connection:
    def __init__(self, line):
        tokens = line.split(" ")

        self.departure_station = int(tokens[0])
        self.arrival_station = int(tokens[1])
        self.departure_timestamp = int(tokens[2])
        self.arrival_timestamp = int(tokens[3])
        self.trip = tokens[4]

class Station:
    def __init__(self, station_id, name, lat, lon):
        self.id = int(station_id)
        self.name = name
        self.lat = float(lat)
        self.lon = float(lon)        
        
class Walk:
    def __init__(self, departure_station: Station, departure_timestamp, arrival_station: Station):
        self.departure_station = departure_station
        self.departure_timestamp = departure_timestamp
        self.arrival_station = arrival_station
        self.walking_time = calculate_walking_time(self.departure_station, self.arrival_station)
        self.arrival_timestamp = self.departure_timestamp + self.walking_time
        
    def __str__(self):
        out = str(self.departure_station.name) + "\n"
        out += "| Walk ({}min {}s)\n".format(self.walking_time//60, self.walking_time%60)
        out += str(self.arrival_station.name) + "\n"
        return out
    
    def get_interface_format(self):
        return (
            self.departure_station.name,
            datetime.fromtimestamp(self.departure_timestamp),
            self.arrival_station.name,
            datetime.fromtimestamp(self.arrival_timestamp),
            0,
            self.walking_time)

class Trip:
    def __init__(self, name, connections):
        self.name = name
        self.line_id = TRIPS.loc[self.name]['trip_short_name']
        self.stations_and_timestamps = []
        # set stations_and_timestamps
        departure_station = STATIONS[connections[0].departure_station]
        departure_timestamp = connections[0].departure_timestamp
        self.stations_and_timestamps.append((departure_timestamp, departure_station))
        for c in connections:
            self.stations_and_timestamps.append((c.arrival_timestamp, STATIONS[c.arrival_station]))
            
    def get_departure_station(self):
        return self.stations_and_timestamps[0][1]
    
    def get_arrival_timestamp(self):
        return self.stations_and_timestamps[-1][0]
    
    def get_arrival_station(self):
        return self.stations_and_timestamps[-1][1]
            
    def __str__(self):
        # print departure station and time
        output = "{}           {}".format(
            datetime.fromtimestamp(self.stations_and_timestamps[0][0]),
            self.stations_and_timestamps[0][1].name) + "\n"
        # print trip name
        output += "| Trip '{}' / {}".format(self.name, self.line_id) + "\n"
        # print intermediary stops
        for s in self.stations_and_timestamps[1:-1]:
            output += "| {} - {}".format(datetime.fromtimestamp(s[0]), s[1].name) + "\n"
        # print arrival station and time
        output += "{}           {}".format(
            datetime.fromtimestamp(self.stations_and_timestamps[-1][0]),
            self.stations_and_timestamps[-1][1].name) + "\n"
        
        return output
    
    def get_interface_format(self):
        """
        Return departure_station.name, arrival_station.name and tripID"""
        return (
            self.stations_and_timestamps[0][1].name,
            datetime.fromtimestamp(self.stations_and_timestamps[0][0]),
            self.stations_and_timestamps[-1][1].name,
            datetime.fromtimestamp(self.stations_and_timestamps[-1][0]),
            self.line_id,
            0)
        
class Journey:
    def __init__(self, departure_station, arrival_station, connections,
                departure_timestamp=None, arrival_timestamp=None):
        self.departure_station = STATIONS[departure_station]
        self.arrival_station = STATIONS[arrival_station]
        self.connections = connections
        self.trips = []
        self.walks = []
        self.trips_and_walks = []
        
        if connections:
            departure_walktime = calculate_walking_time(
                self.departure_station, STATIONS[connections[0].departure_station])
            self.departure_timestamp = connections[0].departure_timestamp - departure_walktime
            arrival_walktime = calculate_walking_time(
                self.arrival_station, STATIONS[connections[-1].arrival_station])
            self.arrival_timestamp = connections[-1].arrival_timestamp + arrival_walktime
        elif departure_timestamp:
            self.departure_timestamp = departure_timestamp
            self.arrival_timestamp = departure_timestamp + calculate_walking_time(
                self.departure_station, self.arrival_station)
        elif arrival_timestamp:
            self.arrival_timestamp = arrival_timestamp
            self.departure_timestamp = arrival_timestamp - calculate_walking_time(
                self.departure_station, self.arrival_station)
        
        # segment into trips
        trip_names = []
        for c in self.connections:
            if c.trip not in trip_names:
                trip_names.append(c.trip)
                
        # create Trip objects for every 
        for t in trip_names:
            trip_connections = list(filter(lambda x: x.trip == t, connections))
            self.trips.append(Trip(t, trip_connections))
            
        # calculate and save walking times
        if not connections:
            self.walks.append(Walk(
                self.departure_station, 
                self.departure_timestamp, 
                self.arrival_station))
            self.trips_and_walks = self.walks
        else:
            previous_station = self.departure_station
            previous_timestamp = self.departure_timestamp
            for t in self.trips:
                if previous_station.name != t.get_departure_station().name:
                    walk = Walk(previous_station, previous_timestamp, t.get_departure_station())
                    self.walks.append(walk)
                    self.trips_and_walks.append(walk)
                self.trips_and_walks.append(t)
                previous_station = t.get_arrival_station()
                previous_timestamp = t.get_arrival_timestamp()
            if previous_station.name != self.arrival_station.name:
                walk = Walk(previous_station, previous_timestamp, self.arrival_station)
                self.walks.append(walk)
                self.trips_and_walks.append(walk)
            
    def __str__(self):
        # print departure station and time
        output = "{}           {}".format(
            datetime.fromtimestamp(self.departure_timestamp),
            self.departure_station.name) + "\n"
        output += "|" + "\n"
        # print trips
        for t in self.trips_and_walks:
            output += t.__str__()
            output += "|" + "\n"
        # print arrival station and time
        output += "{}           {}".format(
            datetime.fromtimestamp(self.arrival_timestamp),
            self.arrival_station.name) + "\n\n"
        
        return output

    def get_interface_format(self):
        return [trip_or_walk.get_interface_format() for trip_or_walk in self.trips_and_walks]
    
class Timetable:
    # reads all the connections from stdin
    def __init__(self):
        self.connections = []

        with open(PATH_TO_FILE) as f:
            for line in f:
                if line.rstrip() == "":
                    break

                self.connections.append(Connection(line.rstrip()))
        
def import_stations(path_to_stations):
    with open(path_to_stations, encoding='utf-8') as f:
        csv_reader = csv.reader(f)
        stations = {}
        for row in csv_reader:
            if row[0] == 'stop_id':
                # skip header
                continue
            if ":" in row[0]:
                # skip sub-stations
                row[0] = row[0][:row[0].index(':')]
            if "P" in row[0]:
                # remove P from station_id
                row[0] = row[0][:-1]
            # add station to dictionary
            stations[int(row[0])] = Station(row[0], row[1], row[2], row[3])
        return stations

def distance_between_gps(lon1, lat1, lon2, lat2):
    #https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude
        from math import sin, cos, sqrt, atan2, radians

        R = 6373.0

        lat1 = radians(lat1)
        lon1 = radians(lon1)
        lat2 = radians(lat2)
        lon2 = radians(lon2)

        dlon = lon2 - lon1
        dlat = lat2 - lat1

        a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
        c = 2 * atan2(sqrt(a), sqrt(1 - a))

        distance = R * c

        return distance
    
def calculate_walking_time(station_a, station_b):
    """
    Return expected walking time between 2 stations in seconds
    """
    distance = distance_between_gps(station_a.lon, station_a.lat, station_b.lon, station_b.lat)
    walking_hours = distance / 3.5
    walking_seconds = walking_hours * 3600
    return int(walking_seconds)
    
class CSA:
    def __init__(self, timetable=None, stations=None, VERBOSE=False):
        self.stations = stations or import_stations(PATH_TO_STATIONS)
        self.timetable = timetable or Timetable()
        self.in_connection = array('I')
        self.earliest_arrival = array('I')
        self.out_connection = array('I')
        self.latest_departure = array('I')
        self.VERBOSE = VERBOSE

    def main_loop(self, departure_station, arrival_station, forbidden_trips = []):

        for i, c in enumerate(self.timetable.connections):
            if c.trip in forbidden_trips:
                # this trip is blocked
                continue
            transfer_time = 120
            if self.in_connection[c.departure_station] == MAX_INT \
                    or (self.in_connection[c.departure_station] != MAX_INT and \
                    c.trip == self.timetable.connections[self.in_connection[c.departure_station]].trip):
                transfer_time = 0
            if c.departure_timestamp >= self.earliest_arrival[c.departure_station] + transfer_time \
                    and c.arrival_timestamp <= self.earliest_arrival[c.arrival_station]:
                self.earliest_arrival[c.arrival_station] = c.arrival_timestamp
                self.in_connection[c.arrival_station] = i
                
                #print(c.departure_station, c.arrival_station)
                self.update_footpaths(c.arrival_station, c.arrival_timestamp, i)

            elif c.arrival_timestamp > self.earliest_arrival[arrival_station]:
                return
            #if i % 100000 == 0:
                #print(i)
                
    def main_loop_latest_departure(self, departure_station, arrival_station, forbidden_trips = []):
        length = len(self.timetable.connections) - 1
        for i, c in enumerate(reversed(self.timetable.connections)):
            if c.trip in forbidden_trips:
                # this trip is blocked
                continue
            transfer_time = 120
            if self.out_connection[c.arrival_station] == -MAX_INT \
                    or (self.out_connection[c.arrival_station] != -MAX_INT and \
                    c.trip == self.timetable.connections[self.out_connection[c.arrival_station]].trip):
                transfer_time = 0
            if c.arrival_timestamp <= self.latest_departure[c.arrival_station] - transfer_time \
                    and c.departure_timestamp >= self.latest_departure[c.departure_station]:
                self.latest_departure[c.departure_station] = c.departure_timestamp
                self.out_connection[c.departure_station] = (length - i)
                
                #print(c.departure_station, c.arrival_station)
                self.update_footpaths_latest_departure(
                    c.departure_station, c.departure_timestamp, (length - i))

            elif c.departure_timestamp < self.latest_departure[departure_station]:
                return
            #if i % 100000 == 0:
            #    print(i)

    def create_journey(self, departure_station, arrival_station):
        trips = []
        if self.in_connection[arrival_station] == MAX_INT:
            # no trips, just walking
            journey = None
        else:
            route = []

            # We have to rebuild the route from the arrival station
            last_connection_index = self.in_connection[arrival_station]
            while last_connection_index != MAX_INT:
                connection = self.timetable.connections[last_connection_index]
                route.append(connection)
                last_connection_index = self.in_connection[connection.departure_station]                
            
            journey = Journey(departure_station, arrival_station, [c for c in reversed(route)])
            for c in reversed(route):
                if len(trips) == 0 or trips[-1] != c.trip:
                    trips.append(c.trip)
        return journey
        
    def create_journey_latest_departure(self, departure_station, arrival_station):
        trips = []
        if self.out_connection[departure_station] == -MAX_INT:
            journey = None
        else:
            route = []

            # We have to rebuild the route from the arrival station
            last_connection_index = self.out_connection[departure_station]
            while last_connection_index != -MAX_INT:
                connection = self.timetable.connections[last_connection_index]
                route.append(connection)
                last_connection_index = self.out_connection[connection.arrival_station]                
            
            journey = Journey(departure_station, arrival_station, [c for c in route])
            for c in route:
                if len(trips) == 0 or trips[-1] != c.trip:
                    trips.append(c.trip)
        return journey
        
    def update_footpaths(self, departure_station, departure_time, connection_id=None):
        for station in self.stations.values():
            footpath_time = calculate_walking_time(self.stations[departure_station], station)
            if departure_time + footpath_time < self.earliest_arrival[station.id]:
                
                self.earliest_arrival[station.id] = departure_time + footpath_time
                if connection_id:
                    self.in_connection[station.id] = connection_id
                    
    def update_footpaths_latest_departure(self, arrival_station, arrival_time, connection_id=None):
        for station in self.stations.values():
            footpath_time = calculate_walking_time(self.stations[arrival_station], station)
            if arrival_time - footpath_time > self.latest_departure[station.id]:
                
                self.latest_departure[station.id] = arrival_time - footpath_time
                if connection_id:
                    self.out_connection[station.id] = connection_id

    def compute_best(self, departure_station, arrival_station, departure_time, forbidden_trips = []):
        departure_station = gps[gps['Station'] == departure_station][['StationID']].values[0][0]
        arrival_station = gps[gps['Station'] == arrival_station][['StationID']].values[0][0]
        
        self.in_connection = dict((int(station_id), MAX_INT) for station_id in self.stations.keys())
        self.earliest_arrival = dict((int(station_id), MAX_INT) for station_id in self.stations.keys())

        self.earliest_arrival[departure_station] = departure_time

        if departure_station <= MAX_STATIONS and arrival_station <= MAX_STATIONS:
            self.update_footpaths(departure_station, departure_time)
            #print('yep, entering main loop {}'.format(arrival_station))
            self.main_loop(departure_station, arrival_station, forbidden_trips)

        journey = self.create_journey(departure_station, arrival_station) or Journey(
            departure_station, arrival_station, [], departure_timestamp=departure_time)
        
        if self.VERBOSE:
            print(journey)
        return journey
    
    def compute_best_ld(self, departure_station, arrival_station, arrival_time, forbidden_trips = []):
        departure_station = gps[gps['Station'] == departure_station][['StationID']].values[0][0]
        arrival_station = gps[gps['Station'] == arrival_station][['StationID']].values[0][0]
        
        self.out_connection = dict((int(station_id), -MAX_INT) for station_id in self.stations.keys())
        self.latest_departure = dict((int(station_id), -MAX_INT) for station_id in self.stations.keys())

        self.latest_departure[arrival_station] = arrival_time

        if departure_station <= MAX_STATIONS and arrival_station <= MAX_STATIONS:
            self.update_footpaths_latest_departure(arrival_station, arrival_time)
            #print('yep, entering main loop {}'.format(arrival_station))
            self.main_loop_latest_departure(departure_station, arrival_station, forbidden_trips)

        journey = self.create_journey_latest_departure(departure_station, arrival_station) or Journey(
            departure_station, arrival_station, [], arrival_timestamp=arrival_time)
        if self.VERBOSE:
            print(journey)
        return journey
    
    def compute(self, departure_station, arrival_station, departure_time, paths=5):  
        no_journeys = paths
        journeys = []
        while len(journeys) < 2*no_journeys:
            # get best trip without restriction
            best_journey = self.compute_best(departure_station, arrival_station, departure_time)
            # append to journeys if not already in there
            journeys = self.sorted_insert(best_journey, journeys)
            used_trips = [trip.name for trip in best_journey.trips]
            # now compute journey without given trips
            i = len(used_trips) - 1
            while i > 0: 
                alternative_journey = self.compute_best(departure_station, arrival_station, departure_time,
                                           forbidden_trips = used_trips[i])
                journeys = self.sorted_insert(alternative_journey, journeys)
                i -= 1            
            # skip with departure_time to right after the departure of the previous best_journey
            departure_time = best_journey.departure_timestamp + 1
        return journeys[:no_journeys]
    
    def compute_ld(self, departure_station, arrival_station, arrival_time, paths=5):        
        no_journeys = paths
        journeys = []
        while len(journeys) < 2*no_journeys:
            # get best trip without restriction
            best_journey = self.compute_best_ld(
                departure_station, arrival_station, arrival_time)
            # append to journeys if not already in there
            journeys = self.sorted_insert(best_journey, journeys, earliest_arrival=False)
            used_trips = [trip.name for trip in best_journey.trips]
            # now compute journey without given trips
            i = len(used_trips) - 1
            while i > 0: 
                alternative_journey = self.compute_best_ld(
                    departure_station, arrival_station,
                    arrival_time, forbidden_trips = used_trips[i])
                journeys = self.sorted_insert(
                    alternative_journey, journeys, earliest_arrival=False)
                i -= 1            
            # skip with departure_time to right after the departure of the previous best_journey
            arrival_time = best_journey.arrival_timestamp - 1
        return journeys[:no_journeys]
    
    def sorted_insert(self, journey, journeys, earliest_arrival=True):
        # if journey does not already exist, add to journeys (sorted by arrival_time or departure_time)
        if str(journey) not in [str(j) for j in journeys]:
            journeys.append(journey)
            if earliest_arrival:
                journeys.sort(key=(lambda x: x.arrival_timestamp))
            else:
                journeys.sort(key=(lambda x: x.departure_timestamp), reverse=True)
        return journeys

Here we initialize the variables needed for running the Connection Scan Algorithm, and define methods for the final pipeline.

In [25]:
STATIONS = import_stations(PATH_TO_STATIONS)
TRIPS = pd.read_csv(PATH_TO_TRIPS, index_col='trip_id')
timetable = Timetable()

def route_planning_departure(departure_stop, arrival_stop, departure_timestamp, no_paths, VERBOSE=False):
    csa = CSA(stations=STATIONS, timetable=timetable, VERBOSE=VERBOSE)
    journeys = csa.compute(departure_stop, arrival_stop, departure_timestamp, paths=3)
    return [j.get_interface_format() for j in journeys]

def route_planning_arrival(departure_stop, arrival_stop, arrival_timestamp, no_paths, VERBOSE=False):
    csa = CSA(stations=STATIONS, timetable=timetable, VERBOSE=VERBOSE)
    journeys = csa.compute_ld(departure_stop, arrival_stop, arrival_timestamp, paths=3)
    return [j.get_interface_format() for j in journeys]

<a id='trois'></a>
## 3. Extracting historical data using Spark  
Now that we know which stations are inside the Zürich area, we load the dataset of the CFF stored on the cluster and only keep stops made around Zürich. Note that we decided to only use data from the schedule of 2018 to reduce computation time, but also to get more accurate results. Indeed it appears that the CFF are releasing a new schedule every year. According to [this link](https://opentransportdata.swiss/en/dataset/timetabeloverview/resource/10b9e0cc-c823-4f68-8bd1-476377ce8cfa), the 2018 schedule starts on the 10th of December 2017 and ends on the 8th of December 2018.

In [13]:
sbb_2018 = spark.read.csv(["/datasets/sbb/2018/*/*", "/datasets/sbb/2017/12/*"], header=True, sep=";", mode="DROPMALFORMED")

In [14]:
sbb_2018.show(5)

+-----------+-------------------+------------+-------------+--------------------+----------+---------+-----------+---------+-------------------+--------------+-------------+-------+-----------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
|BETRIEBSTAG|   FAHRT_BEZEICHNER|BETREIBER_ID|BETREIBER_ABK|      BETREIBER_NAME|PRODUKT_ID|LINIEN_ID|LINIEN_TEXT|UMLAUF_ID|VERKEHRSMITTEL_TEXT|ZUSATZFAHRT_TF|FAELLT_AUS_TF|  BPUIC|HALTESTELLEN_NAME|    ANKUNFTSZEIT|        AN_PROGNOSE|AN_PROGNOSE_STATUS|    ABFAHRTSZEIT|        AB_PROGNOSE|AB_PROGNOSE_STATUS|DURCHFAHRT_TF|
+-----------+-------------------+------------+-------------+--------------------+----------+---------+-----------+---------+-------------------+--------------+-------------+-------+-----------------+----------------+-------------------+------------------+----------------+-------------------+------------------+-------------+
| 15.10.2018|80:06____

In [15]:
sbb_2018.printSchema()

root
 |-- BETRIEBSTAG: string (nullable = true)
 |-- FAHRT_BEZEICHNER: string (nullable = true)
 |-- BETREIBER_ID: string (nullable = true)
 |-- BETREIBER_ABK: string (nullable = true)
 |-- BETREIBER_NAME: string (nullable = true)
 |-- PRODUKT_ID: string (nullable = true)
 |-- LINIEN_ID: string (nullable = true)
 |-- LINIEN_TEXT: string (nullable = true)
 |-- UMLAUF_ID: string (nullable = true)
 |-- VERKEHRSMITTEL_TEXT: string (nullable = true)
 |-- ZUSATZFAHRT_TF: string (nullable = true)
 |-- FAELLT_AUS_TF: string (nullable = true)
 |-- BPUIC: string (nullable = true)
 |-- HALTESTELLEN_NAME: string (nullable = true)
 |-- ANKUNFTSZEIT: string (nullable = true)
 |-- AN_PROGNOSE: string (nullable = true)
 |-- AN_PROGNOSE_STATUS: string (nullable = true)
 |-- ABFAHRTSZEIT: string (nullable = true)
 |-- AB_PROGNOSE: string (nullable = true)
 |-- AB_PROGNOSE_STATUS: string (nullable = true)
 |-- DURCHFAHRT_TF: string (nullable = true)



In [16]:
# Cast each columns in its right type
sbb_2018 = sbb_2018.withColumn("BETRIEBSTAG", to_date(sbb_2018["BETRIEBSTAG"], format='dd.MM.yyyy'))
sbb_2018 = sbb_2018.withColumn("ANKUNFTSZEIT", to_timestamp(sbb_2018["ANKUNFTSZEIT"], format='dd.MM.yyyy HH:mm'))
sbb_2018 = sbb_2018.withColumn("AN_PROGNOSE", to_timestamp(sbb_2018["AN_PROGNOSE"], format='dd.MM.yyyy HH:mm:ss'))
sbb_2018 = sbb_2018.withColumn("ABFAHRTSZEIT", to_timestamp(sbb_2018["ABFAHRTSZEIT"], format='dd.MM.yyyy HH:mm'))
sbb_2018 = sbb_2018.withColumn("AB_PROGNOSE", to_timestamp(sbb_2018["AB_PROGNOSE"], format='dd.MM.yyyy HH:mm:ss'))
sbb_2018 = sbb_2018.withColumn("DURCHFAHRT_TF", sbb_2018["DURCHFAHRT_TF"].cast(BooleanType()))
sbb_2018 = sbb_2018.withColumn("ZUSATZFAHRT_TF", sbb_2018["ZUSATZFAHRT_TF"].cast(BooleanType()))
sbb_2018 = sbb_2018.withColumn("FAELLT_AUS_TF", sbb_2018["FAELLT_AUS_TF"].cast(BooleanType()))
sbb_2018 = sbb_2018.withColumn("BPUIC", sbb_2018["BPUIC"].cast(IntegerType()))

sbb_2018 = sbb_2018.drop("BETREIBER_ID", "BETREIBER_ABK", "BETREIBER_NAME")

sbb_2018 = sbb_2018.where("BETRIEBSTAG BETWEEN '2017-12-10' AND '2018-12-08'")

In [17]:
zurich_stops = sbb_2018.where(sbb_2018.HALTESTELLEN_NAME.isin(zurich_stations))

In [18]:
zurich_stops.printSchema()

root
 |-- BETRIEBSTAG: date (nullable = true)
 |-- FAHRT_BEZEICHNER: string (nullable = true)
 |-- PRODUKT_ID: string (nullable = true)
 |-- LINIEN_ID: string (nullable = true)
 |-- LINIEN_TEXT: string (nullable = true)
 |-- UMLAUF_ID: string (nullable = true)
 |-- VERKEHRSMITTEL_TEXT: string (nullable = true)
 |-- ZUSATZFAHRT_TF: boolean (nullable = true)
 |-- FAELLT_AUS_TF: boolean (nullable = true)
 |-- BPUIC: integer (nullable = true)
 |-- HALTESTELLEN_NAME: string (nullable = true)
 |-- ANKUNFTSZEIT: timestamp (nullable = true)
 |-- AN_PROGNOSE: timestamp (nullable = true)
 |-- AN_PROGNOSE_STATUS: string (nullable = true)
 |-- ABFAHRTSZEIT: timestamp (nullable = true)
 |-- AB_PROGNOSE: timestamp (nullable = true)
 |-- AB_PROGNOSE_STATUS: string (nullable = true)
 |-- DURCHFAHRT_TF: boolean (nullable = true)



We store the data in parquet format now that it is filtered. This part can take up to half an hour.

In [23]:
zurich_stops.write.mode("overwrite").parquet("/user/{}/zurich_stops.parquet".format(gaspar))

From now on, everything above this cell doesn't need to be ran anymore when going back to the notebook (besides the library imports). We can just load the data stored in parquet.

In [13]:
zurich_stops = spark.read.parquet("/user/{}/zurich_stops.parquet".format(gaspar))

Here we perform the numerous SQL queries we need for later stages of our pipeline.

In [15]:
zurich_stops.createOrReplaceTempView("sbb_zurich_2018")

In [26]:
spark.sql('DESCRIBE sbb_zurich_2018').show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|        BETRIEBSTAG|     date|   null|
|   FAHRT_BEZEICHNER|   string|   null|
|         PRODUKT_ID|   string|   null|
|          LINIEN_ID|   string|   null|
|        LINIEN_TEXT|   string|   null|
|          UMLAUF_ID|   string|   null|
|VERKEHRSMITTEL_TEXT|   string|   null|
|     ZUSATZFAHRT_TF|  boolean|   null|
|      FAELLT_AUS_TF|  boolean|   null|
|              BPUIC|      int|   null|
|  HALTESTELLEN_NAME|   string|   null|
|       ANKUNFTSZEIT|timestamp|   null|
|        AN_PROGNOSE|timestamp|   null|
| AN_PROGNOSE_STATUS|   string|   null|
|       ABFAHRTSZEIT|timestamp|   null|
|        AB_PROGNOSE|timestamp|   null|
| AB_PROGNOSE_STATUS|   string|   null|
|      DURCHFAHRT_TF|  boolean|   null|
+-------------------+---------+-------+



The database can now be queried using the methods below in order to get historical data about trips we're interested in.

In [16]:
def get_departure_times(journey_id, station):
    """Query the database to retrieve all departure times
    
    return: a pandas dataframe in the following format:
    - Start station: name of the starting station
    - Expected departure time: departure time according to the schedule
    - Real departure date: date of the actual departure
    - Week-end departure date: Boolean informing if the departure is a week-end day (versus a week day)
    - Real departure time: actual departure time according to logs
    
    """
    
    df = spark.sql('SELECT ABFAHRTSZEIT, AB_PROGNOSE \
              FROM sbb_zurich_2018 \
              WHERE HALTESTELLEN_NAME LIKE "{}" AND LINIEN_ID LIKE "{}" AND AB_PROGNOSE_STATUS LIKE "REAL"'
              .format(station, journey_id)).toPandas()
    
    if df.empty:
        return df
        
    df['Start station'] = station
    df['Real departure date'] = df['AB_PROGNOSE'].dt.strftime('%d/%m/%Y')
    df['Week-end departure date'] = df['AB_PROGNOSE'].apply(lambda date: date.weekday() >= 5)
    df['Real departure time'] = df['AB_PROGNOSE'].dt.strftime('%H:%M:%S')
    df['Expected departure time'] = df['ABFAHRTSZEIT'].dt.strftime('%H:%M:%S')

    df.drop(['ABFAHRTSZEIT', 'AB_PROGNOSE'], inplace=True, axis=1)
    
    return df
    
def get_arrival_times(journey_id, station):
    """Query the database to retrieve all arrival times
    
    return: a pandas dataframe in the following format:
    - End station: name of the ending station
    - Expected arrival time: arrival time according to the schedule
    - Real arrival date: date of the actual arrival
    - Week-end arrival date: Boolean informing if the arrival is a week-end day (versus a week day)
    - Real arrival time: actual arrival time according to logs
    
    """
    
    df = spark.sql('SELECT ANKUNFTSZEIT, AN_PROGNOSE \
              FROM sbb_zurich_2018 \
              WHERE HALTESTELLEN_NAME LIKE "{}" AND LINIEN_ID LIKE "{}" AND AN_PROGNOSE_STATUS LIKE "REAL"'
              .format(station, journey_id)).toPandas()
     
    if df.empty:
        return df
        
    df['End station'] = station
    df['Real arrival date'] = df['AN_PROGNOSE'].dt.strftime('%d/%m/%Y')
    df['Week-end arrival date'] = df['AN_PROGNOSE'].apply(lambda date: date.weekday() >= 5)
    df['Real arrival time'] = df['AN_PROGNOSE'].dt.strftime('%H:%M:%S')
    df['Expected arrival time'] = df['ANKUNFTSZEIT'].dt.strftime('%H:%M:%S')

    df.drop(['ANKUNFTSZEIT', 'AN_PROGNOSE'], inplace=True, axis=1) 
    
    return df

<a id='quatre'></a>
## 4. Incertitude model

Last step of our pipeline before visualization.

Given several paths, compute the uncertainty associated with each path.

Our incertitude model is based on a normal distribution. We first compute the mean and standard deviation of delays for each intermediate stations. Then we compute the probability to observe a delay below a given threshold: the available waiting time. the latter is computed by taking the difference, in seconds, between an arrival time and its next associated departure.

The overall score is the product of the uncertainty score of each station, because we decided to assume independence between each delays.

More details about the choice of this implementation can be found in chapter **7. Model limitation and analysis**.

In [26]:
def final_output(paths, VERBOSE=False, ENABLE_WARNING=True, DEFAULT_UNCERTAINTY=0.95):
    """Given a list of paths, returns all their respective uncertainty score
    
    input: a list of paths
    
    returns: a list of 3-tuples such as (t1, t2, t3):
    - t1: the path (part of thre output to the user)
    - t2: a list of uncertainty score at each transfer (used for visualization and model assesment)
    - t3: the overall uncertainty score. (part of the output to the user)
    
    
    """
    
    def lookup_historic_data(path):
        """ Retrieve data from the given arrivals and compute their means and standard deviations."""
        
        
        # Initialize some lists to compute waiting time between each stations
        delay_stds = []
        delay_means = []
        waitings = []
        arrival_times = []
        departure_times = []
        
        is_weekend = path[0][1].weekday() >= 5 # We want to optimize the query based on if the user asks his travel in a weekend or not

        
        # Iterate on each intermediate station and: (1) compute waiting time. (2) compute std and mean of delays
        for departure_station, departure_time, arrival_station, arrival_time, line, walking_time in path:
            
            corrected_departure_time = departure_time
            
            if line == 0: # If it is a walk, we don't want the systems thinks he can "miss" his walk
                corrected_departure_time = departure_time + timedelta(minutes=15)
            departure_times.append(corrected_departure_time)
            arrival_times.append(arrival_time)
            
            # Check if the current "transport" is not a walking connection
            if line != 0:
                past_arrivals = get_arrival_times(line, arrival_station)
                past_departures = get_departure_times(line, departure_station)
            else:
                past_arrivals = pd.DataFrame()
                past_departures = pd.DataFrame()

            # Add some robustness
            if past_arrivals.empty or past_departures.empty:
                if ENABLE_WARNING:
                    print(f"Warning, not enough data for travel from {departure_station} to {arrival_station} with {line} ID.")
                
                # By default assumption, the train would then not be late.
                delay_means.append(0)
                delay_stds.append(1)
                continue
            
            # Model v2.0: Filter out on meaningful data, such as weekend versus week days
            past_arrivals = past_arrivals[past_arrivals['Week-end arrival date'] == is_weekend]
            past_arrivals['Real arrival time'] = pd.to_datetime(past_arrivals['Real arrival time'])
            past_arrivals['Expected arrival time'] = pd.to_datetime(past_arrivals['Expected arrival time'])            
            arrival_delays = (past_arrivals['Real arrival time'] -
                      past_arrivals['Expected arrival time']).array.total_seconds()
            delay_means.append(arrival_delays.mean())
            delay_stds.append(arrival_delays.std())
            
            
            # Compute expected waitings
            past_departures = past_departures[past_departures['Week-end departure date'] == is_weekend]
            past_departures['Expected departure time'] = pd.to_datetime(past_departures['Expected departure time'])
            
        
        # Shift correctly departures and arrivals
        departure_times = np.array(departure_times[1:])
        arrival_times = np.array(arrival_times[:-1])
        
        waitings = list(map(lambda d, a: (d - a).total_seconds(), departure_times, arrival_times))
        
        if VERBOSE:
            print("waitings:", waitings)
        

        return delay_means, delay_stds, waitings
    
    def compute_uncertainty(path):
        
        # Now is the specific model. First one is the simplest one, Just take every data as equal and do a simple mean
        delay_means, delay_stds, waitings = lookup_historic_data(path)
                
        if VERBOSE:
            print(delay_means, delay_stds)
        

        # Not miss: P (Z < (waitings_i - delays_mean_i) / delays_std_i)
        # Then miss if any of them fail : 1 - PI_i=0 to len(waitings) (Not miss)
        # then the score is 1 - miss_score = P (no failures)

        taken_score = list(map(lambda waiting, mean, std: st.norm.cdf((waiting - mean)/std), waitings, delay_means, delay_stds))
        
        # We assume that a perfect one is due to default settings of walking or missing historical data.
        taken_score = [x if x < 1.0 else DEFAULT_UNCERTAINTY for x in taken_score]
        score = reduce(lambda x, y: x * y, taken_score)
        return (path, taken_score, score)
        
    return [compute_uncertainty(uncertainty_path) for uncertainty_path in paths]

In [28]:
def keep_best_output(final_output, threshold, OPTIMIZING_ARRIVAL_TIME=True):
    """ Custom function to filter out results to a given threshold and to sort the results,
    according to both total time and uncertainty results.
    input:
    - final output: the list of path with their respective uncertainty
    - threshold: correspond to (1 - alpha) 
    
    output: A filtered sorted final_output, filtered given alpha, sortered, given their time and uncertainty
    score.
    
    """
    
    final_output = np.array(final_output)
    final_output = final_output[final_output[:, 2] >= threshold]
    
    def get_total_times(x):
        total_times = []
        for path, _, _ in x:
            arrival = path[-1][3] # fourth value is expected arrival time
            departure = path[0][1] # Second value is expected departure time
            
            if OPTIMIZING_ARRIVAL_TIME:
                total_times.append((arrival - datetime(1970,1,1)).total_seconds())
            else:
                total_times.append((arrival - departure).total_seconds())
        return total_times
    
    def custom_order(x):
        total_times = get_total_times(x)
        return total_times / x[:,2]


    predicate = custom_order(final_output)
    order = np.argsort(predicate)

    return final_output[order]

<a id='cinq'></a>
## 5. Validation method

We decided to implement a custom metric, using MSE, for the loss of our model. We compared clustered data vs non-clustered (such as filtering by weekdays/weekend or no filtering at all).

The metric computes the result of our uncertainty model and compares it with actual ratio of taken/miss connection in the dataset.

In [256]:
def test_model(path):
    """Custom model assesment. There is no possible overfitting, so splitting by train/test set is
    not necessarily meaningful. Notice that it is quite biased since we should actually test it on real data with
    the real path."""


    ratio_catched = []    
    departure_times = []
    arrival_times = []
    
    for _ , dep_time, _, arr_time, _, _ in path:
        departure_times.append(dep_time)
        arrival_times.append(arr_time)
    
    # Shift correctly departures and arrivals
    departure_times = np.array(departure_times[1:])
    arrival_times = np.array(arrival_times[:-1])
    
    waitings = map(lambda d, a: (d - a).total_seconds(), departure_times, arrival_times)
        

    for w, (departure_station, _, arrival_station, _, line, _) in zip(waitings, path):

        past_arrivals = get_arrival_times(line, arrival_station)
        
        if past_arrivals.empty:
            ratio_catched.append(1.0)
            continue
            
        past_arrivals = past_arrivals[past_arrivals['Week-end arrival date'] == debug_weekend]
        past_arrivals['Real arrival time'] = pd.to_datetime(past_arrivals['Real arrival time'])
        past_arrivals['Expected arrival time'] = pd.to_datetime(past_arrivals['Expected arrival time'])         
        
        delays_train = (past_arrivals['Real arrival time'] -
                  past_arrivals['Expected arrival time']).array.total_seconds()

        ratio_catched.append(len(delays_train[delays_train < w]) / len(delays_train))
        
    predicted_score = final_output([path])
    custom_loss = np.sum((predicted_score[0][1] - np.array(ratio_catched)) ** 2)
    return predicted_score[0][1], ratio_catched, custom_loss


start_station = 'Zürich Manegg'
end_station = 'Erlenbach ZH'
departure_timestamp = int(datetime.timestamp(datetime.strptime('2019-06-22T10:00:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ')))

csa = CSA(stations=STATIONS, timetable=timetable)

path = route_planning_departure(start_station, end_station, departure_timestamp)
predicted, actual, custom_loss = test_model(path)
print("predicted values", predicted)
print("actual ratio", actual)
print("custom loss:", custom_loss)

2019-06-22 10:00:00           Zürich Manegg
|
2019-06-22 10:00:00           Zürich Manegg
| Trip '290.TA.26-4-A-j19-1.10.R' / 12494
| 2019-06-22 10:02:00 - Zürich Brunau
| 2019-06-22 10:04:00 - Zürich Saalsporthalle
| 2019-06-22 10:06:00 - Zürich Giesshübel
| 2019-06-22 10:08:00 - Zürich Selnau
2019-06-22 10:11:00           Zürich HB SZU
|
Zürich HB SZU
| Walk (1min 56s)
Zürich HB
|
2019-06-22 10:15:00           Zürich HB
| Trip '176.TA.26-16-A-j19-1.57.H' / 19637
| 2019-06-22 10:17:00 - Zürich Stadelhofen
| 2019-06-22 10:21:00 - Zürich Tiefenbrunnen
| 2019-06-22 10:23:00 - Zollikon
| 2019-06-22 10:25:00 - Küsnacht Goldbach
| 2019-06-22 10:27:00 - Küsnacht ZH
2019-06-22 10:30:00           Erlenbach ZH
|
2019-06-22 10:30:00           Erlenbach ZH


[datetime.datetime(2019, 6, 22, 10, 26)
 datetime.datetime(2019, 6, 22, 10, 15)]
[datetime.datetime(2019, 6, 22, 10, 11)
 datetime.datetime(2019, 6, 22, 10, 12, 56)]
waitings: [900.0, 124.0]
[0, 0, 44.1764705882353] [1, 1, 84.65965060068956]


<a id='six'></a>
## 6. Visualisation

Note that in case of non-reproducibility problems, there is a screenshot of how does our results look like. Please see ´visualisation.png´.

You can also see a demonstration on the video we made [here](https://youtu.be/0w992SYaxxE?t=261) (linked at 4:21)

In [45]:
zurich_longitude, zurich_latitude = gps[gps['Station'] == 'Zürich HB'][['Longitude', 'Latitude']].values[0]
def visualize(itineraries):
    """Given a list of itinerary, output a visualization displaying the incertitudes of each itinerary on a map.
    The itineraries should be sorted by incertitude beforehand."""
    if len(itineraries) == 0:
        print("No path found...")
        return
        
    def process_itineraries(itineraries):
        new_itineraries = []
        for itinerary in itineraries:
            trip = []
            trip_times = []
            route, station_incertitude, total_incertitude = itinerary
            for i, connection in enumerate(route):
                departure_station, departure_date, arrival_station, arrival_date, line_id, walking_time = connection
                if i == 0:
                    trip_times.append(departure_date)
                    
                trip.append(departure_station)
                
                if i == len(route) - 1:
                    trip_times.append(arrival_date)
                    trip.append(arrival_station)

            itinerary = (trip, trip_times, station_incertitude, total_incertitude)

            new_itineraries.append(itinerary)
        return new_itineraries

    itineraries = process_itineraries(itineraries)
    
    
    m = folium.Map(location=[zurich_latitude, zurich_longitude], zoom_start=12)
    
    from_station = itineraries[0][0][0]
    to_station = itineraries[0][0][-1]
    from_longitude, from_latitude = gps[gps['Station'] == from_station][['Longitude', 'Latitude']].values[0]
    to_longitude, to_latitude = gps[gps['Station'] == to_station][['Longitude', 'Latitude']].values[0]

    folium.Marker(
        location=[from_latitude, from_longitude],
        popup=folium.Popup(from_station + ' - Start station', max_width=300, min_width=100),
        icon=folium.Icon(color='green')
    ).add_to(m)

    folium.Marker(
        location=[to_latitude, to_longitude],
        popup=folium.Popup(to_station + ' - End station', max_width=300, min_width=100),
        icon=folium.Icon(color='green')
    ).add_to(m)

    colors = [
        'blue',
        'orange',
        'purple',
        'gray',
        'lightgreen',
        'darkgreen',
        'lightblue',
        'cadetblue',
        'darkblue',
        'beige',
        'lightgray',
        'pink',
        'darkpurple',
        'lightred',
        'red',
        'darkred',
        'black',
        'green'
    ]
    
    for i, itinerary in enumerate(itineraries):
        incertitude = itinerary[3]
        times = itinerary[1]
        if i >= len(colors):
            i = len(colors) - 1
        color = colors[i]
        feature_group = folium.FeatureGroup(name='{} itinerary: {:.2f}%, from {} to {}'.format(color, incertitude*100, times[0], times[1]))
        points = []
        for j, station in enumerate(itinerary[0]):           
            longitude, latitude = gps[gps['Station'] == station][['Longitude', 'Latitude']].values[0]
            points.append((latitude, longitude))
            if station != from_station and station != to_station:
                popup = station
                if j >= 1 and j < len(itinerary[0])-1:
                    popup = folium.Popup(station + ' - Transfer certitude: {:.2f}%'.format(itinerary[2][j-1]*100), max_width=300, min_width=100)
                    
                folium.Marker(
                    location=[latitude, longitude],
                    popup=popup,
                    icon=folium.Icon(color=color) #https://getbootstrap.com/docs/3.3/components/#glyphicons-glyphs
                ).add_to(feature_group)

        folium.PolyLine(points, color=colors[i]).add_to(feature_group)
        m.add_child(feature_group)

    m.add_child(folium.map.LayerControl())
    
    return m

<a id='sept'></a>
## 7. Demonstration

We demonstrate below the full pipeline we developed.

In [39]:
zurich_stations

['Zimmerberg-Basistunnel',
 'Urdorf',
 'Birmensdorf ZH',
 'Bonstetten-Wettswil',
 'Urdorf Weihermatt',
 'Waldegg, Birmensdorferstrasse',
 'Zürich, Goldbrunnenplatz',
 'Aesch ZH, Gemeindehaus',
 'Bonstetten, Dorfplatz',
 'Birmensdorf ZH, Zentrum',
 'Zürich HB',
 'Zürich Altstetten',
 'Zürich Stadelhofen',
 'Zürich Tiefenbrunnen',
 'Zürich Oerlikon',
 'Zürich Seebach',
 'Zürich Affoltern',
 'Zürich Wollishofen',
 'Zürich Enge',
 'Zürich Wiedikon',
 'Zürich Wipkingen',
 'Zürich Flughafen',
 'Zürich Hardbrücke',
 'Zürich Binz',
 'Zürich Friesenberg',
 'Zürich Schweighof',
 'Zürich Triemli',
 'Uitikon Waldegg',
 'Ringlikon',
 'Uetliberg',
 'Zürich Stadelhofen FB',
 'Zumikon',
 'Zollikerberg',
 'Maiacher',
 'Zürich Rehalp',
 'Neue Forch',
 'Waltikon',
 'Spital Zollikerberg',
 'Waldburg',
 'Felsenegg',
 'Adliswil (Luftseilbahnstation)',
 'Zürich, Römerhof',
 'Zürich, Dolder',
 'Zürich Brunau',
 'Zürich Saalsporthalle',
 'Zürich HB SZU',
 'Wildpark-Höfli',
 'Zürich Selnau',
 'Zürich Giesshübel

In [46]:
# User input
start_station = 'Waltikon'
end_station = 'Kilchberg'
departure_date = datetime.timestamp(datetime.strptime('2019-06-18T10:00:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ'))
departure_time = int(departure_date)
arrival_date = datetime.timestamp(datetime.strptime('2019-06-18T10:00:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ'))
arrival_time = int(arrival_date)
no_paths = 3

# Tweakable parameters
VERBOSE = False # Display additional information such as paths by the Connection Scan Algorithm (CSA).
ENABLE_WARNING = True # Output warnings when transfers don't have available historical data.
DEFAULT_UNCERTAINTY = 0.95 # Default uncertainty associated with tranfers without available historical data.
OPTIMIZING_ARRIVAL_TIME = False # Versus optimizing travel time.
THRESHOLD = 0.5 # Filter uncertainty scores above 0.5

In [47]:
if OPTIMIZING_ARRIVAL_TIME:
    paths = route_planning_arrival(start_station, end_station, arrival_time, no_paths, VERBOSE)
else:
    paths = route_planning_departure(start_station, end_station, departure_time, no_paths, VERBOSE)
output = keep_best_output(final_output(paths, VERBOSE, ENABLE_WARNING, DEFAULT_UNCERTAINTY), THRESHOLD,
                         OPTIMIZING_ARRIVAL_TIME)
visualize(output)

In [None]:
#spark.stop()

<a id='huit'></a>
## 8. Model limitation and analysis
To summary our modelisation, **we decided to implement the following**:
* **Preparation of the Database**:
    * Filtering stations 10km around Zurich using gps information
* **Route planning**:
    * Connection Scan Algorithm. It's pretty fast and perfectly suited for our application.
    * Walking time connection.
* **Uncertainty modelisation**:
    * Clustering Data.
    * Retrieve means and standard deviations of each delays for a given hour/station/trip in the main DB.
    * Run basic Normal test statistic to get p-value. More concretely:
        * Using H0 as: $\mu_0 = x_{average}$.
        * Compute probability that a given value is above the waiting time
        * Take into account walking time
    * Use independence assumption to get an overall uncertainty score.
    
* **Model Assesment**:
    * Compute the actual ratio of numbers of succesful trip.
    * Compare it to the uncertainty predicted value.
    
* **Visualization**:
    * Display possible routes with associated color to distinguish them easily.
    * Show uncertainty in missing certain connections at each concerned station.
    
**What we decided not to implement**:

* **Preparation of the Database**:
    * Additional data.

* **Route planning**:
    * Graph oriented algorithm
    
* **Uncertainty modelisation**:
    * Linear or logistic regression model
    
* **Model Assesment**:
    * Cross-validation
 
* **Visualization**:
    * Gradient-based coloring of the possible routes: folium appears to be too limited.
    
**What are the pro and cons of our choices**:

* **Preparation of the Database**:
    * *Pros*:
        * This filtering gives a good sample of a city that we know. We can have better judgements on the accuracy of our models
    * *Cons*:
        * We could have used additional data such as weather conditions to get some meaningful regressions between delays and weather conditions
        * Problem is that we couldn't find such information.
        
* **Route planning**:
    * *Pros*:
        * Our Connection Scan Algorithm is pretty fast and simple to implement/understand.
    * *Cons*:
        * As already mentioned in section Route planner, the CSA as implemented is not optimized for few transfers or few walking distances. Its only concern is earliest arrival / latest departure. Graph oriented Algorithms might handle these issues better but need more knowledge on the domain.
    
* **Uncertainty modelisation**:
    * *Pros*:
        * The choice of running a simple statistic model let us keep the exact p-value, instead of a threshold as suggested in the project statement. The big advantage we see here is that we think the user would love to know himself what is the exact uncertainty for a given trip, because he might want to optimise its total time and don't care if he might completely fail (for example, a student woke up too lately and needs to go to his exam: either he's just at time and can do it or he's 3min late but is not allowed to attempt to the exam session :( ). On the opposite, another student woke up quite earlier and has some margin, he would therefore prefer a very safe trip.
        
    * *Cons*:
         * Linear or logistic regression would certainly gives better accurate results. However, we didn't have enough interesting data to use them for regression. Indeed, we used "discrete" features such as "is it a weekend day or a weekday". It could be used to cluster data. If we would have use it for a regression, it would yield harder result to interpret an exact p-value: We would have been constraint to use threshold as suggested in the project statement, therefore loosing our original point of view (summaried on the Pro paragraph above).
   
* **Model Assesment**:
    * *Pros*:
        * We did a custom metric to compare predicted p-values against actual ratio of succesful trip.
        * It is easy to interpret
    * *Cons*:
        * We think it is higly biased since we compare past data against past data. We would like to asses model with current data. We can imagine a protocol where at the end of a trip, we have immediate feedback from the succesfuled (or not) trip. Several techniques for privacy could be easily used for this kind of information retrieval, but go out of the subject and objectives of the course.
        
* **Visualization**:
    * *Pros*:
        * Easy to understand and manipulate.
    * *Cons*:
        * Lacks detailled information about arrival at each stop. This info can be found by using the VERBOSE attribute.
    
* **parameters description**:
    * user input regroups possible input of the application, such as the start station, end station and the timestamp of the query. It can also tweak number of results
    * VERBOSE: used to output more information such as format of paths
    * ENABLE_WARNING: Shows when data of TRIP are not in the 2018 dataset, or if we had to estimate a walking transfer.
    * DEFAULT_UNCERTAINTY: This one is to make the simulation more realistic. Even if we don't know the uncertainty of a given transfer, we can estimate it with a default value. 0.95 seemed to be the most meaningful. Indeed, imagine 2 extreme cases when one path has no available data in the 2018 db and has only one transfer, it should be more safe than one that also have no available data but has 10 transfers. It should be something like 10 times more "unsafe" that the other one.
    * OPTIMIZING_ARRIVAL_TIME: One can decides if he wants a short travel time (ie. FALSE) against arriving as quick as possible at destination (ie. TRUE).
    * THRESHOLD: the 1-alpha confidence interval


* **Limitations**:
    * We lately noticed how bad was the connection between the 2019 timetable and the 2018 dataset. Indeed, our first assumption was to keep the 2019 timetable to get better meaningful results when simulating user's input, and as well when assessing our model. It turns out that our ENABLE_WARNING showed us that most of the paths have no available data in the 2018 dataset.
    * The model could be greatly improved with additional data to get meaningful result on some linear or logistic regressions.

