Merged
2 changes: 1 addition & 1 deletion .claude/skills/conflate-snapshots/SKILL.md
@@ -12,7 +12,7 @@ upload for web consumption.

- Rated OSM snapshot (`osm_snapshot_rated.parquet`) at `versions.snapshot_osm` — produced by [skills/full-data-pull](../full-data-pull/SKILL.md) step 3.
- Overture snapshot (`overture_snapshot.parquet`) at `versions.snapshot_overture`.
- OSM history parquets (`osm_versions.parquet`, `osm_changes.parquet`) at `versions.osm_data` — produced by [skills/model-history-pipeline](../model-history-pipeline/SKILL.md). Required by the change-detection step in stage 4.
- OSM history parquets (`osm_versions.parquet`, `osm_changes.parquet`) at `versions.osm_data` — **regenerated each month** by [skills/full-data-pull](../full-data-pull/SKILL.md) step 2 (via `scripts/osm_data/download_history.py`). The full re-fit pipeline at [skills/model-history-pipeline](../model-history-pipeline/SKILL.md) is only invoked when re-fitting λ. Required by the change-detection step in stage 4.
- **Fresh Source Cooperative temp credentials** in `.env.json` at the repo root. Tokens expire in ~1 hour.

> ⚠️ **Credential refresh check.** Source Cooperative uses short-lived AWS
14 changes: 11 additions & 3 deletions .claude/skills/full-data-pull/SKILL.md
@@ -5,14 +5,15 @@ description: Use when the user wants to refresh the independent POI snapshots (O

# Full data pull

Downloads the snapshot sources (50 US states + DC + 5 inhabited territories: PR, VI, GU, MP, AS) and applies the rating model to OSM so conflation can run.
Downloads the snapshot sources (50 US states + DC + 5 inhabited territories: PR, VI, GU, MP, AS), refreshes the OSM history that drives ghost reconstruction, and applies the rating model to OSM so conflation can run.

## Prerequisites

- conda env `openpois` active.
- For OSM: `osmium` in env bin (resolved automatically via `Path(sys.executable).parent / "osmium"`).
- Boundary cache at `directories.boundary` (auto-downloads on first use).
- A fitted model exists for the OSM rating step (see [skills/model-history-pipeline](../model-history-pipeline/SKILL.md)).
- For OSM history: a fresh Geofabrik OAuth cookie file at `download.osm.history_cookie_file` (Netscape format; any OSM account works). See [docs/data-sources.md](../../docs/data-sources.md#osm-history-geofabrik-full-history-pbfs).
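
The `osmium` lookup named in the prerequisites can be sketched as follows. This is a minimal illustration that assumes the binary sits next to the active interpreter; the helper name `resolve_osmium` is hypothetical and is not taken from the repo.

```python
import sys
from pathlib import Path


def resolve_osmium() -> Path:
    """Return the expected osmium path next to the active Python interpreter.

    Hypothetical helper: it only mirrors the expression quoted in the
    prerequisites and does not check that the file is executable.
    """
    return Path(sys.executable).parent / "osmium"


if __name__ == "__main__":
    candidate = resolve_osmium()
    status = "found" if candidate.exists() else "missing"
    print(f"osmium expected at {candidate} ({status})")
```

Running this inside the `openpois` conda env before a pull is a quick way to confirm the binary is where the loaders will look for it.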

## Steps

@@ -21,23 +22,30 @@ Downloads the snapshot sources (50 US states + DC + 5 inhabited territories: PR,
versions:
  snapshot_osm: "YYYYMMDD"
  snapshot_overture: "YYYYMMDD"
  osm_data: "YYYYMMDD" # bumps each month — history is refreshed for ghosts
  ghost_osm: "YYYYMMDD" # pinned to osm_data; bumps in lockstep
```
See [docs/data-versioning.md](../../docs/data-versioning.md).
`model_output` does **not** bump unless you're re-fitting λ from scratch (see [skills/model-history-pipeline](../model-history-pipeline/SKILL.md)). See [docs/data-versioning.md](../../docs/data-versioning.md).

2. **Run the downloads** (independent — order doesn't matter, can run in parallel):

```bash
python scripts/osm_snapshot/download.py # 4 Geofabrik PBFs → osm_snapshot.parquet
python scripts/overture/download.py # DuckDB over S3 → overture_snapshot.parquet
python scripts/osm_data/download_history.py # 4 internal OSH PBFs → osm_versions.parquet + osm_changes.parquet
```
The snapshot loader pulls 4 extracts in sequence: `us`, `pr`, `usvi`, `american_oceania`. Per-source details, auth, and schema quirks are in [docs/data-sources.md](../../docs/data-sources.md).
Both OSM loaders (snapshot and history) pull 4 extracts in sequence: `us`, `pr`, `usvi`, `american_oceania`. Per-source details, auth, and schema quirks are in [docs/data-sources.md](../../docs/data-sources.md).

**Gotcha — interrupted snapshot runs**: all 4 extracts share `~/data/openpois/snapshots/osm/<v>/parse_chunks/`. If a run dies between extracts, leftover chunks from extract N may be silently mistaken for extract N+1's parsed output on resume (the parser short-circuits on existing chunks). Before resuming an interrupted snapshot run, nuke the work dir:
```bash
rm -rf ~/data/openpois/snapshots/osm/{version}/parse_chunks/
```
This forces a clean re-parse of whichever extract was in flight; completed extracts (which write their own per-extract intermediate parquet next to the final output) are still skipped.

**Gotcha — `download_history.py` is for ghost regeneration only**: do **not** re-run `scripts/osm_data/format_tabular.py` or `scripts/models/osm_turnover.py` in the monthly cycle — those are part of the model-fit pipeline, which stays pinned to `versions.model_output`. The monthly history refresh only feeds `build_ghosts.py` (invoked by `make conflate`).

**Gotcha — per-territory 404 tolerance**: if Geofabrik stops publishing a territory's `*-internal.osh.pbf`, the loader logs a warning, skips that extract, and continues. The territory's POIs still flow through downstream stages but the rater falls back to the global-mean δ for its `shared_label`s.

3. **Apply the rating model to OSM** → `osm_snapshot_rated.parquet`:
```bash
python scripts/osm_snapshot/apply_model.py
83 changes: 83 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,83 @@
# Changelog

## 2026-05-21-v0

### Snapshot inputs

| Source | Value |
| ---------------------- | ------------------------------------------- |
| OSM snapshot date | 2026-05-21 |
| Overture release | `2026-05-20.0` (pinned) |
| OSM snapshot rows | 8,799,633 |
| Overture snapshot rows | 13,458,763 |
| Boundary footprint | US + all territories (PR, USVI, GU, MP, AS) |

### Conflated output

| Metric | This run | Prior | Δ |
| ---------------------------- | ----------- | ------------ | ----------------------- |
| Total rows | 17,989,377 | 17,788,585 | +200,792 (+1.13%) |
| Matched OSM × Overture | 2,696,484 | 2,677,091 | +19,393 (+0.72%) |
| OSM-only | 6,103,149 | 6,031,413 | +71,736 (+1.19%) |
| Overture-only | 9,189,744 | 9,080,081 | +109,663 (+1.21%) |
| Shadow-matched (CD penalty) | 47,925 | n/a | new — first run with change detection |
| Shared labels | 93 | 93 | unchanged set |
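
The Δ column above can be re-derived from the two count columns. The sketch below is only a sanity check on the published numbers, not part of the pipeline; the `delta` helper and the dictionary keys are names chosen for this example.

```python
# Row counts copied from the "Conflated output" table.
this_run = {"matched": 2_696_484, "osm_only": 6_103_149, "overture_only": 9_189_744}
prior = {"matched": 2_677_091, "osm_only": 6_031_413, "overture_only": 9_080_081}


def delta(new: int, old: int) -> tuple[int, float]:
    """Return (absolute change, percent change rounded to 2 decimals)."""
    return new - old, round(100 * (new - old) / old, 2)


# The three disjoint classes should add up to the "Total rows" line.
total_now = sum(this_run.values())
total_prior = sum(prior.values())

print("total", total_now, delta(total_now, total_prior))
for key in this_run:
    print(key, delta(this_run[key], prior[key]))
```

The three classes sum to the reported totals for both runs, and matched plus OSM-only equals the OSM snapshot row count listed under "Snapshot inputs" (8,799,633).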

### Methods changes vs. prior release

- **Change detection (new).** Post-conflation pass that reconstructs "ghost" POIs from OSM history (deleted or renamed nodes) and uses them to penalize unmatched Overture POIs that shadow-match a ghost. Penalty multiplies the Overture row's `conf_mean` by the per-`shared_label` δ from the fitted turnover model. 47,925 rows penalized this run. Adds audit columns to every conflated row: `shadow_matched`, `shadow_ghost_id`, `shadow_event_type`, `shadow_event_timestamp`, `shadow_score`, `shadow_distance_m`, `original_conf_mean`. **PR #29**; design in `docs/change-detection.md`.
- **US territory expansion.** Spatial footprint widened from CONUS + PR to include all US territories (PR, USVI, GU, MP, AS). Affects both snapshots and the conflation domain. **PR #31**.
- **Wider metadata propagation.** Additional OSM and Overture metadata fields now flow through to the conflated parquet (website, wikidata, wikipedia, etc.). **PR #30**.
- **PMTiles re-tuned.** Zoom range narrowed to Z10–Z14 with `--drop-densest-as-needed`, so feature drops cascade through low zooms instead of failing tile builds. Site updated with zoom-aware point styling. **PR #33**.
- **Covering bbox in partitioned parquet.** GeoParquet 1.1 `bbox` struct column emitted via `write_covering_bbox=True`, enabling DuckDB row-group pruning on viewport queries. **PR #32**.
- **Overture release pinned.** `download.overture.release_date` set to `2026-05-20.0` (was `null` = auto-detect latest). Future runs against the same pin are deterministic.
- **Pipeline memory hardening (uncommitted on `lifecycle/may-2026-release`).** Both `apply_change_detection.py` and the partitioned-write helper hit the 24 GB WSL cap on nationwide inputs. The CD writer now mutates in place and streams the output parquet in row-group chunks via `pyarrow.parquet.ParquetWriter`; the geohash partition writer drops one full-partition copy (numpy `argsort` + `iloc` instead of pandas `sort_values`) and streams large partitions in chunks. See `src/openpois/conflation/change_detection.py` and `src/openpois/io/geohash_partition.py`.
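
The change-detection penalty from the first bullet can be sketched as below. This is an illustration under stated assumptions, not the code in `change_detection.py`: the `apply_penalty` function, the example rows, and the δ values are invented for the example. Only the audit column names and the multiply-by-δ rule come from the notes above; blanking the interval bounds and filling `original_conf_mean` on every row are assumptions.

```python
import math

# Hypothetical per-label δ values; real ones come from the fitted turnover model.
DELTA_BY_LABEL = {"Restaurant": 0.5, "Supermarket": 0.25}


def apply_penalty(rows: list[dict], delta_by_label: dict[str, float]) -> list[dict]:
    """Scale conf_mean by the label's δ for shadow-matched rows."""
    out = []
    for row in rows:
        new = dict(row)
        # Assumption: the pre-penalty value is recorded on every row.
        new["original_conf_mean"] = row["conf_mean"]
        if row["shadow_matched"]:
            new["conf_mean"] = row["conf_mean"] * delta_by_label[row["shared_label"]]
            # Assumption: interval bounds are blanked once the mean is penalized.
            new["conf_lower"] = math.nan
            new["conf_upper"] = math.nan
        out.append(new)
    return out


example_rows = [
    {"shared_label": "Restaurant", "conf_mean": 0.8, "conf_lower": 0.7,
     "conf_upper": 0.9, "shadow_matched": True},
    {"shared_label": "Supermarket", "conf_mean": 0.6, "conf_lower": 0.5,
     "conf_upper": 0.7, "shadow_matched": False},
]
penalized = apply_penalty(example_rows, DELTA_BY_LABEL)
for row in penalized:
    print(row["shared_label"], row["original_conf_mean"], "->", row["conf_mean"])
```

Keeping `original_conf_mean` next to the penalized value is what makes the penalty auditable: the applied factor can always be recovered as the ratio of the two columns.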

### Taxonomy changes

**Overture crosswalk** (`src/openpois/conflation/data/taxonomy_crosswalk_overture_maps.csv`, uncommitted on `lifecycle/may-2026-release`): 7 new entries under `services_and_business.family_service`, previously unmapped and dropped from the partitioned output.

| Overture sub-category | Maps to |
| ----------------------------- | ------------------ |
| `funeral_service` | Other Professional |
| `adoption_service` | Other Professional |
| `family_service_center` | Other Professional |
| `nanny_service` | Other Professional |
| `genealogist` | Other Professional |
| `elder_care_planning` | Other Professional |
| `mobility_equipment_service` | Other Shop |

The six entries mapped to **Other Professional** are the proximate cause of that label's +22,715 row jump (+8.45%).

No OSM-side taxonomy changes since 2026-04-23.
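
To make the effect of a missing crosswalk entry concrete, here is a small sketch of the lookup. It assumes the crosswalk behaves like a plain sub-category to shared-label mapping; the `label_pois` function and the `some_unlisted_category` value are hypothetical and do not come from the repo.

```python
# Subset of the crosswalk rows listed above (Overture sub-category -> shared label).
CROSSWALK = {
    "funeral_service": "Other Professional",
    "nanny_service": "Other Professional",
    "mobility_equipment_service": "Other Shop",
}


def label_pois(
    categories: list[str], crosswalk: dict[str, str]
) -> tuple[list[tuple[str, str]], list[str]]:
    """Split categories into (category, label) pairs and unmapped leftovers."""
    mapped: list[tuple[str, str]] = []
    unmapped: list[str] = []
    for cat in categories:
        if cat in crosswalk:
            mapped.append((cat, crosswalk[cat]))
        else:
            # Unmapped categories have no shared label to be written under.
            unmapped.append(cat)
    return mapped, unmapped


mapped, unmapped = label_pois(
    ["funeral_service", "mobility_equipment_service", "some_unlisted_category"],
    CROSSWALK,
)
print("mapped:", mapped)
print("unmapped:", unmapped)
```

Reporting the unmapped list on each run is one way to notice new upstream sub-categories before they silently drop out of the output.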

### Top label-level row-count changes

| Shared label | This run | Prior | Δ rows | Δ % | Δ matched |
| ------------------- | ----------- | ----------- | --------- | ------- | --------- |
| Specialty Store | 1,026,395 | 917,422 | +108,973 | +11.88% | +753 |
| Other Amenity | 3,858,315 | 3,819,068 | +39,247 | +1.03% | +3,124 |
| Clothing Store | 317,177 | 288,506 | +28,671 | +9.94% | +779 |
| Other Professional | 291,500 | 268,785 | +22,715 | +8.45% | 0 |
| Other Healthcare | 995,881 | 1,016,112 | −20,231 | −1.99% | +54 |
| (unlabeled) | 701,209 | 719,862 | −18,653 | −2.59% | +1,506 |
| Car Dealer | 182,314 | 164,517 | +17,797 | +10.82% | +521 |
| Restaurant | 718,472 | 702,020 | +16,452 | +2.34% | +1,092 |
| Supermarket | 193,777 | 179,783 | +13,994 | +7.78% | +361 |
| Recreation | 1,302,776 | 1,293,338 | +9,438 | +0.73% | −510 |

Drivers:
- Most positive movers (Specialty Store, Clothing Store, Car Dealer, Supermarket, Bakery, Charging Station) track Overture's snapshot growth (+2.5% overall) landing in shared labels with moderate base counts.
- **Other Professional** also reflects the new `family_service` crosswalk entries above.
- **Other Healthcare** dropping by ~20k against a larger Overture snapshot is worth a closer look — likely an Overture taxonomy reshuffle inside `health_and_medical` upstream. Flagged for QA, not blocking.

### Version pins

| Key | This run | Prior |
| ------------------------- | ------------------------ | ------------------------ |
| `versions.conflation` | 20260521 | 20260423 |
| `versions.snapshot_osm` | 20260521 | 20260417 |
| `versions.snapshot_overture` | 20260521 | 20260423 |
| `versions.osm_data` | 20260521 | 20260515 |
| `versions.ghost_osm` | 20260521 | 20260515 |
| `versions.model_output` | 20260422_by_shared_label | 20260422_by_shared_label (unchanged — model not refit this cycle) |
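
The pinning rules behind this table (`ghost_osm` moves in lockstep with `osm_data`; date keys are `YYYYMMDD`; `source_coop` is `YYYY-MM-DD-v<IDX>`) lend themselves to a quick automated check. The following is a sketch only: `check_pins` is a hypothetical helper, and the values are copied by hand rather than read from `config.yaml`.

```python
import re

# Values copied from the "This run" column; source_coop is taken from the
# config.yaml change in this PR.
versions = {
    "osm_data": "20260521",
    "ghost_osm": "20260521",
    "snapshot_osm": "20260521",
    "snapshot_overture": "20260521",
    "conflation": "20260521",
    "source_coop": "2026-05-21-v0",
}


def check_pins(v: dict[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means the pins look consistent."""
    problems = []
    if v["ghost_osm"] != v["osm_data"]:
        problems.append("ghost_osm must match osm_data")
    for key in ("osm_data", "snapshot_osm", "snapshot_overture", "conflation"):
        if not re.fullmatch(r"\d{8}", v[key]):
            problems.append(f"{key} is not YYYYMMDD")
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}-v\d+", v["source_coop"]):
        problems.append("source_coop is not YYYY-MM-DD-v<IDX>")
    return problems


print(check_pins(versions) or "pins look consistent")
```

A check like this could run before the monthly pull to catch a `ghost_osm` value that was left behind when `osm_data` was bumped.
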
17 changes: 9 additions & 8 deletions config.yaml
@@ -1,15 +1,16 @@
# Versioned directories (used with config.get_dir_path())
versions:
  osm_data: "20260515"
  osm_data: "20260521"
  model_output: "20260422_by_shared_label"
  snapshot_osm: "20260417"
  snapshot_overture: "20260423"
  conflation: "20260423"
  source_coop: "2026-04-23-v0" # Source Cooperative upload folder (YYYY-MM-DD-v<IDX>); bump v<IDX> only for same-day re-uploads
  snapshot_osm: "20260521"
  snapshot_overture: "20260521"
  conflation: "20260521"
  source_coop: "2026-05-21-v0" # Source Cooperative upload folder (YYYY-MM-DD-v<IDX>); bump v<IDX> only for same-day re-uploads
  # Ghost POI dataset reconstructed from OSM history (one row per
  # detected previous-state event). Pinned to the same value as
  # ``osm_data`` since it is derived from the same history parquets.
  ghost_osm: "20260515"
  # ``osm_data`` since it is derived from the same history parquets,
  # and regenerated together with the monthly snapshot refresh.
  ghost_osm: "20260521"

# Settings for downloading data
download:
@@ -74,7 +75,7 @@ download:
'website','wikidata','wikipedia'
]
  overture:
    release_date: null # null = auto-detect latest
    release_date: "2026-05-20.0" # pin for determinism; null = auto-detect latest
    s3_bucket: "overturemaps-us-west-2"
    s3_region: "us-west-2"
    # DuckDB resource caps for the per-part S3 scans and the final polygon
27 changes: 27 additions & 0 deletions scripts/publish/upload_to_source_coop.py
@@ -79,6 +79,14 @@ def parse_args() -> argparse.Namespace:
            "default because these rarely change."
        ),
    )
    parser.add_argument(
        "--skip-changelog", action = "store_true",
        help = (
            "Skip uploading CHANGELOG.md to the repo top level. Default "
            "is to upload it on every run so the public copy stays in "
            "sync with the latest per-release deltas."
        ),
    )
    parser.add_argument(
        "--skip-latest-mirror", action = "store_true",
        help = (
@@ -204,6 +212,25 @@ def main() -> None:
f"deleted {summary['deleted']} stale object(s)."
)

    # -------------------------------------------------------------------------
    # Top-level CHANGELOG.md (per-release deltas, updated every run by default)
    # -------------------------------------------------------------------------
    if not args.skip_changelog:
        changelog_path = CONFIG_PATH.parent / "CHANGELOG.md"
        if changelog_path.exists():
            upload_bytes(
                client = client,
                data = changelog_path.read_bytes(),
                bucket = bucket,
                key = f"{repo_prefix}/CHANGELOG.md",
                content_type = "text/markdown; charset=utf-8",
                dry_run = args.dry_run,
            )
        else:
            print(
                f"Skipping CHANGELOG.md upload — {changelog_path} not found."
            )

    # -------------------------------------------------------------------------
    # Top-level README + LICENSE (opt-in)
    # -------------------------------------------------------------------------
101 changes: 75 additions & 26 deletions src/openpois/conflation/change_detection.py
@@ -647,42 +647,91 @@ def apply_shadow_match(
new_conf_lower[target_global] = np.nan
new_conf_upper[target_global] = np.nan

    # -- Stitch into output --------------------------------------------
    out = conflated.copy()
    out["conf_mean"] = new_conf_mean
    out["conf_lower"] = new_conf_lower
    out["conf_upper"] = new_conf_upper
    out["shadow_matched"] = shadow_matched
    out["shadow_ghost_id"] = shadow_ghost_id
    out["shadow_event_type"] = shadow_event_type
    out["shadow_event_timestamp"] = shadow_event_timestamp.values
    out["shadow_score"] = shadow_score
    out["shadow_distance_m"] = shadow_distance_m
    out["original_conf_mean"] = original_conf_mean
    # -- Summary scalars are derived now, while pre- and post-penalty
    # conf_mean views are both still alive. Lengths are captured here
    # so we can free the heavy intermediates before the parquet write.
    mean_penalty_factor = (
        float(
            (new_conf_mean[shadow_matched]
             / np.where(
                 original_conf_mean[shadow_matched] == 0, 1,
                 original_conf_mean[shadow_matched],
             )
            ).mean()
        )
        if shadow_matched.any() else float("nan")
    )
    n_ghosts_in = int(len(ghosts))
    n_shadow_matches = int(len(matches))

    # On nationwide data the original "copy then write" path peaked
    # past the 24 GB WSL cap (≈18M-row shapely-geometry GDF + a full
    # .copy() + the pyarrow Table materialized inside to_parquet +
    # the 9M-row unmatched_ov subset). Free the scratch state before
    # the write peak.
    del ghosts, matches
    if "unmatched_ov" in locals():
        del unmatched_ov  # noqa: F821 -- only bound in the else branch
    gc.collect()

    # Mutate conflated in place rather than allocating a full copy:
    # the un-penalized baseline isn't needed after this point and the
    # audit data is held in standalone numpy arrays.
    conflated["conf_mean"] = new_conf_mean
    conflated["conf_lower"] = new_conf_lower
    conflated["conf_upper"] = new_conf_upper
    conflated["shadow_matched"] = shadow_matched
    conflated["shadow_ghost_id"] = shadow_ghost_id
    conflated["shadow_event_type"] = shadow_event_type
    conflated["shadow_event_timestamp"] = shadow_event_timestamp.values
    conflated["shadow_score"] = shadow_score
    conflated["shadow_distance_m"] = shadow_distance_m
    conflated["original_conf_mean"] = original_conf_mean

    # Pandas copied each array on assignment, so the standalone
    # references are now redundant -- drop them before the write.
    del (
        new_conf_mean, new_conf_lower, new_conf_upper,
        shadow_matched, shadow_ghost_id, shadow_event_type,
        shadow_event_timestamp, shadow_score, shadow_distance_m,
        original_conf_mean,
    )
    gc.collect()

    output_path = Path(output_path)
    output_path.parent.mkdir(parents = True, exist_ok = True)
    if verbose:
        print(f"Writing {output_path} ...")
    out.to_parquet(output_path, compression = "zstd")

    # Stream the write in row-group chunks. The default
    # GeoDataFrame.to_parquet materializes a full pyarrow Table
    # alongside the live GDF, doubling peak memory; on 18M-row
    # nationwide inputs that exceeds the 24 GB WSL cap.
    from geopandas.io.arrow import _geopandas_to_arrow

    chunk_rows = 2_000_000
    sample_tbl = _geopandas_to_arrow(conflated.iloc[:1])
    schema = sample_tbl.schema
    del sample_tbl
    with pq.ParquetWriter(
        str(output_path), schema, compression = "zstd",
    ) as writer:
        for start in range(0, n, chunk_rows):
            end = min(start + chunk_rows, n)
            chunk_tbl = _geopandas_to_arrow(
                conflated.iloc[start:end]
            )
            writer.write_table(chunk_tbl)
            del chunk_tbl
            gc.collect()

    summary = {
        "n_total": int(n),
        "n_unmatched_overture": int(len(ov_global_idx)),
        "n_ghosts": int(len(ghosts)),
        "n_shadow_matches": int(len(matches)),
        "n_ghosts": n_ghosts_in,
        "n_shadow_matches": n_shadow_matches,
        "n_survivor_dropped": int(n_survivor_dropped),
        "mean_penalty_factor": (
            float(
                (new_conf_mean[shadow_matched]
                 / np.where(
                     original_conf_mean[shadow_matched] == 0, 1,
                     original_conf_mean[shadow_matched],
                 )
                ).mean()
            )
            if shadow_matched.any() else float("nan")
        ),
        "mean_penalty_factor": mean_penalty_factor,
    }

    # Confirm read-back schema integrity.