# Final Homework : Route planning in Zürich

## 1. Initializing the environment (Spark, HIVE, packages)

This notebook contains the entirety of our work for the final project of the COM-490 course. The aim is to build a robust journey planner in Zürich's area, allowing a user to enquire for an arrival time and a Q value (besides both departure and arrival stop), for which we should output a list of routes arriving no later than the enquiried arrival time, and with a probability of completing the journey being larger than this Q value.


<div class="alert alert-warning">
  <strong>Warning!</strong> Initializing and running a PySpark session is not necessarely needed if you only want to run the core algorithm itself. If you however want to retrace how we preprocessed the data and made assumptions, feel free to do so.
</div>


In [None]:
import os
from IPython import get_ipython
username = os.environ['RENKU_USERNAME']
server = "http://iccluster044.iccluster.epfl.ch:8998"

get_ipython().run_cell_magic(
    'spark',
    line='config',
    cell="""{{ "name": "{0}-final-project","executorMemory": "4G", "executorCores": 4, "numExecutors": 10, "driverMemory": "4G"}}""".format(username)
)

In [None]:
get_ipython().run_line_magic(
    "spark", f"""add -s {username}-final-project -l python -u {server} -k"""
)


In [None]:
%%spark
print('We are using Spark %s' % spark.version)

In [None]:
%%spark
import os
import math
import datetime
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

pd.set_option("display.max_columns", 50)
                                                                                                
import plotly.express as px
import plotly.graph_objects as go

In [None]:
import os
import math
import datetime
import pandas as pd
import numpy as np
from scipy.stats import norm

pd.set_option("display.max_columns", 50)
                                                                                                
import plotly.express as px
import plotly.graph_objects as go

We are now ready to start handling the data itself.

## 2. Data preprocessing with Pyspark

Before going into our exact method about finding the best journeys, we need to process the `istdaten` table as well as the other timetables we are given according to what we are asked to do. We were indeed given some assumptions in order to simplify the journey planning and these will be the ones guiding this data preprocessing step. More precisely, there will be both some spatial and temporal preprocessing steps to fulfill the assumptions.

### 2.1 Loading the timetable data

We first load the plain timetables located in the `/data/sbb/part_csv/timetables/` path. 
The only preprocessing done in the following cells is to take the tables only for the most recent schedule (Of the 3rd May 2023), as it is the weekly schema we will consider in our work.

Loading the `stops` table that contains the information about the locations of the stops that are documented by the SBB :

In [None]:
%%spark
#Loading the stops.txt table
stops = spark.read.csv("/data/sbb/part_csv/timetables/stops",header=True)
stops = stops.filter((stops.year == 2023) & (stops.month == 5) & (stops.day == 3)).drop("year","month","day")
stops.show(n=5)

Loading the `stop_times` table, containing information about many informations about connections between the stops 

In [None]:
%%spark
stop_times = spark.read.csv("/data/sbb/part_csv/timetables/stop_times",header=True)
stop_times = stop_times.filter((stop_times.year == 2023) & (stop_times.month == 5) & (stop_times.day == 3)).drop("year","month","day")
stop_times.show(n=5)

Loading the `trips` dataframe, containing metadata about the trips listed by the SBB CFF

In [None]:
%%spark
trips = spark.read.csv("/data/sbb/part_csv/timetables/trips",header=True)
trips = trips.filter((trips.year == 2023) & (trips.month == 5) & (trips.day == 3)).drop("year","month","day")
trips.show(n=5)

Finally, loading the `calendar` data that contains information about the regularity/daily patterns of the trips.

In [None]:
%%spark
calendar = spark.read.csv("/data/sbb/part_csv/timetables/calendar",header=True)
calendar = calendar.filter((calendar.year == 2023) & (calendar.month == 5) & (calendar.day == 3)).drop("year","month","day")
calendar.show(n=5)

In [None]:
%%spark
routes = spark.read.csv("/data/sbb/part_csv/timetables/routes",header=True)
routes = routes.filter((routes.year == 2023) & (routes.month == 5) & (routes.day == 3)).drop("year","month","day")
routes.show(n=5)

### 2.1 Spatial processing

#### 2.1.1 Filtering the stops inside a 15km radius

One of the first very important steps is to limit the stops we're studying to the ones belonging to a 15km radius around Zürich. We will thus define two quick functions that will help us retrieve the distance between the coordinates of a certain point in space and Zürich's. 

The inspiration for the code has been taken from https://stackoverflow.com/questions/24680247/check-if-a-latitude-and-longitude-is-within-a-circle-google-maps and quickly adapted to python.

In [None]:
%%spark
def distance_computer(ctr_lat,ctr_lon,pt_lat,pt_lon) :
    """ Computes the distance (euclidean) between (lat,lon) pairs of two points in space.
    
    
    Parameters:
    ctr_lat (double): Latitude of the center point,
    ctr_lon (double): Longitude of the center point,
    pt_lat (double): Latitude of a certain point of interest,
    pt_lon (double): Longitude of a certain point of interest,
    earth_circ(double) : The circumference of the earth, used for geometry computations.

    Returns:
    dist_in_km : The distance in km between the center point and the point of interest

    """
    earth_circ = 40075.0
    km_per_deg_lat = earth_circ / 360.0
    km_per_deg_lon = math.cos(math.pi * float(ctr_lat) / 180.0) * km_per_deg_lat
    dx = np.abs(float(ctr_lon)-float(pt_lon)) * km_per_deg_lon
    dy = np.abs(float(ctr_lat)-float(pt_lat)) * km_per_deg_lat
    dist_in_km = math.sqrt(dx*dx + dy*dy)
    return(dist_in_km)

def dist_from_zurich(stop_lat,stop_lon) :
    """ Computes the distance (euclidean) between Zürich and a certain point in space.

    Parameters:
    pt_lat (double): Latitude of a certain stop/point of interest,
    pt_lon (double): Longitude of a certain stop/point of interest,

    Returns:
    zur_dist : The distance in km between Zürich and the point we are interested in.
    """
    earth_circ = 40075
    zur_lat = 47.378177
    zur_lon = 8.540192
    dist_from_zurich = distance_computer(zur_lat,zur_lon,stop_lat,stop_lon)
    return(dist_from_zurich) 

#Creating a user-defined function based our distance computing method
dist_from_zur = F.udf(dist_from_zurich, T.FloatType())
compute_distance = F.udf(distance_computer,T.FloatType())

We first filter the `stops` dataframe to obtain`close_stops`, which only contains the stops within a $15km$ radius around Zurich. These close stops will be the stops in which the user will be able to choose a source and a destination stop.


The `dist_from_zur` function first allows us to first filter the `stops` table in keep only the `STOP_ID`s that are inside this $15$km radius around Zürich's coordinates :

In [None]:
%%spark
#Filter the stops.txt based (not created yet) table to keep only the ones where the dist_from_zur function outputs a number that is <15.
filtered_stops = stops.withColumn("distance_from_zur",dist_from_zur(stops.stop_lat,stops.stop_lon))
close_stops = filtered_stops.filter(F.col("distance_from_zur") <= 15.0)
close_stops.show(n=5)

Now that we have our close stops `STOP_ID`s, we filter the `stop_times` dataframe in the same manner :

In [None]:
%%spark
#Filter the stop_times based table in order to only keep the trips where the STOP_IDs are the same as the ones just found
# in the cell above.

close_stop_times = stop_times.join(close_stops.select("stop_id"), on="stop_id",how = "inner").drop("pickup_type","drop_off_type")
close_stop_times.show(n=5)

#### 2.1.2 Trying to retrieve transfer stops

However, one of the assumptions of our problem is that we may also use some stops outside the $15km$ radius as transfer stops. Therefore, by removing all the stops further than $15km$ than Zürich, we also rule out these transfer-only stops. It is for instance likely that some of these trips will leave this $15km$ radius for one or two stops before coming back into the limits of our problem. 

Implementing this is not that easy, so we decided to focus on a certain type of trips. More precisely, we look for all of the trips passing by our close stops but that can not be completed only with stops located within the 15km radius: they go, at some point, slightly out of this 15km radius.

Concretely, we aim to find the trips for which the `STOP_SEQUENCE` ids have a missing entry when considering only the close stops filtered earlier. This way, we aim to identify the stops that are most likely not very far away from the $15km$ radius and that are crossed by some trips that go back into the 15km radius : they can thus serve as transfer stops. We illustrate the idea in the figure below :

<img src="../figs/Outside_the_radius.png" width="800"/>

 _Figure 1._ Illustration of the stops found by our method.
 The blue circle represents the 15km radius around Zürich, an example trip with one stop outside the 15km radius is shown in orange, with the stop that we will retrieve and add to our stops database is circled in black.

