# Demo Overview

Using AIS vessel observation data to find interesting patters in recreational vessel traffic. 

* Create and display observation data as tracks
* Determine areas of interest using dwell detection
* Find common routes between interesting areas
* Detect potential anomalies in vessel traffic

In [None]:
import geoanalytics
import geoanalytics.sql.functions as ST
import geoanalytics.tracks.functions as TRK
import pyspark.sql.functions as F
from geoanalytics.tools import *

In [None]:
from esrimap import EsriJSMap, Renderers, Labels, Popups
from pyspark.sql import DataFrame

# Add display method to DataFrame for additional convenience
DataFrame.display_layer = EsriJSMap.display_layer

## Set up plotting extent and style

In [None]:
import matplotlib.pyplot as plt

san_pedro_bay = (-118.303734,33.701202,-118.178078,33.779397)
coast_of_la = (-119.2388,33.2165,-117.2804,34.1125)

bay_extent = san_pedro_bay
demo_extent = coast_of_la
output_path = None # "C:/dev/demo/uc2025/sandbox/la_coast_demo"

demo_figsize=(14, 14)
demo_basemap="light"

demo_style = dict(basemap=demo_basemap, extent=demo_extent, figsize=demo_figsize, quantize=True, sr=4326)
bay_style = dict(basemap=demo_basemap, extent=bay_extent, figsize=demo_figsize, sr=4326)

area_style = dict(alpha=.3, cmap_values="type", edgecolor="black")
line_style = dict(edgecolor="blue")

# Setup plotting and utilities

# Disable x-ticks and y-ticks using rcParams
plt.rcParams['xtick.bottom'] = False
plt.rcParams['xtick.labelbottom'] = False
plt.rcParams['ytick.left'] = False
plt.rcParams['ytick.labelleft'] = False


# Show demo extent
spark.sql("select st_point(0, 0, 4326)").st.plot(**demo_style)

spark.conf.set("spark.sql.shuffle.partitions", 50)



## Load AIS data and filter to demo extent

In [None]:

# Load dataset from parquet, filter to June and July, and create point geometry column in WGS84
ais = spark.read.format("parquet").load("C:/dev/demo/commondata/ais_parquet")\
  .where("month in (6,7)")\
  .withColumn("location", ST.point("lon", "lat", 4326))

# Assign vessel group based on vessel type in data
ais = ais.withColumn("vesselgroup", F.when(F.expr("vesseltype == 30"), "fishing")
                              .when(F.expr("vesseltype == 35"), "military")
                              .when(F.expr("vesseltype in (36, 37)"), "recreational")
                              .when(F.expr("vesseltype in (21, 22, 31, 32, 52)"), "tug")
                              .when(F.expr("vesseltype == 50"), "pilot")
                              .when(F.expr("vesseltype == 51"), "S&R")
                              .when(F.expr("vesseltype between 60 and 69"), "passenger")
                              .when(F.expr("vesseltype between 70 and 79"), "cargo")
                              .when(F.expr("vesseltype between 80 and 89"), "tanker"))\
   .select("mmsi", "basedatetime", "location", "vesselgroup")


# Filter to observations within our demo extent
ais = ais.where(ST.bbox_intersects("location", *demo_extent))

# Project to CA state plane for processing
ais = ais.withColumn("location", ST.transform("location", 2230))\

print("Observation Count: " + str(ais.persist().count()))

# Display breakdown of vessel types in demo extent

In [None]:
ais.groupBy("vesselgroup").agg(
      F.countDistinct("mmsi").alias("unique_vessels"), 
      F.count("*").alias("observation_count"))\
   .orderBy("unique_vessels").show()

# Create tracks for recreational vessels

* Split tracks based on one week interval
* Discard segments that span more than 3000 meters (e.g., distance between consecutive observations)

In [None]:
from pyspark.sql import Window

user_track_fields = ["mmsi"]
track_fields = [*user_track_fields, "track_index"]

ais_tracks = ais.where("vesselgroup = 'recreational'")\
  .groupBy(*user_track_fields).agg(TRK.aggr_create_track("location", "BaseDateTime").alias("track")) \
  .withColumn("track", F.explode(TRK.split_by_duration("track", (7, "days")))) \
  .withColumn("track", F.explode(TRK.split_by_distance_gap("track", (3000, "meters")))) \
  .withColumn("track_index", F.row_number().over(Window.partitionBy(*user_track_fields).orderBy(TRK.start_timestamp("track")))) \
  .persist()


print("Track Count: " + str(ais_tracks.count()))

ais_tracks.st.plot(**demo_style, edgecolor="#FF0000A0", linewidth=.1)

