Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions adapter/redis_lua_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ func TestRedis_LuaRPopLPushBullMQLikeLists(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
defer func() { _ = rdb.Close() }()

// Raft leadership can briefly churn right after createNode returns — the
// freshly-elected leader can step down if the initial heartbeats miss
// quorum on a slow CI runner (see waitForRaftReadiness). Retry the first
// writes so they ride out that re-election instead of failing the test.
rpushEventually(t, ctx, rdb, "bull:test:wait", "job-1", "job-2", "job-3")
lpushEventually(t, ctx, rdb, "bull:test:active", "job-0")
// Direct first writes: waitForRaftReadiness (called by createNode)
// now drives one no-op Raft entry through full quorum before
// returning, so any subsequent write is guaranteed to land on a
// leader that has already held quorum through one apply. The
// post-readiness leader-churn window the prior rpushEventually /
// lpushEventually wrappers absorbed is closed at the readiness
// layer instead — see PR #898.
require.NoError(t, rdb.RPush(ctx, "bull:test:wait", "job-1", "job-2", "job-3").Err())
require.NoError(t, rdb.LPush(ctx, "bull:test:active", "job-0").Err())

result, err := rdb.Eval(ctx, `
local moved = redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
Expand Down
49 changes: 13 additions & 36 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
Expand All @@ -46,10 +45,12 @@ const (
testEngineMaxSizePerMsg = 1 << 20
testEngineMaxInflight = 256

// leaderChurnRetryTimeout bounds how long doEventually keeps retrying a
// write that fails with a transient leader-unavailable error. It covers
// both startup churn right after createNode returns and mid-test churn
// when the leader briefly steps down under CI load.
// leaderChurnRetryTimeout bounds how long retryNotLeader keeps
// retrying a write that fails with a transient leader-unavailable
// error. Used by tests that issue writes inside long mid-test
// loops where leadership can briefly step down under CI load.
// Startup churn (createNode → first write) is closed at the
// readiness layer by waitForWriteableLeader (PR #898), not here.
leaderChurnRetryTimeout = 5 * time.Second
// leaderChurnRetryInterval is the poll interval between retries.
leaderChurnRetryInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -750,23 +751,17 @@ func eventuallyExpired(t *testing.T, ttl time.Duration, condition func() bool, m
require.Eventually(t, condition, ttlExpiryHeadroom, ttlExpiryPoll, msg)
}

// doEventually retries do() while it returns a transient "not leader" error,
// giving the cluster a few seconds to re-settle leadership after startup.
// Non-"not leader" errors fail the test immediately.
//
// MUST be called from the main test goroutine only. The final require.NoError
// calls t.FailNow() on failure; invoking FailNow from a worker goroutine is
// a testing.T contract violation. For parallel-worker use, call
// retryNotLeader directly and report errors back via a channel.
func doEventually(t *testing.T, do func() error) {
t.Helper()
require.NoError(t, retryNotLeader(context.Background(), do))
}

// retryNotLeader calls do() repeatedly while it returns a transient
// leader-unavailable error, capped at leaderChurnRetryTimeout. It returns
// the final error (or nil on success) without touching testing.T, making
// it safe to call from worker goroutines in parallel tests.
//
// Use cases: mid-test leader churn under CI load (e.g. grpc_test.go's
// 9999-iteration consistency loops). The startup window — leader-churn
// between createNode returning and the first write — is closed at the
// readiness layer instead (waitForWriteableLeader, PR #898), so first-
// write callers should NOT wrap in retryNotLeader; direct calls suffice
// and are clearer.
func retryNotLeader(ctx context.Context, do func() error) error {
deadline := time.Now().Add(leaderChurnRetryTimeout)
var lastErr error
Expand All @@ -785,21 +780,3 @@ func retryNotLeader(ctx context.Context, do func() error) error {
}
}
}

// rpushEventually wraps RPUSH in doEventually so transient leader churn
// immediately after createNode doesn't fail the test.
func rpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key string, vals ...any) {
t.Helper()
doEventually(t, func() error {
return rdb.RPush(ctx, key, vals...).Err()
})
}

// lpushEventually wraps LPUSH in doEventually so transient leader churn
// immediately after createNode doesn't fail the test.
func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key string, vals ...any) {
t.Helper()
doEventually(t, func() error {
return rdb.LPush(ctx, key, vals...).Err()
})
}
Loading