In [1]:
import datetime
from queue import *
import pandas as pd
import datetime as dt
import pickle
import time
#from geopy.distance import distance as geo_dist
import scipy.stats
from bisect import bisect_right
import getpass
import pyspark
import numpy as np
from pyspark.sql import SparkSession
import math
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from datetime import datetime, timedelta
from scipy import stats
from heapq import *
# Spark session on the YARN cluster. The application name embeds the current
# user so that jobs on the shared cluster can be told apart.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName('sbb2-{0}'.format(getpass.getuser()))
    .config('spark.executor.memory', '4g')
    .config('spark.executor.instances', '5')
    .config('spark.port.maxRetries', '100')
    .getOrCreate()
)
spark

## Algorithm 

The search algorithm we use is a modified version of the A* algorithm. If eta is the estimated arrival time of a path, then $eta(n) = g(n) + E[d(n)] + h(n)$, with $g(n)$ the predicted arrival time of the last connection, $E[d(n)]$ the expected delay of the last trip and $h(n)$ the heuristic estimating the time remaining to get from the nth stop to the arrival station.

The priority queue is ordered by eta. As eta is always an optimistic estimate of the arrival time, no trip can ever perform better than the current lowest eta. This means that when the station with the lowest eta is the destination station, we are assured that any further computed trip would take more time, and we have therefore found the time-optimal trip.

At each step, the stop with the lowest eta is extracted from the priority queue and all states that can be reached from this point are computed. The states that are better than what we already found, and whose probability of success is higher than the threshold set by the user, are added to the priority queue. The program stops when the station extracted from the priority queue is the destination station.

**Defining a state**
A state is defined by a station, an eta, the probability of being at the state and the arrival time of the trip that took us to that state. The eta is a very optimistic estimate of the arrival time: the actual arrival time will always be equal to or later than the estimated arrival time.

**Creating a state**
When the nth state is "popped" from the priority queue, the list of neighbouring stations, together with the trips that reach them from the nth state's station, is computed.
For each element of this list a new state is created:
    - The new eta = (predicted arrival time at station n+1) + (expected delay of the trip from n to n+1) + (heuristic at n+1)
    - The probability of being at the state = P(boarding) * P(being at the nth state)
    - P(boarding) = probability that the train taken to the nth station arrives in time to take the train to station n+1.
If the trip to the next station is by walking, we set the eta to the current time + walking time + heuristic of the n+1 state, and P(boarding) is set to 1.

**Heuristic**

For the Heuristic, we create a graph that contains as edge weights between nodes (stations) the fastest direct transport between them (walking if in range, train if there is a train-line connecting the 2 stations, no edge otherwise). We then precalculated the distance-matrix for this graph.

**Probability of delay of a train**

The probability distribution of the delay of each train is approximated by assuming that all trains of the same line (linien_text) share the same probability density function (pdf) at a given station. The pdf is assumed to be log-normal. Details of the estimation of train delays are in the Delay Analysis notebook. To estimate the arrival time of a train we use the predicted arrival time + the expected delay.

**Probability of boarding**

If a user takes *train 1* to station a and *train 2* to station b.

We want the probability of boarding *train 2* at station a, i.e. the probability that *train 1* arrives at station a before *train 2* leaves the station.

Here only the arrival times of trains are random variables, whereas departures are treated as deterministic values, to avoid offering a plan that counts on the late departure of a train. We also assume that changing trains takes around 3 minutes. This could be fine-tuned, for example by testing whether the arrival platform is the same as the next departure platform.

The probability of boarding *train 2* is therefore calculated as the probability that *train 1* is delayed by less than the time between the predicted arrival of *train 1* and the departure of *train 2*.

**Eliminating bad paths**

To reduce the number of visited paths we keep track of the stations we have already visited. A station that has already been visited by the algorithm is considered again only if the new estimated time of arrival is better than the previous one, or if the probability of successfully reaching this stop is higher than the previous one. If either or both of these conditions are met, the trip is added to the priority queue; otherwise it is ignored.






## Data Loading

## Distance Filter
To get the location of each bus/train stop we used the SBB Didok data, whose stop names match the names used in the trip data.


First, we keep only the stations within a 10 km radius of Zürich. This allows computation in "reasonable" time on the cluster.

In [2]:
def distance_squared(n1,e1, n2, e2):
    '''Squared Euclidean distance between the points (n1, e1) and (n2, e2).

    Only arithmetic operators are used, so the function also works on Spark
    Column objects (it builds the ``dist2`` column). Compare the result against
    a squared radius rather than a radius.
    '''
    d_north = n1 - n2
    d_east = e1 - e2
    return d_north * d_north + d_east * d_east

In [3]:
# Reference point (Zürich) for the radius filter, stored as (E, N) = (X, Y).
# NOTE(review): values look like Swiss LV03 coordinates in metres — confirm against the Didok KOORDE/KOORDN columns.
coords_zurich = (683144.0, 248040.0) # X, Y  (E,N), 

In [4]:
# Didok stop list: official stop name (renamed to station_name) plus its E/N coordinates.
df_stops = (
    spark.read.csv('stops.txt', sep=';', header=True)
    .select('Dst-Bezeichnung-offiziell', 'KOORDE', 'KOORDN')
    .withColumnRenamed('Dst-Bezeichnung-offiziell', 'station_name')
)

In [5]:
# Squared distance of every stop to the Zürich reference point (north first, then east).
df_stops = df_stops.withColumn(
    'dist2',
    distance_squared(coords_zurich[1], coords_zurich[0],
                     df_stops['KOORDN'], df_stops['KOORDE']),
)

