# Data Preprocessing


This notebook contains the data cleaning and preparation procedure.

### 1) stops.txt

We read the stops file and select all unique stations (without platform information). We then filter by radius, keeping only stations within 15 km of Zurich main station, and save the final list of stops <b>filtered_stops</b> for later use as '/user/lortkipa/filtered_stops_Premoved.pkl'. This leaves approximately 1600 stations (1583 in the run shown below).


### 2) calendar.txt, trips.txt 

Goal: keep only services operating Monday to Friday. We read calendar.txt and select the services <b>allowed_service_ids</b> that run on every weekday from Monday to Friday. Next we select the trips <b>allowed_trips_ids</b> that belong to a service in allowed_service_ids. We use this information in the final filtering described below.


### 3) stop_times.txt

The goal of this part is to read the stop_times file and keep only entries whose stop/station lies within 15 km of Zurich (using <b>filtered_stops</b>) and whose trip is allowed according to <b>allowed_trips_ids</b> described above. Furthermore, we keep only entries within the normal operating period, from 06:00 to 22:00. <b>filtered_stop_time</b> contains the final filtered content of the stop_times file.

### 4) current_next

Here we join <b>filtered_stop_time</b> with itself so that each row of the new table contains a station and the next station on the same trip. The join condition is that the trip_ids are equal and the stop_sequence of the first row equals the stop_sequence of the second row minus 1. The joined table <b>current_next</b> is saved at 'hdfs:/user/lortkipa/current_next_6_22_Pcor.parquet' and will be used for transport graph generation. For more details on what individual rows look like, see data_preprocessing.ipynb.

Table of contents for the notebook:

0. Helper Functions
1. Data reading/pre-processing
    - stops.txt
    - calendar.txt, trips.txt
    - stop_times.txt
    - current_next dataframe

In [1]:
%%configure
{"conf": {
    "spark.app.name": "my-awesome-group_final"
}}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6287,application_1589299642358_0776,pyspark,idle,Link,Link,
6288,application_1589299642358_0777,pyspark,idle,Link,Link,
6291,application_1589299642358_0780,pyspark,idle,Link,Link,
6292,application_1589299642358_0781,pyspark,busy,Link,Link,
6293,application_1589299642358_0782,pyspark,busy,Link,Link,
6295,application_1589299642358_0784,pyspark,busy,Link,Link,


In [2]:
# Initialization
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6296,application_1589299642358_0785,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f196b2d8750>

# 0. Helper Functions / Global Parameters

In [3]:
import pandas as pd 
from math import sin, cos, sqrt, atan2, radians
from geopy import distance as dist
from pyspark.sql.functions import col

In [4]:
# global parameters
start_day = u'06:00:00'
end_day   = u'22:00:00' 

In [5]:
%%local
import pandas as pd
from hdfs3 import HDFileSystem

# configuring connection with hdfs system
hdfs = HDFileSystem(host='hdfs://iccluster044.iccluster.epfl.ch',port=8020, user='ebouille')

In [6]:
# cell to communicate with hdfs
import subprocess, pickle

def run_cmd(args_list):
    """Run linux commands."""
    print('Running system command: {0}'.format(' '.join(args_list)))    
    proc = subprocess.Popen(args_list,                            
                            stdout=subprocess.PIPE,                            
                            stderr=subprocess.PIPE)    
    s_output, s_err = proc.communicate()    
    s_return =  proc.returncode
    return s_return, s_output, s_err


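def save_hdfs(localPath, hdfsPath):
    """Copy a local file to HDFS, overwriting any existing file."""
    (ret, out, err) = run_cmd(['hdfs', 'dfs', '-put', '-f', localPath, hdfsPath])
    # rely on the exit code: the hdfs CLI may write warnings to stderr even on success
    if ret != 0:
        print(err)
    else:
        print('Success')


def read_hdfs(hdfsPath):
    """Read a pickled object from HDFS; raise if the hdfs command fails."""
    (ret, out, err) = run_cmd(['hdfs', 'dfs', '-cat', hdfsPath])
    if ret != 0:
        print(err)
        raise IOError('Could not read {0} from HDFS'.format(hdfsPath))
    print('Success')
    return pickle.loads(out)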

In [7]:
import pyspark.sql.functions as functions

# keep the parent ID only
@functions.udf
def keep_parent_ID(text):
    """Keep only parent ID and remove trailing P if necessary"""
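    # null stop_ids would otherwise raise inside the UDF
    if text is None:
        return None
    parent_id = text.split(':')[0]
    
    # remove trailing P if present (endswith is also safe on an empty string)
    if parent_id.endswith('P'):
        parent_id = parent_id[:-1]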
        
    return parent_id
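
As a quick illustration of what the UDF above does, the same logic can be written as a plain Python function and tried on ID shapes that appear in this notebook's outputs. The name `parent_id_of` is hypothetical and not part of the notebook; this is a sketch of the logic, not the UDF itself.

```python
def parent_id_of(stop_id):
    """Plain-Python sketch of the parent-ID logic (hypothetical helper)."""
    if stop_id is None:
        return None
    # everything before the first ':' is the station part of the ID
    parent_id = stop_id.split(':')[0]
    # parent stations carry a trailing 'P' (e.g. '8014584P' in the output below)
    if parent_id.endswith('P'):
        parent_id = parent_id[:-1]
    return parent_id


# ID shapes taken from the outputs in this notebook
examples = ['8014008:0:7b', '8014584P', '8503000']
normalized = [parent_id_of(s) for s in examples]
print(normalized)
```

Platform-level IDs, parent IDs and plain station IDs all collapse to the same bare station number, which is what makes the later `dropDuplicates` and `isin` steps line up.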

# 1. Read needed datasets and filter them accordingly

### 1.1 Read Stops File

We read the stops file and select all unique stations (without platform information). We then filter by radius, keeping only stations within 15 km of Zurich main station, and save the final list of stops <b>filtered_stops</b> for later use as '/user/lortkipa/filtered_stops_Premoved.pkl'. This leaves approximately 1600 stations (1583 in the run shown below).
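
The radius filter can be sketched without any third-party dependency using the haversine formula. This is only an approximation of what the notebook does: the cells below call geopy's `distance`, which uses an ellipsoidal Earth model, so values very close to the 15 km boundary may differ slightly. The function name `haversine_km` and the two candidate points are assumptions made for illustration; only the Zurich main station coordinates come from this notebook.

```python
from math import radians, sin, cos, sqrt, atan2

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, i.e. a spherical approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlam = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * atan2(sqrt(a), sqrt(1 - a))


# Zurich main station coordinates as quoted in this notebook
ZURICH_HB = (47.378177, 8.540192)

# synthetic points for illustration only (not real stations)
candidates = {
    'near': (ZURICH_HB[0] + 0.05, ZURICH_HB[1]),  # roughly 5.6 km north
    'far': (ZURICH_HB[0] + 0.5, ZURICH_HB[1]),    # roughly 55.6 km north
}

# keep only the points within 15 km of Zurich main station
within_15km = sorted(
    name for name, (lat, lon) in candidates.items()
    if haversine_km(ZURICH_HB[0], ZURICH_HB[1], lat, lon) <= 15
)
print(within_15km)
```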

In [8]:
# reading stops file
stops = spark.read.csv('hdfs:/data/sbb/timetables/csv/stops/2019/05/14/stops.txt', header=True)
stops.printSchema()

root
 |-- stop_id: string (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_lat: string (nullable = true)
 |-- stop_lon: string (nullable = true)
 |-- location_type: string (nullable = true)
 |-- parent_station: string (nullable = true)

In [9]:
stops.where(col('parent_station').isNotNull()).select('stop_id').take(30)

[Row(stop_id=u'8004238'), Row(stop_id=u'8004238:0:1'), Row(stop_id=u'8004238:0:2'), Row(stop_id=u'8014008'), Row(stop_id=u'8014008:0:10'), Row(stop_id=u'8014008:0:11'), Row(stop_id=u'8014008:0:5'), Row(stop_id=u'8014008:0:7b'), Row(stop_id=u'8014008:0:9'), Row(stop_id=u'8014008:0:9b'), Row(stop_id=u'8014015'), Row(stop_id=u'8014015:0:1'), Row(stop_id=u'8014015:0:2'), Row(stop_id=u'8014020:0:2'), Row(stop_id=u'8014021'), Row(stop_id=u'8014021:0:2'), Row(stop_id=u'8014021:0:3'), Row(stop_id=u'8014021:0:4'), Row(stop_id=u'8014021:0:5'), Row(stop_id=u'8014021:0:9'), Row(stop_id=u'8014031:0:2'), Row(stop_id=u'8014033:0:1'), Row(stop_id=u'8014035:0:1'), Row(stop_id=u'8014035:0:2'), Row(stop_id=u'8014036:0:2'), Row(stop_id=u'8014037:0:1'), Row(stop_id=u'8014037:0:2'), Row(stop_id=u'8014038:0:1'), Row(stop_id=u'8014039:0:2'), Row(stop_id=u'8014040:0:2b')]

In [10]:
print(stops.count())
# remove platform information and drop duplicates
stops = stops.withColumn("stop_id", keep_parent_ID(stops["stop_id"])).dropDuplicates(["stop_id"])
print(stops.count())
# convert to pandas
df_stops = stops.toPandas().dropna(subset=['stop_id']).reset_index(drop=True)
df_stops.head()

30631
25784
   stop_id               stop_name  ... location_type parent_station
0  1322058        Migiandone Bivio  ...          None           None
1  8014584   Konstanz-Petershausen  ...          None       8014584P
2  8029549                   Aalen  ...          None       8029549P
3  8500383        Kappel SO, Kreuz  ...          None           None
4  8500737  Mont-sur-Rolle, église  ...          None           None

[5 rows x 6 columns]

In [11]:
# keep only stations within 15 km of Zurich Main Station (47.378177, 8.540192)
zurich_coordinate = df_stops[df_stops['stop_id']=='8503000']
z_lat = zurich_coordinate['stop_lat'].values[0]
z_lon = zurich_coordinate['stop_lon'].values[0]

# drop stations farther than 15 km from Zurich Main Station
filtered_stops = df_stops[df_stops.apply(lambda x: dist.distance(
                   (float(z_lat), 
                   float(z_lon)),
                   (float(x['stop_lat']), 
                   float(x['stop_lon']))).km <= 15, axis=1)]

print('Remaining stops: %d'%len(filtered_stops))

Remaining stops: 1583

In [62]:
# save to pickle
filtered_stops.to_pickle('filtered_stops_Premoved.pkl')
del filtered_stops
# send to hdfs
save_hdfs('filtered_stops_Premoved.pkl','/user/lortkipa/')

Running system command: hdfs dfs -put -f filtered_stops_Premoved.pkl /user/lortkipa/
Success

In [63]:
filtered_stops = read_hdfs('/user/lortkipa/filtered_stops_Premoved.pkl')
filtered_stops.head(2)

Running system command: hdfs dfs -cat /user/lortkipa/filtered_stops_Premoved.pkl
Success
    stop_id               stop_name  ... location_type parent_station
10  8502508  Spreitenbach, Raiacker  ...          None           None
14  8503078                Waldburg  ...          None           None

[2 rows x 6 columns]

### 1.2 Read calendar and trips files

Goal: keep only services operating Monday to Friday. We read calendar.txt and select the services <b>allowed_service_ids</b> that run on every weekday from Monday to Friday. Next we select the trips <b>allowed_trips_ids</b> that belong to a service in allowed_service_ids. We use this information in the final filtering described below.
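
The two-step selection can be illustrated on a toy dataset in plain Python. Apart from the first `service_id`, which is copied from the calendar output in this notebook, all rows and trip IDs here are made up. Note that the condition only requires the five weekday flags to be set, so a service that also runs on weekends passes the filter as well.

```python
WEEKDAYS = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday')


def make_service(service_id, flags):
    """Build a toy calendar row; flags is a 7-character string, Monday first."""
    days = WEEKDAYS + ('saturday', 'sunday')
    row = {'service_id': service_id}
    row.update(dict(zip(days, flags)))
    return row


# toy calendar rows (only 'TA+b0nx9' appears in the real data shown here)
calendar_rows = [
    make_service('TA+b0nx9', '1111100'),      # Monday to Friday
    make_service('weekend_only', '0000011'),  # made up
    make_service('mon_wed', '1010000'),       # made up
    make_service('daily', '1111111'),         # made up; passes the weekday test too
]

# toy trips rows (all made up)
trips_rows = [
    {'trip_id': 't1', 'service_id': 'TA+b0nx9'},
    {'trip_id': 't2', 'service_id': 'weekend_only'},
    {'trip_id': 't3', 'service_id': 'mon_wed'},
    {'trip_id': 't4', 'service_id': 'daily'},
]

# step 1: services with all five weekday flags set to '1'
allowed_service_ids = {
    row['service_id'] for row in calendar_rows
    if all(row[day] == '1' for day in WEEKDAYS)
}

# step 2: trips whose service is allowed
allowed_trip_ids = sorted(
    trip['trip_id'] for trip in trips_rows
    if trip['service_id'] in allowed_service_ids
)
print(allowed_trip_ids)
```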

In [12]:
calendar = spark.read.csv('/data/sbb/timetables/csv/calendar/2019/05/14/calendar.txt', header=True)
calendar.printSchema()

root
 |-- service_id: string (nullable = true)
 |-- monday: string (nullable = true)
 |-- tuesday: string (nullable = true)
 |-- wednesday: string (nullable = true)
 |-- thursday: string (nullable = true)
 |-- friday: string (nullable = true)
 |-- saturday: string (nullable = true)
 |-- sunday: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)

In [13]:
trips = spark.read.csv('/data/sbb/timetables/csv/trips/2019/05/14/trips.txt', header=True)
trips.printSchema()

root
 |-- route_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- trip_headsign: string (nullable = true)
 |-- trip_short_name: string (nullable = true)
 |-- direction_id: string (nullable = true)

In [14]:
print(calendar.count())
calendar.take(1)

22615
[Row(service_id=u'TA+b0nx9', monday=u'1', tuesday=u'1', wednesday=u'1', thursday=u'1', friday=u'1', saturday=u'0', sunday=u'0', start_date=u'20181209', end_date=u'20191214')]

In [15]:
# Retrieve service_ids that we want to keep
allowed_service_ids = calendar.where((calendar['monday'] == '1') & (calendar['tuesday'] == '1') & (
    calendar['wednesday'] == '1') & (calendar['thursday'] == '1') & (calendar['friday'] == '1')).select('service_id')
print(allowed_service_ids.count())
allowed_service_ids = allowed_service_ids.rdd.flatMap(lambda x: x).collect()

8648

In [16]:
# select trip_ids that have allowed service_id
print(trips.select('trip_id').distinct().count())
allowed_trips_ids = trips.where(trips['service_id'].isin(allowed_service_ids)).select('trip_id').distinct()
print(allowed_trips_ids.count())
#allowed_trips_ids = allowed_trips_ids.rdd.flatMap(lambda x: x).collect()

1017413
528368

In [17]:
allowed_trips_ids.take(1)

[Row(trip_id=u'237.TA.1-14-j19-1.4.H')]

### 1.3 Read Stop Times File

The goal of this part is to read the stop_times file and keep only entries whose stop/station lies within 15 km of Zurich (using <b>filtered_stops</b>) and whose trip is allowed according to <b>allowed_trips_ids</b> described above. Furthermore, we keep only entries within the normal operating period, from 06:00 to 22:00. <b>filtered_stop_time</b> contains the final filtered content of the stop_times file.
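
The time-window condition compares the time columns as strings. That works because the values are zero-padded `HH:MM:SS`, for which lexicographic order matches chronological order. The sketch below mirrors that comparison in plain Python; the helper name `in_window` and the sample rows are assumptions for illustration. It also shows two side effects worth knowing: the strict inequalities drop rows exactly on the boundaries, and GTFS times past midnight such as `25:10:00` fall outside the window.

```python
START_DAY = '06:00:00'
END_DAY = '22:00:00'


def in_window(arrival_time, departure_time, start=START_DAY, end=END_DAY):
    """String comparison on zero-padded HH:MM:SS, strict on both ends."""
    return arrival_time > start and departure_time < end


# (arrival_time, departure_time) pairs; illustrative values
rows = [
    ('04:20:00', '04:20:00'),  # before the window
    ('06:00:00', '06:00:00'),  # exactly on the lower bound: excluded by '>'
    ('06:01:00', '06:01:00'),
    ('20:08:00', '20:08:00'),
    ('22:00:00', '22:00:00'),  # exactly on the upper bound: excluded by '<'
    ('25:10:00', '25:10:00'),  # past-midnight GTFS time: excluded
]

kept = [row for row in rows if in_window(*row)]
print(kept)
```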

In [18]:
stop_times = spark.read.csv('/data/sbb/timetables/csv/stop_times/2019/05/14/stop_times.txt', header=True)
print(stop_times.take(1))
stop_times.printSchema()

[Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:20:00', departure_time=u'04:20:00', stop_id=u'8500010:0:3', stop_sequence=u'1', pickup_type=u'0', drop_off_type=u'0')]
root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: string (nullable = true)
 |-- pickup_type: string (nullable = true)
 |-- drop_off_type: string (nullable = true)

In [19]:
stop_times.groupBy(['trip_id']).count().sort('count', ascending=False).take(3)

[Row(trip_id=u'2.TA.9-NC-j19-1.2.H', count=69), Row(trip_id=u'4.TA.9-NC-j19-1.2.H', count=69), Row(trip_id=u'3.TA.9-NC-j19-1.2.H', count=69)]

In [20]:
from pyspark.sql.types import IntegerType
# convert stop sequence to integer
stop_times = stop_times.withColumn("stop_sequence", stop_times["stop_sequence"].cast(IntegerType()))
# remove platform etc. information and keep only main (parent) station id
stop_times = stop_times.withColumn("stop_id", keep_parent_ID(stop_times["stop_id"]))
stop_times.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- pickup_type: string (nullable = true)
 |-- drop_off_type: string (nullable = true)

In [21]:
print(stop_times.count())
stop_times.take(5)

11128930
[Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:20:00', departure_time=u'04:20:00', stop_id=u'8500010', stop_sequence=1, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:24:00', departure_time=u'04:24:00', stop_id=u'8500020', stop_sequence=2, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:28:00', departure_time=u'04:28:00', stop_id=u'8500021', stop_sequence=3, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:30:00', departure_time=u'04:30:00', stop_id=u'8517131', stop_sequence=4, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'1.TA.1-1-B-j19-1.1.R', arrival_time=u'04:32:00', departure_time=u'04:32:00', stop_id=u'8500300', stop_sequence=5, pickup_type=u'0', drop_off_type=u'0')]

In [22]:
# Selecting stop times of only the stops that are in our filtered stop list (15 km radius from Zurich HB)
filtered_stop_time = stop_times.where(col('stop_id').isin(list(filtered_stops['stop_id'].values)))
print(filtered_stop_time.count())

# keep only stop times that fall strictly inside the accepted time window (start_day, end_day)
filtered_stop_time = filtered_stop_time.where((col('arrival_time') > start_day) & (col('departure_time') < end_day))
print(filtered_stop_time.take(3))
print(filtered_stop_time.count())

# keep only trips whose service runs Monday to Friday
#filtered_stop_time = filtered_stop_time.where(col('trip_id').isin(allowed_trips_ids))
filtered_stop_time = filtered_stop_time.join(
    allowed_trips_ids, ['trip_id']).cache()
print(filtered_stop_time.count())

2319312
[Row(trip_id=u'2.TA.1-1-E-j19-1.1.H', arrival_time=u'06:01:00', departure_time=u'06:01:00', stop_id=u'8578679', stop_sequence=24, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'3.TA.1-1-E-j19-1.2.R', arrival_time=u'20:08:00', departure_time=u'20:08:00', stop_id=u'8578679', stop_sequence=1, pickup_type=u'0', drop_off_type=u'0'), Row(trip_id=u'3.TA.1-1-E-j19-1.2.R', arrival_time=u'20:08:00', departure_time=u'20:08:00', stop_id=u'8590314', stop_sequence=2, pickup_type=u'0', drop_off_type=u'0')]
1870681
361520

In [23]:
filtered_stop_time.take(1)

[Row(trip_id=u'78.TA.1-1-E-j19-1.12.R', arrival_time=u'17:34:00', departure_time=u'17:34:00', stop_id=u'8578679', stop_sequence=1, pickup_type=u'0', drop_off_type=u'0')]

In [24]:
print('Stops Remaining:', filtered_stop_time.select('stop_id').distinct().count())

('Stops Remaining:', 1406)

### 1.4 Create Joined current_next DataFrame

Here we join <b>filtered_stop_time</b> with itself so that each row of the new table contains a station and the next station on the same trip. The join condition is that the trip_ids are equal and the stop_sequence of the first row equals the stop_sequence of the second row minus 1. The joined table <b>current_next</b> is saved at 'hdfs:/user/lortkipa/current_next_6_22_Pcor.parquet' and will be used for transport graph generation. For more details on what individual rows look like, see data_preprocessing.ipynb.
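
The join condition can be sketched in plain Python as a lookup keyed on `(trip_id, stop_sequence)`. The first two rows below are taken from the example row printed at the end of this notebook; the third row, including its stop ID, is made up to show what happens when a sequence number is missing, for instance because the intermediate stop was removed by an earlier filter. Like the inner join, the sketch produces no pair across such a gap.

```python
from collections import namedtuple

StopTime = namedtuple(
    'StopTime', 'trip_id arrival_time departure_time stop_id stop_sequence')

TRIP = '1356.TA.26-31-j19-1.12.R'
stop_times = [
    StopTime(TRIP, '16:21:00', '16:21:00', '8591186', 1),
    StopTime(TRIP, '16:22:00', '16:22:00', '8591334', 2),
    # made-up row: sequence 3 is missing, so stop 2 has no "next" row
    StopTime(TRIP, '16:25:00', '16:25:00', 'made_up_stop', 4),
]

# index rows so that the next stop of a trip is a dictionary lookup
by_key = {(s.trip_id, s.stop_sequence): s for s in stop_times}

# pair each row with the row whose stop_sequence is exactly one higher
current_next = [
    (cur, by_key[(cur.trip_id, cur.stop_sequence + 1)])
    for cur in stop_times
    if (cur.trip_id, cur.stop_sequence + 1) in by_key
]

pairs = [(cur.stop_id, nxt.stop_id) for cur, nxt in current_next]
print(pairs)
```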

In [25]:
# self-join the stop times so that each row contains information about a station and its next station
# on the same trip

from pyspark.sql.functions import lit

t1 = filtered_stop_time.alias('t1')

t2 = (filtered_stop_time
      .withColumn('trip_id_2', col('trip_id'))
      .withColumn('arrival_time_2', col('arrival_time'))
      .withColumn('departure_time_2', col('departure_time'))
      .withColumn('stop_id_2', col('stop_id'))
      .withColumn('stop_sequence_2', col('stop_sequence'))
      .withColumn('stop_sequence_adjusted', col('stop_sequence') -1)
      .select('trip_id_2','arrival_time_2','departure_time_2','stop_id_2', 'stop_sequence_2', 'stop_sequence_adjusted'))

# join each station with its next station, matched on trip_id and stop_sequence
current_next = t1.join(t2, (t1.trip_id == t2.trip_id_2) & (t1.stop_sequence == t2.stop_sequence_adjusted)).cache()

In [81]:
# Save the Generated File for Future use
#current_next.write.parquet('hdfs:/user/lortkipa/current_next_8_18.parquet')
current_next.write.parquet('hdfs:/user/lortkipa/current_next_6_22_Pcor.parquet')

In [3]:
## Read Saved File
current_next = spark.read.parquet('hdfs:/user/lortkipa/current_next_6_22_Pcor.parquet')
print(current_next.take(1))
current_next.printSchema()

[Row(trip_id=u'1356.TA.26-31-j19-1.12.R', arrival_time=u'16:21:00', departure_time=u'16:21:00', stop_id=u'8591186', stop_sequence=1, pickup_type=u'0', drop_off_type=u'0', trip_id_2=u'1356.TA.26-31-j19-1.12.R', arrival_time_2=u'16:22:00', departure_time_2=u'16:22:00', stop_id_2=u'8591334', stop_sequence_2=2, stop_sequence_adjusted=1)]
root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- pickup_type: string (nullable = true)
 |-- drop_off_type: string (nullable = true)
 |-- trip_id_2: string (nullable = true)
 |-- arrival_time_2: string (nullable = true)
 |-- departure_time_2: string (nullable = true)
 |-- stop_id_2: string (nullable = true)
 |-- stop_sequence_2: integer (nullable = true)
 |-- stop_sequence_adjusted: integer (nullable = true)

As the example above shows, the first part of the row corresponds to stop 1 and the second part to stop 2 of trip '1356.TA.26-31-j19-1.12.R'.
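
Graph generation is not part of this notebook, but one plausible way to use such a row is to turn it into a weighted edge from `stop_id` to `stop_id_2`. The sketch below assumes the edge weight is the travel time from departure at the current stop to arrival at the next stop; that choice, the helper `to_seconds` and the tuple layout are assumptions, not the project's actual graph code. The values are copied from the example row above.

```python
def to_seconds(hms):
    """Convert an 'HH:MM:SS' string to seconds since midnight."""
    hours, minutes, seconds = (int(part) for part in hms.split(':'))
    return hours * 3600 + minutes * 60 + seconds


# relevant fields of the example row printed above
row = {
    'stop_id': '8591186',
    'departure_time': '16:21:00',
    'stop_id_2': '8591334',
    'arrival_time_2': '16:22:00',
}

# assumed edge weight: seconds between leaving the current stop and reaching the next
travel_time_s = to_seconds(row['arrival_time_2']) - to_seconds(row['departure_time'])
edge = (row['stop_id'], row['stop_id_2'], travel_time_s)
print(edge)
```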