if output_path:
    ais_tracks.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/tracks")

# Find dwell events

* Distance Threshold: **150 meters**
* Minimum dwell duration: **1 hour**

In [None]:
dwell_events = ais_tracks\
  .withColumn("dwell", F.explode(TRK.find_dwells("track", (150, "meters"), (1, "hour"))))\
  .selectExpr(
    *track_fields,
    "ST_Centroid(dwell) as dwell_centroid",
    "TRK_StartTimestamp(dwell) as dwell_start"
  )\
  .coalesce(10)\
  .persist()

dwell_events.st.plot(**bay_style)

if output_path:
    dwell_events.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/dwells")

# Generate areas of interest by finding groups of proximal dwell locations

1. Group dwells such that events within 300 meters of each other are in the same group
2. Create convex hull around all dwells in a group
3. Filter out groups with less than 15 dwell events

In [None]:
dwell_clusters = GroupByProximity()\
  .setSpatialRelationship("NearPlanar", 300, "meters")\
  .run(dwell_events)\
  .groupBy("group_id").agg(
    F.count("*").alias("count"),
    ST.aggr_convex_hull("dwell_centroid").alias("dwell_area")
  )\
  .withColumn("dwell_area", ST.buffer("dwell_area", (100, "meters")))\
  .where("count > 15")\
  .coalesce(10)\
  .persist()


map = EsriJSMap(basemap="oceans")
map.add_layer(dwell_clusters, color=[255, 0, 0, .5], popup=["count"])
map.add_layer(dwell_events, color="black", size="5px")
map.display()

Javascript Map Source: https://github.com/climbage/uc/tree/main/2025

# Assign useful labels to areas

* Load TIGER water body data from Census
* Join with dwell areas and select label from water body with largest intersecting area

In [None]:
# Select all water bodies except oceans
awater = spark.read.format("shapefile").load(r"C:\dev\demo\uc2025\sandbox\tiger\areawater")\
  .where(ST.bbox_intersects("geometry", *demo_extent))\
  .where("fullname not like '%Ocean%'")\
  .withColumnRenamed("geometry", "water_polygon")

areas = dwell_clusters.join(awater, ST.intersects("dwell_area", "water_polygon"), "left")\
  .groupBy("group_id").agg(
      F.max_by("FULLNAME", ST.area(ST.intersection("water_polygon", "dwell_area"))).alias("label"),
      F.first("dwell_area").alias("dwell_area")
  )\
  .withColumn("label", F.when(F.expr("label == '' or label is null"), F.expr("concat('Unnamed ', monotonically_increasing_id())")).otherwise(F.expr("label")))\
  .persist()

areas.display_layer(basemap="gray-vector", label="label")

In [None]:
if output_path:
    areas.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/areas")

# Find every instance of a track traveling through a dwell area

1. Join tracks with dwell areas
2. Find all enter/exit events
3. Discard events less than 10 minutes 

In [None]:
# Collapsing dwells reduces the number of points in the track and improves geofence performance
geofence_tracks = ais_tracks.withColumn("track", TRK.collapse_dwells("track", (100, "meters"), (30, "minutes")))

geofence_events = geofence_tracks.join(areas, ST.intersects("track", "dwell_area"))\
    .withColumn("event", F.explode(TRK.entry_exit_points("track", "dwell_area")))\
    .where("cast(event.exit.time as long) - cast(event.entry.time as long) > 10 * 60")\
    .select(
        *track_fields, 
        F.col("label").alias("event_label"),
        F.inline(F.array(
            F.struct(F.col("event.entry.time").alias("event_time"), 
                     F.col("event.entry.point").alias("event_location"),
                     F.when(F.col("event.entry.track_endpoint"), "Begin").otherwise("Enter").alias("event_type")),
            F.struct(F.col("event.exit.time").alias("event_time"), 
                     F.col("event.exit.point").alias("event_location"),
                     F.when(F.col("event.exit.track_endpoint"), "End").otherwise("Exit").alias("event_type"))
        ))
    )

geofence_events = geofence_events.persist()
print("Event Count: " + str(geofence_events.count()))

geofence_events.withColumn("event_time", F.col("event_time").cast("string"))\
  .display_layer(popup=["mmsi", "track_index", "event_type", "event_label", "event_time"])

In [None]:
if output_path:
    geofence_events.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/geofence_events")

# Create "trips" between areas of interest

* Each trip starts when a vessel exits an area of interest and ends when it enters another area of interest.
* Trips that exit and enter the same area are discarded
* Short trips (<500 meters) are also ignored

In [None]:
# Use the `lead` window function to include the next geofence event in each row. Each trip goes from the
# event in the row to the newly added next event.
trips = geofence_events\
  .withColumn("event", F.struct("event_location", "event_label", "event_time"))\
  .select(*track_fields, "event", F.lead("event").over(Window.partitionBy("mmsi", "track_index").orderBy("event_time")).alias("next_event"))

# Rejoin with track and compute the trip start and end offsets in seconds
trips = trips.join(ais_tracks, on=track_fields)\
             .withColumn("track_start_seconds", F.unix_timestamp(TRK.start_timestamp("track")))\
             .withColumn("event_offset", F.unix_timestamp("event.event_time") - F.col("track_start_seconds"))\
             .withColumn("next_event_offset", F.unix_timestamp("next_event.event_time") - F.col("track_start_seconds"))

# Use offsets to cut each track into subtracks
trips = trips.withColumn("subtrack", TRK.between("track", ("event_offset", "seconds"), ("next_event_offset", "seconds")))\
             .withColumn("subtrack_length", TRK.length("subtrack"))

trips = trips.selectExpr(*track_fields, 
                         "event.event_label as orig", 
                         "next_event.event_label as dest", 
                         "subtrack as track", 
                         "subtrack_length as length")

# Remove trips that just return to the same area or are less than 500 meters
trips = trips.where("orig != dest and length > 500")

# Create canonical ID for the bidirectional segment between two locations
trips = trips.withColumn("route_id", F.array_join(F.array_sort(F.array("orig", "dest")), " - "))

print("Trip Count: " + str(trips.persist().count()))

trips.where("mmsi = '338054481'").show(truncate=50)

if output_path:
    trips.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/trips")

# Visualize trips in ArcGIS Pro

* Use new geoparquet layer type
* Identify patterns in trip length

# Group trips together based on length

* Trips whose distances are within 0.1% of each other are grouped together
* Use route_id which ignores directionality (e.g., orig -> dest = dest -> orig)

In [None]:
trip_groups = GroupByProximity()\
  .setSpatialRelationship("NearPlanar", 500, "meters")\
  .setAttributeRelationship(f"""
      a.route_id = b.route_id
      and abs(a.length - b.length) < a.length * .001
   """)\
  .run(trips)\
  .withColumnRenamed("group_id", "trip_group")

# Count the number of trips per group
trip_groups = trip_groups.withColumn("group_size", F.count("*").over(Window.partitionBy("trip_group")))\
  .persist()

if output_path:
    trip_groups.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/trip_groups")

# Identify common routes by averaging similar trips

1. For each route, choose a representative trip track
2. Iterate vertices in representative track and average the closest points on all other tracks in route
3. Create line from average points as an approximate of the route line

In [None]:
mean_routes = trip_groups\
           .withColumn("max_for_route", F.max_by("trip_group", "group_size").over(Window.partitionBy("route_id")))\
           .where("trip_group = max_for_route and group_size > 2")\
           .drop("max_for_route")\
           .groupBy("route_id").agg(
               F.min_by("track", "length").alias("rep_track"),
               F.collect_list("track").alias("all_tracks"),
               F.first("group_size").alias("group_size")
           )\
           .withColumn("mean_trip_line", F.expr("st_linestring(transform(st_points(rep_track), pt -> st_centroid(st_multipoint(transform(all_tracks, other -> st_closestpoint(other, pt))))))"))\
           .persist()

route_filter = "route_id = 'Isthmus Cv - Marina del Rey'"

map = EsriJSMap(basemap="oceans", height="800px")
map.add_layer(mean_routes.where(route_filter).select(F.explode("all_tracks")), color="gray", width=1)
map.add_layer(mean_routes.where(route_filter).select("rep_track"), color="blue")
map.add_layer(mean_routes.where(route_filter).select("mean_trip_line"), width=3, color="black", style="dash")
map.display()

if output_path:
    mean_routes.drop("all_tracks", "mean_trip_line")\
      .repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/representative_trips")

mean_routes = mean_routes.drop("all_tracks", "rep_track")

if output_path:
    mean_routes.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/mean_routes")

# Determine how far each trip deviates from its common route

In [None]:
route_deviation = trips.join(mean_routes, on=["route_id"])\
  .withColumn("deviation", ST.hausdorff_distance("track", "mean_trip_line"))\
  .drop("rep_track", "mean_trip_line")

route_deviation.where(route_filter).st.plot(basemap="light", figsize=demo_figsize, cmap_values="deviation", legend=True)

if output_path:
    route_deviation.repartition(1).write.format("geoparquet").mode("overwrite").save(f"{output_path}/route_deviations")