Persist the in-memory store on a standalone background cycle #1275

Merged
DZakh merged 16 commits into claude/inspiring-mayer-XfmHe from claude/vibrant-ptolemy-onYTn on Jun 3, 2026

@DZakh DZakh commented Jun 3, 2026

What

Moves DB persistence off the event-processing path into a standalone background cycle owned by InMemoryStore. Processing now records a batch in memory (commitBatch) and returns immediately; a single in-flight write loop drains queued batches and writes them to the db behind the processing frontier.

Why

Previously every batch did a synchronous db write inline in processEventBatch, so processing stalled on each write. Decoupling them lets handlers keep running while the previous batch persists, with backpressure to bound memory.
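A minimal sketch of that decoupling, written in TypeScript rather than the project's ReScript. The class name `BackgroundStore`, the `Batch` shape and the injected `write` function are hypothetical; only `commitBatch`, `flush`, `processedCheckpointId` and `committedCheckpointId` are names taken from this PR, and the real signatures may differ. Error handling is left out here.

```typescript
// Hypothetical batch shape, for illustration only.
type Batch = { checkpointId: number; items: string[] };
type WriteFn = (batches: Batch[]) => Promise<void>;

class BackgroundStore {
  processedCheckpointId = 0; // in-memory frontier, advanced by commitBatch
  committedCheckpointId = 0; // last checkpoint the write loop has persisted
  private queue: Batch[] = [];
  private writing: Promise<void> | null = null;
  private readonly write: WriteFn;

  constructor(write: WriteFn) {
    this.write = write;
  }

  // Records the batch in memory and returns without waiting for the db.
  commitBatch(batch: Batch): void {
    this.queue.push(batch);
    this.processedCheckpointId = batch.checkpointId;
    // Start the loop only if it is not already running: one write in flight.
    if (this.writing === null) this.writing = this.runWriteLoop();
  }

  // Resolves once everything queued so far has been written.
  async flush(): Promise<void> {
    while (this.writing !== null) await this.writing;
  }

  private async runWriteLoop(): Promise<void> {
    try {
      while (this.queue.length > 0) {
        // Take everything queued so far and write it as one unit.
        const run = this.queue.splice(0, this.queue.length);
        await this.write(run);
        this.committedCheckpointId = run[run.length - 1]!.checkpointId;
      }
    } finally {
      this.writing = null;
    }
  }
}
```

Right after two `commitBatch` calls the processed frontier is already at the second checkpoint while the committed frontier still trails; `flush` is what an exit or rollback path would await to let the two converge.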

How it works

  • Processing frontier vs commit frontier. processedCheckpointId runs ahead in memory; committedCheckpointId trails as the cycle writes. New batches build on the processed frontier.
  • Reorg-threshold-aware writes. drainBatchRun merges the leading run of queued batches that share isInReorgThreshold into one transaction and leaves the rest queued, so a single write never mixes history-saving modes. isInReorgThreshold is carried on the batch.
  • Backpressure. awaitCapacity (called at the top of processEventBatch) blocks once the store holds more than keepLatestChangesLimit (100k) uncommitted entity changes plus unwritten batch items. It frees committed changes first, and only waits on a commit when a queued batch can actually free capacity (otherwise it proceeds, to avoid a rollback-diff deadlock).
  • Chain metadata is folded into the batch transaction (stale-diff against the last persisted snapshot) instead of going through its own throttler, so it never races the batch write.
  • Effect cache stays warm during an in-flight write via a pendingDict swap.
  • Flush points. Exit and rollback await flush so the db reflects everything processed before they proceed.
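The reorg-threshold-aware split from the list above can be sketched as follows. This is an illustrative TypeScript version, not the ReScript implementation: the `Batch` shape is hypothetical, and returning an empty array for an empty queue is an assumption (the PR drives the cycle so that `drainBatchRun` is never called on an empty queue).

```typescript
// Hypothetical batch shape; only isInReorgThreshold is a field named in the PR.
type Batch = { id: number; isInReorgThreshold: boolean };

// Removes and returns the leading run of batches that share the first
// batch's isInReorgThreshold value. The rest stay queued for the next write.
function drainBatchRun(queue: Batch[]): Batch[] {
  if (queue.length === 0) return [];
  const mode = queue[0]!.isInReorgThreshold;
  let end = 1;
  while (end < queue.length && queue[end]!.isInReorgThreshold === mode) {
    end++;
  }
  return queue.splice(0, end);
}
```

With a queue whose flags are `false, false, true, true, false`, three successive calls return runs of two, two and one batches, so no single transaction mixes the two history-saving modes.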

Error handling

A background write failure is surfaced immediately through a single onError handler (log + exit), never deferred to a later batch. onError is held once on GlobalState; the ErrorExit action delegates to it, and the store calls it directly on a write failure. A hasFailedWrite flag stops the write loop and prevents capacity waits from deadlocking after a failure.
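As a rough illustration of that failure path, here is a TypeScript sketch of a write loop that reports a failed write straight to `onError` and then refuses to write anything further. `WriteLoop`, `enqueue` and the numeric batches are hypothetical stand-ins; `onError`, `hasFailedWrite` and `flush` are the names used in this PR, but their real shapes are assumed.

```typescript
type WriteFn = (batch: number) => Promise<void>;
type OnError = (error: unknown) => void;

class WriteLoop {
  hasFailedWrite = false; // once set, the loop never writes again
  private queue: number[] = [];
  private running: Promise<void> | null = null;
  private readonly write: WriteFn;
  private readonly onError: OnError;

  constructor(write: WriteFn, onError: OnError) {
    this.write = write;
    this.onError = onError;
  }

  enqueue(batch: number): void {
    this.queue.push(batch);
    if (this.running === null && !this.hasFailedWrite) {
      this.running = this.run();
    }
  }

  // Resolves when the loop is idle or has stopped after a failure.
  // It does not rethrow the write error: onError owns surfacing it.
  async flush(): Promise<void> {
    while (this.running !== null) await this.running;
  }

  private async run(): Promise<void> {
    try {
      while (this.queue.length > 0 && !this.hasFailedWrite) {
        const batch = this.queue.shift()!;
        try {
          await this.write(batch);
        } catch (error) {
          this.hasFailedWrite = true; // stop draining
          this.onError(error); // surface immediately, not at a later batch
        }
      }
    } finally {
      this.running = null;
    }
  }
}
```

Because `flush` returns once the loop has stopped rather than waiting for the queue to empty, a caller waiting on it cannot deadlock behind batches that will never be written.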

Tests

pnpm rescript clean; pnpm vitest run green (568 passed, 4 skipped). MockIndexer updated to drive the background cycle (flush, idle detection, async restart); WriteRead_test covers dropCommittedChanges.

https://claude.ai/code/session_01Taw9xnp2tLPUvHiW1BSumS


Generated by Claude Code

Summary by CodeRabbit

  • Refactor

    • Restructured internal write cycles for improved batch persistence and error handling.
    • Enhanced backpressure coordination and capacity management.
    • Increased in-memory change tracking capacity.
  • Tests

    • Updated test suite to reflect revised processing behavior and refactored store architecture.

claude added 13 commits June 2, 2026 12:54
Decouple the database write from batch processing. Processing now only
updates the in-memory store and continues; a persistence cycle owned by
the in-memory store drains changes to Postgres on its own.

- Split the checkpoint pointer into committedCheckpointId (last persisted
  to db) and processedCheckpointId (in-memory frontier). createBatch keys
  off processedCheckpointId; history retention still keys off committed.
- commitBatch accumulates batch metadata and triggers a single-writer
  background loop (strictly one write in flight, overlapping processing).
- Snapshot rawEvents/effects/entity changes synchronously at write start so
  the in-memory store is never reset before its changes are committed;
  effect outputs being written stay readable via a pending dict.
- Capacity gate (50k changes) before each batch: drop committed changes,
  else await a commit.
- Drain the cycle before a rollback and flush it before a successful exit.
- Serialize chain-metadata writes with batch writes to avoid concurrent
  updates to the chains table.
- MockIndexer awaits the full write (and settles) before returning.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
- Await in-memory store capacity before starting the batch timer.
- Drop the redundant comment over commitBatch.
- Remove db-write duration from processing metrics; the write now happens
  off the processing path in the in-memory store cycle.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
Persist chain metadata from the persistence cycle instead of a separate
throttled write. Because the cycle is the single db writer, the metadata
write no longer races batch writes on the chains table, so the throttler
and the serializeDbWrite mutex are both removed.

Also make the effect table's pendingDict always present instead of
optional, for simplicity.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
Revert the in-cycle chain-metadata write back to a throttled, separate
setChainMeta, serialized through the store's serializeDbWrite so it never
overlaps a background batch write on the chains table.

Also replace drainForRollback with flush - awaiting the write cycle
already drains all pending batches, so the explicit resets were redundant.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
- Persist chain metadata as part of the batch write transaction instead of
  a separate throttled write. The store keeps current vs committed metadata
  and only the stale per-chain diff is folded into writeBatch, so metadata
  never races the batch write and the throttler/serializeDbWrite are gone.
- Make persistence and config immutable creation params of InMemoryStore
  instead of mutable fields set per batch.
- Stop the ProcessEventBatch loop once an exit is decided, so the async exit
  flush doesn't let further batches process (fixes the auto-exit smoke test
  processing past the first event block).

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
Move isInReorgThreshold onto Batch.t (set at creation from the chain
manager) instead of passing it separately into commitBatch.

The persistence cycle no longer merges all queued batches blindly. It
drains the leading run of processed batches that share isInReorgThreshold
and writes only those, leaving the rest for the next write. Entity changes
are snapshotted up to the run boundary so a single write never mixes
history-saving modes (avoids over-saving history across the threshold
transition).

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
- persistence and config are now non-optional fields of InMemoryStore.make.
  The in-memory-only test helper supplies a shared default persistence.
- Drive the write cycle off processedBatches being non-empty, so
  drainBatchRun is never called with an empty array.
- drainBatchRun now splits the run and accumulates checkpoints/progress in a
  single pass instead of one forEach plus five map+concatMany.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
- awaitCapacity only waits for a commit when there is a queued batch to
  free capacity. A large rollback diff is staged without a batch, so
  waiting on it would deadlock; let processing proceed instead.
- Remove resetButKeepLatestChanges/resetButKeepLoadedFromDbChanges, dead
  since the cycle uses snapshotChanges/dropCommittedChanges. Replace the
  obsolete unit test with one for dropCommittedChanges.
- Remove the now-unused chain-metadata throttle env var.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt
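The capacity decision described in this commit can be sketched as a pure function. This is a TypeScript approximation under stated assumptions: the `StoreLoad` fields and the name `awaitCapacityStep` are hypothetical, the limit is passed in rather than hard-coded, and it assumes committed changes still held in memory count toward the limit until they are dropped.

```typescript
// Hypothetical view of what the store currently holds.
type StoreLoad = {
  committedChanges: number; // already persisted, still cached in memory
  uncommittedChanges: number; // not yet written to the db
  queuedBatchItems: number; // items of batches waiting for the write loop
  queuedBatches: number; // batches waiting for the write loop
};

type CapacityDecision = "proceed" | "wait-for-commit";

function awaitCapacityStep(load: StoreLoad, limit: number): CapacityDecision {
  const held = () =>
    load.committedChanges + load.uncommittedChanges + load.queuedBatchItems;

  if (held() <= limit) return "proceed";

  // Free what is already safe to drop before blocking processing.
  load.committedChanges = 0;
  if (held() <= limit) return "proceed";

  // Only a queued batch can free more capacity. A large rollback diff is
  // staged without a batch, so waiting on a commit would never finish.
  return load.queuedBatches > 0 ? "wait-for-commit" : "proceed";
}
```

The last branch is the deadlock fix: when the store is over the limit but nothing is queued for writing, processing proceeds instead of waiting for a commit that cannot happen.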
…via onError

- keepLatestChangesLimit 50k -> 100k, now counts queued batch items alongside
  entity changes so a low-entity/high-item workload can't outrun persistence.
- InMemoryStore.make takes a required onError callback; a failed background
  write reports through it immediately instead of being thrown at the next
  batch's awaitCapacity. Main wires it to dispatch ErrorExit.
- awaitCapacity/flush no longer rethrow persistenceError; they stop draining
  since onError owns surfacing the failure.

The stored exn was never read back - it's handed straight to onError at the
failure site. The field only gates the write loop, so a plain bool says what
it is.

…gnoring

These stores never run the persistence cycle, so onError firing means a test
is wired wrong - log and raise rather than swallow it.

Hold one onError callback (log + exit) on GlobalState and share it with the
in-memory store. The store calls it directly on a background write failure
instead of dispatching ErrorExit, and the ErrorExit action delegates to the
same callback rather than inlining its own log + exit.

coderabbitai Bot commented Jun 3, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b1f8ec4e-829a-4160-a03f-2452a0a566a4

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Walkthrough

This PR refactors the batch persistence model from synchronous writes to an async queued write loop. It adds explicit processed vs committed checkpoint tracking, integrates configurable error handling, and makes chain metadata stageable. The persistence contract is extended with optional chain-metadata parameters, and the test infrastructure is updated to use the real InMemoryStore implementation.

Changes

Batch Persistence and Write-Loop Refactor

| Layer / File(s) | Summary |
| --- | --- |
| Batch structure and checkpoint tracking<br>packages/envio/src/Batch.res, packages/envio/src/ChainManager.res | Batch t record gains isInReorgThreshold flag; batch factory methods propagate this flag; ChainManager.createBatch switches from committedCheckpointId to processedCheckpointId for checkpoint calculation. |
| InMemoryStore write-loop architecture<br>packages/envio/src/InMemoryStore.res | Complete restructuring to maintain processed/committed checkpoints, queue batches for async write loops, track effect cache pending state, manage chain-metadata diffs, and coordinate commit signaling; removes synchronous writeBatch in favor of commitBatch and write-loop orchestration. |
| Persistence interface and storage implementations<br>packages/envio/src/Persistence.res, packages/envio/src/PgStorage.res, packages/envio/src/TestIndexerProxyStorage.res | Extends writeBatch contract with optional ~chainMetaData parameter; PgStorage and test-proxy implementations accept and forward metadata through transactions. |
| EventProcessing refactoring and batch commit<br>packages/envio/src/EventProcessing.res | Removes isInReorgThreshold parameter; replaces direct database writes with InMemoryStore.commitBatch(); adds backpressure via awaitCapacity; simplifies metrics to exclude write duration. |
| InMemoryTable checkpoint management<br>packages/envio/src/InMemoryTable.res | Marks entity state fields as mutable; adds snapshotChanges to extract deltas across checkpoint ranges; adds dropCommittedChanges to remove persisted changes and reset indices. |
| GlobalState orchestration and error handling<br>packages/envio/src/GlobalState.res, packages/envio/src/Env.res | Integrates configurable onError callback; removes chain-metadata throttler; refactors updateChainMetadataTable to stage metadata in InMemoryStore; updates exit-check and rollback logic to flush store; switches batch creation to processedCheckpointId. |
| Central error handling and LoadLayer integration<br>packages/envio/src/Main.res, packages/envio/src/LoadLayer.res | Establishes shared onError callback that logs and exits; threads it into InMemoryStore and GlobalState; updates LoadLayer effect lookups to use InMemoryStore's Option-based API. |
| Test infrastructure and MockIndexer updates<br>scenarios/test_codegen/test/helpers/MockIndexer.res, scenarios/test_codegen/test/*.res | Updates MockIndexer to construct real InMemoryStore with shared persistence; converts restart to async with store flush; replaces polling wait with flush-and-idle checks; updates test setups and replaces reset-behavior test with checkpoint-drop test. |

Sequence Diagram(s)

sequenceDiagram
  participant Batch
  participant InMemoryStore
  participant WriteLoop
  participant Persistence
  Batch->>InMemoryStore: commitBatch(batch)
  InMemoryStore->>InMemoryStore: enqueue batch to processedBatches
  InMemoryStore->>InMemoryStore: update processedCheckpointId
  InMemoryStore->>WriteLoop: kick write loop
  WriteLoop->>InMemoryStore: drainBatchRun()
  WriteLoop->>InMemoryStore: snapshotEffects()
  WriteLoop->>InMemoryStore: snapshotChanges per entity
  WriteLoop->>Persistence: writeBatch(entities, effects, chainMeta)
  Persistence->>InMemoryStore: signal write complete
  InMemoryStore->>InMemoryStore: update committedCheckpointId
  InMemoryStore->>InMemoryStore: wakeCommitWaiters()

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • enviodev/hyperindex#1076: Related batch persistence refactoring that extends Persistence.storage.writeBatch interface and modifies InMemoryStore batching behavior with separate metadata plumbing.

Suggested reviewers

  • JonoPrest
  • JasoonS
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title clearly and specifically describes the main architectural change: moving database persistence from the synchronous event-processing path into a standalone background write cycle. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
packages/envio/src/InMemoryStore.res (1)

146-152: 💤 Low value

JSON.stringifyAny comparison may be sensitive to key ordering.

JavaScript object serialization order can vary depending on how objects are constructed. If meta and prev have identical values but different key insertion orders, this comparison could incorrectly detect a change. Consider comparing field-by-field or using a stable serialization if chain metadata structure is known.

This is low-risk if chain metadata objects are always constructed the same way.
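To make the concern concrete, here is a small TypeScript sketch. The `chainId` and `blockHeight` fields are made-up examples, not the real chain metadata shape, and `stableStringify` is a hypothetical helper that assumes plain JSON-like values (no `undefined`, functions or cycles).

```typescript
// Same values, different key insertion order.
const metaA = { chainId: 1, blockHeight: 100 };
const metaB = { blockHeight: 100, chainId: 1 };

// JSON.stringify emits keys in insertion order, so these two strings differ
// even though the objects are structurally equal.
const naiveDiffers = JSON.stringify(metaA) !== JSON.stringify(metaB);

// Serializes with object keys sorted at every level, so equal values always
// produce the same string regardless of how the object was built.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    return "[" + value.map(stableStringify).join(",") + "]";
  }
  const obj = value as Record<string, unknown>;
  const keys = Object.keys(obj).sort();
  return (
    "{" +
    keys.map((k) => JSON.stringify(k) + ":" + stableStringify(obj[k])).join(",") +
    "}"
  );
}

const stableEqual = stableStringify(metaA) === stableStringify(metaB);
```

Here `naiveDiffers` is `true` while `stableEqual` is also `true`: the naive comparison reports a change that the sorted-key comparison does not.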

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/envio/src/InMemoryStore.res` around lines 146 - 152, The
JSON.stringifyAny comparison between meta and prev can yield false positives due
to key ordering; replace the stringified comparison in the changed calculation
(the switch using inMemoryStore.committedChainMeta and
Utils.Dict.dangerouslyGetNonOption) with a deterministic deep equality check or
a stable-serialization that sorts object keys first. Concretely, implement or
reuse a structural equality helper (e.g., Utils.deepEqual or similar) and use
Utils.deepEqual(meta, prev) (or compare stableSerialize(meta) !==
stableSerialize(prev)) instead of JSON.stringifyAny(meta) !=
JSON.stringifyAny(prev) so changes are detected by value not key insertion
order.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e3bb2416-6732-4b05-8b01-2d8a26539c82

📥 Commits

Reviewing files that changed from the base of the PR and between 67fbdbb and d47eb35.

📒 Files selected for processing (18)
  • packages/envio/src/Batch.res
  • packages/envio/src/ChainManager.res
  • packages/envio/src/Env.res
  • packages/envio/src/EventProcessing.res
  • packages/envio/src/GlobalState.res
  • packages/envio/src/InMemoryStore.res
  • packages/envio/src/InMemoryTable.res
  • packages/envio/src/LoadLayer.res
  • packages/envio/src/Main.res
  • packages/envio/src/Persistence.res
  • packages/envio/src/PgStorage.res
  • packages/envio/src/TestIndexerProxyStorage.res
  • scenarios/test_codegen/test/ChainManager_test.res
  • scenarios/test_codegen/test/E2E_test.res
  • scenarios/test_codegen/test/EventOrigin_test.res
  • scenarios/test_codegen/test/LoadLayer_test.res
  • scenarios/test_codegen/test/WriteRead_test.res
  • scenarios/test_codegen/test/helpers/MockIndexer.res
💤 Files with no reviewable changes (1)
  • packages/envio/src/Env.res

@DZakh DZakh changed the base branch from main to claude/inspiring-mayer-XfmHe June 3, 2026 10:50
…to claude/vibrant-ptolemy-onYTn

# Conflicts:
#	packages/envio/src/GlobalState.res
#	packages/envio/src/InMemoryStore.res
#	scenarios/test_codegen/test/E2E_test.res
#	scenarios/test_codegen/test/helpers/MockIndexer.res
…to claude/vibrant-ptolemy-onYTn

# Conflicts:
#	packages/envio/src/Batch.res
#	packages/envio/src/EventProcessing.res
#	packages/envio/src/GlobalState.res
#	packages/envio/src/InMemoryStore.res
#	packages/envio/src/InMemoryTable.res
#	packages/envio/src/Main.res
#	packages/envio/src/Persistence.res
#	scenarios/test_codegen/test/E2E_test.res
#	scenarios/test_codegen/test/helpers/MockIndexer.res
The merged store signature makes onError required; the in-memory test
store raises on any unexpected persistence write.

https://claude.ai/code/session_01Taw9xnp2tLPUvHiW1BSumS
@DZakh DZakh merged commit 3f0556c into claude/inspiring-mayer-XfmHe Jun 3, 2026
7 of 8 checks passed
@DZakh DZakh deleted the claude/vibrant-ptolemy-onYTn branch June 3, 2026 12:05
DZakh added a commit that referenced this pull request Jun 3, 2026
* Persist batches concurrently with processing

Make the in-memory store fire its batch write into a single-slot
pendingPersistence and return control, so the next batch can process
while the previous one writes. At most one write is in flight: the next
write awaits the prior before firing, keeping writes in batch order.

- InMemoryStore: split writeBatch into a synchronous prepare (snapshot +
  store reset + committedCheckpointId advance) and a fired storage write.
  Concurrency is gated on keepLatestChanges; when the store drops its
  latest changes the write is awaited inline so later DB reads stay
  consistent. flushPendingPersistence awaits the in-flight write and is
  called from prepareRollbackDiff before clearing the cache.
- LoadLayer: serve effect cache hits from the in-flight write's effects
  snapshot before reading the not-yet-committed DB rows.
- GlobalState: flush the pending write before rollback DB reads and
  before the success-exit paths.
- MockIndexer: getBatchWritePromise awaits the in-flight write.

Known pending: one realtime-ordering E2E test ("Live source should not
participate in initial height fetch but should after sync") asserts a
fetch-vs-write interleaving that legitimately shifts now that the write
no longer blocks EventBatchProcessed. Awaiting decision on how to update.

https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1

* Update realtime height-race test for concurrent batch writes

Reaching head now flips isRealtime before the first waitForNewBlock
since the batch write no longer blocks EventBatchProcessed, so the
first race already runs in realtime mode (Live primary, Sync secondary).

https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1

* Flush pending write on mock restart; fix effect read-through across schema change

Mock indexer restart now awaits the in-memory store's in-flight write
before starting the new indexer on the same DB, so the old and new
writes don't race.

Fix the effect cache read-through to only serve from the in-flight
write's snapshot when it's the same effect instance. A different effect
sharing the name (e.g. an updated output schema) must go through the
DB-load path, which re-validates and invalidates stale outputs; serving
the raw pending value bypassed invalidation and fed handlers a stale
result.

https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
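The instance check described in this commit could look roughly like the following TypeScript sketch. `Effect`, `PendingEntry` and `readPending` are hypothetical names and shapes; the point is only that a pending value is served when the requesting effect is the very same object that produced it, not merely one with the same name.

```typescript
// Hypothetical effect handle; two versions of an effect may share a name.
type Effect = { name: string };

// Hypothetical snapshot of an in-flight write, keyed by effect name.
type PendingEntry = { effect: Effect; outputs: Map<string, unknown> };

// Returns the pending output, or undefined to signal "fall back to the
// DB-load path", which re-validates and invalidates stale outputs.
function readPending(
  pending: Map<string, PendingEntry>,
  effect: Effect,
  inputKey: string,
): unknown {
  const entry = pending.get(effect.name);
  // Reference equality: a different effect object with the same name
  // (for example after an output schema change) must not be served.
  if (entry === undefined || entry.effect !== effect) return undefined;
  return entry.outputs.get(inputKey);
}
```

Comparing by reference rather than by name is what keeps a redefined effect from reading raw values that the old definition wrote.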

* Chain metadata persistence via throttled idle-flush on the background cycle (#1276)

* Make in-memory store persistence a standalone background cycle

Decouple the database write from batch processing. Processing now only
updates the in-memory store and continues; a persistence cycle owned by
the in-memory store drains changes to Postgres on its own.

- Split the checkpoint pointer into committedCheckpointId (last persisted
  to db) and processedCheckpointId (in-memory frontier). createBatch keys
  off processedCheckpointId; history retention still keys off committed.
- commitBatch accumulates batch metadata and triggers a single-writer
  background loop (strictly one write in flight, overlapping processing).
- Snapshot rawEvents/effects/entity changes synchronously at write start so
  the in-memory store is never reset before its changes are committed;
  effect outputs being written stay readable via a pending dict.
- Capacity gate (50k changes) before each batch: drop committed changes,
  else await a commit.
- Drain the cycle before a rollback and flush it before a successful exit.
- Serialize chain-metadata writes with batch writes to avoid concurrent
  updates to the chains table.
- MockIndexer awaits the full write (and settles) before returning.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Address PR review on EventProcessing

- Await in-memory store capacity before starting the batch timer.
- Drop the redundant comment over commitBatch.
- Remove db-write duration from processing metrics; the write now happens
  off the processing path in the in-memory store cycle.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Fold chain-metadata write into the in-memory store cycle

Persist chain metadata from the persistence cycle instead of a separate
throttled write. Because the cycle is the single db writer, the metadata
write no longer races batch writes on the chains table, so the throttler
and the serializeDbWrite mutex are both removed.

Also make the effect table's pendingDict always present instead of
optional, for simplicity.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Write chain metadata via a separate throttler again

Revert the in-cycle chain-metadata write back to a throttled, separate
setChainMeta, serialized through the store's serializeDbWrite so it never
overlaps a background batch write on the chains table.

Also replace drainForRollback with flush - awaiting the write cycle
already drains all pending batches, so the explicit resets were redundant.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Fold chain metadata into the batch write; tidy store fields

- Persist chain metadata as part of the batch write transaction instead of
  a separate throttled write. The store keeps current vs committed metadata
  and only the stale per-chain diff is folded into writeBatch, so metadata
  never races the batch write and the throttler/serializeDbWrite are gone.
- Make persistence and config immutable creation params of InMemoryStore
  instead of mutable fields set per batch.
- Stop the ProcessEventBatch loop once an exit is decided, so the async exit
  flush doesn't let further batches process (fixes the auto-exit smoke test
  processing past the first event block).

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Carry isInReorgThreshold on the batch and split writes on its boundary

Move isInReorgThreshold onto Batch.t (set at creation from the chain
manager) instead of passing it separately into commitBatch.

The persistence cycle no longer merges all queued batches blindly. It
drains the leading run of processed batches that share isInReorgThreshold
and writes only those, leaving the rest for the next write. Entity changes
are snapshotted up to the run boundary so a single write never mixes
history-saving modes (avoids over-saving history across the threshold
transition).

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Make persistence/config required; single-pass batch-run drain

- persistence and config are now non-optional fields of InMemoryStore.make.
  The in-memory-only test helper supplies a shared default persistence.
- Drive the write cycle off processedBatches being non-empty, so
  drainBatchRun is never called with an empty array.
- drainBatchRun now splits the run and accumulates checkpoints/progress in a
  single pass instead of one forEach plus five map+concatMany.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Review fixes: avoid capacity deadlock and remove dead code

- awaitCapacity only waits for a commit when there is a queued batch to
  free capacity. A large rollback diff is staged without a batch, so
  waiting on it would deadlock; let processing proceed instead.
- Remove resetButKeepLatestChanges/resetButKeepLoadedFromDbChanges, dead
  since the cycle uses snapshotChanges/dropCommittedChanges. Replace the
  obsolete unit test with one for dropCommittedChanges.
- Remove the now-unused chain-metadata throttle env var.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Persist chain metadata on a throttled idle path with delta tracking

Stage chain metadata as a per-chain dirty delta computed at stage time via
structural comparison instead of a JSON-stringify diff on every write. A batch
write folds the delta into its transaction for free; when no batch is flowing,
a throttled standalone upsert flushes it, restoring idle freshness while keeping
all writes serialized through the single write loop.

* Track chain-meta dirtiness with a bool instead of a delta dict

setChainMeta writes a single unnest upsert regardless of chain count, so a
per-chain delta bought nothing at the db level. Replace dirtyChainMeta with a
flag and write a shallow-copied snapshot of the latest per-chain metadata.
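A sketch of the flag-plus-snapshot idea in TypeScript. `ChainMetaStage`, `stage`, `takeSnapshot` and the single-field `ChainMeta` shape are hypothetical; in particular, comparing one field to decide dirtiness is a stand-in for whatever structural comparison the store actually performs.

```typescript
// Hypothetical metadata shape with a single field, for illustration.
type ChainMeta = { blockHeight: number };

class ChainMetaStage {
  private latest: Record<string, ChainMeta> = {};
  private dirty = false;

  // Stages the newest metadata for a chain; marks dirty only on a change.
  stage(chainId: string, meta: ChainMeta): void {
    const prev = this.latest[chainId];
    if (prev !== undefined && prev.blockHeight === meta.blockHeight) return;
    this.latest[chainId] = meta;
    this.dirty = true;
  }

  // Returns a shallow copy of all chains to write, or undefined when there
  // is nothing new. The copy keeps later stage() calls from altering the set
  // of entries a write in progress is using.
  takeSnapshot(): Record<string, ChainMeta> | undefined {
    if (!this.dirty) return undefined;
    this.dirty = false;
    return { ...this.latest };
  }
}
```

Since the upsert covers every chain in one statement anyway, a single boolean is enough to decide whether a write is needed; there is no per-chain bookkeeping to maintain.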

* Defer Throttler execution to setImmediate

Run scheduled functions on the next setImmediate instead of synchronously
inside schedule, so work queued before them (e.g. a batch task) runs first.
This makes chain-metadata fold into the imminent batch write by default and
replaces the startThrottled priming.
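The ordering effect can be shown with a stripped-down TypeScript sketch. `DeferredThrottler` is a hypothetical simplification: it omits the interval logic entirely and takes the deferral function as a parameter, where the PR uses Node's `setImmediate`.

```typescript
// Hands a callback to some later-tick scheduler (setImmediate in the PR).
type Defer = (fn: () => void) => void;

class DeferredThrottler {
  private pending: (() => void) | null = null;
  private scheduled = false;
  private readonly defer: Defer;

  constructor(defer: Defer) {
    this.defer = defer;
  }

  // Never runs fn synchronously. Keeps only the latest fn and runs it on
  // the deferred tick, after any work that was queued before that tick.
  schedule(fn: () => void): void {
    this.pending = fn;
    if (this.scheduled) return;
    this.scheduled = true;
    this.defer(() => {
      this.scheduled = false;
      const run = this.pending;
      this.pending = null;
      if (run !== null) run();
    });
  }
}
```

In Node this would be constructed as `new DeferredThrottler((fn) => setImmediate(fn))`. A batch task queued before the deferred tick then runs first, which is why the metadata ends up folded into that batch write instead of racing ahead of it.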

* Tighten comments; share setImmediate binding via NodeJs

Condense the persistence-cycle and chain-metadata comments to one line where
they earn it, and move the duplicated setImmediate external into NodeJs so
Throttler and GlobalStateManager share a single binding.

* Reuse NodeJs.setImmediate in Throttler test; retry timing tests

Drop the duplicate setImmediate external from the test and reuse the shared
NodeJs binding. Add retry to the two interval-timing tests, matching the
others, since deferred execution adds macrotask jitter.

---------

Co-authored-by: Claude <noreply@anthropic.com>

* Persist the in-memory store on a standalone background cycle (#1275)

* Make in-memory store persistence a standalone background cycle

Decouple the database write from batch processing. Processing now only
updates the in-memory store and continues; a persistence cycle owned by
the in-memory store drains changes to Postgres on its own.

- Split the checkpoint pointer into committedCheckpointId (last persisted
  to db) and processedCheckpointId (in-memory frontier). createBatch keys
  off processedCheckpointId; history retention still keys off committed.
- commitBatch accumulates batch metadata and triggers a single-writer
  background loop (strictly one write in flight, overlapping processing).
- Snapshot rawEvents/effects/entity changes synchronously at write start so
  the in-memory store is never reset before its changes are committed;
  effect outputs being written stay readable via a pending dict.
- Capacity gate (50k changes) before each batch: drop committed changes,
  else await a commit.
- Drain the cycle before a rollback and flush it before a successful exit.
- Serialize chain-metadata writes with batch writes to avoid concurrent
  updates to the chains table.
- MockIndexer awaits the full write (and settles) before returning.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Address PR review on EventProcessing

- Await in-memory store capacity before starting the batch timer.
- Drop the redundant comment over commitBatch.
- Remove db-write duration from processing metrics; the write now happens
  off the processing path in the in-memory store cycle.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Fold chain-metadata write into the in-memory store cycle

Persist chain metadata from the persistence cycle instead of a separate
throttled write. Because the cycle is the single db writer, the metadata
write no longer races batch writes on the chains table, so the throttler
and the serializeDbWrite mutex are both removed.

Also make the effect table's pendingDict always present instead of
optional, for simplicity.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Write chain metadata via a separate throttler again

Revert the in-cycle chain-metadata write back to a throttled, separate
setChainMeta, serialized through the store's serializeDbWrite so it never
overlaps a background batch write on the chains table.

Also replace drainForRollback with flush - awaiting the write cycle
already drains all pending batches, so the explicit resets were redundant.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Fold chain metadata into the batch write; tidy store fields

- Persist chain metadata as part of the batch write transaction instead of
  a separate throttled write. The store keeps current vs committed metadata
  and only the stale per-chain diff is folded into writeBatch, so metadata
  never races the batch write and the throttler/serializeDbWrite are gone.
- Make persistence and config immutable creation params of InMemoryStore
  instead of mutable fields set per batch.
- Stop the ProcessEventBatch loop once an exit is decided, so the async exit
  flush doesn't let further batches process (fixes the auto-exit smoke test
  processing past the first event block).

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Carry isInReorgThreshold on the batch and split writes on its boundary

Move isInReorgThreshold onto Batch.t (set at creation from the chain
manager) instead of passing it separately into commitBatch.

The persistence cycle no longer merges all queued batches blindly. It
drains the leading run of processed batches that share isInReorgThreshold
and writes only those, leaving the rest for the next write. Entity changes
are snapshotted up to the run boundary so a single write never mixes
history-saving modes (avoids over-saving history across the threshold
transition).
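
As an illustration of the run split described above, here is a small TypeScript sketch. The `Batch` shape is a simplified, hypothetical stand-in for Batch.t, and this `drainBatchRun` only shows the splitting rule, not the snapshotting of entity changes.

```typescript
// Simplified, hypothetical batch shape; the real Batch.t has more fields.
type Batch = { checkpointId: number; isInReorgThreshold: boolean; items: string[] };

// Splits the queue into the leading run of batches that share the first
// batch's isInReorgThreshold flag, and the remainder left for the next write.
function drainBatchRun(queue: Batch[]): { run: Batch[]; rest: Batch[] } {
  const first = queue[0];
  if (first === undefined) {
    throw new Error("drainBatchRun expects a non-empty queue");
  }
  let end = 1;
  while (end < queue.length && queue[end]?.isInReorgThreshold === first.isInReorgThreshold) {
    end++;
  }
  return { run: queue.slice(0, end), rest: queue.slice(end) };
}
```

A queue with flags `false, false, true, false` would be written as three separate runs, so no single transaction mixes history-saving modes.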

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Make persistence/config required; single-pass batch-run drain

- persistence and config are now non-optional fields of InMemoryStore.make.
  The in-memory-only test helper supplies a shared default persistence.
- Drive the write cycle off processedBatches being non-empty, so
  drainBatchRun is never called with an empty array.
- drainBatchRun now splits the run and accumulates checkpoints/progress in a
  single pass instead of one forEach plus five map+concatMany.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Review fixes: avoid capacity deadlock and remove dead code

- awaitCapacity only waits for a commit when there is a queued batch to
  free capacity. A large rollback diff is staged without a batch, so
  waiting on it would deadlock; let processing proceed instead.
- Remove resetButKeepLatestChanges/resetButKeepLoadedFromDbChanges, dead
  since the cycle uses snapshotChanges/dropCommittedChanges. Replace the
  obsolete unit test with one for dropCommittedChanges.
- Remove the now-unused chain-metadata throttle env var.

https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt

* Raise capacity limit to 100k incl. batch items; surface write errors via onError

- Raise keepLatestChangesLimit from 50k to 100k. It now counts queued batch
  items alongside entity changes so a low-entity/high-item workload can't
  outrun persistence.
- InMemoryStore.make takes a required onError callback; a failed background
  write reports through it immediately instead of being thrown at the next
  batch's awaitCapacity. Main wires it to dispatch ErrorExit.
- awaitCapacity/flush no longer rethrow persistenceError; they stop draining
  since onError owns surfacing the failure.
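
The capacity rule from the bullets above, combined with the earlier deadlock fix, can be sketched as a pure decision function. This is a hedged TypeScript illustration: `StoreLoad` and `capacityAction` are hypothetical names, and the real awaitCapacity also frees committed changes before deciding.

```typescript
// Hypothetical snapshot of what the store currently holds.
type StoreLoad = {
  uncommittedEntityChanges: number; // entity changes not yet written to the db
  unwrittenBatchItems: number; // items in batches still waiting for the write cycle
  queuedBatches: number; // batches the write cycle can still commit
};

// Decides whether processing may continue or should wait for a commit.
function capacityAction(load: StoreLoad, limit: number): "proceed" | "waitForCommit" {
  const held = load.uncommittedEntityChanges + load.unwrittenBatchItems;
  if (held <= limit) return "proceed";
  // Over the limit: waiting only helps if a queued batch can free capacity.
  // With nothing queued (for example a large rollback diff), waiting would deadlock.
  return load.queuedBatches > 0 ? "waitForCommit" : "proceed";
}
```

Counting batch items in `held` is what keeps a workload with many items but few entity changes from outrunning the write cycle.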

* Replace persistenceError option<exn> with a hasFailedWrite bool

The stored exn was never read back: it is handed straight to onError at the
failure site. The field only gates the write loop, so a plain bool states
its purpose.

* Surface unexpected writes from in-memory-only test store instead of ignoring

These stores never run the persistence cycle, so onError firing means a test
is wired incorrectly. Log and raise rather than swallow it.

* Route fatal errors through a single onError handler

Hold one onError callback (log + exit) on GlobalState and share it with the
in-memory store. The store calls it directly on a background write failure
instead of dispatching ErrorExit, and the ErrorExit action delegates to the
same callback rather than inlining its own log + exit.

* Tighten comments

* Pass required onError to InMemoryStore in ChainMeta_test

The merged store signature makes onError required; the in-memory test
store raises on any unexpected persistence write.

https://claude.ai/code/session_01Taw9xnp2tLPUvHiW1BSumS

---------

Co-authored-by: Claude <noreply@anthropic.com>

* Defer raw event creation and deep NUL stripping to batch write (#1278)

* Build raw_events in PgStorage from batch items

Move raw event row construction out of the per-event processing path and
into PgStorage.writeBatch, which now derives the rows by iterating the
batch items being written. Carry batch items through drainBatchRun so they
reach the write, and drop the rawEvents accumulator and the ~rawEvents
parameter from the storage interface.

* Cover raw_events in the e2e indexer test

Enable `raw_events: true` in the e2e_test config and assert the indexer
writes one raw_events row per processed event, with the decoded params,
src address and transaction fields matching the known first event.

* Sanitize NUL bytes in raw_events writes (#1195)

A NUL byte in event params made the raw_events jsonb INSERT fail with
22P05, poisoning the batch transaction and aborting unrelated entity
writes. Route the raw_events write through the same escape-and-retry path
used for entities: on a Postgres encoding error, escape the offending
table and retry. The stripper now recurses into nested objects/arrays so
a NUL buried inside an event param object (or a json entity field) is
removed, and the classifier also recognizes the jsonb-specific error
message in addition to the text-column one.
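
The recursive stripping described above might look like the following. This is a minimal TypeScript sketch, not the PgStorage code: the `Json` type and `stripNul` name are assumptions, and it only cleans string values (object keys are left untouched).

```typescript
// JSON-like value, as stored in a jsonb column.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Returns a copy of the value with NUL characters removed from every
// string, recursing into nested arrays and objects.
function stripNul(value: Json): Json {
  if (typeof value === "string") {
    return value.replace(/\u0000/g, "");
  }
  if (Array.isArray(value)) {
    return value.map(stripNul);
  }
  if (value !== null && typeof value === "object") {
    const cleaned: { [key: string]: Json } = {};
    for (const key of Object.keys(value)) {
      cleaned[key] = stripNul(value[key]);
    }
    return cleaned;
  }
  return value; // numbers, booleans and null pass through unchanged
}
```

In the escape-and-retry path this kind of stripping would run only after Postgres reports an encoding error, so the common case pays no extra cost.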

---------

Co-authored-by: Claude <noreply@anthropic.com>

* Track effect cache entries by checkpoint for commit-gated eviction (#1279)

* Track effect cache entries by checkpoint for commit-gated eviction

Store each effect cache entry as a Change stamped with the per-item
checkpointId (mirroring entity changes) instead of a raw output in a dict
that was wiped after every write. Committed entries are now reclaimed by
dropCommittedEffects in awaitCapacity, and effect entries count toward the
in-memory changes limit. cache:false outputs are stored in memory but not
persisted, and are evictable (re-run on a later miss). Removes pendingDict
and the per-write dict swap.
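
A rough TypeScript sketch of commit-gated eviction follows. `EffectEntry` and this `dropCommittedEffects` signature are hypothetical simplifications, and it assumes an entry becomes evictable once its checkpointId is at or below the committed frontier.

```typescript
// Hypothetical cache entry: an effect output stamped with the checkpoint
// of the item that produced it.
type EffectEntry = { checkpointId: number; output: unknown };

// Removes entries whose checkpoint has already been committed and
// returns how many were dropped. Entries ahead of the commit frontier stay.
function dropCommittedEffects(
  cache: Record<string, EffectEntry>,
  committedCheckpointId: number
): number {
  let dropped = 0;
  for (const key of Object.keys(cache)) {
    const entry = cache[key];
    if (entry !== undefined && entry.checkpointId <= committedCheckpointId) {
      delete cache[key];
      dropped++;
    }
  }
  return dropped;
}
```

An evicted entry is not lost: a later lookup either reloads it from the db cache or, for outputs that were never persisted, re-runs the effect.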

Make the changes limit configurable via ENVIO_MAX_IN_MEMORY_CHANGES.

Known open item: E2E "Track effects in prom metrics" fails. It swaps an
effect's output schema mid-run (no restart) and expects the warm in-memory
entry to be re-validated/invalidated. Under the new model committed entries
stay warm and are only re-validated on a db reload (i.e. across restarts,
the real-world schema-change path). Pending a decision on adapting the test.

* Test effect-cache schema invalidation via the single restart

Under commit-gated eviction a committed effect entry stays warm in memory,
so a mid-run output-schema change isn't re-validated. Schema changes are
code changes that take effect on restart, where the db cache is reloaded and
re-validated. Restructure the test so both cache entries are written before
the existing restart, then exercise the new schema in the post-restart batch
(avoids a second restart, which collides on the checkpoint pkey).

* Rename env to ENVIO_IN_MEMORY_OBJECTS_TARGET; inline mapChangeToEffectOutput

* Evict committed changes before db-loaded ones in awaitCapacity

Tiered backpressure: drop our committed writes first (cheap to re-derive),
then db-loaded entries, and only then wait for a commit. Applies to both
entity and effect tables via keepLoadedFromDb.
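
The three tiers can be illustrated with a small TypeScript sketch. The `Entry`, `Origin` and `relieve` names are hypothetical, and the sketch models one flat list of in-memory objects instead of separate entity and effect tables.

```typescript
// Where an in-memory object came from (hypothetical classification).
type Origin = "committed" | "loadedFromDb" | "uncommitted";
type Entry = { id: string; origin: Origin };
type Outcome = { kept: Entry[]; action: "proceed" | "waitForCommit" };

// Frees memory tier by tier and only asks the caller to wait as a last resort.
function relieve(entries: Entry[], limit: number, hasQueuedBatch: boolean): Outcome {
  let kept = entries;
  if (kept.length <= limit) return { kept, action: "proceed" };

  // Tier 1: drop our own writes that are already committed.
  kept = kept.filter((e) => e.origin !== "committed");
  if (kept.length <= limit) return { kept, action: "proceed" };

  // Tier 2: drop entries that were loaded from the db.
  kept = kept.filter((e) => e.origin !== "loadedFromDb");
  if (kept.length <= limit) return { kept, action: "proceed" };

  // Tier 3: wait for a commit, but only if a queued batch can free capacity.
  return { kept, action: hasQueuedBatch ? "waitForCommit" : "proceed" };
}
```

Uncommitted entries are never dropped in this sketch, since they exist only in memory until the write cycle persists them.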

* Fix doc comment placement for dropCommitted/awaitCapacity

---------

Co-authored-by: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>