From 520249a65165b2d835c40cb99d17bd13f91d0eba Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 16:23:28 +0900 Subject: [PATCH 1/7] feat(txn-dedup): re-land M3 EXEC reuse on main + close M2 open + multi-mop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #884 was merged into the stacked branch docs/txn-idempotency-design (at cbbde3d7) but never reached main — main has only PR #796's M1 + M2 + M3 RPUSH/LPUSH content. This PR re-lands the M3 EXEC reuse code on top of main and bundles three follow-ups that extend the design doc's "still open" / "follow-up" items into "landed": 1. Re-land M3 EXEC reuse (PR #884's content, cherry-picked + rebased) - adapter/redis.go: txnContext.prepareDispatch() split out of commit(); reusableExecTxn; dispatchExecReuse; runTransactionWithDedup + firstExecAttempt; gate at the top of runTransaction. - adapter/redis_exec_dedup_test.go (originally added in PR #884): 5 tests pinning all four reuse outcomes plus the gate-off legacy equivalence. - The cherry-pick required one small adaptation: prepareDispatch() uses Clock().NextFenced() (uint64, error) on current main; the PR #884 version targeted Clock().Next() (uint64). Same downstream semantics; the error return is wired through preparedTxnDispatch. 2. Close M2 open item — FSM other-txn exactness test (kv/fsm_onephase_dedup_test.go) - TestOnePhaseDedup_OtherTxnVersionDoesNotMaskRetry pins exactness at the FSM apply layer: a third-party version at T_other=20 must not satisfy the FSM probe at T1=30, so the retry falls through and applies at the fresh T2=40. The store-layer pin (store/committed_version_at_test.go) already covers the primitive; this test covers the dispatch path that uses it. 3. M3 multi-mop EXEC dedup test (adapter/redis_exec_dedup_test.go) - TestExecDedup_MultiMopLandedPriorAttempt_ReturnsCachedResults extends single-mop dedup to a 3-command MULTI/EXEC body (SET a + SET b + DEL c). 
Validates the design's claim that the mechanism works the same for multi-mop because cached results + OCC readKeys fence are mop-count-agnostic. Without dedup the DEL would re-execute to 0 on the second pass — the test rejects that. 4. Design doc updates (docs/design/2026_05_21_proposed_txn_secondary_idempotency.md) - §M2 "still open" → "LANDED" with the new FSM test reference. - §M3 "runTransaction (MULTI/EXEC) — Still open" → "LANDED via PR #884" with multi-mop test reference and the two intentional deviations from the M1/M2 template that claude[bot] flagged on #884 (readKeys assembly order, fresh per-attempt reuseCtx). - §M3 "standalone SET/INCR/HSET" called out as the next follow-up (PR-B in the user-visible roadmap). Caller audit (per /loop semantic-change rule) ============================================== - prepareDispatch (newly added, replaces commit()'s body): callers are commit() and firstExecAttempt; both honor the defer prepared.cancel() contract. External behavior of commit() preserved. - commit(): internal structure changed; external behavior preserved (no test directly invokes it; runTransaction's legacy path continues to call it through the same shape). - runTransactionWithDedup / firstExecAttempt / dispatchExecReuse / reusableExecTxn: all new symbols, exercised only from the gated runTransaction path. Validation ========== - go test ./adapter/ -run 'Dedup|Txn|MULTI|EXEC' passes. - go test ./kv/ ./store/ both pass. - gofmt, go vet, golangci-lint run all clean (0 issues across adapter/kv/store). Relation to prior work ====================== - #796 (merged f481f2b7): M1 + M2 + M3 RPUSH/LPUSH on main. - #884 (merged cbbde3d7 into stacked branch, NOT main): M3 EXEC reuse. - THIS PR: brings #884's content into main, plus M2 cross-txn FSM test, multi-mop EXEC test, and design doc updates. - Next (PR-B): standalone SET / INCR / HSET reuse paths. - Next (PR-C): M4 Jepsen validation infra. 
--- adapter/redis.go | 274 +++++++++++++++++- adapter/redis_exec_dedup_test.go | 257 ++++++++++++++++ ...5_21_proposed_txn_secondary_idempotency.md | 50 +++- kv/fsm_onephase_dedup_test.go | 59 ++++ 4 files changed, 622 insertions(+), 18 deletions(-) create mode 100644 adapter/redis_exec_dedup_test.go diff --git a/adapter/redis.go b/adapter/redis.go index 27b4b88c0..3b0782d7f 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -2654,19 +2654,42 @@ func (t *txnContext) validateReadSet(ctx context.Context) error { return nil } -func (t *txnContext) commit() error { +// preparedTxnDispatch is the fully-assembled write set + read set + commit +// timestamp for a MULTI/EXEC transaction, ready to be passed to +// coordinator.Dispatch. Split out from commit() so the option-2 dedup +// path (runTransactionWithDedup) can intercept between prepare and +// dispatch — it needs to capture (elems, commitTS, readKeys) for a +// possible retry under PrevCommitTS without otherwise duplicating the +// commit-building logic. The owned ctx is the redisDispatchTimeout- +// bounded context the caller must run Dispatch under and Cancel after. +type preparedTxnDispatch struct { + elems []*kv.Elem[kv.OP] + commitTS uint64 + readKeys [][]byte + ctx context.Context + cancel context.CancelFunc +} + +// prepareDispatch builds everything Dispatch needs (elems, commitTS, +// readKeys, ctx) without actually calling Dispatch. Callers must always +// invoke `cancel()` on the returned prepared value once the dispatch +// attempt finishes (commit() does this via defer; the dedup path does it +// per retry iteration). A write-free transaction still returns a live +// cancel alongside empty `elems` (only error returns carry a no-op one): +// check len(prepared.elems)==0, skip the dispatch, and still call cancel.
+func (t *txnContext) prepareDispatch() (preparedTxnDispatch, error) { elems := t.buildKeyElems() // Pre-allocate commitTS so Delta keys can embed it in their bytes before // the coordinator assigns it during Dispatch. commitTS, err := t.server.coordinator.Clock().NextFenced() if err != nil { - return errors.Wrap(err, "redis txn commit: allocate commitTS") + return preparedTxnDispatch{cancel: func() {}}, errors.Wrap(err, "redis txn commit: allocate commitTS") } listElems := t.buildListElems(commitTS) zsetElems, err := t.buildZSetElems(commitTS) if err != nil { - return err + return preparedTxnDispatch{cancel: func() {}}, err } // TTL elements: string keys have TTL embedded in value (buildKeyElems handles that), // non-string keys get a !redis|ttl| element written in the same transaction. @@ -2678,33 +2701,48 @@ func (t *txnContext) commit() error { // run on the server-lifetime handlerContext, leaving its scans uncancellable // from the request side on a slow disk or hot-key pathological commit. ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout) - defer cancel() streamElems, err := t.buildStreamDeletionElems(ctx) if err != nil { - return err + cancel() + return preparedTxnDispatch{cancel: func() {}}, err } elems = append(elems, listElems...) elems = append(elems, zsetElems...) elems = append(elems, ttlElems...) elems = append(elems, streamElems...) 
- if len(elems) == 0 { - return nil - } readKeys := make([][]byte, 0, len(t.readKeys)) for _, k := range t.readKeys { readKeys = append(readKeys, k) } + return preparedTxnDispatch{ + elems: elems, + commitTS: commitTS, + readKeys: readKeys, + ctx: ctx, + cancel: cancel, + }, nil +} + +func (t *txnContext) commit() error { + prepared, err := t.prepareDispatch() + if err != nil { + return err + } + defer prepared.cancel() + if len(prepared.elems) == 0 { + return nil + } group := &kv.OperationGroup[kv.OP]{ IsTxn: true, - Elems: elems, + Elems: prepared.elems, StartTS: t.startTS, - CommitTS: commitTS, - ReadKeys: readKeys, + CommitTS: prepared.commitTS, + ReadKeys: prepared.readKeys, } - if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil { + if _, err := t.server.coordinator.Dispatch(prepared.ctx, group); err != nil { return errors.WithStack(err) } return nil @@ -2981,6 +3019,10 @@ func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] { } func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) { + if r.onePhaseTxnDedup { + return r.runTransactionWithDedup(queue) + } + dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() @@ -3027,6 +3069,214 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err return results, nil } +// reusableExecTxn captures a dispatched MULTI/EXEC transaction so a +// subsequent retry can reuse its exact write set under a fresh +// commit_ts (carrying prev_commit_ts) and probe whether the prior +// attempt already landed. This is the EXEC analogue of +// reusableListPush (M3 R1 result reconstruction for MULTI/EXEC). +// +// `results` is computed once from attempt 1's startTS snapshot and is +// invariant across reuse for the same reason RPUSH/LPUSH's `length` +// is: the write set is fixed, so apply-vs-no-op is invisible to the +// client. 
Reads in the EXEC body returned values from attempt 1's +// snapshot — those values were what the client would have observed if +// attempt 1 hadn't returned an ambiguous error, so caching them is +// the right semantics for a confirmed-or-deduped commit. A +// genuine cross-txn conflict is caught by OCC on readKeys at the FSM +// apply (WriteConflict → drop pending → recompute), so the cached +// results are only returned when reuse actually represents the +// outcome of attempt 1's intent. +type reusableExecTxn struct { + elems []*kv.Elem[kv.OP] + startTS uint64 + commitTS uint64 + readKeys [][]byte + results []redisResult +} + +// dispatchExecReuse runs one iteration of the option-2 reuse path for +// MULTI/EXEC: dispatches the captured write set under a fresh +// commit_ts (carrying pending.commitTS as PrevCommitTS so the FSM +// probes whether the prior attempt landed) and returns the cached +// client-visible results on success. The drop return signals the +// caller to clear pending — set on a genuine WriteConflict from +// another txn (after the self-conflict probe rules out our own apply) +// so the next iteration rebuilds the txn from a fresh read snapshot. +// +// Mirrors dispatchListPushReuse; the only difference is the result +// payload (cached []redisResult vs computed list length) and the lack +// of a meta re-read fallback — for EXEC there is no post-apply "what +// is the current length" question; the client-visible result IS the +// cached results array. 
+func (r *RedisServer) dispatchExecReuse(ctx context.Context, pending *reusableExecTxn) (results []redisResult, drop bool, err error) { + commitTS := r.coordinator.Clock().Next() + _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: pending.startTS, + CommitTS: commitTS, + PrevCommitTS: pending.commitTS, + ReadKeys: pending.readKeys, + Elems: pending.elems, + }) + if dispErr == nil { + return pending.results, false, nil + } + if errors.Is(dispErr, store.ErrWriteConflict) { + // Self-inflicted-conflict guard (mirrors dispatchListPushReuse): + // the apply might have landed at this fresh commitTS but bubbled + // up as WriteConflict due to leadership churn. Probe whether our + // reused write set actually landed; if yes, return the cached + // results unchanged (they describe the EXEC body's outcome + // against attempt 1's snapshot, which is the outcome whether + // the bytes hit MVCC at attempt-1's commitTS or at this fresh + // commitTS — the OCC fence on readKeys guarantees no + // intervening cross-txn write slipped past). + if probeKey := firstWriteKey(pending.elems); len(probeKey) > 0 { + landed, perr := r.store.CommittedVersionAt(ctx, probeKey, commitTS) + if perr == nil && landed { + pending.commitTS = commitTS + return pending.results, false, nil + } + } + // Our attempt did not land at commitTS and a key collides with + // another txn — genuine conflict. Drop pending so the next + // iteration rebuilds from a fresh snapshot. + return nil, true, errors.WithStack(dispErr) + } + // Still ambiguous (lock / other retryable): the reuse may itself + // have landed, so the next retry must probe THIS commit_ts. Only + // advance pending.commitTS if retryRedisWrite will actually loop + // (non-retryable errors escape to the client; pending is then + // discarded with the goroutine). 
+ if isRetryableRedisTxnErr(dispErr) { + pending.commitTS = commitTS + } + return nil, false, errors.WithStack(dispErr) +} + +// runTransactionWithDedup is the option-2 retry loop for MULTI/EXEC. +// The first attempt builds the txn write set + cached results from +// the user's startTS snapshot; any retryable failure makes the next +// iteration REUSE that write set under a fresh commit_ts with +// prev_commit_ts set, so the FSM no-ops if the prior attempt already +// landed. A WriteConflict on a reuse attempt (after the self-conflict +// probe rules out our own apply) means another txn touched a read or +// write key, and we drop pending → rebuild from a fresh snapshot. +// +// Mirrors listPushCoreWithDedup at the EXEC granularity. +func (r *RedisServer) runTransactionWithDedup(queue []redcon.Command) ([]redisResult, error) { + dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + defer cancel() + + var results []redisResult + var pending *reusableExecTxn + err := r.retryRedisWrite(dispatchCtx, func() error { + if pending != nil { + reuseCtx, reuseCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + defer reuseCancel() + res, drop, dispErr := r.dispatchExecReuse(reuseCtx, pending) + if drop { + pending = nil + } + if dispErr != nil { + return dispErr + } + results = res + return nil + } + res, next, ferr := r.firstExecAttempt(dispatchCtx, queue) + if ferr != nil { + if next != nil { + pending = next + } + return ferr + } + results = res + return nil + }) + if err != nil { + return nil, err + } + return results, nil +} + +// firstExecAttempt runs the initial (no-reuse) EXEC attempt: builds the +// txn snapshot, applies each command to capture the client-visible +// results, validates the read set, and dispatches. On success returns +// the results. 
On a retryable dispatch failure it returns a +// reusableExecTxn capturing what the retry loop needs to dispatch via +// PrevCommitTS on the next iteration; non-retryable failures return a +// nil reuse state (mirrors listPushCoreWithDedup's gating). Extracted +// from runTransactionWithDedup to keep that loop under the cyclop +// budget; the dedup rationale lives there. +func (r *RedisServer) firstExecAttempt(dispatchCtx context.Context, queue []redcon.Command) ([]redisResult, *reusableExecTxn, error) { + startTS := r.txnStartTS() + readPin := r.pinReadTS(startTS) + defer readPin.Release() + + txn := &txnContext{ + server: r, + ctx: dispatchCtx, + working: map[string]*txnValue{}, + listStates: map[string]*listTxnState{}, + zsetStates: map[string]*zsetTxnState{}, + ttlStates: map[string]*ttlTxnState{}, + readKeys: map[string][]byte{}, + streamDeletions: map[string][]byte{}, + startTS: startTS, + } + + nextResults := make([]redisResult, 0, len(queue)) + for _, cmd := range queue { + res, err := txn.apply(cmd) + if err != nil { + return nil, nil, err + } + nextResults = append(nextResults, res) + } + + if err := txn.validateReadSet(dispatchCtx); err != nil { + return nil, nil, err + } + + prepared, err := txn.prepareDispatch() + if err != nil { + return nil, nil, err + } + defer prepared.cancel() + if len(prepared.elems) == 0 { + // Read-only EXEC: nothing to dispatch, no dedup window. + return nextResults, nil, nil + } + + group := &kv.OperationGroup[kv.OP]{ + IsTxn: true, + Elems: prepared.elems, + StartTS: txn.startTS, + CommitTS: prepared.commitTS, + ReadKeys: prepared.readKeys, + } + if _, dispErr := r.coordinator.Dispatch(prepared.ctx, group); dispErr != nil { + // Only remember the attempt for reuse if retryRedisWrite will + // actually loop. 
Mirrors listPushCoreWithDedup's gating + // rationale — errors that escape the loop (transient-leader, + // context deadline, FSM apply error) leave pending pointing at + // state wasted with the goroutine; ambiguous errors that + // escape to the client are out of scope for this loop. + if isRetryableRedisTxnErr(dispErr) { + return nil, &reusableExecTxn{ + elems: prepared.elems, + startTS: txn.startTS, + commitTS: prepared.commitTS, + readKeys: prepared.readKeys, + results: nextResults, + }, errors.WithStack(dispErr) + } + return nil, nil, errors.WithStack(dispErr) + } + return nextResults, nil, nil +} + func (r *RedisServer) txnStartTS() uint64 { // store.LastCommitTS() is the authoritative safe-snapshot watermark: it is // updated atomically only AFTER the corresponding Pebble batch commit, so diff --git a/adapter/redis_exec_dedup_test.go b/adapter/redis_exec_dedup_test.go new file mode 100644 index 000000000..ae86132ef --- /dev/null +++ b/adapter/redis_exec_dedup_test.go @@ -0,0 +1,257 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" + "github.com/tidwall/redcon" +) + +// TestExecDedup_LandedPriorAttempt_ReturnsCachedResults is the option-2 +// headline for MULTI/EXEC (M3 R1): attempt 1 commits the transaction body +// but bubbles up an ambiguous error, the retry reuses the same write set +// with prev_commit_ts, the FSM probe finds the landed version and no-ops, +// and the client gets the same results array attempt 1 computed. +// +// Without the probe, the reuse would OCC-conflict against attempt 1's own +// version, the adapter would drop pending and recompute from a fresh +// snapshot — which for SET is harmless (idempotent overwrite) but for +// INCR/RPUSH would produce a different (and wrong, since attempt 1 already +// landed) result. This test pins that the reuse path returns attempt 1's +// cached results without re-executing the command body. 
+func TestExecDedup_LandedPriorAttempt_ReturnsCachedResults(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + // Single-mop EXEC: one SET command. + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str, "SET must return the cached OK from attempt 1, not be re-executed") + require.Equal(t, 2, coord.dispatches, "one failed (ambiguous-land) attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "the reuse must dedup via the exact-ts probe") + + // And the value is exactly the one attempt 1 wrote. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_PriorAttemptDidNotLand_Applies covers the truncated case for +// MULTI/EXEC: attempt 1 errored without committing (OCC-style pre-reject), +// so the probe misses and the reuse applies the same write set at a fresh +// commit_ts. The cached results are still returned (they describe the +// EXEC body's intent against attempt 1's snapshot, which is what the client +// sees regardless of which physical commit_ts the bytes hit MVCC at). 
+func TestExecDedup_PriorAttemptDidNotLand_Applies(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + require.Equal(t, 2, coord.dispatches) + require.Equal(t, 0, coord.probeNoOps, "nothing landed, so the probe must miss and the reuse applies") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_GenuineConflictRebuildsAndApplies covers outcome 3 for +// MULTI/EXEC: attempt 1 did not land; a concurrent client wrote the same +// key between attempts; the reuse OCC-conflicts (the foreign write +// advances the key's commit_ts past pending.startTS), the self-conflict +// probe rules out our own landing, the adapter drops pending and rebuilds +// the txn from a fresh snapshot — the new attempt then succeeds. +// +// This test pins the discriminator: probe-miss + OCC-conflict ⇒ recompute. +// If the adapter incorrectly reused on a foreign conflict, the cached +// results from attempt 1 would be returned alongside a value written by +// the concurrent client (an inconsistent view). 
+func TestExecDedup_GenuineConflictRebuildsAndApplies(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + key := []byte("k") + + // Before dispatch 2 (the reuse), inject a concurrent SET so the reuse + // OCC-conflicts on the write key. + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + ts := coord.Clock().Next() + require.NoError(t, st.PutAt(ctx, redisStrKey(key), encodeRedisStr([]byte("other"), nil), ts, 0)) + } + + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), key, []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + // At least three dispatches: attempt 1 (pre-reject), reuse (OCC-conflict + // on key), and a fresh-snapshot retry (success). + require.GreaterOrEqual(t, coord.dispatches, 3) + require.Equal(t, 0, coord.probeNoOps, "nothing landed at attempt 1's ts; probe must not fire as a hit") + + // Our final write wins (it commits AFTER the concurrent SET because we + // rebuilt at a fresh startTS that observed the foreign commit). + rawVal, err := st.GetAt(ctx, redisStrKey(key), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_SelfInflictedReuseConflict_ReturnsSuccess mirrors the +// listPush self-inflicted-conflict regression: the reuse dispatch APPLIES +// the elems at the fresh commitTS but bubbles up store.ErrWriteConflict +// (leadership churn surfacing a committed entry as a conflict). The +// adapter probes the just-attempted commit_ts; the probe hits; cached +// results are returned.
Without the guard, the adapter would drop pending +// and recompute, double-applying the EXEC body. +func TestExecDedup_SelfInflictedReuseConflict_ReturnsSuccess(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 pre-rejects (didn't land) + coord.landThenWriteConflictAtDispatch = 2 // reuse lands then surfaces WriteConflict + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + require.Equal(t, 2, coord.dispatches, "attempt 1 pre-reject + reuse land-then-conflict; no third attempt") + require.Equal(t, 0, coord.probeNoOps, + "the FSM probe at attempt 1's ts must NOT hit (attempt 1 did not land); "+ + "the success comes from the adapter's self-conflict guard probing the fresh commitTS") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_MultiMopLandedPriorAttempt_ReturnsCachedResults extends the +// single-mop dedup coverage to a 3-command MULTI/EXEC body: SET + SET + DEL +// on three different keys. The PR #884 design doc scopes the first PR to +// "single-mop EXEC" out of conservatism — the mechanism (cache results +// array; OCC fence on readKeys) works identically for multi-mop because: +// +// - The cached results array captures one entry per command, fixed at +// attempt 1's startTS snapshot. Reuse returns the array as-is; the FSM +// no-op / apply-fresh decision is invisible to per-command return +// values (SET="OK", DEL=count). 
+// - readKeys aggregates reads from every command in the body via +// txn.readKeys; OCC at the FSM apply checks the union, so a stale read +// anywhere in the body triggers WriteConflict → drop pending → +// recompute. This is the same fence single-mop relies on, just over a +// larger key set. +// +// This test pins the multi-mop guarantee: a 3-mop EXEC where attempt 1 +// lands then errors must reuse the cached results array (returning OK, OK, +// 1) and MUST NOT re-execute any of the three commands. Without dedup the +// SET overwrites would be harmless (idempotent) but DEL would re-execute +// and return 0 on the second pass (key already deleted) — observably +// wrong. This regression-test closes the design doc's "multi-mop EXEC … +// validation is a follow-up" open item. +func TestExecDedup_MultiMopLandedPriorAttempt_ReturnsCachedResults(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + + // Seed key c so the DEL on it returns 1 (matching attempt 1's view). 
+ require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("c")), encodeRedisStr([]byte("seed"), nil), 5, 0)) + + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("a"), []byte("va")}}, + {Args: [][]byte{[]byte(cmdSet), []byte("b"), []byte("vb")}}, + {Args: [][]byte{[]byte(cmdDel), []byte("c")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 3, "all three command results must be returned, not re-executed") + require.Equal(t, "OK", results[0].str, "SET a — cached OK from attempt 1") + require.Equal(t, "OK", results[1].str, "SET b — cached OK from attempt 1") + require.Equal(t, int64(1), results[2].integer, + "DEL c — must be the cached count from attempt 1, NOT a re-executed 0") + + require.Equal(t, 2, coord.dispatches, "one ambiguous-land attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "reuse dedup via the exact-ts probe across the full multi-mop write set") + + // Stored state matches: a, b set; c deleted. 
+ rawA, err := st.GetAt(ctx, redisStrKey([]byte("a")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + valA, _, err := decodeRedisStr(rawA) + require.NoError(t, err) + require.Equal(t, []byte("va"), valA) + + rawB, err := st.GetAt(ctx, redisStrKey([]byte("b")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + valB, _, err := decodeRedisStr(rawB) + require.NoError(t, err) + require.Equal(t, []byte("vb"), valB) + + _, getCerr := st.GetAt(ctx, redisStrKey([]byte("c")), snapshotTS(coord.Clock(), st)) + require.ErrorIs(t, getCerr, store.ErrKeyNotFound, "c must be deleted by the single landed apply, not double-applied") +} + +// TestExecDedup_DisabledKeepsLegacyPath verifies the dedup gate is honored: +// when onePhaseTxnDedup is off, runTransaction takes the legacy path +// (recompute on every retry, no prev_commit_ts) — byte-identical to today. +// Pins that the new code is strictly opt-in. +func TestExecDedup_DisabledKeepsLegacyPath(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{} /* gate left false */} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + // Legacy path runs no probe. 
+ require.Equal(t, 0, coord.probeNoOps) + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md index f759c32fd..15776a9f7 100644 --- a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -449,10 +449,15 @@ preserves availability and adds correctness. at exactly T1 → no-op (no version at T2, newest stays T1); truncated / never-applied → applies at T2; `prev_commit_ts == 0` → probe skipped, byte-identical to today. -- **Still open for M2:** an FSM test that simulates the *other*-txn case - (`Tx ≠ T1`) end-to-end so exactness is pinned at the apply layer too (the - store layer already pins it). Fold into M3, where the OCC-conflict path is - exercised. +- **M2 other-txn FSM exactness (was: still open) — LANDED.** + `TestOnePhaseDedup_OtherTxnVersionDoesNotMaskRetry` + (`kv/fsm_onephase_dedup_test.go`) pins exactness at the apply layer with + the third-party-version-at-T_other ≠ T1 scenario: a foreign version at + T_other=20 must NOT satisfy the FSM's exact-T1=30 probe, so a retry + carrying `prev_commit_ts=30` falls through and applies at the fresh + `commit_ts=40`. The store layer pin + (`store/committed_version_at_test.go`) covers the primitive; the new test + covers the FSM dispatch path that uses it. ### M3 — write-set reuse + result reconstruction in the retry sites @@ -473,8 +478,41 @@ preserves availability and adds correctness. outcomes end-to-end against real OCC + the real probe, plus the gate-off legacy path. - **Result reconstruction (R1) — resolved, and simpler than feared.** See R1. 
-- `runTransaction` (`adapter/redis.go`): same reuse + reconstruct for the EXEC - body (single-mop first; see Open questions). **Still open.** +- **`runTransaction` (MULTI/EXEC) — LANDED via PR #884.** When the gate is + on, `runTransactionWithDedup` mirrors `listPushCoreWithDedup` at the EXEC + granularity: the first attempt builds the txn, captures `nextResults` + from attempt 1's `startTS` snapshot, dispatches; on a retryable failure + the closure stashes a `reusableExecTxn` and the next iteration calls + `dispatchExecReuse` with `PrevCommitTS`. Cached `results` are returned + on both reuse-success and self-conflict-probe-hit; a non-self + `WriteConflict` drops pending and rebuilds. + - **Multi-mop EXEC — LANDED.** + `TestExecDedup_MultiMopLandedPriorAttempt_ReturnsCachedResults` + (`adapter/redis_exec_dedup_test.go`) pins a 3-command body (SET + + SET + DEL) where attempt 1 lands then errors: the retry returns the + cached results array as-is (OK, OK, 1) without re-executing — DEL + would re-execute to 0 on a second pass, which the test rejects. + - **Two intentional deviations from the M1/M2 template, noted per + claude[bot] PR #884 review:** + 1. `prepareDispatch()` assembles `readKeys` unconditionally (before + the empty-elems guard), versus the old `commit()` shape that + assembled `readKeys` only after the guard. Harmless reorder — + the slice is discarded in the empty-elems path — but the doc + acknowledges the small semantic shift so a future reader does + not flag it as an oversight. + 2. The reuse path in `runTransactionWithDedup` derives a fresh + per-attempt `reuseCtx` from `handlerContext()` rather than + reusing the outer `dispatchCtx`. Each reuse attempt thus gets a + full `redisDispatchTimeout`, strictly more conservative than + `listPushCoreWithDedup` (which threads the caller ctx through). 
+ Chosen so a half-expired ctx never cuts a reuse attempt short + on the retry path; consistent with `prepareDispatch()`'s + per-attempt fresh-ctx pattern in `commit()`. +- **Standalone write commands (SET/INCR/HSET/...) — still open.** The EXEC + path covers MULTI bodies; standalone single-command dispatch goes through + per-handler paths (`applySet`, `applyIncr`, etc.) and needs the same + `reusable` capture + `dispatchXReuse` shape per command. Scope is + per-command but each is small (~50 LOC). Tracked as PR-B follow-up. ### M4 — Validation diff --git a/kv/fsm_onephase_dedup_test.go b/kv/fsm_onephase_dedup_test.go index de9d3d9db..aaf027c1d 100644 --- a/kv/fsm_onephase_dedup_test.go +++ b/kv/fsm_onephase_dedup_test.go @@ -110,3 +110,62 @@ func TestOnePhaseDedup_FirstAttemptSkipsProbe(t *testing.T) { require.NoError(t, err) require.True(t, at20) } + +// TestOnePhaseDedup_OtherTxnVersionDoesNotMaskRetry closes the M2 open item +// (design doc §M2 "Still open"): exactness must be pinned at the apply layer +// for the OTHER-txn case. The store layer's TestCommittedVersionAt_PebbleStore +// /-_MVCCStore pin (load-bearing exactness suite) already proves that a +// version at T_other ≠ T1 does not satisfy a probe at T1. This test extends +// that pin to the FSM apply path: if a third party committed a version at +// some `T_other ≠ prev_commit_ts` for the SAME primary key, the FSM probe +// at prev_commit_ts must miss and the retry must apply at the fresh +// commit_ts. Without exactness, the probe would alias on T_other and +// incorrectly no-op the retry — silently dropping the user's write. +// +// Scenario: +// 1. Third-party txn writes key=other at T_other=20. +// 2. Adapter's attempt 1 at commit_ts T1=30 returns ambiguous error and +// was actually truncated (no version at T1). +// 3. Retry arrives at fresh commit_ts T2=40 with prev_commit_ts=T1=30. +// 4. 
FSM probes CommittedVersionAt(key, 30) → MISS (only T_other=20 exists), +// falls through to apply, writing key=v at T2=40. +// +// If exactness were broken (probe at 30 returned true because T_other=20 +// existed), the retry would no-op and v would be permanently lost. +func TestOnePhaseDedup_OtherTxnVersionDoesNotMaskRetry(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSMWithHLC(st, NewHLC()).(*kvFSM) + require.True(t, ok) + + key := []byte("list-item") + + // Third party commits an unrelated version at T_other=20. + require.NoError(t, st.PutAt(ctx, key, []byte("other"), 20, 0)) + + // Retry: claims prev_commit_ts=T1=30 (an "attempt 1" that never landed, + // no version at 30 exists) and fresh commit_ts T2=40. The third-party + // version at 20 must NOT cause the probe at 30 to hit. + require.NoError(t, applyFSMRequest(t, fsm, onePhaseReq(35, 40, 30, key, []byte("v")))) + + // FSM apply layer pin: probe MUST miss exact 30, retry MUST apply at 40. + at30, err := st.CommittedVersionAt(ctx, key, 30) + require.NoError(t, err) + require.False(t, at30, "no attempt 1 landed at 30; the third-party version at 20 must not satisfy the exact-30 probe") + + at40, err := st.CommittedVersionAt(ctx, key, 40) + require.NoError(t, err) + require.True(t, at40, "with the exact-30 probe missing, the retry must apply at the fresh commit_ts 40") + + // LatestCommitTS reflects T2=40 (the retry's fresh write supersedes both + // the third-party version at 20 and the never-landed attempt 1 at 30). 
+ latest, exists, err := st.LatestCommitTS(ctx, key) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, uint64(40), latest, "newest version must be the retry's fresh apply at 40, not the third-party 20") + + val, err := st.GetAt(ctx, key, ^uint64(0)) + require.NoError(t, err) + require.Equal(t, []byte("v"), val, "retry's write must be readable; exactness loss would have lost it") +} From dd042e970ddf324813208fb364042f7a0467c3a1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 16:40:11 +0900 Subject: [PATCH 2/7] fix(txn-dedup): address gemini PR #887 HIGH + MEDIUM round 1 gemini HIGH (adapter/redis.go:3112 dispatchExecReuse): Clock().Next() bypasses the HLC-4 physical-ceiling fence; persistence-grade commit_ts allocation must use NextFenced() to avoid issuing a timestamp that collides with a subsequent leader's window after renewal. Switched dispatchExecReuse to NextFenced() with the standard error-propagation shape (matches prepareDispatch from PR #884 cherry-pick). gemini MEDIUM (adapter/redis.go:3175 reuseCtx propagation): The original "fresh ctx from handlerContext per reuse attempt" pattern ignored outer cancellation -- a disconnected client would wait the full 10 s before the reuse returned. Derived reuseCtx from dispatchCtx (the caller's retry-loop ctx) so outer cancellation interrupts mid-attempt. Per-attempt redisDispatchTimeout still caps the dispatch the same way commit does for the first attempt. Matches listPushCoreWithDedup's caller-ctx threading. gemini MEDIUM (adapter/redis.go:3215 readPin.Release on nil): False positive. ActiveTimestampToken.Release is documented nil-safe at kv/active_timestamp_tracker.go:58. The pattern readPin := r.pinReadTS(...); defer readPin.Release() is used unchanged in the existing runTransaction (legacy path); no change needed. Caller audit per /loop semantic-change rule: - dispatchExecReuse signature unchanged; the new NextFenced allocation error is returned through the existing err slot of the (results, drop, err) return tuple.
Single caller is runTransactionWithDedup which already returns dispErr via dropping to the retry loop's error path -- the new NextFenced error reaches the same dispErr branch. - reuseCtx parent change is local to runTransactionWithDedup; the only observable effect is faster cancellation propagation, which retryRedisWrite already gates on ctx.Done between attempts. Note on pre-existing Clock().Next() callers: dispatchListPushReuse (adapter/redis.go:3508) and the listPushCoreWithDedup first-attempt site (adapter/redis.go:3679) -- both shipped on main as part of PR #796 -- still use Clock().Next() and have the same HLC-4 ceiling-bypass exposure. Out of scope for this PR (those are pre-existing on main, not introduced by PR-A's diff), but should be fixed in a follow-up cleanup PR for parity. Design doc: M3 "fresh reuseCtx from handlerContext" deviation note is struck and replaced with the dispatchCtx-derived rationale; future readers see why the earlier framing was wrong and what the current pattern is. Validation: go test ./adapter/ -run 'ExecDedup|TxnMULTI' passes. go build ./adapter/... clean. golangci-lint run ./adapter/... 0 issues. --- adapter/redis.go | 23 +++++++++++++++++-- ...5_21_proposed_txn_secondary_idempotency.md | 20 ++++++++++------ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/adapter/redis.go b/adapter/redis.go index 3b0782d7f..065c2899a 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -3109,7 +3109,16 @@ type reusableExecTxn struct { // is the current length" question; the client-visible result IS the // cached results array. func (r *RedisServer) dispatchExecReuse(ctx context.Context, pending *reusableExecTxn) (results []redisResult, drop bool, err error) { - commitTS := r.coordinator.Clock().Next() + // gemini PR-A HIGH: persistence-grade commit_ts allocation must honor the + // HLC-4 physical-ceiling fence (see kv/hlc.go NextFenced + the TLA proof + // at tla/hlc/MCHLC_gap.cfg). 
Clock().Next() bypasses the ceiling and + // could issue a timestamp that collides with a subsequent leader's + // window after renewal — the very class of bug option-2 is meant to + // rule out. + commitTS, allocErr := r.coordinator.Clock().NextFenced() + if allocErr != nil { + return nil, false, errors.Wrap(allocErr, "redis exec reuse: allocate commitTS") + } _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ IsTxn: true, StartTS: pending.startTS, @@ -3172,7 +3181,17 @@ func (r *RedisServer) runTransactionWithDedup(queue []redcon.Command) ([]redisRe var pending *reusableExecTxn err := r.retryRedisWrite(dispatchCtx, func() error { if pending != nil { - reuseCtx, reuseCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + // gemini PR-A MEDIUM: derive the per-attempt reuse ctx from the + // caller's `dispatchCtx` (not `r.handlerContext()`) so a cancelled + // caller stops the reuse promptly. Per-attempt `redisDispatchTimeout` + // still caps the dispatch the same way `commit()` does for the + // first attempt; what changes is that an outer cancellation can + // now interrupt mid-attempt instead of being ignored until the + // fresh 10 s budget elapses. The earlier "fresh ctx from + // handlerContext" pattern (noted in design doc §M3) was strictly + // more conservative but wasted resources on a disconnected + // client — see PR #887 review. + reuseCtx, reuseCancel := context.WithTimeout(dispatchCtx, redisDispatchTimeout) defer reuseCancel() res, drop, dispErr := r.dispatchExecReuse(reuseCtx, pending) if drop { diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md index 15776a9f7..e38fd89bd 100644 --- a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -500,14 +500,20 @@ preserves availability and adds correctness. 
the slice is discarded in the empty-elems path — but the doc acknowledges the small semantic shift so a future reader does not flag it as an oversight. - 2. The reuse path in `runTransactionWithDedup` derives a fresh + 2. ~~The reuse path in `runTransactionWithDedup` derives a fresh per-attempt `reuseCtx` from `handlerContext()` rather than - reusing the outer `dispatchCtx`. Each reuse attempt thus gets a - full `redisDispatchTimeout`, strictly more conservative than - `listPushCoreWithDedup` (which threads the caller ctx through). - Chosen so a half-expired ctx never cuts a reuse attempt short - on the retry path; consistent with `prepareDispatch()`'s - per-attempt fresh-ctx pattern in `commit()`. + reusing the outer `dispatchCtx`.~~ **Reverted per PR #887 review:** + the original "more conservative" framing ignored that a + disconnected client cannot benefit from the extra fresh-timeout + budget — the wasted 10 s reuse work would never reach a waiting + caller. `reuseCtx` is now derived from `dispatchCtx` so an outer + cancellation interrupts mid-attempt, matching + `listPushCoreWithDedup`'s pattern (which threads the caller ctx + through). Per-attempt `redisDispatchTimeout` still caps the + dispatch the same way `commit()` does for the first attempt; + what changes is that an expired outer ctx is now respected + promptly instead of being ignored until the fresh budget + elapses. - **Standalone write commands (SET/INCR/HSET/...) — still open.** The EXEC path covers MULTI bodies; standalone single-command dispatch goes through per-handler paths (`applySet`, `applyIncr`, etc.) 
and needs the same From 6052317ba94e40bf096cfe5de4616babbe98fcf8 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 16:30:23 +0900 Subject: [PATCH 3/7] feat(txn-dedup): standalone SET routes through option-2 dedup loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacked on PR-A (re-land of M3 EXEC reuse + multi-mop test). Extends option-2 dedup to standalone SET (dispatched outside MULTI). When `r.onePhaseTxnDedup` is on, `r.set` wraps the single command as a 1-element redcon.Command queue and dispatches via runTransactionWithDedup — reusing the M3 EXEC machinery instead of building a per-handler reusableSetTxn + dispatchSetReuse shape. SET already has `applySet` on `txnContext`, so this is the "free" extension to any command whose txn-state-aware apply hook already exists. The standalone fast path (`trySetFastPath`) is intentionally bypassed under the gate: dedup is opt-in, and a non-dedup'd fast path under a dedup-on cluster would split the idempotency contract. Why standalone INCR / HSET are out of scope for THIS PR: ======================================================== INCR (in adapter/redis_compat_commands.go) and HSET both lack a `txnContext.applyIncr` / `applyHSet` implementation, so the "route through single-mop EXEC" pattern that worked here for SET cannot apply as-is — `runTransactionWithDedup` would reject the command at `txn.apply` ("unsupported in MULTI"). Bringing them in requires implementing `applyIncr` / `applyHSet` first (each ~30–50 LOC for the txn-state-aware read-compute-write shape), then the standalone handler routing is a one-liner via this same path. Tracked as separate follow-up PRs; until then, INCR and HSET keep today's buggy-under-churn behaviour, which is the design doc's stated default ("everything else keeps today's behaviour until its hook is added"). 
Caller audit (per /loop semantic-change rule): - `r.set` (handler): gate-on takes a new code path; gate-off preserved verbatim via `setLegacy`. No external callers exist (it is wired only into the redcon dispatch table). - `setLegacy` (new): byte-identical extraction of the pre-PR set() body. No external callers. - `writeRedisStandaloneResult` (new): translates a single-element `[]redisResult` from `runTransactionWithDedup` into the bare redcon reply shape (no WriteArray wrapper). Single caller in this PR; future SET-pattern callers will reuse it. Validation: - adapter/redis_set_dedup_test.go (new): TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK, TestStandaloneSetDedup_DisabledKeepsLegacyPath. Existing TestRedis_SET / dedup suites still pass. - gofmt, go vet, golangci-lint run all clean (0 issues). Design doc updated to mark standalone SET as LANDED and call out INCR/HSET as the next follow-up with the precise reason they cannot land alongside SET in this PR. --- adapter/redis.go | 67 +++++++++++++++++++ adapter/redis_set_dedup_test.go | 65 ++++++++++++++++++ ...5_21_proposed_txn_secondary_idempotency.md | 23 +++++-- 3 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 adapter/redis_set_dedup_test.go diff --git a/adapter/redis.go b/adapter/redis.go index 065c2899a..aa1fd14f6 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -1118,6 +1118,37 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { if r.proxyToLeader(conn, cmd, cmd.Args[1]) { return } + // Option-2 dedup for standalone SET: route through runTransactionWithDedup + // as a single-mop EXEC body when the gate is on. SET inside MULTI/EXEC + // already has full dedup coverage via applySet (§M3 in the design doc), + // so we just reuse that machinery instead of building a per-handler + // reusableSetTxn + dispatchSetReuse shape. 
The fast-path optimization is + // intentionally bypassed under the gate — dedup is opt-in, and a + // non-dedup'd fast path under a dedup-on cluster would split the + // idempotency contract. + // + // Result translation: runTransactionWithDedup returns []redisResult; for + // SET there is exactly one element with the same redisResult shape as + // the standalone reply (resultString OK / resultNil for NX/XX miss / + // resultBulk for GET). + if r.onePhaseTxnDedup { + results, err := r.runTransaction([]redcon.Command{cmd}) + if err != nil { + writeRedisError(conn, err) + return + } + writeRedisStandaloneResult(conn, results) + return + } + r.setLegacy(conn, cmd) +} + +// setLegacy is the pre-dedup standalone SET path. Extracted from set() so +// the gate-on routing through runTransactionWithDedup keeps set() under the +// cyclop budget (the gate-off branch's parse + fast-path + executeSet +// shape carries its own decision points). Behaviour is byte-identical to +// the pre-PR set() body. +func (r *RedisServer) setLegacy(conn redcon.Conn, cmd redcon.Command) { opts, err := parseRedisSetOptions(cmd.Args[3:], time.Now()) if err != nil { writeRedisError(conn, err) @@ -1151,6 +1182,42 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { conn.WriteString("OK") } +// writeRedisStandaloneResult translates a single-element results array from +// runTransactionWithDedup into a redcon response, mirroring the shape a +// standalone handler would write directly. Used by SET / future standalone +// commands routed through the dedup loop. Differs from writeResults in NOT +// wrapping the response in conn.WriteArray — the standalone protocol returns +// the bare element. +// +// Empty or multi-element input is degenerate for standalone callers; we +// default to nil so a misuse never leaks a malformed reply to the wire. 
+func writeRedisStandaloneResult(conn redcon.Conn, results []redisResult) { + if len(results) != 1 { + conn.WriteNull() + return + } + res := results[0] + switch res.typ { + case resultNil: + conn.WriteNull() + case resultError: + writeRedisError(conn, res.err) + case resultBulk: + conn.WriteBulk(res.bulk) + case resultString: + conn.WriteString(res.str) + case resultArray: + conn.WriteArray(len(res.arr)) + for _, s := range res.arr { + conn.WriteBulkString(s) + } + case resultInt: + conn.WriteInt64(res.integer) + default: + conn.WriteNull() + } +} + func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { key := cmd.Args[1] if r.proxyToLeader(conn, cmd, key) { diff --git a/adapter/redis_set_dedup_test.go b/adapter/redis_set_dedup_test.go new file mode 100644 index 000000000..946e02fcb --- /dev/null +++ b/adapter/redis_set_dedup_test.go @@ -0,0 +1,65 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" + "github.com/tidwall/redcon" +) + +// recordingConn (defined in redis_retry_test.go) captures handler writes via +// .bulk, .err, .int fields. WriteString and WriteBulk both populate .bulk — +// in this test "OK" lands as bulk=[]byte("OK"), .err stays empty for the +// success path. + +// TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK pins the standalone SET +// dedup path: when the gate is on, SET routes through runTransactionWithDedup +// as a single-mop EXEC body. Attempt 1 lands then errors → reuse probes → +// FSM no-ops → client gets "OK" (the cached result) without re-applying. +// +// Pins that the gate-on path uses the same dedup machinery as MULTI/EXEC. +// Without this routing, a standalone SET under leadership churn would not +// benefit from option-2 dedup (the design's "still open" item before this +// PR). 
+func TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}} + srv.set(conn, cmd) + + require.Equal(t, "OK", string(conn.bulk), "standalone SET must reply with the cached OK from attempt 1") + require.Empty(t, conn.err, "no error must escape; dedup hid the ambiguous attempt-1 failure") + require.Equal(t, 2, coord.dispatches, "one ambiguous-land attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "reuse must dedup via the exact-ts probe") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val, "only one apply landed — the value matches attempt 1") +} + +// TestStandaloneSetDedup_DisabledKeepsLegacyPath verifies the gate is honored +// for the standalone SET path too: when onePhaseTxnDedup is off, r.set takes +// its legacy fast-path / executeSet shape (no probe, no per-attempt PrevCommitTS). +// Pins that the new routing is strictly opt-in. +func TestStandaloneSetDedup_DisabledKeepsLegacyPath(t *testing.T) { + t.Parallel() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{} /* gate left false */} + + conn := &recordingConn{} + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}} + srv.set(conn, cmd) + + // Legacy path: no probe. 
+ require.Equal(t, 0, coord.probeNoOps, "gate off — runTransactionWithDedup must not be used") +} diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md index e38fd89bd..aeb773cea 100644 --- a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -514,11 +514,24 @@ preserves availability and adds correctness. what changes is that an expired outer ctx is now respected promptly instead of being ignored until the fresh budget elapses. -- **Standalone write commands (SET/INCR/HSET/...) — still open.** The EXEC - path covers MULTI bodies; standalone single-command dispatch goes through - per-handler paths (`applySet`, `applyIncr`, etc.) and needs the same - `reusable` capture + `dispatchXReuse` shape per command. Scope is - per-command but each is small (~50 LOC). Tracked as PR-B follow-up. +- **Standalone SET — LANDED.** The standalone `r.set` handler now routes + through `runTransactionWithDedup` as a single-mop EXEC body when the gate + is on (the dedup machinery's "free" extension to any command whose + `applyXxx` already exists on `txnContext`). The fast-path optimization + (`trySetFastPath`) is intentionally bypassed under the gate — dedup is + opt-in, and a non-dedup'd fast path under a dedup-on cluster would split + the idempotency contract. Tested by `TestStandaloneSetDedup_*` in + `adapter/redis_set_dedup_test.go`. +- **Standalone INCR / HSET — still open.** Both lack a `txnContext.applyXxx` + implementation, so the "route through single-mop EXEC" pattern that + worked for SET cannot apply as-is. Bringing them into the dedup'd path + requires implementing `applyIncr` / `applyHSet` first (each ~30–50 LOC + for the txn-state-aware read-compute-write shape), then the standalone + handler routing is a one-liner via `runTransactionWithDedup`. 
Tracked + as separate follow-up PRs; until then, INCR and HSET keep today's + buggy-under-churn behaviour, which is the design doc's stated default + ("everything else keeps today's behaviour until its hook is added" — + Open questions). ### M4 — Validation From 8321ad0e359d15b0de29a754d636d228d4251924 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 16:42:41 +0900 Subject: [PATCH 4/7] fix(txn-dedup): address claude[bot] PR #888 round 1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit claude[bot] 🔶 (redundant double gate, adapter/redis.go:1135): set's gate-on branch called r.runTransaction which immediately re-checked the same r.onePhaseTxnDedup gate and routed to runTransactionWithDedup. The indirection was misleading -- the PR description said "dispatches via runTransactionWithDedup" but that was true only by indirection. Replaced with a direct call to runTransactionWithDedup; intent is now explicit and the double gate check is removed. claude[bot] 🔶 (test gap, adapter/redis_set_dedup_test.go): Only the resultString OK path was tested through the dedup route. Added two regression tests covering the other applySet result types that go through writeRedisStandaloneResult: - TestStandaloneSetDedup_NXMissReturnsNil pins resultNil routing: SET k v NX on an existing key returns nil; the cached attempt-1 result must round-trip through writeRedisStandaloneResult -> WriteNull, leaving conn.bulk == nil. - TestStandaloneSetDedup_GETOptionReturnsOldBulk pins resultBulk routing: SET k v GET on an existing key returns the prior value as a bulk reply; conn.bulk must be the prior bytes. Both fire the ambiguous-attempt-1-lands path (newDedupTestCoordinator ambiguousLands=true) so the result MUST come from the cached attempt-1 array, not a re-execution. 
+ claude[bot] minor observation (shallow-array constraint in writeRedisStandaloneResult): Documented in the function's doc comment that the resultArray arm flattens via WriteBulkString and is NOT safe to reuse for callers whose applyXxx emits nested arrays. Future HGETALL-pattern callers must either pre-flatten their result or extend the switch. Caller audit per /loop semantic-change rule: - set's gate-on branch is the only call site changed; the new direct call to runTransactionWithDedup uses the same []redcon.Command{cmd} shape and the same []redisResult return type. runTransactionWithDedup is unexported (package-private) and has no other callers outside the legacy runTransaction path (which is unchanged). - writeRedisStandaloneResult signature unchanged. Documentation expanded but no behavior change. - New tests are pure additions; no existing tests modified. Validation: go test ./adapter/ -run StandaloneSetDedup passes (5 tests now). go build ./adapter/... clean. golangci-lint run ./adapter/... 0 issues. --- adapter/redis.go | 18 +++++++- adapter/redis_set_dedup_test.go | 73 +++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/adapter/redis.go b/adapter/redis.go index aa1fd14f6..0d0aa103a 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -1132,7 +1132,14 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { // the standalone reply (resultString OK / resultNil for NX/XX miss / // resultBulk for GET). if r.onePhaseTxnDedup { - results, err := r.runTransaction([]redcon.Command{cmd}) + // claude[bot] PR #888 review: call runTransactionWithDedup directly + // instead of going through runTransaction. runTransaction re-checks + // the same r.onePhaseTxnDedup gate and routes here anyway; the + // indirection made the call chain misleading ("dispatches via + // runTransactionWithDedup" was only true by indirection). Direct + // call makes the intent explicit and removes the double gate + // check.
+ results, err := r.runTransactionWithDedup([]redcon.Command{cmd}) if err != nil { writeRedisError(conn, err) return @@ -1191,6 +1198,15 @@ func (r *RedisServer) setLegacy(conn redcon.Conn, cmd redcon.Command) { // // Empty or multi-element input is degenerate for standalone callers; we // default to nil so a misuse never leaks a malformed reply to the wire. +// +// Array-element constraint (claude[bot] PR #888 review): the resultArray +// arm writes each element via WriteBulkString, which is correct for +// flat arrays of strings (the shape applySet / future SET-pattern +// callers produce). It does NOT recurse into nested arrays. A future +// caller whose applyXxx emits resultArray with non-string elements +// (e.g. HGETALL-like nested responses) must either pre-flatten its +// result or extend this switch with a recursive arm; reusing it as-is +// would silently mangle the wire reply. func writeRedisStandaloneResult(conn redcon.Conn, results []redisResult) { if len(results) != 1 { conn.WriteNull() diff --git a/adapter/redis_set_dedup_test.go b/adapter/redis_set_dedup_test.go index 946e02fcb..7ca2243b4 100644 --- a/adapter/redis_set_dedup_test.go +++ b/adapter/redis_set_dedup_test.go @@ -46,6 +46,79 @@ func TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK(t *testing.T) { require.Equal(t, []byte("v1"), val, "only one apply landed — the value matches attempt 1") } +// TestStandaloneSetDedup_NXMissReturnsNil pins resultNil routing through +// writeRedisStandaloneResult on the dedup path. SET with NX against an +// existing key returns nil (NX fails because the key exists); the dedup +// loop reuses the cached resultNil and the recording conn observes +// wroteNull. Without correct resultNil arming the client would observe an +// empty bulk reply, breaking NX semantics under dedup. +// +// Closes the claude[bot] PR #888 review's test-gap observation that the +// NX/XX/GET result types from applySet were not covered by the dedup +// suite. 
+func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + + // Seed the key so the NX condition fails (key already exists). + require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("seed"), nil), 5, 0)) + + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + // SET k v1 NX -- attempt 1 records resultNil because NX miss. + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("NX")}} + srv.set(conn, cmd) + + // recordingConn.WriteNull sets bulk=nil; success path has no error. + require.Nil(t, conn.bulk, "NX miss must reach WriteNull, not WriteString or WriteBulk") + require.Empty(t, conn.err, "no error must escape; NX miss is a normal response, not an error") + + // Stored value is still the seed; nothing should have overwritten it. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("seed"), val, "NX miss must not overwrite the existing value") +} + +// TestStandaloneSetDedup_GETOptionReturnsOldBulk pins resultBulk routing +// through writeRedisStandaloneResult on the dedup path. SET ... GET on an +// existing key returns the prior value as a bulk reply; the dedup loop +// reuses the cached resultBulk and the recording conn observes the bytes. +// Without correct resultBulk arming the client would observe an empty or +// nil reply, breaking SET GET semantics under dedup. +func TestStandaloneSetDedup_GETOptionReturnsOldBulk(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + + // Seed the prior value -- SET GET returns this as a bulk reply. 
+ require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("prior"), nil), 5, 0)) + + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + // SET k v1 GET -- attempt 1 records resultBulk("prior"). + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("GET")}} + srv.set(conn, cmd) + + // recordingConn.WriteBulk copies into .bulk; the prior value must round-trip + // from the cached attempt-1 result through writeRedisStandaloneResult. + require.Equal(t, "prior", string(conn.bulk), "GET option must reply with the cached prior value, not a re-read") + require.Empty(t, conn.err) + + // New value committed via the landed attempt-1 apply. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val, "SET GET still applies the new value; dedup just preserves the GET result") +} + // TestStandaloneSetDedup_DisabledKeepsLegacyPath verifies the gate is honored // for the standalone SET path too: when onePhaseTxnDedup is off, r.set takes // its legacy fast-path / executeSet shape (no probe, no per-attempt PrevCommitTS). From fd91a6adaa4ea4cb83bff746f1098b793c895320 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 17:08:21 +0900 Subject: [PATCH 5/7] fix(txn-dedup): address claude[bot] PR #888 round 2 minor notes claude[bot] (a) attribution prefix per CLAUDE.md convention: Stripped the inline "// claude[bot] PR #888 review:" prefixes from the two comments added in round 1. The substantive "why" content (double-gate rationale; shallow-array constraint) is kept as that is what future readers need. 
CLAUDE.md convention is to leave fix attribution in the commit message, not the source. claude[bot] (b) wroteNull bool field on recordingConn: Added wroteNull witness to recordingConn (in redis_retry_test.go, the shared test helper). Hardened TestStandaloneSetDedup_NXMissReturnsNil to require.True(conn.wroteNull, ...) so a wrong branch that wrote nothing at all (would also satisfy conn.bulk == nil) is now caught. Caller audit per /loop semantic-change rule: - recordingConn is in *_test.go and used by tests only. Adding wroteNull bool default false is strictly additive; no existing test reads it. The only WriteNull semantics change is setting the new field; conn.bulk = nil behavior is preserved. - Verified by grep: only redis_set_dedup_test.go references wroteNull. All other recordingConn uses (8 instantiations across redis_retry_test.go + redis_lua_linearizable_read_test.go) are unaffected. Validation: go test ./adapter/ -run StandaloneSetDedup passes. go test ./adapter/ -run ExecDedup passes. gofmt + go vet + golangci-lint clean. --- adapter/redis.go | 30 +++++++++++++++--------------- adapter/redis_retry_test.go | 10 ++++++---- adapter/redis_set_dedup_test.go | 8 ++++++-- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/adapter/redis.go b/adapter/redis.go index 0d0aa103a..170f0fa76 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -1132,13 +1132,13 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { // the standalone reply (resultString OK / resultNil for NX/XX miss / // resultBulk for GET). if r.onePhaseTxnDedup { - // claude[bot] PR #888 review: call runTransactionWithDedup directly - // instead of going through runTransaction. runTransaction re-checks - // the same r.onePhaseTxnDedup gate and routes here anyway; the - // indirection made the call chain misleading ("dispatches via - // runTransactionWithDedup" was only true by indirection). Direct - // call makes the intent explicit and removes the double gate - // check. 
+ // Call runTransactionWithDedup directly instead of going through + // runTransaction. runTransaction re-checks the same + // r.onePhaseTxnDedup gate and routes here anyway; the indirection + // would make the call chain misleading ("dispatches via + // runTransactionWithDedup" being true only by indirection). + // Direct call makes the intent explicit and removes the double + // gate check. results, err := r.runTransactionWithDedup([]redcon.Command{cmd}) if err != nil { writeRedisError(conn, err) @@ -1199,14 +1199,14 @@ func (r *RedisServer) setLegacy(conn redcon.Conn, cmd redcon.Command) { // Empty or multi-element input is degenerate for standalone callers; we // default to nil so a misuse never leaks a malformed reply to the wire. // -// Array-element constraint (claude[bot] PR #888 review): the resultArray -// arm writes each element via WriteBulkString, which is correct for -// flat arrays of strings (the shape applySet / future SET-pattern -// callers produce). It does NOT recurse into nested arrays. A future -// caller whose applyXxx emits resultArray with non-string elements -// (e.g. HGETALL-like nested responses) must either pre-flatten its -// result or extend this switch with a recursive arm; reusing it as-is -// would silently mangle the wire reply. +// Array-element constraint: the resultArray arm writes each element via +// WriteBulkString, which is correct for flat arrays of strings (the +// shape applySet / future SET-pattern callers produce). It does NOT +// recurse into nested arrays. A future caller whose applyXxx emits +// resultArray with non-string elements (e.g. HGETALL-like nested +// responses) must either pre-flatten its result or extend this switch +// with a recursive arm; reusing it as-is would silently mangle the +// wire reply. 
func writeRedisStandaloneResult(conn redcon.Conn, results []redisResult) { if len(results) != 1 { conn.WriteNull() diff --git a/adapter/redis_retry_test.go b/adapter/redis_retry_test.go index ea0d236a6..eb6d13ab6 100644 --- a/adapter/redis_retry_test.go +++ b/adapter/redis_retry_test.go @@ -93,10 +93,11 @@ func (c *retryOnceCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (u } type recordingConn struct { - ctx any - err string - bulk []byte - int int64 + ctx any + err string + bulk []byte + int int64 + wroteNull bool } func (c *recordingConn) RemoteAddr() string { return "" } @@ -129,6 +130,7 @@ func (c *recordingConn) WriteUint64(num uint64) { func (c *recordingConn) WriteArray(count int) {} func (c *recordingConn) WriteNull() { c.bulk = nil + c.wroteNull = true } func (c *recordingConn) WriteRaw(data []byte) { c.bulk = append([]byte(nil), data...) diff --git a/adapter/redis_set_dedup_test.go b/adapter/redis_set_dedup_test.go index 7ca2243b4..491dcc5df 100644 --- a/adapter/redis_set_dedup_test.go +++ b/adapter/redis_set_dedup_test.go @@ -72,8 +72,12 @@ func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) { cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("NX")}} srv.set(conn, cmd) - // recordingConn.WriteNull sets bulk=nil; success path has no error. - require.Nil(t, conn.bulk, "NX miss must reach WriteNull, not WriteString or WriteBulk") + // Airtight assertion: WriteNull was actually called (not "nothing was + // written, leaving the zero-value nil"). Without the wroteNull witness + // flag, a wrong branch that wrote nothing at all would also pass + // `conn.bulk == nil` -- claude[bot] PR #888 round-2 hardening. 
+ require.True(t, conn.wroteNull, "NX miss must call WriteNull, not silently skip the write") + require.Nil(t, conn.bulk, "WriteNull leaves conn.bulk nil; a stray WriteString/WriteBulk would have populated it") require.Empty(t, conn.err, "no error must escape; NX miss is a normal response, not an error") // Stored value is still the seed; nothing should have overwritten it. From da6b0b419e155fc846c7169d0ee9218b7c7758de Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 17:14:53 +0900 Subject: [PATCH 6/7] fix(test): strip remaining task-attribution refs from redis_set_dedup_test claude[bot] PR #888 round-3 minor cleanup: same CLAUDE.md convention violation that was cleaned from redis.go in round 2 was left in the test file. Removed two references on lines 56 and 78. Substantive rationale (NX semantics, zero-value aliasing problem) kept; the attribution moves to the commit message where it belongs. No functional or test behaviour change. golangci-lint clean. --- adapter/redis_set_dedup_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/adapter/redis_set_dedup_test.go b/adapter/redis_set_dedup_test.go index 491dcc5df..28d6bca0c 100644 --- a/adapter/redis_set_dedup_test.go +++ b/adapter/redis_set_dedup_test.go @@ -52,10 +52,6 @@ func TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK(t *testing.T) { // loop reuses the cached resultNil and the recording conn observes // wroteNull. Without correct resultNil arming the client would observe an // empty bulk reply, breaking NX semantics under dedup. -// -// Closes the claude[bot] PR #888 review's test-gap observation that the -// NX/XX/GET result types from applySet were not covered by the dedup -// suite. 
func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) { t.Parallel() ctx := context.Background() @@ -75,7 +71,7 @@ func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) { // Airtight assertion: WriteNull was actually called (not "nothing was // written, leaving the zero-value nil"). Without the wroteNull witness // flag, a wrong branch that wrote nothing at all would also pass - // `conn.bulk == nil` -- claude[bot] PR #888 round-2 hardening. + // `conn.bulk == nil`. require.True(t, conn.wroteNull, "NX miss must call WriteNull, not silently skip the write") require.Nil(t, conn.bulk, "WriteNull leaves conn.bulk nil; a stray WriteString/WriteBulk would have populated it") require.Empty(t, conn.err, "no error must escape; NX miss is a normal response, not an error") From 69db79bce0cc8ac896480c83434a46fa86631adf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 17:25:01 +0900 Subject: [PATCH 7/7] fix(txn-dedup): address coderabbitai PR #887 round 2 minor coderabbitai (1) test (adapter/redis_exec_dedup_test.go:119): TestExecDedup_GenuineConflictRebuildsAndApplies asserted GreaterOrEqual(coord.dispatches, 3) but with the single injected pre-reject + single concurrent SET the retry topology is fully deterministic: attempt 1 (pre-reject), reuse (OCC-conflict), fresh retry (success). Tightened to Equal(3) so a regression that adds an extra dispatch is caught. coderabbitai (2) doc (docs/design/...:481): "LANDED via PR #884" was ambiguous after the re-land on main via PR #887. Updated to "LANDED via PR #887 (originally PR #884, re-landed against main)" so future readers see the canonical landing PR with the lineage preserved. No Go callers touched. go test ./adapter/ -run ExecDedup passes. 
--- adapter/redis_exec_dedup_test.go | 10 +++++++--- .../2026_05_21_proposed_txn_secondary_idempotency.md | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/adapter/redis_exec_dedup_test.go b/adapter/redis_exec_dedup_test.go index ae86132ef..40bc6956b 100644 --- a/adapter/redis_exec_dedup_test.go +++ b/adapter/redis_exec_dedup_test.go @@ -113,9 +113,13 @@ func TestExecDedup_GenuineConflictRebuildsAndApplies(t *testing.T) { require.NoError(t, err) require.Len(t, results, 1) require.Equal(t, "OK", results[0].str) - // Three dispatches: attempt 1 (pre-reject), reuse (OCC-conflict on key), - // fresh-snapshot retry (success). - require.GreaterOrEqual(t, coord.dispatches, 3) + // Exactly three dispatches: attempt 1 (pre-reject, no land), reuse + // (OCC-conflict on the foreign-committed key), fresh-snapshot retry + // (success). The single injected pre-reject + single concurrent SET + // fully determine the retry topology — Equal pins it so any future + // regression that adds an extra dispatch is caught immediately + // (coderabbitai PR #887 minor). + require.Equal(t, 3, coord.dispatches) require.Equal(t, 0, coord.probeNoOps, "nothing landed at attempt 1's ts; probe must not fire as a hit") // Our final write wins (it commits AFTER the concurrent SET because we diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md index aeb773cea..c463ceb03 100644 --- a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -478,7 +478,7 @@ preserves availability and adds correctness. outcomes end-to-end against real OCC + the real probe, plus the gate-off legacy path. - **Result reconstruction (R1) — resolved, and simpler than feared.** See R1. 
-- **`runTransaction` (MULTI/EXEC) — LANDED via PR #884.** When the gate is +- **`runTransaction` (MULTI/EXEC) — LANDED via PR #887 (originally PR #884, re-landed against main).** When the gate is on, `runTransactionWithDedup` mirrors `listPushCoreWithDedup` at the EXEC granularity: the first attempt builds the txn, captures `nextResults` from attempt 1's `startTS` snapshot, dispatches; on a retryable failure