# LADDMS lidar trajectory data tutorial
#### Last updated: 08/18/2024
#### https://ndot-laddms.org

In [None]:
import matplotlib.pyplot as plt
import shapely as shp
import geopandas as gpd
import numpy as np
import datetime as dt
import folium
import zoneinfo
import json
import random
import warnings

## Load the trajectory dataset and show some basic info

Loading data assumes that the files are located in the same directory as this notebook.

In [None]:
with open('./sample_data.json', 'r') as f:
    trajectories = json.load(f)
    print(f"Loaded {len(trajectories)} object trajectories.")

In [None]:
for key, value in trajectories[0].items():
    if not isinstance(value, list):
        print(key, ": ", value)
    else:
        print(key, ": ", value[:6], "...")

In [None]:
classes = set([traj['classification'] for traj in trajectories])
print("Available classes:", classes)
locations = set([traj['location_id'] for traj in trajectories])
print("Available locations:", locations)

## Calculate and plot some speed statistics

In [None]:
# Make a 2x2 subplot for each of four known object classes.
fig, axs = plt.subplots(2, 2, figsize=(12, 8))
for c, ax in zip(['BICYCLE', 'VEHICLE', 'PERSON', 'LARGE_VEHICLE'], axs.flatten()):
    median_speeds = []
    pct85_speeds = []
    # For trajectories matching the current class, calculate median and 85th percentile speed.
    for traj in trajectories:
        if traj['classification'] == c:
            # Calculate RSS of x- and y-component velocities (m/s), then convert to miles/hour.
            traj_speed_mph = np.sqrt(np.array(traj['vel_x'])**2 + np.array(traj['vel_y'])**2) * 2.23694
            median_speeds.append(np.quantile(traj_speed_mph, 0.5))
            pct85_speeds.append(np.quantile(traj_speed_mph, 0.85))
    # Plot median and 85th percentile histograms separately.
    ax.hist(median_speeds, bins=25, histtype='step', fill=False, label="Median")
    ax.hist(pct85_speeds, bins=25, histtype='step', fill=False, label="85th pct.")
    ax.set_xlabel("Speed (mph)")
    ax.set_ylabel("Frequency (num. traj.)")
    ax.set_title(c)
    ax.legend()
plt.tight_layout()
plt.show()

In [None]:
# Make a 2x4 subplot for each of eight locations.
loc_names = ['26th & Clarksville', '25th & Clarksville', 'Mid-block @ 24th/Clarksville',
             'Mid-block @ ~23rd/Clarksville', 'DB Todd & Clarksville', 'DB Todd & Buchanan',
             'Mid-block @ 14th/Buchanan', '9th & Buchanan']
fig, axs = plt.subplots(2, 4, figsize=(18, 8))
for loc, loc_name, ax in zip(range(1, len(loc_names)+1), loc_names, axs.flatten()):
    median_speeds = []
    pct85_speeds = []
    # For trajectories matching the current location, calculate median and 85th percentile speed.
    for traj in trajectories:
        if traj['location_id'] == loc:
            # Only include objects that were a vehicle type.
            if traj['classification'] in ('VEHICLE', 'LARGE_VEHICLE'):
                # Calculate RSS of x- and y-component velocities (m/s), then convert to miles/hour.
                traj_speed_mph = np.sqrt(np.array(traj['vel_x'])**2 + np.array(traj['vel_y'])**2) * 2.23694
                # Drop trajectories that had a 95th percentile speed less than 2mph (i.e., parked cars).
                if np.quantile(traj_speed_mph, 0.95) < 2:
                    continue
                median_speeds.append(np.quantile(traj_speed_mph, 0.5))
                pct85_speeds.append(np.quantile(traj_speed_mph, 0.85))
    # Plot median and 85th percentile histograms separately.
    ax.hist(median_speeds, bins=25, histtype='step', fill=False, label="Median")
    ax.hist(pct85_speeds, bins=25, histtype='step', fill=False, label="85th pct.")
    ax.set_xlabel("Vehicle speed (mph)")
    ax.set_ylabel("Frequency (num. traj.)")
    ax.set_title(loc_name)
    ax.legend()
plt.tight_layout()
plt.show()

## Load some polygon zones and calculate geometric intersection

In [None]:
# This geo-JSON file contains a roadway zone drawn roughly around each lidar installation.
geojson_path = 'location_zones.geojson'
gdf = gpd.read_file(geojson_path)
gdf = gdf.explode(index_parts=True).reset_index(drop=True)
gdf = gdf.set_crs(epsg=4326)
gdf = gdf.to_crs(epsg=32616)
print(gdf)
zones = gdf.to_dict(orient='index')
zones = {r['location_id']: r['geometry'] for r in zones.values()}
print(zones)

In [None]:
def trajectories_to_linestrings(trajectory_dictioinaries, simplify=1):
    """
    Converts trajectory UTM coordinate lists into linestring geometry.
    :param trajectory_dictionaries: list of dictionaries, each containing trajectory data; keys 'utm_x' and 'utm_y' are used.
    :param simplify: simplification factor for linestring gemetry; takes every N points.
    """
    # LIST[0::N] gets the Nth element of LIST; when N=1, it returns the entire list
    return [shp.geometry.LineString([(x, y) for x, y in zip(traj['utm_x'][0::simplify], traj['utm_y'][0::simplify])]) 
            for traj in trajectory_dictioinaries]

