Skip to content

Commit ba3f5bf

Browse files
committed
test(redis): wait for follower apply catch-up in MultiExec list tests
TestRedis_MultiExec_DelThenRPushRecreatesList was intermittently failing under -race on CI (PR #577): after MULTI / DEL list / RPUSH list new1 new2 / EXEC, the final direct store.ScanAt(..., readTS) on nodes[1] returned the pre-EXEC ["old1","old2"] even though the immediately preceding LRANGE via the redis client correctly returned ["new1","new2"]. Why the race exists: - The go-redis client connects to nodes[1], which is not necessarily the raft leader. - LRANGE goes through the adapter's LeaseRead / LinearizableRead path, which blocks until the local apply watermark reaches a safe point, so it observes the EXEC commit even on a slightly-lagging follower. - readTS() (adapter/ts.go:snapshotTS) returns the local store's LastCommitTS() directly, with no wait-apply. On a follower (or a leader that has just responded to the client), LastCommitTS can momentarily trail the raft-committed EXEC because the test reads it on a different code path than the one that drove the client-visible wait. - The direct ScanAt(readTS) therefore sees the stale, pre-EXEC state. Fix: introduce waitForListState(t, node, key, expectedLen, expectedValues) that polls resolveListMeta AND the raw item scan at the node-local readTS via require.Eventually (250ms interval, 5s timeout) until both reflect the expected post-EXEC state. This is a real synchronization on the raft apply progress of the target node, not a timing hack: it succeeds on the first iteration when the node is already caught up and only retries while the apply is genuinely behind. Use it in place of the ad-hoc readTS/resolveListMeta/ScanAt block in TestRedis_MultiExec_DelThenRPushRecreatesList.
1 parent 4426f58 commit ba3f5bf

1 file changed

Lines changed: 55 additions & 24 deletions

File tree

adapter/redis_multi_test.go

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,63 @@ import (
55
"math"
66
"strconv"
77
"testing"
8+
"time"
89

910
"github.com/bootjp/elastickv/store"
1011
"github.com/redis/go-redis/v9"
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
1314
)
1415

16+
// waitForListState polls until this node has applied raft commits such that
17+
// the list stored under key resolves to expectedLen items whose values match
18+
// expectedValues (when non-nil) via both resolveListMeta and a direct scan at
19+
// the node-local readTS.
20+
//
21+
// Why this is necessary: the go-redis client may be connected to a follower
22+
// (or a leader that momentarily lags in applying its own commit). The adapter's
23+
// client-facing read path (LRANGE) uses LeaseRead/LinearizableRead so it
24+
// blocks until the local apply catches up, which is why LRANGE observes the
25+
// new state. However readTS() returns store.LastCommitTS() directly, and a
26+
// direct ScanAt(readTS) bypasses that wait. When the client and the direct
27+
// scan target the same node, there is still a window between the client
28+
// receiving the EXEC reply (driven by the wait-apply inside read handlers) and
29+
// the raft-applied commit updating LastCommitTS on this node's store — in
30+
// particular when this node was not the proposer and applies strictly after
31+
// the response was delivered via a different code path. Polling resolves the
32+
// gap deterministically without a timing-based sleep.
33+
func waitForListState(t *testing.T, n Node, key []byte, expectedLen int, expectedValues []string) {
34+
t.Helper()
35+
ctx := context.Background()
36+
require.Eventually(t, func() bool {
37+
readTS := n.redisServer.readTS()
38+
meta, exists, err := n.redisServer.resolveListMeta(ctx, key, readTS)
39+
if err != nil || !exists || meta.Len != int64(expectedLen) {
40+
return false
41+
}
42+
kvs, err := n.redisServer.store.ScanAt(
43+
ctx,
44+
store.ListItemKey(key, math.MinInt64),
45+
store.ListItemKey(key, math.MaxInt64),
46+
expectedLen+1,
47+
readTS,
48+
)
49+
if err != nil || len(kvs) != expectedLen {
50+
return false
51+
}
52+
if expectedValues == nil {
53+
return true
54+
}
55+
for i, kvp := range kvs {
56+
if string(kvp.Value) != expectedValues[i] {
57+
return false
58+
}
59+
}
60+
return true
61+
}, 5*time.Second, 250*time.Millisecond,
62+
"node did not catch up to expected list state for key %q (len=%d)", string(key), expectedLen)
63+
}
64+
1565
func TestRedis_MultiExecAtomic(t *testing.T) {
1666
t.Parallel()
1767
nodes, _, _ := createNode(t, 3)
@@ -194,31 +244,12 @@ func TestRedis_MultiExec_DelThenRPushRecreatesList(t *testing.T) {
194244
require.NoError(t, err)
195245
require.Equal(t, []any{"new1", "new2"}, rangeRes)
196246

197-
readTS := nodes[1].redisServer.readTS()
198-
199247
// With the Delta pattern, RPUSH inside a MULTI/EXEC emits a delta key
200-
// rather than updating the base metadata key directly. Verify the
201-
// effective metadata via resolveListMeta which aggregates deltas.
202-
resolvedMeta, resolvedExists, err := nodes[1].redisServer.resolveListMeta(ctx, []byte("list-del-rpush"), readTS)
203-
require.NoError(t, err)
204-
require.True(t, resolvedExists)
205-
require.Equal(t, int64(2), resolvedMeta.Len)
206-
207-
kvs, err := nodes[1].redisServer.store.ScanAt(
208-
ctx,
209-
store.ListItemKey([]byte("list-del-rpush"), math.MinInt64),
210-
store.ListItemKey([]byte("list-del-rpush"), math.MaxInt64),
211-
10,
212-
readTS,
213-
)
214-
require.NoError(t, err)
215-
require.Len(t, kvs, 2)
216-
217-
got := make([]string, 0, len(kvs))
218-
for _, kvp := range kvs {
219-
got = append(got, string(kvp.Value))
220-
}
221-
require.Equal(t, []string{"new1", "new2"}, got)
248+
// rather than updating the base metadata key directly. Additionally, the
249+
// node the client is connected to may apply the EXEC commit slightly
250+
// after the client receives its response, so poll until this node's
251+
// store has caught up before asserting on the raw scan.
252+
waitForListState(t, nodes[1], []byte("list-del-rpush"), 2, []string{"new1", "new2"})
222253
}
223254

224255
func TestRedis_MultiExec_SetGetAfterDeleteReturnsNilOldValue(t *testing.T) {

0 commit comments

Comments
 (0)