# Configuration

In [1]:
%%configure
{
    "driverMemory": "2000M",
    "executorMemory": "8G",
    "executorCores": 8,
    "numExecutors": 16,
    "conf": {
        "spark.app.name": "oh-my-git_final"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9130,application_1589299642358_3694,pyspark,idle,Link,Link,
9171,application_1589299642358_3738,pyspark,idle,Link,Link,
9177,application_1589299642358_3744,pyspark,busy,Link,Link,
9188,application_1589299642358_3755,pyspark,idle,Link,Link,
9191,application_1589299642358_3758,pyspark,dead,Link,Link,
9192,application_1589299642358_3759,pyspark,idle,Link,Link,
9194,application_1589299642358_3761,pyspark,idle,Link,Link,
9195,application_1589299642358_3762,pyspark,busy,Link,Link,
9196,application_1589299642358_3763,pyspark,idle,Link,Link,
9199,application_1589299642358_3766,pyspark,idle,Link,Link,


# Initialization

In [2]:
# Initialization

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9233,application_1589299642358_3800,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# All utilities in a single cell

**You can collapse the cell below if you want to hide the libraries and functions.**

In [3]:
# Import relevant libraries
from geopy import distance
import pyspark.sql.types as T
import pyspark.sql.functions  as F
from pyspark.sql.window import Window
import datetime
import networkx as nx
import pandas as pd
from itertools import islice, tee, izip, combinations
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import math

# Some constants
# Coordinates are (latitude, longitude) tuples, the order the distance helpers below use.
zurichHB = (47.378177, 8.540192)
# NOTE(review): this is zurichHB shifted by (+2, +1) degrees, which does not look like
# Cleveland, Ohio, and the name is not used anywhere else in the visible code -- confirm intent.
cleveland_oh = (49.378177, 9.540192)

# Useful user-defined functions
@F.udf(T.DoubleType())
def getDistToZurich(lat,lon):
    """
    Get the distance from a coordinate to Zurich HB in kilometers.

    lat, lon: coordinates of the point to measure from.
    Returns the distance as a float, or None (SQL NULL) when either
    coordinate is missing.
    """
    # SQL NULLs reach a Python UDF as None; no meaningful distance can be
    # computed from them, so propagate the NULL instead of handing it to geopy.
    if lat is None or lon is None:
        return None
    # Kept local so the UDF does not depend on the notebook-level constant.
    zurichHB = (47.378177, 8.540192)
    geo = (lat,lon)
    return distance.distance(zurichHB, geo).km

@F.udf(T.ArrayType(T.ArrayType(T.StructType([
    T.StructField("pair1", T.StringType(), False),
    T.StructField("pair2", T.StringType(), False),
]))))
def pairs(stop_ids,stop_sequences,arrival_times,departure_times):
    """
    Build all 2-element combinations of each journey attribute.

    Every argument is a sequence describing one journey. The result is a
    list of four lists, in the order stop ids, stop sequences, arrival times,
    departure times; each inner list holds the (earlier, later) pairs produced
    by itertools.combinations for that attribute.
    """
    journey_columns = (stop_ids, stop_sequences, arrival_times, departure_times)
    return [list(combinations(column, 2)) for column in journey_columns]

@F.udf(T.BooleanType())
def time_interval_udf(arrival_time_schedule,transfer_arrival_time):
    """
    Tell whether a scheduled arrival lies within 2 minutes of a transfer arrival.

    Both arguments are clock strings whose first two ':'-separated fields are
    the hour and the minute; anything after the minute is ignored.
    Returns True when the two times differ by at most 120 seconds, and False
    otherwise or when either value is missing or empty.
    """
    # SQL NULLs arrive as None; treat them like the empty string: no match.
    if not arrival_time_schedule or not transfer_arrival_time:
        return False

    transfer_h, transfer_m = transfer_arrival_time.split(":")[:2]
    schedule_h, schedule_m = arrival_time_schedule.split(":")[:2]

    # Both times are pinned to the same arbitrary day, so two times on either
    # side of midnight are NOT considered close to each other.
    transfer = datetime.datetime(2019, 1, 1, int(transfer_h), int(transfer_m))
    scheduled = datetime.datetime(2019, 1, 1, int(schedule_h), int(schedule_m))

    #Allow 120 seconds time difference
    return abs(scheduled - transfer).total_seconds() <= 120
    
@F.udf(T.DoubleType())
def getDist(lat1,lon1,lat2,lon2):
    """
    Get the distance between two coordinates in meters.

    (lat1, lon1) and (lat2, lon2) are the two points.
    Returns the distance as a float, or None (SQL NULL) when any of the four
    coordinates is missing.
    """
    # SQL NULLs reach a Python UDF as None; propagate the NULL rather than
    # passing an incomplete coordinate to geopy.
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    geo1 = (lat1,lon1)
    geo2 = (lat2,lon2)
    return distance.distance(geo1, geo2).m
# Timetable rows used by find_possible_routes to look up concrete trips; that function
# reads the columns trip_id, stop_id1, stop_id2, arrival_time1/2 and departure_time1/2.
joineds = spark.read.parquet("user/boecuego/backtrace_df.parquet")
#joineds.cache()
#joineds.take(1)
# Edge list of the stop network. It is collected to the driver as a pandas frame and turned
# into a networkx graph whose edges carry the "cost" and "types" attributes.
# No create_using is given, so networkx builds its default graph type.
unioned_df_unique = spark.read.parquet("user/boecuego/graph_df.parquet")
pd_df = unioned_df_unique.toPandas()
mygraph = nx.from_pandas_edgelist(pd_df, 'stop_id1', 'stop_id2',["cost","types"])
# The two commented lines below appear to be how the delay table was originally produced
# and written; they are kept for reference only and are not executed.
#sbb_grouped_delay = sbb_inradius_wostops.withColumn("delay",F.floor(timeDiff/60)).groupBy("stop_id","delay").count()
# sbb_grouped_delay.write.parquet("user/boecuego/sbb_stops_time_grouped.parquet")

# Pre-aggregated delay counts; find_prob filters it on stop_id and arrival_time_schedule
# and reads its "count" column, and calculate_probabilities reads its "delay" column.
sbb_grouped_delay = spark.read.parquet("user/boecuego/sbb_stops_time_grouped.parquet")
# TODO: the delay statistics are not yet filtered for weekends, public holidays, etc.
def find_prob(stop_id,arrival_time_schedule):
    """
    Get the observed delay distribution for a stop around a scheduled arrival time.

    stop_id: stop to look up in sbb_grouped_delay.
    arrival_time_schedule: scheduled arrival as a clock string; rows whose
        own scheduled arrival is within the tolerance of time_interval_udf
        are kept.
    Returns a pandas DataFrame with the matching rows plus a "percentage"
    column holding each row's share (0-100) of the summed "count". The frame
    is empty when nothing matches.
    """
    at_stop = F.col('stop_id') == stop_id
    near_schedule = time_interval_udf(F.col('arrival_time_schedule'), F.lit(arrival_time_schedule))
    # The filtered result is collected to the driver.
    delays_pd = sbb_grouped_delay.filter(at_stop & near_schedule).toPandas()
    total_num = sum(delays_pd["count"].values)
    # On an empty frame the lambda is never called, so total_num == 0 is harmless there.
    delays_pd["percentage"] = delays_pd["count"].map(lambda x:(x/float(total_num))*100)
    return delays_pd
#Find the k shortest paths
def k_shortest_paths(G, source, target, k, weight):
    """
    Return up to k simple paths from source to target, cheapest first.

    weight names the edge attribute networkx uses as the path cost.
    """
    # shortest_simple_paths is a generator; only its first k items are consumed.
    ranked_paths = nx.shortest_simple_paths(G, source, target, weight=weight)
    first_k = islice(ranked_paths, k)
    return list(first_k)

#This is a helper to get pairs in a list
def pairwise(iterable):
    """
    Iterate over consecutive overlapping pairs of an iterable.

    s -> (s0,s1), (s1,s2), (s2, s3), ...
    An input with fewer than two items yields nothing.
    """
    leading, trailing = tee(iterable)
    # Shift the second copy one item ahead of the first.
    next(trailing, None)
    return izip(leading, trailing)
#When converting a date to string this is a helper
def append_zero(x):
    """
    Render x as a string, left-padding a single-character value with "0".

    Values whose string form is not exactly one character long are returned
    as their plain string form.
    """
    text = str(x)
    return "0"+text if len(text) == 1 else text
#Helper to remove adjecent walks
def remove_adjecent_walks(paths):
    """
    Drop every path that contains two consecutive walking edges.

    paths: list of node sequences from mygraph.
    Returns a new list with the surviving paths in their original order.
    """
    def _is_walk(edge):
        # An edge is a walk when its "types" attribute is exactly ["walk"].
        return mygraph[edge[0]][edge[1]]["types"] == ["walk"]

    no_adjecentwalk = []
    for pr in paths:
        has_adjacent_walks = False
        # pairwise(pr) gives the edges; pairwise of that gives consecutive edges.
        for first_edge, second_edge in pairwise(pairwise(pr)):
            if _is_walk(first_edge) and _is_walk(second_edge):
                has_adjacent_walks = True
                # The path is already rejected; no need to inspect the rest.
                break
        if not has_adjacent_walks:
            no_adjecentwalk.append(pr)
    return no_adjecentwalk

#Helper to reverse route
def get_reverse_route(paths):
    """
    Turn each node path into its list of edges, ordered from last edge to first.

    paths: list of node sequences.
    Returns one list per path holding the (from, to) pairs of consecutive
    nodes in reverse travel order; the pairs themselves are not flipped.
    """
    return [list(pairwise(route))[::-1] for route in paths]
# Sentinel timestamp: find_possible_routes sets a route's current time to this value
# when no trip can be found for one of its edges, and compares against it afterwards.
notPossible = datetime.datetime(1999,1,1,12,12,0,0)
def find_possible_routes(reversed_routes,current_timez,cut_off_time):
    """
    Attach timetable trips to candidate routes, walking each route backwards in time.

    reversed_routes: list of routes, each a list of (p1, p2) edges ordered
        from the last edge of the journey to the first.
    current_timez: datetime at which the traveller wants to be at the
        destination; every route starts its backward walk from this time.
    cut_off_time: datetime; a route whose computed start time is at or after
        it counts as "short", otherwise as "long".

    Returns (shorts, longs, printouts):
      shorts    - flat list with three entries per accepted route: the event
                  list, the computed start datetime, and a separator string.
      longs     - flat list with two entries per too-long route: the event
                  list and a separator string.
      printouts - one list of human-readable step descriptions per accepted
                  route, in the same order as the routes in shorts.
    Routes for which some edge has no usable trip are dropped.

    Event list layout (relied upon by calculate_probabilities):
      walking edge: "Transfer with walking", ("after_walk", time),
                    ("before_walk", time), (p1, p2)
      ride edge:    optional "Transfer", then (trip_id, arrival_time1,
                    departure_time1, arrival_time2, departure_time2), (p1, p2)
    """
    separator = "---------------------------------------------------------------"
    shorts = []
    longs = []
    printouts = []
    for route in reversed_routes:
        current_time = current_timez
        current_trip_id = ""

        prints  = []
        actual_prints=[]
        for p1,p2 in route:
            #p2 arrival
            #p1 departure
            if mygraph[p1][p2]["types"] == ["walk"]:
                prints.append("Transfer with walking")
                prints.append(("after_walk",current_time))
                cost = mygraph[p1][p2]["cost"]
                # NOTE(review): the edge cost minus 10 is used as the walking time in
                # minutes; presumably the cost includes a fixed 10 minute penalty -- confirm.
                if cost > 0:
                    current_time  = current_time - datetime.timedelta(minutes=cost-10)
                prints.append(("before_walk",current_time))
                # A walk always ends the current trip.
                current_trip_id=""

                actual_prints.append("Walk from "+p1+" ----to----> "+p2+" for "+str(cost-10)+" minutes.")
                prints.append((p1,p2))
                continue

            on_edge = (F.col("stop_id1") == p1) & (F.col("stop_id2") == p2)

            # First try to stay on the trip used for the following edge.
            candidates = []
            if current_trip_id != "":
                candidates = joineds.filter(F.col("trip_id") == current_trip_id).filter(on_edge).take(1)

            if not candidates:
                # Changing vehicles: allow 2 minutes of waiting for the transfer.
                if current_trip_id != "":
                    current_time = current_time - datetime.timedelta(minutes = 2)
                    prints.append("Transfer")

                strtime = append_zero(current_time.hour)+":"+append_zero(current_time.minute)+":00"

                # Latest trip on this edge that arrives no later than the current time.
                candidates = joineds.filter(on_edge)\
                    .filter(F.col("arrival_time2") <= strtime)\
                    .sort(F.desc("arrival_time2")).take(1)

            if not candidates:
                # No trip serves this edge in time: the whole route is unusable.
                current_time = notPossible
                break

            trip = candidates[0]
            current_trip_id  = trip.trip_id

            prtstr = "Take trip: "+trip.trip_id+" from "+p1+" departing at "+trip.departure_time1+" ------------> to "+p2+" arriving at "+trip.arrival_time2
            actual_prints.append(prtstr)
            prints.append((trip.trip_id,trip.arrival_time1,trip.departure_time1,trip.arrival_time2,trip.departure_time2))

            # Step back in time to this trip's departure from p1.
            hms = trip.departure_time1.split(":")
            current_time = datetime.datetime(2019,1,1,int(hms[0]),int(hms[1]),0,0)

            prints.append((p1,p2))

        if current_time >= cut_off_time:
            shorts.append(prints)
            shorts.append(current_time)
            shorts.append(separator)
            printouts.append(actual_prints)
        elif current_time != notPossible:
            longs.append(prints)
            longs.append(separator)
    return shorts,longs,printouts
# TODO(check): the original authors noted that the probability for the desired arrival
# time is currently not being checked and that this still needs to be verified.
# Module-level confidence threshold. calculate_probabilities and schedule_planner both take
# their own confidence_level parameter, which shadows this value inside those functions.
confidence_level = 0.98
def calculate_probabilities(desired_arrival_time,best_routes, best_routes_print,confidence_level):
    """
    Estimate each route's success probability and print the sufficiently reliable ones.

    desired_arrival_time: datetime by which the destination must be reached.
    best_routes: event lists as produced by find_possible_routes (events are
        ordered from the end of the journey to its start).
    best_routes_print: human-readable step lists, parallel to best_routes.
    confidence_level: routes are printed only when their overall probability
        (0-1) is strictly greater than this value.

    The overall probability is the product of the probability of arriving on
    time with the final ride and the probabilities of making every transfer,
    each taken from the historical delay distribution returned by find_prob.
    Nothing is returned; results are printed.

    NOTE(review): when a route's final leg is a walk, no on-time-arrival
    probability is included for it -- confirm this is intended.
    """
    def _clock_to_datetime(clock):
        # Only the hour and minute fields are used; the date is arbitrary but fixed.
        fields = clock.split(":")
        return datetime.datetime(2019, 1, 1, int(fields[0]), int(fields[1]))

    def _percentage_within(station, arrival_clock, allowed_delay):
        # Share (0-100) of observed arrivals whose delay fits in allowed_delay minutes.
        # arrival_clock[:-3] drops the last three characters (the ":SS" part) for find_prob.
        prob_dist = find_prob(station, arrival_clock[:-3])
        return sum(prob_dist[prob_dist["delay"]<= allowed_delay].percentage.values)

    counter = 0
    for r,printroute in zip(best_routes,best_routes_print):
        overall_prob = 1.

        printornot = []
        for i in range(len(r)):
            # This case corresponds to the Pr[Arriving the final station in time] case
            # That is why the index is zero and the final means of travelling should not be walking but a public transport
            if i == 0 and r[i]!="Transfer with walking":
                las_arrival_time = r[i][3]
                a = _clock_to_datetime(las_arrival_time)
                last_station = r[i+1][1]

                allowed_delay = (desired_arrival_time-a).total_seconds()/60
                probability_to_catch = _percentage_within(last_station, las_arrival_time, allowed_delay)
                overall_prob = (float(overall_prob)*float(probability_to_catch))/100.
                printornot.append("Prob to arrive on time: "+str(probability_to_catch))

            # The offsets below rely on the event layout written by find_possible_routes.
            # Lookups that fall outside the list (e.g. a transfer marker near either end)
            # are deliberately skipped on a best-effort basis.
            # NOTE(review): r[i-2] with i < 2 wraps around to the end of the list instead
            # of failing -- confirm that this cannot happen for real routes.
            try:
                if r[i] == "Transfer with walking":
                    # Departure of the ride taken after the walk.
                    d = _clock_to_datetime(r[i-2][2])
                    #think this
                    transfer_station =  r[i+5][1].split(":")[0]

                    # ("after_walk", t) minus ("before_walk", t)
                    walking_min = (r[i+1][1] - r[i+2][1]).total_seconds()/60

                    #think this
                    transfer_arrival_time = r[i+4][3]
                    a = _clock_to_datetime(transfer_arrival_time)

                    allowed_delay = (d-a).total_seconds()/60 - walking_min
                    probability_to_catch = _percentage_within(transfer_station, transfer_arrival_time, allowed_delay)

                    printornot.append("Probability to catch trip "+r[i-2][0]+" with "+str(walking_min)+" min walking  : "+str(probability_to_catch))

                    overall_prob = (float(overall_prob)*float(probability_to_catch))/100.

                elif r[i] == "Transfer":
                    # Departure of the ride taken after the transfer.
                    d = _clock_to_datetime(r[i-2][2])
                    transfer_station =  r[i-1][1]

                    # Arrival of the ride taken before the transfer.
                    transfer_arrival_time = r[i+1][3]
                    a = _clock_to_datetime(transfer_arrival_time)

                    allowed_delay = (d-a).total_seconds()/60
                    probability_to_catch = _percentage_within(transfer_station, transfer_arrival_time, allowed_delay)

                    printornot.append("Probability to catch trip "+r[i-2][0]+" : "+str(probability_to_catch))
                    overall_prob = (float(overall_prob)*float(probability_to_catch))/100.

            except Exception:
                # Still best-effort, but no longer swallows KeyboardInterrupt/SystemExit.
                print("-")

        if overall_prob > confidence_level:
            counter +=1
            print("*"*100)
            print("Overal probability: "+str(overall_prob))
            print("Probabilities: ")
            for prt in printornot:
                print("\t"+prt)
            print("Route: ")
            for rou in printroute:
                print("\t"+rou)
    print(str(counter)+" routes found out of "+str(len(best_routes))+" best routes , if you want more please enter lower confidence")
        
def schedule_planner(src, tgt, desired_arrival_time, confidence_level):
    """
    Plan journeys from src to tgt that arrive by desired_arrival_time.

    src: source stop id; every graph node whose id contains the part of src
        before the first ':' is used as a starting point.
    tgt: target stop id, used exactly as given.
    desired_arrival_time: "HH:MM" string.
    confidence_level: minimum overall success probability (0-1), as a float
        or a string convertible to float.

    Prints the routes that pass the confidence check together with the total
    running time; nothing is returned.
    """
    start = datetime.datetime.now()

    confidence_level=float(confidence_level)

    src_main = src.split(":")[0]
    source_nodes = [node for node in mygraph.nodes if src_main in node]

    # Fewer paths per starting node when there are several starting nodes.
    if len(source_nodes) > 1:
        k = 5
    else:
        k = 50

    paths = []
    for node in source_nodes:
        try:
            paths.extend(k_shortest_paths(mygraph, node, tgt, k, weight = "cost"))
        except nx.NetworkXNoPath:
            # This starting node cannot reach the target; the others still may.
            continue

    no_adjecentwalk = remove_adjecent_walks(paths)
    reversed_routes  = get_reverse_route(no_adjecentwalk)

    hm = desired_arrival_time.split(':')
    #This is arrival_time
    current_time = datetime.datetime(2019,1,1,int(hm[0]),int(hm[1]),0,0)
    # Routes that would have to start more than an hour before arrival are rejected.
    cut_off_time = current_time - datetime.timedelta(minutes=60)

    good_routes,_,printouts = find_possible_routes(reversed_routes,current_time,cut_off_time)

    # good_routes is flat: [events, start_time, separator, events, start_time, ...].
    # Collect every start time together with its position in that list.
    start_times = [(t, i) for i, t in enumerate(good_routes) if isinstance(t, datetime.datetime)]
    # Latest departure first.
    start_times_reversed = sorted(start_times, key=lambda tup: tup[0],reverse=True)

    best_routes = []
    best_routes_print = []
    for _, index in start_times_reversed[:10]:
        # The event list sits right before its start time.
        best_routes.append(good_routes[index-1])
        # Three entries per route, so integer division maps the position to the route number.
        best_routes_print.append(printouts[index//3])

    print('*'*100)
    print('PROBABILITIES')

    calculate_probabilities(current_time,best_routes,best_routes_print, confidence_level)

    end = datetime.datetime.now()
    print('*'*100)
    # Built as one string so Python 2 does not print a tuple.
    print("Running time: "+str((end-start).total_seconds())+" seconds.")
    
print("All utilities are loaded successfully.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

All utilities are loaded successfully.

# Application

In [4]:
%%local
#src = "8591184"  #"8503000"
#tgt  =  "8591244" #"8591049"


def troll(src, tgt, desired_arrival_time, confidence_level):
    """
    Widget callback: hand the form values to the Spark session and run the planner there.

    The four arguments are pushed into the local IPython namespace, sent to
    the remote session as strings through the send_to_spark magic, and then
    schedule_planner is executed remotely with them.
    """
    # To push it to the spark
    confidence_level = str(confidence_level)

    shell = get_ipython()
    variable_names = ("src", "tgt", "desired_arrival_time", "confidence_level")

    # push() is given a name, so it must be called from this function's own frame.
    for variable_name in variable_names:
        shell.push(variable_name)
    for variable_name in variable_names:
        shell.run_cell_magic('send_to_spark', '-i {0} -t str -n {0}'.format(variable_name), ' ')

    shell.run_cell_magic('spark', '', 'schedule_planner(src, tgt, desired_arrival_time, confidence_level)')

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets 
# Build a form for troll's arguments: text boxes for the two stop ids and the arrival
# time, and a 0.0-1.0 slider (step 0.01, default 0.90) for the confidence level.
# interact_manual is used, so troll is meant to run on demand rather than on every edit.
func_handle = interact_manual(troll, src='8591184', tgt='8591244', desired_arrival_time='12:30',
                                confidence_level=widgets.FloatSlider(min=0.0, max=1.0, step=1e-2, value=0.90))
# Last expression of the cell, so its repr is shown as the cell output.
func_handle

interactive(children=(Text(value='8591184', description='src'), Text(value='8591244', description='tgt'), Text…

<function __main__.troll(src, tgt, desired_arrival_time, confidence_level)>

# Validation

In [5]:

# Previously computed matches between timetable trips and SBB actual-data trips.
matched_trips = spark.read.parquet("user/boecuego/matched_trips2.parquet") 
# Keep rows with a trips_stop_ids value longer than 15 characters and a sample_stop_ids
# value different from '%%', then collect the set of SBB trip ids for each timetable trip.
mmm = matched_trips.filter("LENGTH(trips_stop_ids) > 15 AND sample_stop_ids != '%%' ").groupBy("trips_trip_id").agg(F.collect_set("sbb_trip_id").alias("possible_match"))
# Collected to the driver: one dict per timetable trip.
stop_matches = map(lambda row: row.asDict(), mmm.collect())
# timetable trip id -> list of SBB trip ids that may correspond to it
dict_matches = {match['trips_trip_id']: match["possible_match"] for match in stop_matches}
# SBB actual data; the validation cell below filters it by date and trip id.
sbb_inradius = spark.read.parquet("user/boecuego/sbb_inradius_13_17_trips_select.parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Validate one planned journey against the recorded SBB data: for every weekday in the
# sample period, check whether each transfer of the journey would actually have been caught.
t1=datetime.datetime.now()
#Trips are from above result 
# The three lists are parallel and ordered from the last ride of the journey to the first.
trips = ["1460.TA.26-10-j19-1.11.R","2698.TA.26-31-j19-1.17.H","1386.TA.26-8-C-j19-1.8.R"]
departs = ["12:12:00","12:03:00","11:59:00"]
arrives = ["12:25:00","12:09:00","12:00:00"]
# One entry per transfer: ((later trip, earlier trip), (their departures), (their arrivals)).
# Materialised as a list because it is passed to reversed() below.
allz = list(zip(pairwise(trips),pairwise(departs),pairwise(arrives)))

# NOTE(review): range(1,30) covers 01.05-29.05 only; May 30 and 31 are left out -- confirm.
days = [append_zero(str(x))+".05.2019" for x in range(1,30)]
weekdayslst = ["Monday","Tuesday","Wednesday","Thursday","Friday"]

# Keep working days only.
weekdays = [x for x in days if datetime.datetime.strptime(x, '%d.%m.%Y').strftime('%A') in weekdayslst]

catched_counter = 0
counter = 0
for day in weekdays:
    print("*"*50)
    sbd = sbb_inradius.filter(F.col("date")==day)
    for trip,depart,arrive in reversed(allz):
        print("- -"*30)
        # SBB trips matching the earlier ride (the one the traveller arrives with) ...
        ilkduraktankalkan = dict_matches[trip[1]]
        # ... and the later ride (the one the traveller wants to catch).
        ikinciduraktankalkan = dict_matches[trip[0]]
        try:
            # Recorded arrival of the earlier ride at the transfer stop.
            # take(1)[0] raises when that day has no matching record.
            ilkininin_ikinciye_varisi = sbd.filter((F.col("trip_id").isin(ilkduraktankalkan)) \
                                                            &(F.substring(F.col("schedule_arrival"),12,100) == arrive[1][:-3])).take(1)[0]
            # Recorded departure of the later ride from the transfer stop.
            ikincinin_kalkisi = sbd.filter((F.col("trip_id").isin(ikinciduraktankalkan)) \
                                                            &(F.substring(F.col("schedule_dep"),12,100) == depart[0][:-3])).take(1)[0]

            hm = ilkininin_ikinciye_varisi.arrival_time_real.split(':')
            aa = datetime.datetime(2019,1,1,int(hm[0]),int(hm[1]),int(hm[2]),0)

            hm = ikincinin_kalkisi.departure_time_real.split(':')
            ad = datetime.datetime(2019,1,1,int(hm[0]),int(hm[1]),int(hm[2]),0)

            # Seconds between the actual arrival and the actual departure;
            # the transfer is caught when the departure is not before the arrival.
            window = (ad-aa).total_seconds()
            counter +=1
            if window >=0:
                catched_counter+=1

        except Exception:
            # Best effort: a transfer without usable records on this day is skipped.
            print("Except")
t2=datetime.datetime.now()
print("*"*50)
print("*"*50)
print("*"*50)
print("Validation from the real past data:")
if counter:
    print(float(catched_counter)/float(counter))
else:
    # Every lookup failed, so there is no ratio to report.
    print("No transfer could be evaluated.")
print("*"*50)
print("Time to run the validation code in seconds:")
print((t2-t1).total_seconds())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**************************************************
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
Except
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
Except
**************************************************
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
**************************************************
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
Except
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
Except
**************************************************
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -
****************************************