# Set up Spark

In [1]:
%load_ext sparkmagic.magics

In [2]:
import os
from IPython import get_ipython

# The Livy session is named "<username>-final"; the username is taken from the
# RENKU_USERNAME environment variable (a KeyError is raised if it is unset).
username = os.environ['RENKU_USERNAME']
server = "http://iccluster029.iccluster.epfl.ch:8998"

# Session resources, handed to the sparkmagic `%%spark config` cell magic as a
# JSON document. The doubled braces are literal braces for str.format().
session_config = """{{ "name": "{0}-final", "executorMemory": "4G", "executorCores": 4, "numExecutors": 10, "driverMemory": "4G"}}""".format(username)
get_ipython().run_cell_magic('spark', 'config', session_config)

In [3]:
# Start the "<username>-final" Python session on `server` through the
# sparkmagic `%spark add` line magic.
add_session_args = "add -s {0}-final -l python -u {1} -k".format(username, server)
get_ipython().run_line_magic("spark", add_session_args)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
9313,application_1652960972356_5123,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


In [4]:
%%spark
import pyspark.sql.functions as functions
import time
from datetime import datetime
from dateutil.parser import parse
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Lateness tolerance in seconds: is_arrive_on_time() flags an arrival as late
# when the actual time is more than this many seconds after the scheduled time.
delayTimeMax = 90
# NOTE(review): N is not referenced by any cell visible in this notebook section —
# confirm where it is used before changing or removing it.
N = 30

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load data
## stops and sbb

