Skip to content

Conversation

@robacourt
Copy link
Contributor

Summary

Fixes #3787 - addresses the snapshot/replication race condition in the Materializer that caused "Key already exists" crashes.

The race condition

The Materializer subscribed to the Consumer before reading from storage:

# BEFORE: In handle_continue(:start_materializer, ...)
Consumer.subscribe_materializer(stack_id, shape_handle, self())  # <- Subscribes first
{:noreply, state, {:continue, {:read_stream, shape_storage}}}    # <- Then reads ALL storage

During the window between subscribing and reading, any changes that arrive via Consumer.new_changes() would be delivered to the Materializer. If those changes included records that were also in the snapshot being read, the Materializer received duplicates and crashed.

Production example (maxwell instance, 27 Jan 2026)

18:10:10.437 [error] GenServer Materializer "97489818-..." terminating
** (RuntimeError) Key "public"."offers"/"d3c8d8a5-5060-4a36-a67d-240de0c95a88" already exists

The transaction that triggered it:

%Electric.Replication.Changes.NewRecord{
  relation: {"public", "offers"},
  record: %{"id" => "d3c8d8a5-5060-4a36-a67d-240de0c95a88"},
  key: "\"public\".\"offers\"/\"d3c8d8a5-5060-4a36-a67d-240de0c95a88\"",
  move_tags: ["e12422d3af57a36d01a50b4645a517e4"]  # <- Move-in event
}

The record was already in the snapshot (matched via is_template = true OR the subquery), AND was delivered via replication with move_tags as a move-in event.

The fix

Consumer now returns its current offset when a Materializer subscribes. The Materializer reads storage only up to that offset, not beyond. Changes after that offset will be delivered via new_changes messages, ensuring each change is delivered exactly once.

# AFTER: Consumer returns offset on subscription
def handle_call({:subscribe_materializer, pid}, _from, state) do
  {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, ...}
end

# AFTER: Materializer uses offset to bound storage reads
{:ok, subscribed_offset} = Consumer.subscribe_materializer(stack_id, shape_handle, self())
{:noreply, %{state | subscribed_offset: subscribed_offset}, {:continue, {:read_stream, ...}}}

# In handle_continue({:read_stream, ...})
{:ok, offset, stream} = get_stream_up_to_offset(state.offset, state.subscribed_offset, storage)

Test plan

  • Added test verifying offset coordination prevents duplicates
  • All existing Materializer tests pass (35 tests)
  • Full test suite passes (1334 tests)

🤖 Generated with Claude Code

The Materializer subscribed to the Consumer BEFORE reading from storage,
creating a window where the same record could be delivered twice (via
storage read AND via new_changes message). This caused "Key already exists"
crashes in production.

The fix: Consumer returns its current offset on subscription, and
Materializer reads storage only up to that offset. Changes after that
offset are delivered via new_changes, ensuring no duplicates.

Fixes #3787

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@codecov
Copy link

codecov bot commented Jan 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 87.34%. Comparing base (2f11f91) to head (f0bff55).
⚠️ Report is 2 commits behind head on main.
✅ All tests successful. No failed tests found.

Additional details and impacted files
@@             Coverage Diff             @@
##             main    #3794       +/-   ##
===========================================
+ Coverage   75.75%   87.34%   +11.58%     
===========================================
  Files          11       23       +12     
  Lines         693     2039     +1346     
  Branches      172      542      +370     
