feat(ingest): stream catalog in chunks to bound ingest memory#674
Merged
Conversation
The dataset ingest pipeline previously held the entire parsed catalog in memory as a single pandas DataFrame, so peak RSS scaled linearly with the number of files in the input tree. A 30k-file archive needed ~87 MiB peak; projecting to 1M+ files would OOM on modest hosts. This change adds a new chunked streaming path that walks the tree once and parses + validates + registers ``chunk_size`` files at a time, flushing the in-memory DataFrame between chunks. Chunks only split at directory boundaries so files belonging to the same CMIP6 dataset (which live in a single DRS version directory) stay together and ``register_dataset`` never sees a partial dataset. User-facing surface: - ``ref datasets ingest --chunk-size N`` enables streaming on the CLI (currently CMIP6 only; non-CMIP6 adapters fall back to the legacy path with a warning). - ``ingest_datasets(..., chunk_size=N)`` is the programmatic equivalent. Internals: - New ``iter_discovered_chunks`` and ``iter_built_catalogs`` generators in ``catalog_builder`` so chunked walking is shared infrastructure. - ``CMIP6DatasetAdapter`` exposes ``iter_local_datasets`` and shares post-parse enrichment with ``find_local_datasets`` via the new ``_enrich_parsed_catalog`` helper. - ``build_instance_id`` now accepts ``copy=False`` so the streaming enrichment path avoids an extra full-DataFrame copy (kills one of the two known 2× peaks). Default stays ``copy=True`` so the no-mutation contract is preserved for existing callers. - ``IngestionStats`` learned ``__iadd__`` so chunked runs can roll their per-chunk stats into a single final summary. Diagnostics: - ``scripts/benchmark_ingest_memory.py`` synthesises a CMIP6 DRS tree of empty .nc files and reports baseline vs streaming peak memory via ``tracemalloc``. On 30k files / 300 datasets / chunk_size=5000 the peak drops from ~87 MiB to ~25 MiB (3.5× lower) with no wall-time regression. Tests: - New coverage for ``iter_discovered_chunks`` (directory-boundary respect, empty roots, single-file inputs) and ``iter_built_catalogs`` (invalid filtering per chunk, empty inputs). - New ``TestCMIP6IterLocalDatasets`` asserts streaming yields exactly the same rows as ``find_local_datasets`` on the sample data so the two paths cannot drift apart.
Contributor
There was a problem hiding this comment.
Pull request overview
Adds a chunked/streaming ingest path to bound peak memory during dataset ingestion (currently CMIP6-only), with supporting catalog-builder iterators, CLI/API surface, and unit tests.
Changes:
- Add chunked discovery/parsing (
iter_discovered_chunks,iter_built_catalogs) and a CMIP6iter_local_datasets(..., chunk_size=...)streaming path. - Extend ingestion pipeline with
--chunk-size(CLI) andingest_datasets(..., chunk_size=...)(API), plus stats accumulation viaIngestionStats.__iadd__. - Optimize
build_instance_idto avoiditerrows()and allow callers to opt out of a defensive DataFrame copy.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| scripts/benchmark_ingest_memory.py | New standalone benchmark to compare memory/time for whole-tree vs chunked ingest. |
| ruff.toml | Add per-file ignores for scripts/*.py (docstrings, lazy imports, etc.). |
| packages/climate-ref/tests/unit/datasets/test_cmip6.py | Add tests ensuring CMIP6 streaming yields the same catalog as whole-tree parsing. |
| packages/climate-ref/tests/unit/datasets/test_catalog_builder.py | Add tests for directory-boundary chunking and per-chunk invalid-row filtering. |
| packages/climate-ref/src/climate_ref/datasets/utils.py | Add copy= flag and optimize build_instance_id construction. |
| packages/climate-ref/src/climate_ref/datasets/cmip6.py | Refactor enrichment into a shared helper and add iter_local_datasets streaming API. |
| packages/climate-ref/src/climate_ref/datasets/catalog_builder.py | Add chunked discovery/parsing iterators and centralize invalid-row filtering. |
| packages/climate-ref/src/climate_ref/datasets/init.py | Add streaming ingest support (chunk_size), refactor ingestion loop, and accumulate stats. |
| packages/climate-ref/src/climate_ref/cli/datasets.py | Add --chunk-size option and implement streaming ingest path in CLI. |
| changelog/674.feature.md | Document the new --chunk-size feature and its memory behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
* origin/main: docs: reformat docs: add changelog for PR #673 fix(ingest): stop re-ingesting unchanged CMIP6 files every run
- Clarify benchmark reports tracemalloc Python-allocator peak, not RSS. - Fix benchmark files_per_dataset docstring/impl mismatch (use ceil division). - Drop redundant skip_invalid kwarg on the chunked ingest_datasets call; the catalog is already validated, and the early return ignores it.
Mirror the CMIP6 split: factor the post-parse enrichment out of `find_local_datasets` into `_enrich_parsed_catalog` and add `iter_local_datasets` to `CMIP7DatasetAdapter`. Discovery walks the tree once and chunks flush at directory boundaries so a CMIP7 DRS dataset (which lives in one version directory) is never split across chunks. The CLI `--chunk-size` option now applies to CMIP7 ingest as well as CMIP6. Adapter help text updated accordingly. Adds `TestCMIP7IterLocalDatasets` (matches/whole-tree, non-empty chunks, empty-enriched chunk filtered) to mirror the CMIP6 coverage.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
--chunk-sizeoption onref datasets ingest(and matchingingest_datasets(chunk_size=...)API) that walks the input tree once and parses+validates+registerschunk_sizefiles at a time, flushing the in-memory DataFrame between chunks. Chunks split only at directory boundaries so CMIP6 DRS datasets are never partial when handed toregister_dataset.build_instance_idskip its defensive.copy()when the caller opts in viacopy=False(default staysTrueso existing no-mutation tests pass).IngestionStats.__iadd__lets the streaming path roll per-chunk stats into a single final summary.Memory impact
Run via the new
scripts/benchmark_ingest_memory.pyon a synthetic CMIP6 DRS tree of empty.ncfiles:Wall time is unchanged (within noise). The benchmark forces
cmip6_parser = "drs"because synthetic.touch()files cannot be opened by netCDF4.Test plan
uv run pytest packages/climate-ref/tests/unit— 820 passed, 2 skipped, 2 xfaileduv run pre-commit run --files <changed>— ruff + mypy cleanTestIterDiscoveredChunks— directory-boundary respect, empty roots, single-file inputsTestIterBuiltCatalogs— invalid filtering per chunk, empty inputsTestCMIP6IterLocalDatasets— asserts streaming yields the same rows asfind_local_datasetson the sample CMIP6 archive so the two paths cannot driftscripts/benchmark_ingest_memory.pyat 4k / 20k / 30k file counts (table above)Out of scope (follow-ups)
iter_local_datasets+_enrich_parsed_catalogsplit).