Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,44 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
if r.proxyToLeader(conn, cmd, cmd.Args[1]) {
return
}
// Option-2 dedup for standalone SET: route through runTransactionWithDedup
// as a single-mop EXEC body when the gate is on. SET inside MULTI/EXEC
// already has full dedup coverage via applySet (§M3 in the design doc),
// so we just reuse that machinery instead of building a per-handler
// reusableSetTxn + dispatchSetReuse shape. The fast-path optimization is
// intentionally bypassed under the gate — dedup is opt-in, and a
// non-dedup'd fast path under a dedup-on cluster would split the
// idempotency contract.
//
// Result translation: runTransactionWithDedup returns []redisResult; for
// SET there is exactly one element with the same redisResult shape as
// the standalone reply (resultString OK / resultNil for NX/XX miss /
// resultBulk for GET).
if r.onePhaseTxnDedup {
// Call runTransactionWithDedup directly instead of going through
// runTransaction. runTransaction re-checks the same
// r.onePhaseTxnDedup gate and routes here anyway; the indirection
// would make the call chain misleading ("dispatches via
// runTransactionWithDedup" being true only by indirection).
// Direct call makes the intent explicit and removes the double
// gate check.
results, err := r.runTransactionWithDedup([]redcon.Command{cmd})
if err != nil {
writeRedisError(conn, err)
return
}
writeRedisStandaloneResult(conn, results)
return
}
r.setLegacy(conn, cmd)
}

// setLegacy is the pre-dedup standalone SET path. Extracted from set() so
// the gate-on routing through runTransactionWithDedup keeps set() under the
// cyclop budget (the gate-off branch's parse + fast-path + executeSet
// shape carries its own decision points). Behaviour is byte-identical to
// the pre-PR set() body.
func (r *RedisServer) setLegacy(conn redcon.Conn, cmd redcon.Command) {
opts, err := parseRedisSetOptions(cmd.Args[3:], time.Now())
if err != nil {
writeRedisError(conn, err)
Expand Down Expand Up @@ -1151,6 +1189,51 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
conn.WriteString("OK")
}

// writeRedisStandaloneResult translates a single-element results array from
// runTransactionWithDedup into a redcon response, mirroring the shape a
// standalone handler would write directly. Used by SET / future standalone
// commands routed through the dedup loop. Differs from writeResults in NOT
// wrapping the response in conn.WriteArray — the standalone protocol returns
// the bare element.
//
// Empty or multi-element input is degenerate for standalone callers; we
// default to nil so a misuse never leaks a malformed reply to the wire.
//
// Array-element constraint: the resultArray arm writes each element via
// WriteBulkString, which is correct for flat arrays of strings (the
// shape applySet / future SET-pattern callers produce). It does NOT
// recurse into nested arrays. A future caller whose applyXxx emits
// resultArray with non-string elements (e.g. HGETALL-like nested
// responses) must either pre-flatten its result or extend this switch
// with a recursive arm; reusing it as-is would silently mangle the
// wire reply.
func writeRedisStandaloneResult(conn redcon.Conn, results []redisResult) {
if len(results) != 1 {
conn.WriteNull()
return
}
res := results[0]
switch res.typ {
case resultNil:
conn.WriteNull()
case resultError:
writeRedisError(conn, res.err)
case resultBulk:
conn.WriteBulk(res.bulk)
case resultString:
conn.WriteString(res.str)
case resultArray:
conn.WriteArray(len(res.arr))
for _, s := range res.arr {
conn.WriteBulkString(s)
}
case resultInt:
conn.WriteInt64(res.integer)
default:
conn.WriteNull()
}
}

func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
key := cmd.Args[1]
if r.proxyToLeader(conn, cmd, key) {
Expand Down
10 changes: 6 additions & 4 deletions adapter/redis_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ func (c *retryOnceCoordinator) LeaseReadForKey(ctx context.Context, _ []byte) (u
}

type recordingConn struct {
ctx any
err string
bulk []byte
int int64
ctx any
err string
bulk []byte
int int64
wroteNull bool
}

func (c *recordingConn) RemoteAddr() string { return "" }
Expand Down Expand Up @@ -129,6 +130,7 @@ func (c *recordingConn) WriteUint64(num uint64) {
func (c *recordingConn) WriteArray(count int) {}
func (c *recordingConn) WriteNull() {
c.bulk = nil
c.wroteNull = true
}
func (c *recordingConn) WriteRaw(data []byte) {
c.bulk = append([]byte(nil), data...)
Expand Down
138 changes: 138 additions & 0 deletions adapter/redis_set_dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package adapter

import (
"context"
"testing"

"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
"github.com/tidwall/redcon"
)

// recordingConn (defined in redis_retry_test.go) captures handler writes via
// .bulk, .err, .int fields. WriteString and WriteBulk both populate .bulk —
// in this test "OK" lands as bulk=[]byte("OK"), .err stays empty for the
// success path.

// TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK pins the standalone SET
// dedup path: when the gate is on, SET routes through runTransactionWithDedup
// as a single-mop EXEC body. Attempt 1 lands then errors → reuse probes →
// FSM no-ops → client gets "OK" (the cached result) without re-applying.
//
// Pins that the gate-on path uses the same dedup machinery as MULTI/EXEC.
// Without this routing, a standalone SET under leadership churn would not
// benefit from option-2 dedup (the design's "still open" item before this
// PR).
func TestStandaloneSetDedup_LandedPriorAttempt_ReturnsOK(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()
coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors
srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true}

conn := &recordingConn{}
cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}
srv.set(conn, cmd)

require.Equal(t, "OK", string(conn.bulk), "standalone SET must reply with the cached OK from attempt 1")
require.Empty(t, conn.err, "no error must escape; dedup hid the ambiguous attempt-1 failure")
require.Equal(t, 2, coord.dispatches, "one ambiguous-land attempt + one reuse")
require.Equal(t, 1, coord.probeNoOps, "reuse must dedup via the exact-ts probe")

rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st))
require.NoError(t, err)
val, _, err := decodeRedisStr(rawVal)
require.NoError(t, err)
require.Equal(t, []byte("v1"), val, "only one apply landed — the value matches attempt 1")
}

