In [None]:
import json
import polyline as pl
import pandas as pd
from pytest import approx

## Load all data

In [None]:
# Load the filled original-route spec and the filled `sm_reroute` spec.
# Each file is opened in a `with` block so its handle is closed as soon as
# parsing finishes instead of lingering until garbage collection.
with open("final_sfbayarea_filled/train_bus_ebike_mtv_ucb.filled.json") as original_fp:
    original_route = json.load(original_fp)
with open("final_sfbayarea_filled/train_bus_ebike_sm_reroute_mtv_ucb.filled.json") as reroute_fp:
    sm_reroute = json.load(reroute_fp)

In [None]:
# Load the unfilled combined spec (input) and the filled combined reroute
# spec (output). Each file is opened in a `with` block so its handle is
# closed as soon as parsing finishes.
with open("final_sfbayarea/train_bus_ebike_mtv_ucb.json") as spec_fp:
    combined_spec = json.load(spec_fp)
with open("final_sfbayarea_filled_reroutes/train_bus_ebike_mtv_ucb.filled.reroute.json") as combined_fp:
    combined_reroute = json.load(combined_fp)

## Check input spec

First, we verify that a trajectory is defined for each leg in the spec, and that the trajectory includes a polyline. Spot-checking already suggests this is true, but the cells below check it programmatically.

In [None]:
def check_polyline_existence(t_or_l):
    """Report whether a trip or leg spec defines its trajectory with a polyline.

    After removing the bookkeeping keys (``id``, ``name``, ``mode``,
    ``start_loc``, ``end_loc``, ``multiple_occupancy``) exactly one key must
    remain; that key names how the trajectory is defined. The check passes
    when the key is ``polyline`` or ``polylines``, or when it is
    ``waypoint_coords`` and every entry of the list (or the single value,
    when it is not a list) contains a ``polyline`` member.

    Args:
        t_or_l: dict describing a unimodal trip or a single leg.

    Returns:
        bool: True when a polyline is present; False otherwise, including
        when there is not exactly one trajectory key.
    """
    non_trajectory_keys = {"id", "name", "mode", "start_loc", "end_loc", "multiple_occupancy"}
    trajectory_key_list = t_or_l.keys() - non_trajectory_keys
    if len(trajectory_key_list) != 1:
        print("key list = %s" % trajectory_key_list)
        return False
    trajectory_key = next(iter(trajectory_key_list))

    if trajectory_key in ("polyline", "polylines"):
        polyline_exists = True
    elif trajectory_key == "waypoint_coords":
        wc = t_or_l["waypoint_coords"]
        if isinstance(wc, list):
            # Built-in all() yields a plain bool; no pandas Series is needed.
            polyline_exists = all("polyline" in w for w in wc)
        else:
            polyline_exists = "polyline" in wc
    else:
        polyline_exists = False

    print("Trajectory is defined using %s, includes polyline %s" % (trajectory_key, polyline_exists))
    if not polyline_exists:
        # .get() keeps the failure report from raising when the spec has no id.
        print("======POLYLINE NOT FOUND for %s" % t_or_l.get("id"))
    return polyline_exists

In [None]:
# Walk every evaluation trip in the input spec. A trip that carries a "legs"
# list is checked leg by leg; a trip without one is unimodal and is checked
# as a whole.
for t in combined_spec["evaluation_trips"]:
    if "legs" in t:
        for l in t["legs"]:
            print("Checking leg %s in trip %s" % (l["id"], t["id"]))
            check_polyline_existence(l)
    else:
        print("Checking unimodal trip %s" % t["id"])
        check_polyline_existence(t)

## Check output spec


In [None]:
def check_coordinates(l1, l2):
    """Compare two coordinate lists and explain any difference.

    Only exact equality (``l1 == l2``) counts as a match. When the lists
    differ, the remaining steps are purely diagnostic: they print a length
    mismatch, or the first index whose elements differ under ``approx``
    (imported from pytest at the top of the notebook), or a note that every
    element matched approximately even though the lists are not equal.

    Args:
        l1: first coordinate list.
        l2: second coordinate list.

    Returns:
        bool: True only when ``l1 == l2``; False in every other case.
    """
    if l1 == l2:
        return True

    if len(l1) != len(l2):
        print("=== list lengths don't match: %s != %s" % (len(l1), len(l2)))
        return False

    # Locate the first pair that differs beyond approx's tolerance, if any.
    first_mismatch = next(
        ((idx, left, right)
         for idx, (left, right) in enumerate(zip(l1, l2))
         if left != approx(right)),
        None,
    )
    if first_mismatch is not None:
        print("=== mismatch found at index %i: %s != %s" % first_mismatch)
        return False

    # Every pair is approximately equal, yet the lists are not exactly equal.
    print("=== elements match, but lists don't match?!")
    return False