In [6]:
# Keep stops within a 10 km radius (dist2 is squared, so compare with the squared radius).
df_stops = df_stops.where(df_stops['dist2'] <= 10000 ** 2)

**Filtering the Didok**

The Didok data also includes bridges and other non-stop entries. As we are only interested in stops, we keep only the names that appear in the general trip data of a specific day.

In [7]:
# Keep only the Didok entries whose name appears in one day of SBB trip data,
# which drops bridges and other non-stop entries. The joined table is persisted
# because the next cell projects it again.
swiss_data = spark.read.csv('/datasets/sbb/2017/10/2017-10-17istdaten.csv.bz2', header=True, sep=";")
cross_table = (
    df_stops
    .join(swiss_data, swiss_data['HALTESTELLEN_NAME'] == df_stops['station_name'], 'inner')
    .persist()
)
df_stops = cross_table.select('station_name', 'KOORDE', 'KOORDN').distinct()

In [8]:
# Re-derives the same distinct (name, E, N) stop list as the previous cell (redundant but harmless).
df_stops = cross_table.select('station_name', 'KOORDE', 'KOORDN').dropDuplicates()

In [9]:
# Single-column frame of the kept stop names; main() joins the trip data against it.
stops_filter = df_stops.select(df_stops['station_name'])

### Train Delay

We get the train delay distributions as explained in the Delay Analysis notebook. We chose to keep the dataframe in memory the whole time. This is possible here since it is not too large, but a more scalable method would be to load the dataframe in chunks each time, as explained in the Delay Analysis notebook.

In [10]:
DATA_PATH = './data/'
# Fitted delay-distribution parameters per (stop, line); see the Delay Analysis notebook.
delays_df = pd.read_csv('{}params_by_stop_line.csv'.format(DATA_PATH))

## Processing Data
We now want to read data from the SBB files and extract useful information.

#### Find stations accessible by walking

Finds the stations in the proximity of the station of interest. Walking time is bounded above by `MAX_WALKING_TIME` (set to 10 minutes in the code below). Distance is computed as the plain Euclidean distance.

In [11]:
def euclidiandist(x_a,x_b,y_a,y_b):
    """Walking time in seconds between (x_a, y_a) and (x_b, y_b).

    Straight-line distance divided by an average walking speed of 5 km/h
    (1.388889 m/s), so the coordinates are presumably in metres.
    """
    walking_speed = 1.388889  # m/s, i.e. 5 km/h
    dx = x_a - x_b
    dy = y_a - y_b
    return math.sqrt(dx * dx + dy * dy) / walking_speed

In [12]:
# first, load all the necessary data
import pickle

# Lookup tables between station names and their index in the walking matrix.
# These are local, trusted files — pickle must never be used on untrusted input.
with open('./data/station_from_index.pkl', 'rb') as handle:
    station_from_index = pickle.load(handle)
with open('./data/index_from_station.pkl', 'rb') as handle:
    index_from_station = pickle.load(handle)

# The stored matrix presumably holds squared values; take the root once here.
walking_distances = np.sqrt(np.load('./data/walking_distances.npy'))

# Maximum walking time (seconds) considered when looking for walkable neighbours.
MAX_WALKING_TIME = 10 * 60

In [13]:
def get_walking_stations(current_station, current_time, way, isadvance):
    """List the stations within walking range of ``current_station``.

    Parameters
    ----------
    current_station : str
        Name of the station, looked up in ``index_from_station``.
    current_time : float
        Unix time. In forward mode this is when we leave ``current_station``;
        otherwise it is when we must be at ``current_station``.
    way : str
        'forward' to list stations we can walk TO; anything else lists stations
        we can walk FROM (backward search).
    isadvance : bool
        Backward mode only: when True we must arrive 180 s (3 minutes) before
        ``current_time``, e.g. to catch a connection.

    Returns
    -------
    list of (None, departure_time, station_name, arrival_time)
        One tuple per neighbour whose ``walking_distances`` entry is below
        MAX_WALKING_TIME (the station itself excluded). The line type None marks
        a walking leg. A station absent from the precomputed walking matrix has
        no walking neighbours, so an empty list is returned instead of raising.
    """
    if current_station not in index_from_station:
        return []
    own_index = index_from_station[current_station]
    # NOTE(review): these values are compared with MAX_WALKING_TIME (seconds) and added
    # to timestamps, so they are presumably walking times in seconds — confirm.
    walking_times = walking_distances[own_index, :]
    close_stations = [i for i in np.argwhere(walking_times < MAX_WALKING_TIME).flatten().tolist()
                      if i != own_index]
    station_names = [station_from_index[i] for i in close_stations]
    times_to_neighbours = walking_times[close_stations].flatten()
    n_close = len(station_names)

    if way == 'forward':
        # leave now, arrive after the walk
        departure_times = [current_time] * n_close
        arrival_times = (times_to_neighbours + current_time).tolist()
    else:
        # arrive at current_time (3 minutes earlier if a connection must be caught)
        if isadvance:
            current_time = current_time - 180
        departure_times = (current_time - times_to_neighbours).tolist()
        arrival_times = [current_time] * n_close

    return list(zip([None] * n_close, departure_times, station_names, arrival_times))

