
Add selective replication for ReplicatedMergeTree #102244

Open

zoomxi wants to merge 1 commit into ClickHouse:master from zoomxi:selective_replication

Conversation

@zoomxi
Contributor

@zoomxi zoomxi commented Apr 9, 2026

By default, every replica in a ReplicatedMergeTree shard stores a full copy of all data. This PR introduces selective replication: a new replication_factor table setting that controls how many replicas store each partition's data. This reduces storage costs and write amplification while maintaining read availability through automatic partition-to-replica assignment, query routing, and background rebalancing.
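
For illustration only, a table-level setting like this would live next to the existing settings in src/Storages/MergeTree/MergeTreeSettings.cpp; the sketch below assumes the current macro style there, and the description text and flags are illustrative rather than taken from the diff:

```cpp
/// Illustrative sketch only -- macro name and flags follow the conventions
/// of src/Storages/MergeTree/MergeTreeSettings.cpp and may differ in the PR.
DECLARE(UInt64, replication_factor, 0,
    "If greater than 0, each partition of a ReplicatedMergeTree table is stored "
    "on only this many replicas of the shard instead of on every replica. "
    "0 disables selective replication (classic full replication).", 0)
```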

This implementation is inspired by #58132.

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Add selective replication for ReplicatedMergeTree, allowing each partition to be stored on only replication_factor replicas instead of all replicas in a shard. Partitions are automatically rebalanced in the background. Closes #45766.

Documentation entry for user-facing changes

  • [*] Documentation is written (mandatory for new features)

@azat azat added the `can be tested` label (Allows running workflows for external contributors) on Apr 9, 2026
@clickhouse-gh
Contributor

clickhouse-gh Bot commented Apr 9, 2026

Workflow [PR], commit [eb3f0d9]

Summary:

| job_name | test_name | status | info | comment |
| --- | --- | --- | --- | --- |
| Stateless tests (amd_asan_ubsan, flaky check) | | failure | | |
| | 04076_selective_replication_mutations | FAIL ×5 | cidb | |
| BuzzHouse (arm_asan_ubsan) | | error | | |

AI Review

Summary

This PR adds selective replication for ReplicatedMergeTree (new replication_factor, assignment/migration machinery, read/write routing, system tables, and tests). I found one blocker: clone migration currently writes one Keeper log entry per part without batching, which can overload Keeper and make large-partition migrations unreliable.

Findings

❌ Blockers

  • [src/Storages/MergeTree/PartitionMigrationCoordinator.cpp:388] startClone creates one zk->create call per part despite defining SelectiveReplication::GET_PART_BATCH_SIZE. For large partitions, this can cause high Keeper write amplification, timeouts, and partial-progress failures during migration.
    • Suggested fix: batch GET_PART log writes via tryMulti in chunks of GET_PART_BATCH_SIZE with retry/error handling per batch.
Tests
  • ⚠️ Add a stress case covering migration of a partition with many parts (far above 100) to verify batched GET_PART behavior and Keeper load safety.
ClickHouse Rules
| Item | Status | Notes |
| --- | --- | --- |
| Deletion logging | | |
| Serialization versioning | | |
| Core-area scrutiny | | |
| No test removal | | |
| Experimental gate | | |
| No magic constants | | |
| Backward compatibility | | |
| SettingsChangesHistory.cpp | | |
| PR metadata quality | | |
| Safe rollout | ⚠️ | Keeper pressure risk during clone migration for large partitions until batched writes are implemented |
| Compilation time | | |
| No large/binary files | | |
Final Verdict
  • Status: ❌ Block
  • Minimum required actions:
    • Batch clone GET_PART log creation in startClone (chunked tryMulti using GET_PART_BATCH_SIZE) and handle partial batch failures robustly.

@clickhouse-gh clickhouse-gh Bot added the `pr-feature` label (Pull request with new product feature) on Apr 9, 2026
@zoomxi zoomxi force-pushed the selective_replication branch from e6432f0 to 76ef782 (April 10, 2026 09:53)
@zoomxi zoomxi force-pushed the selective_replication branch 2 times, most recently from 481fe9a to 989b2dd (April 13, 2026 07:51)
@zoomxi zoomxi force-pushed the selective_replication branch from 989b2dd to d140478 (April 13, 2026 08:50)
@zoomxi zoomxi force-pushed the selective_replication branch from d140478 to 24aaa89 (April 13, 2026 09:56)
@zoomxi zoomxi force-pushed the selective_replication branch from 24aaa89 to f7c9889 (April 13, 2026 11:29)
@zoomxi zoomxi force-pushed the selective_replication branch from f7c9889 to 07edaf9 (April 14, 2026 13:02)
@zoomxi zoomxi force-pushed the selective_replication branch from 07edaf9 to eb3f0d9 (April 15, 2026 03:44)

String shared_log_path = fs::path(storage.getZooKeeperPath()) / "log/log-";

for (const auto & part_name : meta.source_parts_snapshot)

startClone defines SelectiveReplication::GET_PART_BATCH_SIZE, but still issues one zk->create RPC per part in a tight loop. For large partitions this can generate thousands of sequential Keeper writes and significantly delay migration or trigger Keeper timeouts under load.

Please batch log entry creation in chunks (for example, GET_PART_BATCH_SIZE) using tryMulti and retry partial batch failures. That keeps migration throughput predictable and reduces Keeper pressure.
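
A minimal sketch of the suggested batching, using the zkutil multi-op helpers; `makeGetPartLogEntry` is a hypothetical helper standing in for however startClone serializes a GET_PART log entry, and `zk`, `shared_log_path`, and `meta` come from the surrounding code:

```cpp
/// Sketch: replace the per-part zk->create loop with chunked multi-ops.
const auto & parts = meta.source_parts_snapshot;
for (size_t begin = 0; begin < parts.size(); begin += SelectiveReplication::GET_PART_BATCH_SIZE)
{
    const size_t end = std::min(begin + SelectiveReplication::GET_PART_BATCH_SIZE, parts.size());

    Coordination::Requests ops;
    ops.reserve(end - begin);
    for (size_t i = begin; i != end; ++i)
        ops.emplace_back(zkutil::makeCreateRequest(
            shared_log_path,                 /// "<zk_path>/log/log-", sequential suffix appended
            makeGetPartLogEntry(parts[i]),   /// hypothetical GET_PART serializer
            zkutil::CreateMode::PersistentSequential));

    Coordination::Responses responses;
    const auto code = zk->tryMulti(ops, responses);
    /// Throw per batch so the caller's retry logic can re-issue just this chunk.
    zkutil::KeeperMultiException::check(code, ops, responses);
}
```

This bounds the number of Keeper round trips to ceil(parts / GET_PART_BATCH_SIZE) instead of one per part, and each batch commits atomically.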

@clickhouse-gh
Contributor

clickhouse-gh Bot commented Apr 15, 2026

LLVM Coverage Report

| Metric | Baseline | Current | Δ |
| --- | --- | --- | --- |
| Lines | 84.00% | 84.10% | +0.10% |
| Functions | 90.90% | 90.90% | +0.00% |
| Branches | 76.50% | 76.60% | +0.10% |

Changed lines: 74.48% (1643/2206) | lost baseline coverage: 58 line(s) · Uncovered code

Full report · Diff report

@zoomxi zoomxi force-pushed the selective_replication branch from eb3f0d9 to dcdfb2a (April 15, 2026 14:12)
@clickhouse-gh clickhouse-gh Bot added the `manual approve` label (Manual approve required to run CI) on Apr 15, 2026
@zoomxi zoomxi force-pushed the selective_replication branch 3 times, most recently from ad3885f to 914956b (April 21, 2026 12:07)
@zoomxi
Contributor Author

zoomxi commented Apr 21, 2026

1. Keeper Data Structure

Under each table's ZK path ({zk_path}), a /selective subtree stores all metadata for selective replication:

{zk_path}/selective/
  config                        — JSON: {"replication_factor": N}
  counts                        — "format version: 1\nr1:10,r2:8,..." (per-replica partition counts)
  assignments/
    {partition_id}              — "format version: 1\nr1,r2" or "r1,r2:cloning" during migration
  rebalance_lock                — Ephemeral node holding auto-rebalance distributed lock
  migrations/
    {uuid}                      — JSON: {state, partition_id, source_replica, target_replica, coordinator, ...}
      clone_complete            — Empty node, created by target replica to signal clone is done

Key points:

  • /selective/config is written once at CREATE TABLE and verified by subsequent replicas on join.
  • /selective/assignments/{pid} uses a format version: 1 text format. The :cloning suffix marks a replica still pulling data during migration — excluded from read/write routing until SWITCH removes the suffix.
  • /selective/counts is a denormalized cache of per-replica partition counts. Updated atomically with assignments in a single ZK multi-op, so the least-loaded-first allocator never needs a full scan (see the sketch after this list).
  • /selective/migrations/{uuid} stores migration metadata as JSON. State transitions are CLONE → SWITCH → DONE (or FAILED on rollback), CAS-protected.
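
For illustration, a sketch of that combined assignments+counts transaction, assuming version-guarded writes and hypothetical serialize* helpers:

```cpp
/// Sketch: assign a partition to `replicas` and bump the denormalized
/// per-replica counts in one atomic Keeper transaction.
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(
    selective_path + "/assignments/" + partition_id,
    "format version: 1\n" + serializeReplicaList(replicas),   /// e.g. "r1,r2" (hypothetical helper)
    zkutil::CreateMode::Persistent));
/// The version check makes concurrent allocators fail with ZBADVERSION and retry.
ops.emplace_back(zkutil::makeSetRequest(
    selective_path + "/counts",
    serializeCounts(updated_counts),   /// "format version: 1\nr1:11,r2:8,..." (hypothetical helper)
    counts_stat.version));

Coordination::Responses responses;
zkutil::KeeperMultiException::check(zk->tryMulti(ops, responses), ops, responses);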

@zoomxi
Copy link
Copy Markdown
Contributor Author

zoomxi commented Apr 21, 2026

2. Read Path

When replication_factor > 0, SELECT queries are routed so each partition is read only from an assigned replica:

SELECT * FROM table
  │
  ├─ createStorageSnapshot()
  │    └─ getAssignments(zk, {})  →  60s TTL cache or ZK refresh
  │    └─ snapshot.selective_assignment_map[pid] = assigned_replicas
  │
  ├─ getQueryProcessingStage()
  │    ├─ depth == 0 && has remote partitions → WithMergeableState
  │    └─ depth == 0 && all local              → default (Complete)
  │
  ├─ read()  [processed_stage == WithMergeableState && depth == 0]
  │    └─ readWithSelectiveRouting()
  │         ├─ buildPartitionRoutingMap():
  │         │    ├─ local partition  → read locally + _partition_id IN (...) filter
  │         │    ├─ remote partition → hash-select one assigned replica → remote sub-query
  │         │    └─ not in map       → fallback to local read
  │         ├─ redistributeFailedReplicas() on unreachable replicas
  │         └─ UnionStep merge all shard plans
  │
  └─ Sub-queries (depth > 0): readLocalImpl() only, no re-routing

Depth guard: Sub-queries arrive with distributed_depth > 0 and skip selective routing, reading only their locally-filtered partitions. MAX_FORWARDING_DEPTH = 3 prevents infinite forwarding loops.

Cache consistency: The 60-second TTL means a recently-migrated partition may route to the old replica until the cache expires. This is an acceptable trade-off for avoiding per-query ZK lookups.
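
A sketch of the TTL-cached lookup behind getAssignments(), with a hypothetical parseAssignment() for the "format version: 1" text format (cache members and the single-argument signature are simplifications):

```cpp
/// Sketch: serve assignments from a 60s cache, refreshing from Keeper on expiry.
std::unordered_map<String, Strings> getAssignments(const zkutil::ZooKeeperPtr & zk)
{
    static constexpr auto ASSIGNMENTS_CACHE_TTL = std::chrono::seconds(60);

    std::lock_guard lock(cache_mutex);
    const auto now = std::chrono::steady_clock::now();
    if (!assignments_cache.empty() && now - cache_refreshed_at < ASSIGNMENTS_CACHE_TTL)
        return assignments_cache;

    std::unordered_map<String, Strings> fresh;
    for (const auto & partition_id : zk->getChildren(selective_path + "/assignments"))
        /// parseAssignment is a hypothetical parser for "format version: 1\nr1,r2";
        /// replicas still marked ":cloning" are excluded from routing.
        fresh[partition_id] = parseAssignment(zk->get(selective_path + "/assignments/" + partition_id));

    assignments_cache = std::move(fresh);
    cache_refreshed_at = now;
    return assignments_cache;
}
```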

@zoomxi
Copy link
Copy Markdown
Contributor Author

zoomxi commented Apr 21, 2026

3. Write Path

INSERT queries are forwarded to assigned replicas with CAS-protected assignment verification:

INSERT (distributed_depth == 0)
  │
  ├─ allocatePartitions()
  │    └─ least-loaded-first: prefer replicas with fewer partitions
  │    └─ tie-break: hash(partition_id) rotation
  │    └─ ZK multi-op: create assignment + update counts (atomic)
  │
  ├─ buildForwardingPlan()
  │    ├─ local blocks  (assigned to self)  → stay for local write
  │    └─ remote blocks (assigned to other) → forward with depth++
  │
  ├─ commitPart()  [for each local block]
  │    ├─ getAssignmentCASVersion()  →  cache-first, ZK on miss
  │    ├─ CheckRequest(assignment_path, cas_version) in multi-op
  │    ├─ Success → part committed
  │    └─ ZBADVERSION:
  │         ├─ still assigned    → rollback temp part, retry with new version
  │         └─ no longer assigned → AssignmentChangedException → Phase 2
  │
  └─ Phase 2: re-forward (assignment_failures)
       ├─ depth >= MAX_FORWARDING_DEPTH (3) → throw error
       ├─ re-allocatePartitions()
       └─ buildForwardingPlan + executeForwarding (re-forward)

Queue filtering: shouldSkipForSelectiveReplication() skips log entries (GET_PART/MERGE/MUTATE) for partitions not assigned to the local replica.
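
For illustration, a sketch of the CAS guard in commitPart() and the ZBADVERSION branch from the diagram above; isStillAssignedToSelf and rollbackTempPartAndRetry are hypothetical helpers:

```cpp
/// Sketch: the part-commit multi-op carries a version check on the
/// partition's assignment node, so a concurrent migration aborts the commit.
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(assignment_path, cas_version));
/// ... followed by the usual commit requests (block number, log entry, etc.)

Coordination::Responses responses;
const auto code = zk->tryMulti(ops, responses);
if (code == Coordination::Error::ZBADVERSION)
{
    if (isStillAssignedToSelf(zk, partition_id))   /// hypothetical re-check against Keeper
    {
        /// Assignment node changed but we still own the partition:
        /// roll back the temporary part and retry with the fresh version.
        rollbackTempPartAndRetry();
    }
    else
    {
        /// Partition moved away mid-insert: escalate to Phase 2 re-forwarding.
        throw AssignmentChangedException(partition_id);
    }
}
else
    zkutil::KeeperMultiException::check(code, ops, responses);
```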

@zoomxi
Copy link
Copy Markdown
Contributor Author

zoomxi commented Apr 21, 2026

4. Rebalance & Migration

Migration State Machine

CLONE  ──→  SWITCH  ──→  DONE
  │                      ↑
  └──→ FAILED ──────────┘  (rollback)

CLONE phase:
  ├─ Coordinator records source_parts_snapshot in migration metadata
  ├─ Target appears as "target:cloning" in assignment (excluded from read/write routing)
  ├─ Coordinator writes GET_PART entries to shared replication log
  ├─ Target pulls data via normal replication queue
  └─ Target creates clone_complete signal node when all parts are present

SWITCH phase:
  ├─ Coordinator atomically updates assignment (target:cloning → target, remove source)
  ├─ Atomically updates counts (source -1, target +1) in same multi-op
  └─ CAS-protected against concurrent modifications (see the sketch below)

DONE / FAILED:
  ├─ Migration ZK subtree cleaned up
  └─ On failure: :cloning entry removed from assignment
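
A sketch of the SWITCH transition described above, with all three version-guarded writes in one multi-op; the serialization helpers and state representation are hypothetical:

```cpp
/// Sketch: CAS-protected CLONE -> SWITCH transition. If any of the three
/// nodes was modified concurrently, the whole transaction fails and retries.
Coordination::Requests ops;
/// Assignment: "r_source,r_target:cloning" -> "r_target"
ops.emplace_back(zkutil::makeSetRequest(
    assignment_path, serializeAssignment(final_replicas), assignment_stat.version));
/// Counts: source -1, target +1
ops.emplace_back(zkutil::makeSetRequest(
    counts_path, serializeCounts(updated_counts), counts_stat.version));
/// Migration record: state CLONE -> SWITCH
ops.emplace_back(zkutil::makeSetRequest(
    migration_path, migration_meta.withState(MigrationState::SWITCH).toJson(), migration_stat.version));

Coordination::Responses responses;
zkutil::KeeperMultiException::check(zk->tryMulti(ops, responses), ops, responses);
```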

@zoomxi
Copy link
Copy Markdown
Contributor Author

zoomxi commented Apr 21, 2026

This PR is labeled manual approve because it modifies clickhouse-test to randomly inject replication_factor on ReplicatedMergeTree tables for selective replication testing. The randomizer distributes replication_factor as follows: 30% unset (0, i.e. no selective replication) / 20% rf=1 / 30% rf=2 / 20% rf=3.

Could someone help approve this? Thanks!

CC @azat

@zoomxi zoomxi force-pushed the selective_replication branch from 8ac59c8 to f070806 (May 6, 2026 11:46)

Labels

  • can be tested — Allows running workflows for external contributors
  • manual approve — Manual approve required to run CI
  • pr-feature — Pull request with new product feature


Development

Successfully merging this pull request may close these issues.

Trivial Support For Resharding (RFC)

2 participants