# Preprocessing

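This notebook preprocesses the SBB timetable data: it keeps only the stops near Zürich HB and the trips that serve them, then writes the filtered tables to HDFS.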

## Starting Spark

In [1]:
%load_ext sparkmagic.magics

In [2]:
import os
from IPython import get_ipython
username = os.environ['RENKU_USERNAME']
server = "http://iccluster029.iccluster.epfl.ch:8998"

# set the application name as "<your_gaspar_id>-final"
get_ipython().run_cell_magic(
    'spark',
    line='config', 
    cell="""{{ "name": "{0}-final", "executorMemory": "4G", "executorCores": 4, "numExecutors": 10, "driverMemory": "4G"}}""".format(username)
)

In [3]:
get_ipython().run_line_magic(
    "spark", "add -s {0}-final -l python -u {1} -k".format(username, server)
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5590,application_1652960972356_0041,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [30]:
%%send_to_spark -i username -t str -n username

UsageError: Cell magic `%%send_to_spark` not found.


In [4]:
%%spark
print('We are using Spark %s' % spark.version)

We are using Spark 2.3.2.3.1.4.0-315

## Filter Stations (`allstops`)

We only consider departure and arrival stops within a 15 km radius of Zürich's main train station, `Zürich HB (8503000)`, at (lat, lon) = `(47.378177, 8.540192)`. Note that the filter applied below uses a threshold of 15.5 km.
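The distance to Zürich HB is computed further down with the haversine formula, taken from the Movable Type page cited in the code. For reference, with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ in radians, and the mean Earth radius $R$ used in the code, it reads:

$$
\begin{aligned}
a &= \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \,\sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right) \\
c &= 2\,\operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right) \\
d &= R\,c, \qquad R = 6371\ \text{km}
\end{aligned}
$$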

In [5]:
%%spark
allstops = spark.read.orc('/data/sbb/orc/allstops')
allstops.printSchema()

root
 |-- stop_id: string (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_lat: double (nullable = true)
 |-- stop_lon: double (nullable = true)
 |-- location_type: string (nullable = true)
 |-- parent_station: string (nullable = true)

In [6]:
%%spark
print(allstops.count())
allstops.show(5)

46689
+-------+--------------------+----------------+----------------+-------------+--------------+
|stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+-------+--------------------+----------------+----------------+-------------+--------------+
|1100008|Zell (Wiesental),...|47.7100842702352|7.85964788274668|             |              |
|1100009|Zell (Wiesental),...|47.7131911044794|7.86290876722849|             |              |
|1100010|           Atzenbach|47.7146175266411| 7.8723500608659|             |              |
|1100011|     Mambach, Brücke|47.7282088873189| 7.8774704579861|             |              |
|1100012|  Mambach, Mühlschau|47.7340818684375| 7.8813871126254|             |              |
+-------+--------------------+----------------+----------------+-------------+--------------+
only showing top 5 rows

In [7]:
%%spark
import pyspark.sql.functions as F
from math import sin, cos, sqrt, atan2, radians

# input lat/lon in degree
# output distance in KM
# equation from https://www.movable-type.co.uk/scripts/latlong.html
def lat_lon_to_distance(lat1, lon1, lat2, lon2):
    # earth radius in KM
    R = 6371.0

    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)

    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return R * c

# test
assert(int(lat_lon_to_distance(20, 30, 40, 50)) == 2927)

def distance_from_Zurich_HB(lat, lon):
    return lat_lon_to_distance(lat, lon, 47.378177, 8.540192)

In [8]:
%%spark
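# Declare the UDF return type: without it the UDF returns strings and the
# comparison below relies on an implicit cast.
distance_udf = F.udf(distance_from_Zurich_HB, "double")
filtered_stops = allstops \
    .withColumn("distance", distance_udf(F.col('stop_lat'), F.col('stop_lon'))) \
    .filter(F.col('distance') <= 15.5)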

In [9]:
%%spark
print(filtered_stops.count())
# Check Zürich HB
filtered_stops.filter(F.col('stop_id') == '8503000').show()  # stop_id is a string column

2193
+-------+---------+----------------+----------------+-------------+--------------+--------------------+
|stop_id|stop_name|        stop_lat|        stop_lon|location_type|parent_station|            distance|
+-------+---------+----------------+----------------+-------------+--------------+--------------------+
|8503000|Zürich HB|47.3781762039461|8.54021154209037|             | Parent8503000|0.001474108001550874|
+-------+---------+----------------+----------------+-------------+--------------+--------------------+

In [10]:
%%spark
filtered_stops = filtered_stops.select("stop_id")

## Filter Timetable 

We restrict the timetable data to the stops within the 15 km radius of Zürich's train station.

We only consider journeys at reasonable hours of the day on a typical business day, assuming the schedule of May 13-17, 2019.
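The cells in this notebook do not show how the "reasonable hours" restriction is applied. As a minimal sketch of one way to do it, the helper below parses GTFS `HH:MM:SS` strings (as found in `arrival_time` and `departure_time`) and checks them against a time window. The function names and the 07:00 to 20:00 bounds are hypothetical, chosen only for illustration.

```python
# Sketch only: the window bounds are assumed values, not taken from the notebook.

def gtfs_time_to_seconds(hms):
    """Convert a GTFS time string such as '06:13:00' to seconds after midnight.

    GTFS allows hours >= 24 for trips that run past midnight (e.g. '25:10:00'),
    so the string is parsed by hand rather than with datetime.
    """
    hours, minutes, seconds = (int(part) for part in hms.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def within_window(hms, start="07:00:00", end="20:00:00"):
    """Return True if hms lies inside [start, end], both bounds inclusive."""
    lower = gtfs_time_to_seconds(start)
    upper = gtfs_time_to_seconds(end)
    return lower <= gtfs_time_to_seconds(hms) <= upper


print(gtfs_time_to_seconds("06:13:00"))  # 22380
print(within_window("06:13:00"))         # False
print(within_window("08:30:00"))         # True
```

Working in seconds after midnight keeps comparisons numeric and handles times past `24:00:00` without special cases.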


### Filter `trip_id` in `stop_times`

In [11]:
%%spark
stop_times = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv('/data/sbb/csv/stop_times/2019/05/08/stop_times.txt')

stop_times.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- pickup_type: integer (nullable = true)
 |-- drop_off_type: integer (nullable = true)

In [12]:
%%spark
stop_times.show(5)
print(stop_times.count())

+-------------------+------------+--------------+-------+-------------+-----------+-------------+
|            trip_id|arrival_time|departure_time|stop_id|stop_sequence|pickup_type|drop_off_type|
+-------------------+------------+--------------+-------+-------------+-----------+-------------+
|1.TA.1-84-j19-1.1.H|    06:13:00|      06:13:00|8572249|            1|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:14:00|      06:14:00|8577508|            2|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:15:00|      06:15:00|8581070|            3|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:16:00|      06:16:00|8578360|            4|          0|            0|
|1.TA.1-84-j19-1.1.H|    06:17:00|      06:17:00|8583448|            5|          0|            0|
+-------------------+------------+--------------+-------+-------------+-----------+-------------+
only showing top 5 rows

10862563

In [23]:
%%spark
filtered_stop_times = stop_times \
    .select("trip_id", "arrival_time", "departure_time", "stop_id") \
    .join(filtered_stops, on="stop_id", how="inner")
filtered_trip_ids = filtered_stop_times.select("trip_id").distinct()

In [14]:
%%spark
print(filtered_stop_times.count())
print(filtered_trip_ids.count())

2166674
160040

### Filter `trips`

In [15]:
%%spark
trips = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv('/data/sbb/csv/trips/2019/05/08/trips.txt')

trips.printSchema()
trips.show(5)

root
 |-- route_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- trip_headsign: string (nullable = true)
 |-- trip_short_name: integer (nullable = true)
 |-- direction_id: integer (nullable = true)

+-----------+----------+--------------------+-------------------+---------------+------------+
|   route_id|service_id|             trip_id|      trip_headsign|trip_short_name|direction_id|
+-----------+----------+--------------------+-------------------+---------------+------------+
| 1-85-j19-1|  TA+b0001| 2.TA.1-85-j19-1.1.H|Schöftland, Bahnhof|          85003|           0|
|1-1-C-j19-1|  TA+b0001|5.TA.1-1-C-j19-1.3.R| Zofingen, Altachen|            108|           1|
|1-1-C-j19-1|  TA+b0001|7.TA.1-1-C-j19-1.3.R| Zofingen, Altachen|            112|           1|
|1-1-C-j19-1|  TA+b0001|9.TA.1-1-C-j19-1.3.R| Zofingen, Altachen|            116|           1|
|1-1-C-j19-1|  TA+b0001|11.TA.1-1-C-j19-1...| Zofingen, Altachen|     

In [16]:
%%spark
filtered_trips = trips \
    .join(filtered_trip_ids, on="trip_id", how="inner")

In [17]:
%%spark
print(filtered_trips.count())

160040

### Join trips with calendar

In [18]:
%%spark
calendar = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv('/data/sbb/csv/calendar/2019/05/08/calendar.txt')

calendar.printSchema()
calendar.show(5)

root
 |-- service_id: string (nullable = true)
 |-- monday: integer (nullable = true)
 |-- tuesday: integer (nullable = true)
 |-- wednesday: integer (nullable = true)
 |-- thursday: integer (nullable = true)
 |-- friday: integer (nullable = true)
 |-- saturday: integer (nullable = true)
 |-- sunday: integer (nullable = true)
 |-- start_date: integer (nullable = true)
 |-- end_date: integer (nullable = true)

+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|  TA+b0006|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0ch2|     0|      0|        0|       0|     0|       1|     1|  20181209|20191214|
|  TA+b0014|     0|      0|        0|       0|     0|       0|     1|  20181209|20191214|
|  TA+b000w|     0|      0|        0|       0| 

In [19]:
%%spark 

filtered_trips_new = filtered_trips.select("route_id", "service_id", "trip_id", "direction_id") \
    .join(calendar.select("service_id", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"), 
          on="service_id", how="inner"
         )

filtered_trips_new.show(5)
print(filtered_trips_new.count())

+----------+--------------+--------------------+------------+------+-------+---------+--------+------+--------+------+
|service_id|      route_id|             trip_id|direction_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|
+----------+--------------+--------------------+------------+------+-------+---------+--------+------+--------+------+
|  TA+b0b46|   26-18-j19-1|1.TA.26-18-j19-1.1.H|           0|     1|      1|        1|       1|     1|       0|     0|
|  TA+b0a2k|63-138-Y-j19-1|1.TA.63-138-Y-j19...|           0|     0|      0|        1|       0|     0|       0|     0|
|  TA+b001t|   26-77-j19-1|10.TA.26-77-j19-1...|           0|     0|      0|        0|       0|     0|       0|     1|
|  TA+b07dj|  42-1-Y-j19-1|10.TA.42-1-Y-j19-...|           0|     1|      1|        1|       0|     0|       0|     0|
|  TA+b090k| 80-55-Y-j19-1|10.TA.80-55-Y-j19...|           0|     0|      0|        0|       0|     0|       1|     0|
+----------+--------------+--------------------+
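
The joined table carries one 0/1 flag per weekday, which is what a "typical business day" restriction would be based on. The sketch below illustrates one possible criterion on plain Python dictionaries: keep a service only if it runs on every day from Monday to Friday. Both the helper name and this criterion are assumptions; keeping services that run on any single weekday would be an equally plausible reading. The two sample rows are copied from the output above.

```python
# Sketch only: "runs on all five weekdays" is an assumed criterion.

WEEKDAYS = ("monday", "tuesday", "wednesday", "thursday", "friday")


def runs_every_business_day(row):
    """Return True if the service is flagged as running Monday through Friday."""
    return all(row[day] == 1 for day in WEEKDAYS)


sample_rows = [
    {"service_id": "TA+b0b46", "monday": 1, "tuesday": 1, "wednesday": 1,
     "thursday": 1, "friday": 1, "saturday": 0, "sunday": 0},
    {"service_id": "TA+b0a2k", "monday": 0, "tuesday": 0, "wednesday": 1,
     "thursday": 0, "friday": 0, "saturday": 0, "sunday": 0},
]

business_day_services = [
    row["service_id"] for row in sample_rows if runs_every_business_day(row)
]
print(business_day_services)  # ['TA+b0b46']
```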

## Write filtered data into HDFS

In [48]:
%%spark

stop_times_df = filtered_stop_times.toDF(*filtered_stop_times.columns)
stop_times_df.write.parquet("/user/tshen/final-assn/parquet/stop_times", mode="overwrite")

trips_df = filtered_trips_new.toDF(*filtered_trips_new.columns)
trips_df.write.parquet("/user/tshen/final-assn/parquet/trips", mode="overwrite")

In [32]:
%%spark

# check the memory usage is reasonable
stop_times_df.toPandas().info() # 66MB
trips_df.toPandas().info() # 8.5MB

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2166674 entries, 0 to 2166673
Data columns (total 4 columns):
trip_id           object
arrival_time      object
departure_time    object
stop_id           object
dtypes: object(4)
memory usage: 66.1+ MB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160040 entries, 0 to 160039
Data columns (total 11 columns):
route_id        160040 non-null object
trip_id         160040 non-null object
direction_id    160040 non-null object
service_id      160040 non-null int32
monday          160040 non-null int32
tuesday         160040 non-null int32
wednesday       160040 non-null int32
thursday        160040 non-null int32
friday          160040 non-null int32
saturday        160040 non-null int32
sunday          160040 non-null int32
dtypes: int32(8), object(3)
memory usage: 8.5+ MB