In [14]:
#def get_walking_stations(current_station, current_time):
#    coord_stat = df_stops.filter(df_stops['station_name'] == current_station).select('KOORDE','KOORDN').rdd.map(lambda x:(float(x[0]),float(x[1]))).collect()[0]
#    interm = df_stops.rdd.map(lambda x: (x[0],euclidiandist(float(x[1]),coord_stat[0],float(x[2]),coord_stat[1])))
#    #We want station that are 5 minutes away
#    res = interm.filter(lambda x : x[1] <= 300)
#    
#    # return the tuples we want
#    walking_stops_tuples = res.filter(lambda x : x[1] <= 900).filter(lambda x: x[0] != current_station).map(lambda x: (None,current_time,x[0],current_time + x[1])).collect()
#    return walking_stops_tuples

In [15]:
def get_all_possible_stations(df, distance_df, stop_name, desired_time, mode,
                              allowed_timewindow=1800, isadvance=False):
    """
    Get all stations connected to a given station by either an SBB trip or by walking.

    input:

        df :         df with structure as sbb_df (English column names, scheduled
                     times as "dd.MM.yyyy HH:mm" strings, a station_name column)
        distance_df: df of a kind didok_df (not used by this function)
        stop_name:   string, name of the station we leave from when mode is 'forward'
                     (fixed departure time), or the station we are heading to for any
                     other mode (fixed arrival time, backward search)
        desired_time: time in seconds (unix time)
        mode:        'forward' or anything else (backward)
        allowed_timewindow: maximum waiting time at a station in seconds
                     (halved in backward mode)
        isadvance:   forwarded to get_walking_stations (backward: arrive 3 minutes early)

    output:
        a tuple list of all accessible stations in the format: (type of train, departure time,
                                                                name of the other station, arrival time)
        NB : if the trip is by walking the type of train is None
    """
    timestamp_format = "dd.MM.yyyy HH:mm"
    for column in ('ArrivalTimeScheduled', 'ArrivalTimeActual',
                   'DepartureTimeScheduled', 'DepartureTimeActual'):
        df = df.withColumn(column, f.unix_timestamp(f.col(column), timestamp_format))

    served = df.filter(df['SkipStation'] == 'false')
    max_trip_span = 28800  # only consider trips within 8 hours of desired_time

    if mode == 'forward':
        # trips leaving stop_name within the waiting window after desired_time
        in_station = (served
                      .filter(served['DepartureTimeScheduled'] > desired_time)
                      .filter(served['DepartureTimeScheduled'] < desired_time + allowed_timewindow)
                      .filter(served['station_name'] == stop_name)
                      .select('TripId', 'DepartureTimeScheduled'))
        next_station = served.select('StopName', 'TripId', 'ArrivalTimeScheduled', 'TransportType').na.fill("Unknown")
        both = (next_station
                .filter(next_station['StopName'] != stop_name)
                .filter(next_station['ArrivalTimeScheduled'] < desired_time + max_trip_span)
                .join(in_station, on='TripId'))
    else:
        # Fixed arrival: only half the waiting window is tolerated, assuming people
        # accept less waiting at the arrival point than at departure.
        in_station = (served
                      .filter(served['ArrivalTimeScheduled'] < desired_time)
                      .filter(served['ArrivalTimeScheduled'] > desired_time - allowed_timewindow/2)
                      .filter(served['station_name'] == stop_name)
                      .select('TripId', 'ArrivalTimeScheduled'))
        next_station = served.select('StopName', 'TripId', 'DepartureTimeScheduled', 'TransportType').na.fill("Unknown")
        both = (next_station
                .filter(next_station['StopName'] != stop_name)
                .filter(next_station['DepartureTimeScheduled'] > desired_time - max_trip_span)
                .join(in_station, on='TripId'))

    # same trip, departure strictly before arrival
    res = (both
           .filter(both['ArrivalTimeScheduled'] > both['DepartureTimeScheduled'])
           .select('TransportType', 'DepartureTimeScheduled', 'StopName', 'ArrivalTimeScheduled'))
    tuples_train = [(row[0], row[1], row[2], row[3]) for row in res.collect()]
    tuples_with_walking = get_walking_stations(stop_name, desired_time, mode, isadvance)
    return tuples_train + tuples_with_walking

## Dealing with probability

Here we deal with the fact that the arrival time is not deterministic but a random variable. When we need to summarise it by a single value we use its expectation, which is used to calculate the estimated time of arrival. The delay probability is used to find the probability that a train arrives early enough for the user to safely catch the next train.

In [16]:
def get_expected_delay(station_name, line_id):
    """Expected arrival delay of line ``line_id`` at ``station_name``.

    Returns the mean of the log-normal fitted for that (station, line) pair in
    ``delays_df`` (column ``shape`` as s, ``mean`` as loc, ``std`` as scale),
    or 0 when the pair has no fitted parameters.
    """
    matches = delays_df.loc[(delays_df.HALTESTELLEN_NAME == station_name) & (delays_df.LINIEN_TEXT == line_id)]
    if matches.shape[0] == 0:
        return 0 # average expected delay over dataset is 0, congrats SBB ;)
    params = {key: matches[key].values[0] for key in ('shape', 'mean', 'std')}
    return stats.lognorm.mean(params['shape'], loc=params['mean'], scale=params['std'])

In [17]:
import warnings
import functools


@functools.lru_cache(maxsize=None)
def _load_default_delay_bins(path):
    """Load the fallback delay histogram once per path (restart the kernel if the file changes)."""
    return np.load(path)


