## Initialize the `SparkSession`

In [1]:
import getpass
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Spark configuration for a YARN application named after the current user.
# The SparkConf setters return the configuration object, so they are chained.
conf = (
    pyspark.conf.SparkConf()
    .setMaster('yarn')
    .setAppName('final_project-{0}'.format(getpass.getuser()))
    .set('spark.executor.memory', '12g')
    .set('spark.executor.instances', '12')
    .set('spark.executor.cores', '4')
    .set('spark.port.maxRetries', '200')
)
#conf.set('files', 'file:///home/akozak/final_project/zurich_hb_stops.csv')

# Reuse an existing SparkContext if there is one, otherwise create it from `conf`.
sc = pyspark.SparkContext.getOrCreate(conf)
# Rebind `conf` to the configuration reported by the context itself.
conf = sc.getConf()
sc

In [3]:
# Wrap the SparkContext created above in a SparkSession for the DataFrame API.
spark = SparkSession(sparkContext=sc)

In [4]:
from datetime import datetime

import pyspark.sql.functions as functions

import pandas as pd

### Load the whole dataset

In [16]:
# Read all CSV files matching the istdaten glob (semicolon-separated, first
# line is the header). No schema is given, so every column is read as a string.
whole_df = (
    spark.read
    .options(header=True, sep=";")
    .csv("/datasets/project/istdaten/*/*/*.csv")
)

- rename some useful columns

In [17]:
# Original (German) column names, printed for reference.
oldColumns = whole_df.schema.names
print(oldColumns)

# Replacement names, in the same positional order as `oldColumns`.
# The trailing comment gives the original name of every renamed column.
newColumns = [
    "date",                  # BETRIEBSTAG
    'trip_id',               # FAHRT_BEZEICHNER
    'BETREIBER_ID',
    'BETREIBER_ABK',
    'BETREIBER_NAME',
    "transport_type",        # PRODUKT_ID
    "train_line",            # LINIEN_ID
    "train_service",         # LINIEN_TEXT
    'UMLAUF_ID',
    'VERKEHRSMITTEL_TEXT',
    "additional_trip",       # ZUSATZFAHRT_TF
    "failed_trip",           # FAELLT_AUS_TF
    'BPUIC',
    "station_name",          # HALTESTELLEN_NAME
    "arrival_time",          # ANKUNFTSZEIT
    "actual_arrival",        # AN_PROGNOSE
    'AN_PROGNOSE_STATUS',
    "departure_time",        # ABFAHRTSZEIT
    "actual_departure",      # AB_PROGNOSE
    'AB_PROGNOSE_STATUS',
    "DURCHFAHRT_TF",
]

['BETRIEBSTAG', 'FAHRT_BEZEICHNER', 'BETREIBER_ID', 'BETREIBER_ABK', 'BETREIBER_NAME', 'PRODUKT_ID', 'LINIEN_ID', 'LINIEN_TEXT', 'UMLAUF_ID', 'VERKEHRSMITTEL_TEXT', 'ZUSATZFAHRT_TF', 'FAELLT_AUS_TF', 'BPUIC', 'HALTESTELLEN_NAME', 'ANKUNFTSZEIT', 'AN_PROGNOSE', 'AN_PROGNOSE_STATUS', 'ABFAHRTSZEIT', 'AB_PROGNOSE', 'AB_PROGNOSE_STATUS', 'DURCHFAHRT_TF']


In [18]:
# Rename all columns positionally with `newColumns`, then display the resulting schema.
whole_df = whole_df.toDF(*newColumns)
whole_df.printSchema()
# whole_df.show()

root
 |-- date: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- BETREIBER_ID: string (nullable = true)
 |-- BETREIBER_ABK: string (nullable = true)
 |-- BETREIBER_NAME: string (nullable = true)
 |-- transport_type: string (nullable = true)
 |-- train_line: string (nullable = true)
 |-- train_service: string (nullable = true)
 |-- UMLAUF_ID: string (nullable = true)
 |-- VERKEHRSMITTEL_TEXT: string (nullable = true)
 |-- additional_trip: string (nullable = true)
 |-- failed_trip: string (nullable = true)
 |-- BPUIC: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- actual_arrival: string (nullable = true)
 |-- AN_PROGNOSE_STATUS: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- actual_departure: string (nullable = true)
 |-- AB_PROGNOSE_STATUS: string (nullable = true)
 |-- DURCHFAHRT_TF: string (nullable = true)



- dropping useless columns:

In [19]:
# Columns that the notebook considers useless for the analysis.
unused_columns = ['BETREIBER_ID', 'BETREIBER_ABK', 'BETREIBER_NAME', 'UMLAUF_ID', 'BPUIC']
whole_df = whole_df.drop(*unused_columns)

In [20]:
# The stations file has to live on HDFS so that it can be loaded when more
# than one executor is used.
radius_stations_df = (
    spark.read
    .options(header=True, sep=",")
    .csv('/user/akozak/zurich_hb_stops.csv')
)

In [21]:
# Display the columns of the stations table (all read as strings).
radius_stations_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- elevation: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- dist_to_zurich_HB: string (nullable = true)



In [22]:
# Inner join on the station name: only rows whose station appears in
# `radius_stations_df` are kept, enriched with that table's columns.
filtered_df = whole_df.join(radius_stations_df, 'station_name', 'inner')

### THIRD TABLE: COMPUTING THE DELAYS

- First assumptions: we filter out entries where the transport does not stop, entries that are additional trips, and failed trips.

In [23]:
# Keep only rows where all three flags are false: the vehicle does not pass
# through without stopping, the trip is not an additional one, and it did not fail.
is_not_passing_through = filtered_df['DURCHFAHRT_TF'] == False
is_not_additional = filtered_df['additional_trip'] == False
is_not_failed = filtered_df['failed_trip'] == False
third_table = filtered_df.filter(is_not_passing_through & is_not_additional & is_not_failed)#.cache()

In [24]:
# Trip ids that occur in more than one row of `third_table`.
useful_trips = (
    third_table
    .groupBy('trip_id')
    .count()
    .where(functions.col('count') > 1)
)

In [25]:
# Only the trip id is needed from here on; the row count is discarded.
useful_trips = useful_trips.select('trip_id')

- merging with our third table:

In [26]:
# Restrict `third_table` to the trips listed in `useful_trips`.
third_table = third_table.join(useful_trips, 'trip_id', 'inner')#.cache()

- count how many distinct trip_ids we have:

In [27]:
# Number of distinct trip ids remaining in `third_table`.
third_table.select('trip_id').distinct().count()

153572

- collect the trip_ids of the day we chose as the regular day:

In [30]:
# Distinct trip ids observed on 17.10.2017 (the day chosen as the regular day).
trips_17_10_17 = (
    third_table
    .where(third_table['date'] == "17.10.2017")
    .select('trip_id')
    .distinct()
)

- some checks with other days:

In [31]:
# Distinct trip ids observed on 18.10.2017.
trips_18_10_17 = (
    third_table
    .where(third_table['date'] == "18.10.2017")
    .select('trip_id')
    .distinct()
)

In [32]:
# Distinct trip ids observed on 14.10.2017.
trips_14_10_17 = (
    third_table
    .where(third_table['date'] == "14.10.2017")
    .select('trip_id')
    .distinct()
)

In [33]:
# Distinct trip ids observed on 15.10.2017.
trips_15_10_17 = (
    third_table
    .where(third_table['date'] == "15.10.2017")
    .select('trip_id')
    .distinct()
)

In [38]:
# Distinct trip ids observed on 05.03.2018.
trips_05_03_18 = (
    third_table
    .where(third_table['date'] == "05.03.2018")
    .select('trip_id')
    .distinct()
)

In [42]:
# Distinct trip ids observed on 20.11.2017.
trips_20_11_17 = (
    third_table
    .where(third_table['date'] == "20.11.2017")
    .select('trip_id')
    .distinct()
)

In [34]:
# Number of distinct trips on 14.10.2017.
trips_14_10_17.count()

13892

In [35]:
# Number of distinct trips on 15.10.2017.
trips_15_10_17.count()

11092

In [36]:
# Trip ids present on both 17.10.2017 and 18.10.2017.
join_trips = trips_17_10_17.join(trips_18_10_17, 'trip_id', 'inner')

In [37]:
# Number of trip ids shared by 17.10.2017 and 18.10.2017.
join_trips.count()

16657

In [40]:
# Number of distinct trips on 05.03.2018.
trips_05_03_18.count()

19073

In [39]:
# Trip ids present on both 17.10.2017 and 05.03.2018, and how many there are.
join_trips2 = trips_17_10_17.join(trips_05_03_18, 'trip_id', 'inner')
join_trips2.count()

8279

In [None]:
# Draft of the arrival-delay column; `third_table_final` further down builds the
# complete version. The delay is expressed in minutes:
#   * scheduled and actual arrival both missing -> null
#   * only the actual arrival missing           -> 0
#   * otherwise                                 -> (actual - scheduled) seconds / 60
# The expression is anchored on `third_table` and uses the `functions` module
# alias, so the cell can run at this point of the notebook.
third_table.withColumn(
    'arrival_delay',
    functions.when(
        third_table.actual_arrival.isNull() & third_table.arrival_time.isNull(), None
    )
    .when(
        third_table.actual_arrival.isNull() & third_table.arrival_time.isNotNull(), 0
    )
    .otherwise(
        functions.round(
            functions.unix_timestamp("actual_arrival", 'dd.MM.yyyy HH:mm')
            - functions.unix_timestamp("arrival_time", 'dd.MM.yyyy HH:mm')
        ) / 60
    ),
)

In [51]:
# Sort the 17.10.2017 trip ids (string ordering) and preview the first 20.
trips_17_10_17 = trips_17_10_17.sort('trip_id')
trips_17_10_17.show(n=20)

+--------------+
|       trip_id|
+--------------+
|  85:11:10:002|
|  85:11:11:004|
|85:11:1251:001|
|85:11:1255:001|
|85:11:1256:002|
|85:11:1258:001|
|85:11:1260:001|
|  85:11:12:002|
|  85:11:13:006|
|85:11:1408:002|
|85:11:1410:001|
|85:11:1411:001|
|85:11:1429:001|
|85:11:1431:001|
|  85:11:14:003|
|85:11:1507:002|
|85:11:1508:002|
|85:11:1509:002|
|85:11:1510:002|
|85:11:1511:002|
+--------------+
only showing top 20 rows



In [53]:
# Sort the 05.03.2018 trip ids (string ordering) and preview the first 20.
trips_05_03_18 = trips_05_03_18.sort('trip_id')
trips_05_03_18.show(n=20)

+--------------+
|       trip_id|
+--------------+
|  85:11:10:002|
|  85:11:11:001|
|85:11:1251:001|
|85:11:1252:001|
|85:11:1255:001|
|85:11:1256:003|
|85:11:1260:001|
|  85:11:12:001|
|  85:11:13:001|
|85:11:1408:001|
|85:11:1410:002|
|85:11:1411:001|
|85:11:1429:001|
|85:11:1431:001|
|85:11:1456:001|
|85:11:1458:001|
|85:11:1463:001|
|85:11:1489:001|
|  85:11:14:001|
|85:11:1507:002|
+--------------+
only showing top 20 rows



In [None]:
# Flag the trip ids of 17.10.2017 that do not occur on 05.03.2018.
# NOTE(review): the original cell was left unfinished (unclosed call, empty
# `otherwise()`), and `isin` cannot be given a column of another DataFrame, so
# the membership test is done with a left join instead. Filling
# `different_trip_id` with the trip id itself is an assumption -- confirm.
_seen_on_05_03_18 = trips_05_03_18.withColumn('seen_on_05_03_18', functions.lit(True))
new_df = (
    trips_17_10_17
    .join(_seen_on_05_03_18, on='trip_id', how='left')
    .withColumn(
        'different_trip_id',
        # null when the trip also ran on 05.03.2018, the trip id otherwise
        functions.when(functions.col('seen_on_05_03_18').isNull(), functions.col('trip_id')),
    )
    .drop('seen_on_05_03_18')
)

In [41]:
# Trip ids present on both 17.10.2017 and 15.10.2017, and how many there are.
join_trips3 = trips_17_10_17.join(trips_15_10_17, 'trip_id', 'inner')
join_trips3.count()

1674

In [43]:
# Trip ids present on both 17.10.2017 and 20.11.2017, and how many there are.
join_trips4 = trips_17_10_17.join(trips_20_11_17, 'trip_id', 'inner')
join_trips4.count()

14846

In [44]:
# Distinct trip ids observed on 20.12.2017.
trips_20_12_17 = (
    third_table
    .where(third_table['date'] == "20.12.2017")
    .select('trip_id')
    .distinct()
)

In [46]:
# Number of distinct trips on 20.12.2017.
trips_20_12_17.count()

16209

In [45]:
# Trip ids present on both 17.10.2017 and 20.12.2017, and how many there are.
join_trips5 = trips_17_10_17.join(trips_20_12_17, 'trip_id', 'inner')
join_trips5.count()

8556

In [47]:
# Distinct trip ids observed on 29.01.2018.
trips_29_01_18 = (
    third_table
    .where(third_table['date'] == "29.01.2018")
    .select('trip_id')
    .distinct()
)

In [48]:
# Number of distinct trips on 29.01.2018.
trips_29_01_18.count()

16221

In [49]:
# Trip ids present on both 17.10.2017 and 29.01.2018, and how many there are.
# NOTE(review): this rebinds `join_trips5`, which previously held the
# 20.12.2017 comparison; the earlier result is no longer reachable by name.
join_trips5 = trips_17_10_17.join(trips_29_01_18, 'trip_id', 'inner')
join_trips5.count()

8009

- If we use only the GESCHAETZT entries, we keep only 4.8 % of our available data. Moreover, if we drop all the other entries, we risk losing information about some connections in our network. For now we simply compute the difference between the actual and the scheduled time columns. This should be discussed further with the other group members and the TAs.
- How to handle the null values is critical: do we set delay = 0 when a value is null, or do we drop those entries? (Dropping them brings back the problem highlighted in the first point of this cell.) For now the delay is set to 0 when only the actual time is null, and is left null when both the scheduled and the actual times are null.
- We also create an 'hour' column because it could be useful for the statistical tests.

In [50]:
from pyspark.sql.functions import when,unix_timestamp,hour,to_timestamp,col

In [40]:
# Pattern used to parse every scheduled/actual time string of `third_table`.
# NOTE(review): the pattern has no seconds field; if the raw strings carry
# seconds, how they are treated depends on the Spark version's parser -- confirm.
_TIMESTAMP_FORMAT = 'dd.MM.yyyy HH:mm'


def _delay_minutes(actual_col, scheduled_col):
    """Build the delay, in minutes, between an actual and a scheduled time column.

    Args:
        actual_col: name of the `third_table` column holding the actual time string.
        scheduled_col: name of the `third_table` column holding the scheduled time string.

    Returns:
        A Column that is null when both inputs are null, 0 when only the actual
        time is null, and otherwise the difference (actual - scheduled) between
        the two parsed timestamps in seconds, divided by 60.
    """
    actual = third_table[actual_col]
    scheduled = third_table[scheduled_col]
    seconds_late = (unix_timestamp(actual_col, _TIMESTAMP_FORMAT)
                    - unix_timestamp(scheduled_col, _TIMESTAMP_FORMAT))
    return (
        when(actual.isNull() & scheduled.isNull(), None)
        .when(actual.isNull() & scheduled.isNotNull(), 0)
        # NOTE(review): round() is applied to the difference in seconds, i.e.
        # before the division by 60 -- confirm this is the intended placement.
        .otherwise(functions.round(seconds_late) / 60)
    )


# The whole chain is wrapped in parentheses instead of relying on backslash
# continuations: a backslash followed by a blank line ends the statement, which
# would detach the `.withColumn(...)` calls that follow it.
third_table_final = (
    third_table
    .withColumn('arrival_delay', _delay_minutes('actual_arrival', 'arrival_time'))
    .withColumn('departure_delay', _delay_minutes('actual_departure', 'departure_time'))
    # Hour of the scheduled arrival, falling back to the scheduled departure
    # when the row has no scheduled arrival.
    .withColumn(
        'hour',
        when(third_table.arrival_time.isNull(),
             hour(to_timestamp(third_table.departure_time, _TIMESTAMP_FORMAT)))
        .otherwise(hour(to_timestamp(third_table.arrival_time, _TIMESTAMP_FORMAT))),
    )
)

### STATISTICAL TESTS

- In this section we run statistical tests to see whether our binned delays fit a known distribution well (we were thinking of a log-normal distribution).
- First we run the test on the whole dataset, i.e. on the third table.
- Then we run a test on the delays at each station (group by station name).
- After this we do the same for each transport line (group by the line_id; the column is currently, erroneously, named 'train_line').
- Then we do the same for each trip (group by trip_id).
- Finally, we run the same tests per hour of the day (we still need to choose how to split the day; right now the column 'hour' has 24 distinct values, of course).

In [41]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import lognorm
from pyspark.mllib.stat import Statistics
from scipy.stats import lognorm
from pyspark.sql.types import *

In [42]:
# List the columns of the final table, including the three derived ones.
third_table_final.columns

['trip_id',
 'station_name',
 'date',
 'transport_type',
 'train_line',
 'train_service',
 'VERKEHRSMITTEL_TEXT',
 'additional_trip',
 'failed_trip',
 'arrival_time',
 'actual_arrival',
 'AN_PROGNOSE_STATUS',
 'departure_time',
 'actual_departure',
 'AB_PROGNOSE_STATUS',
 'DURCHFAHRT_TF',
 'id',
 'latitude',
 'longitude',
 'elevation',
 'dist_to_zurich_HB',
 'arrival_delay',
 'departure_delay',
 'hour']

In [43]:
# Mark the table for caching and run a count, which computes it and returns the number of rows.
third_table_final.cache().count()

51046552

In [44]:
# Rows that have a departure delay / an arrival delay, respectively.
third_table_final_dep = third_table_final.filter(
    third_table_final['departure_delay'].isNotNull()
)
third_table_final_arr = third_table_final.filter(
    third_table_final['arrival_delay'].isNotNull()
)

In [45]:
# For every (trip_id, station_name) pair, gather all observed delays in one list.
delay_group_keys = ['trip_id', 'station_name']
df_delay_dep = (
    third_table_final_dep
    .groupBy(*delay_group_keys)
    .agg(functions.collect_list("departure_delay").alias('list_dep_delay'))
)
df_delay_arr = (
    third_table_final_arr
    .groupBy(*delay_group_keys)
    .agg(functions.collect_list("arrival_delay").alias('list_arr_delay'))
)

In [46]:
# Number of (trip_id, station_name) groups that have arrival delays.
df_delay_arr.count()

2271318

In [65]:
def _fit_lognorm_row(row, delays_field):
    """Fit a log-normal distribution to the delays collected in one row.

    Args:
        row: a Row with 'trip_id', 'station_name' and the list stored under `delays_field`.
        delays_field: name of the field holding the list of delays.

    Returns:
        [trip_id, station_name, params], where params are the values returned
        by `lognorm.fit`, converted to plain Python floats.
    """
    fitted = [float(value) for value in lognorm.fit(row[delays_field])]
    return [row['trip_id'], row['station_name'], fitted]


# One fit per (trip_id, station_name) group, computed on the executors.
df_delay_dep_param = df_delay_dep.rdd.map(lambda row: _fit_lognorm_row(row, 'list_dep_delay'))
df_delay_arr_param = df_delay_arr.rdd.map(lambda row: _fit_lognorm_row(row, 'list_arr_delay'))

# Each record is turned into a tuple before building the DataFrames.
df_delay_dep_fit = df_delay_dep_param.map(tuple).toDF(['trip_id', 'station_name', 'fit_param_dep'])
df_delay_arr_fit = df_delay_arr_param.map(tuple).toDF(['trip_id', 'station_name', 'fit_param_arr'])

In [66]:
# Preview two rows of the fitted arrival parameters.
df_delay_arr_fit.show(2)

+---------------+-------------------+--------------------+
|        trip_id|       station_name|       fit_param_arr|
+---------------+-------------------+--------------------+
|85:11:13752:001|     Birmensdorf ZH|[0.64744702021595...|
|85:11:13752:001|Bonstetten-Wettswil|[0.64971286678057...|
+---------------+-------------------+--------------------+
only showing top 2 rows



In [67]:
# Preview two rows of the fitted departure parameters.
df_delay_dep_fit.show(2)

+---------------+-------------------+--------------------+
|        trip_id|       station_name|       fit_param_dep|
+---------------+-------------------+--------------------+
|85:11:13752:001|     Birmensdorf ZH|[0.64242304975669...|
|85:11:13752:001|Bonstetten-Wettswil|[0.46106848938145...|
+---------------+-------------------+--------------------+
only showing top 2 rows



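We split the fitted parameters into 3 columns. Note that `lognorm.fit` returns `(shape, loc, scale)`, so the columns named `mean_*` and `std_*` below actually hold the `loc` and `scale` parameters: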

In [68]:
# Spread the three fitted values over separate columns. The second and third
# values are exposed as "mean_*" and "std_*"; these names are kept as they are
# because later cells and the exported files use them.
dep_fit_values = df_delay_dep_fit["fit_param_dep"]
df_delay_dep_param_split = df_delay_dep_fit.select(
    'trip_id',
    'station_name',
    *[dep_fit_values.getItem(position).alias(column_name)
      for position, column_name in enumerate(("shape_dep", "mean_dep", "std_dep"))]
)

arr_fit_values = df_delay_arr_fit["fit_param_arr"]
df_delay_arr_param_split = df_delay_arr_fit.select(
    'trip_id',
    'station_name',
    *[arr_fit_values.getItem(position).alias(column_name)
      for position, column_name in enumerate(("shape_arr", "mean_arr", "std_arr"))]
)

In [69]:
# Collect the departure parameters to the driver as a pandas DataFrame.
df_delay_dep_param_split_pandas = df_delay_dep_param_split.toPandas()

In [62]:
# Preview the first rows of the departure parameters.
df_delay_dep_param_split_pandas.head()

Unnamed: 0,trip_id,station_name,shape_dep,mean_dep,std_dep
0,85:11:13752:001,Birmensdorf ZH,0.647447,-1.649415,2.571467
1,85:11:13752:001,Bonstetten-Wettswil,0.649713,-0.743932,2.739554
2,85:11:13752:001,Urdorf,0.691869,-0.505328,2.098195
3,85:11:13752:001,Urdorf Weihermatt,0.663387,0.48323,2.317448
4,85:11:13752:001,Zürich Altstetten,0.755722,0.541231,1.650423


In [70]:
# Save the departure parameters to a CSV file in the working directory, without the index column.
df_delay_dep_param_split_pandas.to_csv("delay_dep_param.csv", index=False)

In [56]:
# Collect the arrival parameters to the driver as a pandas DataFrame.
df_delay_arr_param_split_pandas = df_delay_arr_param_split.toPandas()

In [57]:
# Save the arrival parameters to a CSV file in the working directory, without the index column.
df_delay_arr_param_split_pandas.to_csv('delay_arr_param.csv', index=False)

In [58]:
# Preview the first rows of the arrival parameters.
df_delay_arr_param_split_pandas.head()

Unnamed: 0,trip_id,station_name,shape_arr,mean_arr,std_arr
0,85:11:13752:001,Birmensdorf ZH,0.647447,-1.649415,2.571467
1,85:11:13752:001,Bonstetten-Wettswil,0.649713,-0.743932,2.739554
2,85:11:13752:001,Urdorf,0.691869,-0.505328,2.098195
3,85:11:13752:001,Urdorf Weihermatt,0.663387,0.48323,2.317448
4,85:11:13752:001,Zürich Altstetten,0.755722,0.541231,1.650423


In [71]:
# Combine departure and arrival parameters; with the default (inner) merge only
# (trip_id, station_name) pairs present in both tables are kept.
delay_params = df_delay_dep_param_split_pandas.merge(
    df_delay_arr_param_split_pandas,
    on=['trip_id', 'station_name'],
)

In [73]:
# Save the merged departure/arrival parameters to a CSV file, without the index column.
delay_params.to_csv("delay_params.csv", index=False)

In [104]:
# Preview the first five rows of the merged parameters.
delay_params.head(5)

Unnamed: 0,trip_id,station_name,shape_dep,mean_dep,std_dep,shape_arr,mean_arr,std_arr
0,85:11:13752:001,Birmensdorf ZH,0.642423,-0.715683,2.617516,0.647447,-1.649415,2.571467
1,85:11:13752:001,Bonstetten-Wettswil,0.461068,-1.201161,4.136449,0.649713,-0.743932,2.739554
2,85:11:13752:001,Urdorf,0.51191,-0.678884,3.233112,0.691869,-0.505328,2.098195
3,85:11:13752:001,Urdorf Weihermatt,0.485538,0.206669,3.601493,0.663387,0.48323,2.317448
4,85:11:13752:001,Zürich Altstetten,0.586274,0.380187,2.598591,0.755722,0.541231,1.650423


In [84]:
# Start from an empty mapping; it is filled by a later cell.
delays_dict = {}

In [89]:
# Collapse the six fitted parameters of every (trip_id, station_name) pair into
# one list, stored in a single-column frame indexed by those two keys.
# * The columns are selected with a list: selecting several columns with a bare
#   tuple (`grouped['a', 'b']`) is deprecated and rejected by recent pandas.
# * `as_index=False` is not used, so `apply` returns a Series indexed by the
#   group keys, which is what `.to_frame()` requires.
param_columns = ['shape_dep', 'mean_dep', 'std_dep',
                 'shape_arr', 'mean_arr', 'std_arr']
dd = (
    delay_params
    .groupby(['trip_id', 'station_name'])[param_columns]
    .apply(lambda group: group.values.tolist()[0])  # values of the group's first row
    .to_frame()
)

def recur_dictify(frame):
    """Turn a DataFrame into nested dictionaries keyed by its leading columns.

    The first column supplies the keys of the outer dictionary, the second
    column the keys of the next level, and so on, until a single column is left.

    Args:
        frame: pandas DataFrame; every column but the last is used as a key level.

    Returns:
        A nested dict. For a one-column frame the result is the single value it
        contains, or, when it holds several values, the squeezed array of them.
    """
    if frame.shape[1] == 1:
        leaf_values = frame.values
        if leaf_values.size == 1:
            return leaf_values[0][0]
        return leaf_values.squeeze()

    key_column = frame.columns[0]
    nested = {}
    for key, rows in frame.groupby(key_column):
        # Drop the column just consumed as a key and recurse on the rest.
        nested[key] = recur_dictify(rows.iloc[:, 1:])
    return nested


# Move the index levels of `dd` back into columns so that they become the
# nesting levels of the dictionary built by `recur_dictify`.
delays_dict = recur_dictify(dd.reset_index())

In [92]:
# Inspect the entry of one trip: a mapping from station name to its list of fitted parameters.
delays_dict['85:11:13752:001']

{'Birmensdorf ZH': [0.6424230497566921,
  -0.7156830945161855,
  2.6175157868578487,
  0.6474470202159504,
  -1.6494151038426064,
  2.5714673710314537],
 'Bonstetten-Wettswil': [0.4610684893814535,
  -1.2011608867353814,
  4.136448664880946,
  0.6497128667805727,
  -0.7439320429616096,
  2.7395542606923],
 'Urdorf': [0.5119096382336292,
  -0.6788836778776737,
  3.233111640184454,
  0.6918689182829518,
  -0.5053278033593314,
  2.0981950381446564],
 'Urdorf Weihermatt': [0.48553771228888437,
  0.20666937426606913,
  3.6014926573494055,
  0.6633866660825061,
  0.48322999401788624,
  2.3174484760115375],
 'Zürich Altstetten': [0.5862735581954911,
  0.380187219532638,
  2.5985910526787928,
  0.7557217285665694,
  0.541231305636209,
  1.650422933206059],
 'Zürich HB': [1.7133024431228892,
  -1.2209669461901277e-05,
  0.00022974235779182949,
  22.64985321953943,
  -1.0000000000000027,
  0.004666414825163004],
 'Zürich Hardbrücke': [0.7783684665489181,
  -0.13950422133332674,
  1.8708303802568

In [91]:
import pickle

# Persist the nested delay-parameter dictionary to disk.
# The context manager makes sure the file is flushed and closed, even if
# pickling raises.
with open("delays_dict.p", "wb") as pickle_file:
    pickle.dump(delays_dict, pickle_file)

In [103]:
# Shut down the SparkContext now that the analysis is finished.
sc.stop()