In [None]:
def calculate_zone_intersection(trajectory_linestrings, zone_polygons):
    """
    Generates count of number of intersecting trajectories for each zone polygon, as {polygon_id: count}
    :param trajectory_linestrings: list of shapely linestring geometries.
    :param zone_polygons: dictionary of format {polygon_id: polygon shapely geometry object}, assumed same CRS as linestrings
    """
    zone_counts = {}
    for zi, z_poly in zone_polygons.items():
        count = 0
        for t in trajectory_linestrings:
            if t.intersects(z_poly):
                count += 1
        zone_counts[zi] = count
    return zone_counts

In [None]:
def filter_trajectories_by_intersection(trajectory_linestrings, zone_polygons):
    """
    Generates filtered list of trajectory linestrings, those that intersect with any of the zone polygons.
    :param trajectory_linestrings: list of shapely linestring geometries.
    :param zone_polygons: dictionary of format {polygon_id: polygon shapely geometry object}, assumed same CRS as linestrings
    """
    filtered = []
    for t in trajectory_linestrings:
        for zi, z_poly in zone_polygons.items():
            if t.intersects(z_poly):
                filtered.append(t)
                break
    return filtered

In [None]:
# Calculate the number of pedestrian trajectories entering the roadway at any point at each location.
# Crossing at a crosswalk would likely intersect the zone.
ped_lines = trajectories_to_linestrings([t for t in trajectories if t['classification'] == 'PERSON'])
peds_by_zone = calculate_zone_intersection(ped_lines, zones)
print("Pedestrian trajectories by roadway zone:", peds_by_zone)

## Plot trajectories and zones on a base map

In [None]:
def plot_trajectories(linestrings, 
                      polygons_geodataframe=None, 
                      polygons_tooltip=False, 
                      polygon_color_dict=None,
                      save_html_filepath=None):
    """
    Plot linestrings on an OpenStreetMap base map. Optionally, display polygon zones on the same map.
    :param linestrings: list of Shapely linestring geometry objects
    :param polygons_geodataframe: GeoDataFrame with 'location_id' and 'geometry' (polygon) columns
    :param polygons_tooltip: True/False, display interactive tooltip for location_id
    :param polygon_color_dict: dictionary mapping location_id to a text or RGB color definition
    :param save_html_filepath: file path at which to save the map, absolute or relative to this notebook
    """
    if len(linestrings) == 0:
        print("NO DATA TO PLOT")
        return None
    # Create a GeoDataFrame out of the linestring geometries.
    gdf = gpd.GeoDataFrame(geometry=linestrings, crs="EPSG:32616")  # UTM Zone 16N
    # Simplify linestrings based on point proximity.
    gdf['geometry'] = gdf['geometry'].simplify(tolerance=0.25)
    # Change the coordinate reference system of the data to WGS84 (lat/long).
    # This is the CRS that the basemap uses.
    gdf_wgs84 = gdf.to_crs(epsg=4326)
    
    def random_color():
        # Generate a random color, bit-wise.
        return f'#{random.randint(0, 0xFFFFFF):06x}'

    def polygon_style_function(feature):
        # If the polygon_color_dict was supplied, use this for polygon styling; otherwise, use red.
        id_value = feature['properties']['location_id']
        if polygon_color_dict is None:
            return {'fillColor': 'red', 'color': 'black', 'weight': 1.5, 'fillOpacity': 0.5}
        else:
            return {'fillColor': polygon_color_dict[id_value], 'color': 'black', 'weight': 1.5, 'fillOpacity': 0.5}
    
    # Create a folium map centered on the first polygon.
    with warnings.catch_warnings():
        # Ignore the UserWarning that is generated about centroid calculation.
        warnings.simplefilter("ignore")
        centroid = gdf_wgs84.geometry.centroid.iloc[0]
    m = folium.Map(location=[centroid.y, centroid.x], zoom_start=15)

    # Plot the polygons if they were supplied.
    if polygons_geodataframe is not None:
        # Add the tooltip function if indicated.
        if polygons_tooltip is True:
            folium.GeoJson(polygons_geodataframe,
                           style_function=polygon_style_function,
                           tooltip=folium.GeoJsonTooltip(fields=['location_id'], aliases=['Location: '], localize=True)
                          ).add_to(m)
        else:
            folium.GeoJson(polygons_geodataframe,
                           style_function=polygon_style_function
                          ).add_to(m)
    
    # Add the trajectory linestrings to the folium map.
    for _, row in gdf_wgs84.iterrows():
        sim_geo = gpd.GeoSeries(row['geometry'])
        geo_j = sim_geo.to_json()
        geo_json = folium.GeoJson(
            data=geo_j, style_function=lambda x, color=random_color(): {'color': color, 'weight': 2}
        )
        geo_json.add_to(m)
    
    # Save the map to an HTML file, if indicated.
    if save_html_filepath is not None:
        m.save(save_html_filepath)
    # Return the map so it could be displayed it in the notebook.
    return m


In [None]:
# Plot the pedestrian trajectories on a map, along with the location zones.
traj_map = plot_trajectories(ped_lines, polygons_geodataframe=gdf, polygons_tooltip=True)
# Open the map in the Python notebook.
traj_map

In [None]:
# Plot all trajectories on a map. This may take up to a minute.
all_lines = trajectories_to_linestrings(trajectories, simplify=4)
_ = plot_trajectories(all_lines, save_html_filepath='map.html')
# >> Now open the new file 'map.html' in your browser.