Skip to content

fix(atomicity): wrap streams CRUD + pg_notify in one transaction#125

Merged
RAprogramm merged 1 commit into
mainfrom
124-stream-crud-atomicity
May 11, 2026
Merged

fix(atomicity): wrap streams CRUD + pg_notify in one transaction#125
RAprogramm merged 1 commit into
mainfrom
124-stream-crud-atomicity

Conversation

@RAprogramm
Copy link
Copy Markdown
Owner

Closes #124

Why

When the `streams` feature is on, generated CRUD methods previously did multiple round-trips on the pool with no coordination. Two real bugs:

  1. `update_method` race — `find_by_id` (old payload) and `UPDATE` (new state) ran as separate queries on the bare pool. Concurrent UPDATE between them → Updated event describes a transition that never happened.
  2. `create` / `delete` event loss — `pg_notify` was a separate round-trip after the DML committed. Crash / connection drop between the two silently dropped the event; row write persisted.

Fix

Wrap the DML and the `pg_notify` in one `sqlx::Transaction` whenever `streams` is on:

  • Postgres buffers `NOTIFY` per transaction and only broadcasts on commit — atomic with the DML.
  • `SELECT ... FOR UPDATE` locks the row for the transaction's lifetime, so `old` and `new` are guaranteed adjacent.

When `streams` is off, the generated method stays a single SQL statement — no perf regression for non-streams entities.

Changes

  • `sql/postgres/crud.rs::tx_wrapping(streams) -> (open, close, executor)` helper. Streams-on returns `let mut tx = self.begin().await?;`, `tx.commit().await?;`, `&mut *tx`. Streams-off returns empty fragments and `self`.
  • `create_method` / `delete_method` thread the helper through every branch (`ReturningMode::Full | Id | None | Custom` for create; soft and hard for delete).
  • `update_method` splits: streams-on takes `RETURNING *` always (the Updated event payload needs the full row anyway); streams-off keeps the historical match on `ReturningMode`.
  • `notify.rs` executor token is now `&mut tx` in all four `notify_`. `fetch_old_for_update` issues a direct `SELECT ... FOR UPDATE` against the transaction instead of routing through the pool-bound trait method.
  • `README.md` codeblock for the tracing log sample is tagged `text` so rustdoc no longer tries to compile it (side-effect of `lib.rs` doing `#![doc = include_str!("../README.md")]`).

Version bump

Crate Old New
entity-derive-impl 0.6.0 0.6.1
entity-derive 0.8.1 0.8.2

`entity-core` is unchanged.

Test plan

  • `cargo test --all-features` — 543 + 9 + 45 + 2 lib/integration + 2 trybuild pass
  • `cargo clippy --all-targets --all-features -- -D warnings` — clean
  • `cargo +nightly fmt --all -- --check` — clean
  • `cargo check` on examples/transactions, examples/full-app, examples/streams — clean
  • CI green incl. Codecov before merge

Out of scope (follow-up issues)

  • Aggregate-root `save` opens its own transaction but does not emit a `Created` event for streams entities — separate fix.
  • Hooks trait is generated but never invoked anywhere in CRUD — separate fix.

When the `streams` feature is enabled, generated CRUD methods previously
emitted 2–3 separate SQL round-trips on the pool with no coordination:

1. `update_method` race — `find_by_id` (read old) and `UPDATE` ran on the
   bare pool with no shared transaction. A concurrent UPDATE between the
   two could land, leaving the Updated event payload describing a
   transition that never actually happened.
2. `create_method` / `delete_method` event loss — `pg_notify` was a
   separate round-trip after the DML committed. A crash, connection drop,
   or pool error between commit and notify silently dropped the event;
   subscribers missed the change while the row write persisted.

Both classes of bug are eliminated by wrapping the DML and the notify in
one `sqlx::Transaction`. Postgres' `NOTIFY` is buffered per transaction
and broadcast only on commit, so:

- the new state and the notify commit atomically;
- on rollback the notify is discarded automatically;
- the `SELECT ... FOR UPDATE` row lock used to read the old payload now
  also blocks concurrent writers until the same transaction commits, so
  `old` and `new` are guaranteed adjacent states.

Changes:

- `sql/postgres/crud.rs::tx_wrapping(streams) -> (open, close, executor)`
  helper. Streams-on returns `let mut tx = self.begin().await?;`,
  `tx.commit().await?;`, and `&mut *tx`. Streams-off returns empty
  fragments and `self`, preserving the single-statement fast path for
  non-streams entities.
- `create_method` / `delete_method` thread the helper through every
  branch (`ReturningMode::Full`, `Id`, `None`, `Custom` for create;
  soft and hard for delete).
- `update_method` splits into two paths: streams-on takes the always-
  `RETURNING *` route because the Updated event payload requires the
  full row anyway; streams-off keeps the historical match on
  `ReturningMode`.
- `notify.rs` updates the executor token in every `notify_*` to
  `&mut *tx` and replaces `fetch_old_for_update` with a direct
  `SELECT ... FOR UPDATE` against the transaction instead of routing
  through the trait method on the pool.
- `README.md` codeblock for the tracing sample log line is tagged `text`
  so rustdoc no longer tries to compile it (a side-effect of `lib.rs`
  including the README via `include_str!`).

Bump (changed crates only):

- entity-derive-impl: 0.6.0 -> 0.6.1
- entity-derive: 0.8.1 -> 0.8.2

`entity-core` is unchanged.

Closes #124
@codecov
Copy link
Copy Markdown

codecov Bot commented May 11, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ All tests successful. No failed tests found.

📢 Thoughts on this report? Let us know!

@RAprogramm RAprogramm merged commit b36292f into main May 11, 2026
17 checks passed
@RAprogramm RAprogramm deleted the 124-stream-crud-atomicity branch May 11, 2026 09:32
RAprogramm added a commit that referenced this pull request May 11, 2026
…entities (#134)

`crates/entity-derive-impl/src/entity/sql/postgres/save.rs::save_method`
already wrapped the INSERT in a transaction (#118 era), but for entities
with both `aggregate_root` and `streams` enabled it never spliced
`pg_notify`. Subscribers missed every new aggregate-root insert silently,
even though the row write itself was atomic.

Splice `self.notify_created()` into the existing transaction body, after
the INSERT but before the commit. The notify executes against
`&mut *tx`, the same handle the INSERT uses, so Postgres only broadcasts
`Created` on commit and discards it on rollback — the same guarantee
already in place for `create_method` (#125).

When `streams` is off, `notify_created()` returns an empty TokenStream,
so non-streams aggregate roots stay one round-trip with no regression.

Tests (3 new lib tests in `save::tests`):

- `save_emits_pg_notify_when_streams_enabled`: combined attribute set
  produces a save body that contains `pg_notify` and executes it on
  `&mut *tx`.
- `save_omits_pg_notify_when_streams_disabled`: aggregate_root without
  streams does NOT emit `pg_notify` (perf regression guard).
- `save_is_empty_for_non_aggregate_root`: untouched control case —
  `save_method` returns empty when aggregate_root is off.

Bump:

- entity-derive-impl: 0.6.3 -> 0.6.4
- entity-derive: 0.8.4 -> 0.8.5

`entity-core` is unchanged.

Closes #133
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix(atomicity): wrap create/update/delete + pg_notify in one transaction when streams enabled

1 participant