…l + background puller)

Phase 2 closes the loop on peer replication: actual byte-level transfer of Parquet files between cluster nodes. Phase 1 (#386) committed file metadata to the Raft manifest; this PR adds the data plane that moves the bytes.

## What's new

- **`filereplication` package** — `Puller` (bounded worker pool reacting to FSM callbacks) and `FetchClient` (HMAC-authenticated TCP client that streams body bytes through a SHA-256 hasher into the local storage backend).
- **New protocol messages** — `MsgFetchFile` / `MsgFetchFileAck` hooked into the existing coordinator TCP dispatch, so replication traffic rides the cluster port and reuses the TLS config from #382.
- **SHA-256 on flush** — computed once on the in-memory Parquet buffer in `arrow_writer.flushPartitionedData`, threaded through `file_registrar` into the Raft manifest so every node knows the authoritative hash before any fetch starts.
- **Coordinator wiring** — `handleFetchFile` validates the HMAC, sanitizes the path, confirms the entry is in the FSM manifest (so peers cannot fetch arbitrary local files), then streams the body via `Backend.ReadTo`.
- **Shutdown ordering** — puller at `PriorityHTTPServer+1` (stop consuming), file-registrar at `PriorityBuffer` (drain producer queue), coordinator at `PriorityCompaction` (close listener + Raft).
- **Startup validation** — `ReplicationEnabled && SharedSecret == ""` is a fatal startup error.

## Security

HMAC binds `{nonce, nodeID, clusterName, path, timestamp}` via a new `ComputeFetchHMAC` / `ValidateFetchHMAC` pair — the path is included in the signed payload so a stolen MAC for file A cannot be replayed to fetch a different file B within the freshness window (a sketch follows at the end of this message). Join handshake continues to use the original `ComputeHMAC` (no breaking change).

Additional hardening:

- Explicit `SizeBytes >= 0` guards before `io.CopyN`, both on `entry` (caller) and on `ack` (peer), preventing a hostile or buggy origin from short-circuiting body validation via a negative length.
- Tightened stream deadlines (client fallback 60s, server body-stream 2min).
- `handleFetchFile` derives contexts from `coordinator.ctx` instead of `context.Background()`, so shutdown cancels in-flight transfers.
- Path sanitization rejects absolute paths, traversal, and null bytes before the manifest lookup.

Gating: peer replication is part of `FeatureClustering`. The puller is constructed inside `coordinator.Start()` only after the license check, so a user without an enterprise license cannot enable it by setting env vars.

## Tests

- Unit: 19 tests across `puller_test.go` and `fetch_client_test.go` covering enqueue/skip/retry/drop semantics, HMAC propagation, happy path, checksum mismatch, body-hash mismatch, size mismatch, dial error, large-body streaming, config validation.
- Integration: 4 tests in `filereplication_integration_test.go` wire the real handler to the real client: roundtrip, bad-HMAC rejection, **path-binding replay rejection** (regression test for the critical HMAC fix), and off-manifest rejection.
- Coordinator-side: `coordinator_fetch_test.go` unit-tests `handleFetchFile`.
- Manual docker test: 3-node cluster on local storage (per-node volumes, no S3), line-protocol ingestion, SHA-256 verified identical across all three volumes, queries on readers return the replicated data.

## Non-goals for Phase 2

Async replication only. No resume via HTTP Range, no catch-up on join, no quorum durability, no multi-peer fanout, no compaction-aware routing — those land in Phases 3–5.
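For illustration, here is a minimal sketch of a path-bound fetch HMAC. The field set (`{nonce, nodeID, clusterName, path, timestamp}`) and the `ComputeFetchHMAC` / `ValidateFetchHMAC` names come from this PR; the exact payload encoding and separators below are assumptions, not the actual implementation.

```go
// Sketch only: the real security.ComputeFetchHMAC may encode fields differently.
package security

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// ComputeFetchHMAC signs the request metadata *and* the requested path,
// so a MAC captured for file A is useless for fetching file B.
func ComputeFetchHMAC(secret, nonce, nodeID, clusterName, path string, timestamp int64) string {
	mac := hmac.New(sha256.New, []byte(secret))
	fmt.Fprintf(mac, "%s\x00%s\x00%s\x00%s\x00%d", nonce, nodeID, clusterName, path, timestamp)
	return hex.EncodeToString(mac.Sum(nil))
}

// ValidateFetchHMAC recomputes the MAC server-side and compares in constant
// time; timestamp freshness is checked separately.
func ValidateFetchHMAC(secret, nonce, nodeID, clusterName, path string, timestamp int64, got string) bool {
	want := ComputeFetchHMAC(secret, nonce, nodeID, clusterName, path, timestamp)
	return hmac.Equal([]byte(want), []byte(got))
}
```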
Code Review
This pull request implements Phase 2 of Peer File Replication for Arc Enterprise, enabling the transfer of Parquet file bytes between cluster nodes. The implementation includes a background Puller worker pool that monitors the Raft manifest for new files, an HMAC-authenticated fetch protocol over the existing coordinator TCP connection, and SHA-256 integrity verification. The ArrowBuffer has been updated to compute checksums during the flush path, which are then stored in the cluster manifest. Feedback was provided regarding the hardcoded 2-minute streaming timeout in the fetch handler, which may be too restrictive for large files or slow network environments and should ideally be configurable.
Addresses Gemini review feedback on PR #387. The origin-side body-stream timeout was hardcoded to 2 minutes, which could be too restrictive for large Parquet files or edge deployments on slow links. Operators now control it via `cluster.replication_serve_timeout_ms` (default 120000), mirroring the existing puller-side `replication_fetch_timeout_ms`.
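As a rough sketch of the change, the origin-side deadline is now derived from configuration rather than a constant. The `serveDeadline` helper and the `serveTimeoutMs` parameter below are hypothetical names; only the knob `cluster.replication_serve_timeout_ms` and its 120000 ms default come from the PR.

```go
// Sketch only: the real handler applies the deadline inside handleFetchFile.
package filereplication

import (
	"net"
	"time"
)

// serveDeadline sets the write deadline for streaming a file body to a peer,
// using the configured serve timeout and falling back to the documented
// 120000 ms default when the knob is unset or invalid.
func serveDeadline(conn net.Conn, serveTimeoutMs int) error {
	timeout := 120 * time.Second
	if serveTimeoutMs > 0 {
		timeout = time.Duration(serveTimeoutMs) * time.Millisecond
	}
	return conn.SetWriteDeadline(time.Now().Add(timeout))
}
```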
xe-nvdk added a commit that referenced this pull request on Apr 11, 2026
…r fallback + startup reconciliation)

Phase 2 (#387) replicated files reactively — the puller only saw files that committed to Raft while it was running. Phase 3 closes two fatal gaps for Kubernetes deployments:

1. A node that was down when a file was flushed never got the callback; on restart the file was missing from local disk and queries returned silently incomplete results.
2. A fresh reader joining a cluster with existing data received the Raft log but couldn't pull anything because `entry.OriginNodeID` often pointed to a writer pod that had been rescheduled and no longer existed.

## What's new

- **Startup reconciliation walker** — new `Puller.RunCatchUp` feeds `fsm.GetAllFiles()` through the existing worker pool. Coordinator spawns it in a background goroutine after the puller starts, guarded by `sync.Once` so it runs exactly once per coordinator lifetime. Queue-depth backpressure (sleep when queue > 80% full) prevents thundering-herd drop storms on large manifests (a sketch follows at the end of this message).
- **Multi-peer fallback resolver** — `PeerResolver.ResolvePeer(nodeID) (string, bool)` becomes `ResolvePeers(originNodeID, path string) []string`. The coordinator's resolver now returns `[origin, ...healthy peers excluding self]` and the puller iterates within a single attempt, falling through on any per-peer failure except checksum mismatch. The `path` parameter is part of the signature so Phase 4+ can add shard-aware routing without changing the interface. The old `*raft.FileEntry` parameter is gone — the resolver takes just the two fields it actually needs, decoupling the interface from the raft package.
- **Typed machine-readable ack Code** — new `protocol.AckErrorCode` with constants (`AckCodeNotFound`, `AckCodeManifest`, `AckCodeAuth`, `AckCodeBackend`, `AckCodeRaft`, `AckCodeInvalidPath`). Lets the client distinguish "peer doesn't have this file" from "peer rejected me" without brittle substring matching. Backward-compatible additive JSON field: Phase 2 peers that don't send Code are supported via exact-match fallback on `protocol.ErrMsgFileNotInManifest` / `ErrMsgFileNotFound`, both of which live in the protocol package next to the typed codes so a refactor touches both sites at once. Exact match (not substring) prevents an adversary from crafting "file not found on local backend: /etc/passwd" to confuse the check.
- **Inflight dedup set** on `Puller` — prevents double-pulls when the walker races with reactive FSM callbacks. Cleanup via `defer` in `processEntry` so panics don't leak the slot.
- **`Node.Barrier` wrapper** over `hraft.Raft.Barrier` — runs before the catch-up walk so `fsm.GetAllFiles()` reflects every committed entry. On timeout the walker logs a warning and proceeds against the possibly-stale follower view.
- **`ReplicationReady()` / `ReplicationCatchUpStatus()`** coordinator accessors land now but are not yet consumed. Phase 5 will use them to hard-gate the query path; the surface lands here to avoid another API change later.

## Configuration

Three new knobs (`ARC_CLUSTER_REPLICATION_CATCHUP_*`):

- `replication_catchup_enabled=true`: master switch / kill-switch
- `replication_catchup_barrier_timeout_ms=10000`: Raft barrier before walking
- `replication_catchup_queue_high_water=0.8`: backpressure threshold

No new worker-count knob — catch-up shares the Phase 2 pool.

## Security

- Checksum mismatch is a short-circuit, not a fallback trigger. A malicious peer cannot force other readers to pull-and-corrupt a file by responding with a bad body — the puller stops trying other peers within the same attempt on `ErrChecksumMismatch`. Regression test: `TestPullerMultiPeerChecksumMismatchDoesNotFallThrough`.
- Phase 2 path-bound HMAC preserved — no change to the auth layer.
- Every `sendFetchError` call site uses a typed `protocol.AckErrorCode` constant; no path where user input flows into Code.
- Multi-peer fallback expands trust from `OriginNodeID` to any healthy cluster member, but all peers are already mutually authenticated via shared secret + TLS.

## Gating

Peer replication catch-up is part of `FeatureClustering`. The puller is only constructed inside the already-gated clustering code path; the catch-up goroutine is spawned from the same site. No new license flag, no new API routes, zero overhead for OSS / standalone.

## Tests

- **25+ unit tests** covering the walker, multi-peer fallback, dedup, ack Code handling, and the Phase 2 fallback exact-match contract:
  - `catchup_test.go`: happy path, throttle at high-water, ctx cancellation, single-shot guard, concurrent reactive dedup
  - `puller_test.go`: multi-peer fallback on `ErrFileNotOnPeer`; fallback on transport error; all-peers-fail max-attempts; checksum mismatch does NOT fall through; Enqueue dedup on identical paths
  - `fetch_client_test.go`: Code=not_found / Code=manifest map to `ErrFileNotOnPeer`; Phase 2 peer fallback via both known strings; auth and backend errors do NOT map to `ErrFileNotOnPeer`
- **3 integration tests** wiring the real `handleFetchFile` server to the real puller + `FetchClient` + `RunCatchUp` path:
  - `TestPhase3CatchUpHappyPath`: 10 files pulled into a fresh backend with SHA-256 verified
  - `TestPhase3CatchUpOriginAbsentFallbackSucceeds`: the Kubernetes rotation case — resolver returns [peer2] with no origin, catch-up still succeeds
  - `TestPhase3CatchUpNoPeerHasFile`: all peers return Code=not_found, clean give-up, reader backend stays empty

Manual docker test deferred (docker daemon not available); the 28 unit/integration tests cover every behavior the docker test would verify.

## Pre-merge review

Four parallel review agents ran before commit (security, license gating, redundancy, elegance). All findings addressed:

- Typed `AckErrorCode` constants instead of bare strings
- Shared error strings extracted to protocol package
- `sync.Once` once-per-lifetime guarantee documented
- `fakeFetcher` extended with repeat mode; `countingFetcher` removed
- `PeerResolver` interface decoupled from `*raft.FileEntry`

Deferred as non-blocking:

- Paginated `fsm.GetAllFiles` — accepted O(N) snapshot copy; emergency kill-switch exists (Phase 5)
- Barrier timeout proceeds against stale manifest — documented posture

## Non-goals for Phase 3

Async, startup-only catch-up. Not included: periodic reconciliation, hard query gating during catch-up, paginated FSM iteration, bandwidth caps, HTTP Range resume. These land in Phases 4 and 5.
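A minimal sketch of the catch-up walker's backpressure loop follows. The `Puller`, `FileEntry`, and queue fields here are simplified stand-ins; only the behavior (walk the manifest snapshot, sleep while the shared queue is above the high-water mark, stop on context cancellation) reflects what the commit message describes.

```go
// Sketch only: the real Puller.RunCatchUp has more bookkeeping and dedup.
package filereplication

import (
	"context"
	"time"
)

// FileEntry and Puller are trimmed stand-ins for the real types.
type FileEntry struct{ Path string }

type Puller struct {
	queue chan FileEntry // shared with the Phase 2 reactive worker pool
}

// runCatchUp walks a manifest snapshot and applies queue-depth backpressure,
// sleeping whenever the shared queue is above the high-water mark instead of
// flooding it with entries that would be dropped.
func (p *Puller) runCatchUp(ctx context.Context, entries []FileEntry, highWater float64) {
	for _, e := range entries {
		// Backpressure: wait while the shared queue is too full.
		for float64(len(p.queue)) > highWater*float64(cap(p.queue)) {
			select {
			case <-ctx.Done():
				return // shutdown cancels the walk
			case <-time.After(100 * time.Millisecond):
			}
		}
		select {
		case p.queue <- e: // hand off to the shared worker pool
		case <-ctx.Done():
			return
		}
	}
}
```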
xe-nvdk added a commit that referenced this pull request on Apr 11, 2026
…r fallback + startup reconciliation) (#388)

* feat(cluster): peer-replication catch-up on join — Phase 3 (multi-peer fallback + startup reconciliation)

* fix(cluster): Phase 3 review — catchupSkippedLocal comment + peer loop ctx.Canceled early-exit

  Addresses both Gemini review findings on PR #388.

  1. The `catchupSkippedLocal` metric comment was wrong. It claimed "entries the walker skipped because backend.Exists", but the walker actually bumps this counter on origin==self and on inflight-dedup. `backend.Exists` skips are counted in `totalSkippedLocal` via `processEntry`, not in `RunCatchUp`. Replaced the misleading one-liner with a precise comment that cross-references the two other counters it could be confused with.

  2. The peer fallback loop in `processEntry` continued to iterate through every remaining candidate after `pullOnce` returned a context.Canceled-wrapped error, burning wasted dial attempts and debug log lines during shutdown on clusters with many peers. Added an explicit `errors.Is(err, context.Canceled)` check right after the error is captured — same spot where `ErrChecksumMismatch` already short-circuits — that returns immediately instead of falling through (sketched below). Added regression test `TestPullerMultiPeerCtxCanceledEarlyExit`: peer-1 returns context.Canceled, peers 2 and 3 would fall through if called, and the test asserts peers 2 and 3 receive exactly zero calls. It sits next to `TestPullerMultiPeerChecksumMismatchDoesNotFallThrough` so the two "does-not-fall-through" regression guards live together.
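A minimal sketch of the two short-circuits in the peer fallback loop, using simplified `Puller`, `pullOnce`, and `ErrChecksumMismatch` stand-ins; the real `processEntry` carries more bookkeeping, but the control flow (stop on checksum mismatch, stop on context cancellation, fall through otherwise) is the behavior described above.

```go
// Sketch only: simplified stand-ins for the real puller types.
package filereplication

import (
	"context"
	"errors"
)

var ErrChecksumMismatch = errors.New("checksum mismatch")

type Puller struct {
	pullOnce func(ctx context.Context, peer, path string) error
}

// tryPeers iterates the resolved peer list within a single attempt.
func (p *Puller) tryPeers(ctx context.Context, peers []string, path string) error {
	var lastErr error
	for _, peer := range peers {
		err := p.pullOnce(ctx, peer, path)
		if err == nil {
			return nil // success; stop iterating
		}
		if errors.Is(err, ErrChecksumMismatch) {
			return err // never fall through to other peers on a bad body
		}
		if errors.Is(err, context.Canceled) {
			return err // shutdown: don't burn dials on the remaining peers
		}
		lastErr = err // transport / not-found errors fall through
	}
	return lastErr
}
```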
## Summary
`filereplication` package with a bounded-queue `Puller` worker pool reacting to FSM callbacks + a `FetchClient` that dials peers over the existing cluster coordinator TCP port, authenticates with a path-bound HMAC, streams the body through a SHA-256 hasher, and writes into the local backend.

## What's new

- `MsgFetchFile` / `MsgFetchFileAck` hooked into the existing `handlePeerConnection` dispatch. Replication traffic rides the cluster port (:9100), physically isolated from client API traffic, and reuses the cluster TLS config from #382 (feat(cluster): add TLS encryption and shared secret authentication, #366, #367).
- SHA-256 on flush: computed once on the in-memory Parquet buffer in `arrow_writer.flushPartitionedData`, threaded through `file_registrar` into the Raft manifest. Every node learns the authoritative hash before any fetch starts; the puller verifies on the wire and aborts on mismatch (see the sketch below).
- `handleFetchFile` validates the HMAC, sanitizes the path, confirms the entry is in the FSM manifest (so peers cannot fetch arbitrary local files even with a valid HMAC), then streams the body via `Backend.ReadTo`.
- Shutdown ordering: puller at `PriorityHTTPServer+1` (stop consuming), file-registrar at `PriorityBuffer` (drain producer queue), coordinator at `PriorityCompaction` (close listener + Raft). In-flight pulls on shutdown are short-circuited first, then the producer queue drains, then Raft stops.
- Fatal startup error if `cluster.replication_enabled=true` and `cluster.shared_secret` is empty.
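A minimal sketch of the on-the-wire checksum verification, using a hypothetical `streamAndVerify` helper; the real `FetchClient` wires this into its protocol handling. The body is hashed while it is copied, so no second pass over the file is needed, and a negative size is rejected before `io.CopyN` could succeed vacuously.

```go
// Sketch only: helper name and error wording are illustrative.
package filereplication

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
)

// streamAndVerify copies exactly sizeBytes from the peer connection into the
// backend writer while feeding the same bytes to a SHA-256 hasher, then
// compares the digest against the manifest checksum.
func streamAndVerify(dst io.Writer, body io.Reader, sizeBytes int64, wantSHA256 string) error {
	if sizeBytes < 0 {
		return fmt.Errorf("invalid size %d", sizeBytes) // negative length must not bypass validation
	}
	h := sha256.New()
	// Every byte written to dst is also observed by the hasher.
	if _, err := io.CopyN(dst, io.TeeReader(body, h), sizeBytes); err != nil {
		return err
	}
	if got := hex.EncodeToString(h.Sum(nil)); got != wantSHA256 {
		return fmt.Errorf("checksum mismatch: got %s want %s", got, wantSHA256)
	}
	return nil
}
```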
## Security

HMAC binds the file path via new `security.ComputeFetchHMAC` / `ValidateFetchHMAC` helpers: a stolen MAC for file A cannot be replayed within the ±5-minute freshness window to fetch a different file B. The join handshake continues to use the original `ComputeHMAC` (no breaking change to cluster join).

Additional hardening from the pre-merge security review:
- `SizeBytes >= 0` guards before `io.CopyN` on both the manifest entry and the peer ack header, preventing a hostile or buggy origin from short-circuiting body validation via a negative length.
- `handleFetchFile` derives its `Exists` and body-stream contexts from `coordinator.ctx` instead of `context.Background()`, so shutdown cancels any in-flight transfer cleanly.

Gating: peer replication is part of `FeatureClustering`. The puller is constructed inside `coordinator.Start()` only after the license check, so a user without an enterprise license cannot enable it by setting env vars.
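A minimal sketch of those two gates, using a trimmed stand-in `Config`; the field names are assumptions based on the knobs and checks described above, not the real configuration struct.

```go
// Sketch only: field names are illustrative.
package cluster

import "fmt"

type Config struct {
	ReplicationEnabled bool
	SharedSecret       string
	LicensedClustering bool
}

// validateReplication captures the two gates: a fatal config error when
// replication is enabled without a shared secret, and a license gate that
// keeps the puller from ever being constructed regardless of env vars.
func validateReplication(cfg Config) (startPuller bool, err error) {
	if cfg.ReplicationEnabled && cfg.SharedSecret == "" {
		return false, fmt.Errorf("cluster.replication_enabled=true requires cluster.shared_secret")
	}
	if !cfg.LicensedClustering {
		return false, nil // FeatureClustering not licensed: replication stays off
	}
	return cfg.ReplicationEnabled, nil
}
```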
## Configuration

| Env var | `arc.toml` key | Default |
| --- | --- | --- |
| `ARC_CLUSTER_REPLICATION_ENABLED` | `cluster.replication_enabled` | `false` |
| `ARC_CLUSTER_REPLICATION_PULL_WORKERS` | `cluster.replication_pull_workers` | `4` |
| `ARC_CLUSTER_REPLICATION_QUEUE_SIZE` | `cluster.replication_queue_size` | `1024` |
| `ARC_CLUSTER_REPLICATION_FETCH_TIMEOUT_MS` | `cluster.replication_fetch_timeout_ms` | `60000` |
| `ARC_CLUSTER_REPLICATION_RETRY_MAX_ATTEMPTS` | `cluster.replication_retry_max_attempts` | `3` |

`ARC_CLUSTER_SHARED_SECRET` is required when replication is enabled.

## Non-goals for Phase 2
Async replication only. Not included (planned for Phases 3–5):

- resume via HTTP Range
- catch-up on join
- quorum durability
- multi-peer fanout
- compaction-aware routing
## Test plan
- `go test ./internal/cluster/...` — full cluster suite green
- `go build ./cmd/... ./internal/...` — clean build
- 19 unit tests across `filereplication/puller_test.go` + `filereplication/fetch_client_test.go` covering enqueue/skip-self/skip-already-local/retry/drop semantics, HMAC propagation, happy path, checksum mismatch, body-hash mismatch, size mismatch, dial error, large-body streaming, config validation.
- 4 integration tests in `filereplication_integration_test.go` wire the real `handleFetchFile` handler to the real `FetchClient`:
  - `TestPhase2FetchRoundtrip` — full protocol round-trip including HMAC validation, manifest lookup, body streaming, checksum verification
  - `TestPhase2FetchRejectsBadHMAC` — wrong secret rejected
  - `TestPhase2FetchRejectsHMACPathReplay` — regression test for path binding: signs a MAC for file A and requests file B, asserts the server rejects it (see the sketch below)
  - `TestPhase2FetchRejectsUnknownPath` — file exists on disk but is not in the FSM manifest → rejected
- Tests in `coordinator_fetch_test.go` exercise `handleFetchFile` directly.
- Manual docker test: `docker compose up` → all 3 nodes join, pullers start on readers
  - `sha256sum` identical across all 3 volumes for every replicated file
  - queries on readers return the replicated line-protocol data (`cpu` + 2 rows of `mem`)
  - `/api/v1/cluster/files` manifest consistent across all 3 nodes
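A self-contained sketch of the property that `TestPhase2FetchRejectsHMACPathReplay` guards; the real test exercises the actual handler and client, while the `mac` helper below is a toy stand-in for the security package.

```go
// Sketch only: illustrates the path-binding property, not the real test.
package filereplication_test

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
	"testing"
	"time"
)

// mac is a toy path-bound MAC; the real code is ComputeFetchHMAC.
func mac(secret, nonce, node, cluster, path string, ts int64) []byte {
	m := hmac.New(sha256.New, []byte(secret))
	fmt.Fprintf(m, "%s|%s|%s|%s|%d", nonce, node, cluster, path, ts)
	return m.Sum(nil)
}

// TestPathReplayRejected asserts that a MAC signed for file A never validates
// a request for file B, even inside the freshness window.
func TestPathReplayRejected(t *testing.T) {
	secret, nonce, node, cluster := "s3cret", "nonce-1", "writer-1", "arc"
	ts := time.Now().Unix()

	macForA := mac(secret, nonce, node, cluster, "db/metrics/a.parquet", ts)
	wantForB := mac(secret, nonce, node, cluster, "db/metrics/b.parquet", ts)

	if hmac.Equal(macForA, wantForB) {
		t.Fatal("MAC for file A must not validate a fetch of file B")
	}
}
```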
## Pre-merge review findings

Four parallel review agents ran before merge (security, license gating, redundancy, elegance). All CRITICAL + HIGH security findings and all SHOULD-FIX elegance findings are addressed in this PR:
- Path-bound fetch HMAC (`ComputeFetchHMAC` / `ValidateFetchHMAC`)
- `SizeBytes >= 0` validation before `io.CopyN`
- `handleFetchFile` uses `coordinator.ctx` instead of `context.Background()`
- `var _ net.Conn` assertion removed

Deferred to follow-up (non-blocking):
- `SizeBytes > 0` + SHA-256 hex format validation on `RegisterFile` (defense in depth)
- `memBackend` → `testutil` package
- `Puller.Stats()` typed struct instead of `map[string]int64` (a possible shape is sketched below)
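For the deferred `Puller.Stats()` item, a possible shape of the typed struct; the field names are guesses based on counters mentioned in this PR, not the actual API.

```go
// Sketch only: a typed replacement for the current map[string]int64.
package filereplication

// PullerStats would let callers read counters without string keys.
type PullerStats struct {
	Enqueued     int64 // entries accepted into the pull queue
	SkippedLocal int64 // entries already present on the local backend
	Retried      int64 // pull attempts retried after a failure
	Dropped      int64 // entries dropped after exhausting max attempts
}
```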