// TestStandaloneSetDedup_NXMissReturnsNil pins resultNil routing through
// writeRedisStandaloneResult on the dedup path. SET with NX against an
// existing key returns nil (NX fails because the key exists); the dedup
// loop reuses the cached resultNil and the recording conn observes
// wroteNull. Without correct resultNil arming the client would observe an
// empty bulk reply, breaking NX semantics under dedup.
func TestStandaloneSetDedup_NXMissReturnsNil(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()

// Seed the key so the NX condition fails (key already exists).
require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("seed"), nil), 5, 0))

coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors
srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true}

conn := &recordingConn{}
// SET k v1 NX -- attempt 1 records resultNil because NX miss.
cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("NX")}}
srv.set(conn, cmd)

// Airtight assertion: WriteNull was actually called (not "nothing was
// written, leaving the zero-value nil"). Without the wroteNull witness
// flag, a wrong branch that wrote nothing at all would also pass
// `conn.bulk == nil`.
require.True(t, conn.wroteNull, "NX miss must call WriteNull, not silently skip the write")
require.Nil(t, conn.bulk, "WriteNull leaves conn.bulk nil; a stray WriteString/WriteBulk would have populated it")
require.Empty(t, conn.err, "no error must escape; NX miss is a normal response, not an error")

// Stored value is still the seed; nothing should have overwritten it.
rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st))
require.NoError(t, err)
val, _, err := decodeRedisStr(rawVal)
require.NoError(t, err)
require.Equal(t, []byte("seed"), val, "NX miss must not overwrite the existing value")
}

// TestStandaloneSetDedup_GETOptionReturnsOldBulk pins resultBulk routing
// through writeRedisStandaloneResult on the dedup path. SET ... GET on an
// existing key returns the prior value as a bulk reply; the dedup loop
// reuses the cached resultBulk and the recording conn observes the bytes.
// Without correct resultBulk arming the client would observe an empty or
// nil reply, breaking SET GET semantics under dedup.
func TestStandaloneSetDedup_GETOptionReturnsOldBulk(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()

// Seed the prior value -- SET GET returns this as a bulk reply.
require.NoError(t, st.PutAt(ctx, redisStrKey([]byte("k")), encodeRedisStr([]byte("prior"), nil), 5, 0))

coord := newDedupTestCoordinator(st, 1, true) // attempt 1 lands then errors
srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{}, onePhaseTxnDedup: true}

