Description
This PR adds the Ingest Processor, which is responsible for the final phase of the distributed import pipeline: ingesting pre-merged SSTs into CockroachDB’s KV layer using AddSSTable.
It consumes metadata produced by the distributed merge phase, splits and scatters ranges as needed, and ingests each SST directly into Pebble. This replaces the placeholder import ingestion logic with a real, distributed AddSST flow.
This PR implements the Ingest phase, which follows the distributed merge stage introduced in #PR-6.
After the merge processor produces range-aligned, sorted SSTs:
- The ingest phase pre-splits ranges (using `splitAndScatterSpans()` from #PR-2).
- Each processor ingests SSTs directly into Pebble via `DB.AddSSTable()`.
- The entire flow executes as a DistSQL job, parallelized across nodes (the sketch below walks through these steps).
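For concreteness, here is a minimal sketch of that two-step flow. The `sstMeta` type, the `splitAndScatterSpans()` call shape, and the `ingestOneSST` helper are illustrative assumptions, not the actual implementation (the real processor consumes `BulkMergeSpec_Output` protobufs inside a DistSQL flow):

```go
package bulkingest

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sstMeta mirrors the merge phase output: one pre-merged, range-aligned
// SST identified by its storage URI and key span.
type sstMeta struct {
	URI      string
	StartKey roachpb.Key
	EndKey   roachpb.Key
}

// ingestPhase sketches the overall flow: pre-split and scatter the target
// spans, then ingest each SST.
func ingestPhase(ctx context.Context, db *kv.DB, ssts []sstMeta) error {
	spans := make([]roachpb.Span, len(ssts))
	for i, s := range ssts {
		spans[i] = roachpb.Span{Key: s.StartKey, EndKey: s.EndKey}
	}
	// Split at SST boundaries and scatter replicas/leases up front, so
	// each AddSSTable call targets a single, well-placed range.
	if err := splitAndScatterSpans(ctx, db, spans); err != nil {
		return err
	}
	for _, s := range ssts {
		// Open, read, and AddSSTable one file; expanded in a later sketch.
		if err := ingestOneSST(ctx, db, s); err != nil {
			return err
		}
	}
	return nil
}
```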
This has already been implemented in the prototype fork at jeffswenson/cockroach@feature-distributed-merge. The goal here is to pull it out into a smaller PR that's easier to review.
Goal
Implement an ingestion stage that:
- Takes merged SST metadata as input (URIs, start/end keys).
- Splits and scatters KV ranges before ingestion.
- Efficiently ingests SST files via `DB.AddSSTable()`.
- Handles hundreds of SSTs in parallel across nodes (see the concurrency sketch below).
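Bounded fan-out per processor is one way to satisfy the parallelism goal. A sketch using `golang.org/x/sync/errgroup`, reusing the illustrative `sstMeta` and `ingestOneSST` names from the sketch above:

```go
// ingestAll ingests many SSTs concurrently with a fixed in-flight limit,
// so hundreds of files do not become hundreds of simultaneous AddSSTable
// requests against KV.
func ingestAll(ctx context.Context, db *kv.DB, ssts []sstMeta) error {
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(8) // illustrative cap on concurrent AddSSTable calls
	for _, s := range ssts {
		s := s // per-iteration copy (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			return ingestOneSST(gCtx, db, s)
		})
	}
	return g.Wait()
}
```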
This processor is designed to be used by both:
- The distributed import pipeline (final ingestion phase).
- Future bulk restore or rebalancing operations that ingest pre-built SSTs.
Implementation Highlights
ingestFileProcessor
- New DistSQL processor implemented under `pkg/sql/bulkingest/ingest_file_processor.go`.
- Accepts a `BulkMergeSpec_Output` (list of SSTs with start/end keys) via the DistSQL flow.
- For each SST (sketched below):
  - Opens it via the `CloudStorageMux` (sql/bulkutil: add CloudStorageMux for managing multi-node external storage #156587).
  - Reads SST bytes into memory with `ioctx.ReadAllWithScratch()`.
  - Calls `db.AddSSTable(ctx, startKey, endKey, data, ...)`.
  - Closes and releases all handles.
- Uses Pebble’s built-in validation to ensure SSTs are well-formed and range-aligned.
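The per-file sequence might look like the following, with the `ingestOneSST` helper abbreviated earlier now shown with its dependencies made explicit. The `Open` method name and return type on `CloudStorageMux` are assumptions, `ioctx.ReadAllWithScratch()` is the helper named above, and `AddSSTable`'s trailing arguments (MVCC stats, conflict handling, batch timestamp) are elided:

```go
// sstIngestor bundles the per-processor state used while ingesting.
type sstIngestor struct {
	db      *kv.DB
	mux     *bulkutil.CloudStorageMux // cached ExternalStorage handles (#156587)
	scratch []byte                    // read buffer reused across files
}

// ingestOneSST opens, reads, ingests, and releases a single SST.
func (ing *sstIngestor) ingestOneSST(ctx context.Context, s sstMeta) error {
	// Open through the mux so ExternalStorage instances are reused
	// (method name illustrative).
	reader, err := ing.mux.Open(ctx, s.URI)
	if err != nil {
		return err
	}
	// Close promptly; Close takes the ctx so teardown is cancellable.
	defer reader.Close(ctx)

	// Read the whole SST into memory, reusing the scratch buffer to
	// limit allocations across hundreds of files.
	data, err := ioctx.ReadAllWithScratch(ctx, reader, ing.scratch)
	if err != nil {
		return err
	}

	// Link the bytes into Pebble via KV.
	return ing.db.AddSSTable(ctx, s.StartKey, s.EndKey, data /* trailing args elided */)
}
```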
Range Preparation
- Before ingestion, calls `splitAndScatterSpans()` (sql/bulkingest: add range split and scatter utilities #156574) to:
  - Split ranges at SST boundaries.
  - Randomize replica and lease placement for better distribution.
- Tolerates scatter errors, logging them without halting ingestion (see the sketch below).
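A sketch of that preparation, assuming the helper bottoms out in `kv.DB`'s `AdminSplit` and `AdminScatter` primitives (the real utilities live in #156574):

```go
// prepareRanges splits at each SST boundary, then scatters the resulting
// ranges. A failed scatter costs balance, not correctness, so it is
// logged and skipped rather than failing the ingestion.
func prepareRanges(ctx context.Context, db *kv.DB, spans []roachpb.Span) error {
	for _, sp := range spans {
		// Split so each SST's span maps onto its own range.
		if err := db.AdminSplit(ctx, sp.Key, hlc.MaxTimestamp /* expiration */); err != nil {
			return err
		}
		// Randomize replica and lease placement across the cluster.
		if _, err := db.AdminScatter(ctx, sp.Key, 0 /* maxSize */); err != nil {
			log.Warningf(ctx, "scatter of %s failed: %v", sp.Key, err)
		}
	}
	return nil
}
```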
Cleanup and Safety
- Each SST read and write is scoped to a context; all readers are closed promptly.
- Calls `defer reader.Close(ctx)` and releases buffers when complete.
- Uses `CloudStorageMux.Close()` to release all cached `ExternalStorage` instances at the end of execution (sketched below).
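End-of-flow teardown could then be as small as this; the `close` hook name is illustrative, and whether `CloudStorageMux.Close()` returns an error is an assumption:

```go
// close releases everything the processor accumulated. Per-file readers
// were already closed by the deferred Close(ctx) calls at read time, so
// only the cached ExternalStorage handles and the scratch buffer remain.
func (ing *sstIngestor) close(ctx context.Context) {
	if err := ing.mux.Close(); err != nil {
		log.Warningf(ctx, "closing CloudStorageMux: %v", err)
	}
	ing.scratch = nil // let the read buffer be reclaimed
}
```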
Planning and Flow Integration
- Integrated into DistSQL via `rowexec.NewIngestFileProcessor`.
- Can be scheduled as a final stage after the merge coordinator processor.
Testing Plan
Unit Tests
`ingest_test.go` (rough shape sketched below):
- Writes synthetic SSTs via `bulksst.Writer`.
- Runs a mock ingestion flow using the new processor.
- Verifies:
  - SSTs are successfully ingested into KV.
  - Data can be queried and matches expected key/value pairs.
  - No range overlap or data loss occurs.
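A rough shape for the test, using CockroachDB's standard test harness; `writeSyntheticSST` (wrapping `bulksst.Writer`) and `runIngestFlow` are hypothetical stand-ins for the real test plumbing, and the table and row count are illustrative:

```go
func TestIngestFileProcessor(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()

	s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	// Write a synthetic, sorted SST covering 100 rows of test.t into
	// nodelocal storage (hypothetical helper).
	meta := writeSyntheticSST(t, s, "nodelocal://1/ssts/0.sst")

	// Drive a mock ingestion flow through the new processor
	// (hypothetical helper).
	require.NoError(t, runIngestFlow(ctx, kvDB, []sstMeta{meta}))

	// The ingested key/value pairs must be visible and exact: no range
	// overlap, no data loss.
	sqlutils.MakeSQLRunner(sqlDB).CheckQueryResults(t,
		`SELECT count(*) FROM test.t`, [][]string{{"100"}})
}
```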
Integration Tests
- Multi-node cluster test (sketched below):
  - Generates SSTs for a 3-node cluster using the merge pipeline.
  - Runs the ingest phase to AddSST each file.
  - Confirms:
    - All expected ranges exist (via `SHOW RANGES`).
    - All data is visible in SQL queries.
    - No “range missing” or “duplicate key” errors appear.
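And the multi-node check, with `runMergeAndIngest` as a hypothetical stand-in for driving the merge pipeline plus the ingest phase (range and row counts illustrative):

```go
func TestDistributedIngestMultiNode(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()

	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// Produce merged SSTs and ingest them across all three nodes
	// (hypothetical helper).
	require.NoError(t, runMergeAndIngest(ctx, tc))

	runner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
	// Each pre-split SST boundary should have produced a range.
	runner.CheckQueryResults(t,
		`SELECT count(*) FROM [SHOW RANGES FROM TABLE test.t]`,
		[][]string{{"10"}})
	// All data must be visible through SQL with no missing ranges or
	// duplicate keys.
	runner.CheckQueryResults(t,
		`SELECT count(*) FROM test.t`, [][]string{{"1000"}})
}
```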
Dependencies
- Depends on sql/bulkmerge: implement real SST merge logic in merge processor #156658 for merged SST metadata and output protobufs.
- Depends on sql/bulkutil: add CloudStorageMux for managing multi-node external storage #156587 for `CloudStorageMux`, which manages file access and reuse.
- Depends on sql/bulkingest: add range split and scatter utilities #156574 for the `splitAndScatterSpans()` utilities used before ingestion.
- Relies on the nodelocal improvements from cloud/nodelocal: remove path requirement and improve file handle cleanup #156569 for safe file descriptor cleanup.
Jira issue: CRDB-56097
Epic: CRDB-48845