In [0]:
%pip install networkx folium h3 polars

In [0]:
import folium
from folium.plugins import TimestampedGeoJson
import datetime
from typing import TypedDict
import pyspark.sql.functions as F
from pyspark.sql.window import Window


In [0]:
# Materialize one ABM run's raw parquet outputs as Delta tables.
# Change RUN_ID to load a different simulation run.
RUN_ID = "00001"
RAW_RUN_DIR = f"/Volumes/caspers_abm/experiments/abm_runs_raw/{RUN_ID}"
TARGET_SCHEMA = "caspers_abm.experiments"

for table_name in ("positions", "people"):
    df = spark.read.parquet(f"{RAW_RUN_DIR}/{table_name}.parquet")
    # mode("overwrite"): re-running the cell replaces the table instead of appending.
    df.write.format("delta").mode("overwrite").saveAsTable(f"{TARGET_SCHEMA}.{table_name}")

In [0]:
# Ids of agents whose role is "courier"; used to restrict positions below.
df_people = (
    spark.table("caspers_abm.experiments.people")
    .select("id", "role")
    .filter(F.col("role") == "courier")
)

# Keep only courier positions. (Previously df_people was built but never used,
# so trips were produced for every agent.)
# NOTE(review): assumes positions.id and people.id share the same id space — confirm.
# No upfront sort: the window spec and the array_sort below define ordering.
df_positions = spark.table("caspers_abm.experiments.positions").join(
    df_people.select("id"), on="id", how="left_semi"
)

# Per-agent, time-ordered window shared by all window functions below.
window_spec = Window.partitionBy("id").orderBy("timestamp")

# Position at the agent's previous sample (null on the agent's first row).
df_positions = df_positions.withColumn(
    "prev_position", F.lag("position").over(window_spec)
)

# "idle" when the agent has not moved since the previous sample. On the first
# row the comparison is null, so that row falls through to "active".
df_positions = df_positions.withColumn(
    "status",
    F.when(F.col("position") == F.col("prev_position"), "idle").otherwise("active"),
)

# 1 where the status differs from the previous row's status, else 0.
df_positions = df_positions.withColumn(
    "new_status_region",
    F.when(F.col("status") != F.lag("status").over(window_spec), 1).otherwise(0),
)

# Running count of status changes = id of the contiguous status run per row.
df_positions = df_positions.withColumn(
    "trip_id", F.sum("new_status_region").over(window_spec)
)

# Keep only moving rows; drop the helper columns.
df_positions = df_positions.filter(F.col("status") == "active").drop(
    "prev_position", "new_status_region", "status"
)

# A run needs at least this many samples to be kept as a trip.
MIN_TRIP_POINTS = 3

df_trips = (
    df_positions.groupBy("id", "trip_id")
    # collect_list does not guarantee element order after a shuffle, so collect
    # (timestamp, position) pairs together and sort them by timestamp
    # (struct ordering compares the first field first).
    .agg(
        F.array_sort(F.collect_list(F.struct("timestamp", "position"))).alias("points")
    )
    .select(
        "id",
        "trip_id",
        F.col("points.timestamp").alias("timestamps"),
        F.col("points.position").alias("positions"),
    )
    .filter(F.size("timestamps") >= MIN_TRIP_POINTS)
)


display(df_trips)


In [0]:
class Row(TypedDict):
    """Shape of one record of ``df_trips`` after conversion to Python objects."""

    # NOTE(review): declared as str; the Spark type of `id` is not visible here — confirm.
    id: str
    # Index of the agent's status run (from the cumulative window sum).
    trip_id: int
    # Sample timestamps for the trip.
    timestamps: list[datetime.datetime]
    # NOTE(review): Arrow's to_pylist yields lists, not tuples; presumably
    # [lng, lat] pairs — confirm against the positions schema.
    positions: list[tuple[float, float]]


# Collect the (already filtered) trips to the driver as plain Python dicts.
trips: list[Row] = df_trips.toArrow().to_pylist()

In [0]:
def trip_to_feature(trip: dict, color: str = "red") -> dict:
    """Convert one trip record into a GeoJSON Feature for TimestampedGeoJson.

    Args:
        trip: Mapping with "positions" (sequence of coordinate pairs) and
            "timestamps" (datetime objects, one per position).
        color: Fill colour of the animated circle marker.

    Returns:
        A GeoJSON Feature dict whose LineString is animated over "times".
    """
    return {
        "type": "Feature",
        "geometry": {
            "type": "LineString",
            # NOTE(review): GeoJSON expects [lng, lat] order — confirm positions use it.
            "coordinates": trip["positions"],
        },
        "properties": {
            "times": [ts.isoformat() for ts in trip["timestamps"]],
            # Zero stroke width for the LineString itself.
            "style": {
                "weight": 0,
            },
            "icon": "circle",
            "iconstyle": {
                "fillColor": color,
                "fillOpacity": 0.6,
                # Must be a real boolean: the string "false" is truthy in
                # JavaScript, so Leaflet would still draw the stroke.
                "stroke": False,
                "radius": 5,
            },
        },
    }


routes = [trip_to_feature(trip) for trip in trips]

In [0]:
# Initial map centre. Kept as lng/lat; folium's `location` takes [lat, lng].
lng, lat = -0.13381370382489707, 51.518898098201326

m = folium.Map(
    location=[lat, lng],
    zoom_start=13,
    tiles="CartoDB Positron",
)

TimestampedGeoJson(
    {"type": "FeatureCollection", "features": routes},
    # Add a point at the last coordinate of each LineString.
    add_last_point=True,
    loop=True,
    auto_play=False,
    date_options="YYYY/MM/DD HH:mm:ss",
    # Time-slider step (ISO 8601 duration: one minute).
    period="PT1M",
    # How long each feature stays visible after its timestamp has passed.
    duration="PT1M",
    loop_button=True,
).add_to(m)

m