We thus want to identify the trips that quickly go out of the 15km radius before going back in, if any. We define a `trip_id` based window, ordered by `stop_sequence`. Using the lead and lag functions, for each `stop_sequence`, we define new columns with the `stop_sequence` value of the previous/next stop on the trip. That way, by quickly looking at the difference between the current and previous/next `stop_sequence` value, we can easily tell if there is one `stop_sequence` is missing from the trip (a jump from 3 to 5, for instance).

In [None]:
%%spark
close_stop_times = close_stop_times.withColumn('stop_sequence',F.col('stop_sequence').cast('int'))
trip_window = Window.partitionBy('trip_id').orderBy('stop_sequence')


lag_stop_seq = F.lag(F.col("stop_sequence")).over(trip_window)
lead_stop_seq = F.lead(F.col("stop_sequence")).over(trip_window)

tmp_lag_lead = close_stop_times.withColumn('lag_stop_seq',lag_stop_seq)\
                               .withColumn('lead_stop_seq',lead_stop_seq)\
                               .withColumn('curr_lag_diff', F.col('stop_sequence')-F.col('lag_stop_seq'))\
                               .withColumn('curr_lead_diff', F.col('lead_stop_seq')-F.col('stop_sequence'))

tmp_lag_lead.show(n=5)

We then only select the ones for which the difference between the current `stop_sequence` value and the previous/next one is equal to 2 :

In [None]:
%%spark
missing_stop_seq_trips  = tmp_lag_lead.filter(
    ((F.col('curr_lag_diff') == 2) | (F.col('curr_lead_diff') == 2 )) &
      ((F.col('curr_lag_diff').isNotNull()) & (F.col('curr_lead_diff').isNotNull()))
).orderBy('trip_id')

missing_stop_seq_trips.show(n=5)

