# Building the stochastic timetables

### Creating the Spark session

In [1]:
import os
%load_ext sparkmagic.magics
from datetime import datetime
username = os.environ['RENKU_USERNAME']
server = server = "http://iccluster029.iccluster.epfl.ch:8998"
from IPython import get_ipython
get_ipython().run_cell_magic('spark', line="config", 
                             cell="""{{ "name":"{0}-final_project", "executorMemory":"4G", "executorCores":4, "numExecutors":10 }}""".format(username))



In [2]:
# Start (or attach to) the remote PySpark session "<username>-final_project" on `server`.
session_args = "add -s {0}-final_project -l python -u {1} -k".format(username, server)
get_ipython().run_line_magic("spark", session_args)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6594,application_1652960972356_2135,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


In [3]:
%%spark
## SPARK IMPORTS
# Names shared by every later %%spark cell (they live in the remote session's namespace).
# NOTE(review): reduce, from_unixtime, countDistinct, explode, ArrayType and StringType
# are not used in the cells visible in this chunk — confirm later cells need them
# before pruning.
from functools import reduce
from pyspark.sql.functions import col, lit, unix_timestamp, from_unixtime, collect_list
from pyspark.sql.functions import countDistinct, concat
from pyspark.sql.functions import udf, explode, split
from pyspark.sql.types import ArrayType, StringType

# Cluster-side project directory: input CSVs are read from it and results written to it.
REMOTE_PATH = "/group/abiskop1/project_data/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [98]:
%%spark
real_time = spark.read.orc("/data/sbb/part_orc/istdaten")

arrivals = spark.read.csv(REMOTE_PATH + "arrivalsRouteStops.csv", header='true', inferSchema='true')
arrivals = arrivals.withColumn("route_id", udf(lambda end_id : end_id.split("$")[0])(col("end_route_stop_id")))

print("The Schema is :")
real_time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The Schema is :
DataFrame[betriebstag: string, fahrt_bezeichner: string, betreiber_id: string, betreiber_abk: string, betreiber_name: string, produkt_id: string, linien_id: string, linien_text: string, umlauf_id: string, verkehrsmittel_text: string, zusatzfahrt_tf: string, faellt_aus_tf: string, bpuic: string, haltestellen_name: string, ankunftszeit: string, an_prognose: string, an_prognose_status: string, abfahrtszeit: string, ab_prognose: string, ab_prognose_status: string, durchfahrt_tf: string, year: int, month: int]

The SBB column names are in German; rename them to English equivalents:

In [99]:
%%spark
mapping =    [['BETRIEBSTAG', 'date'],
    ['FAHRT_BEZEICHNER', "trip_id"],
    ['BETREIBER_ABK', 'operator'],
    ["BETREIBER_NAME", "operator_name"],
    ["PRODUCT_ID", "type_transport"],
    ["LINIEN_ID"," for trains, this is the train number"],
    ["LINIEN_TEXT","type_service_1"], 
    ["VERKEHRSMITTEL_TEXT","type_service_2"],
    ["ZUSATZFAHRT_TF","additional_trip"],
    ["FAELLT_AUS_TF","trip_failed"],
    ["HALTESTELLEN_NAME","STOP_NAME"],
    ["ANKUNFTSZEIT","arrival_time_schedule"],
    ["AN_PROGNOSE","arrival_time_actual"],
    ["AN_PROGNOSE_STATUS","measure_method_arrival"],
    ["ABFAHRTSZEIT","departure_time_schedule"],
    ["AB_PROGNOSE","departure_time_actual"],
    ["AB_PROGNOSE_STATUS","measure_method_arrival"],
    ["DURCHFAHRT_TF","does_stop_here"]]


for de_name, en_name in mapping:
    real_time = real_time.withColumnRenamed(de_name, en_name)
    
print("Final Schema :")
real_time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Final Schema :
DataFrame[date: string, trip_id: string, betreiber_id: string, operator: string, operator_name: string, produkt_id: string,  for trains, this is the train number: string, type_service_1: string, umlauf_id: string, type_service_2: string, additional_trip: string, trip_failed: string, bpuic: string, STOP_NAME: string, arrival_time_schedule: string, arrival_time_actual: string, measure_method_arrival: string, departure_time_schedule: string, departure_time_actual: string, measure_method_arrival: string, does_stop_here: string, year: int, month: int]

### Restricting the data to the stations where transports arrive

In [100]:
%%spark

stations = arrivals.select("STOP_NAME").dropDuplicates()
print("Before selection real_data size : ", real_time.count())
real_time = real_time.join(stations, "STOP_NAME")
print("After selection real_data size : ", real_time.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('Before selection real_data size : ', 2104333948)
('After selection real_data size : ', 438767230)

## EDA of the delay distribution

In [101]:
%%spark
real_time = real_time.withColumn('arrival_time_schedule', unix_timestamp('arrival_time_schedule', "dd.MM.yyyy HH:mm"))
real_time = real_time.withColumn('arrival_time_actual', unix_timestamp('arrival_time_actual', "dd.MM.yyyy HH:mm"))

real_time = real_time.withColumn("arrival_delay", col("arrival_time_actual") - col("arrival_time_schedule"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [102]:
%%spark
analysis = real_time.filter("year == 2021").filter("month == 1")

delays_distrib = analysis.select(["STOP_NAME", "produkt_id", "arrival_delay"]).groupBy(["STOP_NAME", "produkt_id"]).agg(collect_list(col("arrival_delay")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [109]:
%%spark 
# Echo the aggregated DataFrame; sparkmagic prints its repr (column names and types).
delays_distrib

Interrupted by user


In [107]:
%%spark
delays_distrib.coalesce(1).write.format("com.databricks.spark.csv")\
   .option("header", "true").save(REMOTE_PATH + "delays_distrib_jan_2021.csv")

An error was encountered:
Invalid status code '500' from http://iccluster029.iccluster.epfl.ch:8998/sessions/6594/statements with error payload: "java.lang.IllegalStateException: RPC channel is closed."


## Getting the `route_stop_id` of each `trip_id`

In [56]:
%%spark
print("Before selection real_data size : ", real_time.count())
dummy = spark.read.orc("/data/sbb/part_orc/trips")\
             .select(["route_id", "trip_id"])\
             .join(real_time, "trip_id")\
             .withColumn("end_route_stop_id", concat(col("route_id"), lit("$"), col("STOP_NAME"), lit("$A")))
print("Before selection real_data size : ", dummy.count())
print("trips.txt schema :")
trips

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('Before selection real_data size : ', 438767230)
('Before selection real_data size : ', 0)
trips.txt schema :
DataFrame[route_id: string, service_id: string, trip_id: string, trip_headsign: string, trip_short_name: string, direction_id: tinyint, year: int, month: int, day: int]

In [59]:
%%spark
real_time.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+--------------+------------+--------+--------------------+----------+-------------------------------------+--------------+---------+--------------+---------------+-----------+-------+---------------------+-------------------+----------------------+-----------------------+---------------------+----------------------+--------------+----+-----+
|STOP_NAME|      date|       trip_id|betreiber_id|operator|       operator_name|produkt_id| for trains, this is the train number|type_service_1|umlauf_id|type_service_2|additional_trip|trip_failed|  bpuic|arrival_time_schedule|arrival_time_actual|measure_method_arrival|departure_time_schedule|departure_time_actual|measure_method_arrival|does_stop_here|year|month|
+---------+----------+--------------+------------+--------+--------------------+----------+-------------------------------------+--------------+---------+--------------+---------------+-----------+-------+---------------------+-------------------+---------------------