### Set up some basic stuff

In [None]:
import logging
# Lower the root logger's threshold to DEBUG.
logging.getLogger().setLevel(logging.DEBUG)

In [None]:
import folium
import folium.features as fof
import folium.utilities as ful
import branca.element as bre
import json
import geojson as gj
import arrow

import shapely.geometry as shpg
import pandas as pd
import geopandas as gpd

In [None]:
def lonlat_swap(lon_lat):
    """Return the elements of ``lon_lat`` in reverse order as a new list.

    Callers in this notebook use it to turn a GeoJSON-style coordinate
    into the reversed order before handing it to folium.
    """
    flipped = reversed(lon_lat)
    return [*flipped]

In [None]:
def get_row_count(n_maps, cols):
    """Return how many grid rows are needed to lay out ``n_maps`` maps.

    Args:
        n_maps: number of maps to place (non-negative integer).
        cols: number of maps per row (positive integer).

    Returns:
        The smallest integer row count such that ``rows * cols >= n_maps``.
    """
    # Integer division keeps the result an int; true division (`/`) would
    # yield fractional row counts such as 2.25 for 5 maps in 4 columns.
    full_rows, leftover = divmod(n_maps, cols)
    return full_rows + 1 if leftover else full_rows

In [None]:
def get_one_marker(loc, disp_color):
    """Build the folium element used to display a single location.

    Args:
        loc: GeoJSON-like feature dict with ``geometry`` (``type`` and
            ``coordinates``) and ``properties`` (``name``) entries.
        disp_color: color passed to the folium icon / line.

    Returns:
        A ``folium.Marker`` for ``Point`` geometries, a ``folium.PolyLine``
        tracing the single ring of a ``Polygon`` geometry, or ``None`` for
        any other geometry type.

    Raises:
        AssertionError: if a polygon has more than one coordinate ring.
    """
    geometry = loc["geometry"]
    kind = geometry["type"]

    if kind == "Point":
        point_latlng = lonlat_swap(geometry["coordinates"])
        return folium.Marker(
            point_latlng,
            icon=folium.Icon(color=disp_color),
            popup="%s" % loc["properties"]["name"])

    if kind == "Polygon":
        rings = geometry["coordinates"]
        assert len(rings) == 1,\
            "Only simple polygons supported!"
        outline_latlng = [lonlat_swap(vertex) for vertex in rings[0]]
        return folium.PolyLine(
            outline_latlng,
            color=disp_color,
            fill=disp_color,
            popup="%s" % loc["properties"]["name"])

    # Any other geometry type produces no marker.
    return None

