Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
eb7b34c
docs: lease read design
bootjp Apr 19, 2026
61feb0e
docs(lease-read): bump safety margin to 300ms
bootjp Apr 19, 2026
55d092e
feat(lease-read): add LeaseRead to coordinator and engine surface
bootjp Apr 19, 2026
0d299fb
feat(lease-read): use LeaseRead in Lua scripts and refresh on Dispatch
bootjp Apr 19, 2026
fde9540
feat(lease-read): wrap Redis GET and DynamoDB getItem in LeaseRead
bootjp Apr 19, 2026
2a70b72
test(lease-read): unit tests for leaseState
bootjp Apr 19, 2026
b9ac2b8
style(lease-read): linter formatting on lease_state_test.go
bootjp Apr 19, 2026
f28bf0d
fix(lease-read): pre-Propose time capture and monotonic extend
bootjp Apr 19, 2026
f86c689
fix(lease-read): invalidate lease on leader-loss
bootjp Apr 19, 2026
9994bbb
fix(lease-read): per-shard lease refresh on Dispatch via leaseRefresh…
bootjp Apr 19, 2026
5b3ab7c
test(lease-read): coordinator LeaseRead and leader-loss invalidation
bootjp Apr 19, 2026
4f24db6
fix(lease-read): address external review (Copilot/CodeRabbit)
bootjp Apr 19, 2026
6d30563
test(lease-read): amortization end-to-end + design doc closure
bootjp Apr 19, 2026
9c15a89
fix(lease-read): plug invalidate/extend race + fire callbacks on shut…
bootjp Apr 19, 2026
22448db
fix(lease-read): Copilot 2nd-pass nits on PR #549
bootjp Apr 20, 2026
33eef30
docs(lease-read): clarify RegisterLeaderLossCallback does NOT fire on…
bootjp Apr 20, 2026
8d3511f
fix(lease-read): panic-protect leader-loss callbacks + sync.Mutex in …
bootjp Apr 20, 2026
76a93d7
fix(lease-read): only refresh lease when Raft commit actually happened
bootjp Apr 20, 2026
f83fed5
fix(lease-read): fire callbacks from fail(), log panic recoveries,
bootjp Apr 20, 2026
571cd93
fix(lease-read): leaseRefreshingTxn forwards Close + unify GET context
bootjp Apr 20, 2026
f60d744
style(lease-read): Copilot nits on test busy-loop + comment accuracy
bootjp Apr 20, 2026
90f78f0
docs(lease-read): clarify pre-dispatch sampling is strictly conservative
bootjp Apr 20, 2026
b82ba32
fix(lease-read): sample generation BEFORE quorum op (Critical race)
bootjp Apr 20, 2026
f3ba1b1
fix(lease-read): deregister API + idempotent Txn wrapping
bootjp Apr 20, 2026
83b4323
fix(lease-read): short-circuit when LeaseDuration <= 0
bootjp Apr 20, 2026
5238eea
fix(lease-read): plug engine callback leak via Coordinate.Close
bootjp Apr 20, 2026
b630541
fix(lease-read): correct shard for DynamoDB item lease + redis base ctx
bootjp Apr 20, 2026
2c33582
docs(lease-read): sync API + pseudocode with final signatures
bootjp Apr 20, 2026
c2185d1
fix(lease-read): root all Redis handler timeouts in server baseCtx
bootjp Apr 20, 2026
17455da
fix(lease-read): round 13 review pass
bootjp Apr 20, 2026
0f8c28f
style(dynamodb): fix gci alignment of dynamo const block
bootjp Apr 20, 2026
cc9a27d
fix(lease-read): DynamoDB schema-gen revalidation + async callback doc
bootjp Apr 20, 2026
b94ee7e
fix(lease-read): fast path also checks engine leader state
bootjp Apr 20, 2026
d9ca3f6
Potential fix for pull request finding
bootjp Apr 20, 2026
8afa20b
Potential fix for pull request finding
bootjp Apr 20, 2026
1343d19
fix(dynamodb): compare itemKey bytes (not just generation) after lease
bootjp Apr 20, 2026
93377bf
fix(lease-read): zero tail before truncating leaderLossCbs slice
bootjp Apr 20, 2026
5c5a820
fix(dynamodb): defer leaseCancel for panic-safety
bootjp Apr 20, 2026
f9265bf
refactor(lease-read): make lease read an optional Coordinator capability
bootjp Apr 20, 2026
9cea2a8
perf(lease-read): make AppliedIndex lock-free via atomic.Uint64
bootjp Apr 20, 2026
b70f167
revert(lease-read): fire leader-loss callbacks synchronously
bootjp Apr 20, 2026
d1d255f
Potential fix for pull request finding
bootjp Apr 20, 2026
c087a7c
fix(lease-read): pin readTS in DynamoDB getItem + sync doc for leader…
bootjp Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions adapter/distribution_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,11 @@ func (s *distributionCoordinatorStub) Clock() *kv.HLC {
// LinearizableRead is a stub that always reports read index 0 with no error.
func (s *distributionCoordinatorStub) LinearizableRead(_ context.Context) (uint64, error) {
	var index uint64
	return index, nil
}

// LeaseRead delegates to the stubbed LinearizableRead so lease reads and
// linearizable reads behave identically in tests.
func (s *distributionCoordinatorStub) LeaseRead(ctx context.Context) (uint64, error) {
	idx, err := s.LinearizableRead(ctx)
	return idx, err
}

// LeaseReadForKey ignores the key and delegates to the stubbed
// LinearizableRead; the stub has no notion of shards.
func (s *distributionCoordinatorStub) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error) {
	idx, err := s.LinearizableRead(ctx)
	return idx, err
}
106 changes: 90 additions & 16 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ const (
transactRetryMaxBackoff = 10 * time.Millisecond
transactRetryBackoffFactor = 2
tableCleanupAsyncTimeout = 5 * time.Minute
itemUpdateLockStripeCount = 256
tableLockStripeCount = 128
batchWriteItemMaxItems = 25
transactGetItemsMaxItems = 100
dynamoMaxRequestBodyBytes = 1 << 20
// dynamoLeaseReadTimeout bounds how long LeaseReadForKey's slow
// path (LinearizableRead) may block before returning an error to
// the HTTP client. Matches the order of magnitude of Redis's
// redisDispatchTimeout so both adapters give up at similar
// wall-clock budgets on quorum loss.
dynamoLeaseReadTimeout = 5 * time.Second
itemUpdateLockStripeCount = 256
tableLockStripeCount = 128
batchWriteItemMaxItems = 25
transactGetItemsMaxItems = 100
dynamoMaxRequestBodyBytes = 1 << 20

dynamoTableMetaPrefix = kv.DynamoTableMetaPrefix
dynamoTableGenerationPrefix = kv.DynamoTableGenerationPrefix
Expand Down Expand Up @@ -1340,38 +1346,85 @@ func (d *DynamoDBServer) commitItemWrite(ctx context.Context, req *kv.OperationG
return nil
}

func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
func (d *DynamoDBServer) parseGetItemInput(w http.ResponseWriter, r *http.Request) (getItemInput, bool) {
body, err := io.ReadAll(maxDynamoBodyReader(w, r))
if err != nil {
writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error())
return
return getItemInput{}, false
}
var in getItemInput
if err := json.Unmarshal(body, &in); err != nil {
writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error())
return
return getItemInput{}, false
}
if strings.TrimSpace(in.TableName) == "" {
writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, "missing table name")
return
return getItemInput{}, false
}
if err := d.ensureLegacyTableMigration(r.Context(), in.TableName); err != nil {
writeDynamoErrorFromErr(w, err)
return
return getItemInput{}, false
}
return in, true
}

