Description
This PR implements the core SST merging logic within the bulk merge DistSQL processor. It builds upon the scaffolding added in #156580 and introduces real data merging using Pebble iterators and external SST readers.
With this change, the merge processor now reads multiple input SSTs from `nodelocal://` or cloud storage, merges them in key order, and writes a new, range-aligned SST as output. This forms the central data-processing step of the distributed import pipeline.
This PR corresponds to the Merge phase of the pipeline, where SSTs produced during the import map phase are merged and repartitioned before ingestion.
Previous PRs established the processor structure (loopback, coordinator, DistSQL plan). This PR replaces the placeholder “string append” logic with actual Pebble-based SST merging.
This has already been implemented in the prototype fork at jeffswenson/cockroach@feature-distributed-merge. The goal of this PR is to pull that work out into a smaller change that's easier to review.
Goal
Implement the end-to-end merge behavior of the bulk merge processor:
- Read multiple SSTs from external storage (via the `CloudStorageMux`).
- Use Pebble iterators to merge KVs in key order.
- Write merged data into new SSTs aligned to split boundaries.
- Return metadata describing the merged SSTs (URIs, start/end keys).
Implementation Highlights
bulkMergeProcessor.mergeSSTs()
Core merge implementation:
- Reads the list of SST URIs from the processor spec (`BulkMergeSpec`).
- Uses the `CloudStorageMux` to open input SSTs efficiently.
- Creates Pebble iterators across all input SSTs using `storageccl.ExternalSSTReader()`.
- Merges keys in sorted order, writing to a new SST via `storage.MakeIngestionSSTWriter()` (see the sketch below).
- Flushes the final SST to the processor's `OutputUri` location (e.g. `nodelocal://1/merge/out/0.sst`).
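To make the merge step concrete, here is a minimal, library-agnostic sketch of the k-way merge at the heart of `mergeSSTs()`. The real processor drives Pebble iterators opened through `storageccl.ExternalSSTReader()` and writes through `storage.MakeIngestionSSTWriter()`; in this sketch each input is modeled as a sorted in-memory run, and `kvIter`/`mergeKVs` are hypothetical names used only for illustration:

```go
package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

type kvPair struct{ key, val []byte }

// kvIter walks one sorted input run (a stand-in for a Pebble SST iterator).
type kvIter struct {
	pairs []kvPair
	pos   int
}

func (it *kvIter) valid() bool { return it.pos < len(it.pairs) }
func (it *kvIter) cur() kvPair { return it.pairs[it.pos] }
func (it *kvIter) next()       { it.pos++ }

// iterHeap orders iterators by their current key, smallest first.
type iterHeap []*kvIter

func (h iterHeap) Len() int           { return len(h) }
func (h iterHeap) Less(i, j int) bool { return bytes.Compare(h[i].cur().key, h[j].cur().key) < 0 }
func (h iterHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *iterHeap) Push(x any)        { *h = append(*h, x.(*kvIter)) }
func (h *iterHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// mergeKVs emits all input keys in sorted order, restricted to the
// processor's [start, end) range, mirroring what the merged SST holds.
// Duplicate keys are emitted as-is here; the real merge must resolve them.
func mergeKVs(inputs []*kvIter, start, end []byte) []kvPair {
	h := &iterHeap{}
	for _, it := range inputs {
		if it.valid() {
			heap.Push(h, it)
		}
	}
	var out []kvPair
	for h.Len() > 0 {
		// Pop the iterator positioned at the globally smallest key.
		it := heap.Pop(h).(*kvIter)
		p := it.cur()
		if bytes.Compare(p.key, start) >= 0 && bytes.Compare(p.key, end) < 0 {
			out = append(out, p)
		}
		it.next()
		if it.valid() {
			heap.Push(h, it)
		}
	}
	return out
}

func main() {
	a := &kvIter{pairs: []kvPair{{[]byte("a"), []byte("1")}, {[]byte("d"), []byte("1")}}}
	b := &kvIter{pairs: []kvPair{{[]byte("b"), []byte("2")}, {[]byte("e"), []byte("2")}}}
	for _, p := range mergeKVs([]*kvIter{a, b}, []byte("a"), []byte("e")) {
		fmt.Printf("%s=%s\n", p.key, p.val)
	}
}
```

The heap always surfaces the globally smallest current key, which is what keeps the output SST sorted regardless of how the inputs overlap.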
Range Partitioning Logic
- Uses split keys from `spec.Splits` to assign input keys to the correct merge task (see the sketch after this list).
- Each merge processor operates on a single `[start, end)` key range.
- Ensures non-overlapping SST outputs and consistent key boundaries.
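As a sketch of the assignment rule: `n` sorted split keys define `n+1` ranges, and a binary search over the splits picks the owning task. `taskForKey` is a hypothetical name used only for illustration:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// taskForKey returns the index of the [start, end) range that owns key:
// the first split strictly greater than the key marks its range.
func taskForKey(splits [][]byte, key []byte) int {
	return sort.Search(len(splits), func(i int) bool {
		return bytes.Compare(key, splits[i]) < 0
	})
}

func main() {
	// Two splits define three ranges: [min, g), [g, p), [p, max).
	splits := [][]byte{[]byte("g"), []byte("p")}
	for _, k := range []string{"a", "g", "z"} {
		fmt.Printf("%s -> task %d\n", k, taskForKey(splits, []byte(k)))
	}
}
```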
Updated BulkMergeSpec
- The `BulkMergeSpec` now carries the full information required for merging:

```proto
message BulkMergeSpec {
  repeated SST ssts = 1;          // Input SSTs to merge
  repeated bytes splits = 2;      // Key split boundaries
  optional string output_uri = 3; // Output location for merged SSTs
}
```

- Each SST includes its URI, start/end keys, and (later) optional stats.
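The generated Go types are not shown here, so the following self-contained stand-in only illustrates how a merge task's inputs travel in the spec. `BulkMergeSpec` and `SST` below are plain structs standing in for the gogoproto-generated types, and the field names are assumptions based on the message above:

```go
package main

import "fmt"

// SST stands in for the generated SST message: one input file plus its
// key bounds (field names assumed from the description above).
type SST struct {
	Uri      string
	StartKey []byte
	EndKey   []byte
}

// BulkMergeSpec stands in for the generated spec message.
type BulkMergeSpec struct {
	Ssts      []SST
	Splits    [][]byte
	OutputUri string
}

func main() {
	spec := BulkMergeSpec{
		Ssts: []SST{
			{Uri: "nodelocal://1/merge/in/0.sst", StartKey: []byte("a"), EndKey: []byte("g")},
			{Uri: "nodelocal://1/merge/in/1.sst", StartKey: []byte("c"), EndKey: []byte("p")},
		},
		Splits:    [][]byte{[]byte("g"), []byte("p")},
		OutputUri: "nodelocal://1/merge/out",
	}
	fmt.Printf("%d input SSTs, %d splits -> %s\n", len(spec.Ssts), len(spec.Splits), spec.OutputUri)
}
```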
Output Format
- After merging, each processor emits a protobuf-encoded `BulkMergeSpec_Output` message containing the new SSTs it produced.
- This output is consumed by the merge coordinator and, later, by the ingest phase.
Testing Plan
Unit Tests
- Add `merge_test.go` with in-memory clusters to verify (a minimal shape is sketched after this list):
  - Merging two overlapping SSTs produces a single sorted output.
  - Start and end keys of merged SSTs match expected splits.
  - SSTs can be re-opened and read back correctly.
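As a rough shape for `merge_test.go`, the sketch below checks the first property (a single sorted output from two overlapping inputs). `mergeTwo` is a local stand-in for the Pebble-based merge so the file compiles on its own; the real test would drive the processor against in-memory storage:

```go
package bulkmerge

import (
	"bytes"
	"testing"
)

type kv struct{ key, val []byte }

// mergeTwo merges two sorted runs; on duplicate keys the second run wins,
// mimicking a last-write-wins resolution.
func mergeTwo(a, b []kv) []kv {
	var out []kv
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch c := bytes.Compare(a[i].key, b[j].key); {
		case c < 0:
			out = append(out, a[i])
			i++
		case c > 0:
			out = append(out, b[j])
			j++
		default:
			out = append(out, b[j])
			i++
			j++
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

func TestMergeOverlappingSSTs(t *testing.T) {
	a := []kv{{[]byte("a"), []byte("1")}, {[]byte("c"), []byte("1")}}
	b := []kv{{[]byte("b"), []byte("2")}, {[]byte("c"), []byte("2")}}
	out := mergeTwo(a, b)
	if len(out) != 3 {
		t.Fatalf("expected 3 keys, got %d", len(out))
	}
	// Verify a single, strictly sorted output.
	for i := 1; i < len(out); i++ {
		if bytes.Compare(out[i-1].key, out[i].key) >= 0 {
			t.Fatalf("output not strictly sorted at index %d", i)
		}
	}
	// Duplicate key "c" resolved to the later value.
	if string(out[2].val) != "2" {
		t.Fatalf("expected last-write-wins on duplicate key")
	}
}
```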
Integration Tests
- Run a multi-node merge flow using:
  - Input SSTs generated by `bulksst.Writer`.
  - Split boundaries from `bulkingest.pickSplits()`.
- Validate that:
  - Each node writes exactly one merged SST.
  - Output SSTs are sorted and non-overlapping.
  - The DistSQL flow completes successfully across nodes.
Dependencies
- Depends on sql/bulkmerge: add merge processor scaffolding and DistSQL flow #156580 for merge processor scaffolding and DistSQL flow wiring.
- Depends on sql/bulkutil: add CloudStorageMux for managing multi-node external storage #156587 for `CloudStorageMux`, which handles external storage access.
- Uses output SST metadata structures introduced in sql/bulksst: introduce SST writer and file allocator #156571 (`bulksst.Writer`).
Jira issue: CRDB-56096
Epic: CRDB-48845