VRIB algorithm

In [None]:
'''
The leg is structured as follows:
(trip_id, departure_node, departure_time, arrival_node, arrival_time, route_id, service_id)

trip_id = leg[0]
departure_node = leg[1]
departure_time = leg[2]
arrival_node = leg[3]
arrival_time = leg[4]
route_id = leg[5]
service_id = leg[6]
'''

Import Functions

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import math
import scipy.stats as stats
from data_preparation import prepare_data,import_data
pd.set_option('display.max_colwidth', None)

Data Import

In [None]:
# Load the raw timetable tables through the project-local `import_data` helper.
# NOTE(review): the table names match the GTFS file set (agency, stops, routes,
# trips, stop_times, calendar, calendar_dates) — confirm against data_preparation.
agency_df, stops_df, routes_df, trips_df, stop_times_df, calendar_df,calendar_dates_df = import_data()
# Build the collection of legs used by the search functions below.
# NOTE(review): later code iterates `legs_df` directly and indexes each item by
# position, so this is presumably a sequence of leg records rather than a
# DataFrame — confirm what `prepare_data` returns.
legs_df = prepare_data(stops_df,trips_df,stop_times_df)

Reliability Functions

In [None]:
def calculate_transfer_probability(prev_leg: tuple, next_leg: tuple) -> float:
    """Return the probability of making the connection from ``prev_leg`` to ``next_leg``.

    Legs are indexed positionally: index 2 is the departure time, index 4 the
    arrival time (both "HH:MM:SS" strings) and index 5 the route id.

    - Same route id on both legs: no transfer takes place, so the result is 1.
    - Less than 2 minutes between arrival and departure: the result is 0.
    - Otherwise: the Gamma(a=2, scale=2) CDF of the transfer time in minutes,
      capped at 0.95.
    """
    # Staying on the same route means there is no connection to miss.
    if prev_leg[5] == next_leg[5]:
        return 1

    time_format = "%H:%M:%S"
    arrived_at = datetime.strptime(prev_leg[4], time_format)
    departs_at = datetime.strptime(next_leg[2], time_format)

    # Available transfer time in minutes (negative if the next leg leaves first).
    slack_minutes = (departs_at - arrived_at).total_seconds() / 60
    if slack_minutes < 2:
        return 0

    # Even a very long transfer is never treated as fully certain.
    return min(stats.gamma.cdf(slack_minutes, a=2, scale=2), 0.95)


def calculate_cumulative_probability(itinerary:list) -> list[float]:
    """Return the per-transfer success probabilities of an itinerary.

    The list starts with 1 (boarding the first leg is taken as certain) and is
    followed by one transfer probability for every pair of consecutive legs.
    Despite the name, the entries are individual probabilities, not running
    products; callers multiply them together with ``math.prod``.
    """
    # Pair every leg with its successor and score each hand-over.
    transfer_probabilities = [
        calculate_transfer_probability(current_leg, following_leg)
        for current_leg, following_leg in zip(itinerary, itinerary[1:])
    ]
    return [1] + transfer_probabilities


def calculate_arrival_probability(itinerary:list, start_time:str, time_budget:timedelta) -> int:
    """Return 1 if the itinerary can arrive within the time budget, otherwise 0.

    The result is 0 when any transfer in the itinerary has probability 0, or
    when the arrival time of the last leg (index 4) minus ``start_time``
    exceeds ``time_budget``. Times are "HH:MM:SS" strings.

    The original author marks this check as redundant because of the network
    filtering step.
    """
    # An impossible transfer anywhere makes the arrival impossible.
    if not (math.prod(calculate_cumulative_probability(itinerary)) > 0):
        return 0

    final_arrival_raw = itinerary[-1][4]

    time_format = "%H:%M:%S"
    journey_start = datetime.strptime(start_time, time_format)
    final_arrival = datetime.strptime(final_arrival_raw, time_format)

    elapsed = final_arrival - journey_start
    return 1 if elapsed <= time_budget else 0
    
def primary_itinerary_reliability(itinerary:list, start_time:str, time_budget:timedelta) -> float:
    """Return the reliability of a primary itinerary.

    The reliability is the product of
     - the arrival probability (1 or 0, from ``calculate_arrival_probability``)
     - the product of all transfer probabilities along the itinerary
    """
    # Probability that every transfer along the itinerary succeeds.
    all_transfers_succeed = math.prod(calculate_cumulative_probability(itinerary))

    # 1 when the final arrival fits into the time budget, 0 otherwise.
    arrives_in_time = calculate_arrival_probability(itinerary, start_time, time_budget)

    return arrives_in_time * all_transfers_succeed


def backup_itinerary_reliability(itinerary:list, backup: tuple, start_time:str, time_budget:timedelta) -> float:
    """Return the reliability contributed by one backup itinerary.

    ``backup`` is a tuple whose first element is the sequence of backup legs,
    starting with the first leg taken after the missed transfer.

    The backup reliability is the product of:
     - the arrival probability of the backup itinerary
     - the product of the transfer probabilities inside the backup itinerary
     - the probability of missing the primary transfer (1 - transfer probability)
     - the reliability of the primary itinerary up to the transfer point

    If no leg of the primary itinerary (excluding its last leg) arrives at the
    node where the backup starts, the backup can never be used and the result
    is 0.
    """
    # Sequence of legs that make up the backup itinerary.
    backup_itinerary = backup[0]

    arrival_probability = calculate_arrival_probability(backup_itinerary, start_time, time_budget)
    product_of_probabilities = math.prod(calculate_cumulative_probability(backup_itinerary))

    # Defaults for the case that no matching transfer exists in the primary
    # itinerary: the transfer is never missed, so the backup contributes
    # nothing. Initialising both values avoids an UnboundLocalError below.
    initial_transfer_prob = 1
    primary_itinerary_rel_before_transfer = 0.0

    # Find the primary leg that arrives at the node where the backup departs.
    for idx, leg in enumerate(itinerary[:-1]):
        if leg[3] == backup_itinerary[0][1]:
            missed_leg = itinerary[idx + 1]  # The leg that may be missed after the transfer
            # Probability of making the primary transfer at this node.
            initial_transfer_prob = calculate_transfer_probability(leg, missed_leg)
            # Reliability of the primary itinerary up to and including the transfer leg.
            primary_itinerary_rel_before_transfer = primary_itinerary_reliability(
                itinerary[:idx + 1], start_time, time_budget
            )
            break

    backup_reliability = (
        arrival_probability
        * product_of_probabilities
        * (1 - initial_transfer_prob)
        * primary_itinerary_rel_before_transfer
    )

    return backup_reliability


def itinerary_reliability(itinerary: list, Backups: list[tuple], start_time: str, time_budget: timedelta) -> float:
    """Return the reliability of an itinerary together with its backups.

    The total is the primary itinerary reliability plus the reliability of
    every backup in ``Backups``. When the primary reliability is not positive,
    the backups are not evaluated and 0.0 is returned.
    """
    primary_reliability = primary_itinerary_reliability(itinerary, start_time, time_budget)

    # An unusable primary itinerary makes the whole plan unusable.
    if not (primary_reliability > 0):
        return 0.0

    # Each backup adds the probability mass of "transfer missed, backup works".
    backup_contribution = sum(
        backup_itinerary_reliability(itinerary, backup, start_time, time_budget)
        for backup in Backups
    )

    return primary_reliability + backup_contribution


Network Filtering

In [None]:
def get_available_service_ids(start_date:str) -> list:
    """Return the service ids that operate on ``start_date`` ("YYYY-MM-DD").

    Reads the module-level ``calendar_df`` and ``calendar_dates_df`` tables:

    1. Regular service: every ``calendar_df`` row whose ``start_date`` /
       ``end_date`` range contains the date and whose weekday column equals 1.
    2. Exceptions: ``calendar_dates_df`` rows for the date adjust that list.
       Following the GTFS definition of ``exception_type``, 1 adds the service
       for the date and 2 removes it.

    The result preserves discovery order and contains no duplicates.
    """
    start_date_datetime = datetime.strptime(start_date, "%Y-%m-%d")
    # The tables are compared against the date as a YYYYMMDD integer.
    date_key = int(start_date_datetime.strftime("%Y%m%d"))

    # Name of the calendar column that flags service on this weekday.
    weekday = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"][start_date_datetime.weekday()]

    available_service_ids = []

    # Regular service from the calendar table.
    for _, service in calendar_df.iterrows():
        service_id = service["service_id"]
        in_date_range = int(service["start_date"]) <= date_key <= int(service["end_date"])
        if in_date_range and service[weekday] == 1:
            if service_id not in available_service_ids:
                available_service_ids.append(service_id)

    # Date-specific exceptions from the calendar_dates table.
    exceptions = calendar_dates_df[calendar_dates_df["date"] == date_key]

    for _, exception in exceptions.iterrows():
        service_id = exception["service_id"]
        exception_type = exception["exception_type"]

        if exception_type == 1:  # GTFS: service added for this date
            if service_id not in available_service_ids:
                available_service_ids.append(service_id)

        elif exception_type == 2:  # GTFS: service removed for this date
            if service_id in available_service_ids:
                available_service_ids.remove(service_id)

    return available_service_ids


def filter_network(start_time:str, start_date:str, time_budget:timedelta) -> list:
    """Return the legs usable for a journey starting at ``start_time`` on ``start_date``.

    A leg from the module-level ``legs_df`` is kept when
     - its departure time (index 2) lies between ``start_time`` and
       ``start_time + time_budget`` (inclusive),
     - its arrival time (index 4) is not after ``start_time + time_budget``,
     - its service id (index 6) is available on ``start_date``.

    NOTE(review): ``legs_df`` is iterated directly and every item is indexed by
    position. Iterating a pandas DataFrame yields column labels, so this
    assumes ``legs_df`` is a sequence of leg records — confirm.
    """
    available_services = get_available_service_ids(start_date)

    time_format = "%H:%M:%S"
    window_open = datetime.strptime(start_time, time_format)
    window_close = window_open + time_budget

    filtered_network = []
    for row in legs_df:
        departs_at = datetime.strptime(row[2], time_format)
        arrives_at = datetime.strptime(row[4], time_format)

        # Skip legs that leave outside the window or arrive after it closes.
        if not (window_open <= departs_at <= window_close):
            continue
        if arrives_at > window_close:
            continue

        # Keep only legs whose service runs on the requested date.
        if row[6] in available_services:
            filtered_network.append(row)

    return filtered_network

Helper functions

In [None]:
def search_adjecent_legs(arrival_node:str, arrival_time:str, filtered_legs:list) -> list:
    """Return the legs that can be boarded at ``arrival_node`` from ``arrival_time`` on.

    A leg qualifies when its departure node (index 1) equals ``arrival_node``
    and its departure time (index 2) is not earlier than ``arrival_time``.
    Times are "HH:MM:SS" strings; the input order of the legs is preserved.
    """
    time_format = "%H:%M:%S"
    earliest_departure = datetime.strptime(arrival_time, time_format)

    # The departure time of every leg is parsed before the node is compared.
    return [
        candidate
        for candidate in filtered_legs
        if datetime.strptime(candidate[2], time_format) >= earliest_departure
        and candidate[1] == arrival_node
    ]

def is_transfer(itinerary:list) -> bool:
    """Return True when the last two legs of the itinerary are on different routes.

    The comparison uses index 5 of each leg (the route id); equal values mean
    the traveller stays on the same route and no transfer takes place.
    """
    prev_leg, next_leg = itinerary[-2], itinerary[-1]
    return not (prev_leg[5] == next_leg[5])
    

def travel_time(itinerary:list, start_time:str):
    """Return the elapsed time between ``start_time`` and the final arrival.

    The final arrival is the arrival time (index 4) of the last leg. Both
    times are "HH:MM:SS" strings and the result is a ``timedelta``, which is
    negative when the arrival time is earlier than ``start_time``.
    """
    final_arrival_raw = itinerary[-1][4]

    time_format = "%H:%M:%S"
    journey_start = datetime.strptime(start_time, time_format)
    final_arrival = datetime.strptime(final_arrival_raw, time_format)

    return final_arrival - journey_start


def find_min_index(LIST: list) -> int:
    """Return the index of the entry with the smallest duration.

    Every entry stores its duration at index 1. On ties the first entry with
    the smallest duration wins. ``LIST`` must not be empty.
    """
    best_position = 0
    best_duration = LIST[0][1]

    for position, entry in enumerate(LIST):
        candidate_duration = entry[1]
        # Strict comparison keeps the earliest entry when durations are equal.
        if candidate_duration < best_duration:
            best_position, best_duration = position, candidate_duration

    return best_position


def check_and_update_mrib(shortest_path:list, MRIB_reliability:float, MRIB:list, start_time:str, time_budget:timedelta):
    """Return the better of the current MRIB and ``shortest_path`` with its reliability.

    ``shortest_path[0]`` is the itinerary and ``shortest_path[4]`` its list of
    backups. The candidate replaces the current MRIB only when its reliability
    is strictly higher. Returns ``(reliability, path)``.
    """
    # Work on a shallow copy of the candidate's backup list.
    backups_snapshot = shortest_path[4][:]

    candidate_reliability = itinerary_reliability(
        shortest_path[0], backups_snapshot, start_time, time_budget
    )

    if candidate_reliability > MRIB_reliability:
        return candidate_reliability, shortest_path
    return MRIB_reliability, MRIB

Network Search Optimization

In [None]:
def update_visited_stops(visited_stops:dict, leg:tuple):
    """Record in ``visited_stops`` that ``leg`` reaches its arrival stop.

    The entry is keyed by the arrival node of the leg (index 3) and stores a
    tuple of the parsed arrival time (index 4, "HH:MM:SS") and the trip id
    (index 0). This is the key and value layout ``can_visit_leg`` reads and
    writes. An existing entry for the stop is overwritten. The dictionary is
    modified in place; nothing is returned.
    """
    # Index 3 is the arrival node; index 1 would be the departure node, which
    # does not belong together with the arrival time stored below.
    destination_stop = leg[3]

    arrival_time = datetime.strptime(leg[4], "%H:%M:%S")
    trip_id = leg[0]

    visited_stops[destination_stop] = (arrival_time, trip_id)



def can_visit_leg(leg:tuple, visited_stops:dict, shortest_path_trip_id:str, destination_node:str):
    """Decide whether ``leg`` may be used to extend the current path.

    ``visited_stops`` maps a stop to ``(arrival_time, trip_id)`` and may be
    updated in place by this call.

    Rules, in order:
     - A leg arriving at ``destination_node`` is always allowed (no bookkeeping).
     - A leg arriving at a stop not yet in ``visited_stops`` is allowed and recorded.
     - A leg on the same trip as ``shortest_path_trip_id`` is always allowed;
       the record is refreshed when it arrives no later than the recorded time.
     - Any other leg is allowed only when it arrives strictly earlier than the
       recorded time (the record is then replaced), or at exactly the recorded
       time on the recorded trip.

    The trip id (index 0) is compared rather than the route id; the original
    author notes this is due to many duplicated trips.
    """
    arrival_stop = leg[3]
    trip_id = leg[0]
    arrival_time = datetime.strptime(leg[4], "%H:%M:%S")

    # Reaching the destination is never pruned.
    if destination_node == arrival_stop:
        return True

    # First time this stop is reached: remember how and when.
    if arrival_stop not in visited_stops:
        visited_stops[arrival_stop] = (arrival_time, trip_id)
        return True

    visited_time, visited_trip_id = visited_stops[arrival_stop]

    # Continuing on the trip the path is already riding is always permitted.
    if trip_id == shortest_path_trip_id:
        if arrival_time <= visited_time:
            visited_stops[arrival_stop] = (arrival_time, trip_id)
        return True

    # A different trip must not arrive later than the recorded time.
    if arrival_time > visited_time:
        return False

    # Arriving at the same time is accepted only for the recorded trip itself.
    if arrival_time == visited_time:
        if trip_id == visited_trip_id:
            return True
        return False

    # Strictly earlier arrival: this leg becomes the new record.
    visited_stops[arrival_stop] = (arrival_time, trip_id)
    return True


def origin_node_filtering(origin_node: int, start_time: str, time_budget: timedelta, legs: list):
    """Keep only the legs that depart within the first 20% of the time budget.

    The cut-off is ``start_time + 0.20 * time_budget``; a leg is kept when its
    departure time (index 2, "HH:MM:SS") is not later than that cut-off. The
    input order is preserved. ``origin_node`` is accepted but not used.
    """
    time_format = "%H:%M:%S"
    latest_allowed_departure = datetime.strptime(start_time, time_format) + time_budget * 0.20

    return [
        leg
        for leg in legs
        if datetime.strptime(leg[2], time_format) <= latest_allowed_departure
    ]


Output transformation

In [None]:
def _group_legs_by_route(legs):
    """Merge consecutive legs that share a route id (index 5) into segments."""
    segments = []
    previous_route = None

    for leg in legs:
        route_id = leg[5]
        reached = (leg[3], leg[4])  # (arrival stop, arrival time)

        # `segments and` guarantees the first leg always opens a segment.
        if segments and route_id == previous_route:
            segments[-1]["stops"].append(reached)
        else:
            segments.append({
                "route_id": route_id,
                "start_stop": leg[1],
                "departure_time": leg[2],
                "stops": [reached]
            })
        previous_route = route_id

    return segments


def _print_segments(segments):
    """Print one line per segment: start stop, the stops reached, and the line."""
    for segment in segments:
        start = segment["start_stop"]
        dep_time = segment["departure_time"]
        route = segment["route_id"]
        stops = " → ".join([f"{stop} (Arrival: {arr})" for stop, arr in segment["stops"]])
        print(f"  🚆 {start} (Departure: {dep_time}) → {stops} with Line {route}")


def transform_route_info(MRIB,MRIB_reliability,Backups):
    """Print a human-readable summary of an itinerary and its backups.

    Input:
    MRIB (list): The legs of the primary itinerary, in travel order. An
        itinerary with a single leg is supported.
    MRIB_reliability (float): Reliability of the complete route, printed with
        two decimals.
    Backups (list): Backup entries. Each entry is read as
        ``backup[0][0]`` (the list of backup legs) and ``backup[1]`` (the
        reliability printed for that backup). May be empty.

    Output:
    None; all information is written to standard output.
    """
    primary_itinerary = MRIB
    reliability = MRIB_reliability

    # Step 1 + 2: group the primary legs by route and print them.
    _print_segments(_group_legs_by_route(primary_itinerary))

    # Step 3: final stop, its arrival time and the overall reliability.
    final_leg = primary_itinerary[-1]
    arrival_time = final_leg[4]
    print(f"\n🎯 End station: {final_leg[3]} (Ankunft: {arrival_time})")
    print(f"🔹 Total route reliability: {reliability:.2f}\n")

    # Step 4: the same presentation for every backup route.
    if Backups:
        print("🔄 Backups:")
        for backup in Backups:
            backup_path = backup[0][0]
            backup_reliability = backup[1]
            # The backup starts where its first leg departs.
            transfer_point = backup_path[0][1]

            print(f"\nBackup from {transfer_point} :")
            _print_segments(_group_legs_by_route(backup_path))

            print(f"🔹 Total reliability of backup routes: {backup_reliability:.2f}\n")
            


Primary Itinerary Search

In [None]:
def find_primary_path(origin_node: str, destination_node: str, start_time: str, time_budget: timedelta, filtered_legs:list):
    """
    Find primary paths from the origin node to the destination node within the specified time budget.

    The search always expands the candidate itinerary with the shortest
    duration so far. Once the first itinerary reaches the destination, the
    budget for all further candidates is tightened to 1.2 times its duration
    (never above ``time_budget``). The search stops when no candidates remain
    or when the 16th completed trip has been collected.

    Input:
    origin_node (str): The ID of the origin node where the path starts.
    destination_node (str): The ID of the destination node where the path ends.
    start_time (str): The starting time of the journey in "HH:MM:SS" format.
    time_budget (timedelta): The total time available for the journey.
    filtered_legs (list): A list of filtered legs that are available for the search.

    Output:
    LISTofCompletedTRIPS (list): Completed trips in the order they were found;
        each entry is ``[itinerary, duration]``.
    """

    # --- Initial setup ---
    time_format = "%H:%M:%S"
    LISTofTRIPS = []  # Candidate trips still to be expanded: [itinerary, duration]
    LISTofCompletedTRIPS = []  # Trips that reached the destination
    visited_stops = {}  # Stop -> (arrival_time, trip_id), shared starting point for every expansion
    new_time_budget = time_budget  # Tightened once the first trip is completed

    # Initialize passed stops with the origin node
    passed_stops = [origin_node]

    # --- Search initial adjacent legs ---
    adjecent_legs = search_adjecent_legs(origin_node, start_time, filtered_legs)
    # Keep only legs leaving within the first 20% of the time budget
    filtered_adj_legs = origin_node_filtering(origin_node, start_time, time_budget, adjecent_legs)

    # Additional filtering if the origin node is Wien Hauptbahnhof.
    # Two passes are necessary because the adjacent legs are not in
    # chronological order: the first pass records the best arrival per stop
    # in `visited_stops`, the second pass drops legs that were accepted before
    # a better one was seen. Both passes work on the already filtered list so
    # that the departure-time constraint above stays in effect.
    if origin_node == "Wien Hauptbahnhof":
        filtered_adj_legs = [
            leg for leg in filtered_adj_legs
            if can_visit_leg(leg, visited_stops, "", destination_node)
        ]

        filtered_adj_legs = [
            leg for leg in filtered_adj_legs
            if can_visit_leg(leg, visited_stops, "", destination_node)
        ]

    # Every remaining first leg seeds a one-leg candidate trip
    for leg in filtered_adj_legs:
        itinerary = [leg]
        duration = travel_time(itinerary, start_time)
        reliability = primary_itinerary_reliability(itinerary, start_time, time_budget)
        if reliability > 0 and timedelta(seconds=0) < duration <= time_budget:
            LISTofTRIPS.append([itinerary, duration])

    # --- Main loop ---
    while LISTofTRIPS:
        # Visited and passed stops are specific to the itinerary being expanded
        visited_stops_n = visited_stops.copy()
        passed_stops_n = passed_stops[:]

        # Take the candidate with the shortest duration
        min_index = find_min_index(LISTofTRIPS)
        shortest_path = LISTofTRIPS.pop(min_index)

        tail = shortest_path[0][-1]  # Last leg of the candidate

        # Check if the destination has been reached
        if tail[3] == destination_node:
            if len(LISTofCompletedTRIPS) == 0:
                # The first completed trip tightens the budget for all later candidates
                new_time_budget = min(1.2 * shortest_path[1], time_budget)

            LISTofCompletedTRIPS.append(shortest_path)
            if len(LISTofCompletedTRIPS) <= 15:
                continue  # Keep searching while at most 15 trips are completed
            else:
                break  # Stop once the 16th trip has been collected

        # Register every leg of the candidate as visited / passed
        for leg in shortest_path[0]:
            update_visited_stops(visited_stops_n, leg)
            destination_stop = leg[3]
            passed_stops_n.append(destination_stop)

        # Legs that can be boarded where the candidate currently ends
        next_legs = search_adjecent_legs(tail[3], tail[4], filtered_legs)
        # Never return to a stop the candidate has already passed
        next_legs = [leg for leg in next_legs if leg[3] not in passed_stops_n]

        # A transfer must leave between 2 and 30 minutes after arrival;
        # continuing on the same route is exempt from this window.
        tail_arrival = datetime.strptime(tail[4], time_format)
        earliest_transfer = tail_arrival + timedelta(minutes=2)
        latest_transfer = tail_arrival + timedelta(minutes=30)
        next_legs = [
            leg for leg in next_legs
            if leg[5] == tail[5]
            or earliest_transfer <= datetime.strptime(leg[2], time_format) <= latest_transfer
        ]

        # Two passes, because `can_visit_leg` updates `visited_stops_n` while
        # it filters and the legs are not in chronological order.
        next_legs = [
            leg for leg in next_legs
            if can_visit_leg(leg, visited_stops_n, tail[0], destination_node)
        ]
        next_legs = [
            leg for leg in next_legs
            if can_visit_leg(leg, visited_stops_n, tail[0], destination_node)
        ]

        # Extend the candidate by every admissible leg
        for leg in next_legs:
            itinerary = shortest_path[0] + [leg]
            duration = travel_time(itinerary, start_time)
            reliability = primary_itinerary_reliability(itinerary, start_time, new_time_budget)
            if reliability > 0 and timedelta(seconds=0) < duration <= new_time_budget:
                LISTofTRIPS.append([itinerary, duration])

    return LISTofCompletedTRIPS


Backup Itinerary Search

In [None]:
def backup_search(shortest_next_itinerary:list, destination_node:str, start_time:str, time_budget:timedelta, filtered_legs:list) -> tuple:
    """
    This function searches for backup paths to the destination node after a missed transfer in the primary itinerary.

    The last leg of shortest_next_itinerary is treated as the missed leg and the leg before it as the
    transfer leg; backups start at the transfer leg's arrival stop. Candidate backups are expanded
    shortest-duration-first, and the completed backup with the highest (rounded) reliability is kept.

    Input:
    shortest_next_itinerary (list): A list of legs representing the primary itinerary, including the transfer leg
        and, as its last element, the leg that is assumed to be missed. Must contain at least two legs.
    destination_node (str): The ID of the destination node where the journey ends.
    start_time (str): The starting time of the journey in "HH:MM:SS" format.
    time_budget (timedelta): The total time available for the backup journey.
    filtered_legs (list): A list of filtered available legs for the backup journey.

    Output:
    MRB (tuple or None): The most reliable backup path as (legs, duration), or None if no backup reaches
        the destination with a reliability above 0.
    MRB_reliability (float): The reliability of the most reliable backup path, rounded to 4 decimals (0 if none).
    """
    time_format = "%H:%M:%S"
    min_transfer = timedelta(minutes=2)  # Minimum time required to change between routes

    ''' Initial Setup '''
    MRB_reliability = 0  # Reliability of the most reliable backup path found so far
    MRB = None  # Most reliable backup path found so far
    LIST_Backups = []  # Open list of partial backup paths, each stored as (legs, duration)
    new_time_budget = time_budget  # Tightened once a first backup reaches the destination
    visited_stops_b = {}  # Visited-stop bookkeeping; filled from the primary legs only (see below)
    passed_stops_b = set()  # Stops already reached; only used for membership tests, so a set suffices

    transfer_leg = shortest_next_itinerary[-2]  # Leg arriving at the transfer point
    primary_itinerary = shortest_next_itinerary[:-1]  # Primary itinerary up to and including the transfer leg
    missed_leg_departure = datetime.strptime(shortest_next_itinerary[-1][2], time_format)  # Departure of the missed leg
    transfer_point = transfer_leg[3]  # Arrival stop of the transfer leg

    # Register the stops of the primary itinerary so the backup does not travel back over them.
    # Note: visited_stops_b is not updated with backup legs anywhere in this function.
    for leg in primary_itinerary:
        update_visited_stops(visited_stops_b, leg)
        passed_stops_b.add(leg[3])

    '''Search initial adjacent legs'''
    # NOTE(review): the search time passed here is the transfer leg's departure time (index 2), not its
    # arrival time (index 4). Kept as in the original; the missed-departure filter below is applied
    # afterwards in any case -- confirm this is intended.
    earliest_backup_departure = missed_leg_departure + min_transfer
    adjecent_legs = [
        leg for leg in search_adjecent_legs(transfer_point, transfer_leg[2], filtered_legs)
        if leg[3] not in passed_stops_b  # Do not return to a stop that was already passed
        and datetime.strptime(leg[2], time_format) >= earliest_backup_departure  # Departs >= 2 min after the missed leg
        and can_visit_leg(leg, visited_stops_b, "", destination_node)
    ]

    # Seed the open list with every single-leg backup that fits the time budget
    for leg in adjecent_legs:
        backup_legs = [leg]
        b_duration = travel_time(backup_legs, start_time)
        if timedelta(seconds=0) < b_duration <= time_budget:
            LIST_Backups.append((backup_legs, b_duration))

    '''Main loop'''
    while LIST_Backups:
        # Take the backup with the shortest duration (the first one on ties, as min() returns the first minimum)
        min_index_b = min(range(len(LIST_Backups)), key=lambda i: LIST_Backups[i][1])
        shortest_backup = LIST_Backups.pop(min_index_b)

        # Mark every arrival stop of the selected backup as passed
        passed_stops_b.update(leg[3] for leg in shortest_backup[0])

        b_tail = shortest_backup[0][-1]  # Last leg (tail) of the backup path

        if b_tail[3] == destination_node:
            if MRB is None:
                # No backup accepted yet: limit further search to 1.2x this duration (capped by the budget)
                new_time_budget = min(1.2 * shortest_backup[1], time_budget)

            # Reliability of the completed backup, evaluated against the original time budget
            rel = round(backup_itinerary_reliability(shortest_next_itinerary, shortest_backup, start_time, time_budget), 4)
            if rel > MRB_reliability:
                MRB_reliability = rel
                MRB = shortest_backup
            continue

        # Expand the tail: staying on the same route is always allowed, otherwise the
        # next leg must depart at least the minimum transfer time after the tail arrives.
        earliest_transfer_departure = datetime.strptime(b_tail[4], time_format) + min_transfer
        next_legs_b = [
            leg for leg in search_adjecent_legs(b_tail[3], b_tail[4], filtered_legs)
            if leg[3] not in passed_stops_b
            and (leg[5] == b_tail[5] or datetime.strptime(leg[2], time_format) >= earliest_transfer_departure)
            and can_visit_leg(leg, visited_stops_b, b_tail[0], destination_node)  # Applied once (was duplicated)
        ]

        for leg in next_legs_b:
            backup_legs = shortest_backup[0] + [leg]  # New list; the popped path is left untouched
            b_duration = travel_time(backup_legs, start_time)
            backup_full = (backup_legs, b_duration)
            b_reliability = backup_itinerary_reliability(shortest_next_itinerary, backup_full, start_time, new_time_budget)
            # Keep the extension only if it fits the budget and can still beat the best backup so far
            if timedelta(seconds=0) < b_duration <= new_time_budget and round(b_reliability, 4) > MRB_reliability:
                LIST_Backups.append(backup_full)

    # Return the most reliable backup path and its reliability
    return MRB, MRB_reliability