readTS := d.resolveDynamoReadTS(in.ConsistentRead)
schema, exists, err := d.loadTableSchemaAt(r.Context(), in.TableName, readTS)
if err != nil {
func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
in, ok := d.parseGetItemInput(w, r)
if !ok {
return
}
// Tentative TS for schema resolution only; schemas change rarely
// so a slight pre-lease stale is acceptable. The item read below
// is sampled AFTER the lease check.
tentativeTS := d.resolveDynamoReadTS(in.ConsistentRead)
_, itemKey, ok := d.resolveGetItemTarget(w, r, in, tentativeTS)
if !ok {
return
}
// Lease-check the shard that actually owns the ITEM key with a
// bounded timeout so a stalled Raft cannot hang this handler
// indefinitely if the client never cancels. Use defer so the
// cancel runs even if LeaseReadForKey panics or a future
// refactor inserts an early return; the cost of keeping ctx
// alive until handler exit is negligible because the next
// in-handler calls are local store reads.
leaseCtx, leaseCancel := context.WithTimeout(r.Context(), dynamoLeaseReadTimeout)
defer leaseCancel()
if _, err := kv.LeaseReadForKeyThrough(d.coordinator, leaseCtx, itemKey); err != nil {
writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
return
}
if !exists {
writeDynamoError(w, http.StatusBadRequest, dynamoErrResourceNotFound, "table not found")
// Re-sample readTS AFTER the lease confirmation so that any write
// that completed on the same shard BEFORE the confirmation is
// visible. Sampling earlier would violate linearizability for
// ConsistentRead=false reads by returning a snapshot from before
// the most recent confirmed commit.
readTS := d.resolveDynamoReadTS(in.ConsistentRead)
// Pin readTS so concurrent MVCC GC cannot reclaim versions
// between the schema revalidation and the item read below;
// matches the pattern already used by queryItems / scanItems /
// transactGetItems.
readPin := d.pinReadTS(readTS)
defer readPin.Release()

// Re-resolve schema + itemKey at readTS and verify that the key
// we lease-checked is STILL the key that will be read. A table
// migration that commits between the tentative schema load and
// the lease confirmation may shift the item to a different shard
// even if the request parameters are unchanged, so comparing the
// computed item keys (not just generation) catches any future
// schema change that alters item routing.
finalSchema, freshItemKey, ok := d.resolveGetItemTarget(w, r, in, readTS)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The schema and item key are re-resolved here to detect routing changes during the lease check. While safe, resolveGetItemTarget involves loading the table schema, which may be expensive if not perfectly cached. Since schemas change rarely, consider if a simpler check (e.g., comparing schema generations) would be sufficient to detect shifts in item routing.

if !ok {
return
}
if !bytes.Equal(freshItemKey, itemKey) {
writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal,
"table routing changed during read; please retry")
return
}

current, found, err := d.readLogicalItemAt(r.Context(), schema, in.Key, readTS)
current, found, err := d.readLogicalItemAt(r.Context(), finalSchema, in.Key, readTS)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if err != nil {
writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error())
return
Expand All @@ -1389,6 +1442,27 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
writeDynamoJSON(w, map[string]any{"Item": projected})
}

