### **Demo File for Pathfinding in GTFS**

This file demonstrates how to use the path-finding Spark job in `raptor_celery.py`.

The algorithm is based on the [RAPTOR algorithm](https://www.microsoft.com/en-us/research/wp-content/uploads/2012/01/raptor_alenex.pdf), developed at Microsoft Research for finding the earliest arrival in a transit schedule. Although this implementation does not replicate the paper's algorithm exactly, the core idea is the same.

Basic Algorithm: 

1. For round 1, initialize the `Earliest Arrival Time` to `0` for the origin and to `infinity` for all other stops
2. Initialize the `improved` boolean to `True` for the origin and `False` for all other stops
3. Start a loop from `0` to `max_transfers`
   1. Find all the improved stops from the previous `round`
   2. For each route from the improved stops find the trips that can occur in the current number of transfers
   3. For each stop in that trip, check how long it takes to arrive and compare it with a previous arrival time, if any
   4. For each stop in that trip, check if another stop is close enough to walk to and if the arrival time is faster that way
   5. Store the parent trips and the round where improvement last occurred
   6. Increment the number of transfers and repeat the process
4. Use all the stored information to rebuild the path for the destination by looping backwards from it
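
The steps above can be sketched in plain Python on a toy timetable. This is a simplified, single-machine illustration and not the Spark job in `raptor_celery.py`: the `Trip` tuple, the `footpaths` mapping, and the function name `earliest_arrivals` are hypothetical, times are minutes counted from the requested departure, and round tracking is reduced to a set of improved stops.

```python
import math
from collections import namedtuple

# Hypothetical toy model: a trip is an ordered list of (stop, minute) calls,
# with minutes counted from the requested departure time.
Trip = namedtuple("Trip", ["trip_id", "calls"])


def earliest_arrivals(trips, footpaths, origin, destination, max_transfers):
    """Round-based earliest-arrival search in the spirit of RAPTOR (simplified)."""
    best = {origin: 0}   # step 1: stops missing from the dict count as infinity
    parent = {}          # stop -> (previous stop, trip id or "W" for a walk)

    def relax_walks(sources):
        """Try a single walking hop from each source stop (step 3.4)."""
        reached = set()
        for stop in sources:
            for neighbour, walk_minutes in footpaths.get(stop, []):
                arrival = best[stop] + walk_minutes
                if arrival < best.get(neighbour, math.inf):
                    best[neighbour] = arrival
                    parent[neighbour] = (stop, "W")
                    reached.add(neighbour)
        return reached

    # Step 2: the origin (and anything within walking distance) starts as improved.
    improved = {origin} | relax_walks({origin})

    for _ in range(max_transfers + 1):      # step 3: each round allows one more trip
        previous = dict(best)               # arrival times before this round
        newly_improved = set()
        for trip in trips:                  # steps 3.1 to 3.3
            boarded_at = None
            for stop, minute in trip.calls:
                if boarded_at is not None and minute < best.get(stop, math.inf):
                    best[stop] = minute
                    parent[stop] = (boarded_at, trip.trip_id)   # step 3.5
                    newly_improved.add(stop)
                elif (boarded_at is None and stop in improved
                      and previous[stop] <= minute):
                    boarded_at = stop       # the trip can be caught here
        improved = newly_improved | relax_walks(newly_improved)
        if not improved:
            break                           # nothing changed, later rounds cannot help

    # Step 4: rebuild the path by walking the parent links back from the destination.
    if destination not in best:
        return math.inf, []
    legs, stop = [], destination
    while stop != origin:
        previous_stop, trip_id = parent[stop]
        legs.append((previous_stop, trip_id, stop))
        stop = previous_stop
    return best[destination], legs[::-1]


toy_trips = [
    Trip("T1", [("A", 5), ("B", 15)]),
    Trip("T2", [("C", 20), ("D", 30)]),
]
toy_footpaths = {"B": [("C", 3)]}           # 3-minute walk from B to C

arrival, legs = earliest_arrivals(toy_trips, toy_footpaths, "A", "D", max_transfers=1)
print(arrival, legs)
# 30 [('A', 'T1', 'B'), ('B', 'W', 'C'), ('C', 'T2', 'D')]
```

With `max_transfers=0` the same call returns `(math.inf, [])`, because reaching `D` needs a second trip.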


### **Example from the paper**

![](algorithm_example.png)

### **Commands to execute to run this demo**
1. `pip3 install -r requirements.txt`
2. `hdfs dfs -put data gtfs_data`
3. `spark-submit preprocess.py gtfs_data preprocessed_data 24`
4. `spark-submit --master=local[*] raptor_celery.py`

If the commands above complete without errors, you should be able to run each cell in order.

**Note:**  
1. Preprocessing takes about 20 minutes because the distance between every pair of stops is calculated with a UDF, but it only needs to be done once.
2. If any cell fails, especially with an error such as `"Raptor not registered"` or `"run() needs 6 positional arguments but received 7"`, kill all processes, since a Celery task that did not shut down correctly is still running in the background.
3. If any Query Example cell runs, but does not display the output, please run it again.
4. This notebook can only run on the cluster, since RabbitMQ and Celery are already set up there with the appropriate URLs.
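
Note 1 refers to a distance computed for every stop pair. The formula used by `preprocess.py` is not shown in this notebook; a common choice for GTFS latitude/longitude coordinates is the haversine great-circle distance, sketched below as a plain Python function under that assumption. The function name `haversine_m` and the Earth-radius constant are illustrative, not taken from the project.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in metres (assumed constant)


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


# One degree of longitude along the equator is roughly 111 km.
print(round(haversine_m(0.0, 0.0, 0.0, 1.0)))
```

Applying a function like this to every pair of `n` stops means `n * (n - 1) / 2` evaluations, which is consistent with the long one-off preprocessing time.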


### **Demo Code**

**Load the `trips.txt` data file to display the final journey information provided by the Spark Job**

In [8]:

import csv

def load_trips():
    # Parse with the csv module so quoted fields that contain commas stay intact.
    with open('data/trips.txt', 'r', newline='') as f:
        rows = list(csv.DictReader(f))
    # Index every trip record by its trip_id for quick lookup.
    trip_dictionary = {row['trip_id']: row for row in rows}
    return trip_dictionary

trips = load_trips()

**Load the `stops.txt` data file to display the final journey information provided by the Spark Job**

In [9]:
import csv

def load_stops():
    # Parse with the csv module so stop names that contain commas stay intact.
    with open('data/stops.txt', 'r', newline='') as f:
        rows = list(csv.DictReader(f))
    # Map each stop name to its integer stop_id.
    stop_dictionary = {row['stop_name']: int(row['stop_id']) for row in rows}
    return stop_dictionary
    
stops = load_stops()

**Get the stops matching the user's query with a simple substring search that returns up to N matching values. We sort by name length and then slice, so that stops closer to an exact match of the user's query are preferred.**

In [10]:
def get_closest_stop(search, limit=3):
    search = search.lower()
    # Keep every stop whose name contains the query (case-insensitive).
    # Filtering on names instead of indices avoids dropping the stop at index 0.
    closest_names = [name for name in stops if search in name.lower()]
    # Shorter names are closer to an exact match, so rank them first.
    closest_names.sort(key=len)
    closest_names = closest_names[:limit]
    closest = [stops[name] for name in closest_names]
    return closest, closest_names
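
To see why sorting by length favours near-exact matches, here is a self-contained sketch of the same idea on a hypothetical three-entry stop table. The function name `rank_matches`, the stop names, and the IDs are made up for illustration and are not part of the GTFS data.

```python
def rank_matches(search, stop_table, limit=3):
    """Return (ids, names) of stops containing `search`, shortest names first."""
    search = search.lower()
    matches = [name for name in stop_table if search in name.lower()]
    matches.sort(key=len)            # shortest name = closest to an exact match
    names = matches[:limit]
    return [stop_table[name] for name in names], names


toy_stops = {
    "Example Station @ Platform 1": 101,
    "Example Station": 100,
    "Other Exchange @ Bay 2": 200,
}

ids, names = rank_matches("example station", toy_stops, limit=1)
print(ids, names)
# [100] ['Example Station']
```

Both "Example Station" entries contain the query, but the shorter name has the least extra text around it, so it survives the `limit=1` slice.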

**Call the Spark job with the required parameters using Celery and return the result.**

In [11]:
from raptor_celery import tasks
from termcolor import colored

def get_path(day, departure_time, origin, destination):
    origins, origin_names = get_closest_stop(origin, 3)
    destinations, destination_names = get_closest_stop(destination, 10)
    print(f"{colored('Departure Details', attrs=['bold'])} - {day.title()} at {departure_time}")
    print(f"{colored('Origin Stations Found', attrs=['bold'])} - {', '.join(origin_names)}")
    print(f"{colored('Destination Stations Found', attrs=['bold'])} - {', '.join(destination_names)}\n")
    results = []
    for i, o in enumerate(origins):  
        res = tasks.delay(day, departure_time, o, destinations, 'preprocessed_data')
        result = res.get()
        results.append((origin_names[i], result))
    return results

**Print the journey information given by Spark in a more human-readable format.**

In [12]:
from operator import itemgetter
from termcolor import colored

def show_journey_information(journeys):
    if not journeys:
        print("Sorry, no journey was found within the given number of transfers.")
        return
    # Keep the three journeys with the shortest duration (field 1 of each row).
    journeys = sorted(journeys, key=itemgetter(1))[:3]
    for i, journey in enumerate(journeys):
        journey_duration = journey[1]
        # Drop None entries from the trip and stop lists.
        trip_ids = [x for x in journey[3] if x is not None]
        stop_ids = [x for x in journey[5] if x is not None]
        # Walking legs carry a "W" in their ID and are not counted.
        trip_transfers = sum(0 if "W" in trip_id else 1 for trip_id in trip_ids)
        plural = "s" if trip_transfers > 1 else ""
        summary = f"Journey {i + 1}: {journey_duration} minutes, {trip_transfers} transfer{plural}"
        # Highlight the fastest journey in bold green.
        print(colored(summary, "green", attrs=['bold']) if i == 0 else summary)
        for j, trip_id in enumerate(trip_ids):
            # Show the headsign for vehicle trips and "Walk" for walking legs.
            method = "Walk" if "W" in trip_id else trips[trip_id]['trip_headsign']
            print(f"Trip {j + 1}: {colored(method, 'yellow')} from {colored(stop_ids[j], 'blue')} to {colored(stop_ids[j+1], 'blue')}")
        print("")

**Get all the journeys that match the user's query and display them.**

In [13]:
from termcolor import colored

def get_journeys(day, departure_time, origin, destination):
    paths = get_path(day, departure_time, origin, destination)
    paths = list(filter(lambda p: len(p[1]), paths))
    if(not len(paths)):
        print("Sorry, no journey was found within the given number of transfers.")
    print("-----------------------------------------------------")
    for path in paths:
        print(f"Journeys originating from {colored(path[0], 'red', attrs=['bold'])}")
        show_journey_information(path[1])
        print("-----------------------------------------------------")

**Query 1:** From Holdom SkyTrain Station to SFU

In [24]:
get_journeys('monday', '14:30', 'Holdom Station', 'SFU')

Departure Details - Monday at 14:30
Origin Stations Found - Holdom Station, Holdom Station @ Platform 1, Holdom Station @ Platform 2
Destination Stations Found - SFU Transit Exchange @ Bay 1, SFU Transit Exchange @ Bay 3, SFU Transit Exchange @ Bay 4, SFU Transit Exchange @ Bay 2, SFU Transportation Centre @ Bay 2, SFU Transportation Centre @ Bay 1

-----------------------------------------------------
Journeys originating from Holdom Station @ Platform 2
Journey 1: 32 minutes, 2 transfers
Trip 1: Millennium Line To Lafarge Lake-Douglas from Holdom Station @ Platform 2 to Sperling-Burnaby Lake Station @ Platform 2
Trip 2: Walk from Sperling-Burnaby Lake Station @ Platform 2 to Sperling Station @ Bay 2
Trip 3: 144 SFU from Sperling Station @ Bay 2 to SFU Transportation Centre @ Bay 2

Journey 2: 33 minutes, 2 transfers
Trip 1: Millennium Line To Lafarge

**Query 2:** From SFU Transit Exchange to Metrotown SkyTrain Station

In [21]:
get_journeys('monday', '13:30', 'SFU Transit Exchange', 'Metrotown Station @ Platform')

Departure Details - Monday at 13:30
Origin Stations Found - SFU Transit Exchange @ Bay 1, SFU Transit Exchange @ Bay 3, SFU Transit Exchange @ Bay 4
Destination Stations Found - Metrotown Station @ Platform 2, Metrotown Station @ Platform 1

-----------------------------------------------------
Journeys originating from SFU Transit Exchange @ Bay 1
Journey 1: 48 minutes, 2 transfers
Trip 1: 145 Production Station from SFU Transit Exchange @ Bay 1 to Production Way Station @ Bay 1
Trip 2: Walk from Production Way Station @ Bay 1 to Production Way-University Station @ Platform 2
Trip 3: Expo Line To Waterfront from Production Way-University Station @ Platform 2 to Metrotown Station @ Platform 1

Journey 2: 58 minutes, 1 transfer
Trip 1: Walk from SFU Transit Exchange @ Bay 1 to SFU Transit Exchange @ Bay 3
Trip 2: 144 Metrotown

**Query 3:** From Waterfront to Lonsdale Quay

In [22]:
get_journeys('tuesday', '17:30', 'waterfront', 'Lonsdale Quay')

Departure Details - Tuesday at 17:30
Origin Stations Found - Waterfront Station, Waterfront Station @ Bay 3, Waterfront Station @ Bay 2
Destination Stations Found - Lonsdale Quay @ Bay 3, Lonsdale Quay @ Bay 4, Lonsdale Quay @ Bay 6, Lonsdale Quay @ Bay 9, Lonsdale Quay @ Bay 2, Lonsdale Quay @ Bay 7, Lonsdale Quay @ Bay 1, Lonsdale Quay @ Bay 8, Lonsdale Quay @ Bay 5, Lonsdale Quay Station

-----------------------------------------------------
Journeys originating from Waterfront Station @ Bay 2
Journey 1: 17 minutes, 1 transfer
Trip 1: Walk from Waterfront Station @ Bay 2 to Waterfront Station Northbound
Trip 2: SeaBus Northbound To Lonsdale Quay from Waterfront Station Northbound to Lonsdale Quay Northbound
Trip 3: Walk from Lonsdale Quay Northbound to Lonsdale Quay @ Bay 2

Journey 2: 17 minutes, 1 transfer
Trip 1: Walk from Waterfront Sta

**Query 4:** From Holdom Station @ Platform 1 to Holdom Station @ Platform 2

In [23]:
get_journeys('monday', '13:30', 'Holdom Station @ Platform 1', 'Holdom Station @ Platform 2')

Departure Details - Monday at 13:30
Origin Stations Found - Holdom Station @ Platform 1
Destination Stations Found - Holdom Station @ Platform 2

-----------------------------------------------------
Journeys originating from Holdom Station @ Platform 1
Journey 1: 0 minutes, 0 transfer
Trip 1: Walk from Holdom Station @ Platform 1 to Holdom Station @ Platform 2

-----------------------------------------------------


**Query 5:** User-supplied day, departure time, origin, and destination

In [26]:
user_day = input()
user_time = input()
user_origin = input()
user_destination = input()
get_journeys(user_day, user_time, user_origin, user_destination)

Departure Details - Wednesday at 15:45
Origin Stations Found - UBC Exchange @ Bay 9, UBC Exchange @ Bay 7, UBC Exchange @ Bay 1
Destination Stations Found - Waterfront Station, Waterfront Station @ Bay 3, Waterfront Station @ Bay 2, Waterfront Station @ Bay 1, Waterfront Station Eastbound, Waterfront Station Northbound, Waterfront Station Southbound, Waterfront Station Unload Only, Waterfront Station @ Platform 2, Waterfront Station @ Platform 1

-----------------------------------------------------
Journeys originating from UBC Exchange @ Bay 9
Journey 1: 44 minutes, 1 transfer
Trip 1: Walk from UBC Exchange @ Bay 9 to UBC Exchange @ Bay 10
Trip 2: Walk from UBC Exchange @ Bay 10 to UBC Exchange @ Bay 11
Trip 3: 4 Powell from UBC Exchange @ Bay 11 to Waterfront Station @ Bay 3

Journey 2: 51 minutes, 2 transfers
Trip 1: 14 Hastings from UBC E