# Start a Spark instance

In [2]:
%%local
import os
import json
# Identify the session by the JupyterHub user and the CI namespace/project.
# All three environment variables are mandatory: a missing one raises KeyError.
username, namespace, project = (
    os.environ[variable]
    for variable in ('JUPYTERHUB_USER', 'CI_NAMESPACE', 'CI_PROJECT')
)

# Session settings handed to the `%%configure` magic as JSON.
configuration = {
    "name": f"{username}-{namespace}-{project}",
    "executorMemory": "4G",
    "executorCores": 1,
    "numExecutors": 1,
    "conf": {
        # Optional interpreter override, currently disabled:
        # "spark.pyspark.python": "/opt/anaconda3/bin/python3",
        # Resolve the graphframes package from the spark-packages repository.
        "spark.jars.repositories": "https://repos.spark-packages.org",
        "spark.jars.packages": "graphframes:graphframes:0.7.0-spark2.3-s_2.11",
    },
}

from IPython import get_ipython

# Invoke the `configure` cell magic programmatically with the `-f` flag so the
# settings built above can be passed as a JSON string.
ipython = get_ipython()
ipython.run_cell_magic('configure', line="-f", cell=json.dumps(configuration))

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7135,application_1618324153128_6839,pyspark,idle,Link,Link,,
7137,application_1618324153128_6842,pyspark,idle,Link,Link,,
7139,application_1618324153128_6844,pyspark,idle,Link,Link,,
7140,application_1618324153128_6845,pyspark,idle,Link,Link,,
7141,application_1618324153128_6846,pyspark,idle,Link,Link,,


In [3]:
# Register the graphframes jar (same coordinates as `spark.jars.packages` in the
# session configuration) with the SparkContext as a Python dependency.
# NOTE(review): presumably this makes the Python `graphframes` wrapper bundled
# in the jar importable on the driver/executors -- confirm.
# As the cell output shows, this first use of `sc` is also what starts the
# Spark application.
sc.addPyFile('graphframes_graphframes-0.7.0-spark2.3-s_2.11.jar')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7142,application_1618324153128_6847,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load the data 

In [5]:
import pyspark.sql.functions as f
import math

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Load the stop_times table of the 2019/05/07 timetable snapshot.
# header=True takes the column names from the first CSV line; no schema is
# given or inferred, so the columns are read as strings.
stop_times = spark.read.csv("/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv", header=True)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Load the stop locations from ORC. Columns visible in the notebook output:
# stop_id, stop_name, stop_lat, stop_lon, location_type, parent_station.
stops = spark.read.orc("/data/sbb/orc/geostops/") 


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Load the service calendar of the 2019/05/07 timetable snapshot.
# `header` is passed as a real boolean, consistent with the stop_times load
# (the string "True" only worked because Spark stringifies reader options).
calendar = spark.read.csv("/data/sbb/csv/timetable/calendar/2019/05/07/calendar.csv", header=True)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Load the trips table of the 2019/05/07 timetable snapshot.
# `header` is passed as a real boolean, consistent with the stop_times load
# (the string "True" only worked because Spark stringifies reader options).
trips = spark.read.csv("/data/sbb/csv/timetable/trips/2019/05/07/trips.csv", header=True)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Load the routes table of the 2019/05/07 timetable snapshot.
# `header` is passed as a real boolean, consistent with the stop_times load
# (the string "True" only worked because Spark stringifies reader options).
routes = spark.read.csv("/data/sbb/csv/timetable/routes/2019/05/07/routes.csv", header=True)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Data pre-processing 

## Filter the stops within 15 km of Zurich HB

In [11]:
from math import sin, cos, sqrt, atan2, radians,asin
import numpy as np

