#### Imports and configurations

In [None]:
#import os
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.5.0-spark2.1-s_2.1 pyspark-shell'

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import pickle
import os.path
import getpass
import pyspark
import itertools
import numpy as np 
import pandas as pd
import networkx as nx
import matplotlib.pylab as plt

from datetime import *
from datetime import datetime, date

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Window

from collections import Counter
from dateutil.parser import parse
from math import sin, cos, sqrt, atan2, radians

from helper_functions import *

In [None]:
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

In [None]:
# Initialize SparkSession
spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName('sbb_planner-{0}'.format(getpass.getuser())) \
    .config('spark.executor.memory', '4g') \
    .config('spark.executor.instances', '5') \
    .config('spark.port.maxRetries', '100') \
    .config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')\
    .getOrCreate()
spark

# Robust Journey planner - Graph construction

## 1. Load and preprocess the dataset
### 1.1 Load the dataset
To tackle the problem, we decided to use 3 different subsets of the whole data set. First we used only the data of january 2019 to extract all the different routes and timetables, then we used the data from december 2018 to validate the routes (see that they existed aswell in december 2018) and finally we used the whole dataset to extract the different probability distributions for each of our clustered routes.

In [None]:
# Change the year to compute validation dataframe
def load_data(subset=True, date="2019/01"):
    """ Loads only data from date unless subset is set to False, then loads all."""
    if(subset):
        df = spark.read.option("header", "true").csv("/datasets/sbb/{:}*".format(date), sep=";")
    else:
        df = spark.read.option("header", "true").csv("/datasets/sbb/*/*/*", sep=";")
    return df

We load the dataset according to the definition of the function above.

In [None]:
# Load SBB dataset
df = load_data()

### 1.2 Preprocess the dataset
We rename the columns to have the names in english instead of german and finally we format the columns.

In [None]:
# Rename and format the columns
df_sbb = rename_columns(df)
df_sbb = format_columns(df_sbb)

### 1.3 Add metadata and filter
After this, let's get a dataframe that contains metadata. More precisely, we are intersted in the  

In [None]:
df_met = spark.read.text("Ressources/BFKOORD_GEO")
df_metadata = get_metadata(df_met)

Distance calculator and filter

As stated in the problem description, we filter out stations that are more than 10 km away from Zurich Hauptbanhof. We use the coordinates in the metadata to implement a function that determines the distance from a given station to the Zurich HB. We have been helped in this task by this website:
https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude

In [None]:
# Zurich latitude and longitude 
y_zhb, x_zhb = 47.37743, 8.540192
# Approximate radius of earth in km
R = 6373.0

In [None]:
def distance_to(y1, x1, y2=y_zhb, x2=x_zhb):
    """ This function computes the distances from a point
        (x1, x2) to Zurich HB. """
    lat1, lon1 = radians(y2), radians(x2)
    lat2, lon2 = radians(y1), radians(x1)
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

In [None]:
# Create the udf for the above function
distance_udf = udf(lambda y,x: distance_to(y, x), FloatType())

In [None]:
# Filter out entries of stations that are more than 10k and save the dataframe
df_metadata_filter = df_metadata.withColumn("distance", distance_udf("y", "x")).filter(col("distance") <= 10)
df_metadata_filter.toPandas().to_pickle('Ressources/metadata')
# Join filtered stations with the original dataset we have
df_sbb = df_sbb.join(df_metadata_filter, on="STOP_NAME")

In [None]:
# We get the dataframe containing the distinct lines and save it
df_coords = df_sbb.filter((col('OPERATING_DAY') == pd.Timestamp(2019, 1, 18)))\
                  .select("x", "y", "STOP_NAME", "LINE_TEXT", "LINE_ID", "TRIP_ID", "ARRIVAL_TIME", "DEPARTURE_TIME")\
                  .toPandas()
df_lines_unique = get_distinct_lines(df_coords)
df_lines_unique.to_pickle('Ressources/unique_lines')

### 1.4 Deal with redundant and dirty data
We filter out entries where the transport does not stop, also when it is an additional or a cancelled trip. We also need to take into account the forecast status columns. We decided to approach the problem initally by replacing the null entries in the 
estimated arrival and departure by the scheduled departure and arrival

In [None]:
df_sbb = df_sbb.filter(~(df_sbb.DRIVE_THROUGH | df_sbb.ADDITIONAL_TRIP | df_sbb.CANCELLED_TRIP)) \
               .drop("DRIVE_THROUGH", "ADDITIONAL_TRIP", "CANCELLED_TRIP", "id", "BPUIC")

