diff --git a/adapter/redis.go b/adapter/redis.go index 2411c57b9..07f2c918f 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -2654,7 +2654,30 @@ func (t *txnContext) validateReadSet(ctx context.Context) error { return nil } -func (t *txnContext) commit() error { +// preparedTxnDispatch is the fully-assembled write set + read set + commit +// timestamp for a MULTI/EXEC transaction, ready to be passed to +// coordinator.Dispatch. Split out from commit() so the option-2 dedup +// path (runTransactionWithDedup) can intercept between prepare and +// dispatch — it needs to capture (elems, commitTS, readKeys) for a +// possible retry under PrevCommitTS without otherwise duplicating the +// commit-building logic. The owned ctx is the redisDispatchTimeout- +// bounded context the caller must run Dispatch under and Cancel after. +type preparedTxnDispatch struct { + elems []*kv.Elem[kv.OP] + commitTS uint64 + readKeys [][]byte + ctx context.Context + cancel context.CancelFunc +} + +// prepareDispatch builds everything Dispatch needs (elems, commitTS, +// readKeys, ctx) without actually calling Dispatch. Callers must always +// invoke `cancel()` on the returned prepared value once the dispatch +// attempt finishes (commit() does this via defer; the dedup path does it +// per retry iteration). When the transaction has no writes this returns +// a prepared value with empty `elems` and a no-op cancel — callers can +// check len(prepared.elems)==0 and skip the dispatch. +func (t *txnContext) prepareDispatch() (preparedTxnDispatch, error) { elems := t.buildKeyElems() // Pre-allocate commitTS so Delta keys can embed it in their bytes before @@ -2663,7 +2686,7 @@ func (t *txnContext) commit() error { listElems := t.buildListElems(commitTS) zsetElems, err := t.buildZSetElems(commitTS) if err != nil { - return err + return preparedTxnDispatch{cancel: func() {}}, err } // TTL elements: string keys have TTL embedded in value (buildKeyElems handles that), // non-string keys get a !redis|ttl| element written in the same transaction. @@ -2675,33 +2698,48 @@ func (t *txnContext) commit() error { // run on the server-lifetime handlerContext, leaving its scans uncancellable // from the request side on a slow disk or hot-key pathological commit. ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout) - defer cancel() streamElems, err := t.buildStreamDeletionElems(ctx) if err != nil { - return err + cancel() + return preparedTxnDispatch{cancel: func() {}}, err } elems = append(elems, listElems...) elems = append(elems, zsetElems...) elems = append(elems, ttlElems...) elems = append(elems, streamElems...) - if len(elems) == 0 { - return nil - } readKeys := make([][]byte, 0, len(t.readKeys)) for _, k := range t.readKeys { readKeys = append(readKeys, k) } + return preparedTxnDispatch{ + elems: elems, + commitTS: commitTS, + readKeys: readKeys, + ctx: ctx, + cancel: cancel, + }, nil +} + +func (t *txnContext) commit() error { + prepared, err := t.prepareDispatch() + if err != nil { + return err + } + defer prepared.cancel() + if len(prepared.elems) == 0 { + return nil + } group := &kv.OperationGroup[kv.OP]{ IsTxn: true, - Elems: elems, + Elems: prepared.elems, StartTS: t.startTS, - CommitTS: commitTS, - ReadKeys: readKeys, + CommitTS: prepared.commitTS, + ReadKeys: prepared.readKeys, } - if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil { + if _, err := t.server.coordinator.Dispatch(prepared.ctx, group); err != nil { return errors.WithStack(err) } return nil @@ -2978,6 +3016,10 @@ func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] { } func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) { + if r.onePhaseTxnDedup { + return r.runTransactionWithDedup(queue) + } + dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) defer cancel() @@ -3024,6 +3066,214 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err return results, nil } +// reusableExecTxn captures a dispatched MULTI/EXEC transaction so a +// subsequent retry can reuse its exact write set under a fresh +// commit_ts (carrying prev_commit_ts) and probe whether the prior +// attempt already landed. This is the EXEC analogue of +// reusableListPush (M3 R1 result reconstruction for MULTI/EXEC). +// +// `results` is computed once from attempt 1's startTS snapshot and is +// invariant across reuse for the same reason RPUSH/LPUSH's `length` +// is: the write set is fixed, so apply-vs-no-op is invisible to the +// client. Reads in the EXEC body returned values from attempt 1's +// snapshot — those values were what the client would have observed if +// attempt 1 hadn't returned an ambiguous error, so caching them is +// the right semantics for a confirmed-or-deduped commit. A +// genuine cross-txn conflict is caught by OCC on readKeys at the FSM +// apply (WriteConflict → drop pending → recompute), so the cached +// results are only returned when reuse actually represents the +// outcome of attempt 1's intent. +type reusableExecTxn struct { + elems []*kv.Elem[kv.OP] + startTS uint64 + commitTS uint64 + readKeys [][]byte + results []redisResult +} + +// dispatchExecReuse runs one iteration of the option-2 reuse path for +// MULTI/EXEC: dispatches the captured write set under a fresh +// commit_ts (carrying pending.commitTS as PrevCommitTS so the FSM +// probes whether the prior attempt landed) and returns the cached +// client-visible results on success. The drop return signals the +// caller to clear pending — set on a genuine WriteConflict from +// another txn (after the self-conflict probe rules out our own apply) +// so the next iteration rebuilds the txn from a fresh read snapshot. +// +// Mirrors dispatchListPushReuse; the only difference is the result +// payload (cached []redisResult vs computed list length) and the lack +// of a meta re-read fallback — for EXEC there is no post-apply "what +// is the current length" question; the client-visible result IS the +// cached results array. +func (r *RedisServer) dispatchExecReuse(ctx context.Context, pending *reusableExecTxn) (results []redisResult, drop bool, err error) { + commitTS := r.coordinator.Clock().Next() + _, dispErr := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{ + IsTxn: true, + StartTS: pending.startTS, + CommitTS: commitTS, + PrevCommitTS: pending.commitTS, + ReadKeys: pending.readKeys, + Elems: pending.elems, + }) + if dispErr == nil { + return pending.results, false, nil + } + if errors.Is(dispErr, store.ErrWriteConflict) { + // Self-inflicted-conflict guard (mirrors dispatchListPushReuse): + // the apply might have landed at this fresh commitTS but bubbled + // up as WriteConflict due to leadership churn. Probe whether our + // reused write set actually landed; if yes, return the cached + // results unchanged (they describe the EXEC body's outcome + // against attempt 1's snapshot, which is the outcome whether + // the bytes hit MVCC at attempt-1's commitTS or at this fresh + // commitTS — the OCC fence on readKeys guarantees no + // intervening cross-txn write slipped past). + if probeKey := firstWriteKey(pending.elems); len(probeKey) > 0 { + landed, perr := r.store.CommittedVersionAt(ctx, probeKey, commitTS) + if perr == nil && landed { + pending.commitTS = commitTS + return pending.results, false, nil + } + } + // Our attempt did not land at commitTS and a key collides with + // another txn — genuine conflict. Drop pending so the next + // iteration rebuilds from a fresh snapshot. + return nil, true, errors.WithStack(dispErr) + } + // Still ambiguous (lock / other retryable): the reuse may itself + // have landed, so the next retry must probe THIS commit_ts. Only + // advance pending.commitTS if retryRedisWrite will actually loop + // (non-retryable errors escape to the client; pending is then + // discarded with the goroutine). + if isRetryableRedisTxnErr(dispErr) { + pending.commitTS = commitTS + } + return nil, false, errors.WithStack(dispErr) +} + +// runTransactionWithDedup is the option-2 retry loop for MULTI/EXEC. +// The first attempt builds the txn write set + cached results from +// the user's startTS snapshot; any retryable failure makes the next +// iteration REUSE that write set under a fresh commit_ts with +// prev_commit_ts set, so the FSM no-ops if the prior attempt already +// landed. A WriteConflict on a reuse attempt (after the self-conflict +// probe rules out our own apply) means another txn touched a read or +// write key, and we drop pending → rebuild from a fresh snapshot. +// +// Mirrors listPushCoreWithDedup at the EXEC granularity. +func (r *RedisServer) runTransactionWithDedup(queue []redcon.Command) ([]redisResult, error) { + dispatchCtx, cancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + defer cancel() + + var results []redisResult + var pending *reusableExecTxn + err := r.retryRedisWrite(dispatchCtx, func() error { + if pending != nil { + reuseCtx, reuseCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) + defer reuseCancel() + res, drop, dispErr := r.dispatchExecReuse(reuseCtx, pending) + if drop { + pending = nil + } + if dispErr != nil { + return dispErr + } + results = res + return nil + } + res, next, ferr := r.firstExecAttempt(dispatchCtx, queue) + if ferr != nil { + if next != nil { + pending = next + } + return ferr + } + results = res + return nil + }) + if err != nil { + return nil, err + } + return results, nil +} + +// firstExecAttempt runs the initial (no-reuse) EXEC attempt: builds the +// txn snapshot, applies each command to capture the client-visible +// results, validates the read set, and dispatches. On success returns +// the results. On a retryable dispatch failure it returns a +// reusableExecTxn capturing what the retry loop needs to dispatch via +// PrevCommitTS on the next iteration; non-retryable failures return a +// nil reuse state (mirrors listPushCoreWithDedup's gating). Extracted +// from runTransactionWithDedup to keep that loop under the cyclop +// budget; the dedup rationale lives there. +func (r *RedisServer) firstExecAttempt(dispatchCtx context.Context, queue []redcon.Command) ([]redisResult, *reusableExecTxn, error) { + startTS := r.txnStartTS() + readPin := r.pinReadTS(startTS) + defer readPin.Release() + + txn := &txnContext{ + server: r, + ctx: dispatchCtx, + working: map[string]*txnValue{}, + listStates: map[string]*listTxnState{}, + zsetStates: map[string]*zsetTxnState{}, + ttlStates: map[string]*ttlTxnState{}, + readKeys: map[string][]byte{}, + streamDeletions: map[string][]byte{}, + startTS: startTS, + } + + nextResults := make([]redisResult, 0, len(queue)) + for _, cmd := range queue { + res, err := txn.apply(cmd) + if err != nil { + return nil, nil, err + } + nextResults = append(nextResults, res) + } + + if err := txn.validateReadSet(dispatchCtx); err != nil { + return nil, nil, err + } + + prepared, err := txn.prepareDispatch() + if err != nil { + return nil, nil, err + } + defer prepared.cancel() + if len(prepared.elems) == 0 { + // Read-only EXEC: nothing to dispatch, no dedup window. + return nextResults, nil, nil + } + + group := &kv.OperationGroup[kv.OP]{ + IsTxn: true, + Elems: prepared.elems, + StartTS: txn.startTS, + CommitTS: prepared.commitTS, + ReadKeys: prepared.readKeys, + } + if _, dispErr := r.coordinator.Dispatch(prepared.ctx, group); dispErr != nil { + // Only remember the attempt for reuse if retryRedisWrite will + // actually loop. Mirrors listPushCoreWithDedup's gating + // rationale — errors that escape the loop (transient-leader, + // context deadline, FSM apply error) leave pending pointing at + // state wasted with the goroutine; ambiguous errors that + // escape to the client are out of scope for this loop. + if isRetryableRedisTxnErr(dispErr) { + return nil, &reusableExecTxn{ + elems: prepared.elems, + startTS: txn.startTS, + commitTS: prepared.commitTS, + readKeys: prepared.readKeys, + results: nextResults, + }, errors.WithStack(dispErr) + } + return nil, nil, errors.WithStack(dispErr) + } + return nextResults, nil, nil +} + func (r *RedisServer) txnStartTS() uint64 { // store.LastCommitTS() is the authoritative safe-snapshot watermark: it is // updated atomically only AFTER the corresponding Pebble batch commit, so diff --git a/adapter/redis_exec_dedup_test.go b/adapter/redis_exec_dedup_test.go new file mode 100644 index 000000000..b29769bc3 --- /dev/null +++ b/adapter/redis_exec_dedup_test.go @@ -0,0 +1,190 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" + "github.com/tidwall/redcon" +) + +// TestExecDedup_LandedPriorAttempt_ReturnsCachedResults is the option-2 +// headline for MULTI/EXEC (M3 R1): attempt 1 commits the transaction body +// but bubbles up an ambiguous error, the retry reuses the same write set +// with prev_commit_ts, the FSM probe finds the landed version and no-ops, +// and the client gets the same results array attempt 1 computed. +// +// Without the probe, the reuse would OCC-conflict against attempt 1's own +// version, the adapter would drop pending and recompute from a fresh +// snapshot — which for SET is harmless (idempotent overwrite) but for +// INCR/RPUSH would produce a different (and wrong, since attempt 1 already +// landed) result. This test pins that the reuse path returns attempt 1's +// cached results without re-executing the command body. +func TestExecDedup_LandedPriorAttempt_ReturnsCachedResults(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + // Single-mop EXEC: one SET command. + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str, "SET must return the cached OK from attempt 1, not be re-executed") + require.Equal(t, 2, coord.dispatches, "one failed (ambiguous-land) attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "the reuse must dedup via the exact-ts probe") + + // And the value is exactly the one attempt 1 wrote. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_PriorAttemptDidNotLand_Applies covers the truncated case for +// MULTI/EXEC: attempt 1 errored without committing (OCC-style pre-reject), +// so the probe misses and the reuse applies the same write set at a fresh +// commit_ts. The cached results are still returned (they describe the +// EXEC body's intent against attempt 1's snapshot, which is what the client +// sees regardless of which physical commit_ts the bytes hit MVCC at). +func TestExecDedup_PriorAttemptDidNotLand_Applies(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + require.Equal(t, 2, coord.dispatches) + require.Equal(t, 0, coord.probeNoOps, "nothing landed, so the probe must miss and the reuse applies") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_GenuineConflictRebuildsAndApplies covers outcome 3 for +// MULTI/EXEC: attempt 1 did not land; a concurrent client wrote the same +// key between attempts; the reuse OCC-conflicts (the foreign write +// advances the key's commit_ts past pending.startTS), the self-conflict +// probe rules out our own landing, the adapter drops pending and rebuilds +// the txn from a fresh snapshot — the new attempt then succeeds. +// +// This test pins the discriminator: probe-miss + OCC-conflict ⇒ recompute. +// If the adapter incorrectly reused on a foreign conflict, the cached +// results from attempt 1 would be returned alongside a value written by +// the concurrent client (an inconsistent view). +func TestExecDedup_GenuineConflictRebuildsAndApplies(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + key := []byte("k") + + // Before dispatch 2 (the reuse), inject a concurrent SET so the reuse + // OCC-conflicts on the write key. + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + ts := coord.Clock().Next() + require.NoError(t, st.PutAt(ctx, redisStrKey(key), encodeRedisStr([]byte("other"), nil), ts, 0)) + } + + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), key, []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + // Three dispatches: attempt 1 (pre-reject), reuse (OCC-conflict on key), + // fresh-snapshot retry (success). + require.GreaterOrEqual(t, coord.dispatches, 3) + require.Equal(t, 0, coord.probeNoOps, "nothing landed at attempt 1's ts; probe must not fire as a hit") + + // Our final write wins (it commits AFTER the concurrent SET because we + // rebuilt at a fresh startTS that observed the foreign commit). + rawVal, err := st.GetAt(ctx, redisStrKey(key), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_SelfInflictedReuseConflict_ReturnsSuccess mirrors the +// listPush self-inflicted-conflict regression: the reuse dispatch APPLIES +// the elems at the fresh commitTS but bubbles up store.ErrWriteConflict +// (leadership churn surfacing a committed entry as a conflict). The +// adapter probes the just-attempted commit_ts; the probe hits; cached +// results are returned. Without the guard, the adapter would drop pending +// and recompute, double-applying the EXEC body. +func TestExecDedup_SelfInflictedReuseConflict_ReturnsSuccess(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 pre-rejects (didn't land) + coord.landThenWriteConflictAtDispatch = 2 // reuse lands then surfaces WriteConflict + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + require.Equal(t, 2, coord.dispatches, "attempt 1 pre-reject + reuse land-then-conflict; no third attempt") + require.Equal(t, 0, coord.probeNoOps, + "the FSM probe at attempt 1's ts must NOT hit (attempt 1 did not land); "+ + "the success comes from the adapter's self-conflict guard probing the fresh commitTS") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestExecDedup_DisabledKeepsLegacyPath verifies the dedup gate is honored: +// when onePhaseTxnDedup is off, runTransaction takes the legacy path +// (recompute on every retry, no prev_commit_ts) — byte-identical to today. +// Pins that the new code is strictly opt-in. +func TestExecDedup_DisabledKeepsLegacyPath(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{} /* gate left false */} + + queue := []redcon.Command{ + {Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}, + } + results, err := srv.runTransaction(queue) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "OK", results[0].str) + // Legacy path runs no probe. + require.Equal(t, 0, coord.probeNoOps) + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +}