In [1]:
# Boilerplate for AI Assignment — Knowledge Representation, Reasoning and Planning
# CSE 643

# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import networkx as nx
from pyDatalog import pyDatalog
from collections import defaultdict, deque

## ****IMPORTANT****
## Don't import or use any other libraries other than defined above
## Otherwise your code file will be rejected in the automated testing

# ------------------ Global Variables ------------------
route_to_stops = defaultdict(list)  # Mapping of route IDs to lists of stops
trip_to_route = {}                   # Mapping of trip IDs to route IDs
stop_trip_count = defaultdict(int)    # Count of trips for each stop
fare_rules = {}                      # Mapping of route IDs to fare information
merged_fare_df = None                # To be initialized in create_kb()

# Load static data from GTFS (General Transit Feed Specification) files
df_stops = pd.read_csv('GTFS/stops.txt')
df_routes = pd.read_csv('GTFS/routes.txt')
df_stop_times = pd.read_csv('GTFS/stop_times.txt')
df_fare_attributes = pd.read_csv('GTFS/fare_attributes.txt')
df_trips = pd.read_csv('GTFS/trips.txt')
df_fare_rules = pd.read_csv('GTFS/fare_rules.txt')

# ------------------ Function Definitions ------------------

# Function to create knowledge base from the loaded data
def create_kb():
    """
    Create knowledge base by populating global variables with information from loaded datasets.
    It establishes the relationships between routes, trips, stops, and fare rules.
    
    Returns:
        None
    """
    global route_to_stops, trip_to_route, stop_trip_count, fare_rules, merged_fare_df

    # Create trip_id to route_id mapping"""
    trip_to_route = defaultdict(list)
    for _, row in df_trips.iterrows():
        trip_to_route[row['trip_id']].append(row['route_id'])
        
    # Map route_id to a list of stops in order of their sequence"""
    route_to_stops = defaultdict(list)
    sorted_stop_times = df_stop_times.sort_values(['trip_id', 'stop_sequence'])
    sorted_stop_times.head
    for trip_id, stop_grp in sorted_stop_times.groupby('trip_id'):
        if trip_id in trip_to_route:
            route_id = trip_to_route[trip_id][0]
            stops = stop_grp['stop_id'].to_list()
            route_to_stops[route_id].extend(stops)
            
    # Ensure each route only has unique stops"""
    route_to_stops = {
        route: [x for i, x in enumerate(stops) if x not in stops[:i]]
        for route, stops in route_to_stops.items()
    }
    
    # Count trips per stop"""
    stop_trip_count = dict(df_stop_times['stop_id'].value_counts())

    # Create fare rules for routes
    fare_rules = {}
    for i in range(len(df_fare_rules['route_id'])):
        route_id = df_fare_rules['route_id'][i]
        fare_id = df_fare_rules['fare_id'][i]

        if route_id not in fare_rules:
            fare_rules[route_id] = []

        fare_rules[route_id].append(fare_id)

    # Merge fare rules and attributes into a single DataFrame
    merged_fare_df = pd.merge(
        df_fare_rules,
        df_fare_attributes,
        on='fare_id',
        how='left'
    )

