diff --git a/adapter/redis.go b/adapter/redis.go index 2c3c749d2..2411c57b9 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -239,11 +239,31 @@ type RedisServer struct { // redis_key_waiters.go. zsetWaiters *keyWaiterRegistry + // onePhaseTxnDedup enables option-2 one-phase idempotency: on a + // retryable write error, list-push retries reuse the failed attempt's + // write set and carry prev_commit_ts so the FSM can dedup a commit that + // landed under leadership churn (see + // docs/design/2026_05_21_proposed_txn_secondary_idempotency.md). It + // MUST stay off until every node runs a probe-aware binary — see R5 + // (FSM determinism across a rolling upgrade). Default off; enabled via + // WithOnePhaseTxnDedup / the ELASTICKV_REDIS_ONEPHASE_DEDUP env var + // after a full rollout. + onePhaseTxnDedup bool + route map[string]func(conn redcon.Conn, cmd redcon.Command) } type RedisServerOption func(*RedisServer) +// WithOnePhaseTxnDedup enables the option-2 one-phase idempotency dedup on +// list-push retries (see RedisServer.onePhaseTxnDedup). Off by default; +// enable only after the whole cluster runs a probe-aware binary. +func WithOnePhaseTxnDedup(enabled bool) RedisServerOption { + return func(r *RedisServer) { + r.onePhaseTxnDedup = enabled + } +} + func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption { return func(r *RedisServer) { r.readTracker = tracker @@ -475,10 +495,15 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore // getLuaPool, which honors luaPoolMaxIdle the same way. luaPool: nil, traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1", - baseCtx: baseCtx, - baseCancel: baseCancel, - streamWaiters: newKeyWaiterRegistry(), - zsetWaiters: newKeyWaiterRegistry(), + // onePhaseTxnDedup honors the documented opt-in env var; the + // WithOnePhaseTxnDedup option below can still override either way. + // Default off — see R5 in the design doc (the writer must not be + // enabled until the whole cluster runs a probe-aware binary). + onePhaseTxnDedup: os.Getenv("ELASTICKV_REDIS_ONEPHASE_DEDUP") == "1", + baseCtx: baseCtx, + baseCancel: baseCancel, + streamWaiters: newKeyWaiterRegistry(), + zsetWaiters: newKeyWaiterRegistry(), } r.relay.Bind(r.publishLocal) @@ -3141,8 +3166,13 @@ type listPushBuildFn func(meta store.ListMeta, key []byte, values [][]byte, comm // listPushCore is the shared retry loop for RPUSH and LPUSH. The caller supplies // a buildFn that assembles the specific operations (RPUSH appends to tail, LPUSH -// prepends to head). +// prepends to head). When onePhaseTxnDedup is enabled it uses the write-set-reuse +// retry path (option 2); otherwise it keeps the original recompute-on-retry loop. func (r *RedisServer) listPushCore(ctx context.Context, key []byte, values [][]byte, buildFn listPushBuildFn) (int64, error) { + if r.onePhaseTxnDedup { + return r.listPushCoreWithDedup(ctx, key, values, buildFn) + } + var newLen int64 err := r.retryRedisWrite(ctx, func() error { readTS := r.readTS() @@ -3178,6 +3208,264 @@ func (r *RedisServer) listPushCore(ctx context.Context, key []byte, values [][]b return newLen, err } +// reusableListPush captures a dispatched list-push attempt so a subsequent +// retry can reuse its exact write set (same seq, same item/delta keys) and +// probe whether it already landed, instead of recomputing seq from a fresh +// meta read. Recomputing is what duplicates the element under leadership +// churn: attempt 1 commits at T1 but returns an ambiguous error, the retry +// reads the now-larger list and appends at a NEW seq. Reuse + the FSM's +// exact-ts dedup probe close that. See option 2 in +// docs/design/2026_05_21_proposed_txn_secondary_idempotency.md. +type reusableListPush struct { + ops []*kv.Elem[kv.OP] + startTS uint64 + // commitTS is the most recent dispatched commit_ts for this write set; + // the next retry passes it as prev_commit_ts so the FSM probes exactly + // the attempt that might have landed. + commitTS uint64 + // length is the client-visible post-push length. It is invariant across + // reuse — the write set was built once from attempt 1's meta — so it is + // also the correct value to return when the FSM dedup no-ops the apply + // (R1 result reconstruction: no store re-read needed). + length int64 + // readKeys is the boundary read set captured at attempt 1's meta read: + // listItemKey(Head) and (when Len > 1) listItemKey(Tail-1). It is the + // load-bearing fence against the codex P1 scenario where an intervening + // pop/trim shrinks the list before the retry — without it, the reused + // seq would land past the new Tail and be unreachable to LRANGE. OCC + // validates these atomically against startTS at FSM apply, so any + // boundary-touching commit fires WriteConflict and the adapter drops + // pending → recomputes. Empty when attempt 1 read an empty list (no + // boundary to fence; the OCC on the write key suffices for that case). + readKeys [][]byte +} + +// dispatchListPushReuse runs one iteration of the option-2 reuse path: +// dispatches the captured write set under a fresh commit_ts (carrying +// pending.commitTS as PrevCommitTS so the FSM probes whether the prior +// attempt landed) and returns the post-push length on success. The drop +// return signals the caller to clear pending — set on a genuine +// WriteConflict from another txn so the next iteration recomputes from +// fresh meta. Extracted from listPushCoreWithDedup to keep that closure +// under the cyclop / gocognit / nestif limits. +func (r *RedisServer) dispatchListPushReuse(ctx context.Context, key []byte, pending *reusableListPush) (newLen int64, drop bool, err error) { + commitTS := r.coordinator.Clock().Next() + _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: pending.startTS, + CommitTS: commitTS, + PrevCommitTS: pending.commitTS, + ReadKeys: pending.readKeys, + Elems: pending.ops, + }) + if dispErr == nil { + return r.resolveReuseLength(ctx, key, pending), false, nil + } + if errors.Is(dispErr, store.ErrWriteConflict) { + // Self-inflicted-conflict guard (codex P1): the apply might have + // landed at this fresh commitTS but bubbled up as WriteConflict due + // to leadership churn (the original bug class the doc's "Resolved" + // section identifies). Without this probe, dropping pending here + // would recompute and append a second copy. Ask the store: did + // our just-attempted commit_ts land? If yes, this conflict is + // against our own commit — return success and keep pending pointing + // at THIS commit_ts so any subsequent retry probes the right point. + // + // Length resolution (codex P2 round-11): pending.length was computed + // during the prior attempt and is stale w.r.t. any non-conflicting + // list-modifying writes that landed between attempt 1 and this fresh + // apply. Probing pending.commitTS would hit for the fresh apply and + // (under the old resolveReuseLength shortcut) silently return the + // prior-attempt length — understating the count. Always re-read meta + // in the self-conflict path. resolveListMeta failure falls back to + // pending.length to honor codex P2 round-10 ("avoid failing after a + // reuse apply"). + if probeKey := firstWriteKey(pending.ops); len(probeKey) > 0 { + landed, perr := r.store.CommittedVersionAt(ctx, probeKey, commitTS) + if perr == nil && landed { + pending.commitTS = commitTS + return r.resolveLengthAfterFreshApply(ctx, key, pending), false, nil + } + } + // Our attempt did not land at commitTS and the target seq is taken + // by another txn — a genuine conflict. Drop pending; the next + // iteration recomputes from a fresh meta read. + return 0, true, errors.WithStack(dispErr) + } + // Still ambiguous (lock / other retryable): this reuse may itself + // have landed, so the next retry must probe THIS commit_ts. Only + // advance pending.commitTS if retryRedisWrite will actually loop + // (non-retryable errors escape to the client; pending is then + // discarded with the goroutine, so the update is wasted and the + // stale value would be misleading if some future caller reads it). + if isRetryableRedisTxnErr(dispErr) { + pending.commitTS = commitTS + } + return 0, false, errors.WithStack(dispErr) +} + +// resolveReuseLength returns the client-visible post-push length after a +// successful reuse dispatch. If our prior attempt's exact commit_ts +// version exists, the FSM no-op'd (probe hit) and pending.length is the +// correct length we computed at that attempt. Otherwise the FSM applied +// the reused write set at a fresh commit_ts and we must re-read meta to +// capture any non-conflicting list-modifying writes that committed +// between attempts (codex P2) — without this, the return value would +// silently understate the count when the boundary OCC fence and +// write-key OCC both pass but the list length changed. +// +// Failure modes are converted to a degraded return (pending.length) rather +// than surfaced as an error, because the dispatch already committed. Per +// codex P2 round-10 ("avoid failing after a reuse apply"), reporting a +// write error after the apply landed drives the client into a retry that +// has no pending state and would re-append the element — the very anomaly +// this feature prevents. Specifically: +// - probe error of any kind: prefer pending.length over failure. +// - resolveListMeta failure (e.g. delta scan over MaxDeltaScanLimit +// under churn): fall back to pending.length. +// +// Returns int64 directly (no error) so callers do not have to invent +// caller-side fallback logic; the degraded-return contract is fixed here +// (golangci unparam / nilerr fix on the prior error-returning shape). +func (r *RedisServer) resolveReuseLength(ctx context.Context, key []byte, pending *reusableListPush) int64 { + if probeKey := firstWriteKey(pending.ops); len(probeKey) > 0 { + hit, perr := r.store.CommittedVersionAt(ctx, probeKey, pending.commitTS) + if perr == nil && hit { + return pending.length + } + if perr != nil { + // Probe failed; the dispatch already committed so degrade + // gracefully rather than propagate the read error. + return pending.length + } + // perr == nil && !hit: prior attempt didn't land at this ts; the + // FSM applied fresh writes, fall through to re-read meta. + } + return r.resolveLengthAfterFreshApply(ctx, key, pending) +} + +// resolveLengthAfterFreshApply re-reads list meta to capture the post-apply +// length when we know the fresh commitTS applied (no probe shortcut), with +// the same fall-back-to-pending.length contract as resolveReuseLength. Used +// by the self-conflict path (codex P2 round-11): there pending.length is +// stale w.r.t. intervening non-conflicting writes, so the probe-hit +// shortcut would silently understate the count. +func (r *RedisServer) resolveLengthAfterFreshApply(ctx context.Context, key []byte, pending *reusableListPush) int64 { + currentMeta, _, mErr := r.resolveListMeta(ctx, key, r.readTS()) + if mErr != nil { + return pending.length + } + return currentMeta.Len +} + +// firstWriteKey returns the first non-empty element key from ops, or nil +// when there is none. Used after a successful reuse dispatch to probe +// whether our prior attempt's commit_ts actually landed: attempt 1 writes +// all its elem keys atomically at the same commit_ts, so any one of them +// answers the question. +func firstWriteKey(ops []*kv.Elem[kv.OP]) []byte { + for _, e := range ops { + if e != nil && len(e.Key) > 0 { + return e.Key + } + } + return nil +} + +// listPushBoundaryReadKeys returns the boundary positions of the list as +// read keys for OCC. Including these in the dispatched OperationGroup makes +// FSM apply atomically reject the retry when any pop/trim has touched the +// boundary between attempts (codex P1 fix: prevents a reused seq from +// landing past a shrunk Tail). The keys are deduped: a single-element list +// has Head == Tail-1, so we emit it once. +func listPushBoundaryReadKeys(key []byte, meta store.ListMeta) [][]byte { + if meta.Len <= 0 { + return nil + } + tailIdx := meta.Tail - 1 + if tailIdx == meta.Head { + return [][]byte{listItemKey(key, meta.Head)} + } + return [][]byte{ + listItemKey(key, meta.Head), + listItemKey(key, tailIdx), + } +} + +// listPushCoreWithDedup is the option-2 retry loop. The first attempt computes +// the write set from the current meta; any retryable failure makes the next +// iteration REUSE that write set under a fresh commit_ts with prev_commit_ts +// set, so the FSM no-ops if the prior attempt already landed. A WriteConflict +// on a reuse attempt means the probe ruled out our own prior attempt and the +// seq is genuinely taken by another txn, so we fall back to a full recompute. +func (r *RedisServer) listPushCoreWithDedup(ctx context.Context, key []byte, values [][]byte, buildFn listPushBuildFn) (int64, error) { + var newLen int64 + var pending *reusableListPush + err := r.retryRedisWrite(ctx, func() error { + if pending != nil { + length, drop, dispErr := r.dispatchListPushReuse(ctx, key, pending) + if drop { + pending = nil + } + if dispErr != nil { + return dispErr + } + newLen = length + return nil + } + + readTS := r.readTS() + meta, _, err := r.resolveListMeta(ctx, key, readTS) + if err != nil { + return err + } + + commitTS := r.coordinator.Clock().Next() + ops, updatedMeta, err := buildFn(meta, key, values, commitTS, 0) + if err != nil { + return err + } + if len(ops) == 0 { + newLen = updatedMeta.Len + return nil + } + + startTS := normalizeStartTS(readTS) + boundaryReads := listPushBoundaryReadKeys(key, meta) + _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: startTS, + CommitTS: commitTS, + ReadKeys: boundaryReads, + Elems: ops, + }) + if dispErr == nil { + newLen = updatedMeta.Len + return nil + } + // Only remember the attempt for reuse if retryRedisWrite will actually + // loop — i.e. the error is one of WriteConflict / TxnLocked. For + // errors that escape the loop (transient-leader, context deadline, + // FSM apply error, etc.), `pending` would be discarded with the + // goroutine, and recording it would mislead a future reader about + // what state was preserved. The dedup window is therefore bounded by + // retryRedisWrite's retry predicate; ambiguous errors that escape + // to the client are a separate problem space (cross-request + // idempotency cache) and out of scope for this design. + if isRetryableRedisTxnErr(dispErr) { + pending = &reusableListPush{ + ops: ops, + startTS: startTS, + commitTS: commitTS, + length: updatedMeta.Len, + readKeys: boundaryReads, + } + } + return errors.WithStack(dispErr) + }) + return newLen, err +} + func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte) (int64, error) { return r.listPushCore(ctx, key, values, r.buildRPushOps) } diff --git a/adapter/redis_list_dedup_test.go b/adapter/redis_list_dedup_test.go new file mode 100644 index 000000000..67c191541 --- /dev/null +++ b/adapter/redis_list_dedup_test.go @@ -0,0 +1,397 @@ +package adapter + +import ( + "bytes" + "context" + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// dedupTestCoordinator drives the option-2 one-phase idempotency path +// end-to-end. It layers two things on the OCC-aware coordinator: +// +// - The FSM's exact-ts dedup probe (mimicking handleOnePhaseTxnRequest): +// when a request carries prev_commit_ts, it checks whether the primary +// key already has a committed version at exactly that timestamp and, if +// so, no-ops the apply. +// - An injected ambiguous commit: a chosen dispatch either lands-then-errors +// (ambiguousLands) or errors-without-applying, reproducing the +// leadership-churn window where attempt 1 may or may not have committed. +// +// Because the OCC layer is real (store.ApplyMutations against StartTS), a +// reuse attempt whose probe did NOT fire would conflict against attempt 1's +// own version — exactly the trap the probe must avoid. So a passing test +// proves the probe is load-bearing, not cosmetic. +type dedupTestCoordinator struct { + *occAdapterCoordinator + ambiguousDispatch int // 1-based dispatch number to make ambiguous + ambiguousLands bool // true: apply then error; false: error without applying + // landThenWriteConflictAtDispatch reproduces the self-inflicted-conflict + // scenario (codex P1): the named dispatch applies the elems THEN returns + // store.ErrWriteConflict — modelling leadership churn that surfaces a + // committed entry as a write conflict. + landThenWriteConflictAtDispatch int + dispatches int + probeNoOps int + // beforeDispatch, if set, runs at the start of each Dispatch with the + // 1-based dispatch number — lets a test inject a concurrent commit + // between the adapter's attempts. + beforeDispatch func(n int) +} + +func newDedupTestCoordinator(st store.MVCCStore, ambiguousDispatch int, lands bool) *dedupTestCoordinator { + return &dedupTestCoordinator{ + occAdapterCoordinator: newOCCAdapterCoordinator(st), + ambiguousDispatch: ambiguousDispatch, + ambiguousLands: lands, + } +} + +func (c *dedupTestCoordinator) Dispatch(ctx context.Context, req *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) { + c.dispatches++ + n := c.dispatches + if c.beforeDispatch != nil { + c.beforeDispatch(n) + } + if handled, resp, err := c.maybeProbe(ctx, req); handled { + return resp, err + } + if n == c.ambiguousDispatch && !c.ambiguousLands { + // OCC-style pre-reject: nothing is written, definitely did not land. + return nil, store.ErrWriteConflict + } + resp, err := c.occAdapterCoordinator.Dispatch(ctx, req) + if err != nil { + return nil, err + } + if n == c.ambiguousDispatch && c.ambiguousLands { + // The apply LANDED, but the adapter sees an ambiguous retryable error. + return nil, store.ErrWriteConflict + } + if n == c.landThenWriteConflictAtDispatch { + // The apply LANDED, but leadership churn surfaces the commit as + // store.ErrWriteConflict — the original bug class. The adapter's + // self-inflicted-conflict guard must probe our just-committed + // commit_ts before treating this as a third-party conflict. + return nil, store.ErrWriteConflict + } + return resp, nil +} + +// maybeProbe mimics handleOnePhaseTxnRequest's exact-ts dedup check. It +// returns handled=true when the probe owns the response (hit → no-op success +// or probe error), and handled=false to fall through to the normal apply. +// Extracted from Dispatch to keep the dispatcher under the cyclop limit. +func (c *dedupTestCoordinator) maybeProbe(ctx context.Context, req *kv.OperationGroup[kv.OP]) (bool, *kv.CoordinateResponse, error) { + if req == nil || !req.IsTxn || req.PrevCommitTS == 0 { + return false, nil, nil + } + primary := minElemKey(req.Elems) + landed, err := c.store.CommittedVersionAt(ctx, primary, req.PrevCommitTS) + if err != nil { + return true, nil, err + } + if landed { + c.probeNoOps++ + return true, &kv.CoordinateResponse{}, nil + } + return false, nil, nil +} + +// minElemKey mirrors kv.primaryKeyForElems: the minimum non-empty key among +// the elements. The FSM probes this key for the prior attempt's version. +func minElemKey(elems []*kv.Elem[kv.OP]) []byte { + var primary []byte + for _, e := range elems { + if e == nil || len(e.Key) == 0 { + continue + } + if primary == nil || bytes.Compare(e.Key, primary) < 0 { + primary = e.Key + } + } + return primary +} + +// TestListPushDedup_LandedPriorAttempt_NoDuplicate is the option-2 headline: +// attempt 1 commits the RPUSH but returns an ambiguous error; the retry +// reuses the same write set with prev_commit_ts, the probe finds the landed +// version and no-ops, and the client gets the correct length with NO +// duplicate element. Without the probe, the reuse would OCC-conflict against +// attempt 1's own version, recompute at a new seq, and duplicate. +func TestListPushDedup_LandedPriorAttempt_NoDuplicate(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, true) + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + key := []byte("mylist") + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + require.Equal(t, int64(1), n, "RPUSH must return the post-push length reconstructed from the prior attempt") + require.Equal(t, 2, coord.dispatches, "one failed attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "the reuse must dedup via the exact-ts probe") + + // No duplicate: the list has exactly one element. + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(1), meta.Len, "the delta must be counted once — no double-count") + + val, err := st.GetAt(ctx, listItemKey(key, meta.Head), readTS) + require.NoError(t, err) + require.Equal(t, []byte("v"), val) +} + +// TestListPushDedup_PriorAttemptDidNotLand_Applies covers the truncated case: +// attempt 1 errored without committing, so the probe misses and the reuse +// applies the same write set at a fresh commit_ts. The element lands exactly +// once and the length is correct. +func TestListPushDedup_PriorAttemptDidNotLand_Applies(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + key := []byte("mylist") + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + require.Equal(t, int64(1), n) + require.Equal(t, 2, coord.dispatches) + require.Equal(t, 0, coord.probeNoOps, "nothing landed, so the probe must miss and the reuse applies") + + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(1), meta.Len) + + val, err := st.GetAt(ctx, listItemKey(key, meta.Head), readTS) + require.NoError(t, err) + require.Equal(t, []byte("v"), val) +} + +// TestListPushDedup_GenuineConflictRecomputes covers outcome 3: the prior +// attempt did NOT land, and a concurrent txn took the target seq between +// attempts. The reuse's probe misses, the OCC apply conflicts against the +// other txn, so the adapter drops the reusable attempt and recomputes from a +// fresh meta — appending our value at a NEW seq. Two distinct elements result +// (correct), not a duplicate. +func TestListPushDedup_GenuineConflictRecomputes(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + key := []byte("mylist") + // Before the reuse (dispatch 2), simulate another client's RPUSH landing + // at seq 0 so the reuse conflicts and must recompute. + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + ts := coord.Clock().Next() + require.NoError(t, st.PutAt(ctx, listItemKey(key, 0), []byte("other"), ts, 0)) + delta := store.MarshalListMetaDelta(store.ListMetaDelta{HeadDelta: 0, LenDelta: 1}) + require.NoError(t, st.PutAt(ctx, store.ListMetaDeltaKey(key, ts, 0), delta, ts, 0)) + } + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + require.Equal(t, int64(2), n, "after recompute, our value is the 2nd element") + require.Equal(t, 3, coord.dispatches, "attempt 1 + conflicting reuse + recompute") + require.Equal(t, 0, coord.probeNoOps, "the reuse probe must miss (prior did not land)") + + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(2), meta.Len, "two distinct appends, not a duplicate") + + other, err := st.GetAt(ctx, listItemKey(key, meta.Head), readTS) + require.NoError(t, err) + require.Equal(t, []byte("other"), other) + ours, err := st.GetAt(ctx, listItemKey(key, meta.Head+1), readTS) + require.NoError(t, err) + require.Equal(t, []byte("v"), ours) +} + +// TestListPushDedup_InterveningRPopRefusesStaleReuse is the codex P1 +// regression. From a one-element list [A]: attempt 1's RPUSH "v" plans seq 1 +// then fails without landing; an intervening RPOP shrinks the list to [] +// (Head→1, Len→0); the retry must NOT silently re-apply the stale write set +// (which would place "v" at seq 1, past the new Tail=1, unreachable to +// LRANGE). The boundary-position read keys (listItemKey(Head), +// listItemKey(Tail-1)) carried in the dispatched OperationGroup.ReadKeys +// catch the RPOP atomically via OCC; the adapter drops pending and recomputes +// from the fresh (empty) meta so the new RPUSH plans seq 1 (Head+Len=1+0) +// and lands at the correct, reachable position. +func TestListPushDedup_InterveningRPopRefusesStaleReuse(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + key := []byte("mylist") + + // Seed the list with one element at seq 0 (Head=0, Len=1, Tail=1). + require.NoError(t, st.PutAt(ctx, listItemKey(key, 0), []byte("A"), 1, 0)) + delta := store.MarshalListMetaDelta(store.ListMetaDelta{HeadDelta: 0, LenDelta: 1}) + require.NoError(t, st.PutAt(ctx, store.ListMetaDeltaKey(key, 1, 0), delta, 1, 0)) + + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + coord.clock.Observe(1) + // Between attempts, an RPOP commits: it tombstones seq 0 and writes a + // (HeadDelta=+1, LenDelta=-1) meta delta — shrinking the list to empty. + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + ts := coord.Clock().Next() + require.NoError(t, st.DeleteAt(ctx, listItemKey(key, 0), ts)) + popDelta := store.MarshalListMetaDelta(store.ListMetaDelta{HeadDelta: 1, LenDelta: -1}) + require.NoError(t, st.PutAt(ctx, store.ListMetaDeltaKey(key, ts, 0), popDelta, ts, 0)) + } + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + // After the RPOP shrank the list to [] and we recomputed onto the new + // state, the post-push length is 1 (just "v") and seq is the new Tail + // (Head+Len = 1+0 = 1) — but more importantly, the element MUST be + // reachable: meta.Head <= seq < meta.Tail. + require.Equal(t, int64(1), n) + // Pin the exact dispatch sequence: attempt 1 pre-rejects, attempt 2's + // reuse OCC-conflicts on the boundary read key (RPOP's tombstone), + // attempt 3 recomputes from fresh meta and lands. A future regression + // that piles on extra dispatches before success would slip past a >2 + // check. + require.Equal(t, 3, coord.dispatches) + + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(1), meta.Len, "list has exactly one element") + + val, err := st.GetAt(ctx, listItemKey(key, meta.Head), readTS) + require.NoError(t, err) + require.Equal(t, []byte("v"), val, "the pushed element must be reachable at meta.Head") +} + +// TestListPushDedup_InterveningNonConflictingLPush_RecomputesLength is the +// codex P2 regression. Attempt 1's RPUSH plans seq Tail and fails without +// landing; an intervening LPUSH commits non-conflictingly (writes at +// Head-1, doesn't touch our boundary read keys at Head / Tail-1). The +// reuse's probe misses (attempt 1 didn't land), the boundary OCC passes +// (LPUSH didn't touch our boundary), the write-key OCC passes (different +// keys), and the apply succeeds at the fresh commit_ts. Pre-fix, the +// adapter returned pending.length (= attempt-1-snapshot, missing LPUSH's +// element). Post-fix, the adapter detects the apply-not-no-op case via +// CommittedVersionAt(probeKey, pending.commitTS) miss, re-reads the +// current meta, and returns the correct post-push length. +func TestListPushDedup_InterveningNonConflictingLPush_RecomputesLength(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + key := []byte("mylist") + + // Seed [A] at seq 0 (Head=0, Len=1, Tail=1). + require.NoError(t, st.PutAt(ctx, listItemKey(key, 0), []byte("A"), 1, 0)) + seedDelta := store.MarshalListMetaDelta(store.ListMetaDelta{HeadDelta: 0, LenDelta: 1}) + require.NoError(t, st.PutAt(ctx, store.ListMetaDeltaKey(key, 1, 0), seedDelta, 1, 0)) + + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + coord.clock.Observe(1) + // Between attempts, simulate an LPUSH committing: writes listItemKey(-1) + // (Head-1) and a delta with HeadDelta=-1, LenDelta=+1. Crucially this + // does NOT touch the boundary read keys (Head=0, Tail-1=0) or our + // RPUSH's write keys (listItemKey(1), ListMetaDeltaKey(key, T1, 0)), + // so neither the read-set OCC nor the write-set OCC catches it. + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + ts := coord.Clock().Next() + require.NoError(t, st.PutAt(ctx, listItemKey(key, -1), []byte("x"), ts, 0)) + lpushDelta := store.MarshalListMetaDelta(store.ListMetaDelta{HeadDelta: -1, LenDelta: 1}) + require.NoError(t, st.PutAt(ctx, store.ListMetaDeltaKey(key, ts, 0), lpushDelta, ts, 0)) + } + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + // Three elements after the dust settles: LPUSH's "x" at seq -1, the + // seed "A" at seq 0, our "v" at seq 1. A stale snapshot return would + // be 2 (attempt 1's view: 1 seed + 1 push, no LPUSH). + require.Equal(t, int64(3), n, "return must reflect intervening non-conflicting writes, not the attempt-1 snapshot") + // Only two dispatches: attempt 1 (pre-rejects), attempt 2 (reuse applies). + // No recompute needed because the boundary and write-key OCC both pass. + require.Equal(t, 2, coord.dispatches) + require.Equal(t, 0, coord.probeNoOps, "prior did not land; the probe must miss") + + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(3), meta.Len) + require.Equal(t, int64(-1), meta.Head) +} + +// TestListPushDedup_SelfInflictedReuseConflict_ReturnsSuccess is the codex +// P1 regression for the "self-inflicted conflict" leadership-churn case: +// +// 1. attempt 1 errors without landing (pending stored, pending.commitTS=T1) +// 2. reuse dispatched at fresh T2 with PrevCommitTS=T1; the FSM probes T1, +// misses (T1 didn't land), and APPLIES at T2 — but leadership churn +// surfaces the commit as ErrWriteConflict to the adapter. +// +// Pre-fix the adapter treated this as a genuine third-party conflict, dropped +// pending, recomputed on a fresh meta read (which now reflects T2's effect) +// and APPENDED THE SAME ELEMENT A SECOND TIME at a new seq — exactly the +// duplicate-elements anomaly this feature exists to prevent. The fix is to +// probe our just-attempted commit_ts on a reuse WriteConflict; if it landed +// the conflict is against our own commit, return success and keep pending +// pointing at THIS commit_ts. +func TestListPushDedup_SelfInflictedReuseConflict_ReturnsSuccess(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 pre-rejects without landing + coord.landThenWriteConflictAtDispatch = 2 // reuse applies, then returns WriteConflict + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + key := []byte("mylist") + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + require.Equal(t, int64(1), n, "self-inflicted conflict must not duplicate the element") + require.Equal(t, 2, coord.dispatches, "no recompute should fire; the reuse landed") + require.Equal(t, 0, coord.probeNoOps, "the FSM probe at T1 misses; the no-op count stays zero") + + readTS := snapshotTS(coord.Clock(), st) + meta, _, err := srv.resolveListMeta(ctx, key, readTS) + require.NoError(t, err) + require.Equal(t, int64(1), meta.Len, "exactly one element after self-inflicted conflict") + val, err := st.GetAt(ctx, listItemKey(key, meta.Head), readTS) + require.NoError(t, err) + require.Equal(t, []byte("v"), val) +} + +// TestListPushDedup_DisabledKeepsLegacyPath confirms the gate is off by +// default: without onePhaseTxnDedup, listPushCore never emits prev_commit_ts, +// so the coordinator's probe is never exercised (dedup count stays zero) and +// the legacy recompute-on-retry path runs. +func TestListPushDedup_DisabledKeepsLegacyPath(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + // ambiguousDispatch=0 → no injected error; a clean single dispatch. + coord := newDedupTestCoordinator(st, 0, false) + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}} // onePhaseTxnDedup defaults false + + key := []byte("mylist") + n, err := srv.listRPush(ctx, key, [][]byte{[]byte("v")}) + require.NoError(t, err) + require.Equal(t, int64(1), n) + require.Equal(t, 1, coord.dispatches) + require.Equal(t, 0, coord.probeNoOps) +} diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md new file mode 100644 index 000000000..f759c32fd --- /dev/null +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -0,0 +1,797 @@ +# Transactional secondary-commit idempotency + +> **Status: partial — option 2 reader landed everywhere; writer (emission) +> gated off by default pending cluster-wide rollout.** M2a (store exact-ts +> probe), M2b (`prev_commit_ts` in `TxnMeta` V2 + one-phase FSM probe), and +> M1+M3 for the list-push family (write-set reuse + `OperationGroup.PrevCommitTS` +> threading through both `ShardedCoordinator` and the legacy `Coordinate` +> redirect path) have shipped on this branch and are tested. Emission stays +> off by default (`RedisServer.onePhaseTxnDedup`), so the FSM is byte-identical +> to today until operators enable it after a full probe-aware rollout — see +> R5. Remaining: `runTransaction` EXEC-body reuse and M4 (Jepsen). +> +> Triggered by the 2026-05-21 Jepsen scheduled run +> [26198185540](https://github.com/bootjp/elastickv/actions/runs/26198185540) +> which surfaced a `:duplicate-elements` anomaly on the Redis list-append +> workload. This doc proposes **option 2 — stable-write-set reuse with an +> exact-`commit_ts` dedup probe at one-phase FSM apply** as the primary +> design, and records **option (a) — give the single-group one-phase path +> a lock + commit record so TiKV-style `CheckTxnStatus` applies** as a +> documented fallback. The decision hinges on one argument, written head-on +> below: *can a retry safely reuse the failed attempt's write set without +> re-committing at a stale `commit_ts`?* The answer is yes — by separating +> the **dedup identity** (the stale `commit_ts`, read-only) from the +> **commit ordering** (a fresh, monotonic `commit_ts`, write). Because the +> argument holds, option 2 is primary; if it fails in review or test, we +> fall back to (a). + +## Background + +PR [#789](https://github.com/bootjp/elastickv/pull/789) (May 19) shipped the +NOTLEADER prefix for gRPC-wrapped leader errors, which stopped the Jepsen +Redis workers from crashing on `:prefix :rpc` exceptions. That fix +unmasked a deeper anomaly that had been silently classified as `:info` +(unknown outcome) under the old worker-crash regime: **`:duplicate-elements`**. + +Jepsen's history.txt for the run shows: + +```text +1411 3 :invoke :txn [[:r 16 nil] [:r 16 nil] [:append 14 230]] +1416 3 :ok :txn [[:r 16 [...]] [:r 16 [...]] [:append 14 230]] +1420 7 :ok :txn [..., [:r 14 [... 228 229 230 230]]] +``` + +Worker 3 sent the value `230` to key 14 exactly once. The very next read +(line 1420) shows two `230`s adjacent at the tail. The result.edn +analysis reports `:anomalies {:duplicate-elements [... :duplicates {230 2} ...]}` +across multiple ops. + +This is a real consistency violation. Each Jepsen `:append k v` carries a +distinct, per-key monotonically-increasing value, so two `230`s in the read +must mean two distinct storage entries with that value — one logical +append applied twice. + +## Problem + +### Why a same-commit-ts replay does NOT produce a duplicate + +elastickv's list-append mutations are content-addressed: + +```go +// adapter/redis.go:3066-3071 (buildLPushOps / mirrored in RPush) +elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: listItemKey(key, seq), // seq = current Tail + Len at attempt time + Value: vCopy, +}) +delta := store.MarshalListMetaDelta(...) +elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: store.ListMetaDeltaKey(key, commitTS, seqInTxn), // commitTS in the key + Value: delta, +}) +``` + +Both the per-item key and the meta-delta key embed `commitTS` (directly or +via `seq` derived from a meta read pinned to `startTS`). Two applies with +the **same** `(startTS, commitTS)` therefore land on the **same** Pebble +keys — the second is a no-op overwrite, not a duplicate. + +This is the same property TiKV relies on: writes to the Write CF at +`(user_key, commit_ts) → start_ts` are naturally idempotent because +`commit_ts` is part of the key. + +### How a different-commit-ts replay DOES produce a duplicate + +`adapter/redis.go:2824` wraps the entire `runTransaction` body in +`retryRedisWrite`: + +```go +err := r.retryRedisWrite(dispatchCtx, func() error { + startTS := r.txnStartTS() // regenerated each iteration + ... + txn := &txnContext{... startTS: startTS, ...} + for _, cmd := range queue { + res, err := txn.apply(cmd) + } + if err := txn.validateReadSet(...); err != nil { return err } + if err := txn.commit(); err != nil { return err } // new commitTS via Clock().Next() +}) +``` + +`retryRedisWrite` retries on `store.ErrWriteConflict` or `kv.ErrTxnLocked` +(see `adapter/redis_retry.go:43`). On retry the EXEC body re-runs from +scratch: fresh `startTS`, fresh meta read (which now reflects any earlier +commit), fresh `commitTS`, fresh `seq` for the list item, fresh meta-delta +key. Content-addressing protects against same-input replay; it does not +protect against this re-generation. + +The duplicate shape: + +1. Attempt 1: `startTS=T0`, reads `meta.Len = 229`, builds `listItemKey(14, 230)` + and `ListMetaDeltaKey(14, T1, 0)` with `commitTS=T1`. Dispatch runs. +2. Some path inside dispatch lets Attempt 1 **actually commit** (key 14 now + has `230` at idx 230, at commit_ts=T1) yet returns a `WriteConflict` / + `TxnLocked` (or a transient/ambiguous error) to the adapter. The + retryRedisWrite classifier therefore retries. +3. Attempt 2: `startTS=T2 > T1`, reads `meta.Len = 230` (the just-committed + entry from attempt 1 is now visible), builds `listItemKey(14, 231)` and + `ListMetaDeltaKey(14, T3, 0)` with `commitTS=T3`. Dispatch commits. +4. End state: idx 230 has value `230` at T1, idx 231 has value `230` at T3. + LRANGE returns `[... 229 230 230]`. + +The trigger that lets attempt 1 commit *and* return a retryable error has +since been positively identified as a **leader-election storm racing the +in-flight commit** — see "Resolved" below. The fix does not depend on +*which* raft event produced the ambiguous commit; it depends only on the +adapter being able to recognise, on retry, that its own previous attempt +already landed. + +### Why the existing `applyCommitWithIdempotencyFallback` doesn't catch this + +`kv/fsm.go:502` already has an idempotency fallback for the commit-phase +apply path: + +```go +func applyCommitWithIdempotencyFallback(... applyStartTS, commitTS uint64) error { + err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS) + if err == nil { return nil } + if !errors.Is(err, store.ErrWriteConflict) { return errors.WithStack(err) } + // Scan for a key already committed at >= commitTS; if found, retry + // with commitTS as the conflict baseline. + for _, mut := range uniq { + latestTS, exists, lErr := f.store.LatestCommitTS(ctx, mut.Key) + if exists && latestTS >= commitTS { + return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS)) + } + } + return errors.WithStack(err) +} +``` + +This handles **same-commit_ts replay** of a single-key PUT correctly: the +second apply's WriteConflict is recognized as an idempotent retry and the +re-apply at `(commit_ts, commit_ts)` becomes a no-op MVCC overwrite. + +It does *not* help here because Attempt 1's storage keys +(`listItemKey(14, 230)`, `ListMetaDeltaKey(14, T1, 0)`) and Attempt 2's +(`listItemKey(14, 231)`, `ListMetaDeltaKey(14, T3, 0)`) are **different +keys**. Neither attempt's apply sees a WriteConflict on the other's +storage key. The fallback can only catch identical-input replays; it +cannot catch logically-equivalent-but-recomputed retries. Note also its +`latestTS >= commitTS` comparison is a *loose* check — option 2 below +needs an *exact*-ts probe, for reasons the correctness argument makes +precise. + +## How TiKV solves this + +TiKV's Percolator commit path is naturally idempotent because TiKV writes +are pure PUTs at content-addressed keys (Write CF `(user_key, commit_ts) → +start_ts`). Critically, **TiKV clients do not recompute the txn on a +transient error**: they call `CheckTxnStatus(primary_key, start_ts)` first +to learn the txn's actual fate. If the txn already committed, the client +returns success to the user without re-issuing any write. Three layers +combine: + +1. **Content-addressed writes**: `(key, commit_ts)` PUT is idempotent at + the FSM/MVCC layer. Replay with same `(key, commit_ts, value)` is a + no-op. +2. **`CheckTxnStatus` before retry**: client reads the primary's Write CF + for `start_ts` before deciding to retry. Eliminates blind-retry of an + already-committed txn. +3. **Resolve-lock from readers**: a stale secondary lock seen at read + time is resolved against the primary's commit record. Guarantees + eventual commit of stranded secondaries without needing client retry. + +elastickv has (1) (the `applyCommitWithIdempotencyFallback` fallback, plus +content-addressed list-delta keys) and (3) (`kv.LockResolver` at +`kv/lock_resolver.go`). It is missing (2) — but, crucially, **it cannot +adopt (2) as-is**, because of the cluster's topology: + +> The cluster is **single-group**: `NewEngineWithDefaultRoute` +> (`distribution/engine.go:58`) maps the whole keyspace to group 1. Every +> transaction therefore takes the **one-phase** path +> (`handleOnePhaseTxnRequest`, `kv/fsm.go:393`), which writes **no lock and +> no `txnCommitKey`** — it applies mutations directly. TiKV's +> `CheckTxnStatus` reads the primary's lock / commit record; in the +> one-phase path there is nothing for it to read. `primaryTxnStatus` +> (`kv/shard_store.go:962`) — elastickv's `CheckTxnStatus` primitive — only +> ever fires on the multi-shard 2PC path, which this deployment does not +> exercise. + +So the missing layer (2) cannot be added by "just probe the commit +record": **there is no commit record on the path that produces the bug.** +Two ways forward: + +- **Option 2 (primary):** keep the one-phase path commit-record-free, and + make the *retry itself* idempotent by reusing the failed attempt's write + set and deduping on an exact-`commit_ts` probe of the data the attempt + would have written. No commit record, no lock, no hot-path tax. +- **Option (a) (fallback):** give the one-phase path a lock + commit record + (turning it into a mini-2PC), so the existing `primaryTxnStatus` / + `CheckTxnStatus` machinery applies. Correct and built on a proven + primitive, but it taxes the single-group hot path. + +The rest of this doc develops option 2, proves it, and records (a) as the +fallback. + +## Proposed design (primary) — option 2: write-set reuse + exact-ts dedup + +### The idea + +The duplicate is born the moment the retry **recomputes** its write set +from a fresh meta read: the list grew between attempts, so `seq` advances +(230 → 231) and the retry appends at a *new* index instead of overwriting +the old one. Kill the recomputation and you kill the duplicate. + +So: on a retryable failure, **do not recompute**. Reuse the previous +attempt's logical write set verbatim — same `seq`, same `listItemKey` +index, same `seqInTxn` — but commit it at a **fresh** `commit_ts`. Carry +the previous attempt's `commit_ts` (`prev_commit_ts`) into the request as a +read-only probe key. At one-phase apply, the FSM uses `prev_commit_ts` to +ask the only question that matters: + +> **Did the previous attempt actually land?** + +and branches: + +```text +one-phase apply of the retry entry E2 (commit_ts = T2, prev_commit_ts = T1): + if committedVersionAt(primaryKey, T1): # attempt 1 landed + no-op the entire apply # dedup: do not write at T2 + → adapter reconstructs attempt 1's result (length at T1) + else: # attempt 1 did NOT land + apply the reused write set at T2 # OCC still guards genuine conflicts +``` + +The probe is `committedVersionAt(key, exactTS)` — an MVCC point lookup for +a committed version stamped *exactly* `T1`, not `>= T1`. Exactness is +load-bearing; the correctness argument explains why. + +### The three outcomes + +Let attempt 1 be Raft entry **E1** (`commit_ts = T1`, builds +`listItemKey(14, 230)` + `ListMetaDeltaKey(14, T1, 0)`), and the retry be +entry **E2** (`commit_ts = T2 > T1`, **reusing** `seq = 230`, so it would +build `listItemKey(14, 230)` + `ListMetaDeltaKey(14, T2, 0)`). + +1. **Attempt 1 landed (the bug case).** `committedVersionAt(14-primary, T1)` + is true. E2 no-ops entirely — no second `listItemKey`, and critically no + second meta-delta, so `meta.Len` is not double-counted. The adapter + reconstructs the post-append length as of T1 and returns success. List + tail stays `[... 229 230]`. **One element. Correct.** +2. **Attempt 1 did not land (truncated / never committed).** The probe + misses. E2 applies the reused write set at T2. Exactly one delta + (`@T2`), one item at idx 230. **One element. Correct.** +3. **Genuine conflict (another txn took idx 230).** Attempt 1 did not land, + so the probe misses and E2 tries to apply at T2 — but `listItemKey(14, 230)` + now holds a committed version from the *other* txn at some `Tx`, newer + than E2's reused `startTS`. OCC fires `WriteConflict`. The adapter falls + back to a full recompute (fresh meta read → `seq = 231`) and appends the + *different* value at idx 231. Two distinct values at two indices is the + correct, non-duplicate outcome. + +Outcome 3 is why we still keep `retryRedisWrite`'s recompute path as the +*fallback* branch: write-set reuse is tried first (the common, self-conflict +case), and a genuine cross-txn conflict degrades cleanly to recompute. + +### Correctness — commit_ts reuse vs stale-ts ordering + +This is the crux the whole design turns on. State the tension first, then +resolve it. + +**The tempting shortcut, and why it is wrong.** The simplest way to make +the retry idempotent would be to reuse not just the write set but the +*same* `commit_ts` T1 — then E2's keys are byte-identical to E1's +(`listItemKey(14,230)` + `ListMetaDeltaKey(14, T1, 0)`), and a second apply +is a trivial no-op overwrite. But **re-committing new data at a stale +`commit_ts` violates Snapshot Isolation.** Concretely: a reader picks a +snapshot timestamp `S` with `T1 < S < T2` and reads the list — sees +`Len = 229` (the retry has not landed yet). Later the retry commits a write +stamped `T1 < S`. A *re-read at the same `S`* now returns `Len = 230`. The +set of versions visible at a fixed snapshot changed after the snapshot was +chosen — a non-repeatable read inside a pinned snapshot. SI's core +invariant ("the version set at `S` is fixed the instant `S` is chosen") is +broken. So naive `commit_ts` reuse is off the table. + +**The resolution: separate identity from ordering.** Option 2 reuses the +write *set* but **never** the `commit_ts` as a write timestamp: + +- **New data is only ever written at a fresh, strictly-monotonic + `commit_ts` T2** (`Clock().Next()`, which always advances). Any snapshot + `S` that could have been chosen *before* the retry was issued satisfies + `S ≤ last-issued-ts < T2`, so the T2 write is invisible to every such + snapshot — precisely what SI requires of a write that lands after `S` was + fixed. **No snapshot's version set is mutated retroactively. SI holds.** +- **The stale `commit_ts` T1 is used only as a read key** in + `committedVersionAt(primaryKey, T1)`. A point read of an old version at an + exact timestamp adds and removes nothing; it cannot perturb any snapshot's + visible set. T1 is demoted from "the timestamp we re-commit at" to "the + lookup key for *did attempt 1 land?*" — and that demotion is the entire + trick. + +This is why the probe must be **exact** (`== T1`), not loose (`>= T1`). A +loose `>= T1` check (as `applyCommitWithIdempotencyFallback` uses today) +would also fire for outcome 3, where the *other* txn committed at `Tx > T1` +— misclassifying a genuine conflict as a self-duplicate and silently +dropping our append. Exactness keys the dedup to *our own* attempt and +nothing else. + +**Race-freedom: the Raft log already decided E1's fate.** The remaining +worry is a TOCTOU: between the probe reading "attempt 1 not landed" and E2 +writing at T2, could E1 land and produce a double write? No — because the +probe runs *inside the deterministic FSM apply of E2*, and FSM apply is a +pure function of the committed Raft log. By the time E2 applies, the log +order has already fixed E1's outcome to exactly one of: + +- **E1 committed at a lower index than E2.** Then E1 applied before E2; the + T1 version exists; the probe hits; E2 no-ops (outcome 1). +- **E1 was truncated** (lost the log race during the election that produced + the ambiguous error). Then E1 will *never* apply; the T1 version will + never exist; the probe correctly misses; E2 applies at T2 (outcome 2). + +There is **no committed log in which E1 applies after E2.** This rests on +one architectural property, stated explicitly because it is the linchpin: +*the adapter proposes E2 only after attempt 1's dispatch has returned,* and +a dispatch returns only after its entry has either applied or been +abandoned at a leader that subsequently lost leadership. Attempt 1's +proposal therefore went to a leader `L`; the retry's E2 is proposed to a +leader `L'` whose log, at the moment E2 is appended, already contains E1 +either committed-at-a-lower-index or truncated. E1 cannot enter `L'`'s log +*after* E2, because the client-proposal for E1 was abandoned and is never +re-submitted to `L'`. (If `L == L'` and both proposals are queued at the +same leader, they are appended in call order — E1 first — because dispatch +is synchronous: attempt 2 begins only after attempt 1 returns.) The +fallback (a) does **not** rely on this property — it gets its +race-freedom from the lock + commit record instead — which is exactly what +makes (a) the safe retreat if this argument is ever doubted. + +**Conclusion.** Writing only at fresh monotonic timestamps preserves SI; +reading the stale timestamp only (exactly) decides the dedup; the Raft log +order makes the decision race-free. The argument holds, so option 2 is the +primary design and we proceed to implement it. + +### Where it lives — caller audit + +The change is split between the adapter retry loop (reuse the write set; +carry `prev_commit_ts`; reconstruct the result on dedup) and the one-phase +FSM apply (the exact-ts probe + no-op branch). No new persistent state, no +proto dedup table, no GC. + +| Call site | Behaviour change | Risk | +|---|---|---| +| `listPushCore` retry (`adapter/redis.go:3009`) | on retryable error, reuse prior attempt's write set (same `seq`) + carry `prev_commit_ts`; on dedup, return prior length reconstructed at `prev_commit_ts` | result reconstruction (R1); reuse-vs-recompute branch selection (R3) | +| `runTransaction` retry (`adapter/redis.go:2824`) | same write-set reuse for the EXEC body; carry `prev_commit_ts`; reconstruct each mop's result on dedup | multi-mop result reconstruction (R1) | +| `handleOnePhaseTxnRequest` (`kv/fsm.go:393`) | **add** exact-ts probe `committedVersionAt(primaryKey, prev_commit_ts)`; no-op the apply on hit | the only FSM-side change; covered by R2/R4 | +| `commitSecondaryWithRetry` (`kv/sharded_coordinator.go:527`) | **none** — already replays the same `req` (stable `start_ts`/`commit_ts`), content-addressed and idempotent today | — | +| `LockResolver` / `applyTxnResolution` | **none** — still idempotent via the existing lock-missing check | — | + +`prev_commit_ts` is the one new request field on the one-phase path +(a single `uint64`, set only on retries; zero on first attempt → probe +skipped). It is *not* a dedup token in the UUID sense: it is the prior +attempt's own commit timestamp, which the adapter already holds in a local +across the retry loop. + +## Fallback design — option (a): one-phase lock + commit record + +If the option-2 correctness argument fails to convince in review, or a test +exposes a case the argument missed, fall back to making the single-group +one-phase path carry a Percolator-style commit record, so elastickv's +existing, proven `primaryTxnStatus` / `CheckTxnStatus` machinery applies +directly. + +**Mechanism.** Extend `handleOnePhaseTxnRequest` to (1) write a lock at +prewrite and (2) write `txnCommitKey(primaryKey, startTS) = +encodeTxnCommitRecord(commitTS)` at the commit point — i.e. give the +one-phase path the same commit record `buildCommitStoreMutations` +(`kv/fsm.go:619`) already writes on the multi-shard path. Then the +adapter's pre-retry guard is the straightforward CheckTxnStatus probe: +before re-running, call `primaryTxnStatus(primaryKey, startTS_prev)`; if it +reports `Committed`, return the prior result instead of retrying. + +**Why it's the fallback, not the primary.** This turns the one-phase fast +path into a mini-2PC: a lock write plus a commit-record write (and the +commit-point ordering they imply) on *every* single-group transaction — +which is *every* transaction in this deployment. One-phase exists precisely +to avoid that cost; (a) gives it back. It is correct and leans on a +battle-tested primitive (so it is the safe retreat), but it is a measurable +hot-path tax to defend against a rare retry race. Option 2 defends the same +race with cost paid *only on the retry path* (one extra `uint64` in the +request and one exact-ts point read at apply, both reached only after an +attempt has already failed). That asymmetry is why 2 is primary. + +## Why not just remove `retryRedisWrite` from `runTransaction`? + +Removing the EXEC-body retry would eliminate the trigger but trade +correctness for availability: every transient `WriteConflict` (which +happens under normal OCC contention) would surface to the client as a +hard error, requiring the client to retry. Jepsen/Redis clients +typically do not retry write-conflict errors and would interpret them +as `:fail` ops. Availability under contended workloads (BullMQ, +list-append-heavy patterns) would drop measurably. + +Keeping the retry but making it idempotent (reuse + exact-ts dedup) +preserves availability and adds correctness. + +## Implementation milestones (option 2) + +### M1 — previous-attempt write-set + identity plumbing in the retry loop + +- `retryRedisWrite` (`adapter/redis_retry.go`): expose the resolved + `(writeSet, startTS, commitTS, primaryKey)` of the attempt to the loop + driver so the *next* iteration can reuse the write set and carry + `prev_commit_ts`. Today the closure hides this; thread it out via a small + result struct or a pre-attempt callback. +- No proto change beyond the single `prev_commit_ts uint64` on the + one-phase request (M2). +- Unit test: the loop driver observes the prior attempt's write set + + `commit_ts` on the second iteration and reuses the same `seq`. + +### M2 — exact-ts probe at one-phase apply — **LANDED** + +- **M2a (landed).** `store.MVCCStore.CommittedVersionAt(ctx, key, commitTS) + (bool, error)` — an *exact*-timestamp existence check (point lookup), + distinct from `GetAt`'s newest-version-`<=ts` scan. Implemented on + `pebbleStore` (point `Get` on `encodeKey(key, commitTS)`; a tombstone + counts as landed, so the value is not decoded), on the in-memory + `mvccStore` (binary search the TS-ascending version slice), and delegated + by `ShardStore` / `LeaderRoutedStore` to the local group store (apply-local + probe, no leader fence/proxy). Tested on both stores, including the + load-bearing exactness case (a version at 300 must not satisfy a probe at + 200). +- **M2b (landed).** `prev_commit_ts` is carried in the existing `TxnMeta` + blob, **not** a new proto field: a new `TxnMeta.PrevCommitTS` field + V2 + flag `0x04`. `EncodeTxnMeta` emits V2 *only* when `PrevCommitTS != 0`, so + every non-retry caller stays on the V1 wire format (see R5). Wired into + `handleOnePhaseTxnRequest` (`kv/fsm.go`): when `meta.PrevCommitTS != 0`, + probe `CommittedVersionAt(meta.PrimaryKey, meta.PrevCommitTS)` and `return + nil` (no-op the whole apply) on a hit. FSM tests pin: prior attempt landed + at exactly T1 → no-op (no version at T2, newest stays T1); truncated / + never-applied → applies at T2; `prev_commit_ts == 0` → probe skipped, + byte-identical to today. +- **Still open for M2:** an FSM test that simulates the *other*-txn case + (`Tx ≠ T1`) end-to-end so exactness is pinned at the apply layer too (the + store layer already pins it). Fold into M3, where the OCC-conflict path is + exercised. + +### M3 — write-set reuse + result reconstruction in the retry sites + +- **Emission gating (R5) — LANDED.** `prev_commit_ts` is emitted only when + `RedisServer.onePhaseTxnDedup` is on (`WithOnePhaseTxnDedup` / + `ELASTICKV_REDIS_ONEPHASE_DEDUP`), **default off** until the whole cluster + runs a probe-aware binary. While off, `listPushCore` keeps the legacy + recompute-on-retry loop, no V2 meta is produced, the probe never fires, and + the FSM is byte-identical to today — no mixed-version divergence window. +- **`listPushCore` (RPUSH/LPUSH) — LANDED.** When the gate is on, the retry + loop tracks a `reusableListPush` (the prior attempt's `ops`, `startTS`, + `commitTS`, and computed `length`). On a retryable error it REUSES that + write set under a fresh `commit_ts` with `PrevCommitTS` set (threaded + through `OperationGroup` → `ShardedCoordinator.dispatchSingleShardTxn` → + `onePhaseTxnRequestWithPrevCommit`). On success it returns the reused + `length`; on an OCC `WriteConflict` from a reuse it drops the reusable + attempt and recomputes from a fresh meta (outcome 3). Tests cover all three + outcomes end-to-end against real OCC + the real probe, plus the gate-off + legacy path. +- **Result reconstruction (R1) — resolved, and simpler than feared.** See R1. +- `runTransaction` (`adapter/redis.go`): same reuse + reconstruct for the EXEC + body (single-mop first; see Open questions). **Still open.** + +### M4 — Validation + +- Local Jepsen reproduction. Because the trigger is election churn + (see "Resolved" below), reproduce by running the 3-node demo under + CPU pressure or with shortened election timeouts so leadership + flaps during the workload. +- Scheduled Jepsen run goes 7 consecutive days without + `:duplicate-elements` / `:G-single-item-realtime`. + +Scope estimate: M1–M3 are adapter + one `store` helper + a one-field +one-phase request change (~250 LOC Go + tests), no FSM dedup table, no GC. +M4 is verification only. + +## Risks + +### R1 — Client-visible result reconstruction on the dedup no-op — resolved + +The doc originally called this "the most intricate part." For the list-push +family it turns out to be trivial, and the reason is the heart of option 2: +**because the retry reuses the write set built from attempt 1's meta, the +client-visible result is invariant across reuse.** RPUSH/LPUSH returns the +post-push length `meta.Len + len(values)`, computed once from attempt 1's +meta read. Whether attempt 1 physically committed it (probe hit → no-op) or +the reuse re-applied the identical write set (probe miss → apply at T2), the +list ends in the same state and the return value is the same number. So the +adapter simply **returns the remembered `length`** — no store re-read, no +reconstruction at `prev_commit_ts`. (`listPushCoreWithDedup` keeps it in the +`reusableListPush.length` field.) The genuine-conflict branch is the only one +that produces a different length, and there the adapter recomputes from a +fresh meta anyway, so the length is naturally correct. + +This invariance is specific to commands whose retry reuses a fixed write set. +A MULTI/EXEC body with read-dependent results (e.g. an `INCR` whose return is +the post-increment value) needs the same treatment — remember the +client-visible result computed on the first attempt and return it on a dedup +no-op — which is tractable but per-command. Mitigation unchanged: scope the +first PR to RPUSH/LPUSH (the commands the Jepsen workload exercises); commands +without a reuse path keep today's (buggy-under-churn, gate-off-by-default) +recompute, so the change is strictly an improvement, never a regression. + +### R2 — Probe read cost on the retry path + +Each retry now carries one `uint64` and does one extra MVCC point-read +(`committedVersionAt`) at apply before deciding to write. Retries only +happen on conflict, which is rare, and the read is a single key — cost is +negligible relative to the Raft round-trip the retry would otherwise pay. +The hot (no-conflict) path is untouched: `prev_commit_ts` is zero on the +first attempt, so the probe is skipped entirely. + +### R3 — Reuse-vs-recompute branch selection + +Write-set reuse is correct only for a *self*-conflict (our own prior +attempt). A genuine cross-txn conflict must degrade to recompute (outcome +3). The discriminator is the exact-ts probe plus the OCC check at reuse: +probe-miss + OCC-conflict ⇒ recompute. If the adapter ever reuses when it +should recompute, it would drop a legitimate distinct append; if it +recomputes when it should reuse, it reintroduces the duplicate. The unit +tests in M3 must pin both directions. Exactness of the probe (R-argument +above) is what keeps these from blurring. + +### R4 — Exact-ts probe under deep churn + +If the prior attempt's data version is itself truncated by a *subsequent* +election before E2 applies, the probe correctly returns false and E2 +applies its reused write set at T2 — still one element, still correct +(outcome 2), because reuse keeps `seq` stable so no duplicate index is +created even when the probe misses. This is a strict improvement over the +original bug, which only duplicated *because* the retry recomputed `seq`. +The residual failure would require E1 to apply *after* E2 in the committed +log, which the race-freedom argument rules out under the synchronous- +dispatch property; if that property is ever violated in practice, fall back +to (a). + +### R5 — FSM determinism across a rolling upgrade + +The probe changes the apply *outcome* (no-op vs. apply at T2), so it must be +computed identically on every node — FSM apply is deterministic by +contract; a node that no-ops while another applies would diverge the +replicas. The decision depends only on (a) the presence of `prev_commit_ts` +in the replicated log entry (identical on every node) and (b) +`CommittedVersionAt(primaryKey, prev_commit_ts)`, which is itself +deterministic: at entry E2's apply, every node has applied exactly E1..E2-1, +so the probe result is the same everywhere. The only divergence risk is a +node that lacks the probe code yet receives an entry carrying +`prev_commit_ts` — it would ignore the dedup and apply at T2 while upgraded +nodes no-op. + +Two design choices close this: + +1. **`prev_commit_ts` rides in `TxnMeta` V2, and V2 is emitted only when + `prev_commit_ts != 0`.** Every existing (non-retry) caller keeps emitting + V1, so the default wire format is unchanged. A node that predates the V2 + flag would *reject* an unknown flag (`decodeTxnMetaV2` returns + "unsupported flags") rather than silently diverge — fail-loud, not + fail-silent. +2. **Emission is gated off by default** (M3): the leader only starts + emitting a non-zero `prev_commit_ts` once the whole cluster runs a + probe-aware binary. Until then, no V2-with-prev-flag entry is ever + produced, the probe never fires, and the FSM is byte-identical to today. + The probe code (M2b) is always present but inert at `prev_commit_ts == 0`. + +The operational contract is therefore: roll out the probe-aware binary +everywhere first (behaves exactly as before), *then* enable emission. This +is the standard "ship the reader before the writer" sequencing for a +replicated-state-machine format change. + +## Resolved: the trigger is leadership churn during commit + +> **Closed 2026-05-24** with the full demo-cluster log from scheduled +> run +> [26340084100](https://github.com/bootjp/elastickv/actions/runs/26340084100) +> (the first failing run after PR #795 landed the full-log artifact). + +The full log positively identifies the trigger as a **leader-election +storm racing an in-flight commit**, not a single localized code bug. +Run 26340084100 reproduced the SAME `:duplicate-elements` anomaly +(`{153 2}` on key 17, with adjacent values 154/155 lost) AND added a +downstream `:G-single-item-realtime` cycle — the latter is the +append-checker's report of the corrupted per-key version order the +duplicate causes, not an independent fault. + +Server-side evidence, `elastickv-demo.log`: + +- The 3-node single-process demo went through **20+ leadership + changes in the ~3-minute workload** (terms 2→3→5→7→11→…→23), with + repeated `found conflict at index N [existing term: X, conflicting + term: Y]` — etcd raft truncating uncommitted log-tail entries on + each leader change. +- At `18:21:48` — the exact instant `[:append 17 153]` was processed + — a 3-way pre-vote/vote storm was in flight: two peers at + `index: 888` campaigning while the incumbent leader sat at + `index: 890` (two entries ahead) with a still-valid lease + (`remaining ticks: 6`). The two-entry divergence is precisely the + window where a freshly-proposed commit lands in the leader's log + but has not yet committed to quorum. + +Causal chain, now confirmed: + +1. The `append 17 153` commit is proposed to the incumbent leader and + lands at log index ~889. +2. The election storm makes that entry's fate ambiguous: the leader's + lease keeps it serving briefly, but a successor at a higher term + may truncate index 889 (`found conflict at index …`). +3. The adapter's `retryRedisWrite` observes a `WriteConflict` / + transient error and **retries with a fresh `commitTS`** (the + mechanism this doc's "Problem" section describes). +4. The original entry (if it survived the election) plus the retry + both commit → two `153`s at two different list indices. + +This confirms the doc's core hypothesis (retry regenerating +`commitTS` *and `seq`* defeats content-addressing) and pins the trigger +to leadership churn rather than `commitSecondaryWithRetry` or the +LockResolver specifically. Option 2 is the correct fix because it +deduplicates **regardless of which raft event produced the ambiguous +commit** — the retry reuses the prior write set (stable `seq`) and the +one-phase apply deduplicates on the prior attempt's exact `commit_ts`, +both of which are stable across the `retryRedisWrite` regeneration and +the leader change. Step (3) in the chain above maps directly to outcome 1 +(probe hits → no-op) and step (2)'s truncation maps to outcome 2 (probe +misses → apply at T2, still one element because `seq` is reused). + +### CI-environment amplifier (separate, non-blocking) + +The 20+ elections in 3 minutes are abnormally frequent and are +amplified by the CI environment: the single-process 3-node demo runs +co-located with the JVM Jepsen client on a 2-core GitHub runner. The +`lease is not expired (remaining ticks: 6)` + pre-vote-storm pattern +is the signature of heartbeats delayed by CPU starvation, which trips +election timeouts. This makes the ambiguous-commit window far more +frequent in CI than in the 5-node production cluster on dedicated +VMs. It does NOT invalidate the bug — production leader churn during +rolling deploys and failures hits the same window — but it explains +why CI surfaces it on most scheduled runs while production has not +reported a user-visible duplicate. A follow-up may pin the demo's +raft tick/election timing or the runner's CPU budget to reduce CI +flakiness, tracked separately from the correctness fix. + +## Open questions + +- **Result reconstruction coverage.** The dedup no-op must reconstruct each + command's client-visible result at `prev_commit_ts`. RPUSH/LPUSH (length) + and single-mop EXEC are in scope for the first PR; multi-mop EXEC and + other write commands (SET/INCR/HSET/…) need per-command reconstruction + hooks. Which commands beyond RPUSH/LPUSH must land in the first PR vs. + follow-ups? (Default: only the list-append family the Jepsen workload + exercises; everything else keeps today's behaviour until its hook is + added.) +- **Race-freedom linchpin — RE-VERIFIED (2026-05-30).** The race-freedom + argument needs **E1 applies before E2, or never** — never the other way + round. An earlier verification (2026-05-26) appealed to `applyRequests` + using `context.Background()` so that `Engine.Propose` could not abandon on + a caller timeout. That argument was based on a stale read; codex correctly + pointed out (PR #796 review) that the current single-shard txn path + threads the caller ctx through (`dispatchSingleShardTxn` → + `TransactionManager.Commit(ctx, …)` → `applyRequests(ctx, …)` → + `Engine.Propose(ctx, …)`), and `Engine.Propose` has a live `ctx.Done()` + early-return that calls `cancelPendingProposal`. So "Background blocks" + is *not* what guarantees ordering. Re-prove it on what actually does: + the FIFO propose channel and monotonic Raft log indices. + + The relevant facts in `internal/raftengine/etcd/engine.go`: + - `Engine.Propose` enqueues `req` onto `e.proposeCh` (`engine.go:626`) + *before* awaiting `req.done`. The raft run loop dequeues in order and + submits to the underlying raft node, which assigns log indices + monotonically on a single leader. A request that has entered + `proposeCh` cannot be un-enqueued: `cancelPendingProposal` + (`engine.go:2326`) only removes the pending-proposal notification + entry; the payload may still be appended to the leader's log. + - On commit, `applyNormalEntry` (`engine.go:1769`) runs `fsm.Apply` at + the entry's assigned log index *regardless* of whether the proposal + notification has been drained, so a cancelled or leadership-drained + proposal still applies if it was already committed by the log. + - The adapter goroutine that issues E1 returns control *before* it + issues E2 (the retry is sequential), and E1's Propose-submit + happens-before E2's. With a single leader, E1 therefore lands in + `proposeCh` ahead of E2 and is assigned a strictly lower log index. + If leadership has changed in between: E2 is proposed to a *different* + leader and gets a fresh-tail index in that leader's log; E1 in the + deposed leader's log was either already committed (index < E2's, + applies first) or truncated (never applies). Across leaders, E1's + original payload is never re-submitted to the new leader — the + `failPending(errNotLeader)` drain (`engine.go:2179-2190`) errors the + *pending* entry on this engine, but the adapter creates a brand new + request id on retry, so E2 is a different proposal, not a re-issue + of E1. + + Therefore E1 either applies at a lower log index than E2, or never + applies at all. It cannot apply *after* E2. This holds whether the + caller ctx is `Background()` or a cancellable context; cancellation + just lets the adapter learn about a fate Raft has already decided. + The ambiguous-commit case (E1 committed by the successor leader while + the adapter received `errNotLeader`) maps to outcome 1: E1 applies + first, the exact-ts probe hits, E2 no-ops. + +- **FSM probe determinism — retention guard reverted (2026-05-30, round-11).** + Round-10 surfaced `ErrReadTSCompacted` from `CommittedVersionAt` when + the probed `commit_ts` fell below `minRetainedTS`, mirroring + `GetAt`/`ExistsAt` semantics (codex P2 round-10). Codex P1 round-11 + correctly flagged that branching the FSM dedup decision on this signal + is non-deterministic across raft replicas: FSM compaction advances + `minRetainedTS` from local wall clock and per-replica scheduler + (`kv/compactor.go:safeMinTS` uses `time.Now()`), so two replicas + applying the same log entry at the same log index can return different + probe outcomes — one falls through and applies at the fresh + `commit_ts`, the other no-ops, leaving diverging MVCC histories. + + The fix reverts the retention guard inside `CommittedVersionAt`. The + probe is now a single `pebble.Get` (or sorted-slice lookup for the + in-memory store) and never returns `ErrReadTSCompacted`. For the + option-2 use case this is deterministic across replicas: every + per-element key carries at most one MVCC version (`elem:N -> value` + appended by attempt 1), so physical pebble compaction does not remove + it — there is no superseding version to retire. The cluster-wide + invariant `retentionWindow > max(adapter retry latency)` keeps the + rare "real never-landed retry below the compacted floor" case out of + reach in practice; future work can replace the invariant with a + separate replicated commit-ts log if option 2 needs to support + arbitrary key shapes. + + `ErrReadTSCompacted` continues to surface from `GetAt`, `ScanAt`, + `RawGetAt`, `RawScanAt` for genuine historical-read callers (gRPC + adapter, S3 adapter, dualwrite proxy) where the guard expresses the + truthful answer "the value at this timestamp is no longer + recoverable". + +## Decision log + +- (2026-05-21) initial proposal; open for review. +- (2026-05-24) open question closed: full log from run 26340084100 + confirms the trigger is an election storm racing an in-flight + commit. Recorded the CI CPU-starvation amplifier as a separate + non-blocking follow-up. +- (2026-05-24) design switched from the UUID + FSM-marker + GC approach to + a TiKV-style `CheckTxnStatus` probe at the adapter retry boundary. +- (2026-05-26) **CheckTxnStatus-as-primary retracted**: the cluster is + single-group, so every txn takes the one-phase path, which writes no lock + and no commit record — `CheckTxnStatus` has nothing to read. Doc + rewritten with **option 2 (write-set reuse + exact-ts dedup) as primary** + and **option (a) (one-phase lock + commit record, enabling + `CheckTxnStatus`) as the documented fallback**. The decision hinges on the + "commit_ts reuse vs stale-ts ordering" argument, written head-on above: + new data is written only at a fresh monotonic `commit_ts` (SI preserved), + the stale `commit_ts` is read-only in an exact-ts probe (dedup identity), + and Raft log order makes the probe race-free (E1 applies before E2 or + never). Because the argument holds, option 2 is primary and implementation + (M1–M4) is authorised to begin; (a) is the retreat if the synchronous- + dispatch linchpin (Open questions) cannot be guaranteed. +- (2026-05-26) **synchronous-dispatch linchpin verified** against the + propose/apply path (`kv/transaction.go:152`, `engine.go:606/1780/2179`): + the txn dispatch proposes with `context.Background()` and blocks until + apply or until the proposal is failed-and-drained on a Leader→non-Leader + transition; a drained proposal is never re-proposed, and `fsm.Apply` runs + at the entry's own (lower) log index. So attempt-1's entry applies before + the retry's or never — never after. The race-freedom premise holds; option + 2 is cleared to implement. +- (2026-05-26) **M2 landed.** M2a: `CommittedVersionAt` exact-ts probe on + both store implementations (+ ShardStore/LeaderRoutedStore delegation), + tested. M2b: `prev_commit_ts` carried as a new `TxnMeta.PrevCommitTS` field + + V2 flag `0x04` (no proto change), with `EncodeTxnMeta` emitting V2 only + when set, so the default wire format stays V1; the probe is wired into + `handleOnePhaseTxnRequest` (no-op the apply on an exact hit). Recorded R5 + (FSM determinism across a rolling upgrade): resolved by the V1-default wire + format + fail-loud unknown-flag rejection + default-off emission gating + (ship the reader before the writer). Next: M1 (thread the prior attempt's + write set + commit_ts out of `retryRedisWrite`) and M3 (write-set reuse + + gated emission + result reconstruction in `listPushCore` / `runTransaction`). +- (2026-05-27) **M1 + M3 (list-push) landed.** `OperationGroup.PrevCommitTS` + threads through `ShardedCoordinator` → `dispatchSingleShardTxn` → + `onePhaseTxnRequestWithPrevCommit`. `listPushCore` gained a write-set-reuse + retry loop (`listPushCoreWithDedup` + `reusableListPush`) gated by + `RedisServer.onePhaseTxnDedup` (default off; `WithOnePhaseTxnDedup` / + `ELASTICKV_REDIS_ONEPHASE_DEDUP`). **R1 resolved and downgraded from "most + intricate" to trivial for list-push:** because the retry reuses the write + set built from attempt 1's meta, the post-push length is invariant across + reuse, so reconstruction is just returning the remembered `length` — no + store re-read. Adapter tests cover all three outcomes (landed→dedup→no + duplicate; not-landed→apply; genuine cross-txn conflict→recompute) against + real OCC + the real probe, plus the gate-off legacy path. Remaining: + `runTransaction` EXEC-body reuse (per-command result memo) and M4 (Jepsen). diff --git a/kv/coordinator.go b/kv/coordinator.go index 752c1c5f8..3260274a7 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -462,7 +462,7 @@ func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) var resp *CoordinateResponse var err error if reqs.IsTxn { - resp, err = c.dispatchTxn(ctx, reqs.Elems, reqs.ReadKeys, reqs.StartTS, reqs.CommitTS) + resp, err = c.dispatchTxn(ctx, reqs.Elems, reqs.ReadKeys, reqs.StartTS, reqs.CommitTS, reqs.PrevCommitTS) } else { resp, err = c.dispatchRaw(ctx, reqs.Elems) } @@ -798,7 +798,7 @@ func (c *Coordinate) nextStartTS() uint64 { return c.clock.Next() } -func (c *Coordinate) dispatchTxn(ctx context.Context, reqs []*Elem[OP], readKeys [][]byte, startTS uint64, commitTS uint64) (*CoordinateResponse, error) { +func (c *Coordinate) dispatchTxn(ctx context.Context, reqs []*Elem[OP], readKeys [][]byte, startTS uint64, commitTS uint64, prevCommitTS uint64) (*CoordinateResponse, error) { if len(readKeys) > maxReadKeys { return nil, errors.WithStack(ErrInvalidRequest) } @@ -827,9 +827,11 @@ func (c *Coordinate) dispatchTxn(ctx context.Context, reqs []*Elem[OP], readKeys // window that exists between the adapter's pre-Raft validateReadSet call // and FSM application. The adapter's validateReadSet is kept as a fast // path to fail early without a Raft round-trip, but the FSM check is - // the authoritative, serializable validation. + // the authoritative, serializable validation. prevCommitTS, when set, + // carries the option-2 one-phase dedup probe key for a retry that reuses + // a failed attempt's write set. r, err := c.transactionManager.Commit(ctx, []*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primary, reqs, readKeys), + onePhaseTxnRequestWithPrevCommit(startTS, commitTS, prevCommitTS, primary, reqs, readKeys), }) if err != nil { return nil, errors.WithStack(err) @@ -973,12 +975,16 @@ func (c *Coordinate) buildRedirectRequests(reqs *OperationGroup[OP]) ([]*pb.Requ // so the leader assigns both timestamps consistently. A caller-provided // CommitTS without a StartTS would produce an invalid txn where // CommitTS <= StartTS (because StartTS=0 at the forwarding site). + // PrevCommitTS is the immutable identity of the prior attempt — it is + // allocated by the originating adapter, not by the leader — so it is + // forwarded unconditionally so the leader's one-phase apply can run the + // option-2 dedup probe. commitTS := reqs.CommitTS if reqs.StartTS == 0 { commitTS = 0 } return []*pb.Request{ - onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, reqs.ReadKeys), + onePhaseTxnRequestWithPrevCommit(reqs.StartTS, commitTS, reqs.PrevCommitTS, primary, reqs.Elems, reqs.ReadKeys), }, nil } @@ -1018,9 +1024,20 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation { panic("unreachable") } -func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP], readKeys [][]byte) *pb.Request { +// onePhaseTxnRequestWithPrevCommit builds a single-shard one-phase request, +// optionally carrying prevCommitTS — the commit timestamp of a failed prior +// attempt of the same transaction. When prevCommitTS is non-zero the FSM +// (handleOnePhaseTxnRequest) probes whether that attempt already landed and +// no-ops the apply if so (option 2 dedup). When it is zero the encoded meta +// is byte-identical to the pre-feature V1 form, so non-retry callers are +// unaffected. See docs/design/2026_05_21_proposed_txn_secondary_idempotency.md. +func onePhaseTxnRequestWithPrevCommit(startTS, commitTS, prevCommitTS uint64, primaryKey []byte, reqs []*Elem[OP], readKeys [][]byte) *pb.Request { muts := make([]*pb.Mutation, 0, len(reqs)+1) - muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS)) + muts = append(muts, &pb.Mutation{ + Op: pb.Op_PUT, + Key: []byte(txnMetaPrefix), + Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, CommitTS: commitTS, PrevCommitTS: prevCommitTS}), + }) for _, req := range reqs { muts = append(muts, elemToMutation(req)) } diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index 552fb495f..df7ec3bf7 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -37,7 +37,7 @@ func TestCoordinateDispatchTxn_RejectsNonMonotonicCommitTS(t *testing.T) { _, err := c.dispatchTxn(context.Background(), []*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, nil, startTS, 0) + }, nil, startTS, 0, 0) require.ErrorIs(t, err, ErrTxnCommitTSRequired) require.Equal(t, 0, tx.commits) } @@ -53,7 +53,7 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) { _, err := c.dispatchTxn(context.Background(), []*Elem[OP]{ {Op: Put, Key: nil, Value: []byte("v")}, - }, nil, 1, 0) + }, nil, 1, 0, 0) require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired) require.Equal(t, 0, tx.commits) } @@ -71,7 +71,7 @@ func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) { _, err := c.dispatchTxn(context.Background(), []*Elem[OP]{ {Op: Put, Key: []byte("b"), Value: []byte("v1")}, {Op: Del, Key: []byte("x")}, - }, nil, startTS, 0) + }, nil, startTS, 0, 0) require.NoError(t, err) require.Equal(t, 1, tx.commits) require.Len(t, tx.reqs, 1) @@ -108,7 +108,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) { commitTS := uint64(25) _, err := c.dispatchTxn(context.Background(), []*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, nil, startTS, commitTS) + }, nil, startTS, commitTS, 0) require.NoError(t, err) require.Len(t, tx.reqs, 1) require.Len(t, tx.reqs[0], 1) @@ -130,7 +130,7 @@ func TestCoordinateDispatchTxn_PassesReadKeysToRaftEntry(t *testing.T) { readKeys := [][]byte{[]byte("rk1"), []byte("rk2")} _, err := c.dispatchTxn(context.Background(), []*Elem[OP]{ {Op: Put, Key: []byte("k"), Value: []byte("v")}, - }, readKeys, 10, 0) + }, readKeys, 10, 0, 0) require.NoError(t, err) require.Len(t, tx.reqs, 1) require.Len(t, tx.reqs[0], 1) diff --git a/kv/fsm.go b/kv/fsm.go index 710ce872f..3a345df16 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -488,6 +488,33 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com return errors.WithStack(ErrTxnCommitTSRequired) } + // Option-2 idempotency dedup: when this is a retry (meta.PrevCommitTS set), + // the adapter reused the failed attempt's write set under a fresh commitTS. + // If the previous attempt already landed — its entry survived the leader + // churn that returned an ambiguous error — there is a committed version of + // the primary key at exactly PrevCommitTS. Re-applying would create a + // duplicate (the very :duplicate-elements anomaly), so no-op the whole + // apply and let the adapter reconstruct the prior result. + // + // Determinism note (codex P1 round-11): the underlying CommittedVersionAt + // intentionally does NOT enforce the retention watermark — branching FSM + // apply on the per-replica minRetainedTS would let replicas with stale + // retention surface ErrReadTSCompacted and skip dedup while replicas that + // still retain the previous version no-op, producing split-brain. The + // store's probe returns the raw pebble.Get answer; for the option-2 use + // case (per-element keys, single MVCC version each) physical compaction + // does not remove the version, so the probe is identical on every replica + // applying this log entry. The retention-window > max-retry-latency + // invariant prevents the rare case where a real never-landed retry + // arrives with PrevCommitTS below pebble's compacted floor. + dedup, err := f.dedupProbeOnePhase(ctx, meta) + if err != nil { + return err + } + if dedup { + return nil + } + uniq, err := uniqueMutations(muts) if err != nil { return err @@ -500,6 +527,25 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) } +// dedupProbeOnePhase decides whether handleOnePhaseTxnRequest should no-op +// because the entry is a retry whose prior attempt already landed. Extracted +// to keep handleOnePhaseTxnRequest under the cyclop budget; the determinism +// rationale lives at the call site. +// +// Returns (true, nil) → the entry must no-op (prior attempt landed). +// Returns (false, nil) → fall through to normal apply. +// Returns (false, err) → propagate err; apply must not proceed. +func (f *kvFSM) dedupProbeOnePhase(ctx context.Context, meta TxnMeta) (bool, error) { + if meta.PrevCommitTS == 0 { + return false, nil + } + landed, err := f.store.CommittedVersionAt(ctx, meta.PrimaryKey, meta.PrevCommitTS) + if err != nil { + return false, errors.WithStack(err) + } + return landed, nil +} + func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { meta, muts, err := extractTxnMeta(r.Mutations) if err != nil { diff --git a/kv/fsm_onephase_dedup_test.go b/kv/fsm_onephase_dedup_test.go new file mode 100644 index 000000000..de9d3d9db --- /dev/null +++ b/kv/fsm_onephase_dedup_test.go @@ -0,0 +1,112 @@ +package kv + +import ( + "context" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// onePhaseReq builds a single-shard (Phase_NONE) transactional request whose +// meta carries commitTS and, when non-zero, prevCommitTS — the option-2 +// idempotency dedup probe key. The mutation set is a single PUT of key=value, +// mirroring the failed attempt's reused write set on a retry. +func onePhaseReq(startTS, commitTS, prevCommitTS uint64, key, value []byte) *pb.Request { + return &pb.Request{ + IsTxn: true, + Phase: pb.Phase_NONE, + Ts: startTS, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{ + PrimaryKey: key, + CommitTS: commitTS, + PrevCommitTS: prevCommitTS, + })}, + {Op: pb.Op_PUT, Key: key, Value: value}, + }, + } +} + +// TestOnePhaseDedup_NoOpsWhenPriorAttemptLanded is the core option-2 case: +// the first attempt committed at T1 (its raft entry survived the churn that +// returned an ambiguous error), then the adapter retried with the SAME write +// set under a fresh commit_ts T2 and prev_commit_ts=T1. The FSM probe finds +// the prior version at exactly T1 and no-ops the whole apply, so no second +// version is written at T2 — preventing the duplicate. This runs at the +// retry entry's deterministic apply point, so every node computes the same +// no-op decision. +func TestOnePhaseDedup_NoOpsWhenPriorAttemptLanded(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + key := []byte("list-item") + + // Attempt 1 lands at commit_ts 20. + require.NoError(t, applyFSMRequest(t, fsm, onePhaseReq(10, 20, 0, key, []byte("v")))) + landed, err := st.CommittedVersionAt(ctx, key, 20) + require.NoError(t, err) + require.True(t, landed) + + // Retry: reused write set, fresh commit_ts 40, prev_commit_ts=20. + require.NoError(t, applyFSMRequest(t, fsm, onePhaseReq(30, 40, 20, key, []byte("v")))) + + // The retry must be a no-op: no version at 40, and the newest version is + // still the one from attempt 1 at 20. + at40, err := st.CommittedVersionAt(ctx, key, 40) + require.NoError(t, err) + require.False(t, at40, "retry must not write a second version at the fresh commit_ts") + + latest, exists, err := st.LatestCommitTS(ctx, key) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, uint64(20), latest, "newest version must remain attempt 1's at 20") +} + +// TestOnePhaseDedup_AppliesWhenPriorAttemptDidNotLand covers the truncated / +// never-applied case: prev_commit_ts is set but no version exists at exactly +// that timestamp (attempt 1's entry lost the log race). The probe misses and +// the retry applies its reused write set at the fresh commit_ts. +func TestOnePhaseDedup_AppliesWhenPriorAttemptDidNotLand(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + key := []byte("list-item") + + // No attempt 1 ever landed; the retry carries prev_commit_ts=20 anyway. + require.NoError(t, applyFSMRequest(t, fsm, onePhaseReq(30, 40, 20, key, []byte("v")))) + + at40, err := st.CommittedVersionAt(ctx, key, 40) + require.NoError(t, err) + require.True(t, at40, "with no prior version at 20, the retry must apply at 40") + + val, err := st.GetAt(ctx, key, ^uint64(0)) + require.NoError(t, err) + require.Equal(t, []byte("v"), val) +} + +// TestOnePhaseDedup_FirstAttemptSkipsProbe confirms the probe is inert when +// prev_commit_ts is zero (the first attempt / pre-feature path): the apply +// proceeds exactly as before, leaving the FSM byte-identical to today for +// every non-retry request. +func TestOnePhaseDedup_FirstAttemptSkipsProbe(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + key := []byte("k") + require.NoError(t, applyFSMRequest(t, fsm, onePhaseReq(10, 20, 0, key, []byte("v")))) + + at20, err := st.CommittedVersionAt(ctx, key, 20) + require.NoError(t, err) + require.True(t, at20) +} diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index a97595d9d..cfd935bb1 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -188,6 +188,63 @@ func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) return err == nil, errors.WithStack(err) } +// CommittedVersionAt gates the exact-timestamp existence probe so client +// reads through this wrapper get a fresh authoritative answer even on a +// deposed leader. The FSM apply path is NOT affected — it holds the raw +// local store (not a LeaderRoutedStore), so its deterministic probe never +// goes through this method. The option-2 reuse path +// (RedisServer.resolveReuseLength) DOES call this and needs the +// authoritative answer to preserve the pending.length fast-path +// (returning the per-our-commit length rather than the leader's current +// Len) when our prior attempt actually committed. +// +// Two-path strategy, mirroring how LatestCommitTS uses a lease fast-path +// and a proxy slow-path: +// +// - We are the leader with a valid lease (leaderOKForKey is true): the +// local replica is up-to-date by the lease invariant; read local. +// - Not leader (deposed or never): there is no RawCommittedVersionAt +// RPC to proxy to, so use the coordinator's LinearizableRead to +// submit a Raft ReadIndex — that protocol forwards to the current +// leader and waits until our local applied index has caught up to +// the leader's commit point. After that, a local probe sees every +// committed version of this key (including any landed at commitTS). +// If the read-index fails (no leader reachable, ctx canceled), fall +// back to (false, nil); the adapter's resolveReuseLength then re-reads +// via the already-leader-fenced ScanAt/GetAt, returning the leader's +// current Len — a valid serialization, just not the per-our-commit +// value. +func (s *LeaderRoutedStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error) { + if s == nil || s.local == nil { + return false, nil + } + if !s.leaderOKForKey(ctx, key) && !s.tryLinearizableFence(ctx) { + // Not leader and ReadIndex failed (no coordinator wired, no leader + // reachable, ctx canceled). Report (false, nil) so the adapter's + // resolveReuseLength falls through to the leader-fenced ScanAt/GetAt + // path, which returns a valid current-Len serialization. + return false, nil + } + exists, err := s.local.CommittedVersionAt(ctx, key, commitTS) + return exists, errors.WithStack(err) +} + +// tryLinearizableFence submits a Raft ReadIndex via the coordinator and +// reports whether it succeeded. After a successful ReadIndex the local +// applied index is caught up to the current leader's commit point, so a +// subsequent local read sees every committed version. The error from the +// underlying call is intentionally not surfaced — callers that need the +// authoritative answer treat a failed fence as "couldn't verify, fall back +// to the leader-routed slow path." Structured to avoid the nilerr +// false positive at the call site. +func (s *LeaderRoutedStore) tryLinearizableFence(ctx context.Context) bool { + if s == nil || s.coordinator == nil { + return false + } + _, err := s.coordinator.LinearizableRead(ctx) + return err == nil +} + func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error) { if s == nil || s.local == nil { return []*store.KVPair{}, nil diff --git a/kv/shard_store.go b/kv/shard_store.go index 34ceb33ef..2e811446c 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -98,6 +98,69 @@ func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, return v != nil, nil } +// CommittedVersionAt routes the exact-timestamp existence probe to the +// owning group's local store, gated on the same lease-aware leader check +// GetAt uses, so a deposed node that has not yet applied a freshly- +// committed entry does not silently return false to a client read. The +// FSM apply path is NOT affected — it holds the per-shard store directly +// (not ShardStore) and runs the probe on the deterministic local replica +// it is writing to. The option-2 reuse path (RedisServer.resolveReuseLength) +// goes through this wrapper, so during leader churn the probe must answer +// authoritatively or defer to a leader-routed re-read. +// +// There is no RawCommittedVersionAt RPC to proxy to; when we are not the +// linearizable leader for the group we return (false, nil) and let the +// caller fall back to derived reads (resolveListMeta uses ScanAt/GetAt, +// which ARE leader-fenced / proxied per group). The fallback returns the +// leader's current Len — a valid serialization — at the cost of the +// pending.length fast-path during churn. Mirrors LeaderRoutedStore's fix +// for codex P1 #796. +func (s *ShardStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error) { + g, ok := s.groupForKey(key) + if !ok || g.Store == nil { + return false, nil + } + // engineForGroup may be nil in test fixtures that wire ShardStore + // without raft; preserve the existing local-only fallback there. + engine := engineForGroup(g) + if engine == nil { + exists, err := g.Store.CommittedVersionAt(ctx, key, commitTS) + if err != nil { + return false, errors.WithStack(err) + } + return exists, nil + } + if !isLinearizableRaftLeader(ctx, engine) && !tryEngineLinearizableFence(ctx, engine) { + // Not the linearizable leader for this group AND the ReadIndex + // fence failed (no leader reachable, ctx canceled). Fall back to + // (false, nil); the adapter's resolveListMeta path takes over via + // the leader-fenced ScanAt/GetAt and returns a valid current-Len + // serialization. + return false, nil + } + exists, err := g.Store.CommittedVersionAt(ctx, key, commitTS) + if err != nil { + return false, errors.WithStack(err) + } + return exists, nil +} + +// tryEngineLinearizableFence submits a Raft ReadIndex via the per-group +// engine and reports whether it succeeded. After a successful ReadIndex +// the local applied index is caught up to the current leader's commit +// point, so a subsequent local read sees every committed version. The +// error from the underlying call is intentionally not surfaced — callers +// that need the authoritative answer treat a failed fence as "couldn't +// verify, fall back to the leader-routed slow path." Structured to avoid +// the nilerr false positive at the call site. +func tryEngineLinearizableFence(ctx context.Context, engine raftengine.LeaderView) bool { + if engine == nil { + return false + } + _, err := engine.LinearizableRead(ctx) + return err == nil +} + // ScanAt scans keys across shards at the given timestamp. Note: when the range // spans multiple shards, each shard may have a different Raft apply position. // This means the returned view is NOT a globally consistent snapshot — it is diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index a88b173dc..6b0971003 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -263,7 +263,7 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[ } if reqs.IsTxn { - return c.dispatchTxn(ctx, reqs.StartTS, reqs.CommitTS, reqs.Elems, reqs.ReadKeys) + return c.dispatchTxn(ctx, reqs.StartTS, reqs.CommitTS, reqs.PrevCommitTS, reqs.Elems, reqs.ReadKeys) } logs, err := c.requestLogs(reqs) @@ -366,7 +366,7 @@ func (c *ShardedCoordinator) broadcastToAllGroups(ctx context.Context, requests return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil } -func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, commitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, commitTS uint64, prevCommitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { if len(readKeys) > maxReadKeys { return nil, errors.WithStack(ErrInvalidRequest) } @@ -390,14 +390,35 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co // If any read key belongs to a different shard the 2PC path is required // so that validateReadOnlyShards can issue a linearizable read barrier, // preserving SSI. - return c.dispatchSingleShardTxn(ctx, startTS, commitTS, primaryKey, gids[0], elems, readKeys) - } - - // Multi-shard path: group read keys by shard now. The result is passed - // directly to prewriteTxn to avoid a second iteration inside that function. - // A routing failure here aborts the transaction before any prewrite — - // silently dropping unresolvable read keys would let OCC validation run - // with an incomplete read set and break SSI. + return c.dispatchSingleShardTxn(ctx, startTS, commitTS, prevCommitTS, primaryKey, gids[0], elems, readKeys) + } + return c.dispatchMultiShardTxn(ctx, startTS, commitTS, prevCommitTS, primaryKey, grouped, gids, readKeys) +} + +// dispatchMultiShardTxn runs the 2PC path. Extracted from dispatchTxn to keep +// that function under the cyclop budget after the prevCommitTS reject (codex +// P2 round-10) was added; the multi-shard branch already carries five linear +// error checks (groupReadKeys, prewrite, commitPrimary, abortCleanup, +// commitSecondaries) that pushed the parent over the 10-edge limit. +func (c *ShardedCoordinator) dispatchMultiShardTxn(ctx context.Context, startTS, commitTS, prevCommitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, readKeys [][]byte) (*CoordinateResponse, error) { + // Fail-closed when a retry carries the option-2 dedup probe key but its + // write set / read set spans shards (codex P2 round-10 "reject retries + // that leave the one-phase path"). The 2PC log builders only encode + // CommitTS and would silently drop PrevCommitTS — a landed ambiguous + // attempt would then look like an ordinary write conflict, the adapter + // would drop pending and recompute, and the duplicate this feature is + // meant to prevent would reappear. Surface the constraint explicitly so + // the caller (or a future multi-shard dedup design) knows the request + // shape is unsupported. + if prevCommitTS != 0 { + return nil, errors.WithStack(ErrTxnDedupRequiresSingleShard) + } + + // Group read keys by shard now. The result is passed directly to + // prewriteTxn to avoid a second iteration inside that function. A + // routing failure here aborts the transaction before any prewrite — + // silently dropping unresolvable read keys would let OCC validation + // run with an incomplete read set and break SSI. groupedReadKeys, err := c.groupReadKeysByShardID(readKeys) if err != nil { return nil, err @@ -451,15 +472,17 @@ func (c *ShardedCoordinator) allReadKeysInShard(readKeys [][]byte, gid uint64) b return true } -func (c *ShardedCoordinator) dispatchSingleShardTxn(ctx context.Context, startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { +func (c *ShardedCoordinator) dispatchSingleShardTxn(ctx context.Context, startTS, commitTS, prevCommitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) { g, err := c.txnGroupForID(gid) if err != nil { return nil, err } // ReadKeys are included in the Raft log entry so the FSM validates - // read-write conflicts atomically under applyMu. + // read-write conflicts atomically under applyMu. prevCommitTS, when set, + // carries the one-phase dedup probe key for a retry that reuses a failed + // attempt's write set. resp, err := g.Txn.Commit(ctx, []*pb.Request{ - onePhaseTxnRequest(startTS, commitTS, primaryKey, elems, readKeys), + onePhaseTxnRequestWithPrevCommit(startTS, commitTS, prevCommitTS, primaryKey, elems, readKeys), }) if err != nil { return nil, errors.WithStack(err) diff --git a/kv/transcoder.go b/kv/transcoder.go index 3088e879a..58ad1cb4b 100644 --- a/kv/transcoder.go +++ b/kv/transcoder.go @@ -29,6 +29,14 @@ type OperationGroup[T OP] struct { // CommitTS optionally pins the transaction commit timestamp. // Coordinators choose one automatically when this is zero. CommitTS uint64 + // PrevCommitTS carries the commit timestamp of a failed previous attempt + // of the same single-shard transaction (option-2 one-phase idempotency + // dedup). It is set only on a retry that reuses the prior attempt's write + // set, and only flows to the one-phase apply path, where the FSM probes + // whether that attempt already landed and no-ops the apply if so. Zero on + // first attempts and on every non-retry caller. See + // docs/design/2026_05_21_proposed_txn_secondary_idempotency.md. + PrevCommitTS uint64 // ReadKeys carries the transaction's read set so the FSM can validate // read-write conflicts atomically with the commit. ReadKeys [][]byte diff --git a/kv/txn_codec.go b/kv/txn_codec.go index d2f80ba53..679340559 100644 --- a/kv/txn_codec.go +++ b/kv/txn_codec.go @@ -23,9 +23,10 @@ const ( const txnLockFlagPrimary byte = 0x01 const ( - txnMetaFlagLockTTL byte = 0x01 - txnMetaFlagCommitTS byte = 0x02 - txnMetaKnownFlags byte = txnMetaFlagLockTTL | txnMetaFlagCommitTS + txnMetaFlagLockTTL byte = 0x01 + txnMetaFlagCommitTS byte = 0x02 + txnMetaFlagPrevCommitTS byte = 0x04 + txnMetaKnownFlags byte = txnMetaFlagLockTTL | txnMetaFlagCommitTS | txnMetaFlagPrevCommitTS ) const txnMetaHeaderSize = 2 @@ -35,15 +36,32 @@ const uint64FieldSize = 8 // TxnMeta is embedded into transactional raft log requests via a synthetic // mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store. +// +// PrevCommitTS is the commit timestamp of a failed previous attempt of the +// same single-shard transaction. It is set only on a retry, and only carries +// the one-phase idempotency dedup probe (option 2): at apply, the FSM checks +// whether the previous attempt's write set already landed at exactly this +// timestamp and, if so, no-ops the apply instead of re-applying. Because it +// only needs the V2 wire format, EncodeTxnMeta keeps emitting V1 whenever +// PrevCommitTS is zero (every non-retry path), so the default wire format is +// unchanged. See docs/design/2026_05_21_proposed_txn_secondary_idempotency.md. type TxnMeta struct { - PrimaryKey []byte - LockTTLms uint64 - CommitTS uint64 + PrimaryKey []byte + LockTTLms uint64 + CommitTS uint64 + PrevCommitTS uint64 } func EncodeTxnMeta(m TxnMeta) []byte { // Keep v1 as the default wire format until the cluster can guarantee that - // every node understands v2 during rolling upgrades. + // every node understands v2 during rolling upgrades. The only field that + // requires v2 is PrevCommitTS (the one-phase dedup probe), which is set + // exclusively on a retry by an upgraded leader; emitting v2 only in that + // case bounds the new wire format to the post-rollout, feature-enabled + // path and leaves every existing caller on v1. + if m.PrevCommitTS != 0 { + return encodeTxnMetaV2(m) + } return encodeTxnMetaV1(m) } @@ -69,6 +87,9 @@ func encodeTxnMetaV2(m TxnMeta) []byte { if flags&txnMetaFlagCommitTS != 0 { size += uint64FieldSize } + if flags&txnMetaFlagPrevCommitTS != 0 { + size += uint64FieldSize + } b := make([]byte, size) b[0] = txnMetaVersionV2 b[1] = flags @@ -82,6 +103,10 @@ func encodeTxnMetaV2(m TxnMeta) []byte { } if flags&txnMetaFlagCommitTS != 0 { binary.BigEndian.PutUint64(b[offset:], m.CommitTS) + offset += uint64FieldSize + } + if flags&txnMetaFlagPrevCommitTS != 0 { + binary.BigEndian.PutUint64(b[offset:], m.PrevCommitTS) } return b } @@ -108,6 +133,9 @@ func txnMetaFlags(m TxnMeta) byte { if m.CommitTS != 0 { flags |= txnMetaFlagCommitTS } + if m.PrevCommitTS != 0 { + flags |= txnMetaFlagPrevCommitTS + } return flags } @@ -154,17 +182,15 @@ func decodeTxnMetaV2(b []byte) (TxnMeta, error) { } meta := TxnMeta{PrimaryKey: pk} - if flags&txnMetaFlagLockTTL != 0 { - meta.LockTTLms, err = readTxnUint64(r, "txn meta: lock ttl truncated") - if err != nil { - return TxnMeta{}, err + for _, f := range optionalV2Fields(&meta) { + if flags&f.flag == 0 { + continue } - } - if flags&txnMetaFlagCommitTS != 0 { - meta.CommitTS, err = readTxnUint64(r, "txn meta: commit ts truncated") - if err != nil { - return TxnMeta{}, err + v, rerr := readTxnUint64(r, f.errMsg) + if rerr != nil { + return TxnMeta{}, rerr } + *f.dest = v } if r.Len() != 0 { return TxnMeta{}, errors.WithStack(errors.Newf("txn meta: unexpected trailing bytes %d", r.Len())) @@ -172,6 +198,28 @@ func decodeTxnMetaV2(b []byte) (TxnMeta, error) { return meta, nil } +// optionalV2Fields lists the V2 optional uint64 fields in their on-wire +// order. Only decodeTxnMetaV2 currently iterates this table (the table-drive +// is what keeps it under the cyclop limit); encodeTxnMetaV2 writes the same +// fields in the same sequence via inline conditionals, so when adding a new +// V2 optional field both functions must be updated together to keep the +// encode/decode order in lockstep. +func optionalV2Fields(m *TxnMeta) []struct { + flag byte + dest *uint64 + errMsg string +} { + return []struct { + flag byte + dest *uint64 + errMsg string + }{ + {txnMetaFlagLockTTL, &m.LockTTLms, "txn meta: lock ttl truncated"}, + {txnMetaFlagCommitTS, &m.CommitTS, "txn meta: commit ts truncated"}, + {txnMetaFlagPrevCommitTS, &m.PrevCommitTS, "txn meta: prev commit ts truncated"}, + } +} + type txnLock struct { StartTS uint64 TTLExpireAt uint64 diff --git a/kv/txn_codec_test.go b/kv/txn_codec_test.go index 1a5f74a02..6e1a900a9 100644 --- a/kv/txn_codec_test.go +++ b/kv/txn_codec_test.go @@ -75,6 +75,40 @@ func TestDecodeTxnMetaV1Compatibility(t *testing.T) { require.Equal(t, meta, decoded) } +func TestEncodeTxnMetaEmitsV2OnlyWhenPrevCommitTSSet(t *testing.T) { + t.Parallel() + + // Without PrevCommitTS, even with a CommitTS set, the default wire format + // stays V1 — preserving rolling-upgrade safety for every non-retry caller. + v1 := EncodeTxnMeta(TxnMeta{PrimaryKey: []byte("pk"), CommitTS: 42}) + require.Equal(t, txnMetaVersionV1, v1[0]) + + // With PrevCommitTS, EncodeTxnMeta upgrades to V2 so the field can ride. + v2 := EncodeTxnMeta(TxnMeta{PrimaryKey: []byte("pk"), CommitTS: 42, PrevCommitTS: 41}) + require.Equal(t, txnMetaVersionV2, v2[0]) + require.Equal(t, txnMetaFlagCommitTS|txnMetaFlagPrevCommitTS, v2[1]) +} + +func TestTxnMetaPrevCommitTSRoundTrip(t *testing.T) { + t.Parallel() + + meta := TxnMeta{ + PrimaryKey: []byte("primary"), + LockTTLms: 7, + CommitTS: 200, + PrevCommitTS: 100, + } + decoded, err := DecodeTxnMeta(EncodeTxnMeta(meta)) + require.NoError(t, err) + require.Equal(t, meta, decoded) + + // PrevCommitTS alone (no lock TTL, no commit TS) must also survive. + prevOnly := TxnMeta{PrimaryKey: []byte("pk"), PrevCommitTS: 99} + decodedPrev, err := DecodeTxnMeta(EncodeTxnMeta(prevOnly)) + require.NoError(t, err) + require.Equal(t, prevOnly, decodedPrev) +} + func TestDecodeTxnMetaV2RejectsUnknownFlags(t *testing.T) { t.Parallel() diff --git a/kv/txn_errors.go b/kv/txn_errors.go index 0121c92d0..f4696158a 100644 --- a/kv/txn_errors.go +++ b/kv/txn_errors.go @@ -15,6 +15,13 @@ var ( ErrTxnAlreadyCommitted = errors.New("txn already committed") ErrTxnAlreadyAborted = errors.New("txn already aborted") ErrTxnPrimaryKeyRequired = errors.New("txn primary key required") + // ErrTxnDedupRequiresSingleShard is returned when a transaction request + // carries OperationGroup.PrevCommitTS (the option-2 one-phase dedup probe + // key) but its mutations or read keys span shards. The 2PC log builders + // encode only CommitTS, so silently honoring such a request would drop + // the probe at the FSM and let the original duplicate-elements anomaly + // reappear. See codex P2 in PR #796 and the design doc. + ErrTxnDedupRequiresSingleShard = errors.New("txn dedup (prev_commit_ts) requires a single-shard write set") ) type TxnLockedError struct { diff --git a/store/committed_version_at_test.go b/store/committed_version_at_test.go new file mode 100644 index 000000000..0c2600365 --- /dev/null +++ b/store/committed_version_at_test.go @@ -0,0 +1,97 @@ +package store + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +// committedVersionAtCase drives the exact-timestamp existence probe used by +// the one-phase transaction idempotency design (option 2). The same table +// runs against both the in-memory mvccStore and the Pebble-backed store so +// their semantics stay identical. +func runCommittedVersionAtSuite(t *testing.T, newStore func(t *testing.T) MVCCStore) { + t.Helper() + ctx := context.Background() + + t.Run("exact hit on a put version", func(t *testing.T) { + st := newStore(t) + require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v"), 100, 0)) + + ok, err := st.CommittedVersionAt(ctx, []byte("k"), 100) + require.NoError(t, err) + require.True(t, ok, "version committed at exactly 100 must be found") + }) + + t.Run("miss on an absent key", func(t *testing.T) { + st := newStore(t) + ok, err := st.CommittedVersionAt(ctx, []byte("absent"), 100) + require.NoError(t, err) + require.False(t, ok) + }) + + t.Run("exactness: a newer version does not satisfy an earlier ts", func(t *testing.T) { + // This is the load-bearing case. A loose `latestTS >= ts` check + // (as applyCommitWithIdempotencyFallback uses) would report true + // for ts=200 because 300 >= 200, misclassifying a different txn's + // commit as our own prior attempt. The exact probe must not. + st := newStore(t) + require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v1"), 100, 0)) + require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v3"), 300, 0)) + + got100, err := st.CommittedVersionAt(ctx, []byte("k"), 100) + require.NoError(t, err) + require.True(t, got100, "version at exactly 100 exists") + + got200, err := st.CommittedVersionAt(ctx, []byte("k"), 200) + require.NoError(t, err) + require.False(t, got200, "no version at exactly 200; a >=300 match must not leak through") + + got300, err := st.CommittedVersionAt(ctx, []byte("k"), 300) + require.NoError(t, err) + require.True(t, got300, "version at exactly 300 exists") + }) + + t.Run("tombstone counts as a landed version", func(t *testing.T) { + // The previous attempt landed even if it committed a delete; the + // probe must report present so the retry deduplicates rather than + // re-applying. + st := newStore(t) + require.NoError(t, st.DeleteAt(ctx, []byte("k"), 100)) + + ok, err := st.CommittedVersionAt(ctx, []byte("k"), 100) + require.NoError(t, err) + require.True(t, ok, "a tombstone committed at exactly 100 must count as landed") + }) + + t.Run("different key isolation", func(t *testing.T) { + st := newStore(t) + require.NoError(t, st.PutAt(ctx, []byte("a"), []byte("v"), 100, 0)) + + ok, err := st.CommittedVersionAt(ctx, []byte("b"), 100) + require.NoError(t, err) + require.False(t, ok, "a version on key a must not satisfy a probe on key b") + }) +} + +func TestCommittedVersionAt_MVCCStore(t *testing.T) { + runCommittedVersionAtSuite(t, func(t *testing.T) MVCCStore { + t.Helper() + return NewMVCCStore() + }) +} + +func TestCommittedVersionAt_PebbleStore(t *testing.T) { + runCommittedVersionAtSuite(t, func(t *testing.T) MVCCStore { + t.Helper() + dir, err := os.MkdirTemp("", "committed-version-at") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + s, err := NewPebbleStore(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = s.Close() }) + return s + }) +} diff --git a/store/lsm_store.go b/store/lsm_store.go index f2d8d7942..248e9fbaa 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -765,6 +765,42 @@ func (s *pebbleStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool return val != nil, nil } +// CommittedVersionAt reports whether a version stamped EXACTLY commitTS +// exists for key. It is a single point lookup on the MVCC-encoded key +// (userKey + inverted commitTS), not a <=ts scan: only the exact version +// matters for the one-phase idempotency probe. A tombstone version counts +// as present — the previous attempt landed even if it committed a delete — +// so the raw key existence is the answer and the value is not decoded. +// +// Unlike GetAt/ExistsAt, this probe does NOT enforce the retention watermark +// (codex P1 round-11): branching FSM apply on the per-replica minRetainedTS +// is non-deterministic across raft replicas (compaction is driven by local +// wall clock, not by the replicated log), and a fail-closed retention check +// here would produce split-brain — some replicas surface ErrReadTSCompacted +// and skip the apply while others see the version and no-op. Returning the +// raw pebble.Get answer makes the probe deterministic for the option-2 +// dedup use case, where each per-element key has at most one MVCC version +// so physical pebble compaction does not remove it. The invariant the +// caller depends on is: retention window > max adapter retry latency, so a +// live retry's PrevCommitTS never falls below pebble's compacted floor on +// any replica. The earlier round-10 retention guard was reverted with this +// rationale; see design doc §race-freedom. +func (s *pebbleStore) CommittedVersionAt(_ context.Context, key []byte, commitTS uint64) (bool, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + _, closer, err := s.db.Get(encodeKey(key, commitTS)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return false, nil + } + return false, errors.WithStack(err) + } + if err := closer.Close(); err != nil { + return false, errors.WithStack(err) + } + return true, nil +} + func (s *pebbleStore) processFoundValue(iter *pebble.Iterator, userKey []byte, ts uint64) (*KVPair, error) { valBytes := iter.Value() sv, err := decodeValue(valBytes) diff --git a/store/mvcc_store.go b/store/mvcc_store.go index fbd1a800f..74b6a2295 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -464,6 +464,31 @@ func (s *mvccStore) LatestCommitTS(_ context.Context, key []byte) (uint64, bool, return ver.TS, true, nil } +// CommittedVersionAt reports whether a version stamped EXACTLY commitTS +// exists for key. Versions are kept sorted by TS ascending, so a binary +// search for the exact timestamp answers the one-phase idempotency probe. +// A tombstone counts as present (the previous attempt committed a delete), +// matching the pebbleStore semantics. +// +// No retention guard is enforced here (codex P1 round-11): see pebbleStore +// CommittedVersionAt for the determinism rationale. The retention guard was +// added in round-10 to mirror GetAt semantics, but for the exact-TS probe +// used by FSM apply dedup it produced split-brain when replicas had +// diverging local minRetainedTS — some surface ErrReadTSCompacted, others +// see the version. Returning the raw existence answer is deterministic for +// single-version-per-key write sets (the option-2 use case). +func (s *mvccStore) CommittedVersionAt(_ context.Context, key []byte, commitTS uint64) (bool, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + v, ok := s.tree.Get(key) + if !ok { + return false, nil + } + vs, _ := v.([]VersionedValue) + i := sort.Search(len(vs), func(i int) bool { return vs[i].TS >= commitTS }) + return i < len(vs) && vs[i].TS == commitTS, nil +} + // ApplyMutationsRaft is provided to satisfy the MVCCStore interface. The // in-memory store has no WAL and therefore no sync-mode distinction; this // method delegates to ApplyMutations. diff --git a/store/store.go b/store/store.go index 41f495cd1..ace186809 100644 --- a/store/store.go +++ b/store/store.go @@ -128,6 +128,18 @@ type MVCCStore interface { // LatestCommitTS returns the commit timestamp of the newest version. // The boolean reports whether the key has any version. LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) + // CommittedVersionAt reports whether a committed version stamped + // EXACTLY commitTS exists for key. Unlike GetAt (newest version + // <= ts) this is an exact-timestamp existence check, used by the + // one-phase transaction idempotency probe to ask "did the previous + // attempt — which committed at this exact commit_ts — land?". Because + // commit timestamps are issued by the strictly-monotonic, unique HLC + // (Clock().Next()), a version at an exact commitTS on a given key can + // only have come from the transaction that was assigned that timestamp, + // so an exact hit unambiguously identifies that attempt. A tombstone + // counts as a landed version (the attempt committed a delete). See + // docs/design/2026_05_21_proposed_txn_secondary_idempotency.md. + CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error) // ApplyMutations atomically validates and appends the provided mutations. // It must return ErrWriteConflict if any mutation key or any read key has // a newer commit timestamp than startTS. readKeys carries the transaction's