def calc_dist(lat_1, lat_2, lon_1, lon_2):
    """Return the great-circle (haversine) distance in kilometres between two points.

    Note the argument order: both latitudes first, then both longitudes.

    Args:
        lat_1: Latitude of the first point, in degrees.
        lat_2: Latitude of the second point, in degrees.
        lon_1: Longitude of the first point, in degrees.
        lon_2: Longitude of the second point, in degrees.

    Returns:
        The distance in km as a float, or None if any coordinate is None
        (so that null rows yield a null distance instead of failing the UDF).
    """
    if lat_1 is None or lat_2 is None or lon_1 is None or lon_2 is None:
        return None

    R = 6378  # Earth radius in km used throughout this notebook
    dlat = radians(lat_1 - lat_2)
    dlon = radians(lon_1 - lon_2)
    sin_half_dlat = sin(dlat / 2)
    sin_half_dlon = sin(dlon / 2)
    a = sin_half_dlat * sin_half_dlat + cos(radians(lat_2)) * cos(radians(lat_1)) * sin_half_dlon * sin_half_dlon
    # Floating-point rounding can push `a` marginally outside [0, 1], which
    # would make sqrt(1 - a) raise for (nearly) antipodal points.
    a = min(1.0, max(0.0, a))
    # Only the names imported in this cell are used, so the function does not
    # depend on `import math` having been executed in another cell.
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Add the Zurich HB location to the stop dataset

In [12]:
# Reference point: Zürich HB (8503000), (lat, lon) = (47.378177, 8.540192).
lat_zur, lon_zur = 47.378177, 8.540192

# Attach the reference coordinates to every stop as constant columns.
temp_stops = (
    stops
    .withColumn('lat_zur', f.lit(lat_zur))
    .withColumn('lon_zur', f.lit(lon_zur))
)

# Cast all four coordinate columns to float, one column at a time.
for coordinate_column in ('stop_lat', 'stop_lon', 'lat_zur', 'lon_zur'):
    temp_stops = temp_stops.withColumn(coordinate_column, temp_stops[coordinate_column].cast("float"))

temp_stops.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------+---------+--------+-------------+--------------+---------+--------+
|    stop_id|     stop_name| stop_lat|stop_lon|location_type|parent_station|  lat_zur| lon_zur|
+-----------+--------------+---------+--------+-------------+--------------+---------+--------+
|    8711790|Bouilly Mairie|48.191723|3.996982|         null|              |47.378178|8.540192|
|    8573112|        Gwüest|46.652576|8.516478|         null|              |47.378178|8.540192|
|8014471:0:3|      Albbruck| 47.59236|8.131775|         null|      8014471P|47.378178|8.540192|
+-----------+--------------+---------+--------+-------------+--------------+---------+--------+
only showing top 3 rows

Calculate the distance from every stop to Zurich HB

In [14]:
from pyspark.sql.types import FloatType,IntegerType 
# Expose the haversine helper as a Spark UDF returning a float column.
udf_func = f.udf(calc_dist, FloatType())

# Distance from each stop to Zurich HB. Argument order follows calc_dist:
# (lat_1, lat_2, lon_1, lon_2).
distance_to_zurich = udf_func(
    temp_stops.lat_zur,
    temp_stops.stop_lat,
    temp_stops.lon_zur,
    temp_stops.stop_lon,
)

# Add the distance, drop the helper columns, and keep only stops within a
# 15 km radius of Zurich HB.
stops_filter = (
    temp_stops
    .withColumn('dist_to_zurich_km', distance_to_zurich)
    .drop('lat_zur', 'lon_zur')
    .filter(f.col('dist_to_zurich_km') <= 15.0)
)

stops_filter.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+---------+--------+-------------+--------------+-----------------+
|stop_id|           stop_name| stop_lat|stop_lon|location_type|parent_station|dist_to_zurich_km|
+-------+--------------------+---------+--------+-------------+--------------+-----------------+
|8557033|Oberhasli, Industrie|47.459267|8.490014|         null|              |         9.785882|
|8573711|   Zürich, Sädlenweg|47.367756| 8.48748|         null|              |          4.13962|
|8591828|    Ebmatingen, Dorf| 47.35139|8.641003|         null|              |        8.1649275|
+-------+--------------------+---------+--------+-------------+--------------+-----------------+
only showing top 3 rows

Save the dataset to HDFS so that we can work with it locally. Please note: this file has already been written. If you want to write it again, please change the user name in the path.

