# Infrastructure processing

In this notebook, we prepare the data using which we will build our network. Using the timetable data available, we prepare the edges and vertices required to build that graph.  

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from itertools import combinations
from math import radians, cos, sin, asin, sqrt
from pyspark.sql.types import ArrayType, StringType

## Start pyspark 

In [None]:
%%local
import os
import json

username = os.environ['JUPYTERHUB_USER']
namespace = os.environ['CI_NAMESPACE']
project = os.environ['CI_PROJECT']

configuration = dict(
    name = f"{username}-{namespace}-{project}",
    executorMemory = "4G",
    executorCores = 4,
    numExecutors = 10,
    conf = {
        "spark.jars.repositories": "https://repos.spark-packages.org",
    })

ipython = get_ipython()
ipython.run_cell_magic('configure', line="-f", cell=json.dumps(configuration))

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
# Define the checkpoint folder
checkpoint = 'hdfs:///user/{}/checkpoint/'.format(username)
print('checkpoint created at hdfs:///user/{}/checkpoint/'.format(username))
sc.setCheckpointDir(checkpoint)

## Dealing with train services 

We keep services that run on every business day, i.e. Monday through Friday

In [None]:
# Read calendar data
calendar = spark.read.csv("/data/sbb/csv/timetable/calendar/2019/05/07/calendar.csv",  header=True, inferSchema=True)

In [None]:
# Filter calendar to keep services that run on all 5 business days
business_day_calendar = calendar.filter((col("monday") == 1) & \
                                        (col("tuesday") == 1) & \
                                        (col("wednesday") == 1) & \
                                        (col("thursday") == 1) & \
                                        (col("friday") == 1)).select("service_id")

In [None]:
calendar.show(5)

## Dealing with trips

We keep only the trips whose `service_id` is in the list computed above, i.e. trips that run on typical business day

In [None]:
# Read the trips data
trips = spark.read.csv("/data/sbb/csv/timetable/trips/2019/05/07/trips.csv",  header=True, inferSchema=True)

In [None]:
# Keep the trips that run during the services (i.e. business days)
business_day_trips = business_day_calendar.join(trips, on = "service_id", how = "left_outer").drop("service_id", "direction_id")
business_day_trips.show(5)

## Dealing with routes

We include the route information to the dataframe, i.e. the `agency_id`, the `route_short_name` and the `route_desc`.

In [None]:
# Read the routes data
routes = spark.read.csv("/data/sbb/csv/timetable/routes/2019/05/07/routes.csv",  header=True, inferSchema=True)

In [None]:
routes.show(5)

In [None]:
# Augment the trips data with information about the route (agency, route short name and description)
business_day_routes = business_day_trips.join(routes, on = "route_id", how = "left_outer").drop("route_long_name", "route_type")
business_day_routes.show(5)

## Dealing with stop times

We add the stop times in the dataframe, but we only keep trips (or part of trips) that run after 6 a.m. and arrive before 9 p.m.

In [None]:
# Read stop times data
stop_times = spark.read.csv("/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv",  header=True, inferSchema=True)

In [None]:
# Augment this data with information about trips and stops
business_day_stop_times = business_day_routes.join(stop_times, on = "trip_id", how = "left_outer").drop("pickup_type", "drop_off_type")
business_day_stop_times.show(5)

In [None]:
# Keep stop times occuring during "reasonable" business day hours
business_day_stop_times = business_day_stop_times.filter( (col("departure_time") >= "06:00:00") & (col("arrival_time") < "21:00:00"))

## Dealing with stops

We now load information about the transportation stops. We will filter these stops to retain those inside a 15 km radius around Zurich's main station.

In [None]:
# Read the stops data
stops = spark.read.orc("/data/sbb/orc/geostops/")

In [None]:
# Determining the exact location of Zurich HB and broadcasting it
zurich_HB_location = stops.filter(stops.stop_id == '8503000').select('stop_lat', 'stop_lon').first()
bc_zhb_location = spark.sparkContext.broadcast(zurich_HB_location)

In [None]:
@F.udf
def haversine(lat1, lon1, lat2 = bc_zhb_location.value.stop_lat, lon2 = bc_zhb_location.value.stop_lon):
    '''
        Returns the distance between two points in Km
    '''
    R = 6372.8
    dLat = radians(lat2 - lat1)
    dLon = radians(lon2 - lon1)
    lat1 = radians(lat1)
    lat2 = radians(lat2)

    a = sin(dLat/2)**2 + cos(lat1)*cos(lat2)*sin(dLon/2)**2
    c = 2*asin(sqrt(a))

    return R * c

### Filtering stops within a 15 km radius from Zurich HB

In [None]:
# The geostops dataset filtered to include only the stops that are whithin a 15km radius of Zürich Hauptbahnhof
stops_15_from_zhb = stops.filter(haversine(stops.stop_lat, stops.stop_lon) <= 15.0)

# An broadcast object containing the stop_ids of all the stops in this area
bc_stops_15_from_zhb = spark.sparkContext.broadcast([row.stop_id for row in stops_15_from_zhb.collect()])

In [None]:
@F.udf
def trip_in_15_zhb(stop_list):
    '''
        Checks if the list of stops contains at least one 
        stop from the stops that are in the 15-km radius
        around Zurich HB
    '''
    return any([stop in bc_stops_15_from_zhb.value for stop in stop_list])

