
feat(lease-read): leader-local lease read for coordinator and engine#549

Merged
bootjp merged 43 commits into main from feat/lease-read on Apr 20, 2026
Conversation


@bootjp bootjp commented Apr 19, 2026

Summary

Introduce a leader-local lease read path so leadership confirmation is amortised across reads. Implements the design in docs/lease_read_design.md.

  • raftengine.LeaseProvider (optional interface): LeaseDuration(), AppliedIndex(), RegisterLeaderLossCallback(). Implemented by the etcd engine; the hashicorp engine and test stubs leave it unimplemented and callers fall back to LinearizableRead.
  • etcd engine: LeaseDuration = electionTimeout - 300ms (700 ms with current tick config), AppliedIndex from the published Status. refreshStatus fires registered callbacks on leader -> non-leader transitions, and shutdown() fires them on close while still leader.
  • kv.Coordinate / kv.ShardedCoordinator: LeaseRead and LeaseReadForKey. Per-coordinator lease for Coordinate, per-shard lease for ShardedCoordinator (via leaseRefreshingTxn wrapper around g.Txn). Fast path returns the engine applied index when the lease is unexpired; slow path runs LinearizableRead and refreshes the lease on success. Coordinate.Dispatch and every ShardedCoordinator dispatch path refresh the lease on a successful commit using the pre-dispatch timestamp.
  • leaseState: lock-free atomic.Pointer[time.Time] plus a generation counter. invalidate() bumps gen before clearing expiry; extend() captures gen at entry and undoes its own CAS if gen moved, so a Dispatch that succeeded just before a leader-loss invalidate cannot resurrect the lease.
  • Callers switched:
    • adapter/redis_lua_context.go newLuaScriptContext — was full per-script ReadIndex, now lease-aware.
    • adapter/redis.go get — was no quorum check, now bounded by lease with redisDispatchTimeout context.
    • adapter/dynamodb.go getItem — was no quorum check, now bounded by lease via LeaseReadForKey(dynamoTableMetaKey(tableName)) so sharded deployments consult the owning shard. Input parsing extracted into parseGetItemInput to stay under the cyclop limit.

Remaining read paths (KEYS, EXISTS, ZSet/Hash/List/Set/Stream readers, DynamoDB query/scan/transactGet/batchGet) still rely on the lease being kept warm by Lua scripts and successful Dispatch calls; tracked as #557 follow-up.

Motivation

Profiling after PR #547/#548 showed average redis.call() time of 800 ms - 2.2 s, with redis.call() accounting for ~100% of Lua VM time. Investigation traced this to newLuaScriptContext calling coordinator.LinearizableRead(ctx) per script — a full etcd/raft ReadOnlySafe ReadIndex (heartbeat broadcast + quorum MsgHeartbeatResp wait) on every Lua script invocation.

A lease-based check skips the broadcast under steady load. The stale-read window is bounded by LeaseDuration < electionTimeout, the same trade-off the DynamoDB and non-Lua Redis paths already accept (a window this change actually tightens for those two paths as a side benefit).

Test plan

  • go build ./... passes
  • go test ./adapter/... ./kv/... ./internal/... passes
  • go test -race on all lease tests passes
  • TestCoordinate_LeaseRead_AmortizesLinearizableRead proves 100 LeaseRead calls within one lease window trigger exactly 1 underlying LinearizableRead
  • Sustained-load test: confirm Lua script throughput improves and LinearizableRead call rate drops below the script invocation rate
  • Partition-style test: confirm a stale leader stops serving reads at most LeaseDuration after losing quorum
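The amortization the first test proves (many lease reads, one quorum round-trip) can be illustrated with a minimal stand-in; fakeEngine and the inlined lease logic below are hypothetical simplifications, not the actual test scaffolding:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fakeEngine counts LinearizableRead calls; an illustrative stand-in for
// the test double behind TestCoordinate_LeaseRead_AmortizesLinearizableRead.
type fakeEngine struct {
	reads atomic.Int64
}

func (e *fakeEngine) LinearizableRead() (uint64, error) {
	e.reads.Add(1)
	return 42, nil
}

func (e *fakeEngine) AppliedIndex() uint64         { return 42 }
func (e *fakeEngine) LeaseDuration() time.Duration { return 700 * time.Millisecond }

func main() {
	eng := &fakeEngine{}
	var expiry time.Time // simplified single-goroutine lease, no atomics

	leaseRead := func() uint64 {
		if time.Now().Before(expiry) {
			return eng.AppliedIndex() // fast path: no quorum round-trip
		}
		t0 := time.Now() // sample before the quorum op, per the review fix
		idx, _ := eng.LinearizableRead()
		expiry = t0.Add(eng.LeaseDuration())
		return idx
	}

	for i := 0; i < 100; i++ {
		leaseRead()
	}
	// All 100 reads fall inside one lease window, so only the first call
	// should have reached the engine.
	fmt.Println("LinearizableRead calls:", eng.reads.Load())
}
```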

Follow-ups (tracked)

Notes

  • No metric for lease hit/miss yet (one of the open questions in the design doc).

Summary by CodeRabbit

  • New Features

    • Leader-local lease-based read APIs added (including per-shard lease reads) and optional engine lease capability to enable fast-path reads.
  • Refactor

    • DynamoDB/Redis read flows now consult lease checks with bounded timeouts and re-verify routing/ts to avoid stale reads; Redis request contexts and cancellation tightened for safer timeouts and shutdown.
    • Coordinators refresh/invalidate leases around commits and leader-loss events.
  • Documentation

    • Added lease-read design document.
  • Tests

    • Extensive unit tests for lease state, coordinator/sharded behavior, and leader-loss callbacks.
  • Chores

    • Coordinator cleanup ensured on server shutdown.

bootjp added 6 commits April 20, 2026 05:54
- raftengine.LeaseProvider (optional interface): LeaseDuration() and
  AppliedIndex(). Implemented by the etcd engine; hashicorp engine and
  test stubs leave it unimplemented and fall back to LinearizableRead.
- etcd Engine: LeaseDuration = electionTimeout - leaseSafetyMargin
  (300 ms), AppliedIndex from published Status.
- kv.Coordinate / kv.ShardedCoordinator: LeaseRead and LeaseReadForKey
  using a per-coordinator (or per-shard for sharded) lease state. Fast
  path returns the engine applied index when the lease is unexpired;
  slow path runs LinearizableRead and refreshes the lease on success.
- Coordinator interface gains LeaseRead/LeaseReadForKey. Test stubs
  delegate to LinearizableRead so existing semantics are preserved.

No callers switched yet -- that lands in a follow-up commit. See
docs/lease_read_design.md.
- adapter/redis_lua_context.go: newLuaScriptContext now calls LeaseRead
  instead of LinearizableRead. Within the lease window, script start no
  longer pays a Raft round-trip; outside the window, the slow path runs
  LinearizableRead and refreshes the lease.
- kv/coordinator.go: Coordinate.Dispatch refreshes the lease on a
  successful commit, since a committed entry implies majority append +
  ack. ShardedCoordinator does not yet refresh per-shard leases on
  Dispatch (leases there are still refreshed only on the LeaseRead slow
  path).
Bounds the stale-read window for two paths that previously read local
store state with no quorum check at all. Within the lease window the
cost is one atomic load + one wall-clock compare; outside the window
callers pay one LinearizableRead, after which the lease is refreshed
for ~LeaseDuration.

DynamoDB getItem grew over the cyclop limit, so the input parsing /
validation portion is split into parseGetItemInput.

Other read paths (KEYS, EXISTS, ZSet/Hash/List/Set/Stream readers,
DynamoDB query/scan/transactGet) still rely on the lease being kept
warm by Lua scripts and successful Dispatch calls. They can be wrapped
explicitly in a follow-up.
@gemini-code-assist
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!


coderabbitai Bot commented Apr 19, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds leader-local lease-read support across the stack: new raftengine.LeaseProvider API, lock‑free leaseState, Coordinator and ShardedCoordinator LeaseRead APIs, dispatch-time lease refresh/invalidation, adapter changes to call LeaseRead/LeaseReadForKey, many tests, per-shard wiring, and a design doc.

Changes

Cohort / File(s) Summary
Core lease state & tests
kv/lease_state.go, kv/lease_state_test.go
New lock‑free leaseState type (generation + atomic expiry) with extend/valid/invalidate semantics and comprehensive unit/concurrency tests.
Coordinator & lease integration
kv/coordinator.go, kv/lease_read_test.go
Coordinator gains lease tracking, leader-loss callback registration/deregistration, Close(), LeaseRead/LeaseReadForKey helpers and LeaseReadThrough fallbacks; dispatch path refreshed to extend/invalidate leases; tests cover fast/slow paths and corner cases.
Sharded coordinator & per‑shard lease
kv/sharded_coordinator.go, kv/sharded_lease_test.go
Per-shard lease state, leaseRefreshingTxn wrapper that refreshes/invalidates shard lease on commit/abort results, per-shard leader-loss callbacks, Close() to deregister, and shard-aware LeaseReadForKey with tests.
Raft engine interface & etcd impl
internal/raftengine/engine.go, internal/raftengine/etcd/engine.go, internal/raftengine/etcd/leader_loss_callback_test.go
Introduces exported LeaseProvider interface; etcd engine adds LeaseDuration, AppliedIndex, applied-index tracking, leader-loss callback registration/firing (with panic recovery) and tests for callbacks/AppliedIndex.
Adapters (Redis, DynamoDB)
adapter/redis.go, adapter/redis_lua_context.go, adapter/dynamodb.go, adapter/dynamodb_test.go
Adapters switched to use lease-based reads (LeaseRead/LeaseReadForKey), Dynamo parsing refactor, bounded lease-read timeout for shard-key lease checks and readTS resampling after lease confirmation; Lua script contexts use lease reads.
Adapter test stubs
adapter/..._test.go
adapter/distribution_server_test.go, adapter/redis_info_test.go, adapter/redis_keys_pattern_test.go, adapter/redis_retry_test.go, adapter/s3_test.go, adapter/dynamodb_test.go
Multiple test doubles extended with LeaseRead/LeaseReadForKey methods that delegate to LinearizableRead to satisfy new interfaces.
KV tests & wiring
kv/leader_routed_store_test.go
Test stubs updated to implement lease read methods delegating to LinearizableRead.
Engine shutdown/cleanup & demo
internal/raftengine/etcd/engine.go, cmd/server/demo.go
Engine now fires leader-loss callbacks on leader transitions/shutdown; transport close warnings added; demo run defers coordinator.Close().
Design doc
docs/lease_read_design.md
New design document describing lease-read model, invariants, API, refresh/invalidation triggers, rollout plan, and open questions.
Misc tests
internal/raftengine/etcd/leader_loss_callback_test.go, kv/lease_read_test.go, kv/sharded_lease_test.go
New and extended tests exercising leader-loss callbacks, lease read semantics, shard isolation, and transactional lease refresh behavior.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Adapter as "Adapter (Redis/DynamoDB)"
    participant Coord as "Coordinator / ShardedCoordinator"
    participant Engine as "RaftEngine / LeaseProvider"
    participant Raft as "Raft"

    Client->>Adapter: GET(key)
    activate Adapter

    Adapter->>Coord: LeaseReadForKey(ctx, key)
    activate Coord

    alt Engine implements LeaseProvider
        Coord->>Engine: check lease.valid(now)
        alt lease valid
            Engine-->>Coord: AppliedIndex()
        else lease expired
            Coord->>Engine: LinearizableRead (read-index)
            Engine->>Raft: consensus/read-index
            Raft-->>Engine: read index
            Engine-->>Coord: read index
            Coord->>Coord: extend lease (now + LeaseDuration)
        end
    else Engine lacks LeaseProvider
        Coord->>Engine: LinearizableRead (read-index)
    end

    Coord-->>Adapter: read index
    deactivate Coord

    Adapter->>Adapter: compute snapshot & serve item
    Adapter-->>Client: response
    deactivate Adapter

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Possibly related PRs

Poem

🐇 I nibble on leases, quick and bright,
I guard an expiry through day and night.
When leaders wobble I hop and alert,
Extend on commit, avoid stale dirt.
A rabbit cheers the cluster's new light!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(lease-read): leader-local lease read for coordinator and engine' accurately summarizes the main change—implementing a leader-local lease-read mechanism across coordinators and the raft engine.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@bootjp bootjp requested a review from Copilot April 19, 2026 21:13

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
adapter/dynamodb.go (1)

1370-1385: ⚠️ Potential issue | 🟠 Major

Use shard-keyed lease reads for DynamoDB GetItem.

LeaseRead only checks the default shard on ShardedCoordinator; GetItem then reads table metadata and item keys that may belong to other shards. Use LeaseReadForKey for the table metadata key and the resolved item key before reading local state.

🐛 Proposed fix
-	if _, err := d.coordinator.LeaseRead(r.Context()); err != nil {
+	if _, err := d.coordinator.LeaseReadForKey(r.Context(), dynamoTableMetaKey(in.TableName)); err != nil {
 		writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
 		return
 	}
 	readTS := d.resolveDynamoReadTS(in.ConsistentRead)
 	schema, exists, err := d.loadTableSchemaAt(r.Context(), in.TableName, readTS)
@@
 	if !exists {
 		writeDynamoError(w, http.StatusBadRequest, dynamoErrResourceNotFound, "table not found")
 		return
 	}
+	itemKey, err := schema.itemKeyFromAttributes(in.Key)
+	if err != nil {
+		writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error())
+		return
+	}
+	if _, err := d.coordinator.LeaseReadForKey(r.Context(), itemKey); err != nil {
+		writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
+		return
+	}
 
 	current, found, err := d.readLogicalItemAt(r.Context(), schema, in.Key, readTS)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/dynamodb.go` around lines 1370 - 1385, The current GetItem flow uses
coordinator.LeaseRead which only validates the default shard; change it to call
coordinator.LeaseReadForKey for the table-metadata key before calling
loadTableSchemaAt and again for the resolved item key (after computing readTS
via resolveDynamoReadTS and resolving the item key) before calling
readLogicalItemAt so both metadata and item reads hold leases on the correct
shards; ensure errors from LeaseReadForKey are handled the same way as the
existing LeaseRead error path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapter/redis.go`:
- Around line 984-987: The LeaseReadForKey call is using context.Background(),
which can block a Redis connection during Raft linearizable reads; replace that
with a context bounded by the existing Redis dispatch timeout (create ctx,
cancel := context.WithTimeout(context.Background(), r.dispatchTimeout) and defer
cancel()) and pass that ctx into r.coordinator.LeaseReadForKey(key) so the slow
path is bounded; keep the existing error handling (conn.WriteError and return).

In `@docs/lease_read_design.md`:
- Around line 195-203: Update the rollout/design doc to match the implemented
API and actual call-site coverage: replace any statements claiming Phase 1 adds
LeaderView.LeaseDuration() and that full read paths (query, scan, keys, exists,
transactGetItems, batchGetItem) are already wrapped with language that the PR
implements an optional raftengine.LeaseProvider and that additional read paths
are follow-ups; specifically adjust the table rows and the paragraphs
referencing adapter/redis_lua_context.go, adapter/redis.go, adapter/dynamodb.go
and the sections around lines noted (302-305, 314-321, 333-335) to reflect
partial coverage and the phased rollout rather than full wrapping of those APIs.
- Around line 150-159: The doc claims three lease invalidation triggers but the
code only invalidates on engine.LinearizableRead errors in kv/coordinator.go
(LeaseRead path); refreshStatus (internal/raftengine/etcd/engine.go:
refreshStatus) is not wired to clear leases and engine.Propose errors are only
logged, not invalidating—so update the design doc to reflect reality: remove or
demote the unimplemented triggers and rollout items to “Phase 2/Follow-up” and
explicitly list which hooks/paths are implemented (coordinator.LeaseRead) and
which read paths still need wrapping (get, keyTypeAt, keys, exists-family,
getItem, query, scan, transactGetItems, batchGetItem); alternatively, if you
prefer to implement the behavior now, wire refreshStatus to clear the
coordinator lease and modify the engine.Propose error handling in
coordinator.Propose (or the caller) to invalidate the lease on errors and add
unit tests for these behaviors—reference refreshStatus, engine.Propose,
engine.LinearizableRead, and coordinator.LeaseRead when making changes.

In `@internal/raftengine/etcd/engine.go`:
- Around line 27-32: The current LeaseDuration() and leaseSafetyMargin (300ms)
are unsafe because callers refresh using time.Now().Add(lp.LeaseDuration())
after the quorum-confirming operation returns, which can extend the lease based
on post-operation wall time; change the API so the raft engine returns a
quorum-confirmation timestamp/deadline (e.g., ReturnQuorumDeadline or
LeaseConfirmTime) or accept a supplied start time instead of only a duration,
then have callers capture a timestamp immediately before the quorum operation
and pass that to the raft engine to compute the lease (avoid time.Now() after
the operation); update LeaseDuration(), any uses of leaseSafetyMargin, and all
callers that currently call time.Now().Add(lp.LeaseDuration()) to use the new
timestamp-based API to ensure the lease is anchored to pre-quorum time.

In `@kv/coordinator.go`:
- Around line 127-141: The lease renewal is happening after the
quorum-confirming dispatch and uses time.Now(), which can extend the lease
incorrectly if leadership changed; before calling c.dispatchTxn or c.dispatchRaw
capture a conservative base timestamp (e.g., now := time.Now()), perform the
dispatch, then only if err==nil confirm this node is still leader (use the
engine's leadership check or the raftengine.LeaseProvider interface) before
calling c.lease.extend(now.Add(lp.LeaseDuration())); apply the same change for
the other renewal site referenced (the block around lines 238-252) and ensure
cached lease usage (e.g., LeaseRead) only happens when the node verifies it is
still leader.

In `@kv/sharded_coordinator.go`:
- Around line 633-638: Capture a conservative start time before invoking
linearizableReadEngineCtx and use that timestamp when extending the lease so the
verified window isn't overextended; specifically, record start := time.Now()
before calling linearizableReadEngineCtx(ctx, engine), call
linearizableReadEngineCtx and handle errors (keeping g.lease.invalidate on
error), and then call g.lease.extend(start.Add(lp.LeaseDuration())) instead of
using time.Now() after the call.

---

Outside diff comments:
In `@adapter/dynamodb.go`:
- Around line 1370-1385: The current GetItem flow uses coordinator.LeaseRead
which only validates the default shard; change it to call
coordinator.LeaseReadForKey for the table-metadata key before calling
loadTableSchemaAt and again for the resolved item key (after computing readTS
via resolveDynamoReadTS and resolving the item key) before calling
readLogicalItemAt so both metadata and item reads hold leases on the correct
shards; ensure errors from LeaseReadForKey are handled the same way as the
existing LeaseRead error path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5dff0c18-cd67-439d-93ba-9cae9e800164

📥 Commits

Reviewing files that changed from the base of the PR and between 173fbbc and 2a70b72.

📒 Files selected for processing (17)
  • adapter/distribution_server_test.go
  • adapter/dynamodb.go
  • adapter/dynamodb_test.go
  • adapter/redis.go
  • adapter/redis_info_test.go
  • adapter/redis_keys_pattern_test.go
  • adapter/redis_lua_context.go
  • adapter/redis_retry_test.go
  • adapter/s3_test.go
  • docs/lease_read_design.md
  • internal/raftengine/engine.go
  • internal/raftengine/etcd/engine.go
  • kv/coordinator.go
  • kv/leader_routed_store_test.go
  • kv/lease_state.go
  • kv/lease_state_test.go
  • kv/sharded_coordinator.go

Comment thread adapter/redis.go Outdated
Comment thread docs/lease_read_design.md
Comment thread internal/raftengine/etcd/engine.go
Comment thread kv/coordinator.go
Comment thread kv/sharded_coordinator.go Outdated
Contributor

Copilot AI left a comment


Pull request overview

Adds a leader-local lease read path to reduce per-read quorum confirmation cost while keeping stale-read risk bounded by a lease duration derived from Raft election timeout. This fits into the KV/raftengine layering by exposing an optional engine capability (LeaseProvider) and using it in coordinators and adapter read handlers.

Changes:

  • Introduces raftengine.LeaseProvider and implements it in the etcd engine (LeaseDuration, AppliedIndex).
  • Adds leaseState and LeaseRead / LeaseReadForKey to Coordinate and ShardedCoordinator, with fallback to LinearizableRead.
  • Switches key adapter read entrypoints (Redis Lua script context, Redis GET, DynamoDB GetItem) to use lease-based reads; adds design doc and supporting tests/stub updates.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
kv/sharded_coordinator.go Adds per-shard lease tracking and lease-based read methods.
kv/lease_state.go Introduces lock-free lease expiry state via atomics.
kv/lease_state_test.go Unit tests for leaseState semantics and concurrency safety.
kv/coordinator.go Adds coordinator lease state, new Coordinator interface methods, and lease refresh on successful Dispatch.
kv/leader_routed_store_test.go Updates coordinator test stub to satisfy new interface methods.
internal/raftengine/engine.go Defines optional LeaseProvider engine capability.
internal/raftengine/etcd/engine.go Implements lease duration calculation and applied index reporting for etcd engine.
adapter/redis_lua_context.go Replaces per-script LinearizableRead with lease-aware LeaseRead.
adapter/redis.go Adds lease check to GET path.
adapter/dynamodb.go Adds lease check to GetItem and extracts request parsing helper.
adapter/dynamodb_test.go Updates test wrapper to forward new coordinator methods.
adapter/redis_retry_test.go Updates coordinator stub to satisfy new interface methods.
adapter/redis_keys_pattern_test.go Updates coordinator stub to satisfy new interface methods.
adapter/redis_info_test.go Updates coordinator stub to satisfy new interface methods.
adapter/s3_test.go Updates coordinator stub to satisfy new interface methods.
adapter/distribution_server_test.go Updates coordinator stub to satisfy new interface methods.
docs/lease_read_design.md Adds lease-read design documentation.

Comment thread kv/coordinator.go
Comment on lines +231 to +253
// LeaseRead returns a read fence backed by a leader-local lease when
// available, falling back to a full LinearizableRead when the lease has
// expired or the underlying engine does not implement LeaseProvider.
//
// The returned index is the engine's current applied index (fast path) or
// the index returned by LinearizableRead (slow path). Callers that resolve
// timestamps via store.LastCommitTS may discard the value.
func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) {
	lp, ok := c.engine.(raftengine.LeaseProvider)
	if !ok {
		return c.LinearizableRead(ctx)
	}
	if c.lease.valid(time.Now()) {
		return lp.AppliedIndex(), nil
	}
	idx, err := c.LinearizableRead(ctx)
	if err != nil {
		c.lease.invalidate()
		return 0, err
	}
	c.lease.extend(time.Now().Add(lp.LeaseDuration()))
	return idx, nil
}
Comment thread kv/sharded_coordinator.go
Comment on lines +603 to +639
// LeaseRead routes through the default group's lease. See Coordinate.LeaseRead
// for semantics.
func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error) {
	g, ok := c.groups[c.defaultGroup]
	if !ok {
		return 0, errors.WithStack(ErrLeaderNotFound)
	}
	return groupLeaseRead(ctx, g)
}

// LeaseReadForKey performs the lease check on the shard group that owns key.
// Each group maintains its own lease since each group has independent
// leadership and term.
func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) {
	g, ok := c.groupForKey(key)
	if !ok {
		return 0, errors.WithStack(ErrLeaderNotFound)
	}
	return groupLeaseRead(ctx, g)
}

func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) {
	engine := engineForGroup(g)
	lp, ok := engine.(raftengine.LeaseProvider)
	if !ok {
		return linearizableReadEngineCtx(ctx, engine)
	}
	if g.lease.valid(time.Now()) {
		return lp.AppliedIndex(), nil
	}
	idx, err := linearizableReadEngineCtx(ctx, engine)
	if err != nil {
		g.lease.invalidate()
		return 0, err
	}
	g.lease.extend(time.Now().Add(lp.LeaseDuration()))
	return idx, nil
}
Comment thread kv/coordinator.go Outdated
Comment on lines +243 to +251
	if c.lease.valid(time.Now()) {
		return lp.AppliedIndex(), nil
	}
	idx, err := c.LinearizableRead(ctx)
	if err != nil {
		c.lease.invalidate()
		return 0, err
	}
	c.lease.extend(time.Now().Add(lp.LeaseDuration()))
Comment thread kv/sharded_coordinator.go
Comment on lines +624 to +632
func groupLeaseRead(ctx context.Context, g *ShardGroup) (uint64, error) {
	engine := engineForGroup(g)
	lp, ok := engine.(raftengine.LeaseProvider)
	if !ok {
		return linearizableReadEngineCtx(ctx, engine)
	}
	if g.lease.valid(time.Now()) {
		return lp.AppliedIndex(), nil
	}
Comment thread adapter/redis.go Outdated
Comment on lines 984 to 989
	if _, err := r.coordinator.LeaseReadForKey(context.Background(), key); err != nil {
		conn.WriteError(err.Error())
		return
	}
	readTS := r.readTS()
	typ, err := r.keyTypeAt(context.Background(), key, readTS)
Comment thread kv/lease_state.go Outdated
Comment on lines +31 to +39
// extend sets the lease expiry to until. Concurrent calls race on the
// pointer swap; the most recent writer wins, which matches the desired
// semantics (any successful quorum confirmation refreshes the lease).
func (s *leaseState) extend(until time.Time) {
if s == nil {
return
}
s.expiry.Store(&until)
}
Two correctness fixes from PR #549 reviews:

1. extend was called with time.Now() AFTER Propose / LinearizableRead
   returned. Apply-queue depth, FSM batch latency, and goroutine
   scheduling jitter could push the effective lease window past
   electionTimeout, eating into the safety margin.

   Coordinate.Dispatch and Coordinate.LeaseRead now sample t0 := time.Now()
   BEFORE the underlying Propose/LinearizableRead, and use t0 + LeaseDuration
   as the lease expiry. ShardedCoordinator's groupLeaseRead does the same.

2. leaseState.extend was last-writer-wins. An out-of-order writer that
   sampled time.Now() earlier could overwrite a fresher extension and
   prematurely shorten the lease. extend now uses a CAS loop that only
   replaces the expiry if the new instant is strictly after the current
   one. invalidate() still wins unconditionally so leadership-loss
   callbacks are not blocked.

Tests updated: the "shorter extension overwrites" assertion is replaced
by "extend is monotonic" plus an "invalidate beats extend" test that
codifies the new contract.
Plumbs a leader-loss callback from the etcd engine into Coordinate /
ShardGroup. Before this fix a former leader (graceful transfer,
CheckQuorum step-down, shutdown) kept serving fast-path reads from its
local LastCommitTS for up to LeaseDuration after a successor leader was
already accepting writes - a phantom-read window of up to 700 ms flagged
HIGH by data-loss, concurrency, and consistency reviewers on PR #549.

Changes:

- raftengine.LeaseProvider gains RegisterLeaderLossCallback(fn func()).
- etcd Engine stores callbacks, fires them synchronously from
  refreshStatus when the previous state was Leader and the new state
  isn't (which already covers shutdown via the closed override that
  rewrites status.State to StateShutdown before this check).
- Coordinate registers c.lease.invalidate as the callback in
  NewCoordinatorWithEngine.
- ShardedCoordinator registers per-shard g.lease.invalidate in
  NewShardedCoordinator.

Test stubs do not implement LeaseProvider so they are unaffected. Only
the etcd engine implements LeaseProvider; hashicorp engine and test
stubs continue to fall back to LinearizableRead.
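The leader-to-non-leader transition check described above can be sketched in isolation. Here engine, raftState, and refreshStatus are simplified stand-ins for the etcd engine's status plumbing, assuming only what the commit message states:

```go
package main

import "fmt"

// raftState is a simplified stand-in for the engine's status state.
type raftState int

const (
	stateFollower raftState = iota
	stateLeader
)

type engine struct {
	prev      raftState
	callbacks []func()
}

func (e *engine) RegisterLeaderLossCallback(fn func()) {
	e.callbacks = append(e.callbacks, fn)
}

// refreshStatus fires callbacks synchronously on a leader -> non-leader
// transition, with panic recovery so one misbehaving callback cannot
// take down the status loop.
func (e *engine) refreshStatus(next raftState) {
	if e.prev == stateLeader && next != stateLeader {
		for _, fn := range e.callbacks {
			func() {
				defer func() { _ = recover() }()
				fn()
			}()
		}
	}
	e.prev = next
}

func main() {
	e := &engine{}
	invalidated := false
	e.RegisterLeaderLossCallback(func() { invalidated = true }) // e.g. lease.invalidate

	e.refreshStatus(stateLeader)   // become leader: no callback
	e.refreshStatus(stateFollower) // step down: callback fires
	fmt.Println("lease invalidated:", invalidated)
}
```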

bootjp commented Apr 19, 2026

Addressed the following from the internal review pass (5 perspectives):

Fixed in commits f28bf0d + f86c689

  • HIGH (data-loss / concurrency / consistency): Coordinate.Dispatch and Coordinate.LeaseRead now sample time.Now() before the underlying Propose/LinearizableRead, so the lease window starts at quorum confirmation rather than after the round-trip returns. groupLeaseRead does the same for the sharded path.
  • HIGH (data-loss / concurrency / consistency): LeaseProvider now exposes RegisterLeaderLossCallback. The etcd engine fires registered callbacks synchronously from refreshStatus whenever the local node leaves the leader role (graceful transfer, CheckQuorum step-down, shutdown). Coordinate and per-shard ShardGroup register lease.invalidate so a former leader stops serving fast-path reads the instant the engine notices the transition, instead of waiting for wall-clock expiry.
  • LOW (concurrency): leaseState.extend is now a CAS loop that only replaces the expiry when the new instant is strictly later, preventing an out-of-order writer from regressing the lease. invalidate still wins unconditionally so the leader-loss callback above is not blocked.

Still TODO in this PR

  • ShardedCoordinator.Dispatch per-shard lease refresh on commit (lease still works via slow-path refresh in LeaseRead; warmth optimisation only).
  • New tests: Coordinate.LeaseRead fast/slow/error/fallback paths, Dispatch refresh hook, sharded per-shard isolation, leader-loss invalidation end-to-end.

Deferred to follow-up issues

/gemini review

@gemini-code-assist
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!


bootjp commented Apr 19, 2026

Removed #551 / #552 from the follow-up issue list (both already closed). Remaining: #553-#556 (the four performance-related items).

bootjp added 2 commits April 20, 2026 06:36
…ingTxn

Wraps each ShardGroup.Txn in a leaseRefreshingTxn that extends the
per-shard lease on every successful Commit/Abort. All ShardedCoordinator
dispatch paths flow through g.Txn (raw via router.Commit,
dispatchSingleShardTxn, dispatchTxn 2PC primary/secondary, and
dispatchDelPrefixBroadcast), so this single hook catches them all.

The wrapper samples time.Now() before delegating to inner.Commit so the
lease window starts at quorum confirmation rather than after Commit
returned, mirroring the same fix already applied to Coordinate.Dispatch.

Addresses HIGH/MED finding from PR #549 review (sharded path was
previously refreshed only via the LeaseRead slow path).
Adds five tests covering paths that were untested in PR #549:

- Coordinate.LeaseRead fast path: pre-extended lease returns AppliedIndex
  with zero engine.LinearizableRead calls.
- Coordinate.LeaseRead slow path success: first call invokes
  LinearizableRead and refreshes the lease; second call hits fast path.
- Coordinate.LeaseRead slow path error: LinearizableRead failure
  invalidates the lease so subsequent calls keep taking the slow path.
- LeaseProvider type-assertion fallback: an engine that does not
  implement LeaseProvider always falls back to LinearizableRead.
- Leader-loss callback: NewCoordinatorWithEngine registers exactly one
  callback; firing it invalidates a previously valid lease.

Test stubs (fakeLeaseEngine, nonLeaseEngine) live in this file rather
than pulling in the existing distributed test scaffolding, keeping the
tests hermetic.
@bootjp
Owner Author

bootjp commented Apr 19, 2026

Added two more commits:

  • 9994bbb: per-shard lease refresh in ShardedCoordinator.Dispatch. leaseRefreshingTxn wraps g.Txn, so every dispatch path (raw / single-shard txn / 2PC primary+secondary / DelPrefix broadcast) refreshes the per-shard lease on a successful Commit/Abort. time.Now() is also sampled before Commit.
  • 5b3ab7c: five unit tests covering the fast/slow/error/fallback paths of Coordinate.LeaseRead plus the leader-loss callback.

With these, all HIGH-severity findings from the internal review are addressed:

  • ✅ pre-sampled Now() / monotonic extend
  • ✅ leader-loss invalidation hook (engine + Coordinate + ShardedCoordinator per-shard)
  • ✅ ShardedCoordinator.Dispatch refresh
  • ✅ tests for the main LeaseRead paths

Remaining (follow-up issues): #553-#556 (performance optimisations)

/gemini review

@gemini-code-assist
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

When deregistering, the previous `append(cbs[:i], cbs[i+1:]...)`
pattern leaves the removed slot in the backing array at the old
tail position. The slot holds a closure that typically captures a
*Coordinate; that reference keeps the Coordinate (and its transitive
state) alive until the engine itself is dropped, even though the
caller has already called Close() expecting the lease registration
to be released for GC.

Replace with explicit copy + zero-the-tail + reslice so the
captured reference is actually cleared. Mirror the same pattern in
fakeLeaseEngine's test stub to keep it faithful to the production
engine.
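The copy + zero-the-tail + reslice pattern this commit message describes can be sketched as a standalone helper. This is an illustration of the technique, not the engine's actual deregistration code (which also holds a mutex around the slice).

```go
package main

// deregister removes the callback at index i. The naive
// append(cbs[:i], cbs[i+1:]...) leaves the removed closure alive in
// the backing array's old tail slot; explicitly nilling that slot
// lets the GC reclaim whatever the closure captured (here, a
// *Coordinate in the real engine).
func deregister(cbs []func(), i int) []func() {
	copy(cbs[i:], cbs[i+1:])
	cbs[len(cbs)-1] = nil // clear the stale tail slot
	return cbs[:len(cbs)-1]
}

func main() {
	cbs := []func(){func() {}, func() {}, func() {}}
	cbs = deregister(cbs, 1)
	_ = cbs
}
```

The reslice keeps the same backing array, so without the explicit nil the removed closure would stay reachable until the whole slice is dropped.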
@bootjp bootjp requested a review from Copilot April 20, 2026 11:58
gemini recommended defer over explicit leaseCancel() so the context
is released even if LeaseReadForKey panics or a future refactor
inserts an early return. The extra lifetime cost is negligible --
the only post-lease operations are local store reads -- and the
safety benefit is real.
@bootjp
Owner Author

bootjp commented Apr 20, 2026

Fixed gemini's defer finding in 5c5a820.

Copilot r3110247453 / r3110247413 (checking leader state on the fast paths of groupLeaseRead and Coordinate.LeaseRead) were already addressed in b94ee7e:

// Coordinate.LeaseRead:328
if c.lease.valid(now) && c.engine.State() == raftengine.StateLeader {
    return lp.AppliedIndex(), nil
}

// groupLeaseRead:755
if g.lease.valid(now) && engine.State() == raftengine.StateLeader {
    return lp.AppliedIndex(), nil
}

Copilot's suggestion to also reject when LeadTransferee != 0 is unnecessary: until the transfer completes, the leader still holds quorum and can serve reads. The race with the async callback is covered by the immediate State() check (refreshed roughly every 10 ms).

@bootjp
Owner Author

bootjp commented Apr 20, 2026

gemini's r3110392269 (claiming errors.Is conflicts with pkg/errors) is a false positive:

  • The file imports github.com/cockroachdb/errors (redis.go:25), not pkg/errors.
  • cockroachdb/errors is stdlib-compatible and fully supports Is / As / Unwrap.
  • errors.Is is already used in the same form in 4 existing places in this file (lines 772, 1067, 1642, 1649); this PR adds only one more line. Compile and tests pass.

Closing as a no-op.

Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a leader-local “lease read” path across coordinators and the etcd raft engine so repeated reads can avoid per-read quorum confirmation, while still bounding staleness by a lease duration derived from the election timeout.

Changes:

  • Add optional raftengine.LeaseProvider (lease duration, applied index, leader-loss callbacks) and implement it in the etcd engine.
  • Implement per-coordinator and per-shard lease tracking (leaseState), plus LeaseRead / LeaseReadForKey in Coordinate and ShardedCoordinator, including lease refresh on successful commits and invalidation on errors/leader loss.
  • Switch high-traffic adapter read paths (Redis Lua context + GET, DynamoDB GetItem) to use lease-aware reads; add extensive unit tests and a design document.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.

Summary per file:

  • kv/sharded_lease_test.go: Adds per-shard lease behavior tests (isolation, invalidation, wrapper Close forwarding, callback wiring).
  • kv/sharded_coordinator.go: Adds per-shard leaseState, wraps shard Txn to refresh the lease on real commits, adds lease read APIs and coordinator Close for callback deregistration.
  • kv/lease_state_test.go: Adds concurrency/semantics tests for lock-free lease expiry + generation guard.
  • kv/lease_state.go: Introduces lock-free lease state (atomic expiry pointer + generation counter).
  • kv/lease_read_test.go: Adds tests for Coordinate.LeaseRead fast/slow/error/fallback paths and callback deregistration.
  • kv/leader_routed_store_test.go: Updates coordinator stub to satisfy the expanded coordinator interface.
  • kv/coordinator.go: Adds leader-loss callback registration, Close(), lease refresh on dispatch commit, and LeaseRead APIs.
  • internal/raftengine/etcd/leader_loss_callback_test.go: Tests leader-loss callback panic containment and empty-list safety.
  • internal/raftengine/etcd/engine.go: Implements LeaseProvider methods, callback registration/invocation, leader-loss firing on transitions and shutdown/fail, and warns on disabled lease config.
  • internal/raftengine/engine.go: Defines the new optional LeaseProvider interface.
  • docs/lease_read_design.md: Adds a design document describing lease reads, safety bounds, refresh/invalidate triggers, and the rollout plan.
  • cmd/server/demo.go: Ensures coordinator callback deregistration via deferred coordinator.Close().
  • adapter/s3_test.go: Updates test coordinator stub to satisfy the expanded coordinator interface.
  • adapter/redis_retry_test.go: Updates test coordinator stub to satisfy the expanded coordinator interface.
  • adapter/redis_lua_context.go: Switches the Lua script entry fence from LinearizableRead to LeaseRead.
  • adapter/redis_keys_pattern_test.go: Updates test coordinator stub to satisfy the expanded coordinator interface.
  • adapter/redis_info_test.go: Updates test coordinator stub to satisfy the expanded coordinator interface.
  • adapter/redis.go: Adds a cancellable base context for handlers, improves shutdown, and gates GET with a bounded LeaseReadForKey check.
  • adapter/dynamodb_test.go: Updates the test coordinator wrapper to forward lease read calls.
  • adapter/dynamodb.go: Adds a bounded lease check for GetItem (shard-routed), refactors parsing/target resolution to keep complexity in bounds.
  • adapter/distribution_server_test.go: Updates test coordinator stub to satisfy the expanded coordinator interface.

Comment thread kv/lease_state.go
Comment on lines +65 to +69
if current != nil && !until.After(*current) {
return
}
if !s.expiry.CompareAndSwap(current, &until) {
continue
Comment thread internal/raftengine/etcd/engine.go Outdated
Comment on lines +588 to +590
e.mu.RLock()
defer e.mu.RUnlock()
return e.status.AppliedIndex
Comment thread kv/coordinator.go Outdated
Comment on lines +126 to +130
LeaseRead(ctx context.Context) (uint64, error)
RaftLeader() raft.ServerAddress
IsLeaderForKey(key []byte) bool
VerifyLeaderForKey(key []byte) error
LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
Copilot flagged: adding LeaseRead / LeaseReadForKey to the exported
Coordinator interface was a breaking change for any external
implementation. Reverted the interface change and moved lease reads
to the optional-capability pattern already used by
raftengine.LeaseProvider.

- kv.Coordinator no longer requires LeaseRead / LeaseReadForKey.
  External coordinators that predate the lease-read feature keep
  compiling unchanged.
- kv.LeaseReadableCoordinator is the new optional interface; concrete
  *Coordinate and *ShardedCoordinator implement it.
- kv.LeaseReadThrough / LeaseReadForKeyThrough are helpers that
  type-assert and fall back to LinearizableRead when the assertion
  fails.
- adapter/redis.go GET, adapter/redis_lua_context.go
  newLuaScriptContext, adapter/dynamodb.go getItem now call the
  helpers instead of the interface method directly.
- adapter/dynamodb_test.go testCoordinatorWrapper forwards via the
  helpers so it keeps working without depending on Coordinator
  carrying the methods.

No behavior change: every concrete coordinator we ship still
implements the lease methods, so the fast path is unchanged.
@bootjp
Owner Author

bootjp commented Apr 20, 2026

Fixed the 2 new Copilot findings in f9265bf:

  • Coordinator interface breaking change: removed LeaseRead / LeaseReadForKey from the interface and moved them to the optional LeaseReadableCoordinator interface (same pattern as raftengine.LeaseProvider). Added the helpers kv.LeaseReadThrough / kv.LeaseReadForKeyThrough. The 3 adapter call sites (redis.go GET / redis_lua_context.go / dynamodb.go getItem) and the test wrapper now go through the helpers. External Coordinator implementations are unaffected; behavior is identical (every concrete type already implements the lease methods).
  • leaderLossCbsMu docstring: already updated to match the async implementation (line 178, "Callbacks are started asynchronously...") — no further change in this round.

The remaining 2 findings (heap alloc / AppliedIndex RLock) are already tracked in #553 / #554 and out of scope for this PR.

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a lease-based read mechanism to optimize performance by reducing the frequency of Raft-based linearizable reads. It adds a LeaseProvider interface to the raft engine, implements a lock-free leaseState for tracking expiry, and integrates LeaseRead functionality into the Redis and DynamoDB adapters. Feedback highlights an opportunity to reduce GC pressure by using atomic.Int64 for timestamps instead of atomic.Pointer and suggests refining the execution of leader-loss callbacks to avoid potential goroutine leaks.

Comment thread kv/lease_state.go
Comment on lines +68 to +70
if !s.expiry.CompareAndSwap(current, &until) {
continue
}
Contributor

medium

Using atomic.Pointer[time.Time] and storing the address of the local variable until causes a heap allocation on every successful lease extension. In high-throughput scenarios (every successful commit or slow-path read), this pressure on the GC can be significant. Consider using atomic.Int64 to store the UnixNano timestamp instead, which would avoid the heap allocation while maintaining lock-free semantics.

References
  1. For frequently accessed fields that require monotonic updates, prefer atomic operations over mutexes to improve performance on hot paths.

Comment on lines +671 to +681
func (e *Engine) fireLeaderLossCallbacks() {
	e.leaderLossCbsMu.Lock()
	cbs := make([]func(), len(e.leaderLossCbs))
	for i, c := range e.leaderLossCbs {
		cbs[i] = c.fn
	}
	e.leaderLossCbsMu.Unlock()
	for _, fn := range cbs {
		go e.invokeLeaderLossCallback(fn)
	}
}
Contributor

medium

Spawning a new goroutine for every registered leader-loss callback can lead to an unbounded number of goroutines if many coordinators are registered. While the number of shards is typically small, it's safer to use a bounded worker pool or execute them sequentially if they are guaranteed to be fast (as the documentation requires). Sequential execution is acceptable here because the callbacks are explicitly required to be non-blocking.

References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections or processes that can grow from external requests.

bootjp added 2 commits April 20, 2026 21:23
Copilot / gemini repeatedly flagged that AppliedIndex() on the lease-
read fast path took e.mu.RLock(), contending with refreshStatus's
writer lock ~100 Hz at the current tick interval. Tracked as #553 as
a deferred item; now addressed in-PR since it affects the motivating
hot path (Lua script entry + Redis/DynamoDB GET) directly.

- Added e.appliedIndex atomic.Uint64 mirroring e.applied.
- setApplied(index) helper advances both fields from the single-
  writer Raft run loop; callers (applyCommitted entries, snapshot
  restore, Open init) go through it.
- AppliedIndex() now returns e.appliedIndex.Load() with no lock.
- Nil-receiver guard preserved.

Tests:
- TestAppliedIndex_LockFreeLoad holds e.mu exclusively and calls
  AppliedIndex(), asserting the caller does not block (would
  deadlock if RLock were still used).
- TestAppliedIndex_NilReceiver covers the nil guard.

All existing tests pass. Closes #553.
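The atomic mirror described in this commit message can be sketched as follows. Field and method names follow the commit message but are assumptions; the real engine updates `e.applied` from inside its run loop rather than via a separate lock acquisition.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// Engine sketch: e.appliedIndex mirrors e.applied so the lease-read
// fast path can load it without contending on e.mu.
type Engine struct {
	mu           sync.RWMutex
	applied      uint64        // guarded by mu, advanced by the Raft run loop
	appliedIndex atomic.Uint64 // lock-free mirror for hot-path readers
}

// setApplied advances both fields; only the single-writer run loop
// calls it, so the two stay in step.
func (e *Engine) setApplied(index uint64) {
	e.mu.Lock()
	e.applied = index
	e.mu.Unlock()
	e.appliedIndex.Store(index)
}

// AppliedIndex takes no lock, so it never blocks even while e.mu is
// held exclusively elsewhere (the property the test below asserts).
func (e *Engine) AppliedIndex() uint64 {
	if e == nil { // nil-receiver guard preserved
		return 0
	}
	return e.appliedIndex.Load()
}

func main() {
	e := &Engine{}
	e.setApplied(42)
	_ = e.AppliedIndex()
}
```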
gemini flagged the unbounded-goroutine concern: spawning a new
goroutine per registered callback on every leader-loss event scales
linearly with the number of shards/coordinators and has no backpressure.

Since the callback contract already requires non-blocking work (a
lock-free lease-invalidate flag flip), and LeaseRead's fast path
already guards on engine.State() == StateLeader to close the narrow
window between a transition and the callback completing, sync
execution is both safe and simpler:

- fireLeaderLossCallbacks iterates inline.
- invokeLeaderLossCallback still defer-recovers panics and logs, so
  one buggy holder cannot break siblings.
- Docstrings (both the interface in raftengine/engine.go and the
  etcd-impl method) now describe the synchronous contract and the
  State() guard that makes it safe despite no async backstop.

If a blocking callback ever causes a real problem, we'll revisit
with a bounded worker pool; for now the simpler path stays.

Test TestFireLeaderLossCallbacks_ContainsPanic reverted to direct
counter assertions (no Eventually wait needed in sync mode).
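The synchronous firing with per-callback panic containment reads roughly as below. This is a sketch under assumed names; the real engine stores callbacks alongside registration handles rather than as a bare slice.

```go
package main

import (
	"log/slog"
	"sync"
)

type Engine struct {
	mu  sync.Mutex
	cbs []func()
}

// fireLeaderLossCallbacks snapshots the registered callbacks under the
// lock, then invokes them inline, in registration order: no goroutine
// per callback, so no unbounded fan-out on a leader-loss event.
func (e *Engine) fireLeaderLossCallbacks() {
	e.mu.Lock()
	cbs := make([]func(), len(e.cbs))
	copy(cbs, e.cbs)
	e.mu.Unlock()
	for _, fn := range cbs {
		e.invokeLeaderLossCallback(fn)
	}
}

// invokeLeaderLossCallback recovers panics per callback so one buggy
// holder cannot prevent its siblings from being invalidated.
func (e *Engine) invokeLeaderLossCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("leader-loss callback panicked", slog.Any("panic", r))
		}
	}()
	fn()
}

func main() {
	e := &Engine{cbs: []func(){func() { panic("boom") }, func() {}}}
	e.fireLeaderLossCallbacks() // the panic is contained; both run
}
```

The trade-off is exactly as stated: a blocking callback would stall the engine's status loop, which is acceptable only because the contract requires callbacks to be a non-blocking flag flip.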
@bootjp
Owner Author

bootjp commented Apr 20, 2026

Addressed 2 items in 9cea2a8 / b70f167:

  • Made AppliedIndex a lock-free atomic.Uint64 (closes #553, previously deferred): the run loop's apply / snapshot restore / Open init paths go through a setApplied(idx) helper that updates e.applied and the e.appliedIndex atomic together, so AppliedIndex() is just e.appliedIndex.Load() with no lock. TestAppliedIndex_LockFreeLoad verifies the call does not block even while the mutex is held exclusively.

  • Reverted fireLeaderLossCallbacks to synchronous execution (gemini finding: unbounded goroutines): the callback contract requires non-blocking work, and the LeaseRead fast path already checks State() == StateLeader, so async execution is not required. The sync version resolves the unbounded-goroutine concern while keeping panic recovery (defer recover + log).

@bootjp
Owner Author

bootjp commented Apr 20, 2026

/gemini review

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
kv/sharded_coordinator.go (1)

161-172: Consider guarding Close against concurrent invocation.

Close() reads and clears c.deregisterLeaseCbs without synchronisation. The doc comment claims idempotency, but two concurrent Close calls (or Close racing with a late registration path) would be a data race under -race and could double-invoke deregister callbacks. A sync.Once (or a mutex) would make the "idempotent" contract actually hold.

♻️ Suggested refactor
 type ShardedCoordinator struct {
 	...
 	deregisterLeaseCbs []func()
+	closeOnce          sync.Once
 }
@@
 func (c *ShardedCoordinator) Close() error {
 	if c == nil {
 		return nil
 	}
-	cbs := c.deregisterLeaseCbs
-	c.deregisterLeaseCbs = nil
-	for _, fn := range cbs {
-		fn()
-	}
+	c.closeOnce.Do(func() {
+		for _, fn := range c.deregisterLeaseCbs {
+			fn()
+		}
+		c.deregisterLeaseCbs = nil
+	})
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kv/sharded_coordinator.go` around lines 161 - 172, The Close method on
ShardedCoordinator currently reads and clears deregisterLeaseCbs without
synchronization, causing races and possible double invocation; make Close
idempotent and concurrency-safe by protecting the shutdown with a sync.Once (or
a mutex) inside the ShardedCoordinator type and have Close call that
once-guarded cleanup that swaps out/reads deregisterLeaseCbs and invokes each
callback exactly once; update any late-registration paths to check the same
once/closed flag (or mutex-protected state) so registrations after Close are
rejected or immediately deregistered.
kv/coordinator.go (2)

76-85: Close() does not invalidate the lease — consider doing so for defense in depth.

After Close() returns, the leader-loss callback is deregistered, so any subsequent loss of leadership will no longer invalidate c.lease. If a caller (incorrectly, but plausibly in tests or hot-reload paths) keeps issuing LeaseRead against this Coordinate after Close(), a still-valid lease may serve stale reads that the engine no longer has any way to revoke. A one-liner c.lease.invalidate() before clearing the deregister closure closes that window cheaply and aligns with the design doc's stance that any "teardown" path should drop the lease.

🛡️ Proposed fix
 func (c *Coordinate) Close() error {
 	if c == nil {
 		return nil
 	}
 	if c.deregisterLeaseCb != nil {
 		c.deregisterLeaseCb()
 		c.deregisterLeaseCb = nil
 	}
+	// Drop any currently-valid lease so a post-Close LeaseRead
+	// caller (e.g. leaked goroutine) can't serve from stale local
+	// state once leader-loss callbacks are no longer firing.
+	c.lease.invalidate()
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kv/coordinator.go` around lines 76 - 85, Coordinate.Close currently only
deregisters the leader-loss callback but leaves c.lease valid, which can let
callers keep using LeaseRead after Close; before clearing deregisterLeaseCb call
c.lease.invalidate() (guarded by nil check) to drop the lease, then proceed to
call and nil out c.deregisterLeaseCb and return; update the Coordinate.Close
method to explicitly invalidate c.lease so teardown always revokes the lease.

150-168: Reorder parameters to follow Go's context-first convention.

The function signatures should have context.Context as the first parameter, matching the Coordinator interface methods above and the broader codebase pattern. This avoids lint issues (revive's context-as-argument, gocritic) and follows Go best practices.

♻️ Proposed signature change
-func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error) {
+func LeaseReadThrough(ctx context.Context, c Coordinator) (uint64, error) {
 	if lr, ok := c.(LeaseReadableCoordinator); ok {
 		idx, err := lr.LeaseRead(ctx)
 		return idx, errors.WithStack(err)
 	}
 	idx, err := c.LinearizableRead(ctx)
 	return idx, errors.WithStack(err)
 }

-func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error) {
+func LeaseReadForKeyThrough(ctx context.Context, c Coordinator, key []byte) (uint64, error) {
 	if lr, ok := c.(LeaseReadableCoordinator); ok {
 		idx, err := lr.LeaseReadForKey(ctx, key)
 		return idx, errors.WithStack(err)
 	}
 	idx, err := c.LinearizableRead(ctx)
 	return idx, errors.WithStack(err)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@kv/coordinator.go` around lines 150 - 168, Change LeaseReadThrough and
LeaseReadForKeyThrough to accept context.Context as the first parameter (match
Coordinator methods): swap parameter order from (c Coordinator, ctx
context.Context) to (ctx context.Context, c Coordinator) and for
LeaseReadForKeyThrough also (ctx context.Context, c Coordinator, key []byte).
Update all internal calls to use the reordered parameters (e.g., call
lr.LeaseRead(ctx), lr.LeaseReadForKey(ctx, key), c.LinearizableRead(ctx)) and
update any callers of these two functions throughout the codebase to pass ctx
first then the Coordinator (and key last) to satisfy Go's context-first
convention and lint rules.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@adapter/dynamodb.go`:
- Around line 1402-1421: The read snapshot (readTS) must be pinned before doing
local MVCC reads to prevent GC from reclaiming versions between lease-check and
read; after calling resolveGetItemTarget (finalSchema, freshItemKey, ok) and
before invoking readLogicalItemAt, call d.pinReadTS(r.Context(), readTS), check
for error and return an appropriate error response if pinning fails, and ensure
you unpin (defer the corresponding unpin call) once the read completes. Update
the GetItem flow so pinReadTS is used around the readLogicalItemAt call to
stabilize the snapshot.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 11a20076-4415-4c6e-9012-6b34da625348

📥 Commits

Reviewing files that changed from the base of the PR and between 17455da and b70f167.

📒 Files selected for processing (10)
  • adapter/dynamodb.go
  • adapter/dynamodb_test.go
  • adapter/redis.go
  • adapter/redis_lua_context.go
  • internal/raftengine/engine.go
  • internal/raftengine/etcd/engine.go
  • internal/raftengine/etcd/leader_loss_callback_test.go
  • kv/coordinator.go
  • kv/lease_read_test.go
  • kv/sharded_coordinator.go
✅ Files skipped from review due to trivial changes (2)
  • internal/raftengine/engine.go
  • kv/lease_read_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • internal/raftengine/etcd/leader_loss_callback_test.go
  • adapter/dynamodb_test.go
  • adapter/redis.go
  • internal/raftengine/etcd/engine.go

Comment thread adapter/dynamodb.go
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a leader-local lease read fast path across the KV coordinator layer and the etcd Raft engine, reducing the steady-state cost of read fencing by amortizing quorum confirmation across a bounded lease window.

Changes:

  • Add a lock-free leaseState and lease-aware read APIs (LeaseRead / LeaseReadForKey) for Coordinate and ShardedCoordinator, with per-shard isolation and leader-loss invalidation.
  • Extend the etcd raft engine with an optional raftengine.LeaseProvider capability (lease duration, applied index, leader-loss callbacks) and a lock-free applied-index mirror.
  • Switch high-traffic adapter read paths (Redis Lua entry, Redis GET, DynamoDB GetItem) to use lease-aware reads; add targeted unit tests for lease behavior and callback wiring.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.

Summary per file:

  • kv/lease_state.go: Introduces lock-free lease expiry + generation guard to prevent stale lease resurrection.
  • kv/lease_state_test.go: Unit tests for lease validity, monotonic extend, invalidate behavior, and concurrency/race coverage.
  • kv/coordinator.go: Adds leader-loss callback registration + Close(), lease refresh on successful dispatch, and LeaseRead* APIs with safe fallbacks.
  • kv/sharded_coordinator.go: Adds per-shard leaseState, wraps shard Txn to refresh the lease on real commits, registers per-shard leader-loss callbacks, and adds sharded LeaseRead*.
  • kv/lease_read_test.go: Tests Coordinate.LeaseRead fast/slow paths, fallbacks, amortization, and callback deregistration.
  • kv/sharded_lease_test.go: Tests per-shard lease isolation, error invalidation scoping, commit-index gating, Close forwarding, and per-shard leader-loss callbacks.
  • kv/leader_routed_store_test.go: Updates the coordinator stub to support lease-read methods for new adapter call sites.
  • internal/raftengine/engine.go: Adds the optional LeaseProvider interface for engines that support leader-local lease reads.
  • internal/raftengine/etcd/engine.go: Implements lease duration computation, lock-free AppliedIndex, leader-loss callback registration/invocation, and fires callbacks on leader->non-leader + shutdown paths.
  • internal/raftengine/etcd/leader_loss_callback_test.go: Tests panic containment in callbacks and lock-free AppliedIndex behavior (including nil receiver).
  • adapter/redis_lua_context.go: Switches Lua script context initialization from per-script LinearizableRead to lease-aware LeaseReadThrough.
  • adapter/redis.go: Adds a server base context + cancellation on stop, routes handler contexts through it, and adds a lease check to GET with a bounded timeout.
  • adapter/redis_retry_test.go: Updates the coordinator stub to support lease-read methods.
  • adapter/redis_keys_pattern_test.go: Updates the coordinator stub to support lease-read methods.
  • adapter/redis_info_test.go: Updates the coordinator stub to support lease-read methods.
  • adapter/dynamodb.go: Adds a bounded lease check for GetItem routed by item key; refactors input parsing and re-validates routing after lease confirmation.
  • adapter/dynamodb_test.go: Updates the coordinator wrapper to use the lease-through helpers.
  • adapter/distribution_server_test.go: Updates the coordinator stub to support lease-read methods.
  • adapter/s3_test.go: Updates the coordinator stub to support lease-read methods.
  • cmd/server/demo.go: Ensures the coordinator deregisters its leader-loss callback via deferred Close().
  • docs/lease_read_design.md: Adds the design document describing rationale, safety argument, APIs, and rollout plan.

Comment thread internal/raftengine/etcd/engine.go Outdated
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a lease-based read mechanism to optimize performance by reducing Raft quorum overhead. It introduces a LeaseProvider interface in the raft engine and integrates lease management into the Coordinator and ShardedCoordinator layers, with updates to Redis and DynamoDB adapters to utilize these leases. Feedback suggests optimizing the schema re-resolution logic in the DynamoDB getItem handler by using schema generations to detect routing changes.

Comment thread adapter/dynamodb.go
// even if the request parameters are unchanged, so comparing the
// computed item keys (not just generation) catches any future
// schema change that alters item routing.
finalSchema, freshItemKey, ok := d.resolveGetItemTarget(w, r, in, readTS)
Contributor

medium

The schema and item key are re-resolved here to detect routing changes during the lease check. While safe, resolveGetItemTarget involves loading the table schema, which may be expensive if not perfectly cached. Since schemas change rarely, consider if a simpler check (e.g., comparing schema generations) would be sufficient to detect shifts in item routing.

@bootjp
Owner Author

bootjp commented Apr 20, 2026

Fixed the 2 actionable items from the latest review round in 05a27e04:

  • [CodeRabbit Major] DynamoDB getItem: added pinReadTS(readTS) + defer readPin.Release(), the same pattern as query/scan/transactGet. This closes the race where MVCC GC could reclaim versions between the schema re-resolution and the item read.
  • [gemini MED] Fixed the leaderLossCbsMu struct comment to match the sync implementation (a stale comment left over after the b70f167 revert).

Re-raised items not acted on: heap alloc (already tracked in #554) and the async goroutine concern (already reverted to sync in b70f167).

…LossCbs

Two findings from the latest review pass.

- [CodeRabbit Major] adapter/dynamodb.go getItem now calls
  pinReadTS(readTS) / defer Release so concurrent MVCC GC cannot
  reclaim versions between the schema revalidation and the item
  read. queryItems / scanItems / transactGetItems already pin; this
  brings getItem in line after the pre/post-lease readTS split made
  the window observable.
- [gemini MED] internal/raftengine/etcd/engine.go leaderLossCbsMu
  field comment still described asynchronous firing; updated to
  match the now-synchronous implementation (matches
  RegisterLeaderLossCallback's docstring and the engine.go interface
  comment).

Remaining comments (heap alloc, async goroutine) reference
already-reverted or already-tracked state.
@bootjp
Owner Author

bootjp commented Apr 20, 2026

/gemini review

@bootjp bootjp enabled auto-merge April 20, 2026 12:40
@bootjp bootjp merged commit 76894ae into main Apr 20, 2026
8 checks passed
@bootjp bootjp deleted the feat/lease-read branch April 20, 2026 12:41
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a lease-based read mechanism to improve performance and consistency across the DynamoDB and Redis adapters. It implements a new LeaseProvider interface in the raft engine, a lock-free leaseState tracker in the KV layer, and updates the coordinators to amortize LinearizableRead costs. The Redis server was also updated to manage per-request contexts via a cancelable base context. Feedback suggests using a sync.WaitGroup in the Redis server's Stop method to ensure in-flight handlers complete and ensuring that errors from Close calls are logged to prevent silent resource leaks.

Comment thread adapter/redis.go
Comment on lines 849 to 867
func (r *RedisServer) Stop() {
	// Cancel baseCtx first so in-flight handlers observe a cancelled
	// context before their network connections are torn down.
	_ = r.Close()
	if err := r.relayConnCache.Close(); err != nil {
		slog.Warn("redis server: relay conn cache close",
			slog.String("addr", r.redisAddr),
			slog.Any("err", err),
		)
	}
	if r.listen != nil {
		if err := r.listen.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
			slog.Warn("redis server: listener close",
				slog.String("addr", r.redisAddr),
				slog.Any("err", err),
			)
		}
	}
}


medium

The Stop method initiates shutdown by canceling the base context and closing the listener, but it does not wait for in-flight request handlers to complete. Consider using a sync.WaitGroup to ensure all handlers finish. Additionally, ensure that any error returned by listen.Close() is logged rather than silently ignored, as per repository standards for resource cleanup.

References
  1. Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.

Comment thread kv/sharded_coordinator.go
Comment on lines +87 to 102
// Close forwards to the wrapped Transactional if it implements
// io.Closer. ShardStore.closeGroup relies on the type assertion
// `g.Txn.(io.Closer)` to release per-shard resources (e.g. the gRPC
// connection cached by LeaderProxy). Without this pass-through, the
// wrapping would silently swallow the Closer capability and leak
// connections / goroutines at shutdown.
func (t *leaseRefreshingTxn) Close() error {
	closer, ok := t.inner.(io.Closer)
	if !ok {
		return nil
	}
	if err := closer.Close(); err != nil {
		return errors.WithStack(err)
	}
	return nil
}


medium

The leaseRefreshingTxn wrapper correctly ensures that Close is called on the underlying Transactional if it implements io.Closer. To fully adhere to the repository's safety rules, ensure that any error returned by this Close call is explicitly logged to prevent silent resource leaks or cleanup failures, especially since the underlying proxy may hold network connections.

References
  1. Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.

bootjp added a commit that referenced this pull request Apr 20, 2026
## Background: the lease fast path is not kicking in in production

Production cluster metrics after merging PR #549:
- `EVALSHA` avg **6.3 s/op**
- `redis.call()` avg **6.25 s/call**
- `GET` avg **1.11 s/op**

Everything is going through LinearizableRead (a heartbeat round trip) and never reaching the lease fast path.

## Cause

`refreshLeaseAfterDispatch` and `leaseRefreshingTxn.Commit/Abort` were treating **any err as a leadership loss and invalidating the lease**. The production Lua retry loops generate frequent write conflicts, each of which invalidates the lease. The next LeaseRead then falls to the slow path, spends over a second on the heartbeat round trip, and the following write invalidates the lease again: a vicious cycle.

## Fix

Added an `isLeadershipLossError(err)` helper so that **only a genuine
leadership loss** invalidates:
- hashicorp `raft.ErrNotLeader` / `raft.ErrLeadershipLost` /
`raft.ErrLeadershipTransferInProgress`
- the etcd engine's "not leader" / "leadership transfer" / "leadership lost"
sentinels (matched by substring, because cockroachdb/errors sometimes does
not traverse errors.Is)

A genuine leadership loss is already covered by `RegisterLeaderLossCallback`,
and the lease fast path is already guarded by `engine.State() == StateLeader`,
so **this change does not weaken safety**; it only prevents spurious
invalidation during write-conflict storms.

## Test plan

- [x] `go test -race ./kv/...` passes
- [ ] After deployment, confirm the GET avg in
`elastickv_redis_request_duration_seconds` drops to the millisecond range
- [ ] Confirm the avg in `elastickv_lua_redis_call_duration_seconds` drops
to the equivalent of a few LinearizableReads per script


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Bug Fixes**
* Refined lease invalidation behavior to only trigger on actual
leadership loss errors, rather than invalidating on all dispatch
failures. This improves system reliability when handling transient
errors.

* **Tests**
  * Added comprehensive test coverage for leadership error detection.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->