def get_delay_probability(station_name, line_id, delay):
    """Probability that ``line_id`` reaches ``station_name`` with a delay of at most ``delay``.

    ``delay`` is in the same unit as the fitted parameters (callers pass minutes).
    When ``delays_df`` has a fitted log-normal for (line, station), its CDF at ``delay``
    is returned. Otherwise the default per-bin histogram ``default_delay.npy`` is used:
    the first int(delay + 1) bins are summed (0 for negative delays), and any delay
    above 60 returns 1 - bins[61].
    The histogram used to be reloaded from disk on every fallback call; it is now cached.
    """
    row = delays_df.loc[(delays_df.LINIEN_TEXT == line_id) & (delays_df.HALTESTELLEN_NAME==station_name)]
    if row.shape[0]>0:
        # cdf parameters present, calculate cdf value
        s = row['shape'].values[0]
        loc = row['mean'].values[0]
        scale = row['std'].values[0]
        return stats.lognorm.cdf(delay, s, loc=loc, scale=scale)
    # not present, resort to default histogram
    binvals = _load_default_delay_bins(DATA_PATH+'default_delay.npy')
    if delay>60:
        return 1 - binvals[61]
    # Clamp at 0: a negative count must sum no bins (range() did that implicitly).
    n_bins = max(int(delay + 1), 0)
    # builtin sum adds left to right from 0, exactly like the former per-bin loop
    return sum(binvals[:n_bins], 0)

In [18]:
import pickle

# Station name -> row/column index of min_travel_times (matrix built in Heuristic.ipynb).
# Local, trusted file — pickle must never be used on untrusted input.
with open('./data/stations_dict', 'rb') as handle:
    stations_dict = pickle.load(handle)
min_travel_times = np.load('./data/min_travel_times.npy')

def get_heuristic(city_a,city_b):
    '''
    Heuristic travel time in minutes from station city_a to station city_b.

    First looks up the precomputed fastest-connection matrix (Heuristic.ipynb), where
    1440 marks "no connection found". Otherwise falls back to the straight-line
    distance between the two stops divided by 1600 (metres per minute if the
    coordinates are metres, i.e. ~96 km/h). Prints a message and returns 0 when a
    station has no coordinates in df_stops.
    '''
    if city_a in stations_dict and city_b in stations_dict:
        estimated_time = min_travel_times[stations_dict[city_a], stations_dict[city_b]]
        if estimated_time != 1440:
            return float(estimated_time)

    def coordinates_of(name):
        # one Spark query per lookup; None (after a message) when the name is unknown
        rows = df_stops.select('KOORDE','KOORDN').where(df_stops['station_name'] == name).collect()
        if not rows:
            print('Name ',name,'is invalide')
            return None
        return float(rows[0]['KOORDE']), float(rows[0]['KOORDN'])

    start = coordinates_of(city_a)
    if start is None:
        return 0
    end = coordinates_of(city_b)
    if end is None:
        return 0
    dx = start[0] - end[0]
    dy = start[1] - end[1]
    return math.sqrt(dx*dx + dy*dy) / 1600

## Defining useful classes

In [19]:
class Step :
    """One leg of an itinerary: a ride (or walk) from one station to another."""
    def __init__(self,departure_station,arrival_station, depature_time,arrival_time, train):
        # unix times; the attribute name 'depature_time' is kept as-is for callers
        self.departure_station, self.arrival_station = departure_station, arrival_station
        self.depature_time, self.arrival_time = depature_time, arrival_time
        # linien_text of the line, or 'Walk'
        self.train = train
    def _get(self) :
        """Return the leg as a dict with local-time timestamps formatted as text."""
        fmt = '%Y-%m-%d %H:%M:%S'
        def as_text(timestamp):
            return datetime.fromtimestamp(abs(timestamp)).strftime(fmt)
        return {
            'departure': self.departure_station,
            'destination': self.arrival_station,
            'time of departure': as_text(self.depature_time),
            'time of arrival': as_text(self.arrival_time),
            'type': self.train,
        }

# A Station pairs a stop name with its heuristic (minutes) to the search target.
class Station :
    def __init__(self,name, heuristic):
        self.name, self.heuristic = name, heuristic
    def _print(self) :
        """Print the station name and its heuristic."""
        print('Station ', self.name, 'with heursitic ', self.heuristic)

# A State is a station reached at a certain time along a certain path, with a certain probability.
class State :
    def __init__(self,station, eta, ps, previous_steps,last_train,pred_arrival_time):
        # Station object of that state
        self.station = station
        # Estimated time of arrival at that state (in backward mode: heuristic-adjusted,
        # negated departure time, so that smaller is still better)
        self.eta = eta
        # Probability of being at that state: probability of the previous state times
        # the probability of catching the connection to this one
        self.ps = ps
        # List of the steps taken to get to that state
        self.previous_steps = previous_steps
        # linien_text of the last train taken (None after a walk, "starting" at the origin)
        self.last_train = last_train
        # Predicted arrival time at that station (backward mode: predicted departure time
        # of the last transition)
        self.pred_arrival_time = pred_arrival_time
    def get_steps(self) :
        """Return the itinerary as a list of step dicts, announcing when no step was taken."""
        if not self.previous_steps:
            print('Leave from home')
        return [step._get() for step in self.previous_steps]
    def __lt__ (self,other) :
        # order by eta so ties in the priority queue tuples are resolved on eta
        return self.eta < other.eta
    def __hash__ (self) :
        return hash(self.eta)
    def _print(self) :
        print('At', self.station.name, ' the estimated time of arrival is ', self.eta,
              ' with a probability of being here of ', self.ps, '\n')
        
class End(Exception):
    """We found the right station: raised by new_step with the final state as second argument."""
class Empty_queue(Exception):
    """Parameters were too restrictive: no path was found and the priority queue is empty."""

## Implementing the shortest-path search

In [20]:
def createstate(state,next_,destination,Pthresh,way):
    """Create the state resulting from a trip between station a and station b.

    Input :
        - state : current State at station a
        - next_ : tuple (linien_text, predicted departure time, name of station b,
          predicted arrival time), as returned by get_all_possible_stations;
          linien_text is None for a walking leg
        - destination : station the search is heading to (used for the heuristic)
        - Pthresh : minimum probability; not used here (new_step applies it)
        - way : 'forward' for a fixed departure time, anything else for a fixed
          arrival time (the search then runs backward from the arrival station)
    Output :
        - New State resulting from the trip between a and b. In backward mode the
          state's eta holds heuristic*60 - departure time, so smaller is still better.
    """
    linien_text = next_[0]
    pred_departure = next_[1]
    station_name = next_[2]
    pred_arrival = next_[3]
    # heuristic (minutes) for the new station
    heuristic = get_heuristic(station_name,destination)
    station = Station(station_name,heuristic)

    if linien_text is not None:
        # We go to the next station by train
        if way == 'forward':
            # Expected arrival delay (minutes) of this train at the station it takes us to.
            # get_expected_delay's signature is (station_name, line_id).
            expected_delay = get_expected_delay(station_name, linien_text)
            ETA = expected_delay*60 + heuristic*60 + pred_arrival
            # Time between our predicted arrival at the current station and this train's
            # departure, minus 3 minutes to change trains.
            slack = pred_departure - state.pred_arrival_time - 3*60
            if state.last_train is None or state.last_train == "starting":
                # Reached on foot or at the origin: no train delay can make us miss it.
                Pboard = 1.0 if slack >= 0 else 0.0
            else:
                # The connection is made if the train that BROUGHT us here
                # (state.last_train) is late by less than the slack.
                Pboard = get_delay_probability(state.station.name, state.last_train, slack/60)
            # probability of the last state * probability of making this connection
            Ps = state.ps * Pboard

            leg = Step(state.station.name,station_name,pred_departure,pred_arrival,linien_text)
            Previous = state.previous_steps + [leg]
            return State(station,ETA,Ps,Previous,linien_text,pred_arrival)

        # Backward: departures are deterministic, so the "eta" is an estimated departure
        # time, negated because the priority queue pops the smallest element.
        ETD = heuristic*60 - pred_departure
        # state.pred_arrival_time is the departure time of the following connection; this
        # train (linien_text) must reach the current station before it.
        # NOTE(review): unlike forward mode, no 3-minute change buffer is subtracted here — confirm.
        slack = state.pred_arrival_time - pred_arrival
        Pboard = get_delay_probability(state.station.name,linien_text,slack/60)
        Ps = state.ps * Pboard

        leg = Step(station_name,state.station.name,pred_departure,pred_arrival,linien_text)
        Previous = [leg] + state.previous_steps
        return State(station,ETD,Ps,Previous,linien_text,pred_departure)

    # Walking leg: we can leave whenever we want, so the probability is unchanged.
    walking_time = pred_arrival - pred_departure
    if way == 'forward':
        # swap the previous station's heuristic for the new one and add the walk
        ETA = state.eta - state.station.heuristic*60 + walking_time + heuristic*60
        leg = Step(state.station.name,station_name,pred_departure,pred_arrival,'Walk')
        Previous = state.previous_steps + [leg]
        return State(station,ETA,state.ps,Previous,linien_text,pred_arrival)
    ETD = state.eta - state.station.heuristic*60 + heuristic*60 + walking_time
    leg = Step(station_name,state.station.name,pred_departure,pred_arrival,'Walk')
    Previous = [leg] + state.previous_steps
    return State(station,ETD,state.ps,Previous,linien_text,pred_departure)
    
        
    
    

In [21]:
def firststep(From, To, t, Pthresh,way):
    """Initialise the search: parse the query time and seed the priority queue and visited set.

    ``t`` is a "dd.mm.YYYY HH:MM" string interpreted in local time. In forward mode the
    search starts at ``From`` with eta = heuristic*60 + t; otherwise it starts at ``To``
    and runs backward with eta = heuristic*60 - t, so the queue's smallest element is
    still the most promising. A (inf, "empty_queue") sentinel is pushed so that an
    exhausted queue can be detected by new_step. Returns (visited, pqueue).
    """
    start_time = float(time.mktime(datetime.strptime(t, "%d.%m.%Y %H:%M").timetuple()))
    start_heuristic = get_heuristic(From,To)
    pqueue = PriorityQueue()
    pqueue.put((float('inf'),"empty_queue"))
    if way == 'forward':
        origin, start_eta = From, start_heuristic*60 + start_time
    else:
        origin, start_eta = To, start_heuristic*60 - start_time
    start_state = State(Station(origin,start_heuristic),start_eta,1.0,[],"starting",start_time)
    pqueue.put((start_state.eta,start_state))
    visited = visited_nodes()
    visited.update(start_state,way)
    return visited,pqueue