In [None]:
# Deal with the null values in the dates
null_date = parse('1994-01-01 00:00:00')
df_sbb = df_sbb.withColumn('ARRIVAL_TIME', when(df_sbb.ARRIVAL_TIME.isNull(), null_date)\
                                          .otherwise(df_sbb.ARRIVAL_TIME))\
               .withColumn('DEPARTURE_TIME', when(df_sbb.DEPARTURE_TIME.isNull(), null_date)\
                                            .otherwise(df_sbb.DEPARTURE_TIME))

## 2. Lambda calculation
Considering that the probability distribution of the delay of a given transport is similar to a poisson distribution, we decide to calculate the different lambdas for each trip. We take into account the following when we want to estimate the distribution:
- Departing Station
- Arrival Station
- Day of the week (weekday or not)
- Rush hour or not (7am-9a. and 5pm-6pm)

Once we have grouped the data given a line id and operator, we separate it given the stated variables and produce a dataframe containing the different lambda values for each line.

We will add two columns to our dataframe where we indicate if it is a weekday or not and if it is rush hour or not

In [None]:
# Replace null values in estimated arrival by scheduled arrival time
df_sbb = df_sbb.withColumn("AT_FORECAST", coalesce(df_sbb.AT_FORECAST, df_sbb.ARRIVAL_TIME))
df_sbb = df_sbb.withColumn("DT_FORECAST", coalesce(df_sbb.DT_FORECAST, df_sbb.DEPARTURE_TIME))
# Add hour column
df_sbb = df_sbb.withColumn("hour", hour("ARRIVAL_TIME").cast(IntegerType()))
# Add rush hour indicator column
df_sbb = df_sbb.withColumn("rush", ((col("hour") >= lit(7)) & (col("hour") < lit(9))) 
                                 | ((col("hour") >= lit(17)) & (col("hour") < lit(18))))
# Add weekday indicator column
df_sbb = df_sbb.withColumn("weekday", date_format('OPERATING_DAY', 'u') <= 5)
# Filter out lines where there is no arrival
df_sbb_arrivals = df_sbb.filter(df_sbb.AT_FORECAST.isNotNull() & df_sbb.ARRIVAL_TIME.isNotNull())
# Compute difference between actual arrival and scheduled one
df_sbb_arrivals = df_sbb_arrivals.withColumn("delay", 
                                             unix_timestamp("AT_FORECAST").cast(FloatType()) 
                                             - unix_timestamp("ARRIVAL_TIME").cast(FloatType()))
# Function to map negative and null delays to get a good lambda value
correct_neg_null = udf(lambda diff: 0.0001 if diff <= 0 else diff, FloatType())
# Add delay
df_sbb_arrivals = df_sbb_arrivals.withColumn("delay", correct_neg_null("delay"))

In [None]:
df_sbb_arrivals.cache()

We can compute all the lambdas for the arrival times at each station, this is a long computation. After doing it, values should be stored in a parquet file to avoid recomputing every time

In [None]:
df_lambdas = df_sbb_arrivals.groupBy("LINE_ID", "LINE_TEXT", "STOP_NAME", "weekday", "rush")\
                            .agg({"delay": "mean", "hour": "count"})\
                            .withColumnRenamed("avg(delay)", "lambda")\
                            .withColumnRenamed("count(hour)", "count")
df_lambdas.cache()

In [None]:
# Uncomment the following line if parquet file missing
#df_lambdas.write.parquet("lambda.parquet")
df_l = spark.read.parquet("lambda.parquet")

We join the lambdas and the original dataset. Now we have a dataset with the estimated delay for each line, need to consider if we change the lambda for lines where we dont have many entries or what should we do. Probably drop additional trip lines and cancelled trips.


In [None]:
df_sbb_lambda = df_sbb.join(df_l, (df_l.STOP_NAME == df_sbb.STOP_NAME) & 
                                  (df_l.LINE_ID == df_sbb.LINE_ID) & 
                                  (df_l.LINE_TEXT == df_sbb.LINE_TEXT) &
                                  (df_l.weekday == df_sbb.weekday) &
                                  (df_l.rush == df_sbb.rush))

In [None]:
# Drop the additional trips and cancelled trips
df_sbb_lambda = df_sbb_lambda.filter(col("count") > 10)

## 3. Route computation for every day
Here we use the arrival time of each (trip_id, day) pair to compute the different routes in chronological order by taking advantage of the the null date. We order by the arrival time, hence the station with the null date will be the first in the list. To do this we implement a window.

We also need to consider the case when the trip only covers 1 station in the zurich radius, hence if the number of stations in a trip is smaller than 2 we should not consider this line.


