In [1]:
def get_ais_updates_in_ports():
    # filter AIS updates table down to entries that are close to ports
    pass


# pick a "flagged" port
# get all ships that have been in that port - flag them
# get all ship encounters with flagged ships, flag them too

# 1. compute all encounters

# 2. compute all flagged-port visits

# 3. compute graph

In [2]:
def get_encounters_for_port(port_id: str):
    pass

import duckdb

def get_stops_for_vessel(mmsi: str):
    # query ais table for this ship
    # for when it has had velocity (column `sog`) = 0
    # for 2-8 hours

    con = duckdb.connect()

    con.execute("""
        CREATE TABLE ais_data AS
        SELECT * FROM read_parquet('./data/parquet/AIS_*.parquet')
    """)

    statement = """
    WITH vessel_sog_zero AS (
  SELECT 
    mmsi,
    timestamp,
    lat,
    lon,
    sog,
    -- Row number over time for each vessel
    ROW_NUMBER() OVER (PARTITION BY mmsi ORDER BY timestamp) 
      - ROW_NUMBER() OVER (PARTITION BY mmsi, CASE WHEN sog <= 0.5 THEN 1 ELSE 0 END ORDER BY timestamp) AS grp
  FROM 
    ais_data
),

pause_groups AS (
  SELECT
    mmsi,
    MIN(timestamp) AS start_time,
    MAX(timestamp) AS end_time,
    AVG(lat) AS avg_lat,
    AVG(lon) AS avg_lon,
    COUNT(*) AS point_count
  FROM
    vessel_sog_zero
  WHERE
    sog <= 0.5  -- consider low SOG as paused (threshold can be adjusted)
  GROUP BY
    mmsi, grp
  HAVING
    (MAX(timestamp) - MIN(timestamp)) BETWEEN INTERVAL '2 hours' AND INTERVAL '8 hours'
)

SELECT
  mmsi,
  avg_lat AS lat,
  avg_lon AS lon,
  start_time,
  end_time
FROM
  pause_groups
ORDER BY
  start_time;
    """
    print( 
        con.execute(statement).fetch_df()
    )

get_stops_for_vessel("hi")


            mmsi        lat         lon          start_time  \
0      367387860  29.951921  -90.387626 2024-01-01 00:00:00   
1      367194680  38.576065  -90.219979 2024-01-01 00:00:00   
2      368111380  37.904210 -122.372424 2024-01-01 00:00:00   
3      367009930  40.730698  -74.012630 2024-01-01 00:00:00   
4      366941830  29.731084  -95.049651 2024-01-01 00:00:00   
...          ...        ...         ...                 ...   
22744  367299580  26.555353  -97.429779 2024-01-05 21:58:46   
22745  368221490  30.058834  -93.369630 2024-01-05 21:58:48   
22746  367061980  40.640499  -74.129270 2024-01-05 21:59:00   
22747  367170330  29.936361  -90.334758 2024-01-05 21:59:19   
22748  261036090  18.340948  -64.794153 2024-01-05 21:59:33   

                 end_time  
0     2024-01-01 07:16:41  
1     2024-01-01 06:34:40  
2     2024-01-01 04:12:59  
3     2024-01-01 02:14:09  
4     2024-01-01 05:10:30  
...                   ...  
22744 2024-01-05 23:59:56  
22745 2024-01-05 23

In [3]:
import polars as pl
from pathlib import Path
from datetime import timedelta