conn := &recordingConn{}
// SET k v1 GET -- attempt 1 records resultBulk("prior").
cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1"), []byte("GET")}}
srv.set(conn, cmd)

// recordingConn.WriteBulk copies into .bulk; the prior value must round-trip
// from the cached attempt-1 result through writeRedisStandaloneResult.
require.Equal(t, "prior", string(conn.bulk), "GET option must reply with the cached prior value, not a re-read")
require.Empty(t, conn.err)

// New value committed via the landed attempt-1 apply.
rawVal, err := st.GetAt(ctx, redisStrKey([]byte("k")), snapshotTS(coord.Clock(), st))
require.NoError(t, err)
val, _, err := decodeRedisStr(rawVal)
require.NoError(t, err)
require.Equal(t, []byte("v1"), val, "SET GET still applies the new value; dedup just preserves the GET result")
}

// TestStandaloneSetDedup_DisabledKeepsLegacyPath verifies the gate is honored
// for the standalone SET path too: when onePhaseTxnDedup is off, r.set takes
// its legacy fast-path / executeSet shape (no probe, no per-attempt PrevCommitTS).
// Pins that the new routing is strictly opt-in.
func TestStandaloneSetDedup_DisabledKeepsLegacyPath(t *testing.T) {
t.Parallel()
st := store.NewMVCCStore()
coord := newDedupTestCoordinator(st, 1, false) // attempt 1 errors without landing
srv := &RedisServer{store: st, coordinator: coord, scriptCache: map[string]string{} /* gate left false */}

conn := &recordingConn{}
cmd := redcon.Command{Args: [][]byte{[]byte(cmdSet), []byte("k"), []byte("v1")}}
srv.set(conn, cmd)

// Legacy path: no probe.
require.Equal(t, 0, coord.probeNoOps, "gate off — runTransactionWithDedup must not be used")
}
23 changes: 18 additions & 5 deletions docs/design/2026_05_21_proposed_txn_secondary_idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,24 @@ preserves availability and adds correctness.
what changes is that an expired outer ctx is now respected
promptly instead of being ignored until the fresh budget
elapses.
- **Standalone write commands (SET/INCR/HSET/...) — still open.** The EXEC
path covers MULTI bodies; standalone single-command dispatch goes through
per-handler paths (`applySet`, `applyIncr`, etc.) and needs the same
`reusable<X>` capture + `dispatchXReuse` shape per command. Scope is
per-command but each is small (~50 LOC). Tracked as PR-B follow-up.
- **Standalone SET — LANDED.** The standalone `r.set` handler now routes
through `runTransactionWithDedup` as a single-mop EXEC body when the gate
is on (the dedup machinery's "free" extension to any command whose
`applyXxx` already exists on `txnContext`). The fast-path optimization
(`trySetFastPath`) is intentionally bypassed under the gate — dedup is
opt-in, and a non-dedup'd fast path under a dedup-on cluster would split
the idempotency contract. Tested by `TestStandaloneSetDedup_*` in
`adapter/redis_set_dedup_test.go`.
- **Standalone INCR / HSET — still open.** Both lack a `txnContext.applyXxx`
implementation, so the "route through single-mop EXEC" pattern that
worked for SET cannot apply as-is. Bringing them into the dedup'd path
requires implementing `applyIncr` / `applyHSet` first (each ~30–50 LOC
for the txn-state-aware read-compute-write shape), then the standalone
handler routing is a one-liner via `runTransactionWithDedup`. Tracked
as separate follow-up PRs; until then, INCR and HSET keep today's
buggy-under-churn behaviour, which is the design doc's stated default
("everything else keeps today's behaviour until its hook is added" —
Open questions).

### M4 — Validation

Expand Down
Loading