In [None]:
window_trip_day = Window.partitionBy("OPERATING_DAY", "TRIP_ID").orderBy(asc("ARRIVAL_TIME"))
window_total_stops = Window.partitionBy("TRIP_ID", "OPERATING_DAY")
stop_num = rank().over(window_trip_day).alias("stop_num")
num_stops = functions.max("stop_num").over(window_total_stops).alias("total_stops")

In [None]:
# We add the stop numbers and filter the trips where there is only one stop considered
df_sbb = df_sbb.withColumn("stop_num", stop_num)\
               .withColumn("total_stops", num_stops) \
               .filter(col("total_stops") > 1).drop("total_stops")

### 3.1 Compute the time difference between 2 stations

We need to compute the time it takes to go from one station to the next in a given trip, to do this we use the arrival time of the next station minus the departure time of the current station for a given trip_id, operating_day pair.

In [None]:
# We do a rolling window to compute the time difference
next_time_arrival = lead("ARRIVAL_TIME").over(window_trip_day).alias("next_time_arrival")
next_stop_f = lead("STOP_NAME").over(window_trip_day).alias("next_stop_name")
# Add to each row the time of arrival to the next station
df_sbb = df_sbb.select("*", next_time_arrival, next_stop_f)
df_sbb = df_sbb.withColumn("time_to_next", unix_timestamp(df_sbb.next_time_arrival) - 
                           unix_timestamp(df_sbb.DEPARTURE_TIME))
# Keep only valid data
df_sbb = df_sbb.filter(df_sbb.time_to_next.isNotNull() & (df_sbb.time_to_next >= 0))

Here we transform departure/arrival times to HH:mm format

In [None]:
columns_to_transform = ["DEPARTURE_TIME", "DT_FORECAST", "ARRIVAL_TIME", "AT_FORECAST", "next_time_arrival"]
df_sbb_times = df_sbb.withColumn('weekday_num', functions.dayofweek("OPERATING_DAY"))
for time in columns_to_transform:
    df_sbb_times = df_sbb_times.withColumn(time, date_format(time, "HH:mm"))

### 3.2 Create a working structure to find the shortest path
In order to compute the different paths we want from A to B, we need to construct a proper data structure where we can look at the different possible immediate movements. Immediate movements meaning the a direct trip from two adjacent stations.

We already created the different lambda list for the arrival delays by grouping the different possible trips by:
- "LINE_ID", "LINE_TEXT"
- "STOP_NAME", 
- "weekday", (here need to specify also for sunday as timetables are different)
- "rush"

And we'll do the same here. These with the addition of __the next stop__ will be the __index values__ for our data structure and in the different column values we will have:
- ``departures`` (all possible times during the day)
- ``next_arrivals`` (all possible arrival times during the day for the next stop)
- ``the time to the next station`` (we will take the median time of all the connexions from A to B)

The departures and the arrivals need to be sorted. To get the described datastructure we will also need to initially group by the operating day so that we dont get duplicates of a particular trip. After that we will group by the same values except the operating day and for the multiple lists we will take the one considered to be the median.

After having this datastructure we will also be able to link it with the uncertainty values in the lambda table.

In [None]:
df_connexions = df_sbb_times.groupBy("STOP_NAME", "next_stop_name", 'PRODUCT_ID', 
                                     "LINE_ID", "LINE_TEXT", "weekday_num")\
                            .agg(collect_list("DEPARTURE_TIME").alias("departures"),
                                 collect_list("next_time_arrival").alias("next_arrivals"),
                                 collect_list("time_to_next").alias("time_to_next_list")) 
df_connexions.cache()

In [None]:
# Sort arrays departures and next_arrivals
df_connexions = df_connexions.withColumn("departures", sort_array("departures"))\
                             .withColumn("next_arrivals", sort_array("next_arrivals"))\

After all this preprocessing, we can finally transform our data to pandas dataframes.

In [None]:
# Convert data to dataframes
dfp_connexions = df_connexions.toPandas()
dfp_lambda = df_l.toPandas()
dfp_metadata = df_metadata_filter.toPandas()

In [None]:
# Filter the lambda by count and save the dataframe
dfp_lambda_filtered = dfp_lambda[dfp_lambda["count"] > 50].drop(columns=["count"])
dfp_lambda_filtered.to_pickle('Ressources/lambdas')

### 3.3 Prepare data for Graph

We filter the edges by recurrence and make them unique.

