Skip to content

fix: bound replication receive memory to stop worker OOM crash loops#147

Merged
kriszyp merged 3 commits into
mainfrom
fix/replication-receive-backpressure
May 14, 2026
Merged

fix: bound replication receive memory to stop worker OOM crash loops#147
kriszyp merged 3 commits into
mainfrom
fix/replication-receive-backpressure

Conversation

@kriszyp
Copy link
Copy Markdown
Member

@kriszyp kriszyp commented May 14, 2026

Summary

Stop the receive side of replication from being able to push a worker past its 2 GB old-gen limit during peer catch-up.

Why

A production node was crash-looping with ERR_WORKER_OUT_OF_MEMORY every ~25 seconds (284 OOM events in 2 hours) while catching up a 4-hour prerender-DB backlog from a peer. The chain of cause and effect (verified live with allocation sampling against the 5.0.16 dist):

  • onWSMessage decoded every audit record in an incoming transaction batch inside a synchronous do { ... } while (decoder.position < body.byteLength) loop. A single WS message can carry thousands of records; each one created an event holding a decoded value and a closure over the source body buffer, and pushed it onto tableSubscriptionToReplicator with no intra-message backpressure. outstandingCommits was only incremented after the loop ended, so ws.pause() could never fire mid-message.
  • Allocation profile against the running worker showed readAuditEntry @ auditStore.js:442 at 84% of all allocations. Heap was stable at ~70 MB between crashes; the OOM is a sub-second burst that GC reclaims completely after the worker dies.
  • Every worker death triggered onDatabase(...) reassignments for every database it held, which all fired in the same tick — a fresh worker was instantly slammed with catch-up connections and OOM'd in turn. The log shows "Setting up subscription with leader …" repeating in bursts after each crash.

A BLOB_CHUNK handler bug compounds this: stream.write(blobBody) ignored the return value, so when saveBlob's file-write pipeline fell behind, blob chunks accumulated in the PassThrough buffer (627 MB external memory on surviving workers).

What changed