def get_stops_for_vessel(mmsi: str | list[str] | None = None) -> pl.DataFrame:
    """
    Find periods where vessels have stopped (very low speed) for 2-8 hours.
    
    Args:
        mmsi: Optional vessel MMSI to filter for a specific vessel
        
    Returns:
        DataFrame with vessel stops including location and timing information
    """
    # Read all AIS data files
    # Read all AIS data files
    # Read all AIS data files
    df = pl.concat([
        pl.read_parquet(f) 
        for f in Path('./data/parquet').glob('AIS_*.parquet')
    ])
    
    if mmsi and isinstance(mmsi, str):
        df = df.filter(pl.col("mmsi") == mmsi)
    elif mmsi and isinstance(mmsi, list):
        df = df.filter(pl.col("mmsi").is_in(mmsi))
    
    # Consider vessel stopped when SOG <= 0.5 knots
    df = df.with_columns([
        # Convert timestamp to datetime if it isn't already
        pl.col("timestamp").cast(pl.Datetime).alias("timestamp"),
        # Flag for stopped vessels
        (pl.col("sog") <= 0.5).alias("is_stopped")
    ]).sort(["mmsi", "timestamp"])
    
    # Create groups of consecutive stops using rank difference method
    df = df.with_columns([
        pl.col("timestamp").rank().over("mmsi").alias("row_nr"),
        pl.col("timestamp").rank().over(["mmsi", "is_stopped"]).alias("group_row_nr")
    ]).with_columns([
        (pl.col("row_nr") - pl.col("group_row_nr")).alias("stop_group")
    ])
    
    # Group by vessel and stop group to find extended stops
    stops = df.filter(pl.col("is_stopped")).group_by(
        ["mmsi", "stop_group"]
    ).agg([
        pl.col("timestamp").min().alias("start_time"),
        pl.col("timestamp").max().alias("end_time"),
        pl.col("lat").mean().alias("lat"),
        pl.col("lon").mean().alias("lon"),
        pl.count().alias("point_count")
    ]).with_columns([
        (pl.col("end_time") - pl.col("start_time")).alias("duration")
    ]).filter(
        # Filter for stops between 2 and 8 hours
        (pl.col("duration") >= timedelta(hours=2)) 
        & (pl.col("duration") <= timedelta(hours=8))
    ).sort("start_time")
    
    return stops.select([
        "mmsi",
        "lat",
        "lon",
        "start_time",
        "end_time",
        "duration",
        "point_count"
    ])


    
stops = get_stops_for_vessel()

  pl.count().alias("point_count")


In [12]:
import polars as pl
from datetime import timedelta
from pathlib import Path
import math


def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    Returns distance in nautical miles
    """
    # Convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    # Radius of earth in nautical miles
    r = 3440  # Earth's radius in nautical miles
    return c * r


def filter_for_proximity_to_port(stops: pl.DataFrame, threshold: float, port_ids: list[str] | None = None) -> pl.DataFrame:
    """
    Filter vessel stops based on proximity to ports.
    
    Args:
        stops: DataFrame containing vessel stops with lat/lon coordinates
        threshold: Distance threshold in nautical miles
        port_ids: Optional list of port UIDs to filter for specific ports
        
    Returns:
        DataFrame with stops that are within threshold distance of ports,
        including port information
    """
    # Load ports data
    ports = pl.read_parquet("./data/parquet/ports.parquet")
    
    # Filter ports if specific IDs are provided
    if port_ids:
        ports = ports.filter(pl.col("uid").is_in(port_ids))
    
    # Create a cross join between stops and ports
    # This will allow us to calculate distances between all combinations
    cross_joined = stops.join(ports, how="cross")
    
    # Calculate distances using haversine formula
    cross_joined = cross_joined.with_columns([
        pl.struct(["lat", "lon", "lat_right", "lon_right"]).map_elements(
            lambda x: haversine_distance(x["lat"], x["lon"], x["lat_right"], x["lon_right"])
        ).alias("distance_nm")
    ])
    
    # Filter for stops within threshold distance of any port
    nearby_stops = cross_joined.filter(pl.col("distance_nm") <= threshold)
    
    # For each stop, keep only the closest port
    result = nearby_stops.group_by([
        "mmsi", "lat", "lon", "start_time", "end_time", "duration", "point_count"
    ]).agg([
        pl.col("uid").alias("port_uid").first(),
        pl.col("name").alias("port_name").first(),
        pl.col("region_name").first(),
        pl.col("country_code").first(),
        pl.col("world_water_body").first(),
        pl.col("distance_nm").min().alias("distance_to_port_nm")
    ]).sort(["start_time", "mmsi"])
    
    return result


filter_for_proximity_to_port(stops, 0.1, port_ids=[i for i in range(10000)])

  cross_joined = cross_joined.with_columns([


mmsi,lat,lon,start_time,end_time,duration,point_count,port_uid,port_name,region_name,country_code,world_water_body,distance_to_port_nm
i64,f64,f64,datetime[μs],datetime[μs],duration[μs],u32,f64,str,str,str,str,f64
366996610,28.950537,-95.333915,2024-01-01 00:00:08,2024-01-01 02:23:49,2h 23m 41s,124,9250.0,"""Freeport""","""US Gulf Coast -- 8650""","""United States""","""Gulf of Mexico; North Atlantic…",0.044459
367564240,29.733433,-95.19967,2024-01-01 00:01:48,2024-01-01 06:40:50,6h 39m 2s,129,9210.0,"""Norsworthy""","""US Gulf Coast -- 8650""","""United States""","""Gulf of Mexico; North Atlantic…",0.01821
366516370,40.668308,-74.016655,2024-01-01 00:29:30,2024-01-01 02:31:48,2h 2m 18s,59,7630.0,"""Brooklyn""","""United States E Coast -- 6585""","""United States""","""North Atlantic Ocean""",0.098504
368008060,25.782255,-80.183027,2024-01-01 02:00:08,2024-01-01 05:21:07,3h 20m 59s,58,8640.0,"""Miami""","""United States E Coast -- 6585""","""United States""","""North Atlantic Ocean""",0.066808
775998212,25.781977,-80.183608,2024-01-01 03:11:26,2024-01-01 06:04:51,2h 53m 25s,46,8640.0,"""Miami""","""United States E Coast -- 6585""","""United States""","""North Atlantic Ocean""",0.082771
…,…,…,…,…,…,…,…,…,…,…,…,…
368308420,25.950004,-97.400805,2024-01-05 16:26:40,2024-01-05 23:58:59,7h 32m 19s,93,9340.0,"""Brownsville""","""US Gulf Coast -- 8650""","""United States""","""Gulf of Mexico; North Atlantic…",0.043462
368088250,28.950818,-95.334942,2024-01-05 17:23:24,2024-01-05 23:59:55,6h 36m 31s,341,9250.0,"""Freeport""","""US Gulf Coast -- 8650""","""United States""","""Gulf of Mexico; North Atlantic…",0.097762
368233290,31.150295,-81.499981,2024-01-05 17:23:39,2024-01-05 23:59:49,6h 36m 10s,344,8550.0,"""Brunswick""","""United States E Coast -- 6585""","""United States""","""North Atlantic Ocean""",0.017728
338144207,27.815195,-97.400857,2024-01-05 18:14:09,2024-01-05 20:55:36,2h 41m 27s,26,9300.0,"""Corpus Christi""","""US Gulf Coast -- 8650""","""United States""","""Gulf of Mexico; North Atlantic…",0.099435


