In [1]:
from dask import dataframe as dd
from dask import array as dp
import numpy as np
import dask
import coiled

### Setup

Specify the cluster size. This algorithm needs a significant amount of RAM. It can run on any Dask backend; we are using Coiled here. Based on the last run, it probably needs only 1/2 workers of the type listed below.
The cluster configuration was optimized for a previous version of the algorithm that was more memory-intensive.

In [2]:
# Provision the Coiled cluster: a small scheduler VM plus two large worker VMs.
# Spot instances are requested, with the "spot_with_fallback" policy selected.
cluster = coiled.Cluster(
    n_workers=2,
    scheduler_vm_types=["m7g.xlarge"],
    worker_vm_types=["r8g.24xlarge"],
    spot_policy="spot_with_fallback",
)

# Dask client bound to the cluster; subsequent dask computations run through it.
client = cluster.get_client()

Output()

Output()

In [6]:
# Read dataset in (lazily, as a dask DataFrame) straight from the remote CSV.
dataset_url = "https://www.maserv.work/ATD/atd2025/src/atd2025/data/dataset1.csv"
data = dd.read_csv(dataset_url)

# Replace the raw "time" column with parsed datetimes so that time differences
# can be taken later on.
data = data.assign(time=dd.to_datetime(data["time"]))

Verify dataset is loaded:

In [7]:
# Peek at the first rows to confirm the CSV was fetched and parsed.
data.head()

Unnamed: 0,point_id,time,lat,lon,speed,course
0,0,2024-01-01,28.03387,-96.974543,6.5,56.1
1,1,2024-01-01,30.010361,-90.726282,2.9,270.3
2,2,2024-01-01,30.708593,-88.039977,0.0,156.5
3,3,2024-01-01,30.006844,-90.472937,12.9,158.0
4,4,2024-01-01,28.174239,-82.818806,2.5,74.8


In [None]:
# Peek at the last rows as well.
# NOTE(review): the saved output of this cell lists columns (point_id_original,
# track_id) that the head() output above does not show, so it looks like it came
# from a different version of the dataset - re-run the cell to confirm.
data.tail()

Unnamed: 0,point_id,point_id_original,track_id,time,lat,lon,speed,course
110285,110285,8285135,367672370,2024-01-01 23:59:59,29.972829,-90.399484,0.0,327.1
110286,110286,8281679,368113380,2024-01-01 23:59:59,30.346252,-88.504087,0.0,9.8
110287,110287,8285158,366995370,2024-01-01 23:59:59,30.374604,-91.226913,0.0,234.9
110288,110288,8299380,369970969,2024-01-01 23:59:59,30.38958,-81.459862,0.0,20.3
110289,110289,8276283,367139290,2024-01-01 23:59:59,30.390538,-81.433416,0.0,45.9


### Algorithm

Algorithm that creates a distance matrix between all pairs of points in the dataset.

In [None]:
# ktm mult should be < 1, speed_cutoff should differentiate between 'fast' and 'slow' ships
def bidirectional_distance(x1, y1, t1, speed1, course1, speed_cutoff = 4, ktm_mult = 0.1, angle_cutoff = 0.2):
    """Build the pairwise forward/backward prediction distance matrix.

    Entry ``[i, j]`` pairs point ``j`` (the earlier point) with point ``i`` (the
    later point). Point ``j`` is extrapolated forward to the time of ``i`` using
    its own speed and course, point ``i`` is extrapolated backward to the time
    of ``j`` using its own speed and course, and the entry is the mean of the
    two *squared* position errors (no square root is taken).

    Parameters
    ----------
    x1, y1 : 1-D array
        Point coordinates (the notebook passes values in meters).
    t1 : 1-D datetime array
        Point timestamps; differences are reduced to whole seconds.
    speed1 : 1-D array
        Speeds; multiplied by a knots -> m/s factor, so presumably in knots.
    course1 : 1-D array
        Course angles fed to sin (x component) and cos (y component), i.e.
        expected in radians.
    speed_cutoff : float
        Pairs whose mean speed exceeds this use the full time scale ``tau``;
        all other pairs use ``tau`` multiplied by ``ktm_mult``.
    ktm_mult : float
        Scale factor (should be < 1) applied to ``tau`` for slow pairs.
    angle_cutoff : float
        Lower bound on the *cosine* of the angle between the observed and the
        predicted spatiotemporal vectors; pairs below it are excluded.

    Returns
    -------
    2-D array
        Squared-distance matrix of shape ``(n, n)``. Entries are ``inf`` where
        the time difference is not positive, where the cosine is NaN, or where
        the cosine is below ``angle_cutoff``.
    """
    knots_to_mps = 0.514444

    # Column vectors (axis 0) play the role of the later point of each pair ...
    later_x = x1[:, None]
    later_y = y1[:, None]
    later_t = t1[:, None]
    later_speed = speed1[:, None]
    later_course = course1[:, None]

    # ... and row vectors (axis 1) the earlier point, so every binary
    # operation below broadcasts to the full (n, n) pair grid.
    earlier_x = x1[None, :]
    earlier_y = y1[None, :]
    earlier_t = t1[None, :]
    earlier_speed = speed1[None, :]
    earlier_course = course1[None, :]

    # Elapsed seconds from the earlier point to the later one.
    dt = (later_t - earlier_t).astype("timedelta64[s]").astype("float64")

    # Forward prediction: displacement of the earlier point over dt.
    fwd_step_x = earlier_speed * dp.sin(earlier_course) * dt * knots_to_mps
    fwd_step_y = earlier_speed * dp.cos(earlier_course) * dt * knots_to_mps
    pred_x = earlier_x + fwd_step_x
    pred_y = earlier_y + fwd_step_y
    forward_dist = dp.square(later_x - pred_x) + dp.square((later_y - pred_y))

    # Backward prediction: rewind the later point by dt.
    back_x = later_x - later_speed * dp.sin(later_course) * dt * knots_to_mps
    back_y = later_y - later_speed * dp.cos(later_course) * dt * knots_to_mps
    backward_dist = dp.square(earlier_x - back_x) + dp.square(earlier_y - back_y)

    # Spatiotemporal vectors: time is turned into a length via tau, which is
    # knots_to_mps for "fast" pairs and knots_to_mps * ktm_mult otherwise.
    slow_tau = knots_to_mps * ktm_mult
    is_fast = ((earlier_speed + later_speed) / 2) > speed_cutoff
    tau = (is_fast * (knots_to_mps - slow_tau)) + slow_tau
    time_axis = tau * dt

    st_vector = dp.array([time_axis, later_y - earlier_y, later_x - earlier_x])
    pred_vector = dp.array([time_axis, fwd_step_y, fwd_step_x])

    # Cosine of the angle between the observed and the predicted vector.
    dot = dp.sum(st_vector * pred_vector, axis=0)
    st_norm = dp.sqrt(dp.sum(dp.square(st_vector), axis=0))
    pred_norm = dp.sqrt(dp.sum(dp.square(pred_vector), axis=0))
    cos_angle = dot / (st_norm * pred_norm)

    dist = 0.5 * (forward_dist + backward_dist)

    # Possible extension (not implemented) if many vessels turn out to be close
    # to one another in space and time: also exclude
    #     | (dt < (time_window / 2)) & (speed1 == 0)
    # so that we only look within the next time_window if speed is not 0.
    invalid = (dt <= 0) | dp.isnan(cos_angle) | (cos_angle < angle_cutoff)

    # Excluded pairs get an infinite distance.
    return dp.where(invalid, dp.inf, dist)

Preprocess the data into the format expected by the distance-matrix function.

In [9]:
# Earth radius used by the coordinate conversions (meters, matching the x/y units).
rad_earth = 6371000

# Approximately convert lon/lat to x/y in meters
def lonlat_to_xy(lon, lat):
    """Project longitude/latitude in degrees to approximate planar x/y.

    ``y`` is the latitude scaled to an arc length on a sphere of radius
    ``rad_earth``; ``x`` is the longitude arc length additionally shrunk by
    the cosine of each point's own latitude.

    Returns the tuple ``(x, y)``.
    """
    deg_to_rad = dp.pi / 180
    x = lon * deg_to_rad * rad_earth * dp.cos(lat * deg_to_rad)
    y = lat * rad_earth * deg_to_rad
    return x, y

# Approximately convert back to lon/lat
def xy_to_lonlat(x, y):
    """Invert ``lonlat_to_xy``: map planar x/y back to degrees.

    The latitude is recovered from ``y`` first because the longitude scale
    depends on the cosine of that latitude.

    Returns the tuple ``(lon, lat)``.
    """
    lat = y * (180 / (dp.pi * rad_earth))
    lat_rad = lat * (dp.pi / 180)
    lon_scale = 180 / (dp.pi * rad_earth * dp.cos(lat_rad))
    lon = x * lon_scale
    return lon, lat

# Count the rows of every partition ONCE so that all arrays below are created
# with known chunk sizes. Calling compute_chunk_sizes() on each of the five
# derived arrays would re-run the CSV load once per array just to learn the
# same lengths.
partition_lengths = tuple(
    data["lon"].map_partitions(len, enforce_metadata=False).compute()
)

# Read in coordinates
lon = data["lon"].to_dask_array(lengths=partition_lengths)
lat = data["lat"].to_dask_array(lengths=partition_lengths)

# Convert to meters
x, y = lonlat_to_xy(lon, lat)

# Read in rest of data
t = data["time"].to_dask_array(lengths=partition_lengths)
speed = data["speed"].to_dask_array(lengths=partition_lengths)
# Course is converted from degrees to radians for the trigonometry downstream.
course = data["course"].to_dask_array(lengths=partition_lengths) * dp.pi / 180

# Rechunk the data to optimize the amount of data transferring occurring within
# the cluster. Rechunking to a fixed size needs known chunk sizes, which the
# explicit partition lengths above provide.
chunksize = 5000
x, y, t, speed, course = (
    arr.rechunk(chunksize) for arr in (x, y, t, speed, course)
)

Actually run the algorithm

In [10]:
print("Start")

# Initialize hyperparameters
# Number of nearest candidates kept per point; shared by argtopk and topk so
# that the saved indices and the saved distances always line up.
n_neighbors = 10

# Create the distance matrix, then take the n_neighbors SMALLEST distances and
# their indices along axis 0 (a negative k asks topk/argtopk for the smallest
# values instead of the largest).
dist_matrix = bidirectional_distance(x, y, t, speed, course)
closest = dp.argtopk(dist_matrix, -n_neighbors, axis=0)
closest_dists = dp.topk(dist_matrix, -n_neighbors, axis=0)

# Actually do the computation and save out to file so we don't have to run the
# cluster for analysis/plotting. Both results go through a single
# dask.compute() call so the distance matrix is evaluated once for the pair.
closest, closest_dists = dask.compute(closest, closest_dists)
np.save("./closest_ds1", closest)
np.save("./closest_dists_ds1", closest_dists)

Start


  result = blockwise(
  result = blockwise(
  result = blockwise(
  result = blockwise(


In [11]:
# Shut the cluster down once the computation has finished and the results have
# been saved, so it does not keep running.
cluster.shutdown()