Now that we've got a list of stops, we retain trips that have at least one stop contained in the 15-km radius around Zurich HB. 

In [None]:
# We group by trip_id and we store in a list all the stops of a trip_id, 
# along with the departure and arrival time of each stop.
# We keep only the trips where at least one stop is at less than 15 km from Zurich HB
trip_stops_within_15 = business_day_stop_times.groupby('trip_id', 'trip_short_name', 'agency_id', "route_short_name", 'route_desc', 'route_id')\
                                                    .agg(F.collect_list("stop_sequence").alias("sequence"), 
                                                         F.collect_list("stop_id").alias("stops"), 
                                                         F.collect_list("arrival_time").alias("arrival_times"),
                                                         F.collect_list("departure_time").alias("departure_times"))\
                                                    .filter(trip_in_15_zhb("stops") == True)

In [None]:
trip_stops_within_15.show(5)

## Building the underlying graph

Now that we have the required filtered information, we can proceed to prepare the edges for our graph. We decided to have an edge between consecutive stops of a line.

### Creating pairs of consecutive stops

In [None]:
@F.udf(ArrayType(ArrayType(StringType())))
def make_stop_pairs(stop_sequence, stop_list, departures_list, arrivals_list):
    '''
        Given a list of stop ids, creates a list of tuples
        of consecutive stops
    '''
    quadruples = []
    stop_list_sorted = [x for _, x in sorted(zip(stop_sequence, stop_list), key=lambda pair: pair[0])]
    departures_list_sorted = [x for _, x in sorted(zip(stop_sequence, departures_list), key=lambda pair: pair[0])]
    arrivals_list_sorted = [x for _, x in sorted(zip(stop_sequence, arrivals_list), key=lambda pair: pair[0])]

    for i in range(1, len(stop_list)):
        src, dst = stop_list_sorted[i-1], stop_list_sorted[i]
        src_departure = departures_list_sorted[i-1]
        dst_arrival = arrivals_list_sorted[i]
        quadruples.append([src, dst, src_departure, dst_arrival])
     
    return quadruples

In [None]:
# Creating a dataframe containing the edges of our network
# there is an edge between 2 stops if there exists at least one trip passing through both stops
edges = trip_stops_within_15.withColumn('edge', 
                                        F.explode(make_stop_pairs(trip_stops_within_15.sequence, trip_stops_within_15.stops, 
                                                                  trip_stops_within_15.departure_times, trip_stops_within_15.arrival_times))
                                       )\
                            .select(col('edge')[0].alias('src'), 
                                    col('edge')[1].alias('dst'), 
                                    col('edge')[2].alias('src_departure'), 
                                    col('edge')[3].alias('dst_arrival'),
                                    col('trip_id'), 
                                    col('trip_short_name'), 
                                    col('agency_id'), 
                                    col('route_short_name'), 
                                    col('route_desc'),
                                   col('route_id'))

We now drop duplicates to keep one edge per unique source, destination, arrival and departure times, and route id (i.e. unique identifier for a line of transport). This is avoid having multiple for the same trip. 

In [None]:
# Drop duplicate edges with regard to the source, destination, arrival and departure times
edges = edges.dropDuplicates(['src', 'dst', 'src_departure', 'dst_arrival', 'route_id'])

In [None]:
edges.show(5)

### Adding transfer edges (i.e. walking edges)

Using the edges we just created, we build the list of vertices of our graph. 

In [None]:
# From the edges, we extract the stop that needs to be considered for transfers
new_stops = edges.select(col('src').alias('stop_id'))\
                 .union(edges.select(col('dst').alias('stop_id'))).distinct()\
                 .join(stops, on = 'stop_id', how = 'left_outer').cache()

# Copy to be able to perform cross join
new_stops_bis = new_stops.select(col("stop_id").alias("stop_id_bis"), col("stop_lat").alias("stop_lat_bis"), col("stop_lon").alias("stop_lon_bis"))

We create pairs of vertices and keep those that are less than 500m apart. These pairs will correspond to what we call "transfer edges", i.e walking edges that correspond to transfers between closeby stops.

In [None]:
# We create walking edges dataframe containing all the stops between which there is less than 500 m 
walking_edges = new_stops.crossJoin(new_stops_bis)\
                                 .withColumn('distance', haversine(col('stop_lat'), col('stop_lon'), 
                                                                   col('stop_lat_bis'), col('stop_lon_bis'))) \
                                 .filter((col('distance') > 0.0) & (col('distance') <= 0.5))\
                                 .select(col('stop_id').alias('src'), col('stop_id_bis').alias('dst'), 'distance')

## Saving the results

Finally, we save the results on hdfs under our group user folder.

In [None]:
folder_name = "hdfs:///user/theAggregators"

In [None]:
new_stops.write.mode("overwrite").option("compression","gzip").option("header", "True").csv("{}/vertices.csv".format(folder_name))
edges.write.mode("overwrite").option("compression","gzip").option("header", "True").csv("{}/edges.csv".format(folder_name))
walking_edges.write.mode("overwrite").option("compression","gzip").option("header", "True").csv("{}/transfers.csv".format(folder_name))