In [22]:
def new_step(priortity_list,Pthresh,destination,visited,way,reduced_df) :
    """Expand the most promising state of the priority queue.

    Pops the state with the smallest eta. Raises Empty_queue if only the sentinel was
    left, and End(8, state) if the popped state is at ``destination``. Otherwise every
    connection leaving (forward) or reaching (backward) its station becomes a new state,
    which is queued when its probability exceeds Pthresh and ``visited`` accepts it.
    Returns (priortity_list, visited).
    """
    current_state = priortity_list.get()[1]
    if type(current_state) is str:
        # only the "empty_queue" sentinel was left
        raise Empty_queue("error")
    if current_state.station.name == destination:
        raise End(8,current_state)

    # Backward mode: a traveller who still has a connection to catch must be at the
    # station 3 minutes early (passed on to the walking lookup).
    must_arrive_early = way != 'forward' and current_state.last_train != "starting"
    possible_next_stops = get_all_possible_stations(
        reduced_df, df_stops, current_state.station.name,
        current_state.pred_arrival_time, way, isadvance=must_arrive_early)

    for next_ in possible_next_stops:
        # never chain two walking legs
        if current_state.last_train is None and next_[0] is None:
            continue
        n_state = createstate(current_state,next_,destination,Pthresh,way)
        if n_state.ps > Pthresh and visited.update(n_state,way):
            priortity_list.put((n_state.eta,n_state))
    return priortity_list, visited


## Visited nodes
We model the set of visited nodes so that a station appears several times only when none of its visits dominates the others (for example, a slower visit with a higher probability of arriving there). In all other cases only the best visit is kept in the set.

In [23]:
 def create_tuple_from_node(node):
        return (node.station.name, node.eta, node.ps)

In [24]:
#visited_nodes as a set of tuples, containing: (station, visit-time, certainty)
class visited_nodes:
    """Visited search states, kept per station as a set of non-dominated entries.

    Each entry is a tuple (station_name, eta, ps) built by create_tuple_from_node.
    Several entries may exist for one station when none is both at least as fast
    (smaller eta) and at least as likely (larger ps) as another.

    In both search directions a smaller eta is better. Forward eta is an estimated
    arrival time; backward eta is heuristic*60 - departure_time, so a later departure
    gives a smaller value. The priority queue always pops the smallest eta.
    """
    def __init__(self, initial_set=None):
        # Fresh set per instance: a mutable default (set([])) would be shared by every
        # visited_nodes() and leak visited states from one search into the next.
        self.nodes = set() if initial_set is None else initial_set

    def update(self, new_node,way):
        """Record ``new_node`` unless an earlier visit of its station dominates it.

        Returns True when the node was stored (and should be pushed on the priority
        queue), False when an existing entry is at least as fast and as likely.
        """
        old_nodes = self.previous_visits(new_node)
        new_node_tup = create_tuple_from_node(new_node)

        if not old_nodes:
            # the station has not been explored yet
            self.add_to_visited(new_node_tup)
            return True

        compare = self.compare_nodes_2_forward if way == 'forward' else self.compare_nodes_2_backward
        added = False
        for old in old_nodes:
            # every comparison must run: each one may replace a dominated old entry
            if compare(old, new_node_tup):
                added = True
        # prune entries that became redundant
        self.compare_in_set(new_node,way)
        return added

    def previous_visits(self, new_node):
        """Return the stored entries for the station of ``new_node`` (possibly empty)."""
        return [entry for entry in self.nodes if entry[0] == new_node.station.name]

    def add_to_visited(self, new_node_tup):
        """Add an entry tuple to the set of visited nodes."""
        self.nodes.add(new_node_tup)
        return None

    def replace_old(self, new_node, old_node):
        """Add ``new_node`` and drop ``old_node`` (if still present)."""
        self.add_to_visited(new_node)
        if old_node in self.nodes:
            self.nodes.remove(old_node)
        return None

    def compare_nodes_2_forward(self, node1, node2):
        """Compare stored entry node1 with candidate node2 (smaller eta is better).

        Stores node2 when it is better in some way, replacing node1 when node2
        dominates it. Returns True when node2 was stored (important for updating
        the priority queue).
        """
        if node1[1] > node2[1] and node1[2] < node2[2]: #node2 is faster and more probable
            self.replace_old(node2, node1) #replace the slow and improbable node
            return True

        elif node1[1] > node2[1] and node1[2] > node2[2]: #node2 is only faster, but less certain
            self.add_to_visited(node2) # add to optimal set
            return True

        elif node1[1] < node2[1] and node1[2] < node2[2]: #node2 is slower but more certain
            self.add_to_visited(node2) #add it to optimal set
            return True

        elif node1[1] > node2[1] and node1[2] == node2[2]: #node2 is faster, equally certain
            self.replace_old(node2, node1)
            return True

        elif node1[1] == node2[1] and node1[2] < node2[2]: #node2 is as fast, more certain
            self.replace_old(node2, node1)
            return True

        else:
            return False #node2 is neither more certain nor faster, we drop it.

    def compare_nodes_2_backward(self, node1, node2):
        """Backward-mode comparison of stored entry node1 with candidate node2.

        Backward eta is heuristic*60 - departure_time, so a smaller eta still means a
        better (later) departure and the forward dominance rules apply unchanged.
        Treating a larger eta as better would keep nodes that are both slower and
        less likely, and replace better nodes with worse ones.
        """
        return self.compare_nodes_2_forward(node1, node2)

    def compare_in_set(self, new_node,way):
        """Compare all stored entries of new_node's station pairwise to prune dominated ones."""
        old_nodes = self.previous_visits(new_node)
        for n in old_nodes:
            for m in old_nodes:
                if (way == 'forward'):
                    self.compare_nodes_2_forward(n, m)
                else :
                    self.compare_nodes_2_backward(n, m)
                    

## Main function

In [25]:

