In [12]:
first_run = True

Imports

In [1]:
# --- Standard Library ---
import math
import random
import time
import warnings
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# --- Third-party Libraries ---
import numpy as np
import pandas as pd
import ijson
import dtale

# --- Matplotlib ---
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from matplotlib.cm import ScalarMappable
from matplotlib.colors import LinearSegmentedColormap, Normalize
from matplotlib.patches import Rectangle, Polygon as MplPolygon
from matplotlib.ticker import FixedLocator

# --- Shapely ---
from shapely.geometry import (
    Polygon, MultiPolygon, GeometryCollection
)
from shapely.ops import unary_union

# --- IPython / Widgets ---
from IPython.display import display
from ipywidgets import Checkbox, HBox, VBox, HTML

# --- Enable Interactive Plotting ---
%matplotlib widget


Allowing multiple files to be run via batch_run.py

In [None]:
import os

# Only define injected_file_path if not set externally
# Set injected_file_path to the realpath of the file you want to run (if running notebook individually)
# Will overwrite if using batch_run.py, so doesn't matter in that case)
try:
    injected_file_path
except NameError:
    injected_file_path = "/real/path/to/trajectory/file/637b023440527bf2daa5932f__post1.json"

# Always assign file_path
file_path = injected_file_path

file_id = os.path.splitext(os.path.basename(file_path))[0]

# Extract post folder (assumes format has '__postX' at the end)
post_folder = file_id.split("__")[-1]  # e.g. 'post1'
output_dir = os.path.join("./output", post_folder)
os.makedirs(output_dir, exist_ok=True)

# Plot counter starts at 0 for each notebook run
plot_counter = 0


Saving plots and dataframes to file

In [3]:
def save_plot(name_suffix):
    global plot_counter
    plot_counter += 1
    filename = f"{output_dir}/plot_{file_id}_{plot_counter}_{name_suffix}.png"
    plt.savefig(filename, dpi=300)
    print(f"Saved plot to {filename}")

In [4]:
def save_df(df, name_suffix):
    filename = f"{output_dir}/df_{file_id}_{name_suffix}.csv"
    df.to_csv(filename, index=False)
    print(f"Saved DataFrame to {filename}")

Functions

