# Homies App

### Pyspark connection

In [1]:
%%configure -f
{
    "jars": ["/user/chahed/graphframes-0.6.0-spark2.3-s_2.11.jar", 
             "/user/chahed/scala-logging-api_2.11-2.1.2.jar",
             "/user/chahed/scala-logging-slf4j_2.11-2.1.2.jar"],
    "pyFiles": ["/user/chahed/graphframes-0.6.0-spark2.3-s_2.11.jar"],
    "conf": {
        "spark.app.name": "homies_final"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8983,application_1589299642358_3520,pyspark,idle,Link,Link,
8996,application_1589299642358_3534,pyspark,idle,Link,Link,
9028,application_1589299642358_3573,pyspark,idle,Link,Link,
9054,application_1589299642358_3599,pyspark,idle,Link,Link,
9055,application_1589299642358_3601,pyspark,idle,Link,Link,
9065,application_1589299642358_3611,pyspark,idle,Link,Link,
9073,application_1589299642358_3622,pyspark,busy,Link,Link,
9074,application_1589299642358_3623,pyspark,idle,Link,Link,
9081,application_1589299642358_3641,pyspark,idle,Link,Link,
9084,application_1589299642358_3644,pyspark,idle,Link,Link,


### Start Spark

In [2]:
# Initialization

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9143,application_1589299642358_3708,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### To save the computations

In [3]:
import subprocess, pickle

def run_cmd(args_list):
    """Run linux commands."""
    print('Running system command: {0}'.format(' '.join(args_list)))
    proc = subprocess.Popen(args_list, 
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    s_output, s_err = proc.communicate()
    s_return =  proc.returncode
    return s_return, s_output, s_err

def save_hdfs(localPath, hdfsPath):
    (ret, out, err)= run_cmd(['hdfs', 'dfs', '-put', '-f', localPath, hdfsPath])
    if ret:
        print(err)
    else:
        print('Success')
        
def read_hdfs(hdfsPath):
    (ret, out, err) = run_cmd(['hdfs', 'dfs', '-cat', hdfsPath])
    if ret:
        print(err)
    else:
        print('Success')
        return pickle.loads(out)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format

In [4]:
sbb = spark.read.orc('/data/sbb/orc/istdaten')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
sbb.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- betriebstag: string (nullable = true)
 |-- fahrt_bezeichner: string (nullable = true)
 |-- betreiber_id: string (nullable = true)
 |-- betreiber_abk: string (nullable = true)
 |-- betreiber_name: string (nullable = true)
 |-- produkt_id: string (nullable = true)
 |-- linien_id: string (nullable = true)
 |-- linien_text: string (nullable = true)
 |-- umlauf_id: string (nullable = true)
 |-- verkehrsmittel_text: string (nullable = true)
 |-- zusatzfahrt_tf: string (nullable = true)
 |-- faellt_aus_tf: string (nullable = true)
 |-- bpuic: string (nullable = true)
 |-- haltestellen_name: string (nullable = true)
 |-- ankunftszeit: string (nullable = true)
 |-- an_prognose: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- abfahrtszeit: string (nullable = true)
 |-- ab_prognose: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- durchfahrt_tf: string (nullable = true)

### Read the station list data [BFKOORD_GEO](https://opentransportdata.swiss/en/cookbook/hafas-rohdaten-format-hrdf/#Abgrenzung)

In [6]:
metadata = spark.read.csv('/data/sbb/stations/bfkoordgeo.csv', header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
metadata.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- StationID: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Remark: string (nullable = true)

We want the latitude and longitude as float 

In [8]:
metadata = metadata.na.drop() 

metadata = metadata.select(
        metadata.StationID,
        metadata.Longitude.cast("float"), 
        metadata.Latitude.cast("float"), 
        metadata.Height.cast("int"),
        metadata.Remark.cast("float"), 
    )
metadata.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- StationID: string (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Remark: float (nullable = true)

In [9]:
metadata.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---------+---------+------+------+
|StationID|Longitude| Latitude|Height|Remark|
+---------+---------+---------+------+------+
|  0000002|26.074411| 44.44677|     0|  null|
|  0000003| 1.811446| 50.90155|     0|  null|
|  0000004| 1.075329| 51.28421|     0|  null|
|  0000005|-3.543547| 50.72917|     0|  null|
|  0000007| 9.733756|46.922367|   744|  null|
+---------+---------+---------+------+------+
only showing top 5 rows

### Stops

**Assumption 1 :** We only consider stations in a 15km radius of Zürich's train station, Zürich HB (8503000), (lat, lon) = (47.378177, 8.540192)

We use Haversine formula to compute the distance between stations and Zurich center.(https://en.wikipedia.org/wiki/Haversine_formula)

In [10]:
import pyspark.sql.functions as functions
from math import radians, sin, cos, asin, sqrt

@functions.udf
def calculateDistanceZurich(lat,lon):
    # approximate radius of earth in km
    R = 6371.0
  
    lat = radians(lat)
    lon = radians(lon)
            
    zurich_lat = radians(47.378178)
    zurich_lon = radians(8.540192)
    
    hav_lat = sin((lat - zurich_lat)/2)**2
    hav_lon = sin((lon - zurich_lon)/2)**2
    hav = hav_lat + hav_lon * cos(lat) * cos(zurich_lat)
    
    d = 2*R*asin(sqrt(hav))  #distance in km
   
    return d

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
metadata_zurich = metadata.withColumn('distance_to_Zurich', calculateDistanceZurich(metadata.Latitude, metadata.Longitude))\
                        .filter(functions.col('distance_to_Zurich') <= 15.0)\
                        .select('StationID','Longitude','Latitude')
metadata_zurich.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---------+---------+
|StationID|Longitude| Latitude|
+---------+---------+---------+
|  0000065| 8.595545| 47.40921|
|  0000066| 8.595545| 47.40921|
|  0000176| 8.521961| 47.35168|
|  8502186| 8.398942|47.393406|
|  8502187| 8.377032| 47.36474|
+---------+---------+---------+
only showing top 5 rows

In [12]:
@functions.udf
def calculateDistance(lat1,lon1,lat2,lon2):
    # approximate radius of earth in km
    R = 6371.0
  
    lat1 = radians(lat1)
    lon1 = radians(lon1)
            
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    
    hav_lat = sin((lat1 - lat2)/2)**2
    hav_lon = sin((lon1 - lon2)/2)**2
    hav = hav_lat + hav_lon * cos(lat1) * cos(lat2)
    
    d = 2*R*asin(sqrt(hav))  #distance in km
   
    return d

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
def pairs_distance(data):
    
    data_from = data.withColumnRenamed('StationID', 'from_StationID')\
                                      .withColumnRenamed('Longitude', 'from_Longitude')\
                                      .withColumnRenamed('Latitude', 'from_Latitude')\
                
    data_to = data.withColumnRenamed('StationID', 'to_StationID')\
                                    .withColumnRenamed('Longitude', 'to_Longitude')\
                                    .withColumnRenamed('Latitude', 'to_Latitude')\
                
    stations_pair_dist = data_from.crossJoin(data_to)

    stations_pair_dist = stations_pair_dist.filter(stations_pair_dist.from_StationID != stations_pair_dist.to_StationID)
    
    stations_pair_dist = stations_pair_dist.withColumn('distance_km', calculateDistance(stations_pair_dist.from_Latitude, stations_pair_dist.from_Longitude,
                                                                                stations_pair_dist.to_Latitude, stations_pair_dist.to_Longitude))
    return stations_pair_dist

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
stations_pair_dist = pairs_distance(metadata_zurich)
stations_pair_dist.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------+-------------+------------+------------+-----------+------------------+
|from_StationID|from_Longitude|from_Latitude|to_StationID|to_Longitude|to_Latitude|       distance_km|
+--------------+--------------+-------------+------------+------------+-----------+------------------+
|       0000065|      8.595545|     47.40921|     0000066|    8.595545|   47.40921|               0.0|
|       0000065|      8.595545|     47.40921|     0000176|    8.521961|   47.35168| 8.462666257272906|
|       0000065|      8.595545|     47.40921|     8502186|    8.398942|  47.393406|14.900964487282666|
|       0000065|      8.595545|     47.40921|     8502187|    8.377032|   47.36474|17.177613118555456|
|       0000065|      8.595545|     47.40921|     8502188|    8.354599|  47.355907|19.084514154615547|
+--------------+--------------+-------------+------------+------------+-----------+------------------+
only showing top 5 rows

**Assumption 2** : We allow short (max 500m "As the Crows Flies") walking distances for transfers between two stations

In [15]:
stations_pair_walking = stations_pair_dist.where(stations_pair_dist.distance_km <= 0.5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Assumption 3:** We assume a walking speed of 50m/1min on a straight line, regardless of obstacles, human-built or natural, such as building, highways, rivers, or lakes. 

In [16]:
stations_pair_walking = stations_pair_walking.withColumn('time_walking_min', stations_pair_walking.distance_km / 0.050)
stations_pair_walking.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------+-------------+------------+------------+-----------+-------------------+-----------------+
|from_StationID|from_Longitude|from_Latitude|to_StationID|to_Longitude|to_Latitude|        distance_km| time_walking_min|
+--------------+--------------+-------------+------------+------------+-----------+-------------------+-----------------+
|       0000065|      8.595545|     47.40921|     0000066|    8.595545|   47.40921|                0.0|              0.0|
|       0000065|      8.595545|     47.40921|     8503129|    8.591911|  47.412716| 0.4761461043939778|9.522922087879556|
|       0000065|      8.595545|     47.40921|     8587651|    8.595545|   47.40921|                0.0|              0.0|
|       0000065|      8.595545|     47.40921|     8587655|    8.592534|   47.41272|0.45123862945729615|9.024772589145922|
|       0000065|      8.595545|     47.40921|     8590884|    8.597269|    47.4117| 0.3058698680514894|6.117397361029788|
+--------------+--------

We now have all possible pairs of stations possible for a transfer and the time needed to do it. 

### Stop time

We take as model a typical week from the 5th of May 2019 to 12th of May 2019. 

In [17]:
import os
import pandas as pd

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
df_stop_time = spark.read.csv('/data/sbb/timetables/csv/stop_times/2019/05/14/stop_times.txt', header=True)
df_stop_time = df_stop_time.na.drop() 
df_stop_time = df_stop_time.select('trip_id','arrival_time','departure_time','stop_id','stop_sequence')
df_stop_time.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+--------------+-----------+-------------+
|             trip_id|arrival_time|departure_time|    stop_id|stop_sequence|
+--------------------+------------+--------------+-----------+-------------+
|1.TA.1-1-B-j19-1.1.R|    04:20:00|      04:20:00|8500010:0:3|            1|
|1.TA.1-1-B-j19-1.1.R|    04:24:00|      04:24:00|8500020:0:3|            2|
|1.TA.1-1-B-j19-1.1.R|    04:28:00|      04:28:00|8500021:0:5|            3|
|1.TA.1-1-B-j19-1.1.R|    04:30:00|      04:30:00|8517131:0:2|            4|
|1.TA.1-1-B-j19-1.1.R|    04:32:00|      04:32:00|8500300:0:5|            5|
+--------------------+------------+--------------+-----------+-------------+
only showing top 5 rows

Correct the stop_id format to only get the station_id.

In [19]:
df_stop_time = df_stop_time.withColumn("stop_id", functions.split(df_stop_time.stop_id, ':')[0])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We will only keep the stops within the stations in the area of Zurich. 

In [20]:
df_stop_time_zurich = df_stop_time.join(metadata_zurich, df_stop_time.stop_id == metadata_zurich.StationID, how='inner')
df_stop_time_zurich.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+--------------+-------+-------------+---------+---------+---------+
|             trip_id|arrival_time|departure_time|stop_id|stop_sequence|StationID|Longitude| Latitude|
+--------------------+------------+--------------+-------+-------------+---------+---------+---------+
|5.TA.1-1-A-j19-1.3.H|    02:42:00|      02:42:00|8503305|            2|  8503305| 8.686662| 47.42581|
|5.TA.1-1-A-j19-1.3.H|    02:46:00|      02:46:00|8503306|            3|  8503306| 8.619255|47.420197|
|5.TA.1-1-A-j19-1.3.H|    02:50:00|      02:50:00|8503147|            4|  8503147| 8.596132|47.397213|
|5.TA.1-1-A-j19-1.3.H|    02:55:00|      02:55:00|8503003|            5|  8503003| 8.548466| 47.36661|
|5.TA.1-1-A-j19-1.3.H|    02:58:00|      03:00:00|8503000|            6|  8503000| 8.540192|47.378178|
+--------------------+------------+--------------+-------+-------------+---------+---------+---------+
only showing top 5 rows

We remove trips with hours superior to 24. 

In [21]:
df_stop_time_zurich = df_stop_time_zurich.withColumn("arrival_hour", functions.split(df_stop_time_zurich.arrival_time, ':')[0])\
                                        .filter(functions.col('arrival_hour') < 24)\
                                        .withColumn("departure_hour", functions.split(df_stop_time_zurich.departure_time, ':')[0])\
                                        .filter(functions.col('departure_hour') < 24)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We select only the traject between 8H and 17H.

In [22]:
@functions.udf
def date_formatting(date):
    return '2019-05-13'+' '+date

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
df_stop_time_zurich = df_stop_time_zurich.withColumn('arrival_time_format', date_formatting(df_stop_time_zurich.arrival_time))
df_stop_time_zurich = df_stop_time_zurich.withColumn('arrival_time_format',
                                     functions.to_timestamp(df_stop_time_zurich.arrival_time_format, 'yyyy-MM-dd HH:mm:ss'))

df_stop_time_zurich = df_stop_time_zurich.withColumn('departure_time_format', date_formatting(df_stop_time_zurich.departure_time))
df_stop_time_zurich = df_stop_time_zurich.withColumn('departure_time_format',
                                     functions.to_timestamp(df_stop_time_zurich.departure_time_format, 'yyyy-MM-dd HH:mm:ss'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
import datetime

start_day_time = datetime.datetime.strptime('2019-05-13'+' '+'08:00:00', "%Y-%m-%d %H:%M:%S")
end_day_time = datetime.datetime.strptime('2019-05-13'+' '+'17:00:00', "%Y-%m-%d %H:%M:%S")

df_stop_time_zurich = df_stop_time_zurich.filter(df_stop_time_zurich.departure_time_format >= start_day_time)\
                                         .filter(df_stop_time_zurich.arrival_time_format <= end_day_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
#Number of stations in Zurich
df_stop_time_zurich.select('stop_id').distinct().count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1570

Save a list of stations.

In [27]:
#Number of stations in Zurich
list_stations_zurich  = [station[0] for station in df_stop_time_zurich.select('stop_id').distinct().collect()] 

#Save the list
with open('list_stations_zurich.pkl', 'wb') as handle:
    pickle.dump(list_stations_zurich, handle, protocol=pickle.HIGHEST_PROTOCOL)
del list_stations_zurich

# send to hdfs
save_hdfs('list_stations_zurich.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f list_stations_zurich.pkl /user/kooli/
Success

### Delays

We take the SBB actual data and translate it from German for an easier understanding.

In [None]:
df_sbb = sbb.withColumnRenamed("BETRIEBSTAG", "trip_date")\
            .withColumnRenamed("FAHRT_BEZEICHNER", "trip_id")\
            .withColumnRenamed("BETREIBER_ID", "operator_id")\
            .withColumnRenamed("BETREIBER_ABK", "operator_short")\
            .withColumnRenamed("BETREIBER_NAME", "operator_name")\
            .withColumnRenamed("PRODUKT_ID", "transport_type")\
            .withColumnRenamed("LINIEN_ID", "train_id")\
            .withColumnRenamed("LINIEN_TEXT", "train_type")\
            .withColumnRenamed("UMLAUF_ID", "circulation_id")\
            .withColumnRenamed("VERKEHRSMITTEL_TEXT", "train_text")\
            .withColumnRenamed("ZUSATZFAHRT_TF", "additional_trip")\
            .withColumnRenamed("FAELLT_AUS_TF", "trip_failed")\
            .withColumnRenamed("BPUIC", "stop_id")\
            .withColumnRenamed("HALTESTELLEN_NAME", "stop_name")\
            .withColumnRenamed("ANKUNFTSZEIT", "arr_time_schedule")\
            .withColumnRenamed("AN_PROGNOSE", "arr_time_actual")\
            .withColumnRenamed("AN_PROGNOSE_STATUS", "arr_time_status")\
            .withColumnRenamed("ABFAHRTSZEIT", "dep_time_schedule")\
            .withColumnRenamed("AB_PROGNOSE", "dep_time_actual")\
            .withColumnRenamed("AB_PROGNOSE_STATUS", "dep_time_status")\
            .withColumnRenamed("DURCHFAHRT_TF", "stops_here")

We only keep stations that are within a 15km radius around Zurich.

In [None]:
list_station_zurich = [int(row.StationID) for row in metadata_zurich.select("StationID").collect()]
df_sbb_zurich = df_sbb.where(functions.col("stop_id").isin(list_station_zurich))

We remove rows corresponding to trips that only started, ended or went inside the radius, but the rest of the trip was outside of this radius.

In [None]:
counts = df_sbb_zurich.groupBy("trip_id","trip_date").count()\
                    .withColumnRenamed("trip_id", "trip_id_count")\
                    .withColumnRenamed("trip_date", "trip_date_count")

df_sbb_zurich = df_sbb_zurich.join(counts, (df_sbb_zurich.trip_id == counts.trip_id_count) &\
                                   (df_sbb_zurich.trip_date == counts.trip_date_count))\
                            .filter(functions.col("count") >1)\
                            .drop('trip_id_count')\
                            .drop('trip_date_count')

We should first clean the SBB data before computing the delays.

We only want trips that are part of the regular schedule (addional_trip == False), those which didn't fail (trip_failed == False), and those who actually stop at the indicated station (stops_here == False).

For the arrival and departure time status, there are 4 possible values: REAL (real), PROGNOSE (forecast), GESCHAETZT (estimated) and UNBEKANNT (unknown). We remove all the rows corresponding to unknown status.

In [None]:
df_sbb_zurich = df_sbb_zurich.filter(functions.col("additional_trip") == False)\
                            .filter(functions.col("trip_failed") == False)\
                            .filter(functions.col("stops_here") == False)\
                            .filter(functions.col("dep_time_status") != 'UNBEKANNT')\
                            .filter(functions.col("arr_time_status") != 'UNBEKANNT')\
                            .drop("additional_trip")\
                            .drop("trip_failed")\
                            .drop("stops_here")

df_sbb_zurich.printSchema()

We can now compute the delays between the scheduled and actual times for departure and arrival.

In [None]:
timeFormatSchedule = "dd.MM.yyyy HH:mm"
timeFormatActual = "dd.MM.yyyy HH:mm:ss"

departure_delay = (functions.unix_timestamp('dep_time_schedule', format=timeFormatSchedule)\
                   - functions.unix_timestamp('dep_time_actual', format=timeFormatActual))
arrival_delay = (functions.unix_timestamp('arr_time_schedule', format=timeFormatSchedule)\
                 - functions.unix_timestamp('arr_time_actual', format=timeFormatActual))

We can add some more filtering on the trip times and dates. We first only keep trips that happened during the weekdays (Monday to Friday). Then we're only interested in "reasonable hours" of the day. We thus decided to remove trips with arrival times before 8 a.m and after 5 p.m. Finally, we only take into account trips that happened in 2019. This will help reducing the amount of data and thus the computation time.

We also make sure to remove all the Null entries for arrival and departure times.

In [None]:
df_delay = df_sbb_zurich.withColumn("departure_delay", functions.when(departure_delay>0,departure_delay).otherwise(0))\
                        .withColumn("arrival_delay", functions.when(arrival_delay>0,arrival_delay).otherwise(0))\
                        .withColumn("dep_time_schedule",functions.to_timestamp(df_sbb_zurich.dep_time_schedule, 'dd.MM.yyyy HH:mm'))\
                        .withColumn("arr_time_schedule",functions.to_timestamp(df_sbb_zurich.arr_time_schedule, 'dd.MM.yyyy HH:mm'))\
                        .withColumn("dep_time_actual",functions.to_timestamp(df_sbb_zurich.dep_time_actual, 'dd.MM.yyyy HH:mm:ss'))\
                        .withColumn("arr_time_actual",functions.to_timestamp(df_sbb_zurich.arr_time_actual, 'dd.MM.yyyy HH:mm:ss'))\
                        .filter(functions.year('arr_time_schedule')==2019)\
                        .filter(functions.date_format('arr_time_schedule', 'u')<=5)\
                        .filter(functions.hour(functions.col("arr_time_schedule")).cast("int") >= 8)\
                        .filter(functions.hour(functions.col("arr_time_schedule")).cast("int") <= 17)\
                        .where(functions.col("dep_time_schedule").isNotNull())\
                        .where(functions.col("arr_time_schedule").isNotNull())\
                        .where(functions.col("dep_time_actual").isNotNull())\
                        .where(functions.col("arr_time_actual").isNotNull())

df_delay.printSchema()

Now that we have the delays for each trip, we can start building the hole SBB schedule. It will contain the departure and arrival stations, with the corresponding departure and arrival schedule times, and the delay of the actual arrival time (with the respect to the scheduled one). It will also contain the trip ID corresponding to the trip between the stations.

In order to build this schedule, we need to reconstruct the journey of each trip. We thus group the data we have by the trip information we have (id and date), then order it by ascending scheduled departure time. We will thus have for each trip id and date the stations of the journey in order. We use this order to build a metric corresponding to the row number.

In [42]:
from pyspark.sql.window import Window

window_trip_id = Window.partitionBy("trip_id","trip_date").orderBy("dep_time_schedule")
df_order_trips = df_delay.withColumn("order", functions.row_number().over(window_trip_id))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now that all stations of all trips are in order and have their corresponding order, we can create two dataframes based on the previously built one (one for the departure station and one for the arrival station). Joining both of them will give us the whole schedule, with the actual arrival delays.

In [43]:
df_trips_prev = df_order_trips.select('trip_id', 'trip_date', 'stop_id', 'order', 'dep_time_schedule', 'arr_time_schedule')\
                                    .withColumnRenamed("stop_id", "stop_id_prev")\
                                    .withColumnRenamed("arr_time_schedule", "arrival_prev")\
                                    .withColumnRenamed("dep_time_schedule", "departure_prev")\
                                    .select("trip_id",'trip_date',"stop_id_prev","arrival_prev",'departure_prev','order')\
                                    .withColumn('next_order_prev', (df_order_trips.order.cast('int') + 1 ).cast('string'))

df_trips_next = df_order_trips.select('trip_id', 'trip_date', 'stop_id', 'order', 'dep_time_schedule', 'arr_time_schedule', 'arrival_delay')\
                                    .withColumnRenamed("trip_id", "trip_id_next")\
                                    .withColumnRenamed("trip_date", "trip_date_next")\
                                    .withColumnRenamed("stop_id", "stop_id_next")\
                                    .withColumnRenamed("arr_time_schedule", "arrival_next")\
                                    .withColumnRenamed("dep_time_schedule", "departure_next")\
                                    .withColumnRenamed("order", "order_next")

df_trips_schedule = df_trips_prev.join(df_trips_next,
                                       (df_trips_prev.trip_id == df_trips_next.trip_id_next) &\
                                       (df_trips_prev.trip_date == df_trips_next.trip_date_next) &\
                                       (df_trips_prev.next_order_prev == df_trips_next.order_next),
                                       how='inner')\
                                .select('trip_id', 'trip_date', 'stop_id_prev', 'stop_id_next',
                                        'arrival_prev', 'arrival_next', 'arrival_delay')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We now need to group the dataframe in order to have an idea on the average delay between two given stations.

We have to format the arrival and departure times to remove the date since we want only want to group on the hour of the trip. This means that we consider that all trips between two stations happening at the same hour of the day (for example between 10:00 and 10:59) as equivalent, also that different days for a same exact trip are equivalent.

In [44]:
df_trips_schedule = df_trips_schedule.withColumn("arrival_next_time",
                                                 functions.split(df_trips_schedule.arrival_next, ' ')[1])\
                                    .withColumn("arrival_prev_time",
                                                functions.split(df_trips_schedule.arrival_prev, ' ')[1])
df_trips_schedule = df_trips_schedule.withColumn("arrival_next_hour",
                                                functions.hour(df_trips_schedule.arrival_next_time))\
                                    .withColumn("arrival_prev_hour",
                                                functions.hour(df_trips_schedule.arrival_prev_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
df_delay_dist = df_trips_schedule.groupBy(['stop_id_prev','stop_id_next',
                                           'arrival_next_hour','arrival_prev_hour']).agg(functions.mean('arrival_delay').alias('med_val'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Finally, we convert the dataframe to Pandas and save it in order to have an easier access later on.

In [46]:
pd_delay_dist = df_delay_dist.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
pd_delay_dist.to_pickle('pd_delay_dist1yearmeanperh.pkl')
save_hdfs('pd_delay_dist1yearmeanperh.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Contruction of the Graph

We will contsruct a graph where the nodes are the events that happen on a typical week. 

2 possible types of connections: the natural one that is the mean of transport connects the two stations and the one where the traveler makes a transfer by changing the mean of transport or  walking a maximum distance of 500m.

To construct this big graph we will need to gather some information.

We will construct 2 adjency matrices for the 2 possibilites with 1 at entry (i,j) if there exists a connection between station i and j and 0 otherwise.

In [27]:
df_station_id = df_stop_time_zurich.select('StationID').distinct().toPandas()
df_station_id.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  StationID
0   8589111
1   8591190
2   8503078
3   8591284
4   8503088

In [27]:
len(df_station_id)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1570

In [28]:
df_index = df_station_id.reset_index().set_index('StationID')
df_index.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

           index
StationID       
8589111        0
8591190        1
8503078        2
8591284        3
8503088        4

In [30]:
# save to pickle
df_index.to_pickle('df_index.pkl')
save_hdfs('df_index.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f df_index.pkl /user/kooli/
Success

In [29]:
df_station_id.loc[0,'StationID']   #put index gives stationID

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

u'8589111'

In [30]:
df_index.loc['8591284','index']   #put stationID gives index 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3

This allows us to go from an index to a station_id.

We can now compute the matrices. 

In [31]:
df_stop_time_zurich_prev = df_stop_time_zurich.select('trip_id','stop_id','stop_sequence')\
                  .withColumn('next_stop_sequence_prev', (df_stop_time_zurich.stop_sequence.cast('int') + 1 ).cast('string'))

df_stop_time_zurich_next = df_stop_time_zurich.withColumnRenamed("trip_id", "trip_id_next")\
                                              .withColumnRenamed("stop_id", "stop_id_next")\
                                              .withColumnRenamed("stop_sequence", "stop_sequence_next")\
                                              .select("trip_id_next","stop_id_next","stop_sequence_next")

df_pairs = df_stop_time_zurich_prev.join(df_stop_time_zurich_next, 
                                         (df_stop_time_zurich_prev.trip_id == df_stop_time_zurich_next.trip_id_next) &\
                                         (df_stop_time_zurich_prev.next_stop_sequence_prev == df_stop_time_zurich_next.stop_sequence_next),
                                          how='inner')\
                                    .select('stop_id','stop_id_next').distinct()
#all possible direct edges
df_pairs.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------+
|stop_id|stop_id_next|
+-------+------------+
|8590787|     8590799|
|8573712|     8503693|
|8591118|     8591160|
|8587420|     8576152|
|8573205|     8588553|
|8590471|     8591278|
|8583761|     8582662|
|8590752|     8590610|
|8591378|     8590684|
|8503053|     8503054|
|8591281|     8591046|
|8590773|     8590771|
|8583731|     8582752|
|8594249|     8590752|
|8502955|     8572646|
|8503010|     8503202|
|8590632|     8588745|
|8591061|     8591270|
|8591439|     8591106|
|8591053|     8591307|
+-------+------------+
only showing top 20 rows

In [32]:
df_pairs = df_pairs.toPandas()
df_pairs.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   stop_id stop_id_next
0  8590787      8590799
1  8573712      8503693
2  8591118      8591160
3  8587420      8576152
4  8573205      8588553

In [33]:
import numpy as np 
trans_matrix = np.zeros((len(df_station_id),len(df_station_id)))

for index, row in df_pairs.iterrows():
    index1 = df_index.loc[row['stop_id'],'index']
    index2 = df_index.loc[row['stop_id_next'],'index']
    trans_matrix[index1,index2] = 1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Now we need to construct the walking adjency matrix.

In [34]:
#this double join give us information about the two walking stations
stations_stop_pair_walking = df_stop_time_zurich.select('StationID').distinct().join(stations_pair_walking, 
                                                        df_stop_time_zurich.StationID == stations_pair_walking.from_StationID,
                                                                                    how = 'inner').drop('StationID')

stations_stop_pair_walking = df_stop_time_zurich.select('StationID').distinct().join(stations_stop_pair_walking, 
                                                        df_stop_time_zurich.StationID == stations_pair_walking.to_StationID,
                                                                                    how = 'inner').drop('StationID')
stations_stop_pair_walking.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------+-------------+------------+------------+-----------+--------------------+-------------------+
|from_StationID|from_Longitude|from_Latitude|to_StationID|to_Longitude|to_Latitude|         distance_km|   time_walking_min|
+--------------+--------------+-------------+------------+------------+-----------+--------------------+-------------------+
|       8503077|      8.596796|    47.347317|     8503078|    8.593024|  47.345474|  0.3503261531304608|  7.006523062609215|
|       8591903|      8.596038|     47.34835|     8503078|    8.593024|  47.345474|  0.3922255506454547|  7.844511012909094|
|       8591023|      8.598149|     47.34682|     8503078|    8.593024|  47.345474| 0.41414706143344815|  8.282941228668962|
|       8590879|        8.5933|     47.34539|     8503078|    8.593024|  47.345474|0.022765925906543028|0.45531851813086055|
|       8576189|      8.588489|      47.3457|     8503078|    8.593024|  47.345474| 0.34264594945678645|  6.852918989135729|


In [35]:
walking_matrix = np.zeros((len(df_station_id),len(df_station_id)))

#dict to find the walking time easily 
#a minimum of 2 minutes for a transfer
dict_walk_time = { (r['from_StationID'], r['to_StationID']) : (r['time_walking_min'] + 2 )for r in stations_stop_pair_walking.collect()}

#collect the pairs to fill the matrix 
df_walking_pair_station = stations_stop_pair_walking.select('from_StationID','to_StationID').distinct().toPandas()
df_walking_pair_station.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  from_StationID to_StationID
0        8590879      8503078
1        8503077      8503078
2        8591903      8503078
3        8591023      8503078
4        8576189      8503078

We save the dictionnary of the walking time between stations.

In [38]:
#Save the dictionnary 
with open('dict_walk_time.pkl', 'wb') as handle:
    pickle.dump(dict_walk_time, handle, protocol=pickle.HIGHEST_PROTOCOL)
del dict_walk_time

# send to hdfs
save_hdfs('dict_walk_time.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f dict_walk_time.pkl /user/kooli/
Success

We construct a walking matrix and dictionnary.

In [36]:
for index, row in df_walking_pair_station.iterrows():
    index1 = df_index.loc[row['from_StationID'], 'index']
    index2 = df_index.loc[row['to_StationID'], 'index']
    walking_matrix[index1,index2] = 1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
walking_neighbors={}

for i in range(len(walking_matrix)):
    key = df_station_id.loc[i,'StationID']
    values = set()
    for j in range(len(walking_matrix)):
        if (walking_matrix[i,j]):
            values.add(df_station_id.loc[j,'StationID'])
    walking_neighbors[key] = values

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
#Save the dictionnary 
with open('dict_walking_neighbors.pkl', 'wb') as handle:
    pickle.dump(walking_neighbors, handle, protocol=pickle.HIGHEST_PROTOCOL)
del walking_neighbors

# send to hdfs
save_hdfs('dict_walking_neighbors.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f dict_walking_neighbors.pkl /user/kooli/
Success

We can now merge the 2 adjency matrices 

In [38]:
adj_matrix = np.logical_or(trans_matrix, walking_matrix).astype(int)
adj_matrix

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 1, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]])

#### Compute new Graph

We now have all the information to compute the edges and the vertices of the graph. We can build a schedule using the stop_sequence of each station and the associated trip_id to know which station comes next.

In [39]:
df_stop_time_prev = df_stop_time_zurich.select('trip_id', 'stop_id', 'stop_sequence', 'arrival_time', 'departure_time')\
                                    .withColumnRenamed("stop_id", "stop_id_prev")\
                                    .withColumnRenamed("arrival_time", "arrival_prev")\
                                    .withColumnRenamed("departure_time", "departure_prev")\
                                    .select("trip_id","stop_id_prev","arrival_prev",'departure_prev','stop_sequence')\
                                    .withColumn('next_stop_sequence_prev', (df_stop_time_zurich.stop_sequence.cast('int') + 1 ).cast('string'))

df_stop_time_next = df_stop_time_zurich.select('trip_id', 'stop_id', 'stop_sequence', 'arrival_time', 'departure_time')\
                                    .withColumnRenamed("trip_id", "trip_id_next")\
                                    .withColumnRenamed("stop_id", "stop_id_next")\
                                    .withColumnRenamed("arrival_time", "arrival_next")\
                                    .withColumnRenamed("departure_time", "departure_next")\
                                    .withColumnRenamed("stop_sequence", "stop_sequence_next")\
                                    .select("trip_id_next","stop_id_next","arrival_next",'departure_next','stop_sequence_next')

df_schedule = df_stop_time_prev.join(df_stop_time_next,
                                     (df_stop_time_prev.trip_id == df_stop_time_next.trip_id_next) &\
                                     (df_stop_time_prev.next_stop_sequence_prev == df_stop_time_next.stop_sequence_next),
                                     how='inner')\
                                .select('trip_id', 'stop_id_prev', 'stop_id_next', 'arrival_prev', 'departure_prev', 'arrival_next', 'departure_next')
df_schedule.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+------------+------------+--------------+------------+--------------+
|             trip_id|stop_id_prev|stop_id_next|arrival_prev|departure_prev|arrival_next|departure_next|
+--------------------+------------+------------+------------+--------------+------------+--------------+
|1.TA.26-10-j19-1.1.H|     8590620|     8590626|    15:17:00|      15:17:00|    15:18:00|      15:18:00|
|1.TA.26-161-j19-1...|     8591241|     8591080|    12:33:00|      12:33:00|    12:33:00|      12:33:00|
|1.TA.26-38-j19-1.1.R|     8591045|     8591043|    12:14:00|      12:14:00|    12:14:00|      12:14:00|
|1.TA.26-760-j19-1...|     8596113|     8591065|    16:23:00|      16:23:00|    16:25:00|      16:25:00|
|10.TA.26-14-A-j19...|     8591193|     8591276|    09:03:00|      09:03:00|    09:04:00|      09:05:00|
+--------------------+------------+------------+------------+--------------+------------+--------------+
only showing top 5 rows

We have all the possible connections of this week which constitues some of the graph direct edges. 

In [44]:
#number of direct edges
df_schedule.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

953542

Columns with dates transformed in time stamp to ease the computations.

In [40]:
df_schedule = df_schedule.withColumn('arrival_prev_time', date_formatting(df_schedule.arrival_prev))
df_schedule = df_schedule.withColumn('arrival_prev_time',
                                     functions.to_timestamp(df_schedule.arrival_prev_time, 'yyyy-MM-dd HH:mm:ss'))

df_schedule = df_schedule.withColumn('departure_prev_time', date_formatting(df_schedule.departure_prev))
df_schedule = df_schedule.withColumn('departure_prev_time',
                                     functions.to_timestamp(df_schedule.departure_prev_time, 'yyyy-MM-dd HH:mm:ss'))

df_schedule = df_schedule.withColumn('arrival_next_time', date_formatting(df_schedule.arrival_next))
df_schedule = df_schedule.withColumn('arrival_next_time',
                                     functions.to_timestamp(df_schedule.arrival_next_time, 'yyyy-MM-dd HH:mm:ss'))

df_schedule = df_schedule.withColumn('departure_next_time', date_formatting(df_schedule.departure_next))
df_schedule = df_schedule.withColumn('departure_next_time',
                                     functions.to_timestamp(df_schedule.departure_next_time, 'yyyy-MM-dd HH:mm:ss'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
@functions.udf
def create_id(stop_id,time):
    return stop_id+'_'+time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
@functions.udf
def compute_time(departure_time, arr_time):
    return (departure_time - arr_time).total_seconds()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We start by computing all the simple edges with the weights being the time needed to go from src (source) to dst (destination). 

In [43]:
df_edges = df_schedule.withColumn('src',
                       create_id(functions.col('stop_id_prev'), functions.col('arrival_prev')))\
            .withColumn('dst',
                       create_id(functions.col('stop_id_next'), functions.col('arrival_next')))\
            .withColumn('weight',
                        compute_time (functions.col('arrival_next_time'), functions.col('arrival_prev_time')) ) \
            .select('src','dst','weight')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
df_edges = df_edges.distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
df_edges.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+------+
|             src|             dst|weight|
+----------------+----------------+------+
|8590620_15:17:00|8590626_15:18:00|  60.0|
|8591241_12:33:00|8591080_12:33:00|   0.0|
|8591045_12:14:00|8591043_12:14:00|   0.0|
|8596113_16:23:00|8591065_16:25:00| 120.0|
|8591193_09:03:00|8591276_09:04:00|  60.0|
|8573164_11:49:00|8573165_11:50:00|  60.0|
|8590544_15:33:00|8590542_15:34:00|  60.0|
|8502776_10:46:00|8573163_10:47:00|  60.0|
|8591196_16:12:00|8591230_16:14:00| 120.0|
|8503378_14:27:00|8588279_14:29:00| 120.0|
|8591832_12:40:00|8591147_12:41:00|  60.0|
|8590610_12:28:00|8594249_12:30:00| 120.0|
|8591258_08:02:00|8591311_08:03:00|  60.0|
|8590707_16:58:00|8590709_16:59:00|  60.0|
|8591174_14:39:00|8588078_14:41:00| 120.0|
|8591329_11:43:00|8591366_11:45:00| 120.0|
|8591410_14:07:00|8591268_14:08:00|  60.0|
|8589110_08:19:00|8589109_08:20:00|  60.0|
|8581015_14:06:00|8573178_14:09:00| 180.0|
|8580912_14:24:00|8503610_14:27:00| 180.0|
+----------

In [50]:
pre_dict_edges_simple = df_edges.groupby('dst').agg(functions.collect_list('src'))
dict_edges_simple = { r['dst'] : r['collect_list(src)']  for r in pre_dict_edges_simple.collect()}

#Save the dictionnary 
with open('dict_edges_simple.pkl', 'wb') as handle:
    pickle.dump(dict_edges_simple, handle, protocol=pickle.HIGHEST_PROTOCOL)
del dict_edges_simple

# send to hdfs
save_hdfs('dict_edges_simple.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f dict_edges_simple.pkl /user/kooli/
Success

In [71]:
df_edges.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

339261

In [45]:
df_vertices = df_edges.select('src').withColumnRenamed('src', 'id')\
                                    .union(df_edges.select('dst').withColumnRenamed('src', 'id'))\
                                    .distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
df_vertices.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+
|              id|
+----------------+
|8572598_16:47:00|
|8576248_16:25:00|
|8590716_08:45:00|
|8591100_11:30:00|
|8590318_16:33:00|
|8503579_10:32:00|
|8591051_08:37:00|
|8591379_16:39:00|
|8590587_11:02:00|
|8590554_16:26:00|
|8591219_15:51:00|
|8591101_08:20:00|
|8502572_09:44:00|
|8590552_15:03:00|
|8591273_12:28:00|
|8580301_14:36:00|
|8591079_08:12:00|
|8591067_16:46:00|
|8594261_14:36:00|
|8503378_11:30:00|
+----------------+
only showing top 20 rows

In [55]:
df_vertices.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

229314

In [45]:
# save
#df_vertices.write.format("orc").save("/user/{}/df_vertices_bis.orc".format("kooli"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
pd_vertices = df_vertices.toPandas()
pd_vertices.to_pickle('pd_vertices.pkl')
save_hdfs('pd_vertices.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f pd_vertices.pkl /user/kooli/
Success

We now that we have all the nodes and the direct edges, we need to add the possibilities to make a transfer, by stoping at a stop and taking another mean of transport from the same stop or another stop nearby. 

We list for each station all the scheduled stops.

In [54]:
df_stop_list_hours = df_stop_time_zurich.groupby('stop_id').agg(functions.collect_list('arrival_time'))
df_stop_list_hours.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------------+
|stop_id|collect_list(arrival_time)|
+-------+--------------------------+
|8503078|      [15:50:00, 16:05:...|
|8503088|      [08:15:00, 08:25:...|
|8573729|      [16:39:00, 16:09:...|
|8589111|      [13:45:00, 13:45:...|
|8590819|      [15:45:00, 15:15:...|
|8591190|      [15:13:00, 09:43:...|
|8503376|      [08:03:00, 09:50:...|
|8506895|      [16:12:00, 16:12:...|
|8587967|      [16:09:00, 16:17:...|
|8591284|      [15:45:00, 15:15:...|
|8590478|      [13:38:00, 13:18:...|
|8590804|      [09:56:00, 09:41:...|
|8591315|      [14:25:00, 14:19:...|
|8591362|      [09:37:00, 16:13:...|
|8588312|      [16:48:00, 16:48:...|
|8590541|      [09:38:00, 09:08:...|
|8591149|      [08:58:00, 08:50:...|
|8590273|      [16:51:00, 16:36:...|
|8590477|      [13:29:00, 12:59:...|
|8591053|      [14:16:00, 08:56:...|
+-------+--------------------------+
only showing top 20 rows

In [55]:
#Connect all nodes within the same stop_id and having at least 2 minutes of interval 
#Returns for each stop_id a list of edges to add 
@functions.udf('array<array<string>>')
def add_edges_correspondance(stop_id, list_hours):
    edges = []
    for hour_before, hour_after in zip(list_hours[:-1], list_hours[1:]):
        date1 = '2019-05-13'+' '+hour_before
        date2 = '2019-05-13'+' '+hour_after
        
        time_before = datetime.datetime.strptime(date1, "%Y-%m-%d %H:%M:%S")
        time_after =  datetime.datetime.strptime(date2, "%Y-%m-%d %H:%M:%S")
        transfer_time = (time_after - time_before).total_seconds()
        
        if( transfer_time >= 120 ):  #only make a transfer if he has 2 minutes ?
            e = [stop_id+'_'+hour_before, stop_id+'_'+hour_after, transfer_time]
            edges.append(e)  
    return edges

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
df_stop_list_hours = df_stop_list_hours.withColumn('edges',
                              add_edges_correspondance(functions.col('stop_id'), 
                                                      functions.col('collect_list(arrival_time)')) )
df_stop_list_hours.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------------+--------------------+
|stop_id|collect_list(arrival_time)|               edges|
+-------+--------------------------+--------------------+
|8503078|      [15:50:00, 16:05:...|[[8503078_15:50:0...|
|8503088|      [08:15:00, 08:25:...|[[8503088_08:15:0...|
|8573729|      [16:39:00, 16:09:...|[[8573729_16:09:0...|
|8589111|      [13:45:00, 13:45:...|[[8589111_08:15:0...|
|8590819|      [15:45:00, 15:15:...|[[8590819_10:45:0...|
|8591190|      [15:13:00, 09:43:...|[[8591190_09:33:0...|
|8503376|      [08:03:00, 09:50:...|[[8503376_08:03:0...|
|8506895|      [16:12:00, 16:12:...|[[8506895_08:12:0...|
|8587967|      [16:09:00, 16:17:...|[[8587967_16:09:0...|
|8591284|      [15:45:00, 15:15:...|[[8591284_08:45:0...|
|8590478|      [13:38:00, 13:18:...|[[8590478_12:18:0...|
|8590804|      [09:56:00, 09:41:...|[[8590804_08:11:0...|
|8591315|      [14:25:00, 14:19:...|[[8591315_12:55:0...|
|8591362|      [09:37:00, 16:13:...|[[8591362_09:37:0...|
|8588312|     

For each stop we have a list of possible transfers that can be made by stopping at a station and take a different mean of transport.

In [57]:
dict_stop_list_hours = { r['stop_id'] : r['collect_list(arrival_time)']  for r in df_stop_list_hours.collect()}

#Save the dictionnary 
with open('dict_stop_list_hours.pkl', 'wb') as handle:
    pickle.dump(dict_stop_list_hours, handle, protocol=pickle.HIGHEST_PROTOCOL)
del dict_stop_list_hours

# send to hdfs
save_hdfs('dict_stop_list_hours.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f dict_stop_list_hours.pkl /user/kooli/
Success

In [58]:
df_edges_array = df_stop_list_hours.withColumn("col", functions.explode("edges")).select('col')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
df_edges_array = df_edges_array.withColumn('src', df_edges_array.col.getItem(0))
df_edges_array = df_edges_array.withColumn('dst', df_edges_array.col.getItem(1)) 
df_edges_array = df_edges_array.withColumn('weight', df_edges_array.col.getItem(2))
df_edges_correspondance =  df_edges_array.drop('col')
df_edges_correspondance.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+------+
|             src|             dst|weight|
+----------------+----------------+------+
|8503088_08:31:00|8503088_08:51:00|1200.0|
|8503088_08:51:00|8503088_09:31:00|2400.0|
|8503088_09:31:00|8503088_09:51:00|1200.0|
|8503088_09:51:00|8503088_10:31:00|2400.0|
|8503088_10:31:00|8503088_10:51:00|1200.0|
|8503088_10:51:00|8503088_11:31:00|2400.0|
|8503088_11:31:00|8503088_11:51:00|1200.0|
|8503088_11:51:00|8503088_12:31:00|2400.0|
|8503088_12:31:00|8503088_12:51:00|1200.0|
|8503088_12:51:00|8503088_13:31:00|2400.0|
|8503088_13:31:00|8503088_13:51:00|1200.0|
|8503088_13:51:00|8503088_14:31:00|2400.0|
|8503088_14:31:00|8503088_14:51:00|1200.0|
|8503088_14:51:00|8503088_15:31:00|2400.0|
|8503088_15:31:00|8503088_15:51:00|1200.0|
|8503088_15:51:00|8503088_16:31:00|2400.0|
|8503088_16:31:00|8503088_16:51:00|1200.0|
|8503088_08:38:00|8503088_08:58:00|1200.0|
|8503088_08:58:00|8503088_09:38:00|2400.0|
|8503088_09:38:00|8503088_09:58:00|1200.0|
+----------

In [60]:
pre_dict_edges_corr = df_edges_correspondance.groupby('dst').agg(functions.collect_list('src'))
dict_edges_corr = { r['dst'] : r['collect_list(src)']  for r in pre_dict_edges_corr.collect()}

#Save the dictionnary 
with open('dict_edges_corr.pkl', 'wb') as handle:
    pickle.dump(dict_edges_corr, handle, protocol=pickle.HIGHEST_PROTOCOL)
del dict_edges_corr

# send to hdfs
save_hdfs('dict_edges_corr.pkl', '/user/{}/'.format('kooli'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -put -f dict_edges_corr.pkl /user/kooli/
Success

In [76]:
df_edges_correspondance.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

243224

In [77]:
#save the correspondance edges to avoid recomputing them 
#df_edges_correspondance.write.csv("/user/chahed/edges_corr_filtered_final_corrected.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We now focus on the transfer that can be made by walking to a nearby station.

In [61]:
@functions.udf('array<array<string>>')
def add_edges_walking(node_id):
    stop_id, time = node_id.split('_')
    index_node = df_index.loc[stop_id,'index'] 
    edges = []
    
    for i in range(len(df_index)):  #iterate over the other nodes 
        if(walking_matrix[index_node,i]): #if it's possible to go by walking from the present station
            
            station_id_other = df_station_id.loc[i,'StationID']
            
            walking_time = datetime.timedelta(minutes = dict_walk_time[(stop_id, station_id_other)] )
            time_formatted = datetime.datetime.strptime('2019-05-13'+' '+time, "%Y-%m-%d %H:%M:%S")
            arrival_time = time_formatted + walking_time
            
            hours_other = dict_stop_list_hours[station_id_other]
            possible_changes = filter( (lambda x : x > arrival_time) , \
                                    map(lambda x: datetime.datetime.strptime('2019-05-13'+' '+x, "%Y-%m-%d %H:%M:%S"), \
                                        hours_other) )
            
            if(len(possible_changes) != 0):
                first_hour_change = min(possible_changes)
                                    
                #destination node
                first_hour_change_str = first_hour_change.strftime('%H:%M:%S')
                node_id_other = station_id_other+'_'+first_hour_change_str
            
                time_transfer = (first_hour_change - time_formatted).total_seconds()
            
                edges.append([node_id, node_id_other, time_transfer])
            
    return edges

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
df_edges_correspondance_walking = df_vertices.withColumn('edges',add_edges_walking(functions.col('id')) )
df_edges_correspondance_walking.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
df_edges_correspondance_walking_exploded = df_edges_correspondance_walking.withColumn("edges", 
                                                                                      functions.explode("edges")).select('edges')
df_edges_correspondance_walking_exploded.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
df_edges_correspondance_walking_exploded = df_edges_correspondance_walking_exploded.withColumn('src', df_edges_correspondance_walking_exploded.edges.getItem(0))
df_edges_correspondance_walking_exploded = df_edges_correspondance_walking_exploded.withColumn('dst', df_edges_correspondance_walking_exploded.edges.getItem(1)) 
df_edges_correspondance_walking_exploded = df_edges_correspondance_walking_exploded.withColumn('weight', df_edges_correspondance_walking_exploded.edges.getItem(2))
df_edges_walk =  df_edges_correspondance_walking_exploded.drop('edges') 
df_edges_walk.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+------+
|             src|             dst|weight|
+----------------+----------------+------+
|8588078_15:29:00|8503088_15:47:00|1080.0|
|8588078_15:29:00|8591327_15:37:00| 480.0|
|8588078_15:29:00|8591123_15:39:00| 600.0|
|8588078_15:29:00|8587349_15:35:00| 360.0|
|8588078_15:29:00|8591067_15:38:00| 540.0|
|8588078_15:29:00|8503446_16:00:00|1860.0|
|8588078_15:29:00|8530822_15:41:00| 720.0|
|8588078_15:29:00|8591174_15:38:00| 540.0|
|8588078_15:29:00|8591379_15:39:00| 600.0|
|8588078_15:29:00|8503000_15:38:00| 540.0|
|8588078_15:29:00|8587348_15:38:00| 540.0|
|8588078_15:29:00|8503500_15:35:00| 360.0|
|8588078_15:29:00|8503499_15:33:00| 240.0|
|8591119_16:50:00|8530813_17:00:00| 600.0|
|8591119_16:50:00|8503083_16:57:00| 420.0|
|8591119_16:50:00|8591364_17:00:00| 600.0|
|8591119_16:50:00|8591199_17:00:00| 600.0|
|8591329_12:44:00|8503087_13:03:00|1140.0|
|8591329_12:44:00|8591366_12:53:00| 540.0|
|8591329_12:44:00|8591365_12:50:00| 360.0|
+----------

In [63]:
#save the correspondance edges to avoid recomputing them 
#df_edges_walk.write.csv("/user/chahed/edges_walk_filtered_final_corrected_bis.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We now have the all the pieces of our graph and this step needs only to be done once. Every computation is saved and we can ftom now on load it and directly compute the routing alogrithm.

# Load and apply bfs 

In [4]:
import pyspark.sql.functions as functions
import datetime
import pandas as pd 
import math

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
@functions.udf
def date_formatting(date):
    return '2019-05-13'+' '+date

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
dict_edges_bus = read_hdfs('/user/{}/dict_edges_simple.pkl'.format('kooli'))
dict_edges_waiting = read_hdfs('/user/{}/dict_edges_corr.pkl'.format('kooli'))
vertices = read_hdfs('/user/{}/pd_vertices.pkl'.format('kooli')) 
df_index = read_hdfs('/user/{}/df_index.pkl'.format('kooli'))
walking_neighbors = read_hdfs('/user/{}/dict_walking_neighbors.pkl'.format('kooli') )
dict_walk_time = read_hdfs('/user/{}/dict_walk_time.pkl'.format('kooli'))
dict_stop_list_hours =  read_hdfs('/user/{}/dict_stop_list_hours.pkl'.format('kooli'))
df_means = read_hdfs('/user/{}/pd_delay_dist1yearmeanperh.pkl'.format('kooli'))


#format the vertices data frame
vertices['station'] =  vertices.id.map(lambda x :  x.split('_')[0])
vertices['time'] = vertices.id.map(lambda x :  x.split('_')[1])
vertices['time_formatted'] = vertices.time.map(lambda x : datetime.datetime.strptime('2019-05-13'+' '+x, "%Y-%m-%d %H:%M:%S"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Running system command: hdfs dfs -cat /user/kooli/dict_edges_simple.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/dict_edges_corr.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/pd_vertices.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/df_index.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/dict_walking_neighbors.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/dict_walk_time.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/dict_stop_list_hours.pkl
Success
Running system command: hdfs dfs -cat /user/kooli/pd_delay_dist1yearmeanperh.pkl
Success

The algorithm is based on Breadth First Search with a few changes. We made the assumption that a trip shouldn't last more than 2 hours and that in each correspondence we can't walk more than 10 minutes. Since the graph nodes are event based (station + time) we set the destination nodes as the 5 last events occuring at the destination station strictly before the arrival time. We also made the assumption that we don't want to arrive exactly at the arrival time since this would cause the risk of being late to be very high. In order to provide a faster calculation (and unfortunately a suboptimal solution) we set as parameters a maximal depth for the tree traversal, we also set as parameter a minimum number of paths, once we surpass that threshold, the time window is updated from 2 hours to (**arrival time - earliest departure of the collected paths**). Another pruning method was to have a dictionary of **visited stations** and their time of visit. If we already visited a station at a cetain time **T** we only add a node from the same station if the time is superior to <em>**T**</em> since this means that there is a shorter path from that station to the destination. To ensure an output of different paths, we make sure that there are no two paths that have the same nodes except the source nodes, since this means that if path **A** starts at <em>**T**<sub>departure</sub></em> then path **B** is just waiting at the source station then taking path **A** at <em>**T**<sub>departure</sub></em>. So we only retain path **A**

In [7]:
def bfs_homies(source,final_stop_id, max_arrival_time, dict_edges_bus,dict_edges_waiting,vertices,df_index,walking_neighbors,
               dict_walk_time,dict_stop_list_hours,max_iter=12,minimum_paths=5):
    
    #find the destination nodes
    arrival_time_formatted = datetime.datetime.strptime('2019-05-13'+' '+max_arrival_time, "%Y-%m-%d %H:%M:%S")
    possible_dest_nodes=vertices.loc[(vertices.time_formatted<arrival_time_formatted) & \
                                                      (vertices.station==final_stop_id),"id"]
    
    destinations=set(possible_dest_nodes.sort_values(ascending = False).values[:5]) 
    # first iteration will be to visit the destination nodes
    to_visit=destinations.copy()
    
    # Heuristic rule: a trip duration cannot exceed 2 hours
    max_time_formatted = datetime.datetime.strptime('2019-05-13'+' '+max_arrival_time, "%Y-%m-%d %H:%M:%S")
    earliest_departure_time=max_time_formatted- datetime.timedelta(minutes = 120)
    
    # finding all the nodes correspending to the source station and possible departure times 
    possible_nodes_to_visit = vertices.loc[(vertices.time_formatted>earliest_departure_time) \
                                            & (vertices.time_formatted<max_time_formatted)]
    
    possible_sources = possible_nodes_to_visit.loc[possible_nodes_to_visit.station==source,"id"]
                                           
    possible_sources = set(possible_sources.values)
    
    
    # Initialization
    true_paths=[]
    transports=[]
    trunc_paths={}
    sources_with_path=[]
    dict_paths={}
    visited_stations={}
    
    # add destination node
    visited_stations[final_stop_id]=max_arrival_time
    
    break_condition=False
    k=0   #iteration counter
    while((not break_condition)&(k<max_iter)):
        k+=1

        # deep copy of the nodes to visit
        to_visit_tmp=to_visit.copy()
        to_visit=set()
       
        for node in to_visit_tmp:
            
            to_visit_next_from_node=set()
            
            stop_id, time = node.split('_')
            visited_stations[stop_id]=time
            node_transport="waiting"
            if (stop_id !=final_stop_id):
                node_transport=dict_paths[node][1]
            
            
            ################################ WALKING ######################
            
            #avoid double walking
            if (node_transport!="walking"):

                time_formatted = datetime.datetime.strptime('2019-05-13'+' '+time, "%Y-%m-%d %H:%M:%S")
                index_node = df_index.loc[stop_id,'index'] 
                walking_nodes=walking_neighbors[stop_id]
                for station_id_other in walking_nodes:  #iterate over the other nodes 

                    walking_time = datetime.timedelta(minutes = dict_walk_time[(stop_id, station_id_other)] )
                    departure_time = time_formatted - walking_time

                    hours_other = dict_stop_list_hours[station_id_other]
                    possible_changes = list(filter( (lambda x : (x < departure_time) &(earliest_departure_time<x)) , \
                                            map(lambda x: datetime.datetime.strptime('2019-05-13'+' '+x,"%Y-%m-%d %H:%M:%S"),\
                                                hours_other) ))

                    if(len(possible_changes) != 0):
                                                
                        first_hour_change = max(possible_changes)               
                        #destination node
                        first_hour_change_str = first_hour_change.strftime('%H:%M:%S')
                        node_id_other = station_id_other+'_'+first_hour_change_str

                        time_transfer = (first_hour_change - time_formatted).total_seconds()

                        # add the node
                        if (not (node_id_other in to_visit_tmp)):
                            # check if the we reached the station before
                            if station_id_other in visited_stations.keys():
                                # if we reached it at a better time update it
                                if first_hour_change_str>visited_stations[station_id_other]:
                                    visited_stations[station_id_other]=first_hour_change_str
                                    to_visit_next_from_node.add(node_id_other)
                            else:
                                to_visit_next_from_node.add(node_id_other)

                        # if the path is arleady added, we don't update since we favoritize bus transport    
                        if not (node_id_other in dict_paths.keys()):

                            dict_paths[node_id_other]=[node,"walking",stop_id] # node=stop_id+time

            #################### Bus ################
            if node in dict_edges_bus.keys():

                bus_nodes=dict_edges_bus[node]

                for bus_node in bus_nodes:
                    bus_node_id,bus_node_time=bus_node.split('_')
                    
                    if bus_node_id in visited_stations.keys():
                        # if we reached it at a better time update it
                        if bus_node_time>visited_stations[bus_node_id]:
                            visited_stations[bus_node_id]=bus_node_time
                            to_visit_next_from_node.add(bus_node)
                            dict_paths[bus_node]=[node,"bus",stop_id]
                    else:
                        to_visit_next_from_node.add(bus_node)
                        dict_paths[bus_node]=[node,"bus",stop_id]        
                    # here we always update since this can only be better
                    

            #################### Correspendance ##############
            if node in dict_edges_waiting.keys():
                waiting_nodes=dict_edges_waiting[node]
                to_visit_next_from_node.update(waiting_nodes)
                
                # here we always update since this can only be better
                for waiting_node in waiting_nodes:
                    waiting_node_id,waiting_node_time=waiting_node.split('_')
                    
                    if waiting_node_id in visited_stations.keys():
                        # if we reached it at a better time update it
                        if waiting_node_time>visited_stations[waiting_node_id]:
                            visited_stations[waiting_node_id]=waiting_node_time
                            
                    dict_paths[waiting_node]=[node,"waiting",stop_id]
            
            # check if new nodes are valid
            valid_nodes = filter((lambda x :(earliest_departure_time<datetime.datetime.strptime('2019-05-13'+' '+x.split('_')[1],
                                                                                               "%Y-%m-%d %H:%M:%S"))) , \
                                            to_visit_next_from_node)
                      
            to_visit_next_from_node=set()                
            for new_node in valid_nodes:
                
                if (new_node in possible_sources):
                    
                    # reconstruct the path
                    double_walk=False # for not having two consecutive walking edges
                    new_node_trans=dict_paths[new_node][1]
                    last_transport=new_node_trans
                    correct_path=[new_node]
                    trans=[last_transport]
                    trunc_path=[]
                    trunc_trans=[]
                    backprop=node
                    current_station=dict_paths[new_node][2]
                    path_stations=set()
                    path_stations.add(current_station)
                    loop=False # for not going to the same station twice
                    nb=0 #number of hops, it shouldn't exceed 200
                    while ((current_station!=final_stop_id)&(nb<200)&(not double_walk)&(not loop)):
                        nb+=1
                        node_meta=dict_paths[backprop]
                        current_station=node_meta[2]
                        
                        if (current_station in path_stations):
                            loop=True
                        else:
                            if ((node_meta[1]==last_transport=="walking")):
                                double_walk=True
                            else:
                                path_stations.add(current_station)
                                correct_path.append(backprop)
                                trunc_path.append(backprop)
                                
                                backprop=node_meta[0]
                                
                                last_transport=node_meta[1]
                                trans.append(last_transport)
                                trunc_trans.append(last_transport)
                                
                    if(nb==200):
                        print("Maximum hops reached at: ",correct_path)
                    if((not double_walk)&(not loop)):
                        trunc_key=tuple(trunc_path)
                        if (trunc_key in trunc_paths.keys()):
                            other_source=trunc_paths[trunc_key]
                            new_time=new_node.split("_")[1]
                            other_time=other_source.split("_")[1]
                            if new_time>other_time:
                                
                                # remove the old path
                                path_to_remove=trunc_path[:]
                                path_to_remove.insert(0,other_source)
                                path_to_remove.append(backprop)
                                
                                # remove the old transport
                                other_transport=dict_paths[other_source][1]
                                trans_to_remove=trunc_trans[:]
                                trans_to_remove.insert(0,other_transport)
                                trans_to_remove.append(last_transport)
                                if path_to_remove in true_paths:
                                    print("Old path starts at {}, better start at {} ".format(other_time,new_time))
                                    true_paths.remove(path_to_remove)
                                    transports.remove(trans_to_remove)
                                else:
                                    print("can't find path:", path_to_remove)
                                # update dict
                                trunc_paths[trunc_key]=new_node
                                # add updated path
                                correct_path.append(backprop)
                                trans.append(last_transport)
                                true_paths.append(correct_path)
                                transports.append(trans)
                                sources_with_path.remove(other_source)
                                sources_with_path.append(new_node)
                                
                        else:        
                            trunc_paths[trunc_key]=new_node
                            correct_path.append(backprop)
                            trans.append(last_transport)
                            true_paths.append(correct_path)
                            transports.append(trans)
                            sources_with_path.append(new_node)
                            print("found a new path at iteration ",k)
                            
                else:
                    to_visit_next_from_node.add(new_node)
                  
  
            # don't add nodes that we are currently exploring
            to_visit_next_from_node=to_visit_next_from_node-to_visit_tmp
            
            # cumulating nodes
            to_visit.update(to_visit_next_from_node)
        if (len(to_visit)>0):
            selected_nodes=pd.DataFrame(list(map(lambda x: x.split("_"),(to_visit)))). \
                                groupby(0).max().reset_index().apply(lambda x: x[0]+"_"+x[1],axis=1).values
            
            print("at iteration {} we are visiting {} nodes that corresponds to {} unique stations".
                  format(k,len(to_visit),len(selected_nodes)))
        else:
            # break if the all the graph is visited
            break_condition=True
            
        #heuristic rule : limit the minimum number of paths
        if (len(sources_with_path) >= min(minimum_paths,len(possible_sources))) :
            earliest_time=sorted(sources_with_path)[0].split("_")[1]
            earliest_formatted = datetime.datetime.strptime('2019-05-13'+' '+earliest_time, "%Y-%m-%d %H:%M:%S")
            if earliest_formatted>earliest_departure_time:
                print("update time")
                earliest_departure_time=earliest_formatted
              
    print("completed after {} iterations. Constructed tree with {} nodes. Found {} paths".
          format(k,len(dict_paths),len(true_paths)))
    
    if(len(sources_with_path) == 0):
        print("No path found, try to increase BFS's depth.")
    else: 
        earliest_time = sorted(sources_with_path)[0].split("_")[1]
        earliest_formatted = datetime.datetime.strptime('2019-05-13'+' '+earliest_time, "%Y-%m-%d %H:%M:%S")
        if ((earliest_formatted==earliest_departure_time)|(len(to_visit)>0)):
            print("If you can be more patient we can get you more paths by increasing the minimum path parameter")
        else:
            print("That's all what we can find")
    return true_paths ,transports

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

To take into account the risk factor, we first computed the hourly mean delay for all the bus stops. Assuming that this corresponds to the expected delay, we can modelize the delay distribution as an exponential with parameter $\lambda = \frac{1}{mean}$ . Then for each path we see when a correspondance happens <em>(i.e when the traveler gets off the bus to take the next one)</em> and get the mean of the delay for the last stop, we then see how much time does the path allow as delay to still catch the next bus, which is modelised as **T<sub>i</sub>=transfer time**. Now we can get the probability of not missing a correspondance at a correspondance $i$ as $P_i = 1 - e^{- \lambda_{i} * T_i}$ . The final probability is therefore the product of $P_i's$ , we therefore compare it to the threshold to determine if the path is safe.

In [9]:
def get_safe_paths(true_paths,transports,max_time_arrival,threshold,df_means):
    max_time_arrival_formatted = datetime.datetime.strptime('2019-05-13'+' '+max_time_arrival, "%Y-%m-%d %H:%M:%S")

    path_safe = []
    safe_probas=[]
    for path,transport in zip(true_paths, transports) : 

        proba_success_transfer = 1

        if(len(path) == 1):
            pritn('Error need at least 2 stations !')
        elif(len(path) == 2):

            now = path[0]
            after = path[1]
            transport_now_after = transports[0]

            if(transport == 'bus'):
                station_now, time_now = now.split('_')
                station_after, time_after = after.split('_')

                hour_now = int(time_now[0:2])
                hour_after = int(time_after[0:2])

                mean = df_meas.loc[(df_means['stop_id_prev'] == station_now) & (df_means['arrival_prev_hour'] == hour_now) 
                                          & (df_means['stop_id_next'] == station_after) & (df_means['arrival_next_hour'] == hour_after),'mean_val'].values

                if(mean.isEmpty()):
                    lambda_ = 1e12  #equivlaent to infinity
                else : 
                    lambda_ = 1/max(mean[0], 1e-12)

                proba_success_transfer = proba_success_transfer * (1 - math.exp(-transfer_time * lambda_))

        else :

            for i in range(len(path)):  

                if (i <= len(path) - 3) : 

                    prev = path[i]
                    now = path[i+1]
                    after = path[i+2]
                    transport_now_after = transport[i+1]
                    destination = (i==(len(path)-3))

                    station_prev, time_prev = prev.split('_')
                    station_now, time_now = now.split('_')
                    station_after, time_after = after.split('_')

                    hour_prev = int(time_prev[0:2])
                    hour_now = int(time_now[0:2])

                    mean = df_means.loc[(df_means['stop_id_prev'] == station_prev) & (df_means['arrival_prev_hour'] == hour_prev) 
                                          & (df_means['stop_id_next'] == station_now) & (df_means['arrival_next_hour'] == hour_now),'mean_val'].values

                    if(mean.size == 0):
                        lambda_ = 1e12   #equivlaent to infinity
                    else : 
                        lambda_ = 1/max(mean[0], 1e-12)


                    time_prev_formatted = datetime.datetime.strptime('2019-05-13'+' '+time_prev, "%Y-%m-%d %H:%M:%S")
                    time_now_formatted = datetime.datetime.strptime('2019-05-13'+' '+time_now, "%Y-%m-%d %H:%M:%S")
                    time_after_formatted = datetime.datetime.strptime('2019-05-13'+' '+time_after, "%Y-%m-%d %H:%M:%S")

                    if(transport_now_after == 'waiting'):
                        transfer_time = (time_after_formatted - time_now_formatted)

                        if not destination:
                            transfer_time=transfer_time - datetime.timedelta(minutes = 2)

                        transfer_time=transfer_time.total_seconds()
                        proba_success_transfer = proba_success_transfer * (1 - math.exp(-transfer_time * lambda_))

                    elif(transport_now_after == 'walking'):
                        transfer_time = ( (time_after_formatted - time_now_formatted) 
                                         - datetime.timedelta(minutes = float(dict_walk_time[(station_now,station_after)])) )

                        if destination:
                            transfer_time = transfer_time + datetime.timedelta(minutes = 2)
                        transfer_time = transfer_time.total_seconds()
                        proba_success_transfer = proba_success_transfer * (1 - math.exp(-transfer_time * lambda_))

                    elif (destination  &  (transport_now_after == 'bus')):

                        transfer_time = max_time_arrival_formatted - time_after_formatted
                        transfer_time = transfer_time.total_seconds()


        if (threshold < proba_success_transfer): 
            safe_probas.append(proba_success_transfer)
            path_safe.append([path,transport[:-1]])
    return path_safe , safe_probas

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def get_final_output(path_safe,safe_probas):
    initial_node=[]
    for path in path_safe:
        initial_node.append(path[0][0])

    ordered_nodes=sorted(initial_node)[::-1]
    ordered_safe_paths=[]
    for i in range(len(ordered_nodes)):
        ordered_safe_paths.append(path_safe[initial_node.index(ordered_nodes[i])])


    final_output=[]
    for path ,transport in ordered_safe_paths:
        final_path=[]
        i=0
        taking_bus=False
        while (i<len(transport)):
            if not(taking_bus):
                final_path.append(path[i].split("_"))
                final_path.append(transport[i])
                if transport[i]=="bus":
                    taking_bus=True
            else:
                if transport[i]!="bus":
                    taking_bus=False
                    final_path.append(path[i].split("_"))
                    final_path.append(transport[i])
            i+=1

        final_path.append(path[-1].split("_"))
        final_output.append(final_path)


    output_messages=[]
    for index ,path in enumerate(final_output):

        message='- Path number {} ({:0.2f}% certainty):\n  Be at the starting station before {}.\n'.format(
            index+1,100*safe_probas[index],path[0][1])
        for i in range(1,len(path),2):
            if path[i]=="bus":
                message+="  Take the bus from {} at {} to arrive to {} at {}\n".format(
                    path[i-1][0],path[i-1][1],path[i+1][0],path[i+1][1])
            elif path[i]=="walking":
                message+="  Walk from {} to catch the bus at {} at {}\n".format(
                path[i-1][0],path[i+1][0],path[i+1][1])
            elif path[i]=="waiting":
                message+="Wait at station {} to take the bus at {}\n".format(
                path[i-1][0],path[i+1][1])
        message+="You will arrive at your destination at {}".format(path[len(path)-1][1]) 
        output_messages.append(message)
    return final_output , output_messages

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
#the inputs 
source = '8503000' 
destination_id = '8591049'
max_time_arrival = '12:30:00'
threshold = 0.9
max_iter = 15
minimum_paths = 15

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We run BFS to find the possible paths from the 2 stations.

In [11]:
true_paths, transports = bfs_homies(source,destination_id, max_time_arrival, dict_edges_bus,
                                    dict_edges_waiting,vertices,df_index,walking_neighbors,
                                   dict_walk_time,dict_stop_list_hours,max_iter,minimum_paths)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

at iteration 1 we are visiting 8 nodes that corresponds to 4 unique stations
at iteration 2 we are visiting 12 nodes that corresponds to 6 unique stations
at iteration 3 we are visiting 26 nodes that corresponds to 11 unique stations
at iteration 4 we are visiting 38 nodes that corresponds to 19 unique stations
at iteration 5 we are visiting 49 nodes that corresponds to 22 unique stations
at iteration 6 we are visiting 102 nodes that corresponds to 29 unique stations
('found a new path at iteration ', 7)
Old path starts at 11:45:00, better start at 11:49:00 
at iteration 7 we are visiting 160 nodes that corresponds to 43 unique stations
('found a new path at iteration ', 8)
('found a new path at iteration ', 8)
('found a new path at iteration ', 8)
('found a new path at iteration ', 8)
at iteration 8 we are visiting 243 nodes that corresponds to 70 unique stations
('found a new path at iteration ', 9)
at iteration 9 we are visiting 344 nodes that corresponds to 96 unique stations
('fou

In [12]:
path_safe , safe_probas = get_safe_paths(true_paths,transports,max_time_arrival,threshold,df_mean)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
final_output , output_messages = get_final_output(path_safe,safe_probas)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
for msg in output_messages:
    print(msg)
    print("")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Path number 1 (100.00% certainty):
  Be at the starting station before 12:10:00.
  Take the bus from 8503000 at 12:10:00 to arrive to 8503006 at 12:16:00
  Walk from 8503006 to catch the bus at 8580449 at 12:21:00
  Take the bus from 8580449 at 12:21:00 to arrive to 8591049 at 12:29:00
You will arrive at your destination at 12:29:00

- Path number 2 (99.99% certainty):
  Be at the starting station before 12:09:00.
  Take the bus from 8503000 at 12:09:00 to arrive to 8503006 at 12:15:00
  Walk from 8503006 to catch the bus at 8591063 at 12:22:00
  Take the bus from 8591063 at 12:22:00 to arrive to 8591049 at 12:29:00
You will arrive at your destination at 12:29:00

- Path number 3 (100.00% certainty):
  Be at the starting station before 12:05:00.
  Take the bus from 8503000 at 12:05:00 to arrive to 8503006 at 12:11:00
  Walk from 8503006 to catch the bus at 8591382 at 12:18:00
  Take the bus from 8591382 at 12:18:00 to arrive to 8591049 at 12:26:00
You will arrive at your destination 