We retrieve, for each `trip_id`, the missing `stop_sequence`  values (They're only in two different columns previous/next in the output of the cell below for more simplicity when manipulating the PySpark dataframes).

In [None]:
%%spark
missing_stops = missing_stop_seq_trips.withColumn('prev_missing_stop_seq', F.when(F.col('curr_lag_diff') == 2, F.col('stop_sequence') -1))\
                                      .withColumn('next_missing_stop_seq', F.when(F.col('curr_lead_diff') == 2, F.col('stop_sequence') +1))\
                                      .drop("stop_id","stop_sequence","arrival_time","departure_time","lag_stop_seq","lead_stop_seq","curr_lag_diff",
                                           "curr_lead_diff")
missing_stops.show(n=5)

Now that we know, for each trip, exactly which `stop_sequence` value is missing, we identify all the stops corresponding to these missing values : 

In [None]:
%%spark
transfer_stops = stop_times.join(missing_stops,on = "trip_id", how = "inner")\
                              .filter( (stop_times.stop_sequence == missing_stops.prev_missing_stop_seq) |
                                       (stop_times.stop_sequence == missing_stops.next_missing_stop_seq))\
                              .orderBy("trip_id").select("stop_id").distinct()

transfer_stops.show(n=10)

We thus only obtain 4 distinct stops, but through which many different trips go in and out the $15km$ radius. These stops can still therefore be interesting to add to our pool of stops through which the users can travel (but that can not be source or departure stops). 

Let's double check if these transfer stops are indeed further than $15km$ from Zürich :

In [None]:
%%spark
transfer_stops_dists = stops.join(transfer_stops, on = "stop_id" , how = "inner")
transfer_stops_dists = transfer_stops_dists.withColumn("distance_from_zur",dist_from_zur(transfer_stops_dists.stop_lat,transfer_stops_dists.stop_lon))
transfer_stops_dists.show(n=5)

In [None]:
%%spark
all_stops = close_stops.union(transfer_stops_dists)

In [None]:
%%spark
all_stops.repartition(1).write.option('header',True).csv("/user/rochepea/all_stops.csv" , mode="overwrite")

As expected, our transfer-only stops are indeed just outside the 15km radius. We then add them these stops to our `close_stops` and thus our `close_stop_times`, in order to obtain the `valid_stop_times` containing all the trips/stops that we decide to keep based on the spatial assumptions we were given :

In [None]:
%%spark
transfer_stop_times = stop_times.join(transfer_stops, on = "stop_id", how="inner").drop("pickup_type","drop_off_type")
transfer_stop_times.show(n=5)

In [None]:
%%spark
valid_stop_times = close_stop_times.union(transfer_stop_times)
valid_stop_times.show(n=5)

### 2.2 Temporal processing

As stated in the project description, we also need to be only considering journeys that are on reasonable hours of a typical business day. 
First, we use the `calendar` table to filter all the `SERVICE_ID`s, i.e group of trips that take place on normal business days, from Monday to Friday.

In [None]:
%%spark
# Filter the calendar.txt based table to keep only the group of trips only happening on business days.
usual_calendar = calendar.filter((F.col("monday") == 1) &
                                 (F.col("tuesday") == 1) &
                                 (F.col("wednesday") == 1) &
                                 (F.col("thursday") == 1) &
                                 (F.col("friday") == 1) & 
                                 (F.col("saturday") == 0) &
                                 (F.col("sunday") == 0))            

We then join the `calendar` table to the `trips` one based on the recently filtered `SERVICE_ID` key to obtain the `regular_trips` table :

In [None]:
%%spark
# Build the regular_trips table 
regular_trips = trips.join(usual_calendar, on = "service_id", how = "inner")

This first joined table thus only contains the group of trips taking place from Monday to Friday on a daily basis, i.e the regular ones (as well as the other columns of both the `calendar` and `trips` table, but we will get back to them later). 

Now, let us only select the specific trips that take place in reasonable hours. We decide to define these reasonable hours as being from 7AM to 9PM and we thus filter the `close_stop_times` accordingly :

In [None]:
%%spark
# Filter the stop_times table to keep only the rows where (DEPARTURE_TIME & ARRIVAL_TIME) belong to [7AM,9PM]
valid_stop_times = valid_stop_times.withColumn('departure_time',F.date_format(F.to_timestamp('departure_time', "HH:mm:ss"),"HH:mm:ss"))\
                                   .withColumn('arrival_time',F.date_format(F.to_timestamp('arrival_time', "HH:mm:ss"),"HH:mm:ss"))
regu_valid_stop_times = valid_stop_times.filter((F.hour("departure_time") >= 7) &
                                                (F.hour("arrival_time") >= 7) &
                                                (F.hour("departure_time") <= 21) &
                                               (F.hour("arrival_time") <= 21)
                                               )
regu_valid_stop_times.show(n=5)

Now, we join the filtered `stop_times` and the `regular_trips` tables with respect to the `TRIP_ID` key, which characterizes the unique instance of a certain trip. This way, we obtain another table `valid_trips` that only contains the trips that travel following a regular basis during the 7AM-9PM shift from Monday to Friday in a 15km (without transfer stops) radius around Zürich : 

In [None]:
%%spark
#Build the valid_trips table as explained just above.
valid_trips = regu_valid_stop_times.join(regular_trips, on = "trip_id", how = "inner").drop('trip_headsign','trip_short_name')
valid_trips = valid_trips.withColumn('stop_sequence',F.col('stop_sequence').cast('int'))
valid_trips.show(n=5)

### 2.3 Additional preprocessing :

We use the spatial and temporal preprocessing in order to build three very important tables that will be used in our journey-finding algorithm.

#### 2.3.1 Setting up all the existing connections in the network :

We first, based on our `valid_trips` table, define all the connections that link the different stops in the Zürich area. We characterize a connection by the `trip_id` it belongs to, the departure stop and time of the connection as well as the arrival stop and time of the latter. We build the `connections` dataframe in the following manner :


- Take our `valid_trips` and define a window that partitions it by the `trip_id`, ordered by `arrival_time`.
- Then, thanks to the F.Lead function, for each row, we fetch the `arrival_stop` and the `arrival_time` of the next stop on the trip.
- We set the rows with the current trip, stop and departure time, and add these arrival stops and times.

In [None]:
%%spark
trip_window = Window.partitionBy('trip_id').orderBy('arrival_time')
arrival_stop = F.lead("stop_id").over(trip_window).alias("arrival_stop")
arrival_time = F.lead("arrival_time").over(trip_window).alias("arrival_time")

connections = valid_trips.select('trip_id',F.col('stop_id').alias('departure_stop'),'departure_time',arrival_stop,arrival_time)
connections = connections.filter(F.col('departure_time').isNotNull() & F.col('arrival_time').isNotNull())
connections = connections.orderBy('departure_time')
connections.show(n=5)

In [None]:
#%%spark
#connections.repartition(1).write.option('header',True).csv("/user/rochepea/connections.csv" , mode="overwrite")

#### 2.3.2 Finding all walking paths between our stops :

As stated in the project description, we have to take into account the fact that walking from one stop to the other is totally possible. We use the compute_distance function defined earlier on all our valid stops (transfer stops + <15km stops) to only keep the ones closer than 500m from each other. In addition to both stop ids and names, we also retrieve the distance between the two as well as the travel time.

In [None]:
%%spark
transfer_stops_info = stops.join(transfer_stops, on = 'stop_id', how = 'inner')
valid_stops = transfer_stops_info.union(close_stops.drop('distance_from_zur')).drop('location_type','parent_station')


tmp_valid_stops = valid_stops.alias('a').crossJoin(valid_stops.alias('b'))


tmp_valid_stops = tmp_valid_stops.withColumn("distance_btw_stops",compute_distance(F.col('a.stop_lat'),F.col('a.stop_lon'),F.col('b.stop_lat'),F.col('b.stop_lon')))
footpaths = tmp_valid_stops.filter( (F.col("distance_btw_stops") < 0.5) &
                  (F.col('a.stop_name') != F.col('b.stop_name')))\
                      .select(F.col('a.stop_id').alias('first_stop_id'),
                              F.col('a.stop_name').alias('first_stop_name'),
                              F.col('b.stop_id').alias('second_stop_id'),
                              F.col('b.stop_name').alias('second_stop_name'),
                              F.col('distance_btw_stops').alias('distance'))

footpaths = footpaths.withColumn('duration',F.col('distance')/0.05)
footpaths.show(n=5)

In [None]:
#%%spark
#footpaths.repartition(1).write.option('header',True).csv("/user/rochepea/footpaths.csv" , mode="overwrite")

#### Retrieve the transport types of the trips of our connections

For vizualisation purposes, we also want to be able to know which transport type each of the `trip_id`s corresponds to (Bus, Train, Metro, etc.)

We thus join the `trips` original table based on the unique `trip_id`s present in our connections table. Then, thanks to the `route_id` column for each trip, we look at the corresponding `ROUTE_DESC` column in the `routes` table.

In [None]:
%%spark
connections_trips = trips.join(connections, on ='trip_id', how = 'inner')
joined_trips = connections_trips.join(routes, on = 'route_id')
transport_types = joined_trips.select(connections_trips['trip_id'],routes['route_desc'])
transport_types = transport_types.dropDuplicates(['trip_id'])
transport_types.show(n=5)

In [None]:
#%%spark
#transport_types.repartition(1).write.option('header',True).csv("/user/rochepea/transport_types.csv" , mode="overwrite")

### 2.4 Preparing the delays information with the SBB data

In order to take into account the risk of the user missing a certain connection when planning an itinerary, we will use the previous years information from the `istdaten` table to retrieve historical delays for the stops of our network around Zürich. For these delays to be as useful as possible to product a robust journey planner, we will consider the following when building them :

- The delays we will base ourselves on are the **arrival delays**, meaning the difference between the expected arrival time and the actual arrival time, for each stop in our network.
- For each connection, the delay will most likely depend on the transport type, the day during which the connection is taken and the time during the day as well (peak/off hours). These are all parameters that we will take into account when computing the different delay characteristics (mean,the variance etc.) for each connection.



We first load the plain SBB data :

In [None]:
%%spark
sbb_data = spark.read.orc("/data/sbb/part_orc/istdaten")
sbb_data.show(n=5)

We tidy our SBB data by only keeping rows with information that we can use to compute the delays. That is:
- the rows with the `betreibstag`,`produkt_id` having valid values,
- `an_prognose_status` being `REAL`, `GESCHAETZT` or `PROGNOSE`. Ultimately, we would want to only compute the delays thanks to effective arrival time, but we still keep the other values in case some transport means or stops do not have enough real values to compute the delays, in which case we will use the forecasted or estimated ones.
- We also only keep the rows where `ANKUNFTSZEIT` and `AN_PROGNOSE` have valid values, usable to compute delays,
- We only keep the trips that have been completed and that are not additional non-regular trips (`FAELLT_AUS_TF` & `ZUSATZFAHRT_TF` = false)
- We take the sbb data starting from 2021, as two years of data should be enough to obtain delay estimates and we need some recent data to have a good estimation of the current delays.
- We only select the rows where `bpuic` corresponds to valid `STOP_ID`s found earlier

In [None]:
%%spark
valid_stop_ids = valid_trips.select("stop_id").distinct()

clean_sbb_data = sbb_data.filter((F.col('betriebstag').like('__.__.____')) &
                           (F.col('produkt_id').isNotNull()) &
                           (F.length(F.col('produkt_id')) > 0) &
                           (F.col('an_prognose_status').isin('PROGNOSE','REAL','GESCHAETZT')) &
                           (F.col('ankunftszeit').isNotNull()) &
                           (F.length(F.col('ankunftszeit')) > 0) &
                           (F.col('an_prognose').isNotNull()) &
                           (F.length(F.col('an_prognose')) > 0) &
                           (F.col('ab_prognose').isNotNull()) &
                           (F.length(F.col('ab_prognose')) > 0) &
                           (F.col('faellt_aus_tf') == 'false') &
                           (F.col('zusatzfahrt_tf') == 'false') &
                           (F.col('year').isin(2021,2022,2023)))\
                           .drop('betreiber_abk','betreiber_name','linien_id','linien_text','umlauf_id','verkehrsmittel_text',
                                 'betreiber_id', 'durchfahrt_tf')\
                           .withColumnRenamed('bpuic','stop_id')

clean_sbb_data = clean_sbb_data.join(valid_stop_ids, on = "stop_id", how = "inner")
clean_sbb_data.show(n=5)

We also want to only compute the delays we need, i.e the ones for the trips running in the $[7AM,9PM]$ interval. We thus modify our sbb_data to only keep the rows where this criteria is verified :

In [None]:
%%spark
usable_sbb_data = clean_sbb_data.filter(
    (F.substring('an_prognose', 12, 2).cast('integer') >= 7) &
    (F.substring('an_prognose', 12, 2).cast('integer') <= 21) &
    (F.substring('ab_prognose', 12, 2).cast('integer') >= 7) &
    (F.substring('ab_prognose', 12, 2).cast('integer') <= 21)
)
usable_sbb_data.show(n=5)

Now that our `sbb_data` has been cleaned for use, we can look at the nature of the arrival times computation status for the different types of transport, i.e if the `AN_PROGNOSE_STATUS` is `REAL`, `PROGNOSE` or `GESCHAETZT`. The aim is to identify any transport type for which using the real measured values of arrival times would not yield enough data to obtain a decent estimate of the delays at the different stops.

In [None]:
%%spark
# Group by 'produkt_id' and pivot on 'an_prognose_status' column
count_df = usable_sbb_data.groupBy('produkt_id').pivot('an_prognose_status').count()

# Rename the columns for better readability
count_df = count_df.withColumnRenamed('REAL', 'real_count') \
                   .withColumnRenamed('PROGNOSE', 'prognose_count') \
                   .withColumnRenamed('GESCHAETZT', 'geschaetzt_count')

# Display the resulting DataFrame
count_df.show()

We can observe that for the trains, only using the `REAL` and thus measured arrival times shouldn't be an issue. However, for Trams and Buses, we have even more `PROGNOSE` arrival time status than `REAL` ones. We can therefore decide to keep rows having either `PROGNOSE` or `REAL` an_prognose_status values to compute our delays. We also decide to only keep reasonable delay values $<1h$ and set all negative delays (i.e stop reached before the scheduled time) to 0. We also set the day of the week and the hour during which the connection happens, for the next step of the delays preprocessing.

In [None]:
%%spark
usable_sbb_data = usable_sbb_data.withColumn('actual_arr_time',F.unix_timestamp('an_prognose', 'dd.MM.yyyy HH:mm:ss'))
usable_sbb_data = usable_sbb_data.withColumn('scheduled_arr_time', F.unix_timestamp('ankunftszeit','dd.MM.yyyy HH:mm'))

delays_df = usable_sbb_data.withColumn('arr_delay',(usable_sbb_data.actual_arr_time - usable_sbb_data.scheduled_arr_time)/60)
delays_df = delays_df.select('stop_id',F.col('betriebstag').alias('date'),F.col('fahrt_bezeichner').alias('trip_id'),F.col('produkt_id').alias('transport_type'),
                             F.col('haltestellen_name').alias('stop_name'),'actual_arr_time','scheduled_arr_time','arr_delay',
                             'an_prognose_status')
                           
delays_df = delays_df.withColumn('arr_delay', F.when(F.col('arr_delay') <0,0).otherwise(F.col('arr_delay')))
delays_df = delays_df.filter((F.col('arr_delay') < 60) & (F.col('an_prognose_status').isin('REAL','PROGNOSE')))

delays_df = delays_df.withColumn('arrival_hour',F.date_format(F.from_unixtime(delays_df.scheduled_arr_time),'H'))
delays_df = delays_df.withColumn('weekday',F.date_format(F.from_unixtime(delays_df.scheduled_arr_time),'EEEE'))

delays_df.show(n=5)

In [None]:
%%spark

std_value_delays = delays_df.select(F.stddev('arr_delay')).first()[0]
mean_value_delays = delays_df.select(F.mean('arr_delay')).first()[0]
print(mean_value_delays)
print(std_value_delays)

These values: std_value_delays and mean_value_delays will be used later to calculate the probability of being late for stop_id without a known delay.
In order to avoid rexecting everything, we will simply assign the values that we found to the variables in the function where it's needed: 3.3.3 Delays & Confidence Intervals

In [None]:
mean_value_delays = 1.227
std_value_delays = 1.428

As previously explained, we now decide to distribute our delays for each arrival stop in function of the transport type, the day of the week and the hour during which the connection takes place. For instance, by taking the `delays_df` just above, 'Kloten Waldeggweg' will have for each transport type, each day of the week and each hour between 7AM and 9PM, a mean delay and a variance of the delays obtained thanks to the historical SBB istdaten data.

In [None]:
%%spark
delays_stats_df = delays_df.groupBy('stop_id','transport_type','arrival_hour','weekday')\
                  .agg(F.mean('arr_delay').alias('mean_delay'),
                       F.stddev('arr_delay').alias('stddev_delay'),
                       F.count('arr_delay').alias('sample_size'))
delays_stats_df.show(n=10)

These lines below are used to test out if, it is logic to take a normal distribution as being the distributions of the delays at each stop.

In [None]:
%%spark -o filtered_delays

# Filter delays for the desired group
group_filter = (F.col('stop_id') == '8502471') & (F.col('transport_type') == 'Bus') & (F.col('arrival_hour') ==7)  & (F.col('weekday') == 'Friday')
filtered_delays = delays_df.filter(group_filter).select('arr_delay')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde, norm

data = filtered_delays['arr_delay']

kde = gaussian_kde(data)

x = np.linspace(np.min(data), np.max(data), 100)

kde_pdf = kde(x)

mu, sigma = np.mean(data), np.std(data)
norm_pdf = norm.pdf(x, mu, sigma)

plt.plot(x, kde_pdf, label='KDE')
plt.plot(x, norm_pdf, label='Normal Distribution')
plt.xlabel('Data')
plt.ylabel('Probability Density')
plt.title('Probability Density Function')
plt.legend()
plt.show()


Our distribution of delays, here at one stop is therefore not really normal but close enough for us to assume a normal distribution. We can add that taking more hours to compute the delays (2h-gaps,3h-gaps, etc.) would get us closer to a normal distribution, but a mean delay during a 3hour shift does not really mean anything : the delay at 2pm should not be the same, at a stop, than during peak hours around 5pm. We thus decided to do it that way.

In [None]:
#%%spark
#delays_stats_df.repartition(1).write.option('header',True).csv("/user/rochepea/delays_stats_df.csv" , mode="overwrite")

## 3. A CSA-based approach

We reckon that the query to which our work is answering is the following : **What route from A to B is the fastest at least Q% of the time if I want to arrive at B before instant T ?**

In order to take advantage of the data we were given, we opted to heavily base our algorithm on the Connection Scanning Algorithm (CSA) introduced by Dibbelt et al. in 2018. Unless some methods such as the A* or Dijkstra's algorithms, CSA is not network/graph based and only iterates on a timetable containing connections that we're interested in.

Here, we consider a connection to be a vehicle (Bus, Train, Tram, Metro, etc.) that links two different stops without any halt. A connection is therefore characterized by 5 elements :
- The trip to which the connection belongs ($c_{trip}$)
- The connection's departure stop ($c_{dep\_stop}$)
- The connection's arrival stop ($c_{arr\_stop}$)
- The connection's departure time ($c_{dep\_time}$)
- The connection's arrival time ($c_{arr\_time}$)

### 3.1 Introduction to the CSA : the Earliest Arrival problem


Let us introduce the basic implementation of CSA for the Earliest Arrival problem : We want to obtain the earliest arrival time $\tau_{d}$ at a target stop $d$, given a source stop $s$ and a source departure time $\tau_{s}$.

The only preprocessing we need for the algorithm to work properly is to sort our connections timetable $C$ by **increasing order of departure time,** $c_{dep\_time}$, as well as a table containing all the existing footpaths between our stops (the ones that are <500m away in our case).

Two data structures (within the algorithm) are also needed to keep track of these operations :

- The tentative arrival times for each of our stops, which is an array $T$.
- The reachability flags for each trip (TRUE or FALSE values), stored in an array $R$. These flags indicates if the user has been able to hop on one of the connections in a particular trip.

CSA then works by iterating on this sorted timetable of connections only once while performing simple operations on each connection, based on these data structures. 
Very quickly, each connection of interest (following some starting/stopping criteria) is tested for reachability and if it is indeed reachable, the tentative arrival times for each stops in walking distance of the arrival stop of the connection are improved if possible.

A way more detailed and throughout explanation of the algorithm is provided in the short video explaining our project, but the pseudo-code explaining the earliest time CSA is shown just below.

<img src="../figs/EACSA.png" width="800"/>


### 3.2 Adaptation of the earliest arrival CSA to build a journey planner

#### 3.2.1 In theory

Choosing CSA as the base algorithm for the project left us with several challenges :
- Besides the two stops, the earliest arrival CSA takes a departure time as an input, while in our case, the user will only give an arrival time at the destination stop. 

- This version of the algorithm is also made to only output the arrival time at destination. However, in our case, we would like to be able to reconstruct a route from departure stop to arrival stop as well.

- The delays are not taken into account in the algorithm neither, while they are a very important part of the journey planning


Our method thus answers these three big challenges and constructs a journey planner that that works with only an arrival time, the departure and arrival stops as well as the Q% the user enquires for. 

1. First, we decided to keep the CSA in the "Earliest Arrival form", meaning that our `updatedCSA` function runs with a departure time as a parameter. We brought modifications to the algorithm in order to be able to correctly reconstruct the route after finding the arrival time at the destination stop. 

    While updating the tentative arrival times, we don’t only store this value alone anymore, but a triple containing the latter with two other values : whether it was improved by a footpath or a connection, and the index of that specific footpath or connection. At the very end of the loop explained earlier, we then backtrack, from the latest stop, thanks to the stored indices, the connections or footpaths that allowed us to obtain this earliest arrival time at the destination stop, as shown on Figure 3.

<img src="../figs/journey_reconstruction.png" width="800"/>

_Figure 3._ Illustration of the journey reconstruction method we used. For each stop, the tentative arrival time is stored as well as the travel method ('connection' or 'footpath') and the index of the connection of footpath that allowed the improvement. Starting from the last stop, when the CSA algorithm is done running, we reconstruct the journey backwards by looking at which connexions allowed to obtain this earliest arrival time.

2. Having a way to construct a route, we needed to circumvent the fact that the user inputs an arrival time and not a departure time, we decided to perform a first step where we apply the algorithm with the departure time being the same as the arrival time given by the user. Let’s say that the user wants to arrive at stop B no later than 9:30 : we then set our departure time to also be 9:30 at stop A.

    Obviously, this won’t yield any valid arrival time that is below or equal 9:30 at stop B. We thus reduce this 9:30 departure time to 9:29 and see if any arrival time below or equal to 9:30 is found. This is done until we find a departure time that gives an arrival time, at stop B, which is below or equal to 9:30. This is our latest departure time, as the CSA won’t allow us to find any later departures.


3. Now that we have a valid departure time for our problem, we need to find several routes linking A to B as we remember that our algorithm only outputs a single route. We thus apply our algorithm to different departure times, spaced by 5 minutes, until an arbitrary time of 1 hour before the valid departure time found earlier. At each of these times, we would then find an optimal route linking A to B : we then re-run the algorithm by severing/deleting the connections present on this route. This way, we’re forcing our algorithm to consider other connections than the ones yielding the optimal route on each trip, which might also be the riskiest. 

    Let us take again the example just above and let us assume that the first step found a valid departure time of 9:20. We would then run the algorithm with departure times being 9:15, 9:10, 9:05, etc. For each of these times, if an optimal route is found, we run the algorithm as many times as the number of trip_ids in this optimal route, by thus suppressing it from our connections table. This allows to find other valid routes that link our departure stop to our arrival stop, without this trip.

4. The way in which we take the delays into account are explained in part 3.3.3 and in the video (Part of Mehdi Sellami)

### 3.3 In practice 

#### 3.3.1 Local data preparation

In [None]:
#Loading the necessary dataframes
connections = pd.read_csv('../data/connections.csv')
footpaths = pd.read_csv('../data/footpaths.csv')
transport_types = pd.read_csv('../data/transport_types.csv')
stops_coordinates = pd.read_csv('../data/all_stops.csv').drop(['location_type','parent_station','distance_from_zur'],axis=1)
delays_stats = pd.read_csv('../data/delays_stats.csv')

<div class="alert alert-warning">
  <strong>Information!</strong> The below cell runs some additional preprocessing on the dataframes loaded just above (takes ~30s). This preprocessing would have been better if done using PySpark, but we decided to do it here due to the complications we had with running PySpark sessions/operations in the last days of the project.
</div>


In [None]:
#Preparing the transport types.
#We rearrange the different transport_types dataframe to only have to Bus, Tram, or Train transport types.
rows_to_remove = transport_types['route_desc'].isin(['FUN', 'FAE', 'PB', 'BAT'])
transport_types = transport_types[~rows_to_remove]
modify_to_train = transport_types['route_desc'].isin(['S','IR','IC','RE',''])
modify_to_tram = transport_types['route_desc'].isin(['T'])
modify_to_bus = transport_types['route_desc'].isin(['B'])
transport_types.loc[modify_to_train, 'route_desc'] = 'Train'
transport_types.loc[modify_to_tram, 'route_desc'] = 'Tram'
transport_types.loc[modify_to_bus, 'route_desc'] = 'Bus'


#We remove the delays computed for Saturday or Sunday as we're not having any connections on those days in our timetable
delays_to_remove = delays_stats['weekday'].isin(['Saturday','Sunday'])
delays_stats = delays_stats[~delays_to_remove]

#Building necessary structures for the forward CSA : footpaths and connections timetables. We basically only keep the
#connections with valid transport types (if not filtered out earlier), and filter the footpaths based on the
#stops through which actual connections stop.
unique_trip_ids = transport_types['trip_id'].unique()
mask = connections['trip_id'].isin(unique_trip_ids)
selected_connections = connections[mask]

unique_stops_list = np.unique(selected_connections[['departure_stop','arrival_stop']].values)
unique_trips_list = selected_connections.trip_id.unique()
footpaths = footpaths[footpaths['first_stop_id'].isin(unique_stops_list) &
                                footpaths['second_stop_id'].isin(unique_stops_list)]

#Getting our footpaths in a more usable manner for the CSA algorithm
all_footpaths = [list(row) for idx,row in footpaths.iterrows()]

#Sort the timetable and put the time strings into Timestamps
selected_connections = selected_connections.sort_values('departure_time')
selected_connections['departure_time'] = pd.to_datetime(selected_connections['departure_time'])
selected_connections['arrival_time'] = pd.to_datetime(selected_connections['arrival_time'])

#Getting our connections in a more usable manner for the CSA algorithm
all_connections = [list(row) for idx,row in selected_connections.iterrows()]

#One final filter of the all_stops and delays_stats dataframes to only keep the stop_ids we're interested in
stops_coordinates = stops_coordinates[stops_coordinates['stop_id'].isin(unique_stops_list)]
delays_stats = delays_stats[delays_stats['stop_id'].isin(unique_stops_list)]

#Build a dictionnary for transport types : (key = 'trip_id' & value = transport type) 
#and meta-information about our stops : (key = 'trip_id', value = [stop_name, lat, lon]
ttypes_dict = transport_types.set_index(transport_types.columns[0]).to_dict()[transport_types.columns[1]]
stops_info_dict = stops_coordinates.set_index('stop_id').\
    apply(lambda row: [row['stop_name'], row['stop_lat'], row['stop_lon']], axis=1).to_dict()

#### 3.3.2 Updated CSA algorithm

The following function performs a binary search : it returns the index of the first connection departing no later than a certain departure time. It is used in the core `UpdatedCSA` function.

In [None]:
def find_first_connection(connections,low,high,start_time):

    if high >= low:
        mid = (high + low) // 2

        if connections[mid][2] == start_time:
            return mid

        # If element is smaller than mid, then it can only
        # be present in left subarray
        elif connections[mid][2] > start_time:
            return find_first_connection(connections, low, mid - 1, start_time)

        # Else the element can only be present in right subarray
        else:
            return find_first_connection(connections, mid + 1, high, start_time)

    else:
        # Element is not present in the array
        return -1

The `UpdatedCSA` function is the core function of our algorithm. It follows the pseudo code of Figure 2 to retrieve the earliest arrival time at the arrival stop, and it follows the logic explained in Figure 3 to retrieve an optimal route and to reconstruct it. We also set a `reconstructing_journey` boolean depending on if we want to reconstruct a journey or not. For instance, to find the latest departure time, we use this function but we don't want to even try and reconstruct a route : the boolean will thus be set to 0. Otherwise, to reconstruct routes, we set it to 1.

In [None]:
def UpdatedCSA(source,destination,source_time,connections,footpaths,deleted_trips,reconstructing_journey) :
    """.
    Implementation of an updated CSA version for the earliest arrival problem.
    
    Parameters:
    source (string) : The source stop of the journey we aim to build.
    destination (string) : The destination stop of the journey we aim to build
    source_time (datetime) : The time at which the journey starts at our source stop
    connections (List) : Timetable containing all the connections in our preprocessed network
    footpaths (List) : Table containing all the valid footpaths between the stops present in our network
    deleted_trips (list) : List of trips that we don't want to consider in the CSA algorithm 
    reconstructing_journey (boolean) : Set to True if we want to reconstruct the route in addition to the arrival time
    
    Returns:
     
    The earliest arrival time at the destination and the journey linking the source to the destination.
    """
    #First, the T array containing the tentative arrival times for each stops :
    T = {}
    for stop in unique_stops_list : 
        T[stop] = [0,None,None]
    #Then the reachability flag for each trip_id :
    R = {}
    for trip in unique_trips_list:
        R[trip] = 0

    today = datetime.datetime.now().date()
    transfer_time = datetime.timedelta(minutes = 2)
    #Initializing the T and S dictionnaries
    for stop_id in T :
        T[stop_id][0] = datetime.datetime.combine(today,datetime.time(hour=23, minute=59, second = 59))
    for trip in R :
        R[trip] = 0
    
    T[source][0] = source_time
    #For all the stops reachable by foot from the source, update the tentative arrival times
    for foot_idx,footpath in enumerate(footpaths) :
        if footpath[0] == source :
            T[footpath[2]] = [source_time + datetime.timedelta(minutes = footpath[5]),'footpath', foot_idx]
    
    #Find the index of the first connection c_0
    low = 0
    high = len(connections) - 1
    starting_idx = find_first_connection(connections,low,high,source_time)
    
    #Starting from c_0
    for idx,connection in enumerate(connections[starting_idx:]) :
        idx = idx + starting_idx
        #Check if the current connection is not part of deleted_trips,
        if not (connection[0] in deleted_trips) :
            #Check if the departure time of the current connection is >= the tentative time of the destination stop,
            if T[destination][0] <= connection[2] :
                break
            #Check if the connection is reachable,
            if T[connection[1]][0] < (connection[2] - transfer_time) or R[connection[0]] :
                R[connection[0]] = 1
                #Check if it improves the tentative arrival time of the arrival stop of the current connection
                if connection[4] < T[connection[3]][0] :
                    T[connection[3]] = [connection[4],'connection',idx]
                    #We try to improve all the tentative arrival times for the stops reachable by foot from the current connection's 
                    #arrival stop
                    for foot_idx,footpath in enumerate(footpaths) :
                        if footpath[0] == connection[3] :
                            if T[footpath[2]][0] > connection[4] + datetime.timedelta(minutes = footpath[5]):
                                    T[footpath[2]] = [connection[4] + datetime.timedelta(minutes = footpath[5]),'footpath',foot_idx]
     
    if reconstructing_journey :
        journey = []
        current_stop = destination
        while current_stop != source :
            step = []
            current_stop_info = T[current_stop]
            curr_arr_time,step_type,idx = current_stop_info[0],current_stop_info[1],current_stop_info[2]
            if step_type == 'footpath' :
                step = all_footpaths[idx]
                journey.append(['Walking',step[0],curr_arr_time - datetime.timedelta(minutes =step[5]),step[2],curr_arr_time,'Walking'])
                current_stop = step[0]
            if step_type == 'connection' :
                step = connections[idx].copy()
                transport_type = ttypes_dict[step[0]]
                step.append(transport_type)
                journey.append(step)
                current_stop = step[1]
            if step_type == None :
                return(T[destination][0],[])
          
        return(T[destination][0],journey) 
    
    else :
        return(T[destination][0],[])

This function is used to find the first and thus latest departure time based on a given arrival time at destination. This will be the core function when trying to valid departure time when provided with a query from the user.

In [None]:
def DepartureTimeFinder(source,destination,arrival_time,connections,footpaths) :
    """
    Find a valid latest departure time given the user's desired arrival time at destination.
    
    Parameters:
    source (string) : The source stop of the journey 
    destination (string) : The final arrival stop of the journey
    arrival_time (datetime) : Latest arrival time of the journey (queried by the user)
    connections (List) : Timetable containing all the connections in our preprocessed network
    footpaths (List) : Table containing all the valid footpaths between the stops present in our network
    
    Returns: 
    The first valid departure time found by the CSA algorithm.
    """
    
    #Defining a 3hours time interval with a 1min frequency before arrival time to find the departure time
    departure_times= pd.date_range(start=arrival_time - datetime.timedelta(hours = 3), end=arrival_time, freq='1min').tolist()
    departure_times.reverse()
    
    best_departure_time = None
    #For each of our tentative departure times, we check if we can find an arrival time which is < arrival_time, If it is, we select it
    #and end the loop.
    for curr_dep_time in departure_times :
        best_found_arrival = UpdatedCSA(source,destination,curr_dep_time,all_connections,all_footpaths,[],0)[0]
        if best_found_arrival <= arrival_time :
            best_departure_time = curr_dep_time
            return(pd.to_datetime(best_departure_time.strftime('%Y-%m-%d %H:%M:%S')))
        
    return(best_departure_time)

`RoutesLister` reconstruct a list of routes for a certain departure time of travel. A first route is reconstructed (the most optimal one) and all the trip_ids of this route are registered. One trip_id at a time, all connections that are part of this trip are virtually removed from the connections timetable to find other routes, if possible, linking A to B for this particular departure time. We also double check that the arrival time of the connection is indeed smaller than the query of the user.

In [None]:
def RoutesLister(source,destination,departure_time,arrival_time,connections,footpaths) :
    """
    Find routes linking the source to destination.
    
    Parameters:
    source (string) : The source stop of the journey 
    destination (string) : The final arrival stop of the journey
    departure_time (datetime) : Departure time of the journey
    arrival_time(datetime) : Arrival time of the journey
    connections (List) : Timetable containing all the connections in our preprocessed network
    footpaths (List) : Table containing all the valid footpaths between the stops present in our network
    
    Returns: 
    A list of routes linking source to destination.
    """
    routes = []
    
    #Find the first route linking source to destination when leaving at source_time
    first_arrival_time, first_route = UpdatedCSA(source,destination,departure_time,all_connections,all_footpaths,[],1)
    routes.append(first_route)
    
    #Retrieve all the trip_ids present in this specific route
    first_trip_ids = list(set(connection[0] for connection in first_route if connection[0] != 'Walking'))
    
    #For each of these trip_ids, try to find a route that uses other connections. If no route without a certain connection,
    #we don't append it
    for trip_id in first_trip_ids :
        new_arrival_time,new_route = UpdatedCSA(source,destination,departure_time,all_connections,all_footpaths,[trip_id],1)
        if (new_arrival_time <= arrival_time) & (new_route != None) :
            routes.append(new_route)
            
    return(routes)

This last function basically uses the `RoutesLister` function on different departure times, to reconstruct a list of routes that we can then filter for the Q value and order by number of transfers, stops, etc. It therefore encapsulates all the other functions previously defined in this part.

In [None]:
def DiffTimesRouteLister(source,destination,arrival_time,connections,footpaths) :
    """
    Find routes linking the source to destination for different departure times at source.
    
    Parameters:
    source (string) : The source stop of the journey (queried by the user)
    destination (string) : The final arrival stop of the journey (queried by the user)
    arrival_time(datetime) : Arrival time of the journey (queried by the user)
    connections (List) : Timetable containing all the connections in our preprocessed network
    footpaths (List) : Table containing all the valid footpaths between the stops present in our network
    
    Returns: 
    A dictionnary of routes, where the key is a departure time and the value is a list of routes linking source to destination, for this 
    departure time
    """
    routes_dic = {}
    
    #We first find the first valid departure time for the arrival time wanted by the user
    first_departure_time = DepartureTimeFinder(source,destination,arrival_time,connections,footpaths)
    if first_departure_time == None :
        return(TypeError('No suitable departure time was found for a trip from {} to {}, arriving no later than {}'.format(source,destination,arrival_time)))
    
    departure_times= pd.date_range(start=first_departure_time - datetime.timedelta(hours = 1), end=first_departure_time, freq='5min').tolist()
    departure_times.reverse()
    
    for new_departure_time in departure_times :
        routes_dic[new_departure_time] = RoutesLister(source,destination,new_departure_time,arrival_time,connections,footpaths)
        
    return(routes_dic)

#### 3.3.3 Delays & Confidence Intervals

The main function in this part is add_q_to_routes, which adds a Q-value to each route in a given dictionary of routes. The Q-value represents the probability of a route being feasible with potential delays at least Q% of the time, considering the desired arrival time and the confidence tolerance.

The algorithm uses a delay dataset,  which contains information about the delays for each stop_id and transport type,hour of arrival and weekday. It calculates the Q-value for each route by considering the delays and arrival time. The Q-value is then added to the route.

The route planner includes several utility functions:

reduce_route: Reduces a route by removing consecutive stops with the same trip_id. This is due to the assumption that the last stop of a trip encapsulates the delays of the previous stop on the same trip.

get_delay_stats: Retrieves delay statistics for a given stop ID, transport type, arrival time, weekday, and the delay dataset. If specific delay information is not available, it uses the mean and  standard deviation of the entire dataset.

get_time_diff: Calculates the time difference in minutes between two timestamps.

get_prob: Calculates the probability given the time left, mean, and standard deviationusing a normal distribution.

get_q_value: Calculates the Q-value for a given route, arrival time, and delay dataset. It considers the delays for each stop along the route and computes the cumulative Q-value.

The route planner thus functions in the follwing manner :
Provide a dictionary of routes containing the possible routes between departure 
and arrival stops.
Specify the arrival time and the desired Q-value threshold (Q%) as inputs
 to the add_q_to_routes function.
Pass the delays dataset and the queried weekday as arguments to the add_q_to_routes
 function.
The function will return a new dictionary of routes with the Q-values
 added to each route.

In [None]:
#these are the mean and std of all the delays
mean_value_delays = 1.227
std_value_delays = 1.428
def add_q_to_routes(all_routes_dict,arrival_time,delays,desired_Q,queried_weekday):
    """
    Adds a Q-value to each route in the given dictionary of routes.

    Args:
        all_routes_dict (dict): A dictionary containing routes.
        arrival_time (datetime): The arrival time.
        delays (pd.DataFrame): A DataFrame containing delay information.
        desired_Q (float): The desired Q-value threshold.
        queried_weekday (string): The weekday of the trip
    Returns:
        dict: A new dictionary with Q-values added to the routes.
    """
        
    new_routes_list = []
    for key in all_routes_dict:
        routes = all_routes_dict[key]
        for index, route in enumerate(routes):
            q_value = get_q_value(route,arrival_time,queried_weekday,delays)
            if q_value>= desired_Q:
                new_route = []
                for x in list(reversed(route)):
                    x.append(q_value)  
                    new_route.append(x)  
                new_routes_list.append(new_route)

    return new_routes_list


def reduce_route(route):
    """
    Reduces the given route by removing consecutive stops with the same stop_id.

    Args:
        route (list of list): A list of lists containing the information about each stop

    Returns:
        list of list: A new route with consecutive stops having the same trip_id removed.
    """
        
    new_route = []
    for i in range(len(route)-1):
        if route[i][0] != route[i+1][0]:
            new_route.append(route[i])
    new_route.append(route[-1])
    
    return new_route 

def get_delay_stats(stop_id,transport_type,arr_time,weekday,delays):
    """
    Retrieves delay statistics for the given stop_id, transport_type, arrival time, weekday, and delays DataFrame.

    Args:
        stop_id (str): The stop ID.
        transport_type (str): The transport type.
        arr_time (timestamp): arrival time.
        weekday (str): The weekday.
        delays (pd.DataFrame): A DataFrame containing delay information.

    Returns:
        tuple: A tuple containing the mean delay and standard deviation.
    """
    #for some stop_ids we don't have a delay, we use the weighted sum and std of all the dataset
    
    datetime_obj = pd.to_datetime(arr_time)
    hour = datetime_obj.hour
    if ':' in stop_id:
        #these values were calculated at the end of 2.4
        return mean_value_delays,std_value_delays
    row = delays[(delays['stop_id'] == int(stop_id)) &
                  (delays['transport_type'] == transport_type) &
                  (delays['arrival_hour'] == hour) &
                  (delays['weekday'] == weekday)]
   
    is_empty = row.empty
    if is_empty:
        #these values were calculated at the end of 2.4
        return mean_value_delays,std_value_delays
    mu = row['mean_delay'].item()
    std = row['stddev_delay'].item()
    return mu,std

def get_time_diff(timestamp1,timestamp2):
    """
    Calculates the time difference in minutes between two timestamps.

    Args:
        timestamp1 (pd.Timestamp): The first timestamp.
        timestamp2 (pd.Timestamp): The second timestamp.

    Returns:
        float: The time difference in minutes.
    """
    timestamp1 = timestamp1.to_pydatetime()
    timestamp2 = timestamp2.to_pydatetime()
        
    time_difference = timestamp2 - timestamp1
    return time_difference.total_seconds()/60
        
def get_prob(time_left,mu,std):
    """
    Calculates the probability given the time left, mean, and standard deviation.

    Args:
        time_left (float): The time left.
        mu (float): The mean value of the delays.
        std (float): The standard deviation of the delays.

    Returns:
        float: The calculated probability.
    """
    probability = norm.cdf(time_left, mu, std)

    return probability

def get_q_value(route,arrival_time,queried_weekday,delays):
    """
    Calculates the Q-value for the given route, arrival time, and delays DataFrame.

    Args:
        route (list of list): A list of lists containing the information about each stop
        arrival_time (datetime): The arrival time.
        queried_weekday(str): The weekday of the trip
        delays (pd.DataFrame): A DataFrame containing delay information.

    Returns:
        float: The calculated Q-value.
    """
    reduced_route = reduce_route(route)
    q_value = 1
    walk_left_over = 0
    walked = False
    offset = 0
    stop_id,transport_type,arr_time,weekday,dep_time = 0,0,0,0,0
    if reduced_route[-1][0] == 'Walking':
        offset = 1
    for i in range(len(reduced_route)-1-offset,0,-1):
        if reduced_route[i-1][0] == 'Walking':
            walked = True
            stop_id = reduced_route[i][3]
            transport_type = reduced_route[i][5]
            arr_time = reduced_route[i][4]
            dep_time = reduced_route[i-1][2]
            weekday = queried_weekday
            walk_left_over = get_time_diff(arr_time,dep_time)
            continue
        if not(walked):
            stop_id = reduced_route[i][3]
            transport_type = reduced_route[i][5]
            arr_time = reduced_route[i][4]
            dep_time = reduced_route[i-1][2]
            weekday = queried_weekday
        mu,std = get_delay_stats(stop_id,transport_type,arr_time,weekday,delays)
       
        arr_time = reduced_route[i][4]
        dep_time = reduced_route[i-1][2]
        time_diff = get_time_diff(arr_time,dep_time) + walk_left_over
        proba = get_prob(time_diff,mu,std)
        q_value *=proba
        walk_left_over = 0
        walked = False
        
    if reduced_route[0][0]=='Walking':
        mu,std = get_delay_stats(stop_id,transport_type,arr_time,weekday,delays)
        
        arr_time = reduced_route[0][4]
        time_diff = get_time_diff(arr_time,arrival_time)+walk_left_over
        proba = get_prob(time_diff,mu,std)
        q_value *=proba
        return q_value
    stop_id = reduced_route[0][3]
    transport_type = reduced_route[0][5]
    arr_time = reduced_route[0][4]
    weekday = queried_weekday
    time_diff = get_time_diff(arr_time,arrival_time)
    mu,std = get_delay_stats(stop_id,transport_type,arr_time,weekday,delays)
    proba = get_prob(time_diff,mu,std)
   
    q_value *=proba
    return q_value 

#### 4. Visualization & Validation

We're now ready to first visualize and then run some validation tests on our results.

#### 4.1 Visualization

<div class="alert alert-info">
  <strong>Important!</strong> Please read carefully the few following lines to understand how to use our visualization interface.
</div>

To be able to visualize the results in a user-friendly manner we've created the two code cells below :

- The first cell, **when ran**, is going to output widgets from which the user will be able to select the different parameters of the trip. They include both departure and arrival stops, the arrival time (hour + minute, the weekday during which we want to make the trip and the Q value to query. 

    
    When the user is done choosing these values, clicking on the **'RUN'** button will fetch a list of routes taking all the parameters into account. A text will then appear, indicating the number of routes that have been found for this list of parameters. The visualization itself takes place in the next code cell.
    
    If needed, the user can also reset all the widgets to some default values by pressing the **'RESET'** button.

<div class="alert alert-block alert-warning">
<b>⚠️ WARNING! The following markdown cell explains what should normally be happening when running the second cell. 
    To visualize our results on a map in a nice manner, we used the folium package. Unfortunately, with Renku, even with trying to trust the current notebook with our best efforts, it won't allow us to show the map. The routes results (connections, confidence value) are thus available on the logs of the notebook and the current route shown is displayed in 'map.html'.  We deeply apologize for the inconvenience.</div>


- The second cell will allow the user to visualize the actual result of the query. A button "Show route" can be clicked and a dropdown menue shows a list of routes, ordered by latest departure times, transfers, walking distances and q values. When clicking the button, a map is displayed with some text containing the important information about the route.

In [None]:
import ipywidgets as widgets
from IPython.display import display

output = widgets.Output()

#Set the usable 
choosable_stops = set([stop[0] for stop in stops_info_dict.values()])
choices = sorted(choosable_stops)

Q_choice=widgets.FloatSlider(
    value=0.95,
    min=0,
    max=1.0,
    step=0.01,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.2f',
)

start_stop=widgets.Dropdown(
    options=choices,
    value='Zürich, Klosbach'
)

end_stop=widgets.Dropdown(
    options=choices,
    value = 'Zürich, ETH/Universitätsspital'
)

day=widgets.Dropdown(
    description='Pick a Day',
    options = ['Monday','Tuesday','Wednesday','Thursday','Friday'],
    value = 'Wednesday'
)

start_hour=widgets.Dropdown(
    options=range(7,21),
    description='Arrival Hour:',
    value = 10,
    disabled=False,
)

start_minute=widgets.Dropdown(
    options=range(0,60),
    description='Arrival Min.:',
    value = 30,
    disabled=False,
)

#initalize button
first_button = widgets.Button(
    description='RUN',
    button_style='info',
    tooltip='Query info'
)

result_label = widgets.Label()  # Label to display the message

# Variable to store the result
routes_found = None

def on_query_clicked(f_b) :
    
    Q_value = Q_choice.value
    user_start_stop = start_stop.value
    user_end_stop = end_stop.value
    user_day = day.value
    user_start_hour = start_hour.value
    user_start_minute = start_minute.value
    
    unique_routes= []
    
    arrival_time=pd.to_datetime("%02d:%02d:00"%(user_start_hour,user_start_minute))
    
    start_id = [stop_id for stop_id, stop_info in stops_info_dict.items() if stop_info[0] == user_start_stop][0]
    end_id = [stop_id for stop_id, stop_info in stops_info_dict.items() if stop_info[0] == user_end_stop][0]
    
    print(start_id,end_id,arrival_time,Q_value)
    final_routes_dic = DiffTimesRouteLister(start_id,end_id,arrival_time,connections,footpaths)
    if not isinstance(final_routes_dic,dict) :
            unique_routes = []
            result_label.value = 'No Routes have been found, please try again.'
            
    else :   
            routes_list = add_q_to_routes(final_routes_dic,arrival_time,delays_stats,Q_value,user_day)
    
            if not isinstance(routes_list,list) or len(routes_list) == 0 :
                unique_routes = []
                routes_nb = len(unique_routes)
                result_label.value = 'No Routes have been found, please try again.'
    
            else :
                unique_tuples = set(tuple(tuple(connection) for connection in route) for route in routes_list)
                unique_routes = [list(route) for route in unique_tuples]
                routes_nb = len(unique_routes)
                result_label.value = '{} Route(s) have been found! Please run the next cell to display them.'.format(routes_nb)
    
    
    # Store the result in the variable
    global final_routes
    final_routes = unique_routes
    
    
def reset_query(b):
    # Reset the widget values
    Q_choice.value = 0.95
    start_stop.value = 'Zürich, Klosbach'
    end_stop.value = 'Zürich, ETH/Universitätsspital'
    day.value = 'Wednesday'
    start_hour.value = 10
    start_minute.value = 30
    
    # Reset the result variable and message
    global final_routes
    final_routes = None
    result_label.value = ""
    
first_button.on_click(on_query_clicked)
reset_button = widgets.Button(description='Reset')  # Reset button
reset_button.on_click(reset_query)

widget_by_row =[widgets.HBox([widgets.Label("Min Confidence Level:"), Q_choice]),
                widgets.HBox([widgets.Label("Starting station:"), start_stop]),
                widgets.
                HBox([widgets.Label("Ending station:"), end_stop]),
                day,
                widgets.HBox([start_hour,start_minute])]

all_widget_in_one=widgets.VBox(widget_by_row)
display(all_widget_in_one,first_button,result_label,output,reset_button)    

In [None]:
import folium
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML

def sort_key(route):
    departure_time = route[0][2].to_pydatetime()
    walking_time = sum((trip[4].to_pydatetime() - trip[2].to_pydatetime()).total_seconds() for trip in route if trip[5] == 'Walking')
    num_stops = len(route)
    return (-departure_time.timestamp(), walking_time, num_stops)

new_final_routes = sorted(final_routes, key=sort_key)
routes = new_final_routes
tooltip = None

# Create a function to generate the tooltip for a stop position
def get_stop_tooltip(trip_id, stop_id, dep_time, arr_time):
    tooltip = f'Stop Name: {stop_id}<br>'
    tooltip += f'Arrival Time at the stop: {arr_time}<br>'
    tooltip += f'Departure Time from the stop : {dep_time}<br>'
    tooltip += f'With trip ID: {trip_id}<br>'
    return tooltip


# Create a function to generate the tooltip for an edge (mode)
def get_edge_tooltip(mode):
    tooltip = f'Mode: {mode}<br>'
    return tooltip
 

def on_button_clicked(b):
    
    selected_route = dropdown.value - 1
    clear_output()
   
    departure_stop = routes[selected_route][0][1]
    print(departure_stop)
    map_route = folium.Map(location=[stops_info_dict[departure_stop][1],stops_info_dict[departure_stop][2]], zoom_start=13)
    
    # Create markers for each stop position in the selected route
    # Special case for the departure stop of the trip
    first_trip, dep_stop, journey_dep_time, sec_stop, arr_time_sec_stop, ttype, conf = routes[selected_route][0]
    first_stop_name, first_stop_lat, first_stop_lon = stops_info_dict[dep_stop][0], stops_info_dict[dep_stop][1], stops_info_dict[dep_stop][2]
    first_tooltip = get_stop_tooltip(first_trip,first_stop_name,journey_dep_time.strftime("%H:%M:%S"),'-')
    folium.Marker(
                location=[first_stop_lat, first_stop_lon],
                tooltip=first_tooltip,
                icon=folium.Icon(color="blue")
            ).add_to(map_route)
        
    for idx, position in enumerate(routes[selected_route][1:-1]):
            trip_id, dep_stop, dep_time, arr_stop, arr_time, ttype, conf = position
            prev_arr_time = routes[selected_route][idx][4]
            stop_name, stop_lat, stop_lon = stops_info_dict[dep_stop][0], stops_info_dict[dep_stop][1], stops_info_dict[dep_stop][2]
            tooltip = get_stop_tooltip(trip_id, stop_name, dep_time.strftime("%H:%M:%S"), prev_arr_time.strftime("%H:%M:%S"))
            folium.Marker(
                location=[stop_lat, stop_lon],
                tooltip=tooltip,
                icon=folium.Icon(color="blue")
            ).add_to(map_route)
            
    last_trip, last_dep_stop, last_dep_time, final_stop, journey_arr_time, ttype, conf = routes[selected_route][-1]
    last_stop_name, last_stop_lat, last_stop_lon = stops_info_dict[final_stop][0], stops_info_dict[final_stop][1], stops_info_dict[final_stop][2]
    last_tooltip = get_stop_tooltip('-',last_stop_name,'-',journey_arr_time.strftime("%H:%M:%S"))
    folium.Marker(
                location=[last_stop_lat, last_stop_lon],
                tooltip=last_tooltip,
                icon=folium.Icon(color="blue")
            ).add_to(map_route)
    
        # Create polylines to connect consecutive stop positions in the selected route
    for i in range(len(routes[selected_route])):
            first_stop = routes[selected_route][i][1]
            second_stop = routes[selected_route][i][3]
            lng1,lat1 = stops_info_dict[first_stop][2], stops_info_dict[first_stop][1]
            lng2,lat2 = stops_info_dict[second_stop][2], stops_info_dict[second_stop][1]
            mode = routes[selected_route][i][5]
            
            polyline = folium.PolyLine(
                locations=[(lat1, lng1), (lat2, lng2)],
                color="red",
                weight=2.5,
                opacity=1.0,
                tooltip=get_edge_tooltip(mode)
            )
            map_route.add_child(polyline)
    
        #display(dropdown)
        #display(button)
        
        # Print the route information
    for position in routes[selected_route]:
            trip_id, dep_stop, dep_time, arr_stop, arr_time, ttype, conf = position
            dep_stop_name = stops_info_dict[dep_stop][0]
            arr_stop_name = stops_info_dict[arr_stop][0]
            print(f'Depart from: {dep_stop_name} at {dep_time.strftime("%H:%M:%S")}, arrive to {arr_stop_name} at {arr_time.strftime("%H:%M:%S")} by {ttype}')
        
    confidence = routes[selected_route][0][6]  # Assuming confidence is the same for all positions in the route
    print(f"Route Confidence: {confidence*100}%")
    map_route.save('map.html')
    #display((map_route))

dropdown = widgets.Dropdown(options=list(range(1, len(routes) + 1)), description='Select Route:')
button = widgets.Button(description='Show Route')
display(dropdown)
display(button)
    
button.on_click(on_button_clicked)

#### 4.2 Validation

We will end our work with trying to see how accurate our method is and what are thus the downsides and positive points of what we did.

Let us start with the following example : we go from Zürich Waidspital to Zürich Oerlikon, on a Wednesday, and we want to arrive no later than 10h30, as shown below :

<img src="../figs/first_validation.png" width="800"/>

The first route (knowing that they're sorted by Latest departure time, taking the Q value into account as well as the walking distance), the first route outputed by our method is the following :

<img src="../figs/first_validation_route.png" width="800"/>

The confidence of this route is very good and we see that not many changes are forced upon the user. Moreover, when looking at the SBB CFF, we can see that for the same parameters the outputed route is the following :

<img src="../figs/first_validation_sbb.png" width="800"/>


We thus see that the two routes are actually the same : only the walking duration at the very end changes (the SBB CFF app says 4min of travelling time is required) but we can assume this is due to our approximate footpaths calculations. Other than that, the journey seems to be the same. In reality, the SBB CFF app also proposes, to the user, a route arriving at 10:30 at destination (at that also leaves 7mins later). However, it is not contained in our list of routes. Knowing that the arrival time is <= to 10:30 and that our walking distances seem to be smaller than what's previsioned by the SBB CFF app in that case, we could have assumed that it was removed because of a Q value being too small, the trip maybe being too risky. In reality, even when setting a minimal Q value, the trip is not outputed by our method. This could be due to some shorthandings with regard to the walking distances or for precision of our algorithm.

Nonetheless, this result (not being the sole example of such a behaviour of our algorithm) is very encouraging and indicates that our method is able to give pretty accurate results to the user. 


Let's now see what happens for a longer trip. To illustrate, we select the following parameters :

<img src="../figs/scd_validation_params.png" width="800"/>

The outputed route is :

<img src="../figs/scd_validation_route.png" width="800"/>

One of the proposed SBB CFF trips is :

<img src="../figs/scd_validation_sbb.png" width="800"/>

If we look attentively, we can see that the routes are actually the same until the last transfer. The SBB CFF app indicates to the user to stay in the last tram until Horgen, while our algorithm advises the user to get out at Zürich Enge and take another connection. However, we can observe that our results is not too far of the reality and can be considered as satisfying. The only next route outputed by the SBB CFF arrives at 14:30, which is most likely too risky to be considered.

This is most likely seen with long distance transports : in our case, the short journeys seem to work pretty well but some differences may occur between the optimal routes given by the state of the art journey planners. Overall, our results are still pretty good. It also happens that no routes are found between stops, while the SBB CFF app says that some trips exist. This could be due to the limitations of our algorithm, but could also be due to the manner in which we computed our delays, maybe not allowing some connections "too much" while they're not that risky in general. This could also be due to our algorithm method : if we recall, we said that we were severing each trip_id at a time for a certain departure time, trying to find routes not using this trip. However, this might be not optimal and, our case, remove some routes that would have taken a removed trip differently (i.e not until the end, or en route, etc.). This is also one of the limitations of our method.

As we didn't focus on the computational optimization of our algorithm after choosing the CSA as a basis, another important point to consider, as you've been able to see if you ran the query cell with these parameters, is that the execution time is very long (~2 minutes), which is symptomatic of we try to find routes linking two stops that are really far away from each other. In our case, it is not that much of problem in the sense that we are not students running a project and we can afford to wait a bit. However, in a real-case scenario, having to wait that much to get results is maybe not really wishable for a user-friendly experience, and this is one of the limitations of our method. This is most likely due to the number of departure times considered and way in which we force our algorithm to take several routes into account.

The delays did not seem to vary much between the stops. For the first example we took earlier, changing the arrival time for 17:30, what we expect being peak hours, has changed the probability of the trips by a fine margin, but the algorithm still find confident routes. This might be due to the assumptions we made when computing the delays.

In the same manner, choosing the departure time as Friday between the same query on different weekdays did not change anything for the first example we took earlier. We can assume that this is most likely reasonable as, on weekdays and considering only usual trips on the schedule, there shouldn't be any differences.

To conclude, we can say that our method outputs overall satisfying results : many trips are predicted accurately even if some trips do not follow the optimal routes found by the state-of-the-art planners. Concerning the delays, they are indeed taken into account and outputing some interesting Q values : it seems that some trips are not considered thanks to this, even if we did not make 100% sure it was the case, due to project-ending time constraints. However, they do not seem to render some trips absolutely impossible during peak hours for instance. There are still some questions to be answered as, due to project-ending time constraints, we have not been able to test many different things deeply such as an observation of a same trip with many different Q values, etc. 

In [None]:
%spark cleanup