def main(From, To, At, P, way):
    """Search for the best itinerary between two stations.

    Parameters
    ----------
    From, To : str
        Departure and arrival station names.
    At : str
        Reference time formatted as "%d.%m.%Y %H:%M".
    P : float
        Probability threshold forwarded to ``firststep`` and ``new_step``.
    way : str
        'forward' searches towards ``To``; any other value runs the backward
        search, which targets ``From``.

    Returns
    -------
    pandas.DataFrame or None
        The steps of the final state (``final_state.get_steps()``), or None when
        the queue is exhausted without reaching the target station.
    """
    steps = None
    # Initiate the queue.
    visited, queue = firststep(From, To, At, P, way)

    # Load the data for the day of the trip, plus the following day (forward)
    # or the previous day (backward), to account for trips around midnight.
    # The neighbouring day is computed with timedelta so month and year
    # boundaries give a real date and the matching YYYY/MM folder.
    trip_day = datetime.strptime(At, "%d.%m.%Y %H:%M")
    if way == 'forward':
        days = (trip_day, trip_day + timedelta(days=1))
    else:
        days = (trip_day - timedelta(days=1), trip_day)
    paths = ['/datasets/sbb/{0:%Y}/{0:%m}/{0:%Y-%m-%d}istdaten.csv.bz2'.format(day) for day in days]

    swiss_data = spark.read.csv(paths, header=True, sep=";").cache()
    try:
        istdaten = swiss_data.join(stops_filter, swiss_data.HALTESTELLEN_NAME == stops_filter.station_name, 'inner')

        # Rename the German columns to English. zip pairs columns positionally
        # and stops at the shorter list; this assumes the istdaten CSV columns
        # come first and in exactly this order (TODO confirm against the data).
        traduction = 'TripDate string, TripId string, OperatorId string, OperatorAbbrv string, OperatorName string, ProductId string, LineId string, LineType string, UmlaufId string, TransportType string, AdditionalTrip boolean, FailedTrip boolean, BPUIC string, StopName string, ArrivalTimeScheduled string, ArrivalTimeActual string, ArrivalTimeActualStatus string,     DepartureTimeScheduled string, DepartureTimeActual string, DepartureTimeActualStatus string, SkipStation boolean'
        traduction = [column.split()[0] for column in traduction.split(',')]
        for german, english in zip(istdaten.columns, traduction):
            istdaten = istdaten.withColumnRenamed(german, english)

        # The forward search targets the arrival station, the backward one the departure station.
        target = To if way == 'forward' else From

        # Run the search until the target station is found or the queue is empty.
        while True:
            try:
                queue, visited = new_step(queue, P, target, visited, way, istdaten)
            except End as end:
                # We found the solution.
                final_state = end.args[1]
                steps = pd.DataFrame(final_state.get_steps())
                break
            except Empty_queue:
                print('No way was found with this probability threshold, try again with a lower threshold or stay home')
                break
    finally:
        # main() can be called repeatedly (e.g. from the UI callback). Release
        # the cached days so each call does not pin more executor memory.
        swiss_data.unpersist()
    return steps

## A few runs to try

In [26]:
def nice_time(t):
    """Render a POSIX timestamp as 'dd.mm.YYYY HH:MM' in local time (debugging aid).

    The sign of ``t`` is ignored, so ``nice_time(-x) == nice_time(x)``.
    """
    moment = datetime.fromtimestamp(abs(t))
    return format(moment, "%d.%m.%Y %H:%M")

In [28]:
# Forward query: leave Zürich Tiefenbrunnen at 17:45 for "Zürich, Central",
# keeping only connections above a 0.9 probability threshold.
start = time.time()
steps = main(From='Zürich Tiefenbrunnen', To='Zürich, Central',
             At='17.10.2017 17:45', P=0.9, way='forward')
end = time.time()
print('took {}mins'.format((end - start) / 60))
steps

took 5.866852414608002mins


Unnamed: 0,departure,destination,time of arrival,time of departure,type
0,Zürich Tiefenbrunnen,Zürich HB,2017-10-17 17:59:00,2017-10-17 17:53:00,S
1,Zürich HB,"Zürich, Central",2017-10-17 18:05:18,2017-10-17 17:59:59,Walk


In [42]:
# Forward query: leave Zürich Tiefenbrunnen at 17:45 for "Zürich, Höschgasse",
# keeping only connections above a 0.7 probability threshold.
start = time.time()
steps = main(From='Zürich Tiefenbrunnen', To='Zürich, Höschgasse',
             At='17.10.2017 17:45', P=0.7, way='forward')
end = time.time()
print('took {}mins'.format((end - start) / 60))
steps

took 1.059355584780375mins


Unnamed: 0,departure,destination,time of arrival,time of departure,type
0,Zürich Tiefenbrunnen,"Zürich, Wildbachstrasse",2017-10-17 17:49:35,2017-10-17 17:45:00,Walk
1,"Zürich, Wildbachstrasse","Zürich, Höschgasse",2017-10-17 17:58:00,2017-10-17 17:56:00,Unknown


In [29]:
# Backward query (the "Arrival At" mode): reach "Zürich, Höschgasse" from
# Zürich Tiefenbrunnen with 18:05 as the target time and a 0.7 probability threshold.
start = time.time()
steps = main(From='Zürich Tiefenbrunnen', To='Zürich, Höschgasse',
             At='17.10.2017 18:05', P=0.7, way='backward')
end = time.time()
print('took {}mins'.format((end - start) / 60))
steps

took 2.2776575803756716mins


Unnamed: 0,departure,destination,time of arrival,time of departure,type
0,Zürich Tiefenbrunnen,"Zürich, Wildbachstrasse",2017-10-17 17:53:00,2017-10-17 17:48:24,Walk
1,"Zürich, Wildbachstrasse","Zürich, Höschgasse",2017-10-17 17:58:00,2017-10-17 17:56:00,Unknown