In [155]:
#stops_filter.write.parquet('/user/{0}/stops_node.parquet'.format("kfu")) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Filter the routes operating on weekdays

In [17]:
# Keep only the services flagged as running on every day from Monday to Friday.
valid_service_id = calendar
for weekday in ('monday', 'tuesday', 'wednesday', 'thursday', 'friday'):
    valid_service_id = valid_service_id.filter(f.col(weekday) == 1)

# Trips belonging to those services ...
weekday_trips = (
    trips
    .join(valid_service_id, on='service_id', how='inner')
    .select('service_id', 'route_id', 'trip_id', 'trip_short_name', 'direction_id')
)

# ... enriched with the route attributes needed later on.
trip_filter = (
    weekday_trips
    .join(routes, on='route_id', how='inner')
    .select('service_id', 'route_id', 'trip_id', 'direction_id',
            'route_desc', 'route_type', 'route_short_name')
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Use stop_times.csv to filter the stops operated from Monday to Friday

In [18]:
# Restrict the stop events to weekday trips and discard the pickup/drop-off
# flags, which are not used afterwards.
stop_times_filter = (
    stop_times
    .join(trip_filter, on='trip_id', how='inner')
    .drop('pickup_type', 'drop_off_type')
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Filter the stops within 15km of Zurich stations

In [19]:
# The inner join with the stops inside the Zurich radius removes every stop
# event outside the area; geographic helper columns are dropped afterwards.
columns_to_discard = ('stop_lat', 'stop_lon', 'location_type', 'dist_to_zurich_km', 'route_type')
stop_final = (
    stop_times_filter
    .join(stops_filter, on='stop_id', how='inner')
    .drop(*columns_to_discard)
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Filter trips running during the working hours of a day (9am to 5pm)

In [20]:
# Keep only stop events whose arrival AND departure both fall inside working
# hours, 09:00:00 to 17:00:00 inclusive.
#
# The previous version compared only the hour component (9 <= hour <= 17) and
# therefore also kept everything up to 17:59:59, i.e. almost an hour past 5pm.
# Comparing the full time strings fixes that. Lexicographic comparison is
# chronological for zero-padded 'HH:mm:ss' values; the earlier substr(1, 2)
# hour extraction relied on the same layout. Null times are dropped, as before.
work_start = '09:00:00'
work_end = '17:00:00'

timetable = (
    stop_final
    .where(
        f.col('arrival_time').between(work_start, work_end)
        & f.col('departure_time').between(work_start, work_end)
    )
    # parent_station is not needed downstream.
    .drop('parent_station')
)

# Numeric versions of both times (seconds, parsed with the 'HH:mm:ss' pattern).
timetable_temp = (
    timetable
    .withColumn('arrival_timestamp', f.unix_timestamp(f.col('arrival_time'), 'HH:mm:ss'))
    .withColumn('departure_timestamp', f.unix_timestamp(f.col('departure_time'), 'HH:mm:ss'))
)
timetable_temp.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Save the dataset to HDFS so that we can work with it locally. Please note: this file has already been written. If you want to write it again, please change the user name in the path.

In [165]:
#timetable_temp.write.parquet('/user/{0}/timetable.parquet'.format("kfu")) # the file will be owned by ebouille

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Construct the dataframe of edges

In [21]:
# Collect, per trip, the visited stops together with their arrival/departure
# times as three parallel arrays (stop_ids, arrival_time, departure_time).
#
# collect_list gives no ordering guarantee after the groupBy shuffle, but the
# edge construction that follows treats position i < j as "stop i is visited
# before stop j". Each stop event is therefore collected as ONE struct and the
# array is sorted, so that
#   * the three output arrays are always aligned with each other, and
#   * their order is chronological (sort key: departure_time, then
#     arrival_time, then stop_id; zero-padded 'HH:mm:ss' strings sort
#     chronologically).
# NOTE(review): a numeric stop_sequence would be the canonical sort key if
# stop_times provides that column -- confirm and switch if available.
timetable_for_edges = (
    timetable
    .groupBy('trip_id')
    .agg(
        f.sort_array(
            f.collect_list(f.struct('departure_time', 'arrival_time', 'stop_id'))
        ).alias('visits')
    )
    .select(
        'trip_id',
        f.col('visits.stop_id').alias('stop_ids'),
        f.col('visits.arrival_time').alias('arrival_time'),
        f.col('visits.departure_time').alias('departure_time'),
    )
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Build one edge for every ordered pair of positions (earlier, later) within a
# trip: departure is taken at the earlier position, arrival at the later one.
# The grouped trips are collected to the driver and expanded in plain Python.
row_list = [
    {
        'src': trip['stop_ids'][earlier],
        'dst': trip['stop_ids'][later],
        'trip_id': trip['trip_id'],
        'dept_from_src': trip['departure_time'][earlier],
        'arr_to_dst': trip['arrival_time'][later],
    }
    for trip in timetable_for_edges.collect()
    for earlier in range(len(trip['stop_ids']) - 1)
    for later in range(earlier + 1, len(trip['stop_ids']))
]
df_edge = spark.createDataFrame(row_list)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



Add the timestamps and then calculate the edge time (travel time in minutes)

In [23]:
# Parse both clock times with the 'HH:mm:ss' pattern into seconds, then express
# the travel time of each edge in minutes.
df_edge_new = (
    df_edge
    .withColumn('dept', f.unix_timestamp(f.col('dept_from_src'), 'HH:mm:ss'))
    .withColumn('arr', f.unix_timestamp(f.col('arr_to_dst'), 'HH:mm:ss'))
    .withColumn("edge_time", (f.col('arr') - f.col('dept')) / 60)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Add the line number to the dataframe

In [25]:
# One line number per trip: route_short_name averaged over the trip's rows and
# cast to int.
# NOTE(review): a route_short_name that is not purely numeric ends up as a
# null line_number here -- confirm that this is intended.
temp = timetable.groupby("trip_id").agg(f.avg(timetable.route_short_name).cast("int").alias("line_number"))

# This cell reassigns df_edge_new from itself. Dropping any line_number left by
# a previous execution first keeps the cell re-runnable: without it, a second
# run would add a duplicate line_number column and make later selects
# ambiguous. drop() is a no-op when the column does not exist yet.
df_edge_new = df_edge_new.drop("line_number").join(temp, "trip_id")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Final column layout of the transit edges handed to the graph.
edge_columns = ["src", "dst", "line_number", "edge_time", "dept_from_src", "arr_to_dst", "trip_id"]
df_edge_to_graph = df_edge_new.select(*edge_columns)
df_edge_to_graph.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+-----------+---------+-------------+----------+--------------------+
|    src|        dst|line_number|edge_time|dept_from_src|arr_to_dst|             trip_id|
+-------+-----------+-----------+---------+-------------+----------+--------------------+
|8503064|8503065:0:1|         18|      4.0|     10:41:00|  10:45:00|1.TA.26-18-j19-1.1.H|
|8503064|    8503074|         18|      5.0|     10:41:00|  10:46:00|1.TA.26-18-j19-1.1.H|
|8503064|    8503068|         18|      6.0|     10:41:00|  10:47:00|1.TA.26-18-j19-1.1.H|
+-------+-----------+-----------+---------+-------------+----------+--------------------+
only showing top 3 rows

Save the dataset to HDFS so that we can work with it locally. Please note: this file has already been written. If you want to write it again, please change the user name in the path.

In [171]:
#df_edge_to_graph.write.parquet('/user/{0}/stops_edge.parquet'.format("kfu")) # the file will be owned by ebouille

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


## Find pairs of stops within 500 meters of each other and construct the walking edges


In [27]:
# Two copies of the stop table whose columns carry an _A / _B suffix, so that
# both sides stay distinguishable after the cross join.
stop_fields = ("stop_id", "stop_name", "stop_lat", "stop_lon")
temp_A = stops_filter.select(*[f.col(field).alias(field + "_A") for field in stop_fields])
temp_B = stops_filter.select(*[f.col(field).alias(field + "_B") for field in stop_fields])

# Every (A, B) combination of stops inside the Zurich radius.
crossjoin_stops = temp_A.crossJoin(temp_B)




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Distance between the two stops of every pair. Argument order follows
# calc_dist: (lat_1, lat_2, lon_1, lon_2).
crossjoin_stops = crossjoin_stops.withColumn('distance_km', udf_func(crossjoin_stops.stop_lat_A, crossjoin_stops.stop_lat_B, crossjoin_stops.stop_lon_A, crossjoin_stops.stop_lon_B))

# Keep pairs at most 500 m apart and remove the pairing of a stop with itself.
# Self-pairs are identified by stop id rather than by `distance_km > 0`: the
# distance test also threw away DISTINCT stops that share the same coordinates,
# which then never received a walking edge.
crossjoin_stops = crossjoin_stops.filter(
    (crossjoin_stops.distance_km <= 0.5)
    & (crossjoin_stops.stop_id_A != crossjoin_stops.stop_id_B)
)
crossjoin_stops.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+----------+----------+---------+--------------------+----------+----------+-----------+
|stop_id_A|         stop_name_A|stop_lat_A|stop_lon_A|stop_id_B|         stop_name_B|stop_lat_B|stop_lon_B|distance_km|
+---------+--------------------+----------+----------+---------+--------------------+----------+----------+-----------+
|  8573711|   Zürich, Sädlenweg| 47.367756|   8.48748|  8591214|   Zürich, In der Ey|  47.36924|  8.491631| 0.35383454|
|  8573711|   Zürich, Sädlenweg| 47.367756|   8.48748|  8591163|Zürich, Goldackerweg|  47.37189|  8.487929| 0.46155486|
|  8590610|Fällanden, Schütz...| 47.368626|  8.632478|  8590612|      Fällanden, Zil| 47.372246|  8.636214|  0.4916792|
+---------+--------------------+----------+----------+---------+--------------------+----------+----------+-----------+
only showing top 3 rows

In [29]:
def cal_walking_time(distance, speed_km_per_min=0.05):
    """Return the time in minutes needed to walk `distance` kilometres.

    Args:
        distance: Walking distance in km; may be None.
        speed_km_per_min: Walking speed in km per minute. The default of 0.05
            (50 m per minute) is the value that used to be hard-coded.

    Returns:
        distance / speed_km_per_min, or None when `distance` is None (so a
        null input yields a null result instead of failing the UDF).
    """
    if distance is None:
        return None
    return distance / speed_km_per_min
# Spark UDF wrapper around the walking-time helper (float result).
udf_cal_walking_time = f.udf(cal_walking_time, FloatType())

# One directed walking edge per nearby stop pair: src/dst come from the A/B
# stop ids, edge_time is derived from the distance, which is then dropped.
walking_edge = (
    crossjoin_stops
    .select(f.col("stop_id_A").alias("src"), f.col("stop_id_B").alias("dst"), "distance_km")
    .withColumn("edge_time", udf_cal_walking_time(f.col("distance_km")))
    .drop("distance_km")
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Add the corresponding attributes to the walking edges

In [216]:
# Give the walking edges the same attribute columns as the transit edges:
# a "walking" marker for line and trip, a departure of 0 and an arrival equal
# to the walking time itself.
walking_attributes = [
    ('line_number', f.lit("walking")),
    ('dept_from_src', f.lit(0)),
    ('arr_to_dst', f.col('edge_time')),
    ('trip_id', f.lit("walking")),
]
for attribute_name, attribute_value in walking_attributes:
    walking_edge = walking_edge.withColumn(attribute_name, attribute_value)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
Please note: this file has already been written. If you want to write it again, please change the user name in the path.

An error was encountered:
Invalid status code '404' from http://iccluster040.iccluster.epfl.ch:8998/sessions/7142 with error payload: "Session '7142' not found."


In [217]:
#walking_edge.write.parquet('/user/{0}/walking_edge_with_attr.parquet'.format("kfu")) # the file will be owned by ebouille

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…