Three independent fixes, all in replication/:

  1. replicationConnection.ts receive loop — convert onWSMessage to async, serialize handler invocations via a messageProcessing promise chain (preserves WS message order), and inside the per-record do/while check tableSubscriptionToReplicator.queue.length. When it exceeds RECEIVE_EVENT_HIGH_WATER_MARK (default 100, env-tunable via replication_receiveEventHighWaterMark) we pause the WS and await tableSubscriptionToReplicator.waitForDrain() before continuing. The existing EVENT_HIGH_WATER_MARK = 100 pattern in core/resources/Table.ts:2753,2836 is reused.
  2. replicationConnection.ts BLOB_CHUNK — respect the stream.write() return value. On false, pause the WS and listen on both 'drain' and 'close' (so a destroyed stream doesn't strand the pause reason).
  3. subscriptionManager.ts worker-exit stagger — when a worker exits, space its onDatabase reassignments by WORKER_EXIT_REASSIGN_STAGGER_MS (100 ms) using a rolling nextWorkerExitReassignAt timestamp.

Because we now have three independent reasons to pause receive (commit backlog, consumer queue, blob stream), the single replicationPaused boolean is replaced with a pauseReasons refcount + addPauseReason()/removePauseReason() helpers so one source resuming the WS can't override another that still wants it paused.

Where to look

  • The async conversion of onWSMessage and the messageProcessing serialization (~line 471): is the .then(..., onReject) form correct for keeping the chain alive through one message's failure? Cross-model review (Gemini) was positive.
  • The pauseReasons refcount and the commitBacklogPaused boolean that gates the original commit-backlog pause/resume. The existing semantics — pause once, resume on any onCommit — are preserved.
  • BLOB_CHUNK release listener pair (on('drain', release); on('close', release)): each pause reason is decremented exactly once. stream.off removes the exact-same function reference so it's safe.
  • The stagger uses module-level state; under concurrent worker-death this is single-thread main, so no synchronization needed.

Testing

  • npm run build → same 60 pre-existing TS errors before and after this change, dist emitted.
  • npm run lint:required → 0 errors.
  • npm run test:integration -- integrationTests/cluster/replicationLoad.test.mjs fails identically on this branch and on main (pre-existing breakage, database 'data' does not exist). Not introduced here.

⚠️ The harper-pro outer repo has no unit-test infrastructure for the replication/ modules. The new code paths are exercised by the existing replication integration tests; a focused stress test that sends a single WS message containing 10k+ records would be the right follow-up to lock in this fix.

Test plan

  • CI passes (or, where it doesn't, fails the same way as main)
  • Deploy to one node in a cluster, watch for ERR_WORKER_OUT_OF_MEMORY to stop in docker logs
  • Confirm system_information.threads[*].utilization stabilizes and the cascade of "Setting up subscription with leader …" log messages stops repeating
  • Verify replication catch-up still completes (cluster_status lastReceivedRemoteTime advances)

🤖 Generated with Claude Code

When a peer is far behind (e.g. a 4-hour prerender backlog) it can send
WS messages containing thousands of audit records. The receive path in
onWSMessage decoded every record in a synchronous do/while, pushing all
events onto the consumer queue in one tick. Allocation profiling on a
production node showed 84% of allocations in readAuditEntry, and workers
were hitting their 2 GB old-gen limit every ~25 seconds, taking
subscriptions with them and triggering a cascade of re-subscriptions
that crashed the new worker too.

Three fixes:

1. replication/replicationConnection.ts — convert onWSMessage to async,
   serialize handler invocations via a promise chain, and inside the
   per-record loop yield to the consumer once tableSubscriptionToReplicator's
   queue exceeds RECEIVE_EVENT_HIGH_WATER_MARK (default 100, env-tunable
   via replication_receiveEventHighWaterMark). Replace the single
   replicationPaused boolean with a pauseReasons refcount so multiple
   backpressure sources can coexist without racing on resume.

2. replication/replicationConnection.ts (BLOB_CHUNK) — respect the
   PassThrough's stream.write() return value. When it goes false the
   downstream file write (saveBlob's pipeline) can't keep up, so pause
   the WS and resume on 'drain'. Also listen for 'close' so a destroyed
   stream (e.g. saveBlob error) doesn't strand a pause reason.

3. replication/subscriptionManager.ts — when a worker exits, stagger
   its onDatabase reassignments by WORKER_EXIT_REASSIGN_STAGGER_MS
   (100 ms) per pair so the replacement worker isn't slammed by a
   thundering herd of catch-up connections at once.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kriszyp kriszyp requested a review from a team as a code owner May 14, 2026 20:49
messageProcessing = messageProcessing.then(
() => onWSMessage(body),
() => onWSMessage(body)
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocker, but worth noting: the messageProcessing chain is never reset when the WS closes. If onWSMessage is mid-await (e.g. inside waitForDrain) when the socket closes, the chain continues running until the drain resolves, keeping the body Buffer alive in the closure. This is bounded — tableSubscriptionToReplicator drains on its own — but calling ws.resume() / ws.pause() on an already-closed socket is a side-effect (harmless with the ws library, which no-ops after destroy).

A simple guard to consider for a follow-up:

Suggested change
);
let messageProcessing: Promise<void> = Promise.resolve();
let wsClosed = false;
ws.on('message', (body: Buffer) => {
messageProcessing = messageProcessing.then(
() => wsClosed ? undefined : onWSMessage(body),
() => wsClosed ? undefined : onWSMessage(body)
);
});

and set wsClosed = true in the ws.on('close', ...) handler. Not required for this fix.

@claude
Copy link
Copy Markdown
Contributor

claude Bot commented May 14, 2026

Reviewed; no blockers found.

kriszyp and others added 2 commits May 14, 2026 14:56
Addresses claude[bot] review comment on PR #147. After the WS closes,
messages already in flight on the messageProcessing chain would otherwise
keep running through onWSMessage, holding their source body Buffers alive
in closures until the consumer drains and calling ws.pause()/resume() on
an already-destroyed socket. Set a wsClosed flag in the 'close' handler
and short-circuit the chain.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A separate analysis of the production OOM crash loop identified
MaxListenersExceededWarning for 'dropDatabase' and 'updateTable' on the
global databaseEventsEmitter as a compounding cause: leaked listeners
hold references that prevent V8 from reclaiming memory between worker
restarts. Two distinct bugs were stacking up listeners:

1. replication/replicator.ts — forEachReplicatedDatabase registered both
   an onRemovedDB and an onUpdatedTable listener but only returned the
   onUpdatedTable handle, so callers that .remove()'d the returned value
   still leaked the dropDatabase listener forever. The per-(database,peer)
   call site in replicationConnection.ts hit this every system-DB
   subscription, matching the observed "11 dropDatabase listeners" warning
   on a node with ~10 peers. Return a composite handle that removes both.

2. replication/replicationConnection.ts — listener registration for
   onUpdatedTable/onRemovedDB happens inside an async .then() chain, and
   forEachReplicatedDatabase registration happens after `await
   authorization` now that onWSMessage is async. If the WS closes before
   either path attaches its ws.on('close', cleanup) handler, that handler
   is registered too late: the close event has already fired and the
   cleanup never runs. Use the wsClosed flag (from the previous commit)
   to detect this race synchronously and remove the listeners
   immediately when it happens.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kriszyp kriszyp added the patch label May 14, 2026
@kriszyp kriszyp merged commit 573052e into main May 14, 2026
38 of 40 checks passed
kriszyp added a commit that referenced this pull request May 14, 2026
Addresses claude[bot] review comment on PR #147. After the WS closes,
messages already in flight on the messageProcessing chain would otherwise
keep running through onWSMessage, holding their source body Buffers alive
in closures until the consumer drains and calling ws.pause()/resume() on
an already-destroyed socket. Set a wsClosed flag in the 'close' handler
and short-circuit the chain.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kriszyp kriszyp deleted the fix/replication-receive-backpressure branch May 14, 2026 22:07
github-actions Bot pushed a commit that referenced this pull request May 14, 2026
Addresses claude[bot] review comment on PR #147. After the WS closes,
messages already in flight on the messageProcessing chain would otherwise
keep running through onWSMessage, holding their source body Buffers alive
in closures until the consumer drains and calling ws.pause()/resume() on
an already-destroyed socket. Set a wsClosed flag in the 'close' handler
and short-circuit the chain.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kriszyp added a commit that referenced this pull request May 15, 2026
…nish

receiveBlobs() pushed the raw saveBlob() promise into outstandingBlobsToFinish
and attached .catch separately for logging. The .catch returned a new promise
that nobody held, so the original promise in the array kept its rejected state.
When the end_txn onCommit path then did `await Promise.all(outstandingBlobsToFinish)`,
the rejection propagated up and out of onCommit as an uncaughtException — observed
in prod as ~35/sec ENOENT spam during a peer catch-up after restart.

Push the catch-handled promise ('tracked') into the array instead. Promise.all
now sees a fulfilled promise (the .catch resolves to undefined), the rejection
is already logged exactly once, and the .finally still removes the entry on
settle. Move `.blobId` and update the indexOf inside the finally to match.

PR #147 reduces the receive pressure that triggers this path, but doesn't fix
the rejection escape itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
github-actions Bot pushed a commit that referenced this pull request May 15, 2026
…nish

receiveBlobs() pushed the raw saveBlob() promise into outstandingBlobsToFinish
and attached .catch separately for logging. The .catch returned a new promise
that nobody held, so the original promise in the array kept its rejected state.
When the end_txn onCommit path then did `await Promise.all(outstandingBlobsToFinish)`,
the rejection propagated up and out of onCommit as an uncaughtException — observed
in prod as ~35/sec ENOENT spam during a peer catch-up after restart.

Push the catch-handled promise ('tracked') into the array instead. Promise.all
now sees a fulfilled promise (the .catch resolves to undefined), the rejection
is already logged exactly once, and the .finally still removes the entry on
settle. Move `.blobId` and update the indexOf inside the finally to match.

PR #147 reduces the receive pressure that triggers this path, but doesn't fix
the rejection escape itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kriszyp added a commit that referenced this pull request May 19, 2026
…lob save)

Two cluster integration tests covering the receive-side failure modes that took a
prod node off the cluster:

1. receiveBacklogMemory.test.mjs — guards PR #147's
   RECEIVE_EVENT_HIGH_WATER_MARK fix. Kills receiver B, bursts 40 transactions
   of 500 records each on A (each transaction = one WS message → 500 audit
   entries decoded), restarts B, samples memory while it catches up, asserts
   peak RSS < 1.5 GB and no ERR_WORKER_OUT_OF_MEMORY in the log.

2. blobSaveRejectionContainment.test.mjs — guards PR #149's contract that a
   rejected saveBlob promise is logged exactly once and never escapes onCommit
   as uncaughtException. Installs a fault-injection component on B only that
   monkey-patches fs.createWriteStream to fail every 7th /blobs/ write with
   ENOENT, drives Location-component blob traffic from A, asserts the
   "Blob save failed for ..." line appears but uncaughtException lines do not,
   and that liveness (a fresh write) still propagates after failures.

Adds shared helpers to clusterShared.mjs: readLog, waitForCatchUp,
getMemoryInfo, peakMemory. The fault-injection fixture lives at
integrationTests/cluster/fixture-blob-fail-injector/ and is opt-in via
HARPER_TEST_BLOB_FAIL_INTERVAL env var.

These exercise the same failure surface that affected wtk-ap-west-1 in May:
unbounded synchronous decode on receive, and blob save rejections escaping
the commit confirmation path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants