# Data Pipeline

This notebook contains the initial wrangling and preprocessing of the data provided. We read the data straight into Spark, do the computations there, and write the results to HDFS. That way the useful data is easily accessible from our main notebook.

## Initialize environment

In [None]:
%%local 

import os
import warnings
warnings.simplefilter(action='ignore', category=UserWarning)

username = os.environ['RENKU_USERNAME']
hiveaddr = os.environ['HIVE_SERVER2']
(hivehost,hiveport) = hiveaddr.split(':')
print("Operating as: {0}".format(username))

In [None]:
%load_ext sparkmagic.magics

In [None]:
%%local 
from IPython import get_ipython
# Set up spark session
server = 'http://iccluster029.iccluster.epfl.ch:8998'

packages = """{"packages": "graphframes:graphframes:0.6.0-spark2.3-s_2.11"}"""

# Set application name as "<your_gaspar_id>-final_project"
get_ipython().run_cell_magic(
    'spark',
    line='config', 
    cell=f"""{{ "name": "{username}-final_project", "executorMemory": "4G", "executorCores": 4, "numExecutors": 10, "driverMemory": "4G", "conf": {packages}}}"""
)
# Send username to spark channel
get_ipython().run_line_magic(
    "spark", "add -s {0}-final_project -l python -u {1} -k".format(username, server)
)

In [None]:
print('We are using Spark %s' % spark.version)

In [None]:
%%spark

import pyspark.sql.functions as F
from pyspark.sql.window import Window

## Stops and walking connections

We start by loading all the stops in Switzerland, and then extract those in the Zurich area. These are used to derive the walking connections, based on the rules given in the project description. Therefore, we do not rely on the transfers contained in `transfers.txt`.


**Drawbacks**:
- The walking time calculation only considers the straight-line distance between two points. Using `transfers.txt` would likely give more accurate measurements.
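
The walking rule can also be sketched in plain Python, outside Spark. This is a minimal illustration, assuming the same constants as the Spark code in this notebook (stops at most 0.5 km apart, a walking speed of 50 m per minute, an Earth radius of 6371 km). The names `great_circle_km` and `walking_time_s` are hypothetical and not part of the pipeline.

```python
import math

EARTH_RADIUS_KM = 6371.0
MAX_WALK_KM = 0.5              # stops further apart are not connected
WALK_SPEED_KM_PER_MIN = 0.05   # 50 m per minute


def great_circle_km(lat_x, lon_x, lat_y, lon_y):
    """Great-circle distance in km via the spherical law of cosines."""
    lat_x, lon_x, lat_y, lon_y = map(math.radians, (lat_x, lon_x, lat_y, lon_y))
    cos_angle = (math.sin(lat_x) * math.sin(lat_y)
                 + math.cos(lat_x) * math.cos(lat_y) * math.cos(lon_x - lon_y))
    # Clamp to [-1, 1] so rounding errors cannot push acos out of its domain
    return math.acos(max(-1.0, min(1.0, cos_angle))) * EARTH_RADIUS_KM


def walking_time_s(distance_km):
    """Walking time in seconds, rounded up to a whole minute; None if too far."""
    if distance_km > MAX_WALK_KM:
        return None
    return math.ceil(distance_km / WALK_SPEED_KM_PER_MIN) * 60
```

Rounding up to whole minutes before converting to seconds keeps the walking times in the same unit as the transport connections while never underestimating a walk.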

In [None]:
%%spark 
# Load stops data
df = spark.read.orc('/data/sbb/orc/allstops')
df.printSchema()
df.count()

In [None]:
%%spark

# Extract coordinates of Zurich HB as CENTER
ZURICH_HB = df[df['stop_id'] == '8503000'].first()
CENTER = (F.lit(ZURICH_HB.stop_lat), F.lit(ZURICH_HB.stop_lon))

# Great-circle distance in km (spherical law of cosines)
def dist(lat_x, lon_x, lat_y=CENTER[0], lon_y=CENTER[1]):
    return F.acos(
        F.sin(F.radians(lat_x)) * F.sin(F.radians(lat_y)) +
        F.cos(F.radians(lat_x)) * F.cos(F.radians(lat_y)) *
        F.cos(F.radians(lon_x) - F.radians(lon_y))
    ) * F.lit(6371.0)

In [None]:
%%spark

# Extract relevant stops in Zurich area

stops = (
    df.withColumn('distance',
                  dist('stop_lat', 'stop_lon'))
      .filter('distance <= 15')
      .drop('location_type', 'parent_station')
)
stops.show()
stops.count()

In [None]:
# Retrieving walking connections

source = stops.select(F.col('stop_id').alias('source_id'), 
                      F.col('stop_lat').alias('src_lat'), 
                      F.col('stop_lon').alias('src_lon')
                     )
destination = stops.select(F.col('stop_id').alias('destination_id'), 
                     F.col('stop_lat').alias('dest_lat'), 
                     F.col('stop_lon').alias('dest_lon')
                    )

# Cross join stops with itself, remove stops that are joined with itself,
# calculate distance, remove stops that are too far away.
# Finally calculate the walking time, round up to closest minute and
# convert to seconds (to get it on same format as transport connections)

walking_connections = (
    source.crossJoin(destination)
          .filter('source_id <> destination_id')
          .withColumn('distance', 
                      dist('src_lat', 'src_lon', 'dest_lat', 'dest_lon'))
          .filter('distance <= 0.5')
          .withColumn('travel_time',
                      F.ceil(F.col('distance') / F.lit(0.05)) * F.lit(60))
          .drop('src_lat', 'src_lon', 'dest_lat', 'dest_lon', 'distance')
)
walking_connections.show(5)

In [None]:
# Write stops and walking connections to HDFS
stops.write.orc('/group/gutane/stops', mode='overwrite')
walking_connections.write.orc('/group/gutane/walking_connections', mode='overwrite')

## Trip connections

We only keep trips that occur between 06:00 and midnight each day, as we consider these reasonable travel times. As we want a product that can plan a trip for any arbitrary weekday, we keep only trips that are scheduled every weekday (Monday to Friday). In addition, we only keep trips where all stops are within the Zurich area, to simplify the search process. As a result, some stops in the area are not reachable through the connections in the resulting dataset. This is handled in `final_project.ipynb`.
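
The row-level and trip-level filtering can be illustrated on plain Python data. This is a sketch under assumptions, not the Spark implementation: the helpers `in_time_window` and `filter_trip` and the tuple layout are hypothetical. It relies on zero-padded `HH:MM:SS` strings sorting in chronological order, so plain string comparison is enough.

```python
def in_time_window(time_str, start="06:00:00", end="23:59:59"):
    """True if a zero-padded "HH:MM:SS" string lies inside the window."""
    return start <= time_str <= end


def filter_trip(stop_rows, legal_stops):
    """Filter the rows of one trip.

    stop_rows: list of (stop_id, arrival_time, departure_time) tuples.
    Rows outside the time window are dropped first; the trip is then kept
    only if every remaining stop is in legal_stops (otherwise [] is returned).
    """
    rows = [row for row in stop_rows
            if in_time_window(row[1]) and in_time_window(row[2])]
    if all(row[0] in legal_stops for row in rows):
        return rows
    return []
```

Note that GTFS allows times past `24:00:00` for trips running over midnight; such strings compare greater than `"23:59:59"` and are therefore dropped by this window.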

We transform the data from one entry per stop on a trip to one entry per connection, that is, each row contains both the current stop and the next stop on the trip.
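
The change of format can be shown on a single trip in plain Python. This is only an illustrative sketch: `to_connections` is a hypothetical helper, and the output keys are chosen for readability rather than taken from the pipeline. Unlike the Spark version, which keeps the last stop of a trip with a null destination, this sketch simply omits it.

```python
def to_connections(stops):
    """Turn the stops of one trip into one entry per connection.

    stops: list of dicts with the keys 'stop_id', 'arrival_time',
    'departure_time' and 'stop_sequence'.
    """
    ordered = sorted(stops, key=lambda s: s["stop_sequence"])
    return [
        {
            "src_id": cur["stop_id"],
            "src_departure_time": cur["departure_time"],
            "dest_id": nxt["stop_id"],
            "dest_arrival_time": nxt["arrival_time"],
        }
        # Pair every stop with the stop that follows it on the trip
        for cur, nxt in zip(ordered, ordered[1:])
    ]
```

A trip with n stops yields n - 1 connections, one per pair of consecutive stops.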

**Drawbacks**:
- The window from 06:00 to midnight doesn't capture all trips
- Only looking at trips where all stops are in the Zurich area reduces the expressiveness of the model and makes some legal stops unreachable
- Ideally we would keep track of trips per day, but to keep things simple in our initial product, we only view trips that occur every weekday

In [None]:
%%spark
# Retrieving the last week of stop_times as this is the most relevant
stop_times = spark.read.csv('/data/sbb/csv/stop_times/2020/12/09/stop_times.txt', header=True)
stop_times.printSchema()

In [None]:
# Filter out 'unreasonable' times of day
# We choose to look at trips between 06:00 and midnight
# The filter works because zero-padded HH:MM:SS strings sort in chronological order

stop_times = (
    stop_times.filter('arrival_time >= "06:00:00" and departure_time >= "06:00:00"')
              .filter('arrival_time <= "23:59:59" and departure_time <= "23:59:59"')
              .withColumn('arrival_time', F.to_timestamp('arrival_time'))
              .withColumn('departure_time', F.to_timestamp('departure_time'))
              .withColumn('stop_sequence', F.col('stop_sequence').cast('int'))
)
stop_times.show()

In [None]:
# For efficiency, we collect all legal stop ids in a set, which yields
# O(1) lookup
legal_stops = set([row.stop_id for row in stops.select('stop_id').collect()])

# Define window over each trip_id, ordered by stop_sequence
window = (
    Window.partitionBy('trip_id')
          .orderBy(F.asc('stop_sequence'))
)

# Although a trip is legal as long as there are at least two legal stops in the trip, 
# we choose, for simplicity, to only include trips where all stops are legal.
# Although this reduces the expressiveness of our model, it removes a tricky edge case

# User-defined Spark SQL function which takes an iterable
# of stop ids and returns true if all stops are in legal_stops
@F.udf('boolean')
def is_legal(stop_ids): 
    return all(stop in legal_stops for stop in stop_ids)

# Collect the list of all stops in each trip. The frame must span the whole
# partition: the default frame of an ordered window ends at the current row,
# so the list would otherwise only contain the stops up to that row
legal_mask = (
    is_legal(F.collect_list('stop_id')
              .over(window.rowsBetween(Window.unboundedPreceding,
                                       Window.unboundedFollowing)))
)

# Add is_legal mask to each trip
stop_times = (
    stop_times.select('trip_id', 
                      'arrival_time',
                      'departure_time', 
                      'stop_id', 
                      'stop_sequence', 
                      legal_mask.alias('is_legal'))
)

# Filter on and drop is_legal column
stop_times = stop_times.filter('is_legal == true').drop('is_legal')

In [None]:
stop_times.show(5)
stop_times.count()

In [None]:
# Extract metadata
routes = spark.read.csv('/data/sbb/csv/routes/2020/12/09/routes.txt', header=True)
trips = spark.read.csv('/data/sbb/csv/trips/2020/12/09/trips.txt', header=True)
calendar = spark.read.csv('/data/sbb/csv/calendar/2020/12/09/calendar.txt', header=True)

In [None]:
# The service must be valid in the given period. 
# Since the last end date in the dataset is 12.12.2020, we assume that every 
# entry with this end_date is still valid

# Because our service should give the correct information for an arbitrary 
# normal business day (Monday to Friday), we choose to simplify our model
# by only looking at trips that are valid each day. 

legal_services = (
    calendar.filter('start_date <= "20201209"')
            .filter('monday == "1" and tuesday == "1" and \
                     wednesday == "1" and thursday == "1" and friday == "1"')
            .select('service_id')
)
legal_services.show(5)

In [None]:
# Filter relevant trips by extracting the unique trip_id's
# from stop_times, and join with the newly loaded trips
trip_ids = stop_times.select('trip_id').distinct()
trips = trips.join(trip_ids, on='trip_id', how='inner')

# One entry per legal trip_id
trips.count() == stop_times.select('trip_id').distinct().count()

In [None]:
# Append route information to each trip
trips_with_route_info = trips.join(routes, on='route_id', how='inner')
trips_metadata = trips_with_route_info.select('service_id', 'trip_id', 'route_short_name', 'route_desc')

# And then only keep the trip id's corresponding to legal service groups
trips_metadata = trips_metadata.join(legal_services, on='service_id', how='inner')
trips_metadata.count()

In [None]:
trips_metadata.show(5)

In [None]:
# Join with stop_times to decrease the number of connections
print('Number of connections before filtering out invalid services: {}'.format(stop_times.count()))
stop_times = stop_times.join(trips_metadata.select('trip_id'), on='trip_id', how='inner')
print('Number of connections after filtering out invalid services: {}'.format(stop_times.count()))

In [None]:
%%spark

# To extract stops that are connected by a trip, we define a window function partitioned over each trip_id, 
# and ordered by the stop_sequence (ascending). That way we can access the subsequent stop with F.lag
# and a negative offset

from pyspark.sql.window import Window

window = (
    Window.partitionBy('trip_id')
          .orderBy(F.asc('stop_sequence'))
)

# Column of each outgoing connection (same as stop_id)
source = (
    F.lag('stop_id', 0).over(window)
)

# Column of destination for outgoing connection
destination = (
    F.lag('stop_id', -1).over(window)
)

dest_arrival_time = (
    F.lag('arrival_time', -1).over(window)
)
dest_departure_time = (
    F.lag('departure_time', -1).over(window)
)

# Travel time from the previous stop to the current stop (for ML model)
travel_time = (
    (F.lag('arrival_time', 0).over(window).cast('long') - 
    F.lag('departure_time', 1).over(window).cast('long'))
)

In [None]:
# Collect all edges, plus meta data, in one dataframe
# Important: 
    # travel_time refers to the time it took from the stop prior to source stop
    # this is a predictive feature for delay model, as the time spent 
    # on the journey is more relevant for the delay at the stop than the time that will be spent
connections = (
    stop_times.select(
        'trip_id', 
        source.alias('src_id'), 
        F.col('arrival_time').alias('src_arrival_time'), 
        F.col('departure_time').alias('src_departure_time'),
        destination.alias('dest_id'),
        dest_arrival_time.alias('dest_arrival_time'),
        dest_departure_time.alias('dest_departure_time'),
        'stop_sequence',
        travel_time.alias('travel_time_from_prev'),
    )
)

# Cumulative travel time along the trip up to the source stop (for ML model)
connections = (
    connections.withColumn('cummulated_travel_time',
                           F.sum('travel_time_from_prev')
                            .over(window.rowsBetween(Window.unboundedPreceding, 0))
                          )
)

# Add metadata so it doesn't have to be saved on separate file
# We sacrifice normalization for convenience
connections = connections.join(trips_metadata, on='trip_id', how='left_outer') \
                         .drop('service_id')

connections.count()

The resulting row count is a reasonable size for a pandas DataFrame.

In [None]:
# Finally write stop connections to file
connections.write.orc('/group/gutane/connections', mode='overwrite')

## Realtime data from Istdaten
We keep only the rows where `AN_PROGNOSE_STATUS` and `AB_PROGNOSE_STATUS` are either "REAL", "GESCHAETZT" or "PROGNOSE", as these are the values we're interested in. When exploring the dataset, we found a lot of duplicate values. We identified (`trip_date`, `trip_id`, `scheduled_arrival_time`) as the columns over which we wanted distinct values, as there were often several values for `actual_arrival_time` per combination of `trip_date` and `trip_id`. This caused trouble when calculating travel times between two stops. The transformation of the rows is done similarly to what we did with `stop_times.txt`.

**Drawbacks**:
- After transforming the dataset to show subsequent stops in each trip, we found ~20k rows where the departure time from the previous stop was later than the arrival time at the current stop. This resulted in negative travel times between stops. The rows we inspected revealed that these inconsistencies were caused by duplicated entries, so we simply removed them. However, this assumption might not hold for every row, potentially resulting in inaccurate results.
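
The deduplication and the cumulative travel time can be sketched in plain Python for a single trip on a single date. This is an illustration under assumptions, not the Spark code: `drop_duplicates` and `cumulative_travel_times` are hypothetical helpers, rows are dicts keyed by the renamed columns, and all scheduled times are non-null `datetime` values. The sketch keeps the first row seen for each key, whereas Spark's `dropDuplicates` gives no guarantee about which duplicate survives.

```python
from datetime import datetime
from itertools import accumulate

KEY_FIELDS = ("trip_date", "trip_id", "scheduled_arrival_time")


def drop_duplicates(rows, key_fields=KEY_FIELDS):
    """Keep the first row for each combination of the key fields."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


def cumulative_travel_times(trip_rows):
    """Cumulative scheduled travel time in seconds along one trip.

    The travel time of a leg is the next stop's scheduled arrival minus the
    current stop's scheduled departure; negative legs are dropped before
    the running sum is taken.
    """
    ordered = sorted(trip_rows, key=lambda r: r["scheduled_arrival_time"])
    legs = [
        (nxt["scheduled_arrival_time"] - cur["scheduled_departure_time"]).total_seconds()
        for cur, nxt in zip(ordered, ordered[1:])
    ]
    return list(accumulate(leg for leg in legs if leg >= 0))
```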

In [None]:
%%spark
# Load the Istdaten data (scheduled and actual arrival and departure times)

# After extracting our columns of interest, we found a lot of duplicate values.
# These made it hard to distinguish between different trips under the same trip_id. 
# In addition, there are sometimes several rows for the same stop, differing only in the 
# actual departure and arrival times
# We therefore continue with only the distinct rows over trip_date, trip_id and scheduled_arrival_time

istdaten = spark.read.orc('/data/sbb/orc/istdaten')

istdaten = (
    istdaten.filter('AN_PROGNOSE_STATUS == "REAL" or AN_PROGNOSE_STATUS == "GESCHAETZT" or AN_PROGNOSE_STATUS == "PROGNOSE"')
            .filter('AB_PROGNOSE_STATUS == "REAL" or AB_PROGNOSE_STATUS == "GESCHAETZT" or AB_PROGNOSE_STATUS == "PROGNOSE"')
            .select(F.to_timestamp('BETRIEBSTAG', 'dd.MM.yyyy').alias('trip_date'), 
                    F.col('FAHRT_BEZEICHNER').alias('trip_id'), 
                    F.col('PRODUKT_ID').alias('transport_type'),
                    F.col('BPUIC').alias('stop_id'),
                    F.to_timestamp('ANKUNFTSZEIT', 'dd.MM.yyyy HH:mm').alias('scheduled_arrival_time'),
                    F.to_timestamp('AN_PROGNOSE', 'dd.MM.yyyy HH:mm:ss').alias('actual_arrival_time'),
                    F.to_timestamp('ABFAHRTSZEIT', 'dd.MM.yyyy HH:mm').alias('scheduled_departure_time'),
                    F.to_timestamp('AB_PROGNOSE', 'dd.MM.yyyy HH:mm:ss').alias('actual_departure_time')
                   )
            .dropDuplicates(['trip_date', 'trip_id', 'scheduled_arrival_time']) 
)

In [None]:
# Need to extract each trip, similar to the structure in GTFS data
# Can then use the cumulative travel time and stop number as features

from pyspark.sql.window import Window

# Define window over each (trip_id, trip_date), ordered by scheduled_arrival_time
window = (
    Window.partitionBy('trip_id', 'trip_date')
          .orderBy(F.asc('scheduled_arrival_time'))
)

# Extract travel time
travel_time = (
    (F.lag('scheduled_arrival_time', -1).over(window).cast('long') - 
     F.lag('scheduled_departure_time', 0).over(window).cast('long'))
)

delay_departure = (
    (F.lag('scheduled_departure_time', 1).over(window).cast('long') - 
     F.lag('actual_departure_time', 1).over(window).cast('long'))
)

# Select relevant columns
istdaten = istdaten.select('trip_date', 'trip_id', 'transport_type', 'stop_id', 'scheduled_arrival_time',
                           'actual_arrival_time', 'scheduled_departure_time', 'actual_departure_time', 
                           travel_time.alias('travel_time'), delay_departure.alias('delay_departure')
                          )
# After removing the duplicates, we found roughly 20k rows with negative travel times
# After further inspections, these seemed to be faulty inserts in the middle of valid trips
# We therefore assume this is the case for all, and remove all rows with negative travel times,
# before calculating the cumulative sum

istdaten = istdaten.filter('travel_time >= 0')

istdaten = \
    istdaten.withColumn('cumulated_travel_time', 
                         F.sum('travel_time')
                          .over(window.rowsBetween(Window.unboundedPreceding, 0))
                        )


In [None]:
# Filter out rows where the arrival time is null, as we don't need
# the first stop of each trip in the predictive model
# We don't filter on departure times as this would remove
# the last stop in each trip, which we need for our model

istdaten = istdaten.filter('actual_arrival_time is not null') \
                   .filter('scheduled_arrival_time is not null') 

In [None]:
# Finally write to file
istdaten.write.orc('/group/gutane/istdaten_distinct', mode='overwrite')