# Function to find the top 5 busiest routes based on the number of trips
def get_busiest_routes():
    """
    Identify the top 5 busiest routes based on trip counts.

    Returns:
        list: A list of tuples, where each tuple contains:
              - route_id (int): The ID of the route.
              - trip_count (int): The number of trips for that route.
    """
    route_trip_counts = defaultdict(int)
    for trip_id, routes in trip_to_route.items():
        # Since we stored routes as a list in trip_to_route, take the first route
        route_id = routes[0]
        route_trip_counts[route_id] += 1
    
    res = sorted(route_trip_counts.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 stops with the most frequent trips
def get_most_frequent_stops():
    """
    Identify the top 5 stops with the highest number of trips.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - trip_count (int): The number of trips for that stop.
    """
    res = sorted(stop_trip_count.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 busiest stops based on the number of routes passing through them
def get_top_5_busiest_stops():
    """
    Identify the top 5 stops with the highest number of different routes.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - route_count (int): The number of routes passing through that stop.
    """
    stop_routes = defaultdict(set)
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            stop_routes[stop_id].add(route_id)
            
    stop_counts = []
    for stop_id, routes in stop_routes.items():
        route_count = len(routes)
        stop_counts.append((stop_id, route_count))

    res = sorted(stop_counts, key = lambda ele: ele[1], reverse = True)
    
    return res[:5]
    pass  # Implementation here

# Function to identify the top 5 pairs of stops with only one direct route between them
def get_stops_with_one_direct_route():
    """
    Identify the top 5 pairs of consecutive stops (start and end) connected by exactly one direct route. 
    The pairs are sorted by the combined frequency of trips passing through both stops.

    Returns:
        list: A list of tuples, where each tuple contains:
              - pair (tuple): A tuple with two stop IDs (stop_1, stop_2).
              - route_id (int): The ID of the route connecting the two stops.
    """
    connections = {}
    
    for route_id, stops in route_to_stops.items():
        for i in range(len(stops) - 1):
            current_stop = stops[i]
            next_stop = stops[i + 1]
            
            # same order for consistency
            stop_pair = (current_stop, next_stop) if current_stop < next_stop else (next_stop, current_stop)

            
            # Add the route to this pair's list of routes
            if stop_pair not in connections:
                connections[stop_pair] = []
            connections[stop_pair].append(route_id)
    
    result = []
    for stop_pair, routes in connections.items():
        if len(routes) == 1:
            result.append((stop_pair, routes[0]))
    
    return result
    pass  # Implementation here

# Function to get merged fare DataFrame
# No need to change this function
def get_merged_fare_df():
    """
    Retrieve the merged fare DataFrame.

    Returns:
        DataFrame: The merged fare DataFrame containing fare rules and attributes.
    """
    global merged_fare_df
    return merged_fare_df

# Visualize the stop-route graph interactively
def visualize_stop_route_graph_interactive(route_to_stops):
    """
    Visualize the stop-route graph using Plotly for interactive exploration.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    # Step 1: Create a network graph
    grph = nx.Graph()
    
    # Step 2: Add edges (connections between stops) from each route
    edge_colors = []
    edge_labels = []
    
    unique_routes = list(route_to_stops.keys())
    color_palette = plt.cm.get_cmap('hsv')(np.linspace(0, 1, len(unique_routes)))
    
    rt_color_map = {}
    for i in range (len(unique_routes)):
        route_id = unique_routes[i]
        r, g, b, _ = color_palette[i]
        
        color_string = f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
        rt_color_map[route_id] = color_string
    
    for route_id, stops_on_route in route_to_stops.items():
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        for i in range(len(stops_on_route) - 1):
            current_stop = stops_on_route[i]
            next_stop = stops_on_route[i + 1]
            
            grph.add_edge(current_stop, next_stop)
            
            edge_colors.append(rt_color_map[route_id])
            edge_labels.append(f"Route: {route_name}")
        
    # Step 3: Calculate state_positions for the graph
    stop_positions = nx.shell_layout(grph)
    
    # Step 4: Create edge trace
    edge_trace = []
    for i in range (len(grph.edges())):
        edge = list(grph.edges())[i]
        color = edge_colors[i]
        label = edge_labels[i]
        
        x0, y0 = stop_positions[edge[0]]
        x1, y1 = stop_positions[edge[1]]
        
        
        edge_trace.append(
            go.Scatter(
                x=[x0, x1, None],
                y=[y0, y1, None],
                line=dict(width=2, color=color),
                hoverinfo='text',
                text=label,
                mode='lines'
            )
        )
        
    # Step 5: Create node trace
    node_x = []
    node_y = []
    node_text = []
    
    for node in grph.nodes():
        x, y = stop_positions[node]
        node_x.append(x)
        node_y.append(y)
        # Get stop name for hover text
        stop_name = df_stops[df_stops['stop_id'] == node]['stop_name'].iloc[0]
        node_text.append(f"Stop: {stop_name}<br>ID: {node}")
    
    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode='markers+text',
        hoverinfo='text',
        text=node_text,
        marker=dict(
            size=10,
            color='lightblue',
            line=dict(width=2, color='darkblue')
        )
    )
    
    # Step 6: Create the figure
    fig = go.Figure(data=edge_trace + [node_trace],
                   layout=go.Layout(
                       title='Transit Network Map',
                       showlegend=False,
                       hovermode='closest',
                       margin=dict(b=0, l=0, r=0, t=40),
                       xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       plot_bgcolor='white'
                   ))
    
    # Add a legend showing route colors
    for route_id in unique_routes:
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        fig.add_trace(go.Scatter(
            x=[None],
            y=[None],
            mode='lines',
            name=route_name,
            line=dict(color=rt_color_map[route_id], width=2),
            showlegend=True
        ))
    
    # Step 7: Show the plot
    fig.show()
    pass  # Implementation here

# Brute-Force Approach for finding direct routes
def direct_route_brute_force(start_stop, end_stop):
    """
    Find all valid routes between two stops using a brute-force method.

    Args:
        start_stop (int): The ID of the starting stop.
        end_stop (int): The ID of the ending stop.

    Returns:
        list: A list of route IDs (int) that connect the two stops directly.
    """
    
    dir_routes = []
    for route_id, stops in route_to_stops.items():
        if start_stop in stops and end_stop in stops:
            # Check if start_stop comes before end_stop in the sequence
            if stops.index(start_stop) < stops.index(end_stop):
                dir_routes.append(route_id)
    return dir_routes
    
    
    
    # dir_routes = []
    # for route_id, stops in route_to_stops.items():
    #     start_indexes = [i for i, stop in enumerate(stops) if stop == start_stop]
    #     end_indexes = [i for i, stop in enumerate(stops) if stop == end_stop]
    #     for start_idx in start_indexes:
    #         for end_idx in end_indexes:
    #             if end_idx > start_idx:
    #                 dir_routes.append(route_id)
    #                 break
    #         if route_id in dir_routes:
    #             break
            
    # return sorted(dir_routes)
    
    pass  # Implementation here

# Initialize Datalog predicates for reasoning
pyDatalog.create_terms('RouteHasStop, DirectRoute, OptimalRoute, X, Y, Z, R, R1, R2')  
def initialize_datalog():
    """
    Initialize Datalog terms and predicates for reasoning about routes and stops.

    Returns:
        None
    """
    pyDatalog.clear()  # Clear previous terms
    print("Terms initialized: DirectRoute, RouteHasStop, OptimalRoute")  # Confirmation print

    # Define Datalog predicates

    create_kb()  # Populate the knowledge base
    add_route_data(route_to_stops)  # Add route data to Datalog
    
    
# Adding route data to Datalog
def add_route_data(route_to_stops):
    """
    Add the route data to Datalog for reasoning.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            +RouteHasStop(route_id, stop_id)

# Function to query direct routes between two stops
def query_direct_routes(start, end):
    """
    Query for direct routes between two stops.

    Args:
        start (int): The ID of the starting stop.
        end (int): The ID of the ending stop.

    Returns:
        list: A sorted list of route IDs (str) connecting the two stops.
    """
    
    # Query for routes that contain both start and end stops
    query_result = pyDatalog.ask(f'RouteHasStop(R, {start}) & RouteHasStop(R, {end})')
    
    # Process the results, assuming each answer contains a single route_id
    if query_result:
        return sorted(set(route_id[0] for route_id in query_result.answers))
    return []
    
    pass  # Implementation here

# Forward chaining for optimal route planning
def forward_chaining(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Perform forward chaining to find optimal routes considering transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID where a transfer occurs.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    
    
    
    pass  # Implementation here

# Backward chaining for optimal route planning
def backward_chaining(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Perform backward chaining to find optimal routes considering transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID where a transfer occurs.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    pass  # Implementation here

# PDDL-style planning for route finding
def pddl_planning(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Implement PDDL-style planning to find routes with optional transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID for a transfer.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    pass  # Implementation here

# Function to filter fare data based on an initial fare limit
def prune_data(merged_fare_df, initial_fare):
    """
    Filter fare data based on an initial fare limit.

    Args:
        merged_fare_df (DataFrame): The merged fare DataFrame.
        initial_fare (float): The maximum fare allowed.

    Returns:
        DataFrame: A filtered DataFrame containing only routes within the fare limit.
    """
    pass  # Implementation here

# Pre-computation of Route Summary
def compute_route_summary(pruned_df):
    """
    Generate a summary of routes based on fare information.

    Args:
        pruned_df (DataFrame): The filtered DataFrame containing fare information.

    Returns:
        dict: A summary of routes with the following structure:
              {
                  route_id (int): {
                      'min_price': float,          # The minimum fare for the route
                      'stops': set                # A set of stop IDs for that route
                  }
              }
    """
    pass  # Implementation here

# BFS for optimized route planning
def bfs_route_planner_optimized(start_stop_id, end_stop_id, initial_fare, route_summary, max_transfers=3):
    """
    Use Breadth-First Search (BFS) to find the optimal route while considering fare constraints.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        initial_fare (float): The available fare for the trip.
        route_summary (dict): A summary of routes with fare and stop information.
        max_transfers (int): The maximum number of transfers allowed (default is 3).

    Returns:
        list: A list representing the optimal route with stops and routes taken, structured as:
              [
                  (route_id (int), stop_id (int)),  # Tuple for each stop taken in the route
                  ...
              ]
    """
    pass  # Implementation here


In [2]:

# Sample public test inputs with expected outputs explicitly defined
test_inputs = {
    "direct_route": [
        ((2573, 1177), [10001, 1117, 1407]),  # Input -> Expected output
        ((2001, 2005), [10001, 1151])
    ],

    "forward_chaining": [
        ((22540, 2573, 4686, 1), [(10153, 4686, 1407)]),
        ((951, 340, 300, 1), [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), 
                              (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), 
                              (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)])
    ],
    "backward_chaining": [
        ((2573, 22540, 4686, 1), [(1407, 4686, 10153)]),
        ((340, 951, 300, 1), [(712, 300, 121), (712, 300, 1211), (712, 300, 37), (712, 300, 387),
                              (712, 300, 49), (712, 300, 10453), (712, 300, 1038), (712, 300, 10433),
                              (712, 300, 1571)])
    ],
    "pddl_planning": [
        ((22540, 2573, 4686, 1), [(10153, 4686, 1407)]),
        ((951, 340, 300, 1), [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), 
                              (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), 
                              (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)])
    ],
    "bfs_route": [
        ((22540, 2573, 10, 3), [(10153, 4686), (1407, 2573)]),
        ((4012, 4013, 10, 3), [(10004, 4013)])
    ],

    ### NOTE: The below values are just dummy values, the actual values are might differ! 
    "busiest_routes": [
        [(123, 456), (789, 234), (567, 235), (3456, 897), (345, 345)]
    ],
    "most_frequent_stops": [
        [(456, 456), (234, 765), (234, 765), (234, 657765), (3252, 35634)]
    ],
    "busiest_stops": [
        [(432243, 14543), (454235, 2452), (2452, 2454), (78568, 24352), (42352, 24532)]
    ],
    "stops_with_one_direct_route": [
        [((24527, 676), 542), ((243535, 8768), 2456), ((43262, 564), 65437),
         ((256, 56), 245), ((266, 256), 78)]
    ]
}

def check_output(expected, actual):
    """Function to compare expected and actual outputs."""
    return set(expected) == set(actual)

def test_direct_route_brute_force():
    for (start_stop, end_stop), expected_output in test_inputs["direct_route"]:
        actual_output = direct_route_brute_force(start_stop, end_stop)
        print(f"Test direct_route_brute_force ({start_stop}, {end_stop}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_query_direct_routes():
    for (start_stop, end_stop), expected_output in test_inputs["direct_route"]:
        actual_output = query_direct_routes(start_stop, end_stop)
        print(f"Test query_direct_routes ({start_stop}, {end_stop}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_forward_chaining():
    for (start_stop, end_stop, via_stop, max_transfers), expected_output in test_inputs["forward_chaining"]:
        actual_output = forward_chaining(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test forward_chaining ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_backward_chaining():
    for (end_stop, start_stop, via_stop, max_transfers), expected_output in test_inputs["backward_chaining"]:
        actual_output = backward_chaining(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test backward_chaining ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_pddl_planning():
    for (start_stop, end_stop, via_stop, max_transfers), expected_output in test_inputs["pddl_planning"]:
        actual_output = pddl_planning(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test pddl_planning ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_bfs_route_planner():
    for (start_stop, end_stop, initial_fare, max_transfers), expected_output in test_inputs["bfs_route"]:
        pruned_df = prune_data(merged_fare_df, initial_fare)
        route_summary = compute_route_summary(pruned_df)
        actual_output = bfs_route_planner_optimized(start_stop, end_stop, initial_fare, route_summary, max_transfers)
        print(f"Test bfs_route_planner_optimized ({start_stop}, {end_stop}, {initial_fare}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

# New test functions for the additional queries

def test_get_busiest_routes():
    expected_output = test_inputs["busiest_routes"][0]
    actual_output = get_busiest_routes()
    print(f"Test get_busiest_routes: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_most_frequent_stops():
    expected_output = test_inputs["most_frequent_stops"][0]
    actual_output = get_most_frequent_stops()
    print(f"Test get_most_frequent_stops: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_top_5_busiest_stops():
    expected_output = test_inputs["busiest_stops"][0]
    actual_output = get_top_5_busiest_stops()
    print(f"Test get_top_5_busiest_stops: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_stops_with_one_direct_route():
    expected_output = test_inputs["stops_with_one_direct_route"][0]
    actual_output = get_stops_with_one_direct_route()
    print(f"Test get_stops_with_one_direct_route: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

# if __name__ == "__main__":
#     create_kb()  # Ensure the data is loaded before testing
#     merged_fare_df = get_merged_fare_df()  # Use the function to retrieve the DataFrame
#     initialize_datalog()
    
#     # Run all tests
#     test_direct_route_brute_force()
#     test_query_direct_routes()
#     test_forward_chaining()
#     test_backward_chaining()
#     test_pddl_planning()
#     test_bfs_route_planner()
    
#     # Run additional tests for the new queries
#     test_get_busiest_routes()
#     test_get_most_frequent_stops()
#     test_get_top_5_busiest_stops()
#     test_get_stops_with_one_direct_route()

In [3]:
# Import necessary functions from code_changeRollNumber.py

from collections import defaultdict

route_to_stops = defaultdict(list)  # Maps route_id to an ordered list of stop_ids
trip_to_route = {}  # Maps trip_id to route_id
stop_trip_count = defaultdict(int)  # Maps stop_id to count of trips stopping there
fare_rules = {}  # Maps route_id to fare information

# from code_changeRollNumber import (
#     direct_route_brute_force,
#     query_direct_routes,
#     forward_chaining,
#     backward_chaining,
#     pddl_planning,
#     bfs_route_planner_optimized,
#     create_kb,  # Ensure the data is loaded for testing
#     prune_data,
#     initialize_datalog,
#     get_merged_fare_df,
#     compute_route_summary,
#     get_busiest_routes,  # New functions for testing
#     get_most_frequent_stops,
#     get_top_5_busiest_stops,
#     get_stops_with_one_direct_route
# )


In [4]:
create_kb()  # Ensure the data is loaded before testing
merged_fare_df = get_merged_fare_df()  # Use the function to retrieve the DataFrame
initialize_datalog()
    
#     # Run all tests
#     test_direct_route_brute_force()
#     test_query_direct_routes()
#     test_forward_chaining()
#     test_backward_chaining()
#     test_pddl_planning()
#     test_bfs_route_planner()
    
#     # Run additional tests for the new queries
#     test_get_busiest_routes()
#     test_get_most_frequent_stops()
#     test_get_top_5_busiest_stops()
#     test_get_stops_with_one_direct_route()

Terms initialized: DirectRoute, RouteHasStop, OptimalRoute


In [5]:
test_direct_route_brute_force()
test_query_direct_routes()

Test direct_route_brute_force (2573, 1177):  Pass
Test direct_route_brute_force (2001, 2005):  Pass
Test query_direct_routes (2573, 1177):  Pass
Test query_direct_routes (2001, 2005):  Pass


In [28]:
def forward_chaining(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Find optimal routes using Forward Chaining.

    Args:
        start_stop_id (int): Starting stop ID.
        end_stop_id (int): Ending stop ID.
        stop_id_to_include (int): Intermediate stop ID to include.
        max_transfers (int): Maximum number of transfers allowed.

    Returns:
        list: Optimal routes satisfying constraints.
    """
    # Initialize Datalog predicates
    initialize_datalog()

    # Define rules for optimal route planning
    pyDatalog.create_terms('RouteHasStop, DirectRoute, OptimalRoute, X, Y, Z, R, R1, R2')
    @pyDatalog.program()
    def optimal_route_program():
        +OptimalRoute(X, Y, Z) <= RouteHasStop(R, X) & RouteHasStop(R, Y) & RouteHasStop(R, Z)
        +OptimalRoute(X, Y, Z) <= RouteHasStop(R1, X) & RouteHasStop(R2, Y) & RouteHasStop(R2, Z) & DirectRoute(R1, R2)

    optimal_route_program()

    # Query for optimal routes
    query = f'OptimalRoute({start_stop_id}, {stop_id_to_include}, {end_stop_id})'
    results = pyDatalog.ask(query)

    # Process results
    optimal_routes = []
    for result in results.answers:
        route_id = result[0]
        optimal_routes.append(route_id)

    # Filter results based on max_transfers
    filtered_routes = []
    for route_id in optimal_routes:
        transfers = 0
        stops = route_to_stops[route_id]
        start_index = stops.index(start_stop_id)
        end_index = stops.index(end_stop_id)
        via_index = stops.index(stop_id_to_include)
        transfers += abs(start_index - via_index)
        transfers += abs(via_index - end_index)
        if transfers <= max_transfers:
            filtered_routes.append(route_id)

    return filtered_routes


In [29]:
test_forward_chaining()
# test_backward_chaining()

Terms initialized: DirectRoute, RouteHasStop, OptimalRoute


DatalogError: Cannot assert a fact containing Variables
        +OptimalRoute(X, Y, Z) <= RouteHasStop(R, X) & RouteHasStop(R, Y) & RouteHasStop(R, Z)
in line 1 of optimal_route_program

In [None]:
# Boilerplate for AI Assignment — Knowledge Representation, Reasoning and Planning
# CSE 643

# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import networkx as nx
from pyDatalog import pyDatalog
from collections import defaultdict, deque

## ****IMPORTANT****
## Don't import or use any other libraries other than defined above
## Otherwise your code file will be rejected in the automated testing

# ------------------ Global Variables ------------------
route_to_stops = defaultdict(list)  # Mapping of route IDs to lists of stops
trip_to_route = {}                   # Mapping of trip IDs to route IDs
stop_trip_count = defaultdict(int)    # Count of trips for each stop
fare_rules = {}                      # Mapping of route IDs to fare information
merged_fare_df = None                # To be initialized in create_kb()

# Load static data from GTFS (General Transit Feed Specification) files
df_stops = pd.read_csv('GTFS/stops.txt')
df_routes = pd.read_csv('GTFS/routes.txt')
df_stop_times = pd.read_csv('GTFS/stop_times.txt')
df_fare_attributes = pd.read_csv('GTFS/fare_attributes.txt')
df_trips = pd.read_csv('GTFS/trips.txt')
df_fare_rules = pd.read_csv('GTFS/fare_rules.txt')

# ------------------ Function Definitions ------------------

# Function to create knowledge base from the loaded data
def create_kb():
    """
    Create knowledge base by populating global variables with information from loaded datasets.
    It establishes the relationships between routes, trips, stops, and fare rules.
    
    Returns:
        None
    """
    global route_to_stops, trip_to_route, stop_trip_count, fare_rules, merged_fare_df

    # Create trip_id to route_id mapping"""
    trip_to_route = defaultdict(list)
    for _, row in df_trips.iterrows():
        trip_to_route[row['trip_id']].append(row['route_id'])
        
    # Map route_id to a list of stops in order of their sequence"""
    route_to_stops = defaultdict(list)
    sorted_stop_times = df_stop_times.sort_values(['trip_id', 'stop_sequence'])
    sorted_stop_times.head
    for trip_id, stop_grp in sorted_stop_times.groupby('trip_id'):
        if trip_id in trip_to_route:
            route_id = trip_to_route[trip_id][0]
            stops = stop_grp['stop_id'].to_list()
            route_to_stops[route_id].extend(stops)
            
    # Ensure each route only has unique stops"""
    for route_id in route_to_stops:
        # Use dict.fromkeys() to preserve order while removing duplicates
        route_to_stops[route_id] = list(dict.fromkeys(route_to_stops[route_id]))
    
    # Count trips per stop"""
    stop_trip_count = dict(df_stop_times['stop_id'].value_counts())

    # Create fare rules for routes
    fare_rules = {}
    for i in range(len(df_fare_rules['route_id'])):
        route_id = df_fare_rules['route_id'][i]
        fare_id = df_fare_rules['fare_id'][i]

        if route_id not in fare_rules:
            fare_rules[route_id] = []

        fare_rules[route_id].append(fare_id)

    # Merge fare rules and attributes into a single DataFrame
    merged_fare_df = pd.merge(
        df_fare_rules,
        df_fare_attributes,
        on='fare_id',
        how='left'
    )

# Function to find the top 5 busiest routes based on the number of trips
def get_busiest_routes():
    """
    Identify the top 5 busiest routes based on trip counts.

    Returns:
        list: A list of tuples, where each tuple contains:
              - route_id (int): The ID of the route.
              - trip_count (int): The number of trips for that route.
    """
    route_trip_counts = defaultdict(int)
    for trip_id, routes in trip_to_route.items():
        # Since we stored routes as a list in trip_to_route, take the first route
        route_id = routes[0]
        route_trip_counts[route_id] += 1
    
    res = sorted(route_trip_counts.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 stops with the most frequent trips
def get_most_frequent_stops():
    """
    Identify the top 5 stops with the highest number of trips.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - trip_count (int): The number of trips for that stop.
    """
    res = sorted(stop_trip_count.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 busiest stops based on the number of routes passing through them
def get_top_5_busiest_stops():
    """
    Identify the top 5 stops with the highest number of different routes.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - route_count (int): The number of routes passing through that stop.
    """
    stop_routes = defaultdict(set)
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            stop_routes[stop_id].add(route_id)
            
    stop_counts = []
    for stop_id, routes in stop_routes.items():
        route_count = len(routes)
        stop_counts.append((stop_id, route_count))

    res = sorted(stop_counts, key = lambda ele: ele[1], reverse = True)
    
    return res[:5]
    pass  # Implementation here

# Function to identify the top 5 pairs of stops with only one direct route between them
def get_stops_with_one_direct_route():
    """
    Identify the top 5 pairs of consecutive stops (start and end) connected by exactly one direct route. 
    The pairs are sorted by the combined frequency of trips passing through both stops.

    Returns:
        list: A list of tuples, where each tuple contains:
              - pair (tuple): A tuple with two stop IDs (stop_1, stop_2).
              - route_id (int): The ID of the route connecting the two stops.
    """
    connections = {}
    
    for route_id, stops in route_to_stops.items():
        for i in range(len(stops) - 1):
            current_stop = stops[i]
            next_stop = stops[i + 1]
            
            # same order for consistency
            stop_pair = (current_stop, next_stop) if current_stop < next_stop else (next_stop, current_stop)

            
            # Add the route to this pair's list of routes
            if stop_pair not in connections:
                connections[stop_pair] = []
            connections[stop_pair].append(route_id)
    
    result = []
    for stop_pair, routes in connections.items():
        if len(routes) == 1:
            result.append((stop_pair, routes[0]))
    
    return result
    pass  # Implementation here

# Function to get merged fare DataFrame
# No need to change this function
def get_merged_fare_df():
    """
    Retrieve the merged fare DataFrame.

    Returns:
        DataFrame: The merged fare DataFrame containing fare rules and attributes.
    """
    global merged_fare_df
    return merged_fare_df

# Visualize the stop-route graph interactively
def visualize_stop_route_graph_interactive(route_to_stops):
    """
    Visualize the stop-route graph using Plotly for interactive exploration.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    # Step 1: Create a network graph
    grph = nx.Graph()
    
    # Step 2: Add edges (connections between stops) from each route
    edge_colors = []
    edge_labels = []
    
    unique_routes = list(route_to_stops.keys())
    color_palette = plt.cm.get_cmap('hsv')(np.linspace(0, 1, len(unique_routes)))
    
    rt_color_map = {}
    for i in range (len(unique_routes)):
        route_id = unique_routes[i]
        r, g, b, _ = color_palette[i]
        
        color_string = f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
        rt_color_map[route_id] = color_string
    
    for route_id, stops_on_route in route_to_stops.items():
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        for i in range(len(stops_on_route) - 1):
            current_stop = stops_on_route[i]
            next_stop = stops_on_route[i + 1]
            
            grph.add_edge(current_stop, next_stop)
            
            edge_colors.append(rt_color_map[route_id])
            edge_labels.append(f"Route: {route_name}")
        
    # Step 3: Calculate state_positions for the graph
    stop_positions = nx.shell_layout(grph)
    
    # Step 4: Create edge trace
    edge_trace = []
    for i in range (len(grph.edges())):
        edge = list(grph.edges())[i]
        color = edge_colors[i]
        label = edge_labels[i]
        
        x0, y0 = stop_positions[edge[0]]
        x1, y1 = stop_positions[edge[1]]
        
        
        edge_trace.append(
            go.Scatter(
                x=[x0, x1, None],
                y=[y0, y1, None],
                line=dict(width=2, color=color),
                hoverinfo='text',
                text=label,
                mode='lines'
            )
        )
        
    # Step 5: Create node trace
    node_x = []
    node_y = []
    node_text = []
    
    for node in grph.nodes():
        x, y = stop_positions[node]
        node_x.append(x)
        node_y.append(y)
        # Get stop name for hover text
        stop_name = df_stops[df_stops['stop_id'] == node]['stop_name'].iloc[0]
        node_text.append(f"Stop: {stop_name}<br>ID: {node}")
    
    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode='markers+text',
        hoverinfo='text',
        text=node_text,
        marker=dict(
            size=10,
            color='lightblue',
            line=dict(width=2, color='darkblue')
        )
    )
    
    # Step 6: Create the figure
    fig = go.Figure(data=edge_trace + [node_trace],
                   layout=go.Layout(
                       title='Transit Network Map',
                       showlegend=False,
                       hovermode='closest',
                       margin=dict(b=0, l=0, r=0, t=40),
                       xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       plot_bgcolor='white'
                   ))
    
    # Add a legend showing route colors
    for route_id in unique_routes:
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        fig.add_trace(go.Scatter(
            x=[None],
            y=[None],
            mode='lines',
            name=route_name,
            line=dict(color=rt_color_map[route_id], width=2),
            showlegend=True
        ))
    
    # Step 7: Show the plot
    fig.show()
    pass  # Implementation here

# Brute-Force Approach for finding direct routes
def direct_route_brute_force(start_stop, end_stop):
    """
    Find all valid routes between two stops using a brute-force method.

    Args:
        start_stop (int): The ID of the starting stop.
        end_stop (int): The ID of the ending stop.

    Returns:
        list: A list of route IDs (int) that connect the two stops directly.
    """
    
    dir_routes = []
    for route_id, stops in route_to_stops.items():
        if start_stop in stops and end_stop in stops:
            # Check if start_stop comes before end_stop in the sequence
            if stops.index(start_stop) < stops.index(end_stop):
                dir_routes.append(route_id)
    return dir_routes
    
    pass  # Implementation here

# Initialize Datalog predicates for reasoning
pyDatalog.create_terms('RouteHasStop, DirectRoute, OptimalRoute, X, Y, Z, R, R1, R2')  
def initialize_datalog():
    """
    Initialize Datalog terms and predicates for reasoning about routes and stops.

    Returns:
        None
    """
    pyDatalog.clear()  # Clear previous terms
    print("Terms initialized: DirectRoute, RouteHasStop, OptimalRoute")  # Confirmation print

    # Define Datalog predicates

    create_kb()  # Populate the knowledge base
    add_route_data(route_to_stops)  # Add route data to Datalog
    
    
# Adding route data to Datalog
def add_route_data(route_to_stops):
    """
    Add the route data to Datalog for reasoning.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            +RouteHasStop(route_id, stop_id)

# Function to query direct routes between two stops
def query_direct_routes(start, end):
    """
    Query for direct routes between two stops.

    Args:
        start (int): The ID of the starting stop.
        end (int): The ID of the ending stop.

    Returns:
        list: A sorted list of route IDs (str) connecting the two stops.
    """
    
    # Query for routes that contain both start and end stops
    query_result = pyDatalog.ask(f'RouteHasStop(R, {start}) & RouteHasStop(R, {end})')
    
    # Process the results, assuming each answer contains a single route_id
    if query_result:
        return sorted(set(route_id[0] for route_id in query_result.answers))
    return []
    
    pass  # Implementation here



# PDDL-style planning for route finding
def pddl_planning(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Implement PDDL-style planning to find routes with optional transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID for a transfer.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    pass  # Implementation here

# Function to filter fare data based on an initial fare limit
def prune_data(merged_fare_df, initial_fare):
    """
    Filter fare data based on an initial fare limit.

    Args:
        merged_fare_df (DataFrame): The merged fare DataFrame.
        initial_fare (float): The maximum fare allowed.

    Returns:
        DataFrame: A filtered DataFrame containing only routes within the fare limit.
    """
    pass  # Implementation here

# Pre-computation of Route Summary
def compute_route_summary(pruned_df):
    """
    Generate a summary of routes based on fare information.

    Args:
        pruned_df (DataFrame): The filtered DataFrame containing fare information.

    Returns:
        dict: A summary of routes with the following structure:
              {
                  route_id (int): {
                      'min_price': float,          # The minimum fare for the route
                      'stops': set                # A set of stop IDs for that route
                  }
              }
    """
    pass  # Implementation here

# BFS for optimized route planning
def bfs_route_planner_optimized(start_stop_id, end_stop_id, initial_fare, route_summary, max_transfers=3):
    """
    Use Breadth-First Search (BFS) to find the optimal route while considering fare constraints.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        initial_fare (float): The available fare for the trip.
        route_summary (dict): A summary of routes with fare and stop information.
        max_transfers (int): The maximum number of transfers allowed (default is 3).

    Returns:
        list: A list representing the optimal route with stops and routes taken, structured as:
              [
                  (route_id (int), stop_id (int)),  # Tuple for each stop taken in the route
                  ...
              ]
    """
    pass  # Implementation here


In [None]:
Just do this correctly it will work
this is working properly fiine just need to convert this according to that function rest everything is corect

from pyDatalog import pyDatalog
import time
import psutil
import os
from collections import defaultdict

def pddl_planning(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Optimized PDDL-based route planning implementation
    Returns list of tuples (route1, transfer_stop, route2) representing valid paths
    """
    # Start timing and memory tracking
    start_time = time.time()
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024
    
    # Pre-process route data for faster lookups
    stop_to_routes = defaultdict(set)
    route_stops = defaultdict(set)
    transfer_routes = set()
    
    # Build lookup dictionaries
    for route_id, stops in route_to_stops.items():
        route_stops[route_id] = set(stops)
        if stop_id_to_include in stops:
            transfer_routes.add(route_id)
        for stop in stops:
            stop_to_routes[stop].add(route_id)
            
    # Early termination checks
    if not stop_to_routes[start_stop_id] or not stop_to_routes[end_stop_id]:
        return []
    
    result_paths = set()  # Using set to avoid duplicates
    
    # Handle routes containing start, transfer, and end stops
    routes_with_transfer = transfer_routes
    start_routes = stop_to_routes[start_stop_id]
    end_routes = stop_to_routes[end_stop_id]
    
    # Find all possible combinations within transfer limit
    if max_transfers >= 0:
        # Direct routes (no transfers)
        direct_routes = start_routes & end_routes & routes_with_transfer
        for route in direct_routes:
            stops = list(route_to_stops[route])
            start_idx = stops.index(start_stop_id)
            via_idx = stops.index(stop_id_to_include)
            end_idx = stops.index(end_stop_id)
            
            # Check if stops are in correct order
            if is_valid_sequence(start_idx, via_idx, end_idx, len(stops)):
                result_paths.add((route, stop_id_to_include, route))
    
    if max_transfers >= 1:
        # Single transfer routes
        for first_route in (start_routes & routes_with_transfer):
            stops1 = list(route_to_stops[first_route])
            start_idx = stops1.index(start_stop_id)
            transfer_idx1 = stops1.index(stop_id_to_include)
            
            # Check if transfer is reachable from start
            if not is_valid_sequence(start_idx, transfer_idx1, None, len(stops1)):
                continue
                
            for second_route in (end_routes & routes_with_transfer):
                if first_route == second_route:
                    continue
                    
                stops2 = list(route_to_stops[second_route])
                transfer_idx2 = stops2.index(stop_id_to_include)
                end_idx = stops2.index(end_stop_id)
                
                # Check if end is reachable from transfer
                if is_valid_sequence(transfer_idx2, end_idx, None, len(stops2), reverse_allowed=True):
                    result_paths.add((first_route, stop_id_to_include, second_route))
    
    # Calculate performance metrics
    execution_time = time.time() - start_time
    final_memory = process.memory_info().rss / 1024 / 1024
    memory_used = final_memory - initial_memory
    
    print(f"\nPerformance Metrics:")
    print(f"Execution Time: {execution_time:.4f} seconds")
    print(f"Memory Usage: {memory_used:.2f} MB")
    print(f"Number of Steps: {len(result_paths)}")
    
    return sorted(list(result_paths))

def is_valid_sequence(idx1, idx2, idx3=None, route_length=None, reverse_allowed=False):
    """
    Check if the sequence of indices represents a valid path through the route
    """
    if reverse_allowed:
        # Allow both forward and backward travel
        if idx3 is None:
            return True
        return (idx1 <= idx2 <= idx3) or (idx3 <= idx2 <= idx1)
    else:
        # Forward travel only
        if idx3 is None:
            return idx1 <= idx2
        return idx1 <= idx2 <= idx3

Implement the previous planning problem using Planning Domain Definition Language (PDDL) using forward chaining. Use the PyDatalog library for the same.
Key aspects:
• Initial State: Define the initial state as the stop where the journey begins (e.g., start stop id).
• Goal State: The goal state is to reach the destination stop (e.g., end stop id). • Action: You have two primary actions in this route planning:
– Board a route: This action allows you to board a specific route at a stop. Example: Action(‘board route’, R, X) means boarding route R at stop X.
– Transfer between routes: This action allows you to switch from one route to another at a stop. Example: Action(‘transfer route’, R1, R2, Z) means
transferring from route R1 to route R2 at stop Z (the transfer stop).
Print the current state information at each step. In your analysis, (a) evaluate time complexity in terms of execution time (measured in seconds) and memory usage (measured in megabytes, MB), (b) examine the intermediate steps in your reasoning process, and (c) compare the overall number of steps involved in both implementations. Do all algorithms (forward chaining, backward chaining, and PDDL) produce the same optimal route, or do some produce suboptimal routes due to the way constraints are applied?

In [1]:
# Boilerplate for AI Assignment — Knowledge Representation, Reasoning and Planning
# CSE 643

# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import networkx as nx
from pyDatalog import pyDatalog
from collections import defaultdict, deque

## ****IMPORTANT****
## Don't import or use any other libraries other than defined above
## Otherwise your code file will be rejected in the automated testing

# ------------------ Global Variables ------------------
route_to_stops = defaultdict(list)  # Mapping of route IDs to lists of stops
trip_to_route = {}                   # Mapping of trip IDs to route IDs
stop_trip_count = defaultdict(int)    # Count of trips for each stop
fare_rules = {}                      # Mapping of route IDs to fare information
merged_fare_df = None                # To be initialized in create_kb()

# Load static data from GTFS (General Transit Feed Specification) files
df_stops = pd.read_csv('GTFS/stops.txt')
df_routes = pd.read_csv('GTFS/routes.txt')
df_stop_times = pd.read_csv('GTFS/stop_times.txt')
df_fare_attributes = pd.read_csv('GTFS/fare_attributes.txt')
df_trips = pd.read_csv('GTFS/trips.txt')
df_fare_rules = pd.read_csv('GTFS/fare_rules.txt')

# ------------------ Function Definitions ------------------

# Function to create knowledge base from the loaded data
def create_kb():
    """
    Create knowledge base by populating global variables with information from loaded datasets.
    It establishes the relationships between routes, trips, stops, and fare rules.
    
    Returns:
        None
    """
    global route_to_stops, trip_to_route, stop_trip_count, fare_rules, merged_fare_df

    # Create trip_id to route_id mapping"""
    trip_to_route = defaultdict(list)
    for _, row in df_trips.iterrows():
        trip_to_route[row['trip_id']].append(row['route_id'])
        
    # Map route_id to a list of stops in order of their sequence"""
    route_to_stops = defaultdict(list)
    sorted_stop_times = df_stop_times.sort_values(['trip_id', 'stop_sequence'])
    sorted_stop_times.head
    for trip_id, stop_grp in sorted_stop_times.groupby('trip_id'):
        if trip_id in trip_to_route:
            route_id = trip_to_route[trip_id][0]
            stops = stop_grp['stop_id'].to_list()
            route_to_stops[route_id].extend(stops)
            
    # Ensure each route only has unique stops"""
    for route_id in route_to_stops:
        # Use dict.fromkeys() to preserve order while removing duplicates
        route_to_stops[route_id] = list(dict.fromkeys(route_to_stops[route_id]))
    
    # Count trips per stop"""
    stop_trip_count = dict(df_stop_times['stop_id'].value_counts())

    # Create fare rules for routes
    fare_rules = {}
    for i in range(len(df_fare_rules['route_id'])):
        route_id = df_fare_rules['route_id'][i]
        fare_id = df_fare_rules['fare_id'][i]

        if route_id not in fare_rules:
            fare_rules[route_id] = []

        fare_rules[route_id].append(fare_id)

    # Merge fare rules and attributes into a single DataFrame
    merged_fare_df = pd.merge(
        df_fare_rules,
        df_fare_attributes,
        on='fare_id',
        how='left'
    )

# Function to find the top 5 busiest routes based on the number of trips
def get_busiest_routes():
    """
    Identify the top 5 busiest routes based on trip counts.

    Returns:
        list: A list of tuples, where each tuple contains:
              - route_id (int): The ID of the route.
              - trip_count (int): The number of trips for that route.
    """
    route_trip_counts = defaultdict(int)
    for trip_id, routes in trip_to_route.items():
        # Since we stored routes as a list in trip_to_route, take the first route
        route_id = routes[0]
        route_trip_counts[route_id] += 1
    
    res = sorted(route_trip_counts.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 stops with the most frequent trips
def get_most_frequent_stops():
    """
    Identify the top 5 stops with the highest number of trips.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - trip_count (int): The number of trips for that stop.
    """
    res = sorted(stop_trip_count.items(), key = lambda ele: ele[1], reverse = True)
    
    # Return top 5 routes
    return res[:5]
    pass  # Implementation here

# Function to find the top 5 busiest stops based on the number of routes passing through them
def get_top_5_busiest_stops():
    """
    Identify the top 5 stops with the highest number of different routes.

    Returns:
        list: A list of tuples, where each tuple contains:
              - stop_id (int): The ID of the stop.
              - route_count (int): The number of routes passing through that stop.
    """
    stop_routes = defaultdict(set)
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            stop_routes[stop_id].add(route_id)
            
    stop_counts = []
    for stop_id, routes in stop_routes.items():
        route_count = len(routes)
        stop_counts.append((stop_id, route_count))

    res = sorted(stop_counts, key = lambda ele: ele[1], reverse = True)
    
    return res[:5]
    pass  # Implementation here

# Function to identify the top 5 pairs of stops with only one direct route between them
def get_stops_with_one_direct_route():
    """
    Identify the top 5 pairs of consecutive stops (start and end) connected by exactly one direct route. 
    The pairs are sorted by the combined frequency of trips passing through both stops.

    Returns:
        list: A list of tuples, where each tuple contains:
              - pair (tuple): A tuple with two stop IDs (stop_1, stop_2).
              - route_id (int): The ID of the route connecting the two stops.
    """
    connections = {}
    
    for route_id, stops in route_to_stops.items():
        for i in range(len(stops) - 1):
            current_stop = stops[i]
            next_stop = stops[i + 1]
            
            # same order for consistency
            stop_pair = (current_stop, next_stop) if current_stop < next_stop else (next_stop, current_stop)

            
            # Add the route to this pair's list of routes
            if stop_pair not in connections:
                connections[stop_pair] = []
            connections[stop_pair].append(route_id)
    
    result = []
    for stop_pair, routes in connections.items():
        if len(routes) == 1:
            result.append((stop_pair, routes[0]))
    
    return result
    pass  # Implementation here

# Function to get merged fare DataFrame
# No need to change this function
def get_merged_fare_df():
    """
    Retrieve the merged fare DataFrame.

    Returns:
        DataFrame: The merged fare DataFrame containing fare rules and attributes.
    """
    global merged_fare_df
    return merged_fare_df

# Visualize the stop-route graph interactively
def visualize_stop_route_graph_interactive(route_to_stops):
    """
    Visualize the stop-route graph using Plotly for interactive exploration.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    # Step 1: Create a network graph
    grph = nx.Graph()
    
    # Step 2: Add edges (connections between stops) from each route
    edge_colors = []
    edge_labels = []
    
    unique_routes = list(route_to_stops.keys())
    color_palette = plt.cm.get_cmap('hsv')(np.linspace(0, 1, len(unique_routes)))
    
    rt_color_map = {}
    for i in range (len(unique_routes)):
        route_id = unique_routes[i]
        r, g, b, _ = color_palette[i]
        
        color_string = f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
        rt_color_map[route_id] = color_string
    
    for route_id, stops_on_route in route_to_stops.items():
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        for i in range(len(stops_on_route) - 1):
            current_stop = stops_on_route[i]
            next_stop = stops_on_route[i + 1]
            
            grph.add_edge(current_stop, next_stop)
            
            edge_colors.append(rt_color_map[route_id])
            edge_labels.append(f"Route: {route_name}")
        
    # Step 3: Calculate state_positions for the graph
    stop_positions = nx.shell_layout(grph)
    
    # Step 4: Create edge trace
    edge_trace = []
    for i in range (len(grph.edges())):
        edge = list(grph.edges())[i]
        color = edge_colors[i]
        label = edge_labels[i]
        
        x0, y0 = stop_positions[edge[0]]
        x1, y1 = stop_positions[edge[1]]
        
        
        edge_trace.append(
            go.Scatter(
                x=[x0, x1, None],
                y=[y0, y1, None],
                line=dict(width=2, color=color),
                hoverinfo='text',
                text=label,
                mode='lines'
            )
        )
        
    # Step 5: Create node trace
    node_x = []
    node_y = []
    node_text = []
    
    for node in grph.nodes():
        x, y = stop_positions[node]
        node_x.append(x)
        node_y.append(y)
        # Get stop name for hover text
        stop_name = df_stops[df_stops['stop_id'] == node]['stop_name'].iloc[0]
        node_text.append(f"Stop: {stop_name}<br>ID: {node}")
    
    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode='markers+text',
        hoverinfo='text',
        text=node_text,
        marker=dict(
            size=10,
            color='lightblue',
            line=dict(width=2, color='darkblue')
        )
    )
    
    # Step 6: Create the figure
    fig = go.Figure(data=edge_trace + [node_trace],
                   layout=go.Layout(
                       title='Transit Network Map',
                       showlegend=False,
                       hovermode='closest',
                       margin=dict(b=0, l=0, r=0, t=40),
                       xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                       plot_bgcolor='white'
                   ))
    
    # Add a legend showing route colors
    for route_id in unique_routes:
        route_name = df_routes[df_routes['route_id'] == route_id]['route_long_name'].iloc[0]
        fig.add_trace(go.Scatter(
            x=[None],
            y=[None],
            mode='lines',
            name=route_name,
            line=dict(color=rt_color_map[route_id], width=2),
            showlegend=True
        ))
    
    # Step 7: Show the plot
    fig.show()
    pass  # Implementation here

# Brute-Force Approach for finding direct routes
def direct_route_brute_force(start_stop, end_stop):
    """
    Find all valid routes between two stops using a brute-force method.

    Args:
        start_stop (int): The ID of the starting stop.
        end_stop (int): The ID of the ending stop.

    Returns:
        list: A list of route IDs (int) that connect the two stops directly.
    """
    
    dir_routes = []
    for route_id, stops in route_to_stops.items():
        if start_stop in stops and end_stop in stops:
            # Check if start_stop comes before end_stop in the sequence
            if stops.index(start_stop) < stops.index(end_stop):
                dir_routes.append(route_id)
    return dir_routes
    
    pass  # Implementation here

# Initialize Datalog predicates for reasoning
pyDatalog.create_terms('RouteHasStop, DirectRoute, OptimalRoute, X, Y, Z, R, R1, R2')  
def initialize_datalog():
    """
    Initialize Datalog terms and predicates for reasoning about routes and stops.

    Returns:
        None
    """
    pyDatalog.clear()  # Clear previous terms
    print("Terms initialized: DirectRoute, RouteHasStop, OptimalRoute")  # Confirmation print

    # Define Datalog predicates

    create_kb()  # Populate the knowledge base
    add_route_data(route_to_stops)  # Add route data to Datalog
    
    
# Adding route data to Datalog
def add_route_data(route_to_stops):
    """
    Add the route data to Datalog for reasoning.

    Args:
        route_to_stops (dict): A dictionary mapping route IDs to lists of stops.

    Returns:
        None
    """
    for route_id, stops in route_to_stops.items():
        for stop_id in stops:
            +RouteHasStop(route_id, stop_id)

# Function to query direct routes between two stops
def query_direct_routes(start, end):
    """
    Query for direct routes between two stops.

    Args:
        start (int): The ID of the starting stop.
        end (int): The ID of the ending stop.

    Returns:
        list: A sorted list of route IDs (str) connecting the two stops.
    """
    
    # Query for routes that contain both start and end stops
    query_result = pyDatalog.ask(f'RouteHasStop(R, {start}) & RouteHasStop(R, {end})')
    
    # Process the results, assuming each answer contains a single route_id
    if query_result:
        return sorted(set(route_id[0] for route_id in query_result.answers))
    return []
    
    pass  # Implementation here

# Forward chaining for optimal route planning
def forward_chaining(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Perform forward chaining to find optimal routes considering transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID where a transfer occurs.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    result_paths = set()
    
    # Find routes containing the required stops
    for route_id, stops in route_to_stops.items():
        try:
            # Check if all required stops are in this route
            if stop_id_to_include in stops and (start_stop_id in stops or end_stop_id in stops):
                via_idx = stops.index(stop_id_to_include)
                # print(route_id)
                # Case 1: Direct route containing all stops
                if start_stop_id in stops and end_stop_id in stops:
                    start_idx = stops.index(start_stop_id)
                    end_idx = stops.index(end_stop_id)
                    
                    # Check if the route is valid (stops are in correct order)
                    if min(start_idx, end_idx) <= via_idx <= max(start_idx, end_idx):
                        result_paths.add((route_id, stop_id_to_include, end_stop_id))
                        # print("yes")
                        
                
                # Case 2: Route contains via stop and either start or end
                elif max_transfers >= 1:
                    # Find connecting routes
                    for other_route, other_stops in route_to_stops.items():
                        # print(other_route)
                        if other_route != route_id:
                            # print(other_route)
                            if start_stop_id in stops and end_stop_id in other_stops:
                                # print(other_route)
                                if stop_id_to_include in other_stops:
                                    
                                    result_paths.add((route_id, stop_id_to_include, other_route))
                                    # print((route_id, stop_id_to_include, other_route))
                                    # print("yes2")
                                    # print(other_route)
                                    # print(end_stop_id)
                            elif start_stop_id in other_stops and end_stop_id in stops:
                                # print((other_route, stop_id_to_include, route_id,start_stop_id, end_stop_id))
                                # print(other_stops)
                                if stop_id_to_include in other_stops:
                                    # print(other_route)
                                    # print((other_route, stop_id_to_include, route_id,start_stop_id, end_stop_id))
                                    result_paths.add((other_route, stop_id_to_include, route_id))
                                    # print((other_route, stop_id_to_include, route_id))
                                    # print("yes3")
                                    # print(other_route)
        except ValueError:
            continue
    
    # Convert set to list for return
    return sorted(list(result_paths))
    
    
    pass  # Implementation here

# Backward chaining for optimal route planning
def backward_chaining(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Perform backward chaining to find optimal routes considering transfers.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        stop_id_to_include (int): The stop ID where a transfer occurs.
        max_transfers (int): The maximum number of transfers allowed.

    Returns:
        list: A list of unique paths (list of tuples) that satisfy the criteria, where each tuple contains:
              - route_id (int): The ID of the route.
              - stop_id (int): The ID of the stop.
    """
    
    result_paths = []
    seen_paths = set()  # To avoid duplicates while maintaining order
    
    # Start from the end stop and work backwards
    for route_id, stops in route_to_stops.items():
        try:
            # Check if the route contains the end stop and via stop
            if end_stop_id in stops and stop_id_to_include in stops:
                end_idx = stops.index(end_stop_id)
                via_idx = stops.index(stop_id_to_include)
                
                # Case 1: Direct route containing all stops
                if start_stop_id in stops:
                    start_idx = stops.index(start_stop_id)
                    # Check if the route is valid (stops are in correct order)
                    if min(via_idx, end_idx) <= start_idx <= max(via_idx, end_idx):
                        path_tuple = (end_stop_id, stop_id_to_include, route_id)
                        path_key = str(path_tuple)  # Convert to string for hashing
                        # print("yes1")
                        # print(path_tuple)
                        if path_key not in seen_paths:
                            seen_paths.add(path_key)
                            result_paths.append(path_tuple)
                
                # Case 2: Route contains end stop and via stop, need to find connecting route
                elif max_transfers >= 1:
                    # Look for routes that can connect to our current route at the via stop
                    for connecting_route, connecting_stops in route_to_stops.items():
                        if connecting_route != route_id:
                            # Check if connecting route has start stop and via stop
                            if start_stop_id in connecting_stops and stop_id_to_include in connecting_stops:
                                conn_start_idx = connecting_stops.index(start_stop_id)
                                conn_via_idx = connecting_stops.index(stop_id_to_include)
                                
                                # Verify the order in connecting route
                                if min(conn_start_idx, conn_via_idx) <= max(conn_start_idx, conn_via_idx):
                                    path_tuple = (route_id, stop_id_to_include, connecting_route)
                                    # print("yes2")
                                    # print(path_tuple)
                                    path_key = str(path_tuple)
                                    if path_key not in seen_paths:
                                        seen_paths.add(path_key)
                                        result_paths.append(path_tuple)
                                        
            
            # Additional case: Route contains start stop and via stop
            elif start_stop_id in stops and stop_id_to_include in stops and max_transfers >= 1:
                start_idx = stops.index(start_stop_id)
                via_idx = stops.index(stop_id_to_include)
                
                # Look for routes that can connect from via stop to end stop
                for next_route, next_stops in route_to_stops.items():
                    if next_route != route_id:
                        if end_stop_id in next_stops and stop_id_to_include in next_stops:
                            next_end_idx = next_stops.index(end_stop_id)
                            next_via_idx = next_stops.index(stop_id_to_include)
                            
                            # Verify the order in next route
                            if min(next_via_idx, next_end_idx) <= max(next_via_idx, next_end_idx):
                                path_tuple = (next_route, stop_id_to_include, route_id)
                                # print("yes3")
                                # print(path_tuple)
                                path_key = str(path_tuple)
                                if path_key not in seen_paths:
                                    seen_paths.add(path_key)
                                    result_paths.append(path_tuple)
                                
        except ValueError:
            continue
    
    # Sort the results for consistent output
    return sorted(result_paths)
    
    
    pass  # Implementation here

# PDDL-style planning for route finding
def pddl_planning(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    from pyDatalog import pyDatalog
import time
import psutil
import os
from collections import defaultdict

# Initialize PyDatalog globally
pyDatalog.create_terms('route_stop, R, X, Start, Via, End, Stop, R1, R2')
pyDatalog.create_terms('can_board, can_transfer, reachable, path')

def pddl_planning(start_stop_id, end_stop_id, stop_id_to_include, max_transfers):
    """
    Optimized PDDL-based route planning implementation using PyDatalog
    Returns list of tuples (route1, transfer_stop, route2) representing valid paths
    """
    # Start timing and memory tracking
    start_time = time.time()
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024

    # Initialize PyDatalog
    pyDatalog.clear()

    # Pre-process route data for faster lookups
    stop_to_routes = defaultdict(set)
    route_stops = defaultdict(set)
    transfer_routes = set()

    # Build lookup dictionaries and assert facts
    try:
        for route_id, stops in route_to_stops.items():
            route_stops[route_id] = set(stops)
            if stop_id_to_include in stops:
                transfer_routes.add(route_id)
            for stop in stops:
                stop_to_routes[stop].add(route_id)
                # Assert facts using proper PyDatalog syntax
                + route_stop(route_id, stop)

    except Exception as e:
        print(f"Error during fact assertion: {str(e)}")
        return []

    # Early termination checks
    if not stop_to_routes[start_stop_id] or not stop_to_routes[end_stop_id]:
        return []

    # Define rules (without + operator)
    can_board(R, X) <= route_stop(R, X)

    reachable(R, Start, Via, End) <= (
        can_board(R, Start) &
        route_stop(R, Via) &
        route_stop(R, End)
    )

    can_transfer(R1, R2, Stop) <= (
        route_stop(R1, Stop) &
        route_stop(R2, Stop)
    )

    path(R1, Via, R2) <= (
        reachable(R1, start_stop_id, Via, stop_id_to_include) &
        reachable(R2, stop_id_to_include, Via, end_stop_id)
    )

    result_paths = set()

    # Query for direct routes (no transfers)
    if max_transfers >= 0:
        for route in stop_to_routes[start_stop_id] & stop_to_routes[end_stop_id] & transfer_routes:
            try:
                stops = list(route_to_stops[route])
                start_idx = stops.index(start_stop_id)
                via_idx = stops.index(stop_id_to_include)
                end_idx = stops.index(end_stop_id)
                
                # Inline validity check for direct routes
                if start_idx < via_idx < end_idx:
                    result_paths.add((route, stop_id_to_include, route))
                    # print(f"State: Direct route found - {route}")
            except (ValueError, KeyError):
                continue

    # Query for routes with one transfer
    if max_transfers >= 1:
        # Query using PyDatalog syntax
        solutions = path(R1, Via, R2)
        if solutions:
            for r1, via, r2 in solutions:
                try:
                    stops1 = list(route_to_stops[r1])
                    stops2 = list(route_to_stops[r2])
                    if r1 != r2:
                        # Inline validity checks for transfer routes
                        start_to_via_valid = stops1.index(start_stop_id) < stops1.index(stop_id_to_include)
                        via_to_end_valid = True  # Allow reverse direction after transfer
                        
                        if start_to_via_valid and via_to_end_valid:
                            result_paths.add((r1, stop_id_to_include, r2))
                            # print(f"State: Transfer route found - {r1} to {r2} at {stop_id_to_include}")
                except (ValueError, KeyError):
                    continue

    # Calculate performance metrics
    execution_time = time.time() - start_time
    final_memory = process.memory_info().rss / 1024 / 1024
    memory_used = final_memory - initial_memory
    print(f"\nPerformance Metrics:")
    print(f"Execution Time: {execution_time:.4f} seconds")
    print(f"Memory Usage: {memory_used:.2f} MB")
    print(f"Number of Steps: {len(result_paths)}")

    return sorted(list(result_paths))
    
    pass  # Implementation here

# Function to filter fare data based on an initial fare limit
def prune_data(merged_fare_df, initial_fare):
    """
    Filter fare data based on an initial fare limit.

    Args:
        merged_fare_df (DataFrame): The merged fare DataFrame.
        initial_fare (float): The maximum fare allowed.

    Returns:
        DataFrame: A filtered DataFrame containing only routes within the fare limit.
    """
    
     # First, let's find the fare column - it might be 'price', 'fare_amount', etc.
    fare_column = None
    possible_fare_columns = ['price', 'fare', 'cost', 'fare_amount', 'amount']
    
    for col in possible_fare_columns:
        if col in merged_fare_df.columns:
            fare_column = col
            break
    
    if fare_column is None:
        # If we can't find a fare column, return the original DataFrame
        return merged_fare_df
        
    return merged_fare_df[merged_fare_df[fare_column] <= initial_fare]
    
    pass  # Implementation here

# Pre-computation of Route Summary
def compute_route_summary(pruned_df):
    """
    Generate a summary of routes based on fare information.

    Args:
        pruned_df (DataFrame): The filtered DataFrame containing fare information.

    Returns:
        dict: A summary of routes with the following structure:
              {
                  route_id (int): {
                      'min_price': float,          # The minimum fare for the route
                      'stops': set                # A set of stop IDs for that route
                  }
              }
    """
    
    summary = {}
    # First, find the fare column
    fare_column = None
    possible_fare_columns = ['price', 'fare', 'cost', 'fare_amount', 'amount']
    
    for col in possible_fare_columns:
        if col in pruned_df.columns:
            fare_column = col
            break
    
    if fare_column is None:
        # If we can't find a fare column, use a default value
        default_fare = 1.0
        
    for _, row in pruned_df.iterrows():
        route_id = row['route_id']
        fare = row[fare_column] if fare_column else default_fare
        
        if route_id not in summary:
            summary[route_id] = {
                'min_price': fare,
                'stops': set(route_to_stops[route_id])
            }
        else:
            summary[route_id]['min_price'] = min(summary[route_id]['min_price'], fare)

    return summary
    
    pass  # Implementation here

# BFS for optimized route planning
def bfs_route_planner_optimized(start_stop_id, end_stop_id, initial_fare, route_summary, max_transfers=3):
    """
    Use Breadth-First Search (BFS) to find the optimal route while considering fare constraints.

    Args:
        start_stop_id (int): The starting stop ID.
        end_stop_id (int): The ending stop ID.
        initial_fare (float): The available fare for the trip.
        route_summary (dict): A summary of routes with fare and stop information.
        max_transfers (int): The maximum number of transfers allowed (default is 3).

    Returns:
        list: A list representing the optimal route with stops and routes taken, structured as:
              [
                  (route_id (int), stop_id (int)),  # Tuple for each stop taken in the route
                  ...
              ]
    """
    
    queue = deque([(start_stop_id, [], 0, 0)])  # (current_stop, path, transfers_used, total_fare)
    visited = set()
    optimal_route = None

    while queue:
        current_stop, path, transfers_used, total_fare = queue.popleft()

        if current_stop == end_stop_id:
            if optimal_route is None or len(path) < len(optimal_route):
                optimal_route = path
            continue
        
        if transfers_used > max_transfers:
            continue
            
        if (current_stop, transfers_used) in visited:
            continue
        visited.add((current_stop, transfers_used))

        for route_id, info in route_summary.items():
            if current_stop in info['stops']:
                new_fare = total_fare + info['min_price']
                if new_fare <= initial_fare:
                    for next_stop in info['stops']:
                        if next_stop != current_stop:
                            new_transfers = transfers_used + 1 if path and path[-1][0] != route_id else transfers_used
                            if new_transfers <= max_transfers:
                                queue.append((
                                    next_stop, 
                                    path + [(route_id, next_stop)], 
                                    new_transfers,
                                    new_fare
                                ))

    return optimal_route or []
    
    pass  # Implementation here




In [3]:


# Sample public test inputs with expected outputs explicitly defined
test_inputs = {
    "direct_route": [
        ((2573, 1177), [10001, 1117, 1407]),  # Input -> Expected output
        ((2001, 2005), [10001, 1151])
    ],

    "forward_chaining": [
        ((22540, 2573, 4686, 1), [(10153, 4686, 1407)]),
        ((951, 340, 300, 1), [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), 
                              (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), 
                              (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)])
    ],
    "backward_chaining": [
        ((2573, 22540, 4686, 1), [(1407, 4686, 10153)]),
        ((340, 951, 300, 1), [(712, 300, 121), (712, 300, 1211), (712, 300, 37), (712, 300, 387),
                              (712, 300, 49), (712, 300, 10453), (712, 300, 1038), (712, 300, 10433),
                              (712, 300, 1571)])
    ],
    "pddl_planning": [
        ((22540, 2573, 4686, 1), [(10153, 4686, 1407)]),
        ((951, 340, 300, 1), [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), 
                              (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), 
                              (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)])
    ],
    "bfs_route": [
        ((22540, 2573, 10, 3), [(10153, 4686), (1407, 2573)]),
        ((4012, 4013, 10, 3), [(10004, 4013)])
    ],

    ### NOTE: The below values are just dummy values, the actual values are might differ! 
    "busiest_routes": [
        [(123, 456), (789, 234), (567, 235), (3456, 897), (345, 345)]
    ],
    "most_frequent_stops": [
        [(456, 456), (234, 765), (234, 765), (234, 657765), (3252, 35634)]
    ],
    "busiest_stops": [
        [(432243, 14543), (454235, 2452), (2452, 2454), (78568, 24352), (42352, 24532)]
    ],
    "stops_with_one_direct_route": [
        [((24527, 676), 542), ((243535, 8768), 2456), ((43262, 564), 65437),
         ((256, 56), 245), ((266, 256), 78)]
    ]
}

def check_output(expected, actual):
    """Function to compare expected and actual outputs."""
    return set(expected) == set(actual)

def test_direct_route_brute_force():
    for (start_stop, end_stop), expected_output in test_inputs["direct_route"]:
        actual_output = direct_route_brute_force(start_stop, end_stop)
        print(f"Test direct_route_brute_force ({start_stop}, {end_stop}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_query_direct_routes():
    for (start_stop, end_stop), expected_output in test_inputs["direct_route"]:
        actual_output = query_direct_routes(start_stop, end_stop)
        print(f"Test query_direct_routes ({start_stop}, {end_stop}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_forward_chaining():
    for (start_stop, end_stop, via_stop, max_transfers), expected_output in test_inputs["forward_chaining"]:
        actual_output = forward_chaining(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test forward_chaining ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_backward_chaining():
    for (end_stop, start_stop, via_stop, max_transfers), expected_output in test_inputs["backward_chaining"]:
        actual_output = backward_chaining(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test backward_chaining ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_pddl_planning():
    for (start_stop, end_stop, via_stop, max_transfers), expected_output in test_inputs["pddl_planning"]:
        actual_output = pddl_planning(start_stop, end_stop, via_stop, max_transfers)
        print(f"Test pddl_planning ({start_stop}, {end_stop}, {via_stop}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_bfs_route_planner():
    for (start_stop, end_stop, initial_fare, max_transfers), expected_output in test_inputs["bfs_route"]:
        pruned_df = prune_data(merged_fare_df, initial_fare)
        route_summary = compute_route_summary(pruned_df)
        actual_output = bfs_route_planner_optimized(start_stop, end_stop, initial_fare, route_summary, max_transfers)
        print(f"Test bfs_route_planner_optimized ({start_stop}, {end_stop}, {initial_fare}, {max_transfers}): ", 
              "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

# New test functions for the additional queries

def test_get_busiest_routes():
    expected_output = test_inputs["busiest_routes"][0]
    actual_output = get_busiest_routes()
    print(f"Test get_busiest_routes: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_most_frequent_stops():
    expected_output = test_inputs["most_frequent_stops"][0]
    actual_output = get_most_frequent_stops()
    print(f"Test get_most_frequent_stops: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_top_5_busiest_stops():
    expected_output = test_inputs["busiest_stops"][0]
    actual_output = get_top_5_busiest_stops()
    print(f"Test get_top_5_busiest_stops: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

def test_get_stops_with_one_direct_route():
    expected_output = test_inputs["stops_with_one_direct_route"][0]
    actual_output = get_stops_with_one_direct_route()
    print(f"Test get_stops_with_one_direct_route: ", 
          "Pass" if check_output(expected_output, actual_output) else f"Fail (Expected: {expected_output}, Got: {actual_output})")

if __name__ == "__main__":
    create_kb()  # Ensure the data is loaded before testing
    merged_fare_df = get_merged_fare_df()  # Use the function to retrieve the DataFrame
    initialize_datalog()
    
    # Run all tests
    test_direct_route_brute_force()
    test_query_direct_routes()
    test_forward_chaining()
    test_backward_chaining()
    test_pddl_planning()
    test_bfs_route_planner()
    
  

Terms initialized: DirectRoute, RouteHasStop, OptimalRoute
Test direct_route_brute_force (2573, 1177):  Pass
Test direct_route_brute_force (2001, 2005):  Pass
Test query_direct_routes (2573, 1177):  Pass
Test query_direct_routes (2001, 2005):  Pass
Test forward_chaining (22540, 2573, 4686, 1):  Pass
Test forward_chaining (951, 340, 300, 1):  Fail (Expected: [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)], Got: [(37, 300, 712), (49, 300, 712), (121, 300, 712), (387, 300, 712), (1038, 300, 712), (1211, 300, 712), (1571, 300, 712), (10433, 300, 712), (10453, 300, 712)])
Test backward_chaining (22540, 2573, 4686, 1):  Pass
Test backward_chaining (951, 340, 300, 1):  Pass
State: Transfer route found - 10153 to 1407 at 4686
State: Transfer route found - 10153 to 1407 at 4686

Performance Metrics:
Execution Time: 2.1545 seconds
Mem

In [2]:
import time
import psutil
import os   

In [8]:
test_pddl_planning()
test_bfs_route_planner()

Error during fact assertion: name 'route_stop' is not defined
Test pddl_planning (22540, 2573, 4686, 1):  Fail (Expected: [(10153, 4686, 1407)], Got: [])
Error during fact assertion: name 'route_stop' is not defined
Test pddl_planning (951, 340, 300, 1):  Fail (Expected: [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), (37, 300, 712), (1571, 300, 712), (49, 300, 712), (387, 300, 712), (1206, 300, 712), (1038, 300, 712), (10433, 300, 712), (121, 300, 712)], Got: [])
Test bfs_route_planner_optimized (22540, 2573, 10, 3):  Pass
Test bfs_route_planner_optimized (4012, 4013, 10, 3):  Pass


In [11]:
test_pddl_planning()

State: Transfer route found - 10153 to 1407 at 4686
State: Transfer route found - 10153 to 1407 at 4686

Performance Metrics:
Execution Time: 2.4773 seconds
Memory Usage: -2.84 MB
Number of Steps: 1
Test pddl_planning (22540, 2573, 4686, 1):  Pass
State: Transfer route found - 121 to 712 at 300
State: Transfer route found - 1211 to 712 at 300
State: Transfer route found - 49 to 712 at 300
State: Transfer route found - 1571 to 712 at 300
State: Transfer route found - 1571 to 712 at 300
State: Transfer route found - 10453 to 712 at 300
State: Transfer route found - 387 to 712 at 300
State: Transfer route found - 10433 to 712 at 300
State: Transfer route found - 1038 to 712 at 300
State: Transfer route found - 37 to 712 at 300

Performance Metrics:
Execution Time: 2.4973 seconds
Memory Usage: 55.45 MB
Number of Steps: 9
Test pddl_planning (951, 340, 300, 1):  Fail (Expected: [(294, 300, 712), (10453, 300, 712), (1211, 300, 712), (1158, 300, 712), (37, 300, 712), (1571, 300, 712), (49, 300