// resolveGetItemTarget resolves the table schema at readTS and derives the
// item key whose owning shard must be lease-checked before serving the read.
// On any failure it writes the appropriate Dynamo error response itself and
// returns false, so the caller can simply return.
func (d *DynamoDBServer) resolveGetItemTarget(w http.ResponseWriter, r *http.Request, in getItemInput, readTS uint64) (*dynamoTableSchema, []byte, bool) {
	schema, exists, err := d.loadTableSchemaAt(r.Context(), in.TableName, readTS)
	switch {
	case err != nil:
		writeDynamoError(w, http.StatusInternalServerError, dynamoErrInternal, err.Error())
		return nil, nil, false
	case !exists:
		writeDynamoError(w, http.StatusBadRequest, dynamoErrResourceNotFound, "table not found")
		return nil, nil, false
	}
	key, keyErr := schema.itemKeyFromAttributes(in.Key)
	if keyErr != nil {
		writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, keyErr.Error())
		return nil, nil, false
	}
	return schema, key, true
}

func (d *DynamoDBServer) deleteItem(w http.ResponseWriter, r *http.Request) {
in, shouldReturnOld, err := decodeDeleteItemInput(maxDynamoBodyReader(w, r))
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions adapter/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,3 +1859,11 @@ func (w *testCoordinatorWrapper) Clock() *kv.HLC {
// LinearizableRead forwards directly to the wrapped coordinator.
func (w *testCoordinatorWrapper) LinearizableRead(ctx context.Context) (uint64, error) {
	idx, err := w.inner.LinearizableRead(ctx)
	return idx, err
}

// LeaseRead routes through the kv lease-read helper against the wrapped
// coordinator.
func (w *testCoordinatorWrapper) LeaseRead(ctx context.Context) (uint64, error) {
	idx, err := kv.LeaseReadThrough(w.inner, ctx)
	return idx, err
}

// LeaseReadForKey routes through the kv per-key lease-read helper against
// the wrapped coordinator.
func (w *testCoordinatorWrapper) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error) {
	idx, err := kv.LeaseReadForKeyThrough(w.inner, ctx, key)
	return idx, err
}
Loading
Loading