### Get Distributions of Delay Times per trip and station

The goal of this chapter is to build a distribution of arrival delays for each station / trip_id pair, to be used later to compute transfer probabilities. The McRaptor implementation then uses these probabilities to choose the best trip according to its time but also its __probability of success__.
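
As an illustration of how such a distribution can feed a transfer probability, here is a minimal sketch in plain Python. It assumes the count layout built later in this notebook (first entry = bin -1, last entry = bin +30) and takes the empirical cumulative frequency as the probability. The function name `prob_delay_at_most` is hypothetical, and the actual McRaptor computation may differ.

```python
def prob_delay_at_most(counts, max_delay_min, lower_bound=-1):
    """Empirical P(delay bin <= max_delay_min) from a vector of per-minute counts.

    counts[i] is the number of observed arrivals in bin (lower_bound + i).
    Returns None when there is no observation at all.
    """
    total = sum(counts)
    if total == 0:
        return None
    # Number of bins from lower_bound up to max_delay_min, inclusive
    n_bins = max_delay_min - lower_bound + 1
    n_bins = max(0, min(n_bins, len(counts)))
    return sum(counts[:n_bins]) / float(total)


# Counts for bins -1..30, taken from the key '1.TA.26-20-j19-1.1.R__8503000'
counts = [0, 25, 78, 18, 5, 2] + [0] * 26
print(prob_delay_at_most(counts, 2))  # 121 / 128 = 0.9453125
```

Because the last bin gathers every delay of 30 minutes or more, the result is only meaningful for `max_delay_min` below 30.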

<div class='alert alert-info'><b>Any application without a proper name would be promptly killed.</b></div>

In [1]:
%%configure
{"conf": {
    "spark.app.name": "lgptguys_final"
}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8983,application_1589299642358_3520,pyspark,idle,Link,Link,
9081,application_1589299642358_3641,pyspark,idle,Link,Link,
9098,application_1589299642358_3660,pyspark,idle,Link,Link,
9112,application_1589299642358_3675,pyspark,busy,Link,Link,
9121,application_1589299642358_3684,pyspark,idle,Link,Link,
9130,application_1589299642358_3694,pyspark,idle,Link,Link,
9145,application_1589299642358_3710,pyspark,idle,Link,Link,
9152,application_1589299642358_3716,pyspark,idle,Link,Link,
9153,application_1589299642358_3717,pyspark,idle,Link,Link,
9156,application_1589299642358_3721,pyspark,idle,Link,Link,


### Start Spark

In [2]:
# Initialization
%%spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9183,application_1589299642358_3750,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
unknown magic command '%spark'
UnknownMagic: unknown magic command '%spark'



In [3]:
%%send_to_spark -i username -t str -n username

An error was encountered:
Variable named username not found.


### Import useful libraries 

In [4]:
from geopy.distance import great_circle
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read TimeTable data for routes / trips 

In [5]:
# Load data with stop_id of interest
stop_times = spark.read.csv('data/lgpt_guys/stop_times_final_cyril.csv', header = True)
stops_15km = stop_times.select(col('stop_id_general')).dropDuplicates()

# print unique number of stop_id and show
print(stops_15km.count())
stops_15km.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1407
+---------------+
|stop_id_general|
+---------------+
|        8503078|
|        8503088|
|        8589111|
|        8503376|
|        8591190|
+---------------+
only showing top 5 rows

### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format

In [74]:
sbb = spark.read.orc('/data/sbb/orc/istdaten')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Subset SBB data

This notebook was run twice to get two different sets of distributions:
- On the one hand, delay distributions computed only from delays whose prognose status is `geschaetz` or `real`. We use these distributions in priority whenever they contain enough data for a given transfer.
- On the other hand, delay distributions computed from all sbb delays, including status `prognose`, which means the delay is estimated. This is expected to be less precise, but whenever the `geschaetz` or `real` data is insufficient, it is still better than estimating the delay ourselves.
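
The priority rule described above can be sketched as follows. This is a minimal illustration in plain Python, not the code used downstream: the function name `pick_distribution` and the `min_count` threshold are assumptions, and `d_real` / `d_all` stand for dictionaries mapping a key to a vector of counts, such as the ones pickled at the end of this notebook.

```python
def pick_distribution(key, d_real, d_all, min_count=100):
    """Return (counts, source) for one key, preferring the real/geschaetz data.

    min_count is a hypothetical threshold for 'enough data'.
    """
    real = d_real.get(key)
    if real is not None and sum(real) >= min_count:
        return real, "real"
    # Fall back to the distribution built from all delays, if there is one
    fallback = d_all.get(key)
    if fallback is not None:
        return fallback, "all"
    return None, None


# Toy dictionaries with shortened count vectors
d_real = {"a": [0, 150, 30], "b": [0, 3, 1]}
d_all = {"b": [1, 200, 40], "c": [0, 5, 0]}
print(pick_distribution("a", d_real, d_all))  # enough real data
print(pick_distribution("b", d_real, d_all))  # too few real observations
```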

The first step is to subset the sbb dataset for `real/geschaetz` or `all` prognose_status and to write the results to intermediate tables.

__Delays with geschaetz/real status__

We keep only the stop_id within 15 km of Zurich HB, using the `stop_id_general` field from the _stops_15km_ table. We then keep only the rows where `an_prognose_status` or `ab_prognose_status` is `geschaetz` or `real`.

In [25]:
# Used to subset sbb table based on stop_id from stops_15km
stop_id  = stops_15km.select('stop_id_general').collect()
stop_idx = [item.stop_id_general for item in stop_id]

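# Make the subset dataframe: stops of interest with a REAL or GESCHAETZ status
# (verify the status literal against the data: the Istdaten documentation spells it 'GESCHAETZT')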
sbb_filt = sbb.filter( ( sbb['bpuic'].isin(stop_idx) ) &\
                       ((sbb.an_prognose_status == 'REAL') | \
                        (sbb.an_prognose_status == 'GESCHAETZ') | \
                        (sbb.ab_prognose_status == 'REAL') | \
                        (sbb.ab_prognose_status == 'GESCHAETZ') ) ) \
              .select('fahrt_bezeichner', 'ankunftszeit', 'abfahrtszeit', \
                      'an_prognose', 'ab_prognose', \
                      col('bpuic').alias('stop_id'))

print(sbb_filt.count())
sbb_filt.show(3,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

10848628
+----------------+----------------+----------------+-------------------+-------------------+-------+
|fahrt_bezeichner|ankunftszeit    |abfahrtszeit    |an_prognose        |ab_prognose        |stop_id|
+----------------+----------------+----------------+-------------------+-------------------+-------+
|85:11:10:002    |03.09.2018 21:51|                |03.09.2018 21:53:40|                   |8503000|
|85:11:11:001    |                |03.09.2018 06:09|                   |03.09.2018 06:10:22|8503000|
|85:11:12:001    |03.09.2018 10:51|                |03.09.2018 10:51:28|                   |8503000|
+----------------+----------------+----------------+-------------------+-------------------+-------+
only showing top 3 rows

Write the subset table to HDFS for better performance in later use.

In [26]:
# save
sbb_filt.write.format("orc").save("data/lgpt_guys/sbb_filt_forDelays_GeschaetzAndReal.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

__Delay including prognose status__

We keep only the stop_id within 15 km of Zurich HB, using the `stop_id_general` field from the _stops_15km_ table.

In [76]:
# Used to subset sbb table based on stop_id from stops_15km
stop_id  = stops_15km.select('stop_id_general').collect()
stop_idx = [item.stop_id_general for item in stop_id]

# Make the subset dataframe
sbb_filt_all = sbb.filter( sbb['bpuic'].isin(stop_idx) )\
                  .select('fahrt_bezeichner', 'ankunftszeit', 'abfahrtszeit', \
                          'an_prognose', 'ab_prognose', \
                          col('bpuic').alias('stop_id'))

print(sbb_filt_all.count())
sbb_filt_all.show(3,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

209398081
+----------------+----------------+----------------+-------------------+-------------------+-------+
|fahrt_bezeichner|ankunftszeit    |abfahrtszeit    |an_prognose        |ab_prognose        |stop_id|
+----------------+----------------+----------------+-------------------+-------------------+-------+
|85:11:10:002    |03.09.2018 21:51|                |03.09.2018 21:53:40|                   |8503000|
|85:11:11:001    |                |03.09.2018 06:09|                   |03.09.2018 06:10:22|8503000|
|85:11:12:001    |03.09.2018 10:51|                |03.09.2018 10:51:28|                   |8503000|
+----------------+----------------+----------------+-------------------+-------------------+-------+
only showing top 3 rows

In [77]:
# save
sbb_filt_all.write.format("orc").save("data/lgpt_guys/sbb_filt_forDelays_AllDelays_2.orc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Summary of tables written in `data/lgpt_guys/`:
- sbb_filt_forDelays_GeschaetzAndReal.orc : geschaetz and real status only, stops < 15km, final stops from cyril data
- sbb_filt_forDelays_AllDelays_2.orc : all statuses, stops < 15km, final stops from cyril data
- sbb_filt_forDelays_AllDelays.orc : contains a MISTAKE, do NOT use this one!


### Work from translation tables 

We will use the data generated in `match_datasets.ipynb`, which matches trip_id between the _timetable_ and _sbb_ datasets. We begin by looking at all trip_id found in both datasets with at least 5 stations in common.

Our goal is to find a match in the sbb dataset for every _timetable_ trip (and not the other way around), so we focus on building this asymmetrical correspondence table.

To do that, we need several joins, as we want to combine 3 tables: the _sbb_ data, which contains the delay information; the `joined_trip_atL5_3` table, which translates trip_id between the two datasets; and `stop_times`, which contains all the unique stop_id x trip_id pairs used in later steps.
- First, we join the _sbb_ data `sbb_filt_forDelays_GeschaetzAndReal` with the translation table `joined_trip_atL5_3` to add the _timetable_ trip_id to the sbb data.
- We then use this _timetable_ trip_id to join the result with the `stop_times` table, using a _left_outer_ join, so that we get an idea of how many matches are found overall.

First we load the SBB data. The following cells were run twice: once for `geschaetz` / `real` delays only, and once for `all` delays.
- `geschaetz` / `real` : load and use the `data/lgpt_guys/sbb_filt_forDelays_GeschaetzAndReal.orc` table
- `all` : load and use the `data/lgpt_guys/sbb_filt_forDelays_AllDelays_2.orc` table

In [6]:
# Choose one table to work with 
#table_delays = 'sbb_filt_forDelays_AllDelays_2'
table_delays = 'sbb_filt_forDelays_GeschaetzAndReal'

# Load sbb data for a given table
sbb_filt = spark.read.orc("data/lgpt_guys/{}.orc".format(table_delays))

# Print line count and show
print(sbb_filt.count())
sbb_filt.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

10848628
+----------------+----------------+----------------+-------------------+-------------------+-------+
|fahrt_bezeichner|    ankunftszeit|    abfahrtszeit|        an_prognose|        ab_prognose|stop_id|
+----------------+----------------+----------------+-------------------+-------------------+-------+
|    85:11:10:002|12.10.2018 21:51|                |12.10.2018 21:51:50|                   |8503000|
| 85:11:10293:004|                |13.10.2018 00:25|                   |13.10.2018 00:26:08|8503000|
| 85:11:10293:004|13.10.2018 00:34|13.10.2018 00:35|13.10.2018 00:35:27|13.10.2018 00:36:44|8503016|
+----------------+----------------+----------------+-------------------+-------------------+-------+
only showing top 3 rows

Then we load the translation table built in the `match_datasets` notebook. It gives the matching trip_id between the _timetable_ and _sbb_ data.

In [7]:
# Load data
translation_tab = spark.read.csv('data/lgpt_guys/match_datasets_translation.csv', header = True)

# Print line counts and show
print(translation_tab.count())
translation_tab.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

921476
+--------------------------+----------------------+-----+
|trip_id                   |fahrt_bezeichner      |count|
+--------------------------+----------------------+-----+
|89.TA.26-721-j19-1.3.H    |85:773:778860-04720-1 |7    |
|217.TA.1-17-A-j19-1.17.H  |85:31:987:000         |12   |
|1890.TA.26-11-A-j19-1.27.R|85:3849:137108-21011-1|25   |
|1612.TA.26-10-j19-1.11.R  |85:3849:617087-24010-1|19   |
|113.TA.26-131-j19-1.6.R   |85:807:473534-31131-1 |3    |
+--------------------------+----------------------+-----+
only showing top 5 rows

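We first join the sbb data `sbb_filt` with the translation table `translation_tab` to get the trip_id in _timetable_ format on the _sbb_ table. Because one `fahrt_bezeichner` can match several trip_id in the translation table, the join returns more rows (14816224) than `sbb_filt` contains (10848628).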

In [8]:
joined_sbb = sbb_filt.join(translation_tab, on = ['fahrt_bezeichner'], how = 'left_outer')

print(joined_sbb.count())
joined_sbb.show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14816224
+----------------+----------------+------------+-------------------+-----------+-------+-------+-----+
|fahrt_bezeichner|ankunftszeit    |abfahrtszeit|an_prognose        |ab_prognose|stop_id|trip_id|count|
+----------------+----------------+------------+-------------------+-----------+-------+-------+-----+
|85:11:10173:004 |30.06.2019 16:04|            |30.06.2019 16:17:34|           |8503000|null   |null |
|85:11:10173:004 |12.05.2019 16:04|            |12.05.2019 16:03:55|           |8503000|null   |null |
|85:11:10213:004 |26.05.2019 16:30|            |26.05.2019 16:31:03|           |8503000|null   |null |
|85:11:10213:004 |18.05.2019 16:30|            |18.05.2019 16:30:56|           |8503000|null   |null |
|85:11:10213:004 |25.05.2019 16:30|            |25.05.2019 16:34:28|           |8503000|null   |null |
+----------------+----------------+------------+-------------------+-----------+-------+-------+-----+
only showing top 5 rows

The reference table we use is `stop_times`. We load it and use it as the reference in a join against the sbb table, which now contains the trip_id in _timetable_ format.

In [9]:
# load ref table stop_times
stop_times = spark.read.csv('data/lgpt_guys/stop_times_final_cyril.csv', header = True)

# keep trip_id and rename stop_id_general to stop_id
stop_times = stop_times.select(stop_times.stop_id_general.alias('stop_id'), 'trip_id')

# print line count and show 
print(stop_times.count())
stop_times.show(3, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

260459
+-------+----------------------+
|stop_id|trip_id               |
+-------+----------------------+
|8591371|742.TA.26-46-j19-1.8.R|
|8591358|742.TA.26-46-j19-1.8.R|
|8591158|742.TA.26-46-j19-1.8.R|
+-------+----------------------+
only showing top 3 rows

We then join our reference table `stop_times` with the previous join containing the sbb data (used for delay computation). We join on the `stop_id` and `trip_id` columns; `trip_id` is in _timetable_ format in both tables.

In [10]:
# Left outer join of the reference table with the sbb data
stop_times_join = stop_times.join(joined_sbb, on=['trip_id', 'stop_id'], 
                                  how='left_outer')\
                            .select('trip_id', 'stop_id', 'count',
                                    'fahrt_bezeichner', 'ankunftszeit', 'abfahrtszeit',
                                    'an_prognose', 'ab_prognose')

print(stop_times_join.count())
stop_times_join.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

13084796
+----------------------+-------+-----+----------------+------------+------------+-----------+-----------+
|trip_id               |stop_id|count|fahrt_bezeichner|ankunftszeit|abfahrtszeit|an_prognose|ab_prognose|
+----------------------+-------+-----+----------------+------------+------------+-----------+-----------+
|1.TA.26-163-j19-1.1.R |8590688|null |null            |null        |null        |null       |null       |
|1.TA.26-89-j19-1.1.R  |8591209|null |null            |null        |null        |null       |null       |
|10.TA.1-305-j19-1.1.R |8587018|null |null            |null        |null        |null       |null       |
|10.TA.26-69-j19-1.2.H |8591122|null |null            |null        |null        |null       |null       |
|10.TA.26-845-j19-1.2.H|8580879|null |null            |null        |null        |null       |null       |
+----------------------+-------+-----+----------------+------------+------------+-----------+-----------+
only showing top 5 rows

We then compute arrival delays as follows:
- delay = arrival_true ( = `an_prognose`) - arrival_expected ( = `ankunftszeit`). Trains that are late have a positive delay and trains ahead of schedule a negative one.
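
The subtraction can be checked on a single row with plain Python. This is only a sketch of the same arithmetic outside Spark: the function name `arrival_delay_seconds` is hypothetical, and the two timestamp formats are the ones visible in the sbb rows shown earlier (seconds for `an_prognose`, minutes only for `ankunftszeit`).

```python
from datetime import datetime


def arrival_delay_seconds(an_prognose, ankunftszeit):
    """Delay in seconds: actual arrival minus scheduled arrival."""
    actual = datetime.strptime(an_prognose, "%d.%m.%Y %H:%M:%S")
    expected = datetime.strptime(ankunftszeit, "%d.%m.%Y %H:%M")
    return int((actual - expected).total_seconds())


# Late train: scheduled 21:51, arrived 21:53:40
print(arrival_delay_seconds("03.09.2018 21:53:40", "03.09.2018 21:51"))  # 160
# Early train: scheduled 16:04, arrived 16:03:55
print(arrival_delay_seconds("12.05.2019 16:03:55", "12.05.2019 16:04"))  # -5
```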

In [11]:
stop_times_diff = stop_times_join.select( col("an_prognose").alias("arrival_true"),\
                              col("ankunftszeit").alias("arrival_expected"),\
                              'trip_id', 'stop_id')\
              .withColumn('arrival_true',to_timestamp(col('arrival_true'),\
                                                          format='dd.MM.yyyy HH:mm:ss'))\
              .withColumn('arrival_expected',to_timestamp(col('arrival_expected'),\
                                                           format='dd.MM.yyyy HH:mm'))\
              .withColumn('DiffInSeconds',col('arrival_true').cast(LongType()) - col('arrival_expected').cast(LongType()))\
              .withColumn('DiffInMinutes',(col('DiffInSeconds')/60).cast('integer'))\
              .select("stop_id", "trip_id", "arrival_true", "DiffInSeconds", "DiffInMinutes",\
                        date_format('arrival_expected', 'E').alias('weekday'))

# Remove Saturday and Sunday weekdays from table - show
stop_times_diff = stop_times_diff.filter( (stop_times_diff.weekday != "Sun") & (stop_times_diff.weekday != "Sat") )
stop_times_diff.show(10, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------------+-------------------+-------------+-------------+-------+
|stop_id|trip_id               |arrival_true       |DiffInSeconds|DiffInMinutes|weekday|
+-------+----------------------+-------------------+-------------+-------------+-------+
|8503003|176.TA.26-3-j19-1.12.H|2019-08-01 01:51:47|47           |0            |Thu    |
|8503000|176.TA.26-3-j19-1.12.H|2019-08-01 01:54:56|-4           |0            |Thu    |
|8503020|176.TA.26-3-j19-1.12.H|2019-08-01 01:59:18|18           |0            |Thu    |
|8503003|176.TA.26-3-j19-1.12.H|2019-06-10 01:52:23|83           |1            |Mon    |
|8503000|176.TA.26-3-j19-1.12.H|2019-06-10 01:55:17|17           |0            |Mon    |
|8503020|176.TA.26-3-j19-1.12.H|2019-06-10 01:59:16|16           |0            |Mon    |
|8503003|176.TA.26-3-j19-1.12.H|2019-05-30 01:52:03|63           |1            |Thu    |
|8503000|176.TA.26-3-j19-1.12.H|2019-05-30 01:54:58|-2           |0            |Thu    |
|8503020|176.TA.26-3-

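We subtract the expected arrival time `ankunftszeit` from the actual arrival time `an_prognose` to compute the delay. This delay in seconds, `DiffInSeconds`, is then converted to minutes and cast to an integer, `DiffInMinutes`. The cast truncates toward zero, so any delay strictly between -60 and +60 seconds gives `DiffInMinutes` = 0 (for example -4 s and 47 s in the output above).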

`DiffInMinutes` is used in the next step to pivot the table, in order to get one column per unique value of `DiffInMinutes`. Before doing that, we bound the values of `DiffInMinutes` to the range [-1, +30]:
- the minimum 'delay' is -1: it contains all arrivals at least one minute ahead of schedule (smaller advances fall in bin 0, because the integer cast truncates toward zero).
- the maximum delay is +30: it contains all delays $\geq +30$ minutes.
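
The truncation, the bounding and the pivot can be mimicked in plain Python to make the resulting layout concrete. This is a sketch under the assumption that `int()` behaves like the Spark integer cast (both truncate toward zero); `delay_bin` and `delay_histograms` are hypothetical names, not part of the notebook.

```python
from collections import Counter, defaultdict

LOWER_BOUND, UPPER_BOUND = -1, 30


def delay_bin(diff_in_seconds):
    """Convert a delay in seconds to its bounded minute bin."""
    minutes = int(diff_in_seconds / 60.0)  # truncates toward zero
    return max(LOWER_BOUND, min(minutes, UPPER_BOUND))


def delay_histograms(rows):
    """rows: iterable of (stop_id, trip_id, diff_in_seconds).

    Returns {(stop_id, trip_id): [count for bin -1, 0, ..., 30]},
    with 0 for bins that were never observed.
    """
    counters = defaultdict(Counter)
    for stop_id, trip_id, diff in rows:
        counters[(stop_id, trip_id)][delay_bin(diff)] += 1
    bins = range(LOWER_BOUND, UPPER_BOUND + 1)
    return {key: [counter[b] for b in bins] for key, counter in counters.items()}


trip = "176.TA.26-3-j19-1.12.H"
rows = [("8503003", trip, 47), ("8503000", trip, -4),
        ("8503003", trip, 83), ("8503000", trip, 17),
        ("8503003", trip, 63), ("8503000", trip, -2)]
hist = delay_histograms(rows)
print(hist[("8503003", trip)][:4])  # bins -1, 0, 1, 2 -> [0, 1, 2, 0]
```

Each vector has 32 entries, one per bin from -1 to +30, which matches the columns of the pivoted table.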

In [12]:
# Bounds of the delay distribution, in minutes
lower_bound = -1
upper_bound = +30

# Bound minute delays between lower_bound and upper_bound 
stop_times_bounded = stop_times_diff.withColumn('DiffInMinutes_bounded1',\
                                        greatest(col('DiffInMinutes'), lit(lower_bound) ))\
                                    .withColumn('DiffInMinutes_bounded2',\
                                        least(col('DiffInMinutes_bounded1'), lit(upper_bound) ))

# Pivot table using delay rounded to minute, fill null with 0
stop_times_distribution = stop_times_bounded.groupBy('stop_id', 'trip_id')\
                                            .pivot("DiffInMinutes_bounded2").count()\
                                            .na.fill(0)

stop_times_distribution.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|stop_id|trip_id                |-1 |0  |1  |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |
+-------+-----------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|8503020|45.TA.26-7-A-j19-1.12.H|0  |596|148|53 |17 |4  |5  |4  |2  |1  |1  |0  |1  |1  |0  |1  |1  |0  |0  |1  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |
|8503128|179.TA.26-14-j19-1.37.R|0  |523|215|57 |27 |5  |5  |0  |0  |1  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |1  |0  |0  |0  |0  |0  |0  |0  |
|8594307|44.TA.1-11-B-j19-1.2.H |0  |50 |126|42 |9  |4  |1  |1  |1  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |
|8502221|54.TA.26-5-A-j19-1.

The last step is to compute a unique key per line, corresponding to `trip_id` x `stop_id`: the two fields are concatenated with a `__` separator.
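
For reference, the key format can be reproduced and reversed in plain Python. This is a small sketch; `make_key` and `split_key` are hypothetical helpers, and `split_key` assumes the stop_id itself never contains `__`.

```python
def make_key(trip_id, stop_id):
    """Build the 'trip_id__stop_id' key used to index the distributions."""
    return "{}__{}".format(trip_id, stop_id)


def split_key(key):
    """Recover (trip_id, stop_id) by splitting on the last '__'."""
    trip_id, stop_id = key.rsplit("__", 1)
    return trip_id, stop_id


key = make_key("1.TA.26-20-j19-1.1.R", "8503000")
print(key)             # 1.TA.26-20-j19-1.1.R__8503000
print(split_key(key))  # ('1.TA.26-20-j19-1.1.R', '8503000')
```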

In [13]:
stop_times_distrib_wKey = stop_times_distribution.orderBy('trip_id', 'stop_id')\
                                       .withColumn('key2', concat(col('trip_id'), lit('__'), col('stop_id')))\
                                       .drop('trip_id').drop('stop_id')\
                                       .select(col('key2').alias('key'), "*")\
                                       .drop('key2')

stop_times_distrib_wKey.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|key                             |-1 |0  |1  |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |
+--------------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|1.TA.26-20-j19-1.1.R__8503000   |0  |25 |78 |18 |5  |2  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |
|1.TA.26-20-j19-1.1.R__8503003   |0  |83 |31 |9  |4  |1  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |
|1.TA.26-20-j19-1.1.R__8503101   |3  |89 |22 |6  |5  |1  |0  |1  |1  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |0  |
|1.TA.26-20-j19-1.1.R_

To get an idea of how many of the unique `trip_id` x `stop_id` pairs are covered, we compare the number of keys to the reference `stop_times` table, where each line corresponds to a unique `trip_id` x `stop_id`.

In [14]:
print("reference table stop_times number of lines : {}".format(stop_times.count()))
print("distribution table number of unique keys   : {}".format(stop_times_distrib_wKey.select("key").distinct().count()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

reference table stop_times number of lines : 260459
distribution table number of unique keys   : 16760

Write the tables to HDFS. The path depends on which table we are working with (`geschaetz`/`real` or `all`).

In [15]:
which_distribution = 'geschaetzAndReal'
#which_distribution = 'allDelays' 

stop_times_distrib_wKey.write.csv('data/lgpt_guys/distribution_{}_6.csv'.format(which_distribution), \
                                  header = True, mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Also write it to the /user/ folder to be able to load it locally.

In [16]:
username = 'acoudray'
stop_times_distrib_wKey.write.csv('/user/{0}/distribution_{1}_6.csv'.format(username, which_distribution), \
                                  header = True, mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

_Note : Last tables written_
- data/lgpt_guys/distribution_allDelays_6.csv : contains delay distributions from all SBB data, including arrival times with `prognose` status. Made with the FULL sbb dataset. Translation table with only 2 stop times in the overlap.
- data/lgpt_guys/distribution_allDelays_5.csv : contains delay distributions from all SBB data, including arrival times with `prognose` status. Made with the FULL sbb dataset.
- data/lgpt_guys/distribution_geschaetzAndReal_5.csv : contains delay distributions for arrival times with status `geschaetz` or `real` only. Made with the FULL sbb dataset.
- data/lgpt_guys/distribution_allDelays_4.csv : contains delay distributions from all SBB data, including arrival times with `prognose` status. Made with the 13-17 May sbb dataset.
- data/lgpt_guys/distribution_geschaetzAndReal_4.csv : contains delay distributions for arrival times with status `geschaetz` or `real` only. Made with the 13-17 May sbb dataset.

### Use local Python to build the dictionary locally

In [17]:
%local

from hdfs3 import HDFileSystem
import pandas as pd
import numpy as np 
import pickle 
import gzip
from itertools import islice

hdfs = HDFileSystem(host='hdfs://iccluster044.iccluster.epfl.ch', port=8020, user='ebouille')

username = 'acoudray'
which_distribution = 'geschaetzAndReal'
#which_distribution = 'allDelays'

# Load the distribution from HDFS: Spark wrote one csv per partition, so concatenate them
distrib_files = hdfs.glob('/user/{0}/distribution_{1}_6.csv/*.csv'.format(username, which_distribution))
parts = []
for path in distrib_files:
    with hdfs.open(path) as f:
        parts.append(pd.read_csv(f))
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0
distrib = pd.concat(parts).set_index('key')

# zip index and values to get {key : np.array()} shape 
d = dict(zip(distrib.index, np.array(distrib.values)))

# Write it to local disk; the file name depends on the distribution
if which_distribution == 'allDelays':
    with gzip.open("../data/d_all.pkl.gz", "wb") as output_file:
        pickle.dump(d, output_file)
elif which_distribution == 'geschaetzAndReal':
    with gzip.open("../data/d_real.pkl.gz", "wb") as output_file:
        pickle.dump(d, output_file)
    
# Function to take a slice from a dictionary - head equivalent
def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(islice(iterable, n))

# display a slice of it
take(10, d.items())

[('1.TA.26-20-j19-1.1.R__8503000',
  array([ 0, 25, 78, 18,  5,  2,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])),
 ('1.TA.26-20-j19-1.1.R__8503003',
  array([ 0, 83, 31,  9,  4,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])),
 ('1.TA.26-20-j19-1.1.R__8503101',
  array([ 3, 89, 22,  6,  5,  1,  0,  1,  1,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])),
 ('1.TA.26-20-j19-1.1.R__8503104',
  array([ 0, 83, 21, 13,  4,  3,  2,  0,  1,  0,  1,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])),
 ('1.TA.80-173-Y-j19-1.1.H__8502276',
  array([  0, 315,  61,  17,   2,   1,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   1,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0])),
 ('1.TA.80-173-Y-j19-1.1.H__8502277'