Main Algorithm

In [None]:
def find_path(origin_node:str, destination_node:str, start_datetime:str, time_budget:timedelta):
    """
    Find the most reliable itinerary (primary itinerary plus backups) from the origin node to the destination node.

    Every completed primary itinerary is walked leg by leg; at each transfer a backup itinerary is
    searched (or reused from an earlier identical transfer), and the primary itinerary whose combined
    reliability is highest is returned.

    Input:
    origin_node (str): The starting node of the journey.
    destination_node (str): The destination node of the journey.
    start_datetime (str): The starting datetime of the journey in "YYYY-MM-DD HH:MM:SS" format.
    time_budget (timedelta): The total time available for the journey.

    Output:
    MRIB_reliability (float): The reliability of the most reliable itinerary, rounded to 4 decimals (0 if none).
    MRIB (list): The most reliable primary itinerary, or None if none was found.
    MRIB_Backups (list): (backup, reliability) pairs for the transfers of that itinerary, or None if none was found.
    """
    best_reliability = 0   # Highest rounded reliability seen so far
    best_primary = None    # Primary itinerary achieving it
    best_backups = None    # Backups belonging to that primary itinerary
    backup_cache = {}      # transfer key -> (backup, reliability), shared across primary itineraries

    # The input is "<date> <time>"; split it on whitespace
    start_date, start_time = start_datetime.split()

    # Filter network
    filtered_legs = filter_network(start_time, start_date, time_budget)

    # Find primary routes
    completed_primary_trips = find_primary_path(origin_node, destination_node, start_time, time_budget, filtered_legs)

    # Backup loop: evaluate each completed primary trip together with its backups
    for trip in completed_primary_trips:
        primary_itinerary = trip[0]
        backups = []
        prefix = []  # Primary itinerary up to and including the current leg

        for leg in primary_itinerary:
            prefix.append(leg)

            # A transfer needs at least two legs and must be recognised by is_transfer
            if len(prefix) < 2 or not is_transfer(prefix):
                continue

            arriving_leg, departing_leg = prefix[-2], prefix[-1]
            transfer_key = (
                arriving_leg[3],   # Arrival stop of transfer leg
                arriving_leg[4],   # Arrival time of transfer leg
                departing_leg[1],  # Departure stop of next leg
                departing_leg[2],  # Departure time of next leg
            )

            # Reuse the backup found earlier for the same transfer, otherwise search and remember it
            cached = backup_cache.get(transfer_key)
            if cached is None:
                cached = backup_search(prefix, destination_node, start_time, time_budget, filtered_legs)
                backup_cache[transfer_key] = cached
            backup_path, backup_reliability = cached

            if backup_path:
                backups.append((backup_path, backup_reliability))

        # Reliability of this primary itinerary combined with its backups
        rounded_reliability = round(
            itinerary_reliability(primary_itinerary, backups, start_time, time_budget), 4
        )

        # Keep it only if it is strictly better than the best found so far
        if rounded_reliability > best_reliability:
            best_reliability = rounded_reliability
            best_primary = primary_itinerary
            best_backups = backups

    return best_reliability, best_primary, best_backups


In [None]:
# Example query: origin and destination stops, start datetime and travel time budget
origin_node = "Wien Leopoldau"
destination_node = "Laa/Thaya Bahnhof"
start_time = "2024-12-12 9:00:00"  # "<date> <time>"; find_path splits this on whitespace
time_budget = timedelta(minutes=90)  # 1 hour 30 minutes

# Run the search; the result is (reliability, primary itinerary, backups)
MRIB_reliability, MRIB, Backups = find_path(
    origin_node=origin_node,
    destination_node=destination_node,
    start_datetime=start_time,
    time_budget=time_budget,
)

In [None]:
# Example of results display: pass the most reliable primary itinerary, its reliability
# and its backup itineraries (as returned by find_path) to transform_route_info.
# NOTE(review): transform_route_info is not defined in this section -- presumably it
# formats the result for display; confirm against its definition.
transform_route_info(MRIB,MRIB_reliability,Backups)