In [10]:
# Convert timestamp to Edie time bin
def time_bin(t, dt):
    return int(t // dt)

# Convert x-position to Edie space bin
def space_bin(x, dx):
    return int(x // dx)

# Create the main parser and Edie statistics accumulator
def compute_edie_stats_threshold(file_path, direction=-1, dx=0.02 * 5280, dt=6, light_thresh=20, heavy_thresh=10, subsample_step=10):
    edie_grid = defaultdict(lambda: {"distance": 0.0, "time": 0.0, "count": 0})
    got_date = False
    count = 0
    start_time = time.time()
    with open(file_path, 'r') as input_file:
        parser = ijson.items(input_file, 'item', use_float=True)

        for record in parser:
            if record["direction"] != direction:
                continue

            timestamps = np.array(record["timestamp"])[::subsample_step]
            x_positions = np.array(record["x_position"])[::subsample_step]
            if len(timestamps) < 2:
                continue

            if not got_date:
                date = datetime.fromtimestamp(timestamps[0]).date()
                got_date = True

            for i in range(1, len(timestamps)):
                t0, t1 = timestamps[i - 1], timestamps[i]
                x0, x1 = x_positions[i - 1], x_positions[i]

                dt_seg = t1 - t0
                dx_seg = (x1 - x0) * direction
                if dt_seg <= 0 or dx_seg <= 0:
                    continue

                t_bins = list(range(time_bin(t0, dt), time_bin(t1, dt) + 1))
                x_bins = list(range(space_bin(x0, dx), space_bin(x1, dx) + 1))

                for tb in t_bins:
                    for xb in x_bins:
                        edie_grid[(xb, tb)]["distance"] += dx_seg
                        edie_grid[(xb, tb)]["time"] += dt_seg
                        edie_grid[(xb, tb)]["count"] += 1

            count += 1
            if count % 100000 == 0:
                print(f"Processed {count} trajectories...")
    end_time = time.time()
    print(f"✅ Finished parsing {count} trajectories in {end_time-start_time} seconds.")
    
    # Apply dual threshold filtering
    edie_grid_light = {}
    edie_grid_heavy = {}

    for key, cell in edie_grid.items():
        if cell["time"] == 0:
            continue
        speed_mph = (cell["distance"] / cell["time"]) * 3600 / 5280
        if speed_mph <= light_thresh:
            edie_grid_light[key] = cell
        if speed_mph <= heavy_thresh:
            edie_grid_heavy[key] = cell

    print(f"🌕 Edie cells ≤ {light_thresh} mph: {len(edie_grid_light)}")
    print(f"🔴 Edie cells ≤ {heavy_thresh} mph: {len(edie_grid_heavy)}")

    return edie_grid_light, edie_grid_heavy, date



def union_parallelograms(parallelograms):
    print("🔁 Unioning parallelograms...")
    t0 = time.time()
    merged = unary_union(parallelograms)
    t1 = time.time()
    print(f"✅ Union complete in {t1 - t0:.2f} seconds.")
    return merged

def create_tilted_parallelogram(x, t, speed, length=200, time_span=60):
    """
    Create a parallelogram centered at (x, t), tilted according to speed.
    - x in feet, t in seconds
    - speed in mph
    """
    mph_to_ftps = 1.46667
    slope = speed * mph_to_ftps  # ft/s
    
    half_len = length / 2
    half_time = time_span / 2
    
    # Compute corners with tilt
    corners = [
        (t - half_time, x - slope * half_time - half_len),
        (t - half_time, x - slope * half_time + half_len),
        (t + half_time, x + slope * half_time + half_len),
        (t + half_time, x + slope * half_time - half_len),
    ]
    return Polygon(corners)


def split_by_thin_connections(polygons, buffer_distance=100):
    """
    For each polygon in the input list, apply erosion followed by dilation.
    This removes thin connections, possibly splitting a polygon into multiple parts.
    
    Args:
        polygons (List[Polygon]): List of input polygons
        buffer_distance (float): Buffer distance for erosion and dilation
    
    Returns:
        List[Polygon]: All resulting polygons after split
    """
    results = []

    for i, poly in enumerate(polygons):
        if poly.is_empty or not isinstance(poly, Polygon):
            continue

        # Erode (shrink) to remove narrow connections
        eroded = poly.buffer(-buffer_distance)
        if eroded.is_empty:
            continue

        # Dilate (expand back to original size)
        dilated = eroded.buffer(buffer_distance)

        # Collect individual polygons
        if isinstance(dilated, (MultiPolygon, GeometryCollection)):
            parts = [g for g in dilated.geoms if isinstance(g, Polygon) and not g.is_empty]
            results.extend(parts)
        elif isinstance(dilated, Polygon):
            results.append(dilated)

    return results




def build_all_parallelograms(points, speeds, length=200, time_span=60):
    parallelograms = []
    n_points = len(points)
    print(f"Building parallelograms for {n_points} points...")
    for i, ((x, t), speed) in enumerate(zip(points, speeds)):
        if i % 100000 == 0 and i > 0:
            print(f"  Processed {i} points...")
        poly = create_tilted_parallelogram(x, t, speed, length, time_span)
        parallelograms.append(poly)
    print(f"Done creating {len(parallelograms)} parallelograms.")
    return parallelograms

def filter_polygons_by_area(polygons, min_area=10000):
    """
    Filter polygons, keeping only those with area >= min_area.
    """
    if hasattr(polygons, "geoms"):
        poly_list = list(polygons.geoms)
    else:
        poly_list = [polygons]

    kept_polygons = [poly for poly in poly_list if poly.area >= min_area]

    print(f"Filtered {len(poly_list)} polygons down to {len(kept_polygons)} by area >= {min_area}")
    return kept_polygons


def find_and_build_congestion_zones(
    file_path,
    traj_threshold_seconds=10,
    direction=-1,
    light_thresh=20,
    heavy_thresh=10,
    light_params=None,
    heavy_params=None
):
    """
    Combines segment extraction + polygon creation into a single pass.
    Also returns bounds for plotting without needing full trajectories.
    """
    if light_params is None:
        light_params = {}
    if heavy_params is None:
        heavy_params = {}

    date = None
    got_date = False
    count = 0

    # Accumulators for raw points/speeds
    light_points, light_speeds = [], []
    heavy_points, heavy_speeds = [], []

    # Bounds tracking
    min_ts, max_ts = float('inf'), float('-inf')
    min_pos, max_pos = float('inf'), float('-inf')
    min_speed, max_speed = float('inf'), float('-inf')

    t0 = time.time()

    with open(file_path, 'r') as input_file:
        parser = ijson.items(input_file, 'item', use_float=True)

        for record in parser:
            if record["direction"] != direction:
                continue

            duration = record["last_timestamp"] - record["first_timestamp"]
            if duration < traj_threshold_seconds:
                continue

            if not got_date:
                date = datetime.fromtimestamp(record["first_timestamp"]).date()
                got_date = True

            x_pos = np.array(record["x_position"])
            timestamp = np.array(record["timestamp"])

            # Update bounds, used for plotting
            min_ts = min(min_ts, np.min(timestamp))
            max_ts = max(max_ts, np.max(timestamp))
            min_pos = min(min_pos, np.min(x_pos))
            max_pos = max(max_pos, np.max(x_pos))

            # Calculate speeds (mph) for all points in trajectory. 
            speed = np.diff(x_pos) / np.diff(timestamp) * direction
            speed = np.append(speed[0], speed)
            speed *= 0.681818  # ft/s → mph

            min_speed = min(min_speed, np.min(speed))
            max_speed = max(max_speed, np.max(speed))

            def find_segments(threshold, points_accum, speeds_accum):
                mask = (speed > 0) & (speed <= threshold) # mask of speed-array. E.g [True,True,False,True,True] [8,7,25,12,13], if thresh = 20. 
                in_segment = False
                segment_start = 0

                for i in range(len(mask)):
                    # If we hit a True in the mask and we’re not already inside a segment, this marks the beginning of a congestion segment.
                    if mask[i] and not in_segment:
                        segment_start = i
                        in_segment = True

                    #If the mask switches back to False and we were inside a segment, then we just found the end of that congestion segment.
                    elif not mask[i] and in_segment:
                        segment_end = i - 1
                        in_segment = False

                        # Saves the start and end point of the segment: Each point is (position, timestamp) Stored in points_accum. 
                        # Also stores the speeds at those two endpoints into speeds_accum.
                        points_accum.extend(zip(
                            [x_pos[segment_start], x_pos[segment_end]],
                            [timestamp[segment_start], timestamp[segment_end]]
                        ))
                        speeds_accum.extend([speed[segment_start], speed[segment_end]])

                # If we reach the end of the trajectory while still inside a segment, then the segment extends to the last data point.
                if in_segment:
                    segment_end = len(mask) - 1
                    points_accum.extend(zip(
                        [x_pos[segment_start], x_pos[segment_end]],
                        [timestamp[segment_start], timestamp[segment_end]]
                    ))
                    speeds_accum.extend([speed[segment_start], speed[segment_end]])

            # finds low-speed stretches (light or heavy congestion) and saves their start/end points for later polygon building.
            find_segments(light_thresh, light_points, light_speeds)
            find_segments(heavy_thresh, heavy_points, heavy_speeds)

            count += 1
            if count % 100000 == 0:
                print(f"Processed {count} trajectories...")

    t1 = time.time()
    print(f"✅ Finished parsing {count} trajectories in {t1 - t0:.2f}s")
    print(f"{file_path}, {date} → Collected (light={len(light_points)}, heavy={len(heavy_points)}) points.")

    # Build polygons
    print("\n🔹 Building light congestion zones...")
    light_polygons = create_congestion_zones_from_points(light_points, light_speeds, **light_params)

    print("\n🔹 Building heavy congestion zones...")
    heavy_polygons = create_congestion_zones_from_points(heavy_points, heavy_speeds, **heavy_params)

    # bounds for plotting
    bounds = {
        "timestamps": (min_ts, max_ts),
        "positions": (min_pos, max_pos),
        "speeds": (max(0, min_speed), max_speed)
    }

    return light_polygons, heavy_polygons, date, bounds


def create_congestion_zones_from_points(
    points,
    speeds,
    length=250,
    time_span=30,
    min_area=500000,
    simplify_tolerance=0,
    label=None,
    union=True,
    detailed_zone=True,
    min_area_hulls=500000,
    buffer_distance=None,
    fill_gaps_distance=None
):
    
    t0 = time.time()
    print(f"\n🔹 Building congestion polygon for: {label or 'Unnamed'}")
    print(f"✅ Received {len(points)} points.")

    # Step 1: Build parallelograms
    parallelograms = build_all_parallelograms(points, speeds, length=length, time_span=time_span)
    if not union:
        return parallelograms

    # Step 2: Union, filter, simplify
    congestion_polygon = union_parallelograms(parallelograms)
    filtered_by_area = filter_polygons_by_area(congestion_polygon, min_area=min_area)

    if buffer_distance is not None:
        filtered_by_area = split_by_thin_connections(filtered_by_area, buffer_distance=buffer_distance)

    if fill_gaps_distance is not None:
        filtered_by_area = fill_polygon_gaps(filtered_by_area, buffer_distance=fill_gaps_distance)

    if detailed_zone:
        simplified = simplify_polygons(filtered_by_area, tolerance=simplify_tolerance)
        print(f"✅ Detailed Zone processed in {time.time() - t0:.2f}s")
        return simplified

    convexified = [poly.convex_hull for poly in filtered_by_area]
    merged_convex = unary_union(convexified)
    merged_list = [g for g in getattr(merged_convex, "geoms", [merged_convex]) if isinstance(g, Polygon)]
    final_polygons = [poly for poly in merged_list if poly.area >= min_area_hulls]
    print(f"✅ Polygon processed in {time.time() - t0:.2f}s")
    return final_polygons


def simplify_polygons(polygons, tolerance=1000):
    # If MultiPolygon, simplify each geometry
    if hasattr(polygons, "geoms"):
        simplified_geoms = [poly.simplify(tolerance, preserve_topology=True) for poly in polygons.geoms]
        return MultiPolygon(simplified_geoms)
    elif isinstance(polygons, Polygon):
        return polygons.simplify(tolerance, preserve_topology=True)
    else:
        # If it's a list of polygons
        simplified = [poly.simplify(tolerance, preserve_topology=True) for poly in polygons]
        return simplified

def fill_polygon_gaps(polygons, buffer_distance=100):
    """
    Slightly dilate (expand) each polygon and merge with the original to fill small gaps.
    
    Args:
        polygons (List[Polygon]): Input polygons
        buffer_distance (float): Distance to buffer for filling
    
    Returns:
        List[Polygon]: Cleaned, gap-filled polygons
    """
    filled_polygons = []
    
    for poly in polygons:
        if poly.is_empty or not isinstance(poly, Polygon):
            continue
        
        # Expand slightly
        dilated = poly.buffer(buffer_distance)
        
        # Merge with original to preserve finer detail
        merged = poly.union(dilated)

        # Simplify topology (optional)
        if isinstance(merged, (MultiPolygon, GeometryCollection)):
            parts = [g for g in merged.geoms if isinstance(g, Polygon)]
            filled_polygons.extend(parts)
        else:
            filled_polygons.append(merged)
    
    return filled_polygons
    
def edie_cell_to_polygon(x_bin, t_bin, dx=160, dt=30):
    x0 = x_bin * dx / 5280
    x1 = (x_bin + 1) * dx / 5280
    t_start = t_bin * dt
    t_end = (t_bin + 1) * dt
    
    # The corners of the rectangle in (time, mile_marker) coordinates
    corners = [
        (t_start, x0),
        (t_start, x1),
        (t_end, x1),
        (t_end, x0),
        (t_start, x0),  # Close polygon by repeating the first point
    ]
    
    return Polygon(corners)


def interactive_plot_edies_polys_from_bounds(bounds, patches_by_label):
    plt.rc('font', family='serif', size=14)
    fig, ax = plt.subplots(figsize=(12, 4))

    # Set axis limits from bounds
    min_ts, max_ts = bounds["timestamps"]
    min_pos, max_pos = bounds["positions"]
    min_speed, max_speed = bounds["speeds"]

    ax.set_xlim(min_ts, max_ts)
    ax.set_ylim(min_pos / 5280, max_pos / 5280)
    ax.invert_yaxis()
    ax.set_xlabel("Time")
    ax.set_ylabel("Mile Marker")

    # Colorbar
    colors = [plt.cm.jet(x) for x in np.linspace(1, 0.5, 256)]
    green_to_red = LinearSegmentedColormap.from_list('GreenToRed', colors, N=256)
    norm = Normalize(vmin=0, vmax=min(max_speed, 55))
    sm = ScalarMappable(cmap=green_to_red, norm=norm)
    sm.set_array([])
    plt.colorbar(sm, ax=ax, label='Speed (mph)', pad=0.01)

    # Format time axis
    ticks_loc = ax.get_xticks().tolist()
    ax.set_xticks(ticks_loc)
    labels = [datetime.fromtimestamp(ts, tz=timezone(timedelta(hours=-6))).strftime('%H:%M:%S') for ts in ticks_loc]
    ax.set_xticklabels(labels, rotation=45)

    # Hover display
    def format_hover_info(x, y):
        try:
            dt = datetime.fromtimestamp(x, tz=timezone(timedelta(hours=-6)))
            return f"Time: {dt.strftime('%H:%M')}, Mile: {y:.2f}"
        except:
            return f"Time: {x:.2f}, Mile: {y:.2f}"
    ax.format_coord = format_hover_info

    # --- TOGGLES ---
    patches_controls = {}
    toggles = []

    for label, patches in patches_by_label.items():
        for patch in patches:
            ax.add_patch(patch)

        checkbox = Checkbox(value=True, indent=False, layout={'width': '15px'})
        color = patch.get_facecolor() if patches else "black"
        label_html = HTML(f"<span style='color:{mcolors.to_hex(color)}; font-weight:bold;'>{label.replace('_', ' ').title()}</span>")
        toggles.append(HBox([checkbox, label_html], layout={'align_items': 'center', 'gap': '4px'}))
        patches_controls[label] = (patches, checkbox)

    def update_display(change=None):
        for _, (patches, checkbox) in patches_controls.items():
            for patch in patches:
                patch.set_visible(checkbox.value)
        fig.canvas.draw_idle()

    for _, (_, checkbox) in patches_controls.items():
        checkbox.observe(update_display, names='value')

    update_display()
    display(VBox(toggles))
    plt.tight_layout()
    save_plot("cong_and_edie_poly")
    plt.show()


def build_edie_patches(edie_grid, dx, dt):
    """Convert edie_grid to colored matplotlib Rectangles."""

    # Colormap for Edies
    jet = plt.cm.jet
    colors = [jet(x) for x in np.linspace(1, 0.5, 256)]
    cmap = LinearSegmentedColormap.from_list('GreenToRed', colors, N=256)
    norm = Normalize(vmin=0, vmax=55)

    patches = []
    for (x_bin, t_bin), cell in edie_grid.items():
        if cell["time"] == 0:
            continue
        speed_mph = (cell["distance"] / cell["time"]) * 3600 / 5280
        speed_for_color = min(speed_mph, 55)
        color = cmap(norm(speed_for_color))

        x0 = x_bin * dx / 5280
        x1 = (x_bin + 1) * dx / 5280
        t_start = t_bin * dt
        t_end = (t_bin + 1) * dt

        rect = Rectangle((t_start, x0), t_end - t_start, x1 - x0,
                         facecolor=color, edgecolor='none', alpha=1)
        patches.append(rect)
    return patches


def convert_polys_to_miles(polygons):
    """
    Converts the y-coordinates of all polygons in the list from feet to miles.

    Parameters:
        polygons (list[Polygon]): List of shapely Polygons in feet.

    Returns:
        list[Polygon]: New list with y-coordinates converted to miles.
    """
    converted = []
    for poly in polygons:
        # poly.exterior.coords gives a list of (x, y) vertices for the outer boundary
        # Only the y-coordinate is divided by 5280 to convert feet → miles
        coords = [(x, y / 5280) for x, y in poly.exterior.coords]
        converted.append(Polygon(coords))
    return converted

def build_polygon_patches(polygons, facecolor):
    """Create matplotlib polygon patches from shapely polygons."""
    patches = []
    for poly in polygons:
        coords = np.array(poly.exterior.coords)
        patch = MplPolygon(coords, facecolor=facecolor, edgecolor='black', linewidth=0.8)
        patches.append(patch)
    return patches

def compare_polygon_sets(edie_light, edie_heavy, light_polys_miles, heavy_polys_miles, dx, dt):
    """Convert Edie cells to polygons, combine with other polygon sets, and compute pairwise area coverage."""
    
    def edie_cells_to_polygons(edie_dict, dx, dt):
        """Convert Edie cell dictionary to a list of shapely polygons."""
        print(f"Converting {len(edie_dict)} Edie cells to polygons...")
        polys = []
        for (x_bin, t_bin), cell in edie_dict.items():
            if cell["time"] == 0:
                continue
            polys.append(edie_cell_to_polygon(x_bin, t_bin, dx, dt))
        print(f"Converted to {len(polys)} polygons (excluding zero-time cells).")
        return polys
    
    def total_area(polygons):
        """Calculate total area of a list of polygons."""
        return sum(p.area for p in polygons)
    
    # --- Step 1: Convert Edie cells ---
    print("Starting conversion of Edie light polygons...")
    edie_light_polygons = edie_cells_to_polygons(edie_light, dx, dt)
    
    print("Starting conversion of Edie heavy polygons...")
    edie_heavy_polygons = edie_cells_to_polygons(edie_heavy, dx, dt)
    
    # --- Step 2: Store all sets in a dict ---
    poly_sets = {
        "Light Edie": edie_light_polygons,
        "Heavy Edie": edie_heavy_polygons,
        "Light Poly": light_polys_miles,
        "Heavy Poly": heavy_polys_miles,
    }
    
    # --- Step 3: Pairwise area comparisons ---
    print("\nStarting pairwise area comparisons...\n")
    results = []
    for gt_name, gt_polys in poly_sets.items():
        gt_area = total_area(gt_polys)
        for pred_name, pred_polys in poly_sets.items():
            if gt_name == pred_name:
                continue
            pred_area = total_area(pred_polys)
            coverage_pct = (pred_area / gt_area * 100) if gt_area > 0 else 0
            results.append({
                "Truth": gt_name,
                "Approx.": pred_name,
                "Coverage %": coverage_pct
            })
    
    # --- Step 4: Sort & save ---
    print("\nPairwise area coverage results:")
    df_results = pd.DataFrame(results)
    df_results_sorted = df_results.iloc[(df_results["Coverage %"] - 100).abs().argsort()]
    df_results_sorted = df_results_sorted[["Approx.", "Truth", "Coverage %"]].round(2)
    
    save_df(df_results_sorted, "coverage")
    print(df_results_sorted)
    
    return df_results_sorted








Edies

In [None]:
DX_FEET = 0.02 * 5280 # 0.02 miles converted to feet
DT_SECONDS = 6 # 6 seconds 

edie_light, edie_heavy, date = compute_edie_stats_threshold(
    file_path=file_path,  # file_path from 
    dx=DX_FEET,        # 0.02 miles
    dt=DT_SECONDS,         # 6 seconds time resolution
    direction=-1,   # set the correct traffic direction
    light_thresh=20,
    heavy_thresh=10,
    subsample_step=10,
)

Our Algorithm

In [None]:
file_paths = [file_path]  # Can be changed to [file1, file2] etc if you want to run multiple files via the notebook. 
                          # Remember the last file will be used for later cells in that case.
all_rows = []

for path in file_paths:
    polygons_light, polygons_heavy, date, bound = find_and_build_congestion_zones(
        path,
        light_thresh=20,
        heavy_thresh=10,
        
        # polygon settings

        light_params=dict(length=250,
                            time_span=30,
                            min_area=500000,
                            union=True,
                            detailed_zone=False,
                            min_area_hulls=50000,
                            label="light_sub_20"),
        heavy_params=dict(length=500,
                            time_span=5,
                            min_area=50000,
                            union=True,
                            detailed_zone=True,
                            buffer_distance=5,
                            fill_gaps_distance=5,
                            label="heavy_sub_10")
    )

    # polygons_light and polygons_heavy are already computed
    print(f"{path} → light zones: {len(polygons_light)}, heavy zones: {len(polygons_heavy)}")


Plotting

In [None]:
# Convert polygons to miles
light_polys_miles = convert_polys_to_miles(polygons_light)
heavy_polys_miles = convert_polys_to_miles(polygons_heavy)

# Build polygon patches
light_poly_patches = build_polygon_patches(light_polys_miles, "#04ff00")
heavy_poly_patches = build_polygon_patches(heavy_polys_miles, "#ff0000")

# Build Edie patches
light_edie_patches = build_edie_patches(edie_light, dx=DX_FEET, dt=DT_SECONDS)
heavy_edie_patches = build_edie_patches(edie_heavy, dx=DX_FEET, dt=DT_SECONDS)

# Combine into dict
patches_by_label = {
    "light_sub_20": light_poly_patches,
    "heavy_sub_10": heavy_poly_patches,
    "light_sub_20_edie": light_edie_patches,
    "heavy_sub_10_edie": heavy_edie_patches
}


# 4. Plot
interactive_plot_edies_polys_from_bounds(bound, patches_by_label)


Area comparisons

In [None]:
df_results = compare_polygon_sets(
    edie_light,
    edie_heavy,
    light_polys_miles,
    heavy_polys_miles,
    dx=DX_FEET,
    dt=DT_SECONDS
)
