diff --git a/adapter/redis.go b/adapter/redis.go index 065c2899..170f0fa7 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -1118,6 +1118,44 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { if r.proxyToLeader(conn, cmd, cmd.Args[1]) { return } + // Option-2 dedup for standalone SET: route through runTransactionWithDedup + // as a single-mop EXEC body when the gate is on. SET inside MULTI/EXEC + // already has full dedup coverage via applySet (§M3 in the design doc), + // so we just reuse that machinery instead of building a per-handler + // reusableSetTxn + dispatchSetReuse shape. The fast-path optimization is + // intentionally bypassed under the gate — dedup is opt-in, and a + // non-dedup'd fast path under a dedup-on cluster would split the + // idempotency contract. + // + // Result translation: runTransactionWithDedup returns []redisResult; for + // SET there is exactly one element with the same redisResult shape as + // the standalone reply (resultString OK / resultNil for NX/XX miss / + // resultBulk for GET). + if r.onePhaseTxnDedup { + // Call runTransactionWithDedup directly instead of going through + // runTransaction. runTransaction re-checks the same + // r.onePhaseTxnDedup gate and routes here anyway; the indirection + // would make the call chain misleading ("dispatches via + // runTransactionWithDedup" being true only by indirection). + // Direct call makes the intent explicit and removes the double + // gate check. + results, err := r.runTransactionWithDedup([]redcon.Command{cmd}) + if err != nil { + writeRedisError(conn, err) + return + } + writeRedisStandaloneResult(conn, results) + return + } + r.setLegacy(conn, cmd) +} + +// setLegacy is the pre-dedup standalone SET path. Extracted from set() so +// the gate-on routing through runTransactionWithDedup keeps set() under the +// cyclop budget (the gate-off branch's parse + fast-path + executeSet +// shape carries its own decision points). Behaviour is byte-identical to +// the pre-PR set() body. +func (r *RedisServer) setLegacy(conn redcon.Conn, cmd redcon.Command) { opts, err := parseRedisSetOptions(cmd.Args[3:], time.Now()) if err != nil { writeRedisError(conn, err) @@ -1151,6 +1189,51 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { conn.WriteString("OK") } +// writeRedisStandaloneResult translates a single-element results array from +// runTransactionWithDedup into a redcon response, mirroring the shape a +// standalone handler would write directly. Used by SET / future standalone +// commands routed through the dedup loop. Differs from writeResults in NOT +// wrapping the response in conn.WriteArray — the standalone protocol returns +// the bare element. +// +// Empty or multi-element input is degenerate for standalone callers; we +// default to nil so a misuse never leaks a malformed reply to the wire. +// +// Array-element constraint: the resultArray arm writes each element via +// WriteBulkString, which is correct for flat arrays of strings (the +// shape applySet / future SET-pattern callers produce). It does NOT +// recurse into nested arrays. A future caller whose applyXxx emits +// resultArray with non-string elements (e.g. HGETALL-like nested +// responses) must either pre-flatten its result or extend this switch +// with a recursive arm; reusing it as-is would silently mangle the +// wire reply. +func writeRedisStandaloneResult(conn redcon.Conn, results []redisResult) { + if len(results) != 1 { + conn.WriteNull() + return + } + res := results[0] + switch res.typ { + case resultNil: + conn.WriteNull() + case resultError: + writeRedisError(conn, res.err) + case resultBulk: + conn.WriteBulk(res.bulk) + case resultString: + conn.WriteString(res.str) + case resultArray: + conn.WriteArray(len(res.arr)) + for _, s := range res.arr { + conn.WriteBulkString(s) + } + case resultInt: + conn.WriteInt64(res.integer) + default: + conn.WriteNull() + } +} + func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { key := cmd.Args[1] if r.proxyToLeader(conn, cmd, key) { diff --git a/adapter/redis_retry_test.go b/adapter/redis_retry_test.go index ea0d236a..eb6d13ab 100644 --- a/adapter/redis_retry_test.go +++ b/adapter/redis_retry_test.go @@ -93,10 +93,11 @@ func (c *retryOnceCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (u } type recordingConn struct { - ctx any - err string - bulk []byte - int int64 + ctx any + err string + bulk []byte + int int64 + wroteNull bool } func (c *recordingConn) RemoteAddr() string { return "" } @@ -129,6 +130,7 @@ func (c *recordingConn) WriteUint64(num uint64) { func (c *recordingConn) WriteArray(count int) {} func (c *recordingConn) WriteNull() { c.bulk = nil + c.wroteNull = true } func (c *recordingConn) WriteRaw(data []byte) { c.bulk = append([]byte(nil), data...) diff --git a/adapter/redis_set_dedup_test.go b/adapter/redis_set_dedup_test.go new file mode 100644 index 00000000..28d6bca0 --- /dev/null +++ b/adapter/redis_set_dedup_test.go @@ -0,0 +1,138 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" + "github.com/tidwall/redcon" +) + +// recordingConn (defined in redis_retry_test.go) captures handler writes via +// .bulk, .err, .int fields. WriteString and WriteBulk both populate .bulk — +// in this test "OK" lands as bulk=[]byte("OK"), .err stays empty for the +// success path. + +// TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK pins the standalone SET +// dedup path: when the gate is on, SET routes through runTransactionWithDedup +// as a single-mop EXEC body. Attempt 1 lands then errors → reuse probes → +// FSM no-ops → client gets "OK" (the cached result) without re-applying. +// +// Pins that the gate-on path uses the same dedup machinery as MULTI/EXEC. +// Without this routing, a standalone SET under leadership churn would not +// benefit from option-2 dedup (the design's "still open" item before this +// PR). +func TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}} + srv.set(conn, cmd) + + require.Equal(t, "OK", string(conn.bulk), "standalone SET must reply with the cached OK from attempt 1") + require.Empty(t, conn.err, "no error must escape; dedup hid the ambiguous attempt-1 failure") + require.Equal(t, 2, coord.dispatches, "one ambiguous-land attempt + one reuse") + require.Equal(t, 1, coord.probeNoOps, "reuse must dedup via the exact-ts probe") + + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val, "only one apply landed — the value matches attempt 1") +} + +// TestStandaloneSetDedup_NXMissReturnsNil pins resultNil routing through +// writeRedisStandaloneResult on the dedup path. SET with NX against an +// existing key returns nil (NX fails because the key exists); the dedup +// loop reuses the cached resultNil and the recording conn observes +// wroteNull. Without correct resultNil arming the client would observe an +// empty bulk reply, breaking NX semantics under dedup. +func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + + // Seed the key so the NX condition fails (key already exists). + require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("seed"), nil), 5, 0)) + + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + // SET k v1 NX -- attempt 1 records resultNil because NX miss. + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("NX")}} + srv.set(conn, cmd) + + // Airtight assertion: WriteNull was actually called (not "nothing was + // written, leaving the zero-value nil"). Without the wroteNull witness + // flag, a wrong branch that wrote nothing at all would also pass + // `conn.bulk == nil`. + require.True(t, conn.wroteNull, "NX miss must call WriteNull, not silently skip the write") + require.Nil(t, conn.bulk, "WriteNull leaves conn.bulk nil; a stray WriteString/WriteBulk would have populated it") + require.Empty(t, conn.err, "no error must escape; NX miss is a normal response, not an error") + + // Stored value is still the seed; nothing should have overwritten it. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("seed"), val, "NX miss must not overwrite the existing value") +} + +// TestStandaloneSetDedup_GETOptionReturnsOldBulk pins resultBulk routing +// through writeRedisStandaloneResult on the dedup path. SET ... GET on an +// existing key returns the prior value as a bulk reply; the dedup loop +// reuses the cached resultBulk and the recording conn observes the bytes. +// Without correct resultBulk arming the client would observe an empty or +// nil reply, breaking SET GET semantics under dedup. +func TestStandaloneSetDedup_GETOptionReturnsOldBulk(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + + // Seed the prior value -- SET GET returns this as a bulk reply. + require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("prior"), nil), 5, 0)) + + coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true} + + conn := &recordingConn{} + // SET k v1 GET -- attempt 1 records resultBulk("prior"). + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("GET")}} + srv.set(conn, cmd) + + // recordingConn.WriteBulk copies into .bulk; the prior value must round-trip + // from the cached attempt-1 result through writeRedisStandaloneResult. + require.Equal(t, "prior", string(conn.bulk), "GET option must reply with the cached prior value, not a re-read") + require.Empty(t, conn.err) + + // New value committed via the landed attempt-1 apply. + rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st)) + require.NoError(t, err) + val, _, err := decodeRedisStr(rawVal) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val, "SET GET still applies the new value; dedup just preserves the GET result") +} + +// TestStandaloneSetDedup_DisabledKeepsLegacyPath verifies the gate is honored +// for the standalone SET path too: when onePhaseTxnDedup is off, r.set takes +// its legacy fast-path / executeSet shape (no probe, no per-attempt PrevCommitTS). +// Pins that the new routing is strictly opt-in. +func TestStandaloneSetDedup_DisabledKeepsLegacyPath(t *testing.T) { + t.Parallel() + st := store.NewMVCCStore() + coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing + srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{} /* gate left false */} + + conn := &recordingConn{} + cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}} + srv.set(conn, cmd) + + // Legacy path: no probe. + require.Equal(t, 0, coord.probeNoOps, "gate off — runTransactionWithDedup must not be used") +} diff --git a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md index e38fd89b..aeb773ce 100644 --- a/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md +++ b/docs/design/2026_05_21_proposed_txn_secondary_idempotency.md @@ -514,11 +514,24 @@ preserves availability and adds correctness. what changes is that an expired outer ctx is now respected promptly instead of being ignored until the fresh budget elapses. -- **Standalone write commands (SET/INCR/HSET/...) — still open.** The EXEC - path covers MULTI bodies; standalone single-command dispatch goes through - per-handler paths (`applySet`, `applyIncr`, etc.) and needs the same - `reusable` capture + `dispatchXReuse` shape per command. Scope is - per-command but each is small (~50 LOC). Tracked as PR-B follow-up. +- **Standalone SET — LANDED.** The standalone `r.set` handler now routes + through `runTransactionWithDedup` as a single-mop EXEC body when the gate + is on (the dedup machinery's "free" extension to any command whose + `applyXxx` already exists on `txnContext`). The fast-path optimization + (`trySetFastPath`) is intentionally bypassed under the gate — dedup is + opt-in, and a non-dedup'd fast path under a dedup-on cluster would split + the idempotency contract. Tested by `TestStandaloneSetDedup_*` in + `adapter/redis_set_dedup_test.go`. +- **Standalone INCR / HSET — still open.** Both lack a `txnContext.applyXxx` + implementation, so the "route through single-mop EXEC" pattern that + worked for SET cannot apply as-is. Bringing them into the dedup'd path + requires implementing `applyIncr` / `applyHSet` first (each ~30–50 LOC + for the txn-state-aware read-compute-write shape), then the standalone + handler routing is a one-liner via `runTransactionWithDedup`. Tracked + as separate follow-up PRs; until then, INCR and HSET keep today's + buggy-under-churn behaviour, which is the design doc's stated default + ("everything else keeps today's behaviour until its hook is added" — + Open questions). ### M4 — Validation