# To Begin With...

### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.

<div class='alert alert-info'><b>Any application without a proper name would be promptly killed.</b></div>

In [28]:
%%configure
{"conf": {
    "spark.app.name": "datavirus_final"
}}

A session has already been started. If you intend to recreate the session with new configurations, please include the -f argument.


### Start Spark

In [29]:
# Initialization
spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f0e848a2850>

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5969,application_1589299642358_0458,pyspark,idle,Link,Link,
5973,application_1589299642358_0460,pyspark,idle,Link,Link,
5990,application_1589299642358_0478,pyspark,idle,Link,Link,
5991,application_1589299642358_0479,pyspark,idle,Link,Link,
5994,application_1589299642358_0482,pyspark,busy,Link,Link,
5995,application_1589299642358_0483,pyspark,idle,Link,Link,
5997,application_1589299642358_0485,pyspark,idle,Link,Link,
5998,application_1589299642358_0486,pyspark,idle,Link,Link,
6000,application_1589299642358_0488,pyspark,idle,Link,Link,
6002,application_1589299642358_0490,pyspark,idle,Link,Link,


### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format

In [4]:
sbb = spark.read.orc('/data/sbb/orc/istdaten')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
sbb.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- betriebstag: string (nullable = true)
 |-- fahrt_bezeichner: string (nullable = true)
 |-- betreiber_id: string (nullable = true)
 |-- betreiber_abk: string (nullable = true)
 |-- betreiber_name: string (nullable = true)
 |-- produkt_id: string (nullable = true)
 |-- linien_id: string (nullable = true)
 |-- linien_text: string (nullable = true)
 |-- umlauf_id: string (nullable = true)
 |-- verkehrsmittel_text: string (nullable = true)
 |-- zusatzfahrt_tf: string (nullable = true)
 |-- faellt_aus_tf: string (nullable = true)
 |-- bpuic: string (nullable = true)
 |-- haltestellen_name: string (nullable = true)
 |-- ankunftszeit: string (nullable = true)
 |-- an_prognose: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- abfahrtszeit: string (nullable = true)
 |-- ab_prognose: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- durchfahrt_tf: string (nullable = true)