In [None]:
def make_edges(row):
    """ This function unites departure/arrival/time columns in one """
    s = set()
    for (d, a, t) in zip(row['departures'], row['next_arrivals'], row['time_to_next_list']) :
        time_d = to_time(d)
        time_a = to_time(a)
        if time_d <= time_a:
            s.add((time_d, time_a, t))
    return s

In [None]:
# Define the smallest number of trip appearance
MIN_NUMBER_REAL = 3

In [None]:
# Filter null values and links to itself
dfp_connexions_f = dfp_connexions[(dfp_connexions['STOP_NAME'].notna()) & (dfp_connexions['next_stop_name'].notna())]
dfp_connexions_f = dfp_connexions_f[(dfp_connexions_f['STOP_NAME'] != dfp_connexions_f['next_stop_name'])]
# We take only trips that appear many times. The other one might just be extraordinary trips
dfp_connexions_f = dfp_connexions_f[dfp_connexions_f['departures'].apply(lambda l : len(l) >= MIN_NUMBER_REAL)]
# Create edges for the graph
dfp_connexions_f['edges'] = dfp_connexions_f.apply(make_edges , axis = 1)
dfp_connexions_f['edges'] = dfp_connexions_f['edges'].apply(list)
dfp_connexions_f = dfp_connexions_f.drop(columns = ['departures', 'next_arrivals', 'time_to_next_list'])

Now we flatten the dataframe in order to have only 1 row per trip.

In [None]:
dfp_flatten = dfp_connexions_f.edges.apply(pd.Series) \
                              .merge(dfp_connexions_f, right_index = True, left_index = True) \
                              .drop(["edges"], axis = 1) \
                              .melt(id_vars = ['STOP_NAME', 'next_stop_name', 'PRODUCT_ID', 
                                               'LINE_ID', 'LINE_TEXT', 'weekday_num'], value_name = "edges") \
                              .drop("variable", axis = 1).dropna()
dfp_flatten = dfp_flatten[dfp_flatten['edges'].apply(len) == 3]
dfp_flatten['departure'] = dfp_flatten['edges'].apply(lambda e : e[0])
dfp_flatten['arrival'] = dfp_flatten['edges'].apply(lambda e : e[1])
dfp_flatten['time'] = dfp_flatten['edges'].apply(lambda e : e[2])
dfp_flatten = dfp_flatten.sort_values(by = 'departure',axis = 0)

### 3.4 Build edges

#### 3.4.1 Transfer edges
When going from a station to itself with a different line.

In [None]:
def transfer_time(node1, node2):
    """ This function defines the transfer time needed depending
        on the different transport types changes. """
    old_type, new_type = node1[5], node2[5]
    old_line_id, new_line_id = node1[3], node2[3]
    old_line_text, new_line_text = node1[4], node2[4]
    time = 0
    # We check the changes in the type of transportation
    if old_type == None:
        time = 0   
    elif old_type == new_type == 'Zug':
        time = 2 
    elif old_type == new_type == 'Bus': 
        time = 1
    elif old_type == new_type == 'Tram': 
        time = 1   
    elif old_type == new_type:
        time = 2    
    else :
        time = 3
    # We check whether the line_id and line_text are the same
    if old_line_id == new_line_id and old_line_text == new_line_text:
        time = 0
    return timedelta(minutes = time)

In [None]:
cols = ['LINE_ID', 'LINE_TEXT', 'PRODUCT_ID', 'departure']
dfp_departures = dfp_flatten.groupby(['STOP_NAME', 'weekday_num'])[cols].agg(list).reset_index()
cols = ['LINE_ID', 'LINE_TEXT', 'PRODUCT_ID', 'arrival']
dfp_arrivals = dfp_flatten.groupby(['next_stop_name', 'weekday_num'])[cols].agg(list).reset_index()

In [None]:
df_connex_joined = dfp_arrivals.merge(dfp_departures,
                                      left_on=['next_stop_name', 'weekday_num'],
                                      right_on = ['STOP_NAME', 'weekday_num'],
                                      suffixes=('_arr','_dep'))\
                               .drop(columns = ['next_stop_name'])

#### 3.4.2 Walking time edges
We consider it is possible to do a connexion between different lines by walking when the distance is not too big. Here for simplicity we will consider the distance bewteen two stations is walkable when the distance is smaller or equal to 1km. Also we consider the average human walking speed of 5 km/h. We need to create a dataframe with the walking distances between stations.

In [None]:
dfp_walking = dfp_metadata.drop(columns=["distance", "id"])
dfp_walking['join'] = 1
dfp_walking = dfp_walking.set_index("join")
dfp_pairs = dfp_walking.join(dfp_walking, lsuffix="", rsuffix="_2")

