# Delay estimation

In this notebook we work with the table sbb_orc_istdaten_new, which contains the istdaten dataset restricted to the stops within an 18 km radius of Zurich HB. For every combination of the factors we model (transport type, time of day and day of the week) we estimate the empirical probability distribution of the arrival delay. All these probabilities are then stored in a dataframe that is reused to compute the confidence of a trip.

In [None]:
import os
import warnings
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

warnings.simplefilter(action='ignore', category=UserWarning)

In [None]:
from pyhive import hive

# Connection settings come from the environment; HIVE_SERVER2 is split on ':' into host and port.
username = os.environ['USERNAME']
hive_address = os.environ['HIVE_SERVER2'].split(':')
hive_host = hive_address[0]
hive_port = hive_address[1]

# Open the HiveServer2 connection and the cursor used by the following cells.
conn = hive.connect(host=hive_host, port=hive_port)
cur = conn.cursor()

print(f"your username is {username}")
print(f"you are connected to {hive_host}:{hive_port}")

In [None]:
# Ensure a personal Hive database named after the user exists.
query = 'CREATE DATABASE IF NOT EXISTS ' + username
cur.execute(query)

In [None]:
# Make the personal database the default one for this Hive session.
query = 'USE ' + username
cur.execute(query)

In [None]:
# Load the sparkmagic extension, which provides the `spark` line/cell magics used below.
%load_ext sparkmagic.magics

In [None]:
from IPython import get_ipython

# NOTE: this rebinds `username` (previously the Hive USERNAME) to the Renku user name;
# it is used to name the remote Spark application.
username = os.environ['RENKU_USERNAME']
# Livy server the sparkmagic session connects to.
server = "http://iccluster044.iccluster.epfl.ch:8998"

# Configure the remote Spark application "<username>-final-project2" and its executor/driver resources.
spark_config = f'{{ "name": "{username}-final-project2", "executorMemory": "4G", "executorCores": 4, "numExecutors": 10, "driverMemory": "4G" }}'
get_ipython().run_cell_magic('spark', line='config', cell=spark_config)

In [None]:
# Create the remote PySpark session (-l python) named "<username>-final-project2" on the Livy server.
# NOTE(review): -k presumably skips creation when the session already exists — confirm in sparkmagic docs.
session_args = f"add -s {username}-final-project2 -l python -u {server} -k"
get_ipython().run_line_magic("spark", session_args)

In [None]:
%%spark
#load a istdaten with the only the stops around 18km around Zurich
df_valid = spark.read.orc(f"/user/epple/hive/sbb_orc_istdaten_new")
df_valid.count()

In [None]:
%%spark
df = df_valid.sample(0.01)  #we work on a sample of 1% of istdaten randomly chosen
df.count()

In [None]:
%%spark
import pyspark.sql.functions as F
from pyspark.sql import DataFrameStatFunctions
import pandas as pd
from pyspark.sql.functions import col, lower, to_timestamp, date_format, avg, hour

In [None]:
%%spark
#compute the delays actual_arrival - expected_arrival
df_delays = df.select('*', (F.unix_timestamp(F.to_timestamp(df.an_prognose, format ='dd.MM.yyyy HH:mm:ss')) - F.unix_timestamp(F.to_timestamp(df.ankunftszeit,format ='dd.MM.yyyy HH:mm'))).alias('delay'))


In [None]:
%%spark
#select only the useful columns stop_id, type of transport, stop name, expected_arrival, actual_arrival, delay 
ls = ['fahrt_bezeichner','produkt_id', 'haltestellen_name', 'ankunftszeit', 'an_prognose', 'delay']
df_delays_filter = df_delays.select(ls)
df_delays_filter.show(10) 

In [None]:
%%spark
#change the negative delays to 0
df_delays_filter = df_delays_filter.withColumn("positive_delay", F.when(df_delays.delay > 0, df_delays.delay).otherwise(0))
df_delays_filter.show(10)

In [None]:
%%spark
#change the type of transport to lowercase because there are 2 types bus (BUS, bus)
df_delays_filter = df_delays_filter.withColumn('produkt_id', lower(col('produkt_id')));
df_delays_filter.show(5)

In [None]:
%%spark
#remove the rows without data on delays
df_delays_filter = df_delays_filter.filter(col("delay").isNotNull())
df_delays_filter.show(5)

In [None]:
%%spark
#add a column with the day of the week
df_days_w = df_delays_filter.withColumn("an_prognose",F.to_timestamp(col("an_prognose"), format ='dd.MM.yyyy HH:mm:ss')).withColumn("week_day_abb", date_format(col("an_prognose"), "E"))
df_days_w.show(5)



In [None]:
%%spark
#create a list with all the days of the week
days =df_days_w.select('week_day_abb').distinct().rdd.flatMap(lambda x: x).collect()
print(days)

In [None]:
%%spark
#create a list with all the type of transport
#we do not consider the funicular
ttype = df_delays_filter.select('produkt_id').distinct().rdd.flatMap(lambda x: x).collect()
ttype = ttype[:3]
print(ttype)

In [None]:
%%spark
#extract the hours of the trip
df_days_w =df_days_w.withColumn("hours", hour(col("an_prognose")))
df_days_w.show(5)

In [None]:
%%spark
# create a list called combinations with all the possible combinations 
# create a list called bacth with all delays threshold between 0 to 200s in range of 20s
batch = list(range(0,220,20))
comb = []
for i in range(len(batch)-1):
    tmp = str(batch[i]) + "-" + str(batch[i+1])
    comb.append(tmp)  # create a list with 2 batch together 
    