## Visualization

*NB:* Since the computation is expensive and slow, an example of the visualization and its results is provided in the Vizualisation notebook.

In [31]:
import pickle

# Station names used as autocomplete completions and to validate the user's
# input in the scheduler UI. pickle can execute arbitrary code: only load
# this file if it was produced by this project.
with open('./data/stations', 'rb') as stations_file:
    stations_list = pickle.loads(stations_file.read())

In [32]:
def cb_desired_arrival(destination, departure, dt, p):
    """UI callback for "Arrival At": run the scheduler in backward mode.

    ``dt`` is the reference time string and ``p`` the certainty threshold.
    Returns whatever ``main`` returns.
    """
    return main(From=departure, To=destination, At=dt, P=p, way='backward')
def cb_desired_departure(destination, departure, dt, p):
    """UI callback for "Departure At": run the scheduler in forward mode.

    ``dt`` is the reference time string and ``p`` the certainty threshold.
    Returns whatever ``main`` returns.
    """
    return main(From=departure, To=destination, At=dt, P=p, way='forward')

In [33]:
from notebook import notebookapp

# The embedded Bokeh app needs the address of the running notebook server;
# the first server reported by Jupyter is used.
servers = [*notebookapp.list_running_servers()]
first_server = servers[0]
notebook_url = first_server['url']

In [34]:
import pandas as pd
from bokeh.plotting import curdoc
from bokeh.layouts import widgetbox
from bokeh.layouts import column, row, layout
from bokeh.models.layouts import Column
from bokeh.models import ColumnDataSource
from bokeh.models.widgets import DatePicker, Button, Select, RadioButtonGroup, \
                            Slider, AutocompleteInput, DataTable, DateFormatter, TableColumn, Paragraph
from datetime import date
from datetime import timedelta as td
from datetime import datetime
from bokeh.io import output_notebook
from bokeh.plotting import show
from bokeh.application import Application
from bokeh.application.handlers import FunctionHandler

# Direct Bokeh output to this notebook so that show() renders the app inline.
output_notebook()

def modify_doc(doc):
    """Build the trip-planner form and attach it to ``doc``.

    The form collects a date and time, start and destination stations
    (validated against ``stations_list``), the search direction and a
    certainty threshold. Pressing "Go" runs the scheduler and shows the
    resulting legs in a table.

    Parameters
    ----------
    doc : bokeh.document.Document
        Document supplied by ``FunctionHandler``; the whole layout is added as
        a single root.
    """
    # Date and time of the trip.
    dt_pckr = DatePicker(title='Date', min_date=date(2017, 1, 1), max_date=date.today(), value=date(2017, 10, 17))
    hours = Select(title="Hour:", value='12', options=[str(i) for i in range(24)])
    mins = Select(title="Min:", value='00', options=['{:02}'.format(i) for i in range(60)])
    # Stations, completed from the known station names.
    start = AutocompleteInput(title="Start:", completions=stations_list)
    destination = AutocompleteInput(title="Destination:", completions=stations_list)
    # active == 0 -> "Arrival At", active == 1 -> "Departure At".
    direction = RadioButtonGroup(labels=["Arrival At", "Departure At"], active=0)
    # Minimum certainty of making the connections.
    certainty_slider = Slider(start=0, end=1, value=0.7, step=.01, title="Certainty of making connections")

    # Result table, initially empty.
    frame = pd.DataFrame({'departure': [], 'destination': [], 'time of departure': [], 'time of arrival': [], 'type': []})
    Columns = [TableColumn(field=Ci, title=Ci) for Ci in frame.columns]  # bokeh columns
    init_source = ColumnDataSource(frame)
    data_table = DataTable(columns=Columns, source=init_source)  # bokeh table
    # Status line shown above the form.
    status_text = Paragraph(text='Welcome to our scheduler! Select your trip and hit Go!', width=400)

    button = Button(label="Go", button_type="success")

    def button_click():
        """Validate the stations, run the scheduler and display its result."""
        if destination.value in stations_list and start.value in stations_list:
            date_string = dt_pckr.value.strftime('%d.%m.%Y')
            datetime_string = date_string + ' {}:{}'.format(hours.value, mins.value)
            if direction.active == 0:
                r = cb_desired_arrival(destination.value, start.value, datetime_string, certainty_slider.value)
            else:
                r = cb_desired_departure(destination.value, start.value, datetime_string, certainty_slider.value)
            if r is None:
                # The scheduler returns None when no itinerary meets the
                # threshold. ColumnDataSource(None) raises, so clear the table
                # rather than leaving a previous result on screen.
                data_table.source.data = ColumnDataSource(frame).data
                status_text.text = 'No connection found with this certainty. Try a lower one.'
                return
            data_table.source.data = ColumnDataSource(r).data
            status_text.text = 'Here is your connection. You can select another one.'
        elif destination.value in stations_list:
            status_text.text = 'You picked a wrong departure station'
        else:
            status_text.text = 'You picked a wrong destination station'

    button.on_click(button_click)

    doc.add_root(layout([[status_text],
                         [direction],
                         [dt_pckr, row(hours, mins, width=200)],
                         [start, destination],
                         [certainty_slider, button],
                         [data_table]]))

# Wrap modify_doc in a Bokeh application and embed it in this notebook.
app = Application(FunctionHandler(modify_doc))
# Pass the server URL without its trailing '/'. rstrip removes a slash only if
# one is present, whereas slicing off the last character would corrupt a URL
# that does not end with '/'.
show(app, notebook_url=notebook_url.rstrip('/'))