Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c5525b6
docs(design): proposed txn secondary-commit idempotency (Jepsen run 2…
bootjp May 21, 2026
22753ab
Merge branch 'main' into docs/txn-idempotency-design
bootjp May 21, 2026
589f6b8
docs(design): close open question — trigger is leadership churn (run …
bootjp May 25, 2026
aa41e9f
docs(design): switch to CheckTxnStatus probe, demote UUID marker to r…
bootjp May 26, 2026
561b053
docs(design): make option 2 (write-set reuse + exact-ts dedup) primar…
bootjp May 26, 2026
8f2b800
docs(design): close synchronous-dispatch linchpin — verified against …
bootjp May 26, 2026
c1f2823
feat(store): add CommittedVersionAt exact-ts MVCC probe (txn idempote…
bootjp May 26, 2026
1b7d9cb
feat(kv): wire one-phase idempotency dedup probe (txn idempotency M2b)
bootjp May 26, 2026
d3e9705
feat(redis): write-set-reuse list-push retry + R1 reconstruction (txn…
bootjp May 26, 2026
fd0831c
fix(kv): thread PrevCommitTS through legacy Coordinate path + lint fi…
bootjp May 28, 2026
65daa3a
fix(redis): boundary-position ReadKeys fence reuse against intervenin…
bootjp May 28, 2026
7781725
fix(redis): wire ELASTICKV_REDIS_ONEPHASE_DEDUP env + gate pending-sa…
bootjp May 28, 2026
624579c
fix(redis): re-read meta on reuse-apply to capture intervening non-co…
bootjp May 28, 2026
e79c44d
fix(redis): extract reuse-dispatch + length-resolve helpers to satisf…
bootjp May 28, 2026
8dec9f6
fix(kv): leader-fence the CommittedVersionAt wrapper paths (#796 code…
bootjp May 28, 2026
bc11779
fix(kv): LinearizableRead-fence CommittedVersionAt on non-leader path…
bootjp May 28, 2026
b469510
fix(kv): restructure non-leader CommittedVersionAt to avoid nilerr fa…
bootjp May 28, 2026
71430f2
fix(txn-dedup): address codex P1/P2 review on PR #796
bootjp May 29, 2026
04876c0
fix(txn-dedup): address codex P1/P2 round-11 + reviewdog lint
bootjp May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 293 additions & 5 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,31 @@ type RedisServer struct {
// redis_key_waiters.go.
zsetWaiters *keyWaiterRegistry

// onePhaseTxnDedup enables option-2 one-phase idempotency: on a
// retryable write error, list-push retries reuse the failed attempt's
// write set and carry prev_commit_ts so the FSM can dedup a commit that
// landed under leadership churn (see
// docs/design/2026_05_21_proposed_txn_secondary_idempotency.md). It
// MUST stay off until every node runs a probe-aware binary — see R5
// (FSM determinism across a rolling upgrade). Default off; enabled via
// WithOnePhaseTxnDedup / the ELASTICKV_REDIS_ONEPHASE_DEDUP env var
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wire the advertised env flag into server construction

This says operators can enable the feature with ELASTICKV_REDIS_ONEPHASE_DEDUP, but a repo-wide search shows that string only appears in docs/comments, and NewRedisServer only reads ELASTICKV_REDIS_TRACE; cmd/server/demo.go also never passes WithOnePhaseTxnDedup. In the production server path, setting the documented env var therefore has no effect and the feature remains disabled after rollout.

Useful? React with 👍 / 👎.

// after a full rollout.
onePhaseTxnDedup bool

route map[string]func(conn redcon.Conn, cmd redcon.Command)
}

type RedisServerOption func(*RedisServer)

// WithOnePhaseTxnDedup enables the option-2 one-phase idempotency dedup on
// list-push retries (see RedisServer.onePhaseTxnDedup). Off by default;
// enable only after the whole cluster runs a probe-aware binary.
func WithOnePhaseTxnDedup(enabled bool) RedisServerOption {
return func(r *RedisServer) {
r.onePhaseTxnDedup = enabled
}
}

func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption {
return func(r *RedisServer) {
r.readTracker = tracker
Expand Down Expand Up @@ -475,10 +495,15 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
// getLuaPool, which honors luaPoolMaxIdle the same way.
luaPool: nil,
traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1",
baseCtx: baseCtx,
baseCancel: baseCancel,
streamWaiters: newKeyWaiterRegistry(),
zsetWaiters: newKeyWaiterRegistry(),
// onePhaseTxnDedup honors the documented opt-in env var; the
// WithOnePhaseTxnDedup option below can still override either way.
// Default off — see R5 in the design doc (the writer must not be
// enabled until the whole cluster runs a probe-aware binary).
onePhaseTxnDedup: os.Getenv("ELASTICKV_REDIS_ONEPHASE_DEDUP") == "1",
baseCtx: baseCtx,
baseCancel: baseCancel,
streamWaiters: newKeyWaiterRegistry(),
zsetWaiters: newKeyWaiterRegistry(),
}
r.relay.Bind(r.publishLocal)

Expand Down Expand Up @@ -3141,8 +3166,13 @@ type listPushBuildFn func(meta store.ListMeta, key []byte, values [][]byte, comm

// listPushCore is the shared retry loop for RPUSH and LPUSH. The caller supplies
// a buildFn that assembles the specific operations (RPUSH appends to tail, LPUSH
// prepends to head).
// prepends to head). When onePhaseTxnDedup is enabled it uses the write-set-reuse
// retry path (option 2); otherwise it keeps the original recompute-on-retry loop.
func (r *RedisServer) listPushCore(ctx context.Context, key []byte, values [][]byte, buildFn listPushBuildFn) (int64, error) {
if r.onePhaseTxnDedup {
return r.listPushCoreWithDedup(ctx, key, values, buildFn)
}

var newLen int64
err := r.retryRedisWrite(ctx, func() error {
readTS := r.readTS()
Expand Down Expand Up @@ -3178,6 +3208,264 @@ func (r *RedisServer) listPushCore(ctx context.Context, key []byte, values [][]b
return newLen, err
}

// reusableListPush captures a dispatched list-push attempt so a subsequent
// retry can reuse its exact write set (same seq, same item/delta keys) and
// probe whether it already landed, instead of recomputing seq from a fresh
// meta read. Recomputing is what duplicates the element under leadership
// churn: attempt 1 commits at T1 but returns an ambiguous error, the retry
// reads the now-larger list and appends at a NEW seq. Reuse + the FSM's
// exact-ts dedup probe close that. See option 2 in
// docs/design/2026_05_21_proposed_txn_secondary_idempotency.md.
type reusableListPush struct {
ops []*kv.Elem[kv.OP]
startTS uint64
// commitTS is the most recent dispatched commit_ts for this write set;
// the next retry passes it as prev_commit_ts so the FSM probes exactly
// the attempt that might have landed.
commitTS uint64
// length is the client-visible post-push length. It is invariant across
// reuse — the write set was built once from attempt 1's meta — so it is
// also the correct value to return when the FSM dedup no-ops the apply
// (R1 result reconstruction: no store re-read needed).
length int64
// readKeys is the boundary read set captured at attempt 1's meta read:
// listItemKey(Head) and (when Len > 1) listItemKey(Tail-1). It is the
// load-bearing fence against the codex P1 scenario where an intervening
// pop/trim shrinks the list before the retry — without it, the reused
// seq would land past the new Tail and be unreachable to LRANGE. OCC
// validates these atomically against startTS at FSM apply, so any
// boundary-touching commit fires WriteConflict and the adapter drops
// pending → recomputes. Empty when attempt 1 read an empty list (no
// boundary to fence; the OCC on the write key suffices for that case).
readKeys [][]byte
}

// dispatchListPushReuse runs one iteration of the option-2 reuse path:
// dispatches the captured write set under a fresh commit_ts (carrying
// pending.commitTS as PrevCommitTS so the FSM probes whether the prior
// attempt landed) and returns the post-push length on success. The drop
// return signals the caller to clear pending — set on a genuine
// WriteConflict from another txn so the next iteration recomputes from
// fresh meta. Extracted from listPushCoreWithDedup to keep that closure
// under the cyclop / gocognit / nestif limits.
func (r *RedisServer) dispatchListPushReuse(ctx context.Context, key []byte, pending *reusableListPush) (newLen int64, drop bool, err error) {
commitTS := r.coordinator.Clock().Next()
_, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: pending.startTS,
CommitTS: commitTS,
PrevCommitTS: pending.commitTS,
ReadKeys: pending.readKeys,
Elems: pending.ops,
})
if dispErr == nil {
return r.resolveReuseLength(ctx, key, pending), false, nil
}
if errors.Is(dispErr, store.ErrWriteConflict) {
// Self-inflicted-conflict guard (codex P1): the apply might have
// landed at this fresh commitTS but bubbled up as WriteConflict due
// to leadership churn (the original bug class the doc's "Resolved"
// section identifies). Without this probe, dropping pending here
// would recompute and append a second copy. Ask the store: did
// our just-attempted commit_ts land? If yes, this conflict is
// against our own commit — return success and keep pending pointing
// at THIS commit_ts so any subsequent retry probes the right point.
//
// Length resolution (codex P2 round-11): pending.length was computed
// during the prior attempt and is stale w.r.t. any non-conflicting
// list-modifying writes that landed between attempt 1 and this fresh
// apply. Probing pending.commitTS would hit for the fresh apply and
// (under the old resolveReuseLength shortcut) silently return the
// prior-attempt length — understating the count. Always re-read meta
// in the self-conflict path. resolveListMeta failure falls back to
// pending.length to honor codex P2 round-10 ("avoid failing after a
// reuse apply").
if probeKey := firstWriteKey(pending.ops); len(probeKey) > 0 {
landed, perr := r.store.CommittedVersionAt(ctx, probeKey, commitTS)
if perr == nil && landed {
pending.commitTS = commitTS
return r.resolveLengthAfterFreshApply(ctx, key, pending), false, nil
}
}
// Our attempt did not land at commitTS and the target seq is taken
// by another txn — a genuine conflict. Drop pending; the next
// iteration recomputes from a fresh meta read.
return 0, true, errors.WithStack(dispErr)
Comment on lines +3264 to +3293
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve reuse commit when conflict may be self-inflicted

When a reuse dispatch applies at the fresh commitTS but returns an ambiguous/transient error before the caller observes success, a coordinator retry of the same request can surface ErrWriteConflict because the reused write set now conflicts with its own just-committed version while PrevCommitTS still points at the older attempt. Treating every reuse write conflict as a genuine third-party conflict drops pending and recomputes, so this leadership-churn window can append the same logical push again instead of probing the reuse attempt's commitTS on the next loop.

Useful? React with 👍 / 👎.

}
// Still ambiguous (lock / other retryable): this reuse may itself
// have landed, so the next retry must probe THIS commit_ts. Only
// advance pending.commitTS if retryRedisWrite will actually loop
// (non-retryable errors escape to the client; pending is then
// discarded with the goroutine, so the update is wasted and the
// stale value would be misleading if some future caller reads it).
if isRetryableRedisTxnErr(dispErr) {
pending.commitTS = commitTS
}
return 0, false, errors.WithStack(dispErr)
}

// resolveReuseLength returns the client-visible post-push length after a
// successful reuse dispatch. If our prior attempt's exact commit_ts
// version exists, the FSM no-op'd (probe hit) and pending.length is the
// correct length we computed at that attempt. Otherwise the FSM applied
// the reused write set at a fresh commit_ts and we must re-read meta to
// capture any non-conflicting list-modifying writes that committed
// between attempts (codex P2) — without this, the return value would
// silently understate the count when the boundary OCC fence and
// write-key OCC both pass but the list length changed.
//
// Failure modes are converted to a degraded return (pending.length) rather
// than surfaced as an error, because the dispatch already committed. Per
// codex P2 round-10 ("avoid failing after a reuse apply"), reporting a
// write error after the apply landed drives the client into a retry that
// has no pending state and would re-append the element — the very anomaly
// this feature prevents. Specifically:
// - probe error of any kind: prefer pending.length over failure.
// - resolveListMeta failure (e.g. delta scan over MaxDeltaScanLimit
// under churn): fall back to pending.length.
//
// Returns int64 directly (no error) so callers do not have to invent
// caller-side fallback logic; the degraded-return contract is fixed here
// (golangci unparam / nilerr fix on the prior error-returning shape).
func (r *RedisServer) resolveReuseLength(ctx context.Context, key []byte, pending *reusableListPush) int64 {
if probeKey := firstWriteKey(pending.ops); len(probeKey) > 0 {
hit, perr := r.store.CommittedVersionAt(ctx, probeKey, pending.commitTS)
if perr == nil && hit {
return pending.length
}
if perr != nil {
// Probe failed; the dispatch already committed so degrade
// gracefully rather than propagate the read error.
return pending.length
}
// perr == nil && !hit: prior attempt didn't land at this ts; the
// FSM applied fresh writes, fall through to re-read meta.
}
return r.resolveLengthAfterFreshApply(ctx, key, pending)
}

// resolveLengthAfterFreshApply re-reads list meta to capture the post-apply
// length when we know the fresh commitTS applied (no probe shortcut), with
// the same fall-back-to-pending.length contract as resolveReuseLength. Used
// by the self-conflict path (codex P2 round-11): there pending.length is
// stale w.r.t. intervening non-conflicting writes, so the probe-hit
// shortcut would silently understate the count.
func (r *RedisServer) resolveLengthAfterFreshApply(ctx context.Context, key []byte, pending *reusableListPush) int64 {
currentMeta, _, mErr := r.resolveListMeta(ctx, key, r.readTS())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid failing after a reuse apply

When the prior attempt did not land and the reused write set applies successfully, this post-commit resolveListMeta can still fail (for example, the list had exactly MaxDeltaScanLimit deltas at the first read and an intervening non-conflicting push pushes it over the truncation limit). That returns an error to the client after the append has already committed; a client retry starts a fresh command with no pending state and can append the same element again. The legacy path did not add a fallible metadata read after a successful dispatch, so the reuse path needs to avoid surfacing this as a failed write or preserve enough idempotency state for the retry.

Useful? React with 👍 / 👎.

if mErr != nil {
return pending.length
}
return currentMeta.Len
}

// firstWriteKey returns the first non-empty element key from ops, or nil
// when there is none. Used after a successful reuse dispatch to probe
// whether our prior attempt's commit_ts actually landed: attempt 1 writes
// all its elem keys atomically at the same commit_ts, so any one of them
// answers the question.
func firstWriteKey(ops []*kv.Elem[kv.OP]) []byte {
for _, e := range ops {
if e != nil && len(e.Key) > 0 {
return e.Key
}
}
return nil
}

// listPushBoundaryReadKeys returns the boundary positions of the list as
// read keys for OCC. Including these in the dispatched OperationGroup makes
// FSM apply atomically reject the retry when any pop/trim has touched the
// boundary between attempts (codex P1 fix: prevents a reused seq from
// landing past a shrunk Tail). The keys are deduped: a single-element list
// has Head == Tail-1, so we emit it once.
func listPushBoundaryReadKeys(key []byte, meta store.ListMeta) [][]byte {
if meta.Len <= 0 {
return nil
}
tailIdx := meta.Tail - 1
if tailIdx == meta.Head {
return [][]byte{listItemKey(key, meta.Head)}
}
return [][]byte{
listItemKey(key, meta.Head),
listItemKey(key, tailIdx),
}
}

// listPushCoreWithDedup is the option-2 retry loop. The first attempt computes
// the write set from the current meta; any retryable failure makes the next
// iteration REUSE that write set under a fresh commit_ts with prev_commit_ts
// set, so the FSM no-ops if the prior attempt already landed. A WriteConflict
// on a reuse attempt means the probe ruled out our own prior attempt and the
// seq is genuinely taken by another txn, so we fall back to a full recompute.
func (r *RedisServer) listPushCoreWithDedup(ctx context.Context, key []byte, values [][]byte, buildFn listPushBuildFn) (int64, error) {
var newLen int64
var pending *reusableListPush
err := r.retryRedisWrite(ctx, func() error {
if pending != nil {
length, drop, dispErr := r.dispatchListPushReuse(ctx, key, pending)
if drop {
pending = nil
}
if dispErr != nil {
return dispErr
}
newLen = length
return nil
}

readTS := r.readTS()
meta, _, err := r.resolveListMeta(ctx, key, readTS)
if err != nil {
return err
}

commitTS := r.coordinator.Clock().Next()
ops, updatedMeta, err := buildFn(meta, key, values, commitTS, 0)
if err != nil {
return err
}
if len(ops) == 0 {
newLen = updatedMeta.Len
return nil
}

startTS := normalizeStartTS(readTS)
boundaryReads := listPushBoundaryReadKeys(key, meta)
_, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
CommitTS: commitTS,
ReadKeys: boundaryReads,
Elems: ops,
})
if dispErr == nil {
newLen = updatedMeta.Len
return nil
}
// Only remember the attempt for reuse if retryRedisWrite will actually
// loop — i.e. the error is one of WriteConflict / TxnLocked. For
// errors that escape the loop (transient-leader, context deadline,
// FSM apply error, etc.), `pending` would be discarded with the
// goroutine, and recording it would mislead a future reader about
// what state was preserved. The dedup window is therefore bounded by
// retryRedisWrite's retry predicate; ambiguous errors that escape
// to the client are a separate problem space (cross-request
// idempotency cache) and out of scope for this design.
if isRetryableRedisTxnErr(dispErr) {
pending = &reusableListPush{
ops: ops,
startTS: startTS,
commitTS: commitTS,
length: updatedMeta.Len,
readKeys: boundaryReads,
}
}
return errors.WithStack(dispErr)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep dedup state for non-retryable ambiguous errors

When the first dispatch returns an ambiguous error outside store.ErrWriteConflict/kv.ErrTxnLocked (for example a caller deadline/cancellation after the proposal may still apply), this path records pending but then returns the original dispErr to retryRedisWrite; adapter/redis_retry.go only retries write conflicts and txn locks, so the saved write set is discarded as the command returns. A client retry then starts a fresh listPushCoreWithDedup with no PrevCommitTS and recomputes a new sequence, leaving the duplicate-list-element case unfixed for those ambiguity modes.

Useful? React with 👍 / 👎.

})
return newLen, err
}

func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte) (int64, error) {
return r.listPushCore(ctx, key, values, r.buildRPushOps)
}
Expand Down
Loading
Loading