feat!: durable SQLx lease lock for QueuedRepository (postgres + sqlite)#50
Conversation
Add PostgresLockManager / SqliteLockManager — durable, cross-process
AsyncLockManager implementations backed by an `aggregate_locks` lease
table — so a QueuedRepository can serialize per-aggregate access across
processes, not just within one. Drop-in via `.queued_async_with(...)`.
Each per-key lock layers an in-process gate (InMemoryAsyncLock) over the
DB lease: same-process tasks serialize with true wakeups (no DB polling),
and only the local winner contends cross-process. Acquire is a single
atomic conditional upsert using the database clock (no cross-process
skew); release is owner-token scoped so it never frees a holder that
reclaimed an expired lease. It is a mutual-exclusion optimization, not a
fence — the event-store sequence PK remains the authoritative boundary.
v1 has no lease renewal; `sweep_expired` reclaims cold rows.
BREAKING CHANGE: `AsyncLock::{try_lock,unlock}` are now async (a durable
lock releases/acquires via I/O), which makes `AsyncUnlockableRepository`
and `AsyncAggregateRepository::{abort,unlock}` async as well. Callers must
`.await` these. The lock surface is now async-only.
Implements [[tasks/persistent-lock-sqlx]]
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 WalkthroughWalkthroughThis PR introduces durable SQL-backed lease-table locking for cross-process event sourcing, converts AsyncLock operations to async futures, implements Postgres/SQLite lock managers with shared lease helpers, adds migrations, updates repository APIs/tests, and expands documentation. ChangesDurable lease-table locking integration
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly Related PRs
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@migrations/postgres/0001_initial.sql`:
- Around line 131-149: The new aggregate_locks table and its index were added
into the existing 0001_initial migration, which must not be edited for
already-applied databases; instead create a new forward-only migration that
contains the DDL for the aggregate_locks table (CREATE TABLE IF NOT EXISTS
aggregate_locks ... with the CHECKs and the index
aggregate_locks_expires_at_idx) and remove those lines from 0001_initial.sql so
the original migration remains unchanged.
In `@src/aggregate/async_aggregate.rs`:
- Around line 180-188: The abort() implementation in AsyncAggregateRepository
currently calls self.repo.unlock(&identity) so repository implementations never
see their abort() hook; change abort(&self, aggregate: &A) to forward to the
repository abort method instead—compute identity with
stream_identity_for::<A>(aggregate.entity().id())? as before and call
self.repo.abort(&identity). Leave unlock(&self, id: &str) unchanged so it still
calls repo.unlock(&identity). Ensure you reference the abort and unlock methods
on the wrapped repository (repo.abort and repo.unlock) when making the change.
In `@src/lock/async_in_memory.rs`:
- Around line 128-135: The in-memory lock futures run their side effects
immediately because they return std::future::ready(self.try_lock_core()) /
ready(self.unlock_core()); change try_lock and unlock on InMemoryAsyncLock to
return lazy futures that only execute on poll by replacing the ready(...)
wrapper with an async block that calls the core methods (e.g. use an async move
{ self.try_lock_core() } and async move { self.unlock_core() } so the core
methods run when the future is awaited/polled), keeping the existing signatures
of try_lock and unlock.
In `@src/lock/sqlx_common.rs`:
- Around line 99-125: The timer in lease_lock is started after awaiting
backend.shared().gate.lock().await which means backend.config().max_wait doesn't
limit time spent waiting for the same-process gate; move started =
Instant::now() to the start of lease_lock (before gate.lock().await), then in
the retry loop compute remaining time = max - started.elapsed() (using
checked_sub) and use that remaining budget to decide timeout and to bound
tokio::time::sleep/jittered calls; keep the existing unlock and error paths
(unlock before returning Err) and continue using backend.db_acquire,
backend.mint_token, store_token, and jittered as before.
- Around line 99-160: The gate is not cancellation-safe: lease_lock and
lease_try_lock hold backend.shared().gate across await points (db_acquire,
sleep) so a dropped future can never release it; also max_wait timer starts
after acquiring the gate so it doesn't bound local contention. Add a
synchronous-drop guard (e.g., a struct Guard that calls the gate core unlock
synchronously or use a new unlock_now API on InMemoryAsyncLock) that is created
immediately after acquiring the gate and releases the gate in its Drop impl to
guarantee unlock on cancellation; use that guard in lease_lock and
lease_try_lock around the db_acquire/sleep paths and remove manual unlocks where
the guard covers them. Move starting of the max_wait timer (started =
Instant::now()) to before attempting gate.lock().await (or document/align
behavior) so the timeout includes time waiting for the in-process gate. In
lease_unlock, call backend.shared().gate.unlock (synchronous/unlock_now) before
awaiting backend.db_release(&token).await to avoid holding the gate across the
remote call.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 2b8a036d-7321-43e4-9f14-6b7674a834c7
📒 Files selected for processing (18)
Cargo.tomlREADME.mddocs/postgres-event-store.mdmigrations/postgres/0001_initial.sqlmigrations/sqlite/0001_initial.sqlsrc/aggregate/async_aggregate.rssrc/lib.rssrc/lock/async_in_memory.rssrc/lock/async_lock.rssrc/lock/mod.rssrc/lock/postgres_lock.rssrc/lock/sqlite_lock.rssrc/lock/sqlx_common.rssrc/queued_repo/repository.rssrc/sqlx_repo/mod.rstests/queued_repo_async/main.rstests/sql_lock_manager/main.rstests/todos/main.rs
…_wait - Cancellation-safe in-process gate: a `GateGuard` releases the gate from `Drop`, so a `lease_lock`/`lease_try_lock`/`lease_unlock` future dropped mid-`await` (cancellation/timeout) no longer wedges the key. Replaces the explicit-error-path-only gate release. - `max_wait` now measured from entry and bounds the in-process gate wait too (was only applied to DB polling, after the gate was acquired). - In-memory `try_lock`/`unlock` are now lazy `async fn` (side effect runs on poll, not at call time) — consistent with the I/O-backed locks; a dropped, never-awaited future is a no-op. `unlock_core` is `pub(crate)` for the guard. - `AsyncAggregateRepository::abort` forwards to the repo's `abort` hook instead of `unlock`, so an `AsyncUnlockableRepository` overriding `abort` is honored. - Tests: add cancellation-safety regression (cancelled acquire releases the gate) on both backends. Migration-split suggestion intentionally not taken (owner decision): this crate re-runs the full idempotent `0001_initial.sql` on every `migrate()` — there is no applied-migration tracking — so adding `CREATE TABLE IF NOT EXISTS` to the baseline is the correct pattern here. Refs [[tasks/persistent-lock-sqlx]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What & why
QueuedRepositoryserializes per-aggregate read/modify/write through anAsyncLockManager, but the only implementation wasInMemoryAsyncLockManager— process-local and lost on restart. This adds the first durable, cross-process lock manager, the direction already sanctioned bydocs/postgres-event-store.md§Locking Model andspecs/persistent-repository-plan.Implements
[[tasks/persistent-lock-sqlx]].What shipped
PostgresLockManager/SqliteLockManager— durable per-stream leases in a newaggregate_lockstable, feature-gated like the existing SQLx repos. Drop-in:get→commithold (which can't live inside one DB transaction).InMemoryAsyncLock) over the DB lease: same-process tasks serialize with true wakeups (no DB polling); only the local winner contends cross-process.INSERT … ON CONFLICT … WHERE expired OR own-token … RETURNING) using the database clock (extract(epoch from now())/unixepoch('now','subsec')) — one authoritative clock, no cross-process skew. Release is owner-token scoped, so it never frees a holder that reclaimed an expired lease. SQLite treatsSQLITE_BUSYas contention.with_lease_ttl/with_retry_interval/with_max_wait/with_owner_id;migrate()for standalone use;sweep_expired()for cold-row GC.Breaking change
AsyncLock::{try_lock,unlock}are now async (a durable lock releases/acquires via DB I/O), which makesAsyncUnlockableRepositoryandAsyncAggregateRepository::{abort,unlock}async too. Callers must.awaitthem. The lock surface is now async-only, consistent with the framework-wide async-only migration.InMemoryAsyncLockkeeps its sync mechanics as private cores wrapped in ready futures (theWakerreentrancy/poison regression tests are preserved).Review
Ran a multi-agent adversarial review of the implementation (4 lenses + verification); 13 findings confirmed, 12 fixed + 1 nit deferred. Notable fixes:
commit_batch_asyncnow best-effort on release (a committed write must not fail on lock cleanup);store_tokenmade infallible to remove a gate-leak edge; added free-key DB-race,max_waittimeout, andsweep_expiredtests on both backends.Verification
lock::unit 7/7 ·queued_repo_async4/4 ·todos13/13sql_lock_manager10/10 (unconditional) · Postgressql_lock_manager10/10 (against a live PG) · skips cleanly withoutDATABASE_URL--all-featuressuite 584/0cargo clippy --all-features --all-targets -- -D warningsclean ·cargo fmt --checkclean🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Breaking Changes
Documentation
Tests