Skip to content

Commit 5ba2ef9

Browse files
committed
test(redis): retry initial writes on 'not leader' to absorb CI leader churn
TestRedis_LuaRPopLPushBullMQLikeLists flaked on CI when the newly-elected raft leader briefly stepped down ("stepped down to follower since quorum is not active") between waitForRaftReadiness returning and the first RPUSH, causing the write to race a re-election and fail with "etcd raft engine is not leader". Add small rpushEventually / lpushEventually helpers (backed by a shared doEventually that retries only on transient "not leader" errors) and use them for the first two writes in the test. waitForRaftReadiness is left untouched — other tests rely on its exact semantics. Failing CI: https://github.com/bootjp/elastickv/actions/runs/24842581631/job/72720509753
1 parent 306eb0b commit 5ba2ef9

2 files changed

Lines changed: 61 additions & 2 deletions

File tree

adapter/redis_lua_compat_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,12 @@ func TestRedis_LuaRPopLPushBullMQLikeLists(t *testing.T) {
121121
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
122122
defer func() { _ = rdb.Close() }()
123123

124-
require.NoError(t, rdb.RPush(ctx, "bull:test:wait", "job-1", "job-2", "job-3").Err())
125-
require.NoError(t, rdb.LPush(ctx, "bull:test:active", "job-0").Err())
124+
// Raft leadership can briefly churn right after createNode returns — the
125+
// freshly-elected leader can step down if the initial heartbeats miss
126+
// quorum on a slow CI runner (see waitForRaftReadiness). Retry the first
127+
// writes so they ride out that re-election instead of failing the test.
128+
rpushEventually(t, ctx, rdb, "bull:test:wait", "job-1", "job-2", "job-3")
129+
lpushEventually(t, ctx, rdb, "bull:test:active", "job-0")
126130

127131
result, err := rdb.Eval(ctx, `
128132
local moved = redis.call("RPOPLPUSH", KEYS[1], KEYS[2])

adapter/test_util.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"log"
66
"net"
77
"strconv"
8+
"strings"
89
"sync"
910
"sync/atomic"
1011
"testing"
@@ -18,6 +19,7 @@ import (
1819
pb "github.com/bootjp/elastickv/proto"
1920
"github.com/bootjp/elastickv/store"
2021
"github.com/cockroachdb/errors"
22+
"github.com/redis/go-redis/v9"
2123
"github.com/stretchr/testify/assert"
2224
"github.com/stretchr/testify/require"
2325
"golang.org/x/sys/unix"
@@ -30,6 +32,12 @@ const (
3032
testEngineElectionTick = 10
3133
testEngineMaxSizePerMsg = 1 << 20
3234
testEngineMaxInflight = 256
35+
36+
// leaderChurnRetryTimeout bounds how long doEventually keeps retrying a
37+
// write that fails with "not leader" right after createNode returns.
38+
leaderChurnRetryTimeout = 5 * time.Second
39+
// leaderChurnRetryInterval is the poll interval between retries.
40+
leaderChurnRetryInterval = 50 * time.Millisecond
3341
)
3442

3543
func newTestFactory() raftengine.Factory {
@@ -442,3 +450,50 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
442450

443451
return nodes, grpcAdders, redisAdders, peers
444452
}
453+
454+
// isTransientNotLeaderErr reports whether err is a transient "not leader"
455+
// error that can happen right after createNode returns if the newly elected
456+
// leader briefly steps down due to a missed heartbeat quorum (common on slow
457+
// CI runners under -race). Callers should retry the write in that case.
458+
func isTransientNotLeaderErr(err error) bool {
459+
if err == nil {
460+
return false
461+
}
462+
return strings.Contains(err.Error(), "not leader")
463+
}
464+
465+
// doEventually retries do() while it returns a transient "not leader" error,
466+
// giving the cluster a few seconds to re-settle leadership after startup.
467+
// Non-"not leader" errors fail the test immediately.
468+
func doEventually(t *testing.T, do func() error) {
469+
t.Helper()
470+
require.Eventually(t, func() bool {
471+
err := do()
472+
if err == nil {
473+
return true
474+
}
475+
if isTransientNotLeaderErr(err) {
476+
return false
477+
}
478+
require.NoError(t, err)
479+
return true
480+
}, leaderChurnRetryTimeout, leaderChurnRetryInterval)
481+
}
482+
483+
// rpushEventually wraps RPUSH in doEventually so transient leader churn
484+
// immediately after createNode doesn't fail the test.
485+
func rpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key string, vals ...any) {
486+
t.Helper()
487+
doEventually(t, func() error {
488+
return rdb.RPush(ctx, key, vals...).Err()
489+
})
490+
}
491+
492+
// lpushEventually wraps LPUSH in doEventually so transient leader churn
493+
// immediately after createNode doesn't fail the test.
494+
func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key string, vals ...any) {
495+
t.Helper()
496+
doEventually(t, func() error {
497+
return rdb.LPush(ctx, key, vals...).Err()
498+
})
499+
}

0 commit comments

Comments
 (0)