In [None]:
def validate_shim_leg(ti, t, li, l):
    """Check a shim leg of the combined spec against the original route.

    When the leg has exactly one ``loc`` entry, its coordinates are compared
    with the ``loc`` of the leg at the same trip/leg index in the
    module-level ``original_route``, and the outcome is printed. Any other
    number of entries is only reported, not validated.

    Args:
        ti: index of the trip within ``evaluation_trips``.
        t: the trip dict (not used in the body).
        li: index of the leg within the trip's ``legs``.
        l: the combined leg dict; ``l["loc"]`` is a list of location features.

    Returns:
        None. Results are reported via ``print``.
    """
    loc_list = l["loc"]
    if len(loc_list) != 1:
        # Supply the format arguments so the leg id and entry count are shown
        # instead of the raw "%s" placeholders.
        print("=== Leg %s has %s reroutes, need to check each of them" % (l["id"], len(loc_list)))
        return

    orig_loc = original_route["evaluation_trips"][ti]["legs"][li]["loc"]
    combo_loc = loc_list[0]
    valid = check_coordinates(orig_loc["geometry"]["coordinates"], combo_loc["geometry"]["coordinates"])
    if not valid:
        print("!!! Shim Leg %s has not been rerouted, check against original route = %s!!!" % (l["id"], valid))
    else:
        print("Shim Leg %s has not been rerouted, check against original route = %s" % (l["id"], valid))

In [None]:
def validate_leg_key(ti, t, li, l, key):
    """Validate one feature list of a combined travel leg against its sources.

    ``l[key]`` is a list of features. The leg at the same trip/leg index is
    looked up in the module-level ``original_route`` and ``sm_reroute``:

    * one entry (no reroute): it must equal both the original and the
      rerouted feature, and those two must equal each other;
    * two entries (reroute): the first must equal the original feature and
      the second the rerouted feature;
    * no entries: reported and treated as invalid.

    Args:
        ti: index of the trip within ``evaluation_trips``.
        t: the trip dict (not used in the body).
        li: index of the leg within the trip's ``legs``.
        l: the combined leg dict.
        key: name of the feature list to validate (e.g. ``"start_loc"``).

    Returns:
        bool: True when every comparison for this key succeeds.

    Raises:
        AssertionError: if ``l[key]`` has more than two entries.
    """
    def coords(feature):
        # Every compared feature stores its points under geometry.coordinates.
        return feature["geometry"]["coordinates"]

    orig_l = original_route["evaluation_trips"][ti]["legs"][li]
    reroute_l = sm_reroute["evaluation_trips"][ti]["legs"][li]
    combo_features = l[key]

    if len(combo_features) == 0:
        # Without this branch the function would fall through and return None
        # with no explanation of why the key is considered invalid.
        print("=== Leg %s has no entries for %s, nothing to compare" % (l.get("id"), key))
        return False

    if len(combo_features) == 1:
        # No reroute. The combo fields should be equal to both original and
        # rerouted fields, and they should be equal to each other.
        combo_orig = check_coordinates(coords(combo_features[0]), coords(orig_l[key]))
        combo_reroute = check_coordinates(coords(combo_features[0]), coords(reroute_l[key]))
        orig_reroute = check_coordinates(coords(orig_l[key]), coords(reroute_l[key]))
        print(combo_orig, combo_reroute, orig_reroute)
        return combo_orig and combo_reroute and orig_reroute

    # Reroute. Only two reroutes supported. The first combo field should equal
    # the original and the second should equal the rerouted one.
    assert len(combo_features) == 2, "Only two reroutes at this time, so use a simple check"
    combo_orig = check_coordinates(coords(combo_features[0]), coords(orig_l[key]))
    combo_reroute = check_coordinates(coords(combo_features[1]), coords(reroute_l[key]))
    print(combo_orig, combo_reroute)
    return combo_orig and combo_reroute

In [None]:
def validate_travel_leg(ti, t, li, l):
    """Validate the start, end and route features of a combined travel leg.

    Runs ``validate_leg_key`` for ``start_loc``, ``end_loc`` and
    ``route_coords`` (in that order) and prints either the list of keys
    that failed or a confirmation that the leg matches.

    Args:
        ti: index of the trip within ``evaluation_trips``.
        t: the trip dict; its ``id`` is used in the report.
        li: index of the leg within the trip's ``legs``.
        l: the combined leg dict; its ``id`` is used in the report.

    Returns:
        None. Results are reported via ``print``.
    """
    checked_keys = ("start_loc", "end_loc", "route_coords")
    # All three keys are always checked, so each one prints its diagnostics.
    invalid_feature_list = [
        feature_key
        for feature_key in checked_keys
        if not validate_leg_key(ti, t, li, l, feature_key)
    ]

    if invalid_feature_list:
        print("!! Travel Leg %s in trip %s has invalid keys %s !!" % (l["id"], t["id"], invalid_feature_list))
        return
    print("Travel Leg %s in trip %s matches previous trajectories" % (l["id"], t["id"]))

In [None]:
# Validate every leg of the combined reroute spec. A leg that has a "loc"
# entry is handled by validate_shim_leg; every other leg is handled by
# validate_travel_leg.
for ti, t in enumerate(combined_reroute["evaluation_trips"]):
    for li, l in enumerate(t["legs"]):
        (validate_shim_leg if "loc" in l else validate_travel_leg)(ti, t, li, l)