===========================================
+ Hits          525     1781     +1256     
- Misses        167      256       +89     
- Partials        1        2        +1     
Flag Coverage Δ
packages/experimental 87.73% <ø> (ø)
packages/react-hooks 86.48% <ø> (ø)
packages/start 82.83% <ø> (ø)
packages/typescript-client 93.31% <ø> (?)
packages/y-electric 56.05% <ø> (ø)
typescript 87.34% <ø> (+11.58%) ⬆️
unit-tests 87.34% <ø> (+11.58%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@icehaunter icehaunter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it

@robacourt robacourt merged commit 1c20fac into main Jan 28, 2026
46 checks passed
@robacourt robacourt deleted the rob/fix-materializer-race branch January 28, 2026 17:06
@github-actions
Copy link
Contributor

This PR has been released! 🚀

The following packages include changes from this PR:

  • @core/sync-service@1.4.2

Thanks for contributing to Electric!

Copy link
Member

@alco alco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm reviewing this PR after it has already been merged. Wanted to just see the fix for the bug but noticed issues in the test code. I think it's criticil we make changes in our process or rely on human judgement to prevent occasional agentic sloppiness from taking root in our codebase.

Storage.mark_snapshot_as_started(storage)

# Write a record to storage at LogOffset.first()
alias Electric.Replication.LogOffset
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alias should never be interspersed with regular code inside execution units such functions/tests. Putting it at the top of the module is best. In the worst case, it should be at the top of the describe block.

test_pid = self()

consumer =
spawn(fn ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such ad-hoc mocking is really difficult to parse for a human. This one-off process encodes a specific sequence of messages, relying on the underlying implementation of GenServer calls. It has no relation to the high-level code inside consumer.ex. There's no way to ensure that it stays in sync with that implementation. I'm not sure why has this approach even been considered for this particular test?

To highlight a more general problem:

I've noticed this module has a few instances of such receive expressions. At the same time, there's the respond_to_call() local function that abstracts over some instances of receive.

We should not allow such slip ups because they will infect the test code and will be difficult to remove after the fact. Good old test-only module or a mocked behaviour is a better way to write custom functionality for a specific component in the test env.

MapSet.new([10])

{:error, reason} ->
flunk("Materializer failed to start: #{inspect(reason)}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've noticed Claude's propensity to litter explicit flunk calls in the test code it generates. Kind of odd to have that in this case, a simple {:ok, pid} = Materializer.start_link(...) would have been perfectly fine.

# We use a mocked Consumer that returns an offset, and the Materializer
# should only read storage up to that offset, preventing duplicates.

Process.flag(:trap_exit, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This a smell when there's no good reason for making the test process trap exits or there is a reason but no explanation of it in the code.

@balegas
Copy link
Contributor

balegas commented Feb 2, 2026

That's valuable feedback. We want to maintain an high quality bar in our code.

You mention specific code quality patterns. The agent must know we want to follow these patterns. My suggestion is that we should turn this into code quality guidelines that the agent/the reviewer agent must check whenever we create a pr. Why not creating that file or editing any coding style guidance we already have with your observations? Everytime we find something we record it

@alco
Copy link
Member

alco commented Feb 2, 2026

@balegas 100% agree. But I wanted to flag this up as the responsibility of the author of the PR.

We're still ways off from the workflow where all code can be written and reviewed by agents alone. Until then, we humans must remain in the loop and gatekeep any nonsense that agents try to add to the codebase. I think it's only fair for the human operator who delegates code generation to agents to make the necessary adjustments to the codified rules that we maintain to ensure code quality. Conversely, it wouldn't have been fair if I opened some PRs and expecting people who will be reviewing them to notice code that doesn't pass our quality bar and expect that they fix our rules for agents to try preventing this from happening again. The code already commited into Git must be fixed too, in addition to updating the rules for agents.

@balegas
Copy link
Contributor

balegas commented Feb 2, 2026

Developer and reviewer and making sure that our tooling does the work for us. Let's start with setting up the tooling

robacourt added a commit that referenced this pull request Feb 11, 2026
## Summary

Addresses code quality feedback from @alco's [review on
#3794](#3794):

- **Move `alias` to module top**: `alias Electric.Replication.LogOffset`
was interspersed in the test body. Moved to module-level aliases and
replaced all full-path references.
- **Replace ad-hoc mock consumer**: The spawned process with raw
`$gen_call` message handling was difficult to parse and had no relation
to the high-level consumer code. Replaced with the existing
`respond_to_call()` helper pattern.
- **Remove unnecessary `flunk`**: Replaced `case`/`flunk` with a simple
pattern match `{:ok, _pid} = Materializer.start_link(...)`.
- **Remove unexplained `Process.flag(:trap_exit, true)`**: No longer
needed after the refactor since the test expects success.

## Test plan

- [x] All 35 materializer tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ShapeLogCollector memory growth to 3.5GB causing OOM crash on maxwell

4 participants