Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 95 additions & 24 deletions adapter/redis_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,103 @@ import (
"math"
"strconv"
"testing"
"time"

"github.com/bootjp/elastickv/store"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// waitForListState polls until this node has applied raft commits such that
// the list stored under key resolves to expectedLen items whose values match
// expectedValues (when non-nil) via both resolveListMeta and a direct scan at
// the node-local readTS.
//
// Why this is necessary: the go-redis client may be connected to a follower
// (or a leader that momentarily lags in applying its own commit). The adapter's
// client-facing read path (LRANGE) uses LeaseRead/LinearizableRead so it
// blocks until the local apply catches up, which is why LRANGE observes the
// new state. However readTS() returns store.LastCommitTS() directly, and a
// direct ScanAt(readTS) bypasses that wait. When the client and the direct
// scan target the same node, there is still a window between the client
// receiving the EXEC reply (driven by the wait-apply inside read handlers) and
// the raft-applied commit updating LastCommitTS on this node's store — in
// particular when this node was not the proposer and applies strictly after
// the response was delivered via a different code path. Polling resolves the
// gap deterministically without a timing-based sleep.
func waitForListState(t *testing.T, n Node, key []byte, expectedLen int, expectedValues []string) {
t.Helper()
if expectedValues != nil && len(expectedValues) != expectedLen {
t.Fatalf("waitForListState: expectedValues length %d does not match expectedLen %d",
len(expectedValues), expectedLen)
}
require.Eventually(t, func() bool {
return listStateMatches(n, key, expectedLen, expectedValues)
}, 5*time.Second, 250*time.Millisecond,
"node did not catch up to expected list state for key %q (len=%d)", string(key), expectedLen)
}

// listStateMatches reports whether this node's applied state for key resolves
// to expectedLen items whose values match expectedValues (when non-nil). It is
// the single-shot check driving waitForListState's poll loop.
func listStateMatches(n Node, key []byte, expectedLen int, expectedValues []string) bool {
ctx := context.Background()
readTS := n.redisServer.readTS()
if !listMetaMatches(ctx, n, key, expectedLen, readTS) {
return false
}
kvs, ok := scanListItems(ctx, n, key, expectedLen, readTS)
if !ok {
return false
}
return listValuesMatch(kvs, expectedValues)
}

// listMetaMatches checks the list meta key. For expectedLen == 0 the Redis
// semantics of "empty list == absent meta" are honored; for non-empty
// expectations both existence and length must match.
func listMetaMatches(ctx context.Context, n Node, key []byte, expectedLen int, readTS uint64) bool {
meta, exists, err := n.redisServer.resolveListMeta(ctx, key, readTS)
if err != nil {
return false
}
if expectedLen == 0 {
return !exists || meta.Len == 0
}
return exists && meta.Len == int64(expectedLen)
}

// scanListItems reads the list items at readTS and returns them when the
// observed length matches expectedLen.
func scanListItems(ctx context.Context, n Node, key []byte, expectedLen int, readTS uint64) ([]*store.KVPair, bool) {
kvs, err := n.redisServer.store.ScanAt(
ctx,
store.ListItemKey(key, math.MinInt64),
store.ListItemKey(key, math.MaxInt64),
expectedLen+1,
readTS,
)
if err != nil || len(kvs) != expectedLen {
return nil, false
}
return kvs, true
}

// listValuesMatch returns true when expectedValues is nil (no value check
// requested) or every scanned value equals its expected counterpart.
func listValuesMatch(kvs []*store.KVPair, expectedValues []string) bool {
if expectedValues == nil {
return true
}
for i, kvp := range kvs {
if string(kvp.Value) != expectedValues[i] {
return false
}
}
return true
}
Comment on lines +93 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The listValuesMatch function is susceptible to a panic if len(kvs) is greater than len(expectedValues). While scanListItems ensures that len(kvs) == expectedLen, there is no validation that the caller-provided expectedLen matches the actual length of the expectedValues slice. Adding a length check at the beginning of the function makes the helper more robust and prevents confusing panics during test failures.

func listValuesMatch(kvs []*store.KVPair, expectedValues []string) bool {
	if expectedValues == nil {
		return true
	}
	if len(kvs) != len(expectedValues) {
		return false
	}
	for i, kvp := range kvs {
		if string(kvp.Value) != expectedValues[i] {
			return false
		}
	}
	return true
}


func TestRedis_MultiExecAtomic(t *testing.T) {
t.Parallel()
nodes, _, _ := createNode(t, 3)
Expand Down Expand Up @@ -194,31 +284,12 @@ func TestRedis_MultiExec_DelThenRPushRecreatesList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []any{"new1", "new2"}, rangeRes)

readTS := nodes[1].redisServer.readTS()

// With the Delta pattern, RPUSH inside a MULTI/EXEC emits a delta key
// rather than updating the base metadata key directly. Verify the
// effective metadata via resolveListMeta which aggregates deltas.
resolvedMeta, resolvedExists, err := nodes[1].redisServer.resolveListMeta(ctx, []byte("list-del-rpush"), readTS)
require.NoError(t, err)
require.True(t, resolvedExists)
require.Equal(t, int64(2), resolvedMeta.Len)

kvs, err := nodes[1].redisServer.store.ScanAt(
ctx,
store.ListItemKey([]byte("list-del-rpush"), math.MinInt64),
store.ListItemKey([]byte("list-del-rpush"), math.MaxInt64),
10,
readTS,
)
require.NoError(t, err)
require.Len(t, kvs, 2)

got := make([]string, 0, len(kvs))
for _, kvp := range kvs {
got = append(got, string(kvp.Value))
}
require.Equal(t, []string{"new1", "new2"}, got)
// rather than updating the base metadata key directly. Additionally, the
// node the client is connected to may apply the EXEC commit slightly
// after the client receives its response, so poll until this node's
// store has caught up before asserting on the raw scan.
waitForListState(t, nodes[1], []byte("list-del-rpush"), 2, []string{"new1", "new2"})
}

func TestRedis_MultiExec_SetGetAfterDeleteReturnsNilOldValue(t *testing.T) {
Expand Down
Loading