In [1]:
%%configure
{"conf": {
    "spark.app.name": "dslab-group_final"
}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7165,application_1589299642358_1661,pyspark,busy,Link,Link,


#### Imports:

In [4]:
from geopy.distance import distance as geo_distance
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Load data:

In [5]:
# Loading data, these are snapshots of the all available data
# Calendar and trips are useful to filter the other dataframe according to the day

stop_times = spark.read.format('orc').load('/data/sbb/timetables/orc/stop_times/000000_0')
stops = spark.read.format('orc').load('/data/sbb/timetables/orc/stops/000000_0')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Criteria 1: Stop times during rush-hour 

Only consider journeys at reasonable hours of the day, thus we take only stop times that are in the window of rush-hour (e.g. from 8 a.m. to 8 p.m.). 

In [6]:
# Filter stop_times to be only in 08:00-19:59:
stop_times = stop_times.where((col('departure_time') >= '08:00:00') 
                              & (col('departure_time') <= '19:59:59'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Criteria 2: Stations around Zürich HB

Only consider stations in a 15km radius of Zürich's train station (Zürich HB). 

First we get the geolocation of Zürich Hauptbahnhof to be able to calculate the distance of the other stations to the Hauptbahnhof. 

In [7]:
zurich_pos = stops.where(col('stop_name') == 'Zürich HB').select('stop_lat', 'stop_lon').collect()
zurich_pos = (zurich_pos[0][0], zurich_pos[0][1])
print('Location of Zürich Hauptbahnhof (lat, lon) :'+str(zurich_pos))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Location of Zürich Hauptbahnhof (lat, lon) :(47.3781762039461, 8.54019357578468)

In [8]:
def zurich_distance(x, y):
    """zurich_distance: returns the distance of a station to Zurich HB
    @input: (lat,lon) of a station
    @output: distance in km to Zurich HB
    """
    return geo_distance(zurich_pos, (x,y)).km

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Then we create a dataframe `stops_zurich` of the stations where we add a column for the distance to Zurich HB. In that dataframe, we keep only those that are in a radius of 15km to the HB. The same filter is applied to the `stop_times` df mentioned above. 

In [9]:
# filter stops:
stops_distance = stops.rdd.map(lambda x: (x['stop_id'], zurich_distance(x['stop_lat'], x['stop_lon'])))
stops_distance = spark.createDataFrame(stops_distance.map(lambda r: Row(stop_id=r[0], 
                                                                        zurich_distance=r[1])))

stops_distance = stops_distance.filter(col('zurich_distance') <= 15)

# add distance to HB to stops info and keep only in radius of 15km
stops_zurich = stops_distance.join(stops, on='stop_id')

# keep only stop times in radius of 15km of Zurich
stop_times_zurich = stop_times.join(stops_distance.select('stop_id'), on='stop_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Cache it to save time:
stop_times_zurich.cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[stop_id: string, trip_id: string, arrival_time: string, departure_time: string, stop_sequence: smallint, pickup_type: tinyint, drop_off_type: tinyint]

### Have a look at the data we have so far: 

#### Stop times in Zurich: 
Arrival and departure times at stops in the 15km radius of Zurich HB. 

In [11]:
stop_times_zurich.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+------------+--------------+-------------+-----------+-------------+
|stop_id|             trip_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|
+-------+--------------------+------------+--------------+-------------+-----------+-------------+
|8502508|9.TA.1-303-j19-1.2.R|    19:55:00|      19:55:00|            6|          0|            0|
|8502508|12.TA.1-303-j19-1...|    09:55:00|      09:55:00|            6|          0|            0|
|8502508|13.TA.1-303-j19-1...|    08:25:00|      08:25:00|            6|          0|            0|
+-------+--------------------+------------+--------------+-------------+-----------+-------------+
only showing top 3 rows

#### Trips in Zurich:

Information about journeys in the 15km radius of Zurich HB. 

#### Stops in Zurich:
Information about stops in the 15km radius of Zurich HB.

In [12]:
stops_zurich.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------------+--------------------+----------------+----------------+-------------+--------------+
|    stop_id|   zurich_distance|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+-----------+------------------+--------------------+----------------+----------------+-------------+--------------+
|    8500926|11.510766966884365|Oetwil a.d.L., Sc...|47.4236270123012| 8.4031825286317|             |              |
|    8502186|10.798985488832079|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|             |      8502186P|
|8502186:0:1|10.800041577194426|Dietikon Stoffelbach|47.3934666445388|8.39894248049007|             |      8502186P|
+-----------+------------------+--------------------+----------------+----------------+-------------+--------------+
only showing top 3 rows

## Create network data

From the pre-processed data, we would like to create a directed network where each node is a station and each edge between two nodes corresponds to a possible trip. 

A node will have the following attributes:
- stop_name: name of the station (e.g. Zurich HB)
- latitude
- longitude

An directed edge will have the following attributes:
- stop_id: the id of the stop the (directed) edge points from
- next_stop: the id of the stop the edge points to
- duration: the duration of the trip from stop_id to next_stop
- departure time: the time from which the service departs from stop_id

### Nodes:

Then we create a **multigraph** (e.g. more than one edge allowed between two nodes) and add the stations as nodes.

In [13]:
%%local
import os
username = os.environ['JUPYTERHUB_USER']

In [14]:
%%send_to_spark -i username -t str -n username

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'username' as 'username' to Spark kernel

In [15]:
stops_zurich.write.format("orc").save("/user/{}/nodes.orc".format(username))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
u'path hdfs://iccluster044.iccluster.epfl.ch:8020/user/gottraux/nodes.orc already exists.;'
Traceback (most recent call last):
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 705, in save
    self._jwrite.save(path)
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'path hdfs://iccluster044.iccluster.epfl.ch:8020/user/gottraux/nodes.orc already exis

### Edges:

Right now, we have only nodes that are stations in our graph and we would like to add edges between them showing possible trips at certain times. For this we need to do a few operations. 

First, in our `stop_times_zurich` table, we have the time of arrival and departure but we would like to have an idea of the time elapsed in minutes since 12 p.m. This way times will be easily subtractable and we can get an idea of trip duration in minutes. So we convert those two columns: 

In [16]:
@udf
def convertToMinute(s):
    h, m, _ = s.split(':')
    h,m = int(h), int(m)
    
    return h*60+m

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Convert time information to minutes elapsed since 0am
stop_times_zurich = stop_times_zurich.withColumn('arrival_time', 
                                                 convertToMinute(col('arrival_time')))
stop_times_zurich = stop_times_zurich.withColumn('departure_time', 
                                                 convertToMinute(col('departure_time')))
stop_times_zurich.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+------------+--------------+-------------+-----------+-------------+
|stop_id|             trip_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|
+-------+--------------------+------------+--------------+-------------+-----------+-------------+
|8502508|9.TA.1-303-j19-1.2.R|        1195|          1195|            6|          0|            0|
|8502508|12.TA.1-303-j19-1...|         595|           595|            6|          0|            0|
|8502508|13.TA.1-303-j19-1...|         505|           505|            6|          0|            0|
+-------+--------------------+------------+--------------+-------------+-----------+-------------+
only showing top 3 rows

Then we want a dataframe that has the trip duration to the next stop from the current one on the trip. For that, we first create a table with the next stop and arrival time for each stop sequence in a trip. 

In [18]:
stop_times_zurich_2 = (stop_times_zurich.withColumn('stop_sequence_prev', col('stop_sequence')-1)
                       .select('trip_id',
                               col('stop_id').alias('next_stop'),
                               col('stop_sequence_prev').alias('stop_sequence'),
                               col('arrival_time').alias('next_arrival_time')))

stop_times_zurich_2.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+-------------+-----------------+
|             trip_id|next_stop|stop_sequence|next_arrival_time|
+--------------------+---------+-------------+-----------------+
|9.TA.1-303-j19-1.2.R|  8502508|            5|             1195|
|12.TA.1-303-j19-1...|  8502508|            5|              595|
+--------------------+---------+-------------+-----------------+
only showing top 2 rows

Then we join this to the `stop_times_zurich` table to have trip duration (in minutes) and next stop information. 

In [19]:
# Add trip duration and next stop: 
stop_times_zurich = stop_times_zurich.join(stop_times_zurich_2, 
                                           on=['trip_id', 'stop_sequence']).orderBy('trip_id', 'stop_sequence')
stop_times_zurich = stop_times_zurich.withColumn('trip_duration', 
                                                 col('next_arrival_time')-col('departure_time'))
stop_times_zurich = stop_times_zurich.select('trip_id', 
                                             'stop_id', 'arrival_time', 'departure_time', 
                                             'next_stop', 'trip_duration').cache()
stop_times_zurich.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+------------+--------------+---------+-------------+
|             trip_id|stop_id|arrival_time|departure_time|next_stop|trip_duration|
+--------------------+-------+------------+--------------+---------+-------------+
|1.TA.1-231-j19-1.1.H|8582462|         578|           578|  8572600|          1.0|
|1.TA.1-231-j19-1.1.H|8572600|         579|           579|  8572601|          0.0|
+--------------------+-------+------------+--------------+---------+-------------+
only showing top 2 rows

##### Save stop_times informations to hdfs:

In [20]:
stop_times_zurich.write.format("orc").save("/user/{}/edges.orc".format(username))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
u'path hdfs://iccluster044.iccluster.epfl.ch:8020/user/gottraux/edges.orc already exists.;'
Traceback (most recent call last):
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 705, in save
    self._jwrite.save(path)
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/hdata/sdg/hadoop/yarn/local/usercache/ebouille/appcache/application_1589299642358_2208/container_e06_1589299642358_2208_01_000001/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'path hdfs://iccluster044.iccluster.epfl.ch:8020/user/gottraux/edges.orc already exis