## Setup Spark environment

In [None]:
%%local
import os
import json

# JupyterHub login name: used in the Spark application name and sent to the Spark
# session by the %%send_to_spark cell (where it builds the HDFS output paths).
username = os.environ['JUPYTERHUB_USER']

# Session settings for sparkmagic's %%configure magic (applied at the end of this cell).
# The Spark application is named "<username>-final-schedule".
configuration = dict(
    name = "%s-final-schedule" % username,
    executorMemory = "4G",
    executorCores = 4,
    numExecutors = 10,
    driverMemory = "4G",
    conf = {
        # Uncomment to make PySpark use this Python 3 interpreter:
        # "spark.pyspark.python": "/opt/anaconda3/bin/python3",
        "spark.jars.repositories": "https://repos.spark-packages.org",
        # The GraphFrames build must match the cluster's Spark and Scala versions (Spark 2.3, Scala 2.11).
        "spark.jars.packages": "graphframes:graphframes:0.7.0-spark2.3-s_2.11"
    }
)

# -f drops any existing session and recreates it with the configuration above.
get_ipython().run_cell_magic('configure', line="-f", cell=json.dumps(configuration))

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
# Sanity check that the remote Spark session (configured above) is up and reports its version.
print('We are using Spark %s' % spark.version)

In [None]:
# Distribute the GraphFrames jar so its Python package can be imported in the next cell.
# NOTE(review): assumes the jar fetched via spark.jars.packages is available under exactly this
# file name in the session's working directory; keep the version in sync with that setting.
sc.addPyFile('graphframes_graphframes-0.7.0-spark2.3-s_2.11.jar')

In [None]:
from graphframes import *

## Load data into spark dataframes

In [None]:
def timetable_csv_path(table, date='2019/05/07'):
    """Return the path of one SBB timetable CSV table for a snapshot date ('YYYY/MM/DD').

    Keeping the date in one place guarantees every timetable table is read from the
    same snapshot.
    """
    return '/data/sbb/csv/timetable/%s/%s/%s.csv' % (table, date, table)


# CSVs are read without schema inference, so all their columns are strings; later cells
# compare calendar flags with "1" and cast times/sequences explicitly.
stops = spark.read.orc('/data/sbb/orc/geostops')
stop_times = spark.read.csv(timetable_csv_path('stop_times'), header=True).drop('pickup_type', 'drop_off_type')
routes = spark.read.csv(timetable_csv_path('routes'), header=True)
calendar = spark.read.csv(timetable_csv_path('calendar'), header=True).drop('start_date', 'end_date')
trips = spark.read.csv(timetable_csv_path('trips'), header=True)

## Keep only stops within a 15 km radius of Zürich HB

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import acos, asin, cos, sin, lit, toRadians, sqrt
from pyspark.sql import Window


def haversine(theta):
    """Haversine of an angle: hav(theta) = (1 - cos(theta)) / 2.

    `theta` is a Spark Column (or literal) in radians; returns a Column.
    """
    return (lit(1) - cos(theta)) / lit(2)

def haversine_dist(latitude_x, longitude_x, latitude_y, longitude_y, earth_radius=6371.0):
    """Great-circle distance between two points given as Spark Columns in degrees.

    Args:
        latitude_x, longitude_x: coordinates of the first point, in degrees.
        latitude_y, longitude_y: coordinates of the second point, in degrees.
        earth_radius: sphere radius; the result has the same unit (km by default).

    Returns:
        A Column holding the distance.

    Uses d = 2 R asin(sqrt(h)) instead of the mathematically equivalent R acos(1 - 2h):
    acos(x) loses precision as x approaches 1, i.e. exactly for the short (sub-km)
    distances this notebook compares against thresholds.
    """
    lat_x, lon_x = toRadians(latitude_x), toRadians(longitude_x)
    lat_y, lon_y = toRadians(latitude_y), toRadians(longitude_y)
    h = haversine(lat_x - lat_y) + cos(lat_x) * cos(lat_y) * haversine(lon_x - lon_y)
    # Rounding can push h slightly above 1 for nearly antipodal points; asin would then give NaN.
    h = F.least(h, lit(1.0))
    return lit(2) * asin(sqrt(h)) * earth_radius

In [None]:
# Annotate every stop with its distance (km) to Zurich HB, then keep those at most 15 km away.
zurich_HB_lat, zurich_HB_lon = 47.378177, 8.540192
stops = (
    stops
    .withColumn('distance_zurich_HB',
                haversine_dist(lit(zurich_HB_lat), lit(zurich_HB_lon), F.col('stop_lat'), F.col('stop_lon')))
    .where(F.col('distance_zurich_HB') <= 15)
)

## Remove duplicate trips on each day

In [None]:
# Trips joined with their stop times and calendar flags; cached because the weekday loop
# below calls get_daily_trips on it once per day.
all_trip_times = trips.join(stop_times, 'trip_id').join(calendar, 'service_id').cache()

In [None]:
# Preview the first rows of the joined table.
all_trip_times.show()

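Add a column with the service start time of each trip, so that every trip has a starting time. Sort by the length of `ord_stops` (longest first), then drop duplicates on `route_id`, `direction_id` and start time. This keeps the most generic trip on each route starting at a given time.

Note: the implementation below deduplicates slightly differently. It groups the stop times of each trip, then drops trips whose `route_id`, sorted departure times, sorted arrival times and ordered stop list are all identical.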

In [None]:
from pyspark.sql.types import ArrayType, StringType


@F.udf(returnType=ArrayType(StringType()))
def sort_array(array):
    """Sort an array column row-wise (Python UDF, null-safe).

    Kept for backward compatibility; get_daily_trips uses the native F.sort_array,
    which avoids Python serialization. The return type is declared explicitly because
    a bare @F.udf defaults to StringType and silently turns the list into a string.
    """
    return sorted(array) if array is not None else None


def get_daily_trips(df, day):
    """Return one representative trip_id per distinct schedule running on `day`.

    Args:
        df: joined trips / stop_times / calendar frame (presumably one row per trip stop),
            with a flag column named after each weekday, compared against the string "1".
        day: weekday column name, e.g. 'monday'.

    Returns:
        DataFrame with columns `trip_id` and `weekday` (the literal `day`).

    Two trips are duplicates when they share `route_id` and have identical sorted departure
    times, sorted arrival times and "stop_sequence;stop_id" lists. drop_duplicates does not
    specify which trip of a duplicate group is kept.
    """
    day_trips = df.filter(F.col(day) == '1')

    groupby_cols = ['trip_id', 'route_id', 'direction_id', 'trip_short_name', 'service_id']
    # Only the three per-trip lists below are needed. Sorting "seq;stop" strings gives a
    # canonical (lexicographic, not travel) order, which is enough for equality comparison.
    res = (day_trips
           .withColumn('ord_stop', F.concat(F.col('stop_sequence'), F.lit(';'), F.col('stop_id')))
           .groupby(groupby_cols)
           .agg(F.sort_array(F.collect_list('departure_time')).alias('departure_times'),
                F.sort_array(F.collect_list('arrival_time')).alias('arrival_times'),
                F.sort_array(F.collect_list('ord_stop')).alias('ord_stops')))

    dedup_cols = ['route_id', 'departure_times', 'arrival_times', 'ord_stops']
    return res.drop_duplicates(dedup_cols).select('trip_id').withColumn('weekday', F.lit(day))

In [None]:
from pyspark.sql.types import StructType,StructField, StringType

# Schema of the per-weekday trip table: two nullable string columns, trip_id and weekday.
daily_trips_schema = StructType([StructField(col_name, StringType(), True) for col_name in ('trip_id', 'weekday')])

weekdays = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday']

# Start from an empty frame with that schema and append each weekday's deduplicated trips.
daily_trips = spark.createDataFrame([], daily_trips_schema)
for weekday in weekdays:
    trips_of_day = get_daily_trips(all_trip_times, weekday)
    daily_trips = daily_trips.union(trips_of_day)

In [None]:
# Number of (trip_id, weekday) rows kept after the per-day deduplication.
daily_trips.count()

In [None]:
# NOTE(review): this cell used `df`, which is not defined anywhere in the notebook and raised
# NameError on a fresh kernel. Assumed intent: count distinct trips before the per-weekday
# deduplication, for comparison with daily_trips.count() above. Confirm.
all_trip_times.select('trip_id').distinct().count()

## Compute edge lists

### Distances between stops and walking edges

In [None]:
# One renamed copy of `stops` per edge endpoint, so the two can be cross-joined without clashes.
stops_start = (stops
               .withColumnRenamed('stop_id', 'start_vertex')
               .withColumnRenamed('stop_lat', 'stop_lat_start')
               .withColumnRenamed('stop_lon', 'stop_lon_start'))
stops_end = (stops
             .withColumnRenamed('stop_id', 'end_vertex')
             .withColumnRenamed('stop_lat', 'stop_lat_end')
             .withColumnRenamed('stop_lon', 'stop_lon_end'))

# Great-circle distance (km) between every ordered pair of stops.
all_distances = stops_start.crossJoin(stops_end).withColumn(
    'distance',
    haversine_dist(F.col('stop_lat_start'), F.col('stop_lon_start'),
                   F.col('stop_lat_end'), F.col('stop_lon_end'))
)

# Walking edges link distinct stops at most 0.5 km apart. The speed is presumably in km/min
# (0.05 km/min = 3 km/h), which makes `duration` minutes like the transport edges.
# The -1 start time / '-1' trip id / 'walking' route are placeholders for non-timetable edges.
walking_speed = 0.05
walking_edges = (all_distances
                 .where((F.col('distance') <= 0.5) & (F.col('start_vertex') != F.col('end_vertex')))
                 .select('start_vertex',
                         'end_vertex',
                         F.lit(-1).alias('start_time'),
                         (F.col('distance') / walking_speed).alias('duration'),
                         F.lit('-1').alias('trip_id'),
                         F.lit('walking').alias('route_id')))

### Public transport edge list

In [None]:
@F.udf
def hour(timestamp):
    """Return the two-character hour prefix of an 'HH:MM:SS' string (null-safe)."""
    if timestamp is None:
        return None
    return timestamp[:2]

# Keep only stop events whose arrival hour lies in [min_day_hour, max_day_hour] (inclusive).
min_day_hour, max_day_hour = 6, 22
# NOTE(review): this cell used to filter `all_info` in place, but `all_info` is never defined in
# the notebook, so it failed on a fresh kernel. It is assumed to be the trips / stop_times / calendar
# join `all_trip_times`, which carries every column get_edges needs. Confirm.
all_info = all_trip_times.filter(
    F.substring('arrival_time', 1, 2).cast('int').between(min_day_hour, max_day_hour)
)

In [None]:
from pyspark.sql.types import IntegerType


@F.udf(returnType=IntegerType())
def minutes(timestamp):
    """Convert an 'HH:MM:SS' string to minutes since midnight (null-safe Python UDF).

    Declared IntegerType explicitly: a bare @F.udf defaults to StringType.
    """
    if timestamp is None:
        return None
    return int(timestamp[:2]) * 60 + int(timestamp[3:5])


def _to_minutes(col_name):
    """Native Spark equivalent of `minutes`: HH * 60 + MM as an int Column (null stays null)."""
    return F.substring(col_name, 1, 2).cast('int') * 60 + F.substring(col_name, 4, 2).cast('int')


# needs columns: {trip_id, stop_sequence, stop_id, route_id, arrival_time, departure_time}
def get_edges(trip_info):
    """Turn consecutive stops of each trip into timed transport edges.

    Stops of a trip are ordered by numeric stop_sequence; every stop after the first
    yields one edge from the previous stop to it.

    Returns:
        DataFrame with columns start_vertex, end_vertex, start_time (departure from the
        previous stop, minutes since midnight), duration (minutes from that departure to
        arrival at this stop), trip_id, route_id.
    """
    window = Window.partitionBy('trip_id').orderBy(F.col('stop_sequence').cast('int'))

    edges = (trip_info
             .withColumn('arrival_time_minutes', _to_minutes('arrival_time'))
             .withColumn('departure_time_minutes', _to_minutes('departure_time'))
             .withColumn('start_time', F.lag('departure_time_minutes').over(window))
             .withColumn('start_vertex', F.lag('stop_id').over(window))
             .withColumn('duration', F.col('arrival_time_minutes') - F.col('start_time'))
             .withColumnRenamed('stop_id', 'end_vertex')
             # The first stop of a trip has no predecessor. Filter on the column's final
             # name: the old filter on 'prev_departure_minutes' ran after the rename and
             # could not resolve.
             .filter(F.col('start_time').isNotNull()))

    return edges.select('start_vertex', 'end_vertex', 'start_time', 'duration', 'trip_id', 'route_id')

In [None]:
# Keep only edges of deduplicated trips. The inner join also adds `weekday`, so an edge appears
# once per weekday its trip was kept for. Cached because several cells below reuse it.
transport_edges = get_edges(all_info).join(daily_trips, 'trip_id').cache()

In [None]:
# Preview the first transport edges.
transport_edges.show()

In [None]:
# Spot-check one hop by hand: departure at minute 641 (= 10:41) between two specific stop ids.
# start_time is an int column; the string literal is cast by Spark for the comparison.
transport_edges.filter('start_vertex = "8503064" and end_vertex = "8503065:0:1" and start_time = "641"').show()

In [None]:
# Number of (start_vertex, end_vertex, start_time, duration, route_id, weekday) combinations
# served by more than one trip_id, i.e. duplicate hops that survived trip deduplication.
# trip_id is excluded by name rather than by assuming it is the first column after the join.
(transport_edges
 .groupBy([c for c in transport_edges.columns if c != 'trip_id'])
 .count()
 .filter('count > 1')
 .count())

In [None]:
from pyspark.sql.functions import collect_set
# Per (start_vertex, end_vertex, start_time, route_id): the distinct durations and trip ids observed.
s = (transport_edges
     .groupby('start_vertex', 'end_vertex', 'start_time', 'route_id')
     .agg(F.collect_set('duration'), F.collect_set('trip_id')))

In [None]:
# Preview the grouped durations / trip ids.
s.show()

In [None]:
# Number of distinct durations per (start_vertex, end_vertex, start_time, route_id) group;
# presumably values > 1 flag hops with inconsistent travel times.
ss = s.withColumn("ss", F.size(F.col('collect_set(duration)')))

### Edges reachable from Zürich HB

In [None]:
# Connectivity only: drop times and trips, keep each directed (start, end) pair once.
graph_edges = (
    transport_edges.select('start_vertex', 'end_vertex')
    .union(walking_edges.select('start_vertex', 'end_vertex'))
    .distinct()
)

In [None]:
def filter_unreachable(vertices, edges, hub_id='8503000'):
    """Return the stops lying in the same connected component as the hub stop.

    Args:
        vertices: DataFrame with a `stop_id` column.
        edges: DataFrame with `start_vertex` and `end_vertex` columns.
        hub_id: stop_id of the reference stop; the default is the id this notebook
            treats as Zurich HB.

    Returns:
        DataFrame with a single `stop_id` column.

    Raises:
        ValueError: if `hub_id` is not among the vertices.

    Note: connected components ignore edge direction, so this keeps stops (weakly)
    connected to the hub, not necessarily reachable from it along directed edges.
    """
    v = vertices.withColumnRenamed('stop_id', 'id')
    e = edges.withColumnRenamed('start_vertex', 'src').withColumnRenamed('end_vertex', 'dst')

    g = GraphFrame(v, e)

    # Cached: the result is read twice (hub lookup, then the filter) and again by callers.
    components = g.connectedComponents(algorithm='graphx').cache()

    hub_row = components.filter(F.col('id') == hub_id).select('component').first()
    if hub_row is None:
        raise ValueError('hub stop %r is not among the graph vertices' % hub_id)

    return components.filter(F.col('component') == hub_row['component']).select(F.col('id').alias('stop_id'))

In [None]:
# Stops (from the radius-filtered `stops`) in Zurich HB's component of the transport + walking graph.
reachable_stops = filter_unreachable(stops.select('stop_id').distinct(), graph_edges)

In [None]:
# Keep only edges whose two endpoints are both reachable stops. Left-semi joins filter
# graph_edges without adding columns. Renaming `stop_id` for each join avoids joining the
# same frame twice under one name and resolving columns through DataFrame lineage.
reachable_edges = (graph_edges
                   .join(reachable_stops.withColumnRenamed('stop_id', 'start_vertex'), 'start_vertex', 'left_semi')
                   .join(reachable_stops.withColumnRenamed('stop_id', 'end_vertex'), 'end_vertex', 'left_semi')
                   .select('start_vertex', 'end_vertex'))

In [None]:
# Preview the reachable (start_vertex, end_vertex) pairs.
reachable_edges.show()

### Save Results

In [None]:
# Overwrite existing output so Restart & Run All does not fail with "path already exists".
reachable_stops.write.mode('overwrite').parquet('/user/%s/final/parquet/reachable_stops' % username)
reachable_edges.write.mode('overwrite').parquet('/user/%s/final/parquet/reachable_edges' % username)

In [None]:
%%local
from hdfs3 import HDFileSystem
# List the Parquet outputs from the local kernel to confirm they were written.
# NOTE(review): connects as the hard-coded user 'ebouille' although the listed directory
# belongs to `username`; confirm this impersonation is still required.
hdfs = HDFileSystem(user='ebouille') # impersonate ebouille to read the file
hdfs.ls('/user/%s/final/parquet' % username)

In [None]:
# Quick look at the calendar table as loaded (start_date / end_date already dropped).
calendar.show()