In [16]:
df_stops_a = get_stops_for_vessel(mmsi=[368008060])
df_stops = get_stops_for_vessel(mmsi=[775998212])

import polars as pl
import numpy as np

# Parameters
max_distance_meters = 500  # e.g., vessels must be within 500 meters
min_overlap_seconds = 300  # e.g., overlapping for at least 5 minutes

# Assume df already loaded with columns: mmsi, lat, lon, start_time, end_time

# --- Helper: Haversine Distance ---
def haversine(lat1, lon1, lat2, lon2):
    R = 6371000  # Earth radius in meters
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    return 2 * R * np.arcsin(np.sqrt(a))

# --- Cross Join vessels with themselves ---
df1 = df_stops_a.with_columns(mmsi1=pl.col("mmsi"))
df2 = df_stops.with_columns(mmsi2=pl.col("mmsi"))

pairs = df1.join(df2, how="cross")

print("joined", pairs.shape)

# --- Filter out self-matches ---
pairs = pairs.filter(pl.col("mmsi1") < pl.col("mmsi2"))

# --- Compute distance between vessel locations ---
pairs = pairs.with_columns(
    pl.Series(
        "distance_meters",
        haversine(
            pairs["lat"], pairs["lon"],
            pairs["lat_right"], pairs["lon_right"]
        )
    )
)

print("distances", pairs.shape)

# --- Filter by spatial proximity ---
pairs = pairs.filter(pl.col("distance_meters") <= max_distance_meters)

# --- Compute temporal overlap ---
pairs = pairs.with_columns([
    pl.max_horizontal(["start_time", "start_time_right"]).alias("overlap_start"),
    pl.min_horizontal(["end_time", "end_time_right"]).alias("overlap_end")
])

print("overlap calcuulated", pairs.shape)

pairs = pairs.with_columns(
    (pl.col("overlap_end") - pl.col("overlap_start")).alias("overlap_duration")
)

# --- Keep only positive, sufficient overlaps ---
pairs = pairs.filter(
    (pl.col("overlap_duration") > pl.duration(seconds=0)) &
    (pl.col("overlap_duration") >= pl.duration(seconds=min_overlap_seconds))
)

# --- Select output columns ---
result = pairs.select([
    "mmsi1",
    "mmsi2",
    "overlap_start",
    "overlap_end",
    "lat", "lon",
    "distance_meters",
    "overlap_duration"
])
result

  pl.count().alias("point_count")


joined (3, 16)
distances (3, 17)
overlap calcuulated (1, 19)


mmsi1,mmsi2,overlap_start,overlap_end,lat,lon,distance_meters,overlap_duration
i64,i64,datetime[μs],datetime[μs],f64,f64,f64,duration[μs]
368008060,775998212,2024-01-01 03:11:26,2024-01-01 05:21:07,25.782255,-80.183027,65.809725,2h 9m 41s


In [20]:
con = duckdb.connect()
con.execute("""
        CREATE TABLE ais_data AS
        SELECT * FROM read_parquet('./data/parquet/AIS_*.parquet')
    """)

statement="""-- Install and load spatial extension if not already
INSTALL spatial;
LOAD spatial;

-- Drop materialized view if exists
DROP VIEW IF EXISTS vessel_stops;

-- Create new materialized view
CREATE VIEW vessel_stops AS

WITH preprocessed AS (
  SELECT 
    mmsi,
    timestamp,
    lat,
    lon,
    sog,
    -- Integer bucket lat/lon (~100m precision)
    CAST(FLOOR(lat * 1110) AS INTEGER) AS lat_bucket,
    CAST(FLOOR(lon * 1110) AS INTEGER) AS lon_bucket,
    -- Define if vessel is moving very slowly (sog <= 0.2)
    CASE WHEN sog <= 0.5 THEN 1 ELSE 0 END AS is_stopped
  FROM 
    ais_data
),

grouped AS (
  SELECT 
    *,
    -- Trick: Create session groups by detecting breaks in stop sequences
    ROW_NUMBER() OVER (PARTITION BY mmsi ORDER BY timestamp)
      - ROW_NUMBER() OVER (PARTITION BY mmsi, lat_bucket, lon_bucket, is_stopped ORDER BY timestamp)
    AS stop_group
  FROM 
    preprocessed
),

stops AS (
  SELECT 
    mmsi,
    lat_bucket,
    lon_bucket,
    MIN(timestamp) AS start_time,
    MAX(timestamp) AS end_time,
    MAX(timestamp) - MIN(timestamp) AS duration
  FROM 
    grouped
  WHERE 
    is_stopped = 1
  GROUP BY 
    mmsi, lat_bucket, lon_bucket, stop_group
  HAVING 
    MAX(timestamp) - MIN(timestamp) BETWEEN INTERVAL '2 minutes' AND INTERVAL '8 hours'
)

SELECT 
  mmsi,
  lat_bucket,
  lon_bucket,
  start_time,
  end_time,
  duration
FROM 
  stops
ORDER BY 
  start_time;"""


con.execute(statement).fetch_df()

Unnamed: 0,Count


In [None]:
import datetime
from sqlalchemy import text


def get_stops_for_vessel(mmsi: str) -> list[dict[str, any]]:
    statement = f"""
    SELECT * FROM vessel_stops WHERE mmsi = {mmsi}
    """

    return con.execute(text(statement)).fetchall()

def get_vessels_for_stop_and_time(stop_location: tuple[float, float], start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None) -> list[str]:
    # use this to construct graph
    # scenario spec-start < start and spec-end > end (spec fully contains)
    # scenario spec-start < start and spec-end < end (spec starts earlier)
    # scenario spec-start > start and spec-end > end (spec ends later)
    # scenario spec-start > start and spec-end < end (spec fully contained)

    # negative scenarios
    # scenario spec-end < start
    # scenario spec-start > end
    statement = f"SELECT mmsi FROM vessel_stops WHERE lat_bucket = {stop_location[1]} AND lon_bucket = {stop_location[0]} AND NOT ({end_time} < date_start  OR {start_time} > date_end)"
    return con.execute(statement).fetchall()

def get_vessels_for_stop_location(stop_location: tuple[float, float]) -> list[str]:
    # use this to get vessels from flagged ports
    return con.execute("SELECT mmsi FROM vessel_stops WHERE lat_bucket = ? AND lon_bucket = ?", stop_location).fetchall()