In [8]:
%%spark
# Read the stop list (first line is the header), discard the "_c0" column and
# expose "main_id" under the name "stop_id" used by the rest of the notebook.
stops = (
    spark.read.csv('/user/sixu/work/stops_main.csv', header=True)
    .drop("_c0")
    .withColumnRenamed("main_id", "stop_id")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Confidence

In [9]:
%%spark

# Calculate cases of trip on time
def is_arrive_on_time(arr_sch, arr_act):
    """Return 1 when an arrival counts as on time, 0 when it is late.

    An arrival is late when the actual time is more than ``delayTimeMax``
    seconds after the scheduled time. When either timestamp is missing the
    record is counted as on time, because lateness cannot be established.

    Args:
        arr_sch: Scheduled arrival timestamp as a string, or None / ''.
        arr_act: Actual arrival timestamp as a string, or None / ''.

    Returns:
        int: 1 (on time or unknown) or 0 (late).
    """
    # Spark hands SQL NULL to a Python UDF as None; treat it like the empty
    # string instead of letting parse(None) raise and fail the whole job.
    if not arr_sch or not arr_act:
        return 1

    # NOTE(review): parse() reads ambiguous dates month-first by default. If the
    # timestamps are DD.MM.YYYY (as the betriebstag filter values suggest),
    # consider dayfirst=True — confirm against the data before changing.
    scheduled = parse(arr_sch)
    actual = parse(arr_act)

    delay_seconds = (actual - scheduled).total_seconds()
    return 0 if delay_seconds > delayTimeMax else 1
# Spark UDF wrapper around is_arrive_on_time; the 0/1 flag is exposed as an
# integer column so it can be averaged per trip.
is_arrive_on_time_udf = functions.udf(is_arrive_on_time, returnType=IntegerType())

# Retrieve the records of a single trip, enriched with the stop information
def getDataOnDTrip(tripId):
    """Return the rows of `sbb` for one trip, joined with `stops`.

    Args:
        tripId: Value matched against the ``trip_id`` column of `sbb`.

    Returns:
        DataFrame with the trip's rows inner-joined to `stops` on the stop
        name, without the ``stop_name2`` column.
    """
    # NOTE(review): `sbb` is read from module scope, but no visible cell assigns
    # a module-level `sbb` (it is only a local inside the loader functions) —
    # verify before calling this function.
    trip_rows = sbb.filter(sbb.trip_id == tripId)

    # Attach the stop attributes by matching on the stop name.
    with_stops = trip_rows.join(stops, trip_rows.stop_name2 == stops.stop_name, "inner")

    # withColumnRenamed does nothing when the frame has no "main_id" column.
    return with_stops.drop("stop_name2").withColumnRenamed("main_id", "stop_id")

# Shared post-processing for getAllData / getDataOnDate
def _prepareIstdaten(sbb):
    """Project, rename and stop-enrich an already filtered istdaten frame.

    Args:
        sbb: DataFrame read from the istdaten ORC table (raw column names).

    Returns:
        DataFrame with the columns trip_id, failed, arrival_schedule,
        arrival_actual, departure_schedule, departure_actual, not_stop,
        followed by the columns of `stops`.
    """
    # select necessary columns
    sbb = sbb.select("haltestellen_name", "fahrt_bezeichner","faellt_aus_tf","ankunftszeit","an_prognose","abfahrtszeit","ab_prognose","durchfahrt_tf")
    # rename (positional: same order as the select above)
    newColumns = ["stop_name2","trip_id","failed","arrival_schedule","arrival_actual","departure_schedule","departure_actual","not_stop"]
    sbb = sbb.toDF(*newColumns)

    # One inner join both restricts the records to the stops listed in `stops`
    # and attaches the stop columns. The previous code joined `stops` twice on
    # the same key, which was redundant and multiplied rows a second time for
    # any stop name appearing more than once in `stops`.
    sbb = sbb.join(stops, sbb.stop_name2==stops.stop_name, "inner")
    sbb = sbb.drop("stop_name2")
    # No-op when `stops` already exposes "stop_id"; kept so an un-renamed
    # `stops` frame still yields a "stop_id" column.
    sbb = sbb.withColumnRenamed("main_id", "stop_id")
    return sbb

# Retrieve all data
def getAllData():
    """Return every regular (non-additional) istdaten record at a known stop.

    Returns:
        DataFrame as produced by _prepareIstdaten().
    """
    # read data
    sbb = spark.read.orc('/data/sbb/orc/istdaten')
    # keep only trips that are not additional trips
    sbb = sbb.filter(sbb.zusatzfahrt_tf=='false')
    return _prepareIstdaten(sbb)

# Retrieve the data for the corresponding date
def getDataOnDate(date):
    """Return the regular istdaten records of one operating day.

    Args:
        date: Value compared for equality with the ``betriebstag`` column
            (the notebook passes strings such as '13.05.2019').

    Returns:
        DataFrame as produced by _prepareIstdaten().
    """
    # read data
    sbb = spark.read.orc('/data/sbb/orc/istdaten')
    # filter by date and not additional trip
    sbb = sbb.filter(sbb.betriebstag==date).filter(sbb.zusatzfahrt_tf=='false')
    return _prepareIstdaten(sbb)

# def getConfidenceByTripId(tripID):
#     sbb_trip =  getDataOnDTrip(tripID)
#     if(sbb_trip.count() == 0): return None
#     sbb_trip = sbb_trip.withColumn("is_arrive_on_time", is_arrive_on_time_udf(sbb_trip.arrival_schedule, sbb_trip.arrival_actual))
#     Trips_confidence = sbb_trip.groupBy("trip_id").agg({"is_arrive_on_time": "avg"}).first()
#     return Trips_confidence['avg(is_arrive_on_time)']

# Get the confidence by trip_id
def getConfidenceByTripId(df, tripID):
    """Look up the confidence value stored for one trip.

    Args:
        df: DataFrame with ``trip_id`` and ``Confidence`` columns.
        tripID: Trip identifier to look up.

    Returns:
        The ``Confidence`` value of the first matching row, or None when no
        row has this trip id.
    """
    match = df.filter(df.trip_id == tripID).first()
    # first() yields None on an empty result; return None instead of raising
    # AttributeError on `.Confidence`.
    return None if match is None else match.Confidence
# NOTE(review): the wrapped function takes a DataFrame as its first argument,
# whereas a UDF is applied to column values — confirm this wrapper is usable
# (it is not called in the cells visible here).
getConfidenceByTripId_udf = functions.udf(getConfidenceByTripId, returnType=FloatType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create confidence data based on date ##

In [None]:
%%spark
# Full history, restricted to the trips that ran on 13.05.2019. The id column
# of the per-day list is renamed so the join has no duplicate "trip_id".
sbb_all = getAllData()
trip_id_list = (
    getDataOnDate('13.05.2019')
    .select("trip_id")
    .distinct()
    .withColumnRenamed("trip_id", "trip_id2")
)
sbb_date = sbb_all.join(
    trip_id_list, trip_id_list.trip_id2 == sbb_all.trip_id, "inner"
).drop("trip_id2")

# Per-trip confidence = average of the 0/1 on-time flag over the trip's records.
sbb_date = sbb_date.withColumn(
    "is_arrive_on_time",
    is_arrive_on_time_udf(sbb_date["arrival_schedule"], sbb_date["arrival_actual"]),
)
Trips_confidence = (
    sbb_date.groupBy("trip_id")
    .agg({"is_arrive_on_time": "avg"})
    .withColumnRenamed("avg(is_arrive_on_time)", "Confidence")
)

# Write the results (with a header line) to the work directory
Trips_confidence.write.csv("/user/ymao/work/confidence_13_05_2019.csv", header=True)

In [36]:
%%spark
# Full history, restricted to the trips that ran on 14.05.2019. The id column
# of the per-day list is renamed so the join has no duplicate "trip_id".
sbb_all = getAllData()
trip_id_list = (
    getDataOnDate('14.05.2019')
    .select("trip_id")
    .distinct()
    .withColumnRenamed("trip_id", "trip_id2")
)
sbb_date = sbb_all.join(
    trip_id_list, trip_id_list.trip_id2 == sbb_all.trip_id, "inner"
).drop("trip_id2")

# Per-trip confidence = average of the 0/1 on-time flag over the trip's records.
sbb_date = sbb_date.withColumn(
    "is_arrive_on_time",
    is_arrive_on_time_udf(sbb_date["arrival_schedule"], sbb_date["arrival_actual"]),
)
Trips_confidence = (
    sbb_date.groupBy("trip_id")
    .agg({"is_arrive_on_time": "avg"})
    .withColumnRenamed("avg(is_arrive_on_time)", "Confidence")
)

# Write the results (with a header line) to the work directory
Trips_confidence.write.csv("/user/ymao/work/confidence_14_05_2019.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
%%spark
# Full history, restricted to the trips that ran on 15.05.2019. The id column
# of the per-day list is renamed so the join has no duplicate "trip_id".
sbb_all = getAllData()
trip_id_list = (
    getDataOnDate('15.05.2019')
    .select("trip_id")
    .distinct()
    .withColumnRenamed("trip_id", "trip_id2")
)
sbb_date = sbb_all.join(
    trip_id_list, trip_id_list.trip_id2 == sbb_all.trip_id, "inner"
).drop("trip_id2")

# Per-trip confidence = average of the 0/1 on-time flag over the trip's records.
sbb_date = sbb_date.withColumn(
    "is_arrive_on_time",
    is_arrive_on_time_udf(sbb_date["arrival_schedule"], sbb_date["arrival_actual"]),
)
Trips_confidence = (
    sbb_date.groupBy("trip_id")
    .agg({"is_arrive_on_time": "avg"})
    .withColumnRenamed("avg(is_arrive_on_time)", "Confidence")
)

# Write the results (with a header line) to the work directory
Trips_confidence.write.csv("/user/ymao/work/confidence_15_05_2019.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
%%spark
# Full history, restricted to the trips that ran on 16.05.2019. The id column
# of the per-day list is renamed so the join has no duplicate "trip_id".
sbb_all = getAllData()
trip_id_list = (
    getDataOnDate('16.05.2019')
    .select("trip_id")
    .distinct()
    .withColumnRenamed("trip_id", "trip_id2")
)
sbb_date = sbb_all.join(
    trip_id_list, trip_id_list.trip_id2 == sbb_all.trip_id, "inner"
).drop("trip_id2")

# Per-trip confidence = average of the 0/1 on-time flag over the trip's records.
sbb_date = sbb_date.withColumn(
    "is_arrive_on_time",
    is_arrive_on_time_udf(sbb_date["arrival_schedule"], sbb_date["arrival_actual"]),
)
Trips_confidence = (
    sbb_date.groupBy("trip_id")
    .agg({"is_arrive_on_time": "avg"})
    .withColumnRenamed("avg(is_arrive_on_time)", "Confidence")
)

# Write the results (with a header line) to the work directory
Trips_confidence.write.csv("/user/ymao/work/confidence_16_05_2019.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
%%spark
# Full history, restricted to the trips that ran on 17.05.2019. The id column
# of the per-day list is renamed so the join has no duplicate "trip_id".
sbb_all = getAllData()
trip_id_list = (
    getDataOnDate('17.05.2019')
    .select("trip_id")
    .distinct()
    .withColumnRenamed("trip_id", "trip_id2")
)
sbb_date = sbb_all.join(
    trip_id_list, trip_id_list.trip_id2 == sbb_all.trip_id, "inner"
).drop("trip_id2")

# Per-trip confidence = average of the 0/1 on-time flag over the trip's records.
sbb_date = sbb_date.withColumn(
    "is_arrive_on_time",
    is_arrive_on_time_udf(sbb_date["arrival_schedule"], sbb_date["arrival_actual"]),
)
Trips_confidence = (
    sbb_date.groupBy("trip_id")
    .agg({"is_arrive_on_time": "avg"})
    .withColumnRenamed("avg(is_arrive_on_time)", "Confidence")
)

# Write the results (with a header line) to the work directory
Trips_confidence.write.csv("/user/ymao/work/confidence_17_05_2019.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
%%spark
# Reload the per-day confidence tables written by the cells above
# (the first line of each file is the header).
con_13 = spark.read.csv('/user/ymao/work/confidence_13_05_2019.csv', header=True)
con_14 = spark.read.csv('/user/ymao/work/confidence_14_05_2019.csv', header=True)
con_15 = spark.read.csv('/user/ymao/work/confidence_15_05_2019.csv', header=True)
con_16 = spark.read.csv('/user/ymao/work/confidence_16_05_2019.csv', header=True)
con_17 = spark.read.csv('/user/ymao/work/confidence_17_05_2019.csv', header=True)

An error was encountered:
Invalid status code '404' from http://iccluster029.iccluster.epfl.ch:8998/sessions/8803 with error payload: "Session '8803' not found."


### Filter data with threshold ###

In [8]:
%%spark
# Minimum confidence a trip needs to be kept.
threshold = 0.7

stop_513 = spark.read.csv('/user/sixu/work/actual_data_513.csv', header=True)
con_13 = spark.read.csv('/user/ymao/work/confidence_13_05_2019.csv', header=True)

# Keep only the trips above the threshold; the id column is renamed so the
# join below does not produce two "trip_id" columns.
# NOTE(review): the CSV is read without a schema, so "Confidence" is presumably
# a string column and the comparison relies on Spark's implicit cast — confirm.
con_13 = (
    con_13.filter(con_13["Confidence"] > threshold)
    .withColumnRenamed('trip_id', 'trip_id_over_threshold')
)
stop_513_filttered = stop_513.join(
    con_13, stop_513["trip_id"] == con_13["trip_id_over_threshold"], "inner"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- trip_id: string (nullable = true)
 |-- failed: string (nullable = true)
 |-- arrival_schedule: string (nullable = true)
 |-- arrival_actual: string (nullable = true)
 |-- departure_schedule: string (nullable = true)
 |-- departure_actual: string (nullable = true)
 |-- not_stop: string (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_lat: string (nullable = true)
 |-- stop_lon: string (nullable = true)
 |-- stop_id: string (nullable = true)

317526

In [10]:
%%spark
# Persist the threshold-filtered records (with a header line) to the work directory.
stop_513_filttered.write.csv("/user/ymao/work/actual_data_513_filtered_0.7.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…