In [None]:
def station_pair(row):
    if row["STOP_NAME"] > row["STOP_NAME_2"]:
        return row["STOP_NAME"] + ", " + row["STOP_NAME_2"]
    return row["STOP_NAME_2"] + ", " + row["STOP_NAME"]

In [None]:
dfp_pairs["stop_pair"] = dfp_pairs.apply(station_pair, axis=1)
dfp_pairs = dfp_pairs.drop_duplicates(subset="stop_pair").drop(["z", "z_2"], axis=1)
dfp_pairs = dfp_pairs.reset_index().drop(columns=["stop_pair", "join"])
# Filter out pairs with same stop name
dfp_pairs = dfp_pairs[dfp_pairs.STOP_NAME != dfp_pairs.STOP_NAME_2]
# Get distance between pair of stations
dfp_pairs["dist"] = dfp_pairs.apply(lambda row: distance_to(row["y"], row["x"], row["y_2"], row["x_2"]), axis=1)
# Drop pairs where distance is smaller than 1 km
dfp_pairs = dfp_pairs[dfp_pairs["dist"] <= 1]
dfp_pairs["walking_time_secs"] = dfp_pairs["dist"] * 60*60 / 5

### 3.5 Compute graph
Each node is specified by (Stop name, day of week, Time, Line id, Line Text, Transport Type)

Each edge is specified by (from_node, to_node,  {
- 'trip' : The time of the trip (will be non null if it is in a public transport)
- 'wait' : Waiting time at station (will be non null if it is from one station to itself but later)
- 'walk' : Transfer time inside a station (walking from one train to other, or bus to train...)})

In [None]:
def construct_graph(graph_filename):
    G = nx.DiGraph()
    
    # First compute all the edges that are a trip (using Bus, train, tram...)
    trip_edges_to_add = []
    i = 0
    size_trips = len(dfp_flatten)
    for index, r in dfp_flatten.iterrows():
        e = r['edges'] 
        node_from = (r['STOP_NAME'], r['weekday_num'], e[0], r['LINE_ID'], r['LINE_TEXT'], r['PRODUCT_ID'])
        node_to = (r['next_stop_name'], r['weekday_num'], e[1], r['LINE_ID'], r['LINE_TEXT'], r['PRODUCT_ID'])
        labels = {'trip' : e[2], 'wait' : 0, 'walk' : 0}
        trip_edges_to_add.append((node_from, node_to, labels))
        i += 1
        if i%1000 == 0:
            print("Building trips: %.2f%%" % (i*100/size_trips), end= '\r')
            sys.stdout.flush()
    print("Adding trip edges...")
    G.add_edges_from(trip_edges_to_add)
    
    name2nodes = get_name_to_nodes(G)
    
    # Second add all the edges that are a transfer/waiting time (same station, different line)
    transfer_edges_to_add = []
    j = 0
    size_transfers = len(G.nodes())
    for node_from in G.nodes():
        nodes_time = get_nodes_time(node_from[0], node_from[2], add_timedelta(node_from[2], timedelta(seconds=600)),
                                    node_from[1], name2nodes)
        for node_to in nodes_time:
            trans_time = transfer_time(node_from, node_to)
            if(add_timedelta(node_from[2],trans_time) <= node_to[2]):
                wait_time = substract_times(node_to[2], add_timedelta(node_from[2], trans_time)).seconds
                labels = {'trip' : 0, 'wait' : wait_time, 'walk' : trans_time.seconds}   
                transfer_edges_to_add.append((node_from, node_to, labels))
        j += 1
        if j%1000 == 0:
            print("Building transfers/waitings: %.2f%%" % (j*100/size_transfers), end= '\r')
            sys.stdout.flush()
    
    print("Adding transfer edges...")
    G.add_edges_from(transfer_edges_to_add)
                
    print('Saving...')
    with open(graph_filename, 'wb') as handle:
        pickle.dump(G, handle, protocol=pickle.HIGHEST_PROTOCOL)
    print("Done")
    
    return G

In [None]:
graph_filename = 'Ressources/graph.pickle'
try:
    f = open(graph_filename, 'rb')
    print("Loading Graph...")
    with f as handle:
        G = pickle.load(handle)  
    print("Done")
except IOError:
    G = construct_graph(graph_filename)

In [None]:
print('Number of nodes: {}'.format(len(G.nodes)))
print('Number of edges: {}'.format(len(G.edges)))

For testing, you can now use Robust_Journey_Planner-Results_Vizualisation.ipynb that loads the Graphe built and plot the results