In [None]:
def get_marker(loc, disp_color):
    """Return a list of folium elements for one location or a list of them.

    Args:
        loc: either a list of location features or a single feature.
        disp_color: color forwarded to ``get_one_marker``.

    Returns:
        A list with one entry per location; a single (non-list) location is
        wrapped in a one-element list after printing a notice.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # list subclasses.
    if isinstance(loc, list):
        return [get_one_marker(entry, disp_color) for entry in loc]

    print("Found single entry, is this expected?")
    return [get_one_marker(loc, disp_color)]

### Read the data

In [None]:
# Load the spec under validation and the reference sensing configurations.
# `with` closes each file handle as soon as it has been parsed.
with open("final_sfbayarea_filled_reroutes/train_bus_ebike_mtv_ucb.filled.reroute.json") as spec_file:
    spec_to_validate = json.load(spec_file)
with open("sensing_regimes.all.specs.json") as sensing_configs_file:
    sensing_configs = json.load(sensing_configs_file)

### Validating the time range

In [None]:
# Compare the formatted dates stored in the spec against dates derived
# from its start/end timestamps.
experiment_start = arrow.get(spec_to_validate["start_ts"])
experiment_end = arrow.get(spec_to_validate["end_ts"])
print("Experiment runs from %s -> %s" % (experiment_start, experiment_end))

start_fmt_time_to_validate = experiment_start.format("YYYY-MM-DD")
end_fmt_time_to_validate = experiment_end.format("YYYY-MM-DD")

expected_start = spec_to_validate["start_fmt_date"]
if not (start_fmt_time_to_validate == expected_start):
    print("VALIDATION FAILED, got start %s, expected %s" % (start_fmt_time_to_validate, expected_start))

expected_end = spec_to_validate["end_fmt_date"]
if not (end_fmt_time_to_validate == expected_end):
    print("VALIDATION FAILED, got end %s, expected %s" % (end_fmt_time_to_validate, expected_end))

### Validating calibration trips

In [None]:
def get_map_for_calibration_test(trip):
    """Create a folium map showing one calibration test.

    Args:
        trip: calibration test dict with ``id``, ``start_loc`` and
            ``end_loc`` entries; each location is either ``None`` or a
            feature with ``geometry.coordinates`` and ``properties.name``.

    Returns:
        A ``folium.Map``. When either location is ``None`` the map is
        returned empty; otherwise it holds a green start marker, a red end
        marker and a line joining them, and is fitted to the two points.
    """
    curr_map = folium.Map()

    start_loc = trip["start_loc"]
    if start_loc is None:
        return curr_map
    end_loc = trip["end_loc"]
    if end_loc is None:
        return curr_map

    curr_start = lonlat_swap(start_loc["geometry"]["coordinates"])
    curr_end = lonlat_swap(end_loc["geometry"]["coordinates"])

    start_marker = folium.Marker(
        curr_start,
        icon=folium.Icon(color="green"),
        popup="Start: %s" % trip["start_loc"]["properties"]["name"])
    start_marker.add_to(curr_map)

    end_marker = folium.Marker(
        curr_end,
        icon=folium.Icon(color="red"),
        popup="End: %s" % trip["end_loc"]["properties"]["name"])
    end_marker.add_to(curr_map)

    connector = folium.PolyLine([curr_start, curr_end], popup=trip["id"])
    connector.add_to(curr_map)

    curr_map.fit_bounds([curr_start, curr_end])
    return curr_map

In [None]:
# Render one map per calibration test in a 4-column grid, and report any
# test whose sensing config differs from the reference config with the same id.
calibration_tests = spec_to_validate["calibration_tests"]
rows = get_row_count(len(calibration_tests), 4)
# NOTE(review): the tuple is passed as Figure's first positional argument;
# if a figure size was intended, confirm the right keyword in the branca docs.
calibration_maps = bre.Figure((rows,4))
for i, t in enumerate(calibration_tests):
    if t["config"]["sensing_config"] != sensing_configs[t["config"]["id"]]["sensing_config"]:
        # The format string needs a placeholder, otherwise the offending
        # test is never shown in the message.
        print("Mismatch in config for test %s" % (t,))
    curr_map = get_map_for_calibration_test(t)
    calibration_maps.add_subplot(rows, 4, i+1).add_child(curr_map)
calibration_maps

### Validating evaluation trips

In [None]:
def add_waypoint_markers(waypoint_coords, curr_map):
    """Add one numbered marker per waypoint coordinate to ``curr_map``.

    Args:
        waypoint_coords: feature dict whose ``geometry.coordinates`` is the
            sequence of waypoint coordinates.
        curr_map: folium map the markers are added to.

    Each marker's popup is the waypoint's position in the sequence.
    """
    waypoints = waypoint_coords["geometry"]["coordinates"]
    for position, lon_lat in enumerate(waypoints):
        numbered_marker = folium.map.Marker(
            lonlat_swap(lon_lat),
            popup="%d" % position,
            icon=fof.DivIcon(class_name='leaflet-div-icon'))
        numbered_marker.add_to(curr_map)

def get_map_for_travel_leg(trip):
    """Create a folium map for a travel leg.

    Args:
        trip: leg dict with ``start_loc`` and ``end_loc`` (lists of location
            features), ``route_coords`` (list of route features), ``mode``
            and ``name``.

    Returns:
        A ``folium.Map`` with green start markers, red end markers and, for
        every entry in ``route_coords``, a polyline plus a circle marker per
        coordinate. The map bounds are refitted for each route, so the last
        route determines the final view.
    """
    curr_map = folium.Map()

    for start_loc in trip["start_loc"]:
        get_one_marker(start_loc, "green").add_to(curr_map)
    for end_loc in trip["end_loc"]:
        get_one_marker(end_loc, "red").add_to(curr_map)

    # One entry per reroute of this leg.
    for route in trip["route_coords"]:
        lonlat_path = route["geometry"]["coordinates"]
        print("Found %d coordinates for the route" % (len(lonlat_path)))

        latlng_path = [lonlat_swap(lonlat) for lonlat in lonlat_path]
        route_line = folium.PolyLine(
            latlng_path, popup="%s: %s" % (trip["mode"], trip["name"]))
        route_line.add_to(curr_map)

        for position, latlng in enumerate(latlng_path):
            dot = folium.CircleMarker(
                latlng, radius=5, popup="%d: %s" % (position, latlng))
            dot.add_to(curr_map)

        curr_map.fit_bounds(ful.get_bounds(latlng_path))

    return curr_map

In [None]:
def get_map_for_shim_leg(trip):
    """Create a folium map with a purple marker for each shim location.

    Args:
        trip: leg dict whose ``loc`` entry is a list of location features.

    Returns:
        A ``folium.Map``; its bounds are refitted to every marker in turn,
        so the last location determines the final view.
    """
    curr_map = folium.Map()
    for shim_loc in trip["loc"]:
        shim_marker = get_one_marker(shim_loc, "purple")
        shim_marker.add_to(curr_map)
        marker_bounds = shim_marker.get_bounds()
        curr_map.fit_bounds(marker_bounds)
    return curr_map

In [None]:
# Build one map per leg of every evaluation trip (travel legs and shim legs
# use different builders) and lay the maps out in a 2-column grid.
evaluation_trips = spec_to_validate["evaluation_trips"]
map_list = [
    get_map_for_travel_leg(leg) if leg["type"] == "TRAVEL" else get_map_for_shim_leg(leg)
    for trip in evaluation_trips
    for leg in trip["legs"]
]

rows = get_row_count(len(map_list), 2)
ratio_percent = (rows/2) * 100
evaluation_maps = bre.Figure(ratio="{}%".format(ratio_percent))
for slot, curr_map in enumerate(map_list, start=1):
    evaluation_maps.add_subplot(rows, 2, slot).add_child(curr_map)
evaluation_maps

### Validating start and end polygons

In [None]:
def _shapes_valid_for_route(locs, route_start_ts, route_end_ts):
    """Return shapely shapes of the locs whose validity window covers the route's."""
    return [shpg.shape(loc["geometry"]) for loc in locs
            if route_start_ts >= loc["properties"]["valid_start_ts"]
            and route_end_ts <= loc["properties"]["valid_end_ts"]]