days = ['Mon', 'Tue', 'Wed', 'Thu','Fri', 'Sat', 'Sun']
ttype = ['bus', 'tram', 'zug']
hours_merge = ['6h-9h', '9h-16h','16-21h']
hours = [6,9,16,21]


combinations = []  #will be a list with all the possible combinations between the days, type of transport and moment in the day 
for week in days:
    for type in ttype:
        for h in hours_merge:
            # Create a combination by concatenating the week name and type
            combination = week + " - " + type + " " + h
            # Add the combination to the list
            combinations.append(combination)  


columns = comb
rows = combinations

# create an empty df to store all the info we will collect for the statistics
df_stat_ist = pd.DataFrame(columns=columns, index=rows)   #create an empty dataframe with all the combinations as rows and the batch as columns
df_stat_ist.head()

In [None]:
%%spark
#fill out the dataframe
k = 0
for idx,day in enumerate(days):
    for idx2,t in enumerate(ttype):
        w = 0
        for h in range(len(hours)-1):
            for i in range(len(batch)-1):
                number = df_days_w.filter((df_days_w.positive_delay >= batch[i] ) & (df_days_w.positive_delay < batch[i+1]) & (df_days_w.week_day_abb == day) & (df_days_w.produkt_id == t ) & (df_days_w.hours >= hours[h] ) & (df_days_w.hours < hours[h+1]) ).count()
                df_stat_ist.iloc[k+w,i] = number
            w = w + 1
        k = k + 3
df_stat_ist.head()

In [None]:
%%spark -o df_istaden_stat -n -1
df_istaden_stat = spark.createDataFrame(df_stat_ist.reset_index())  # transfer into a spark df and save it

In [None]:
# Save the statistics table locally; the pandas index is written too (read back as 'Unnamed: 0').
df_istaden_stat.to_csv(path_or_buf="stat_timeday.csv")

In [None]:
# Reload the saved statistics and keep the combination labels (the 'index' column) aside.
stat_istdaten = pd.read_csv('stat_timeday.csv')
indexx = stat_istdaten.loc[:, 'index']
stat_istdaten.head()

In [None]:
# Remove the non-count columns: the written-out pandas index ('Unnamed: 0') and the labels
# (kept separately in `indexx`). Rebinding instead of inplace=True, and ignoring columns that are
# already gone, makes the cell safe to re-run.
stat_istdaten = stat_istdaten.drop(columns=['Unnamed: 0', 'index'], errors='ignore')
stat_istdaten.head()

In [None]:
# Sanity check: plot the delay histogram of one combination.
# The row is looked up by its label instead of a hard-coded position, so the data always matches the title.
example_label = "Sun - zug 6h-9h"
frequencies = stat_istdaten.iloc[indexx.tolist().index(example_label)].tolist()
bin_ranges = list(range(0, 220, 20))
bin_centers = [(lo + hi) / 2 for lo, hi in zip(bin_ranges[:-1], bin_ranges[1:])]

fig, ax = plt.subplots()
ax.bar(bin_centers, frequencies, width=np.diff(bin_ranges), edgecolor='black')
ax.set_xlabel('Delays [s]')
# Counts include on-time/early arrivals (delay clipped to 0), not only delayed trains.
ax.set_ylabel('Number of train arrivals')
ax.set_title('Sunday - Train - Morning (6h-9h)')
fig.savefig('stat.png')


In [None]:
# Total number of counted arrivals per combination (sum over the delay bins of each row).
rows_sums = stat_istdaten.sum(axis='columns')
row_sum = rows_sums.tolist()

In [None]:
# Running total across the delay bins: each column counts the arrivals whose delay is below that bin's upper edge.
df_cum_sum = stat_istdaten.cumsum(axis='columns')
df_cum_sum.head()

In [None]:
# Empirical CDF per combination: share of arrivals with delay below each bin's upper edge.
# NOTE(review): only delays in [0, 200) s are counted, so these probabilities are conditional on
# delay < 200 s (each row ends at 1.0); rows without observations yield NaN (0 / 0).
istdaten_prob = df_cum_sum.div(row_sum, axis='index')
istdaten_prob.head()

In [None]:
# Plot the cumulative delay distribution of the example combination.
# Column j holds P(delay < upper edge of bin j), so each value is plotted at the bin's upper
# edge (20, 40, ..., 200 s), not at evenly spaced points starting at 0.
example_label = "Sun - zug 6h-9h"
prob = istdaten_prob.iloc[indexx.tolist().index(example_label)].tolist()
upper_edges = list(range(20, 220, 20))

fig, ax = plt.subplots()
ax.plot(upper_edges, prob)
ax.set_xlabel('Delays [s]')
ax.set_ylabel('Cumulative probability P(delay < x)')
ax.set_title('Sunday - Train - Morning (6h-9h)')
fig.savefig('exprob.png')

In [None]:
# Re-attach the combination labels (the 'index' column) to the probability table.
all_prob_ist = istdaten_prob.join(other=indexx)

In [None]:
# Move the label column (currently last) to the front.
cols = list(all_prob_ist.columns)
label_column = cols.pop()
cols = [label_column] + cols
prob = all_prob_ist[cols]
prob.tail()

In [None]:
# Write the final probability table (labels first) to disk; it is reused to compute trip confidence.
prob.to_csv(path_or_buf='prob_istdaten.csv')