In [5]:
sbb.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(betriebstag=u'03.09.2018', fahrt_bezeichner=u'80:06____:17010:000', betreiber_id=u'80:06____', betreiber_abk=u'DB', betreiber_name=u'DB Regio AG', produkt_id=u'Zug', linien_id=u'17010', linien_text=u'RE', umlauf_id=u'', verkehrsmittel_text=u'RE', zusatzfahrt_tf=u'false', faellt_aus_tf=u'false', bpuic=u'8500090', haltestellen_name=u'Basel Bad Bf', ankunftszeit=u'', an_prognose=u'', an_prognose_status=u'PROGNOSE', abfahrtszeit=u'03.09.2018 05:45', ab_prognose=u'', ab_prognose_status=u'UNBEKANNT', durchfahrt_tf=u'false'), Row(betriebstag=u'03.09.2018', fahrt_bezeichner=u'80:06____:17012:000', betreiber_id=u'80:06____', betreiber_abk=u'DB', betreiber_name=u'DB Regio AG', produkt_id=u'Zug', linien_id=u'17012', linien_text=u'RE', umlauf_id=u'', verkehrsmittel_text=u'RE', zusatzfahrt_tf=u'false', faellt_aus_tf=u'false', bpuic=u'8500090', haltestellen_name=u'Basel Bad Bf', ankunftszeit=u'', an_prognose=u'', an_prognose_status=u'PROGNOSE', abfahrtszeit=u'03.09.2018 06:34', ab_prognose=u'',

### Read the station list data [BFKOORD_GEO](https://opentransportdata.swiss/en/cookbook/hafas-rohdaten-format-hrdf/#Abgrenzung)

In [5]:
metadata = spark.read.csv('/data/sbb/stations/bfkoordgeo.csv', header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
metadata.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- StationID: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Remark: string (nullable = true)

In [8]:
metadata.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---------+---------+------+----------------+
|StationID|Longitude| Latitude|Height|          Remark|
+---------+---------+---------+------+----------------+
|  0000002|26.074412|44.446770|     0|       Bucuresti|
|  0000003| 1.811446|50.901549|     0|          Calais|
|  0000004| 1.075329|51.284212|     0|      Canterbury|
|  0000005|-3.543547|50.729172|     0|          Exeter|
|  0000007| 9.733756|46.922368|   744|Fideris, Bahnhof|
+---------+---------+---------+------+----------------+
only showing top 5 rows

In [6]:
import pyspark.sql.functions as F
import math

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
df_stations = metadata.withColumn("dlon", F.radians(F.col("Longitude")) - math.radians(8.540192)) \
             .withColumn("dlat", F.radians(F.col("Latitude")) - math.radians(47.378177)) \
             .withColumn("Distance_from_Zurich", F.asin(F.sqrt( F.sin(F.col("dlat") / 2) ** 2 + math.cos(math.radians(47.378177))
                                               *F.cos(F.radians(F.col("Latitude"))) * F.sin(F.col("dlon") / 2) ** 2)) * 2 * 3963 * 5280) \
             .drop("dlon", "dlat") \
             .filter(F.col("Distance_from_Zurich")<15000)

zurich_stations = list(df_stations.select('Remark').toPandas()['Remark'])
print(len(zurich_stations))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

382

In [8]:
%%spark -o df_stations -n -1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
%%local
df_stations.to_csv("../data/Zurich_Stations_TOTAL.csv", index=False)

In [10]:
test = df_stations.select('StationID','Longitude','Latitude')
joinedDF = test.crossJoin(test).toDF('id1','lon1','lat1','id2','lon2','lat2')
joinedDF.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+---------+-------+--------+---------+
|    id1|    lon1|     lat1|    id2|    lon2|     lat2|
+-------+--------+---------+-------+--------+---------+
|0000176|8.521961|47.351679|0000176|8.521961|47.351679|
|0000176|8.521961|47.351679|8502572|8.513918|47.370293|
|0000176|8.521961|47.351679|8503000|8.540192|47.378177|
|0000176|8.521961|47.351679|8503001|8.488940|47.391481|
|0000176|8.521961|47.351679|8503003|8.548466|47.366611|
+-------+--------+---------+-------+--------+---------+
only showing top 5 rows

In [11]:
from pyspark.sql.types import IntegerType
distance = (
    joinedDF
        .withColumn("dlon", F.radians(F.col("lon1")) - F.radians(F.col("lon2")))
        .withColumn("dlat", F.radians(F.col("lat1")) - F.radians(F.col("lat2"))) 
        .withColumn("Distance", F.asin(F.sqrt( F.sin(F.col("dlat") / 2) ** 2 + F.cos(F.radians("lat2"))
                                           *F.cos(F.radians(F.col("lat1"))) * F.sin(F.col("dlon") / 2) ** 2)) * 2 * 3963 * 5280) \
        .drop("dlon", "dlat") 
        .filter(F.col("Distance")<500)
        .withColumn("Walking_time",60*F.round(2+F.col("Distance")/50).cast(IntegerType()))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
%%spark -o distance -n -1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
%%local
distance.to_csv("../data/Zurich_WalkingConnections_TOTAL.csv", index=False)

In [91]:
distance.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+---------+-------+--------+---------+------------------+------------+
|    id1|    lon1|     lat1|    id2|    lon2|     lat2|          Distance|Walking_time|
+-------+--------+---------+-------+--------+---------+------------------+------------+
|0000176|8.521961|47.351679|0000176|8.521961|47.351679|               0.0|           2|
|8502572|8.513918|47.370293|8502572|8.513918|47.370293|               0.0|           2|
|8503000|8.540192|47.378177|8503000|8.540192|47.378177|               0.0|           2|
|8503000|8.540192|47.378177|8503088|8.539170|47.377431|371.62272536447483|           9|
|8503000|8.540192|47.378177|8503446|8.541715|47.378846|448.94025446812276|          11|
|8503000|8.540192|47.378177|8587348|8.539338|47.377241| 401.8110520444312|          10|
|8503000|8.540192|47.378177|8587349|8.541742|47.377560| 444.6416307076613|          11|
|8503001|8.488940|47.391481|8503001|8.488940|47.391481|               0.0|           2|
|8503001|8.488940|47.391481|8591

**BETRIEBSTAG**: date of the trip

**FAHRT_BEZEICHNER**: identifies the trip

**BETREIBER_ABK, BETREIBER_NAME**: operator (name will contain the full name, e.g. Schweizerische Bundesbahnen for SBB)

**PRODUKT_ID**: type of transport, e.g. train, bus

**LINIEN_ID**: for trains, this is the train number

**LINIEN_TEXT,VERKEHRSMITTEL_TEXT**: for trains, the service type (IC, IR, RE, etc.)

**ZUSATZFAHRT_TF**: boolean, true if this is an additional trip (not part of the regular schedule)

**FAELLT_AUS_TF**: boolean, true if this trip failed (cancelled or not completed)

**HALTESTELLEN_NAME**: name of the stop

**ANKUNFTSZEIT**: arrival time at the stop according to schedule

**AN_PROGNOSE**: actual arrival time (when AN_PROGNOSE_STATUS is GESCHAETZT)

**AN_PROGNOSE_STATUS**: look only at lines when this is GESCHAETZT. This indicates that AN_PROGNOSE is the measured time of arrival.

**ABFAHRTSZEIT**: departure time at the stop according to schedule

**AB_PROGNOSE**: actual departure time (when AN_PROGNOSE_STATUS is GESCHAETZT)

**AB_PROGNOSE_STATUS**: look only at lines when this is GESCHAETZT. This indicates that AB_PROGNOSE is the measured time of arrival.

**DURCHFAHRT_TF**: boolean, true if the transport does not stop there

In [16]:
spark.catalog.clearCache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Cache a simpler dataframe with delays for each trip and without duplicated rows
delays_df = (
    sbb.where(F.col("HALTESTELLEN_NAME").isin(zurich_stations))
    .withColumn('arrival_time', F.when(F.col('ankunftszeit') == '', None).otherwise(F.col('ankunftszeit')))
    .withColumn('arrival', F.unix_timestamp(F.col('ankunftszeit'), "dd.MM.yyyy HH:mm").cast('long'))
    .withColumn('real_arrival', F.unix_timestamp(F.col('an_prognose'), "dd.MM.yyyy HH:mm:ss").cast("long"))
    .withColumn('arrival_delay',F.col('real_arrival')-F.col('arrival'))
    .withColumn('departure_time', F.when(F.col('abfahrtszeit') == '', None).otherwise(F.col('abfahrtszeit'))) 
    .withColumn('departure', F.unix_timestamp(F.col('abfahrtszeit'), "dd.MM.yyyy HH:mm").cast('long'))
    .withColumn('real_departure', F.unix_timestamp(F.col('ab_prognose'), "dd.MM.yyyy HH:mm:ss").cast("long"))
    .withColumn('departure_delay',F.col('real_departure')-F.col('departure'))
    .select('arrival_time','departure_time','arrival_delay','departure_delay','BETRIEBSTAG','FAHRT_BEZEICHNER','LINIEN_ID','PRODUKT_ID','BPUIC','HALTESTELLEN_NAME')
    .toDF('Arrival_Time','Departure_Time','Arrival_Delay','Departure_Delay','Day','Trip_ID','Line_ID', 'Type','Station_ID', 'Station_Name')
    .dropDuplicates()
    .cache()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Remove rows correspoding to trips appearing only once on the dataframe
# These removable trips actually come from / go to stations outside Zurich
ids = delays_df.groupBy('Trip_ID','Day').count()
ids = ids.where(ids['count']>1).select('Trip_ID').distinct()
df = delays_df.join(ids, "Trip_ID").orderBy('Trip_ID','Arrival_Time','Departure_Time')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Solve null values: Remove columns with more than two null-values
# and Copy "Arrival_Time" value if "Departure_Time" is null and viceversa
df = df.dropna(thresh=1,subset=('Arrival_Time','Departure_Time')) \
        .withColumn("Departure",F.coalesce(df.Departure_Time,df.Arrival_Time))\
        .withColumn("Arrival", F.coalesce(df.Arrival_Time,df.Departure_Time))\
        .drop("Departure_Time","Arrival_Time")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
#Compute the average delay times for every connection (trip and station)
mean_delays = df.groupBy('Trip_ID','Station_Name').agg(F.mean('Arrival_Delay'),F.mean('Departure_Delay'))
df = df.join(mean_delays,on =['Trip_ID','Station_Name']).drop('Arrival_Delay','Departure_Delay')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
#Filter data to trips happening only a single working day between 5h and 21h 
df_day =df.where(F.col('Day')=='15.05.2019') 
df_min_hour = df_day.where(F.hour(F.unix_timestamp(F.col('Arrival'), "dd.MM.yyyy HH:mm").cast('timestamp'))>=5) 
df_max_hour = df_min_hour.where(F.hour(F.unix_timestamp(F.col('Departure'), "dd.MM.yyyy HH:mm").cast('timestamp'))<=20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Rank stops by departure_time for every trip and day 
from pyspark.sql import Window
trip_window = Window.partitionBy('Trip_ID','Day').orderBy(F.asc('Departure'))
trip_rank = F.rank().over(trip_window).alias('stop')
begin = df_max_hour.select('*', trip_rank).alias('begin').orderBy('Trip_ID','Arrival','Departure').fillna(0).cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Create dataframe for every connection 
end = begin.drop('Departure','avg(Departure_Delay)').withColumn('stop', begin.stop -1).alias('end')
data = begin.drop('Arrival','avg(Arrival_Delay)').join(end, on=['stop','Day','Trip_ID','Type','Line_ID'])\
            .drop('stop').orderBy('Trip_ID','Arrival','Departure')\
            .toDF('Day','Trip_ID','Type','Line_ID','Start_Station','Start_ID','Start_Time', 
                  'Start_Delay', 'Stop_Station','Stop_ID','Stop_Time','Stop_Delay')\
            

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
#data.write.format("csv").save("../data/connections_with_delays.csv")
#new = spark.read.csv("../data/connections_with_delays.csv",header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
%%spark -o data -n -1

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o1489.toJavaRDD.
: org.apache.spark.SparkException: Job 8 cancelled 
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1542)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1789)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.s

In [18]:
%%local
data.to_csv("../data/connections_delays.csv", index=False)

In [51]:
data.where(data['Day']=='02.12.2019').where(data['Type']=='Bus').drop('Trip_ID').show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+----+--------+--------------------+----------------+-------+--------------------+----------------+
|       Day|   Line_ID|Type|Start_ID|       Start_Station|      Start_Time|Stop_ID|        Stop_Station|       Stop_Time|
+----------+----------+----+--------+--------------------+----------------+-------+--------------------+----------------+
|02.12.2019|85:773:787| Bus| 8591347|Zürich, Schürgist...|02.12.2019 20:48|8591047|    Zürich, Aubrücke|02.12.2019 20:49|
|02.12.2019|85:773:787| Bus| 8591047|    Zürich, Aubrücke|02.12.2019 20:49|8591225|Zürich, Genossens...|02.12.2019 20:52|
|02.12.2019|85:773:787| Bus| 8591225|Zürich, Genossens...|02.12.2019 20:52|8591318|    Zürich, Riedbach|02.12.2019 20:53|
|02.12.2019|85:773:787| Bus| 8591318|    Zürich, Riedbach|02.12.2019 20:53|8591172|   Zürich, Hagenholz|02.12.2019 20:54|
|02.12.2019|85:773:787| Bus| 8591172|   Zürich, Hagenholz|02.12.2019 20:54|8591256|Zürich, Leutschen...|02.12.2019 20:55|
|02.12.2019|85:773:787| 

In [52]:
data.where(data['Day']=='02.12.2019').where(data['Type']=='Tram').drop('Trip_ID').show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+----+--------+--------------------+----------------+-------+--------------------+----------------+
|       Day|    Line_ID|Type|Start_ID|       Start_Station|      Start_Time|Stop_ID|        Stop_Station|       Stop_Time|
+----------+-----------+----+--------+--------------------+----------------+-------+--------------------+----------------+
|02.12.2019|85:3849:015|Tram| 8591101|Zürich, Bucheggplatz|02.12.2019 20:07|8591246|    Zürich, Laubiweg|02.12.2019 20:08|
|02.12.2019|85:3849:015|Tram| 8591246|    Zürich, Laubiweg|02.12.2019 20:09|8591335|Zürich, Schaffhau...|02.12.2019 20:10|
|02.12.2019|85:3849:015|Tram| 8591335|Zürich, Schaffhau...|02.12.2019 20:10|8591324|Zürich, Röslistrasse|02.12.2019 20:11|
|02.12.2019|85:3849:015|Tram| 8591324|Zürich, Röslistrasse|02.12.2019 20:11|8591298|Zürich, Ottikerst...|02.12.2019 20:12|
|02.12.2019|85:3849:015|Tram| 8591298|Zürich, Ottikerst...|02.12.2019 20:12|8591373|Zürich, Sonneggst...|02.12.2019 20:13|
|02.12.2019|85:3