def _assert_contiguous(contains, points_preview):
    """Assert that the True entries of ``contains`` have consecutive index labels."""
    inside_labels = pd.Series(contains[contains].index)
    # NaN means zero or one point is inside, which is trivially contiguous.
    max_index_diff = inside_labels.diff().max()
    assert pd.isnull(max_index_diff) or max_index_diff == 1, \
        "Max diff in index = %s for points %s" % (max_index_diff, points_preview)


def check_start_end_contains(leg):
    """Check every route of a leg against its start and end polygons.

    For each entry of ``leg["route_coords"]`` the start/end locations whose
    ``[valid_start_ts, valid_end_ts]`` window covers the route's window are
    selected, and for each of them the following must hold:

    * at least one route point lies inside the polygon;
    * the first (start) / last (end) route point lies inside the polygon;
    * the points inside the polygon are contiguous along the route.

    Args:
        leg: leg dict with ``route_coords``, ``start_loc`` and ``end_loc``.

    Raises:
        AssertionError: if no location matches a route's window or any of
            the checks above fails.
    """
    for rc in leg["route_coords"]:
        points = gpd.GeoSeries([shpg.Point(p) for p in rc["geometry"]["coordinates"]])

        route_start_ts = rc["properties"]["valid_start_ts"]
        route_end_ts = rc["properties"]["valid_end_ts"]

        # all start_locs and end_locs where
        # [route_start_ts, route_end_ts] ∈ [loc_start_ts, loc_end_ts]
        start_locs = _shapes_valid_for_route(leg["start_loc"], route_start_ts, route_end_ts)
        end_locs = _shapes_valid_for_route(leg["end_loc"], route_start_ts, route_end_ts)

        assert len(start_locs) >= 1
        assert len(end_locs) >= 1

        for sl in start_locs:
            start_contains = points.apply(lambda p: sl.contains(p))
            print(points[start_contains])

            # some of the points are within the start polygon
            assert start_contains.any(), leg

            # the first point is within the start polygon
            assert start_contains.iloc[0], points.head()

            # points within polygons are contiguous
            _assert_contiguous(start_contains, points.head())

        for el in end_locs:
            end_contains = points.apply(lambda p: el.contains(p))
            print(points[end_contains])

            # some of the points are within the end polygon
            assert end_contains.any(), leg

            # the last point is within the end polygon
            assert end_contains.iloc[-1], points.tail()

            # points within polygons are contiguous
            _assert_contiguous(end_contains, points.tail())

In [None]:
# Run the start/end polygon check on every travel leg, skipping any leg id
# listed in `invalid_legs`.
invalid_legs = []
for trip in evaluation_trips:
    for leg in trip["legs"]:
        if leg["type"] != "TRAVEL" or leg["id"] in invalid_legs:
            continue
        print("Checking leg %s, %s" % (trip["id"], leg["id"]))
        check_start_end_contains(leg)

### Validating sensing settings

In [None]:
# For every phone OS entry, the ids of its sensing configs must match the
# "compare" list position by position.
for ss in spec_to_validate["sensing_settings"]:
    for compare_map in ss.values():
        compare_list = compare_map["compare"]
        config_ids = [ssc["id"] for ssc in compare_map["sensing_configs"]]
        # Lists of different lengths cannot match; report that instead of
        # raising IndexError or silently ignoring the extra entries.
        if len(config_ids) != len(compare_list):
            print("Mismatch in sensing configurations for %s" % ss)
            continue
        for config_id, expected_id in zip(config_ids, compare_list):
            if config_id != expected_id:
                print("Mismatch in sensing configurations for %s" % ss)

### Validating that routes have no duplicate coordinates

In [None]:
# Threshold applied to the absolute difference of each coordinate component.
REL_TOL = 1e-5


def is_coords_equal(c1, c2):
    """Return True when the first two components of ``c1`` and ``c2`` each
    differ by strictly less than ``REL_TOL``."""
    first_gap = abs(c2[0] - c1[0])
    if not first_gap < REL_TOL:
        return False
    second_gap = abs(c2[1] - c1[1])
    return second_gap < REL_TOL

# For every route of every travel leg: when two coordinates are equal, all
# coordinates between them must be equal to them as well; otherwise fail.
for t in evaluation_trips:
    for l in t["legs"]:
        if l["type"] != "TRAVEL":
            continue
        for rc in l["route_coords"]:
            print("Checking leg %s, %s between dates %s, %s" % (t["id"], l["id"], rc["properties"]["valid_start_fmt_date"], rc["properties"]["valid_end_fmt_date"]))
            route_points = rc["geometry"]["coordinates"]
            for i, c1 in enumerate(route_points):
                for j in range(i + 1, len(route_points)):
                    c2 = route_points[j]
                    if not is_coords_equal(c1, c2):
                        continue
                    # Positions in [i, j] whose coordinate differs from c1.
                    differing = [k for k in range(i, j + 1)
                                 if not is_coords_equal(c1, route_points[k])]
                    if differing:
                        # Report the last differing position in the span.
                        not_matched_index = differing[-1]
                        assert False, (f"\tDuplicates {c1}, {c2} found @ indices {i}, {j} with non-duplicate {not_matched_index} in between")

### Validating overlapping time ranges

Representative test case (should break):

In [None]:
def check_overlaps(x):
    """Assert that the validity windows of the entries in ``x`` do not overlap.

    Args:
        x: iterable of dicts with ``properties.valid_start_ts`` and
            ``properties.valid_end_ts``.

    Raises:
        AssertionError: if, after ordering the windows by start timestamp,
            a window ends after the next one starts.
    """
    windows = [(entry["properties"]["valid_start_ts"], entry["properties"]["valid_end_ts"])
               for entry in x]
    windows.sort(key=lambda window: window[0])
    # Compare each window's end with the start of the one that follows it.
    for (_, ts1), (ts2, _) in zip(windows, windows[1:]):
        assert ts1 <= ts2, f"Overlapping timestamps: {arrow.get(ts1)}, {arrow.get(ts2)}"


# Two deliberately overlapping validity windows; check_overlaps is expected
# to reject them, and the resulting assertion message is printed.
# NOTE(review): `.timestamp` is read as an attribute here; whether that
# yields a number or a bound method depends on the installed arrow
# version — confirm.
invalid_ranges = [
    {
        "properties": {
            "valid_start_ts": arrow.get(window_start).timestamp,
            "valid_end_ts": arrow.get(window_end).timestamp,
        }
    }
    for window_start, window_end in (
        ("2020-01-01", "2020-03-30"),
        ("2019-07-16", "2020-04-30"),
    )
]

try:
    check_overlaps(invalid_ranges)
except AssertionError as overlap_error:
    print(overlap_error)

Actual check of spec:

In [None]:
# Location lists that may be present on a leg, paired with the progress
# message printed before each one is checked.
loc_checks = (
    ("loc", "\tChecking shim locs..."),
    ("start_loc", "\tChecking start locs..."),
    ("end_loc", "\tChecking end locs..."),
)

for t in evaluation_trips:
    for l in t["legs"]:
        print("Checking leg %s, %s" % (t["id"], l["id"]))

        # check whichever location lists this leg carries
        for loc_key, progress_message in loc_checks:
            if loc_key in l:
                print(progress_message)
                check_overlaps(l[loc_key])

        # check trajectories
        if l["type"] == "TRAVEL":
            print("\tChecking trajectories...")
            check_overlaps(l["route_coords"])