Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions adapter/distribution_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,3 +743,7 @@ func (s *distributionCoordinatorStub) RaftLeaderForKey(_ []byte) raft.ServerAddr
func (s *distributionCoordinatorStub) Clock() *kv.HLC {
return nil
}

// LinearizableRead satisfies the coordinator interface for this test stub;
// it always reports success with read index 0.
func (s *distributionCoordinatorStub) LinearizableRead(_ context.Context) (uint64, error) {
return 0, nil
}
4 changes: 4 additions & 0 deletions adapter/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1855,3 +1855,7 @@ func (w *testCoordinatorWrapper) RaftLeaderForKey(key []byte) raft.ServerAddress
func (w *testCoordinatorWrapper) Clock() *kv.HLC {
return w.inner.Clock()
}

// LinearizableRead delegates to the wrapped coordinator, preserving its
// read index and error unchanged.
func (w *testCoordinatorWrapper) LinearizableRead(ctx context.Context) (uint64, error) {
return w.inner.LinearizableRead(ctx)
}
56 changes: 42 additions & 14 deletions adapter/redis_compat_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,26 +377,37 @@ func (r *RedisServer) dispatchElems(ctx context.Context, isTxn bool, startTS uin
// the legacy path does at most two extra reads (TTL index, then bare key).
// Expiration is checked locally from the TTL we just decoded.
func (r *RedisServer) readRedisStringAt(key []byte, readTS uint64) ([]byte, *time.Time, error) {
raw, err := r.leaderAwareGetAt(redisStrKey(key), readTS)
return r.readRedisStringWith(key, readTS, r.leaderAwareGetAt)
}

// readRedisStringAtSnapshot reads a string without re-verifying leadership on
// every sub-call. The caller must have already completed a LinearizableRead
// once before invoking this (e.g. at Lua script startTS acquisition time).
Comment on lines +383 to +385
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Update the snapshot-read precondition comments.

The Lua path now gates snapshot reads with LinearizableRead(ctx), not coordinator.VerifyLeader(). Keeping the old wording makes the safety contract ambiguous for future callers.

📝 Proposed comment update
-// every sub-call. The caller must have already called coordinator.VerifyLeader()
-// once before invoking this (e.g. at Lua script startTS acquisition time).
+// every sub-call. The caller must have already completed a LinearizableRead
+// before invoking this (e.g. at Lua script startTS acquisition time).
 func (r *RedisServer) readRedisStringAtSnapshot(key []byte, readTS uint64) ([]byte, *time.Time, error) {
 	return r.readRedisStringWith(key, readTS, r.snapshotGetAt)
 }
-// The caller must have already called coordinator.VerifyLeader() once (e.g. at
-// Lua script startTS acquisition time) before using this method.
+// The caller must have already completed a LinearizableRead (e.g. at Lua script
+// startTS acquisition time) before using this method.
 func (r *RedisServer) snapshotGetAt(key []byte, readTS uint64) ([]byte, error) {
 	return r.doGetAt(key, readTS, false)
 }

Also applies to: 467-469

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis_compat_helpers.go` around lines 383 - 385, Update the
precondition comments to reflect that snapshot reads are gated by
LinearizableRead(ctx) instead of coordinator.VerifyLeader(); specifically change
the comment above readRedisStringAtSnapshot to say the caller must have already
performed a LinearizableRead(ctx) (e.g., at Lua script startTS acquisition) and
make the same wording change in the other similar comment block around the
second snapshot-read helper (the block at the other location referenced in the
review). Ensure the revised comments mention LinearizableRead(ctx) as the
required precondition and keep the example context (Lua script startTS
acquisition).

func (r *RedisServer) readRedisStringAtSnapshot(key []byte, readTS uint64) ([]byte, *time.Time, error) {
return r.readRedisStringWith(key, readTS, r.snapshotGetAt)
}

func (r *RedisServer) readRedisStringWith(key []byte, readTS uint64, get rawGetFn) ([]byte, *time.Time, error) {
raw, err := get(redisStrKey(key), readTS)
if err == nil {
return r.decodePrefixedString(key, raw, readTS)
return r.decodePrefixedStringWith(key, raw, readTS, get)
}
if !errors.Is(err, store.ErrKeyNotFound) {
return nil, nil, err
}
return r.readBareLegacyString(key, readTS)
return r.readBareLegacyStringWith(key, readTS, get)
}

// decodePrefixedString handles the !redis|str|<key> payload: new-format values
// carry their TTL inline, while legacy-format payloads that still sit under
// the prefixed key during rolling upgrade must consult the secondary index.
func (r *RedisServer) decodePrefixedString(key, raw []byte, readTS uint64) ([]byte, *time.Time, error) {
func (r *RedisServer) decodePrefixedStringWith(key, raw []byte, readTS uint64, get rawGetFn) ([]byte, *time.Time, error) {
userValue, ttl, err := decodeRedisStr(raw)
if err != nil {
return nil, nil, err
}
if !isNewRedisStrFormat(raw) {
legacyTTL, ttlErr := r.readLegacyTTL(key, readTS)
legacyTTL, ttlErr := r.readLegacyTTLWith(key, readTS, get)
if ttlErr != nil {
return nil, nil, ttlErr
}
Expand All @@ -408,27 +419,27 @@ func (r *RedisServer) decodePrefixedString(key, raw []byte, readTS uint64) ([]by
return userValue, ttl, nil
}

// readBareLegacyString handles pre-migration data still under the bare user
// readBareLegacyStringWith handles pre-migration data still under the bare user
// key: TTL in the secondary index, value at the bare key itself.
func (r *RedisServer) readBareLegacyString(key []byte, readTS uint64) ([]byte, *time.Time, error) {
legacyTTL, err := r.readLegacyTTL(key, readTS)
func (r *RedisServer) readBareLegacyStringWith(key []byte, readTS uint64, get rawGetFn) ([]byte, *time.Time, error) {
legacyTTL, err := r.readLegacyTTLWith(key, readTS, get)
if err != nil {
return nil, nil, err
}
if legacyTTL != nil && !legacyTTL.After(time.Now()) {
return nil, nil, errors.WithStack(store.ErrKeyNotFound)
}
legacy, err := r.leaderAwareGetAt(key, readTS)
legacy, err := get(key, readTS)
if err != nil {
return nil, nil, err
}
return legacy, legacyTTL, nil
}

// readLegacyTTL fetches the pre-migration !redis|ttl| entry, returning nil
// readLegacyTTLWith fetches the pre-migration !redis|ttl| entry, returning nil
// when no index is present.
func (r *RedisServer) readLegacyTTL(key []byte, readTS uint64) (*time.Time, error) {
raw, err := r.leaderAwareGetAt(redisTTLKey(key), readTS)
func (r *RedisServer) readLegacyTTLWith(key []byte, readTS uint64, get rawGetFn) (*time.Time, error) {
raw, err := get(redisTTLKey(key), readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
return nil, nil
Expand All @@ -442,19 +453,36 @@ func (r *RedisServer) readLegacyTTL(key []byte, readTS uint64) (*time.Time, erro
return &ttl, nil
}

// rawGetFn is the signature shared by leaderAwareGetAt and snapshotGetAt so
// that the string-read helpers can be parameterised without duplication.
type rawGetFn func(key []byte, readTS uint64) ([]byte, error)

// leaderAwareGetAt is a GetAt that honors the per-key leader routing readValueAt
// uses, but without calling back into hasExpiredTTLAt. Callers are responsible
// for handling expiration themselves using the TTL they just read.
func (r *RedisServer) leaderAwareGetAt(key []byte, readTS uint64) ([]byte, error) {
return r.doGetAt(key, readTS, true)
}

// snapshotGetAt reads at readTS without re-verifying leadership on every call.
// The caller must have already completed a LinearizableRead (e.g. at Lua script
// startTS acquisition time) before using this method.
func (r *RedisServer) snapshotGetAt(key []byte, readTS uint64) ([]byte, error) {
return r.doGetAt(key, readTS, false)
}

func (r *RedisServer) doGetAt(key []byte, readTS uint64, verify bool) ([]byte, error) {
// Leadership is partitioned by the logical user key, so strip the internal
// prefix before asking the coordinator.
routingKey := key
if userKey := extractRedisInternalUserKey(key); userKey != nil {
routingKey = userKey
}
if r.coordinator.IsLeaderForKey(routingKey) {
if err := r.coordinator.VerifyLeaderForKey(routingKey); err != nil {
return nil, errors.WithStack(err)
if verify {
if err := r.coordinator.VerifyLeaderForKey(routingKey); err != nil {
return nil, errors.WithStack(err)
}
}
v, err := r.store.GetAt(context.Background(), key, readTS)
return v, errors.WithStack(err)
Expand Down
2 changes: 2 additions & 0 deletions adapter/redis_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (c *infoTestCoordinator) Clock() *kv.HLC {
return c.clock
}

// LinearizableRead satisfies the coordinator interface; the INFO tests do not
// exercise linearizable reads, so it trivially succeeds with index 0.
func (c *infoTestCoordinator) LinearizableRead(_ context.Context) (uint64, error) { return 0, nil }

func TestRedisServer_Info_LeaderRole(t *testing.T) {
r := &RedisServer{
redisAddr: "10.0.0.1:6379",
Expand Down
4 changes: 4 additions & 0 deletions adapter/redis_keys_pattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (s *stubAdapterCoordinator) Clock() *kv.HLC {
return s.clock
}

// LinearizableRead reuses verifyLeaderErr so tests can simulate a failed
// quorum read through the same knob used for VerifyLeader failures.
func (s *stubAdapterCoordinator) LinearizableRead(_ context.Context) (uint64, error) {
return 0, s.verifyLeaderErr
}

func (s *stubAdapterCoordinator) VerifyLeaderCalls() int32 {
if s == nil {
return 0
Expand Down
10 changes: 8 additions & 2 deletions adapter/redis_lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (r *RedisServer) runLuaScript(conn redcon.Conn, script string, evalArgs [][
var reply luaReply
err = r.retryRedisWrite(ctx, func() error {
attempts++
scriptCtx := newLuaScriptContext(r)
scriptCtx, err := newLuaScriptContext(ctx, r)
if err != nil {
return err
}
defer scriptCtx.Close()
state := newRedisLuaState()
defer state.Close()
Expand Down Expand Up @@ -880,7 +883,10 @@ func (r *RedisServer) execLuaCompat(conn redcon.Conn, command string, args [][]b

var reply luaReply
err := r.retryRedisWrite(ctx, func() error {
scriptCtx := newLuaScriptContext(r)
scriptCtx, err := newLuaScriptContext(ctx, r)
if err != nil {
return err
}
defer scriptCtx.Close()
nextReply, err := scriptCtx.exec(command, stringArgs)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions adapter/redis_lua_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,14 @@ var luaRenameHandlers = map[redisValueType]luaRenameHandler{
redisTypeStream: (*luaScriptContext).renameStreamValue,
}

func newLuaScriptContext(server *RedisServer) *luaScriptContext {
func newLuaScriptContext(ctx context.Context, server *RedisServer) (*luaScriptContext, error) {
// LinearizableRead confirms leadership via quorum AND waits for the local
// FSM to apply all committed entries, so startTS reflects the latest
// committed state. All subsequent reads within the script use snapshotGetAt
// (no per-call VerifyLeader), making VerifyLeader O(1) per script.
if _, err := server.coordinator.LinearizableRead(ctx); err != nil {
return nil, errors.WithStack(err)
}
startTS := server.readTS()
return &luaScriptContext{
server: server,
Expand All @@ -206,7 +213,7 @@ func newLuaScriptContext(server *RedisServer) *luaScriptContext {
zsets: map[string]*luaZSetState{},
streams: map[string]*luaStreamState{},
ttls: map[string]*luaTTLState{},
}
}, nil
}

func (c *luaScriptContext) Close() {
Expand Down Expand Up @@ -437,7 +444,7 @@ func (c *luaScriptContext) stringState(key []byte) (*luaStringState, error) {
return nil, wrongTypeError()
}

value, _, err := c.server.readRedisStringAt(key, c.startTS)
value, _, err := c.server.readRedisStringAtSnapshot(key, c.startTS)
if errors.Is(err, store.ErrKeyNotFound) {
st.loaded = true
return st, nil
Expand Down
96 changes: 96 additions & 0 deletions adapter/redis_lua_linearizable_read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package adapter

import (
"testing"

"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"github.com/tidwall/redcon"
)

// sentinelLinearizableErr is the error injected by the stub coordinator to
// simulate a LinearizableRead failure (e.g. leadership lost, context timeout).
var sentinelLinearizableErr = errors.New("linearizable read: not leader")

// newLuaTestServer builds a minimal RedisServer wired to a stub coordinator
// whose LinearizableRead returns linearizableErr (nil for the happy path).
func newLuaTestServer(linearizableErr error) *RedisServer {
	coord := &stubAdapterCoordinator{verifyLeaderErr: linearizableErr}
	return &RedisServer{
		store:       store.NewMVCCStore(),
		coordinator: coord,
		scriptCache: map[string]string{},
	}
}

// TestEval_LinearizableReadFailure verifies that EVAL surfaces a
// LinearizableRead error to the client as a Redis error reply.
func TestEval_LinearizableReadFailure(t *testing.T) {
	t.Parallel()

	srv := newLuaTestServer(sentinelLinearizableErr)
	conn := &recordingConn{}
	cmd := redcon.Command{Args: [][]byte{
		[]byte("EVAL"),
		[]byte("return 1"),
		[]byte("0"),
	}}
	srv.eval(conn, cmd)

	require.NotEmpty(t, conn.err, "EVAL must write an error when LinearizableRead fails")
	require.Contains(t, conn.err, sentinelLinearizableErr.Error())
}

// TestEvalSHA_LinearizableReadFailure verifies that EVALSHA surfaces a
// LinearizableRead error when the script is already cached.
func TestEvalSHA_LinearizableReadFailure(t *testing.T) {
	t.Parallel()

	const script = "return 1"
	srv := newLuaTestServer(sentinelLinearizableErr)
	sha := luaScriptSHA(script)
	srv.scriptCache[sha] = script

	conn := &recordingConn{}
	srv.evalsha(conn, redcon.Command{Args: [][]byte{
		[]byte("EVALSHA"),
		[]byte(sha),
		[]byte("0"),
	}})

	require.NotEmpty(t, conn.err, "EVALSHA must write an error when LinearizableRead fails")
	require.Contains(t, conn.err, sentinelLinearizableErr.Error())
}

// TestExecLuaCompat_LinearizableReadFailure verifies that the Lua-compat path
// (used by RENAME, RPOPLPUSH, LLEN, etc.) surfaces LinearizableRead errors.
func TestExecLuaCompat_LinearizableReadFailure(t *testing.T) {
	t.Parallel()

	srv := newLuaTestServer(sentinelLinearizableErr)
	conn := &recordingConn{}
	args := [][]byte{[]byte("k"), []byte("v")}
	srv.execLuaCompat(conn, cmdSet, args)

	require.NotEmpty(t, conn.err, "execLuaCompat must write an error when LinearizableRead fails")
	require.Contains(t, conn.err, sentinelLinearizableErr.Error())
}

// TestEval_LinearizableReadSuccess covers the happy path: with a healthy
// coordinator, EVAL executes the script and returns its integer result.
func TestEval_LinearizableReadSuccess(t *testing.T) {
	t.Parallel()

	srv := newLuaTestServer(nil) // coordinator reports no error
	conn := &recordingConn{}
	srv.eval(conn, redcon.Command{Args: [][]byte{
		[]byte("EVAL"),
		[]byte("return 42"),
		[]byte("0"),
	}})

	require.Empty(t, conn.err, "EVAL must not write an error when LinearizableRead succeeds")
	require.Equal(t, int64(42), conn.int)
}
4 changes: 4 additions & 0 deletions adapter/redis_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (c *retryOnceCoordinator) Clock() *kv.HLC {
return c.clock
}

// LinearizableRead satisfies the coordinator interface for the retry tests;
// it always succeeds so retries are driven by other stubbed failures.
func (c *retryOnceCoordinator) LinearizableRead(_ context.Context) (uint64, error) {
return 0, nil
}

type recordingConn struct {
ctx any
err string
Expand Down
4 changes: 4 additions & 0 deletions adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,10 @@ func (c *followerS3Coordinator) VerifyLeader() error {
return kv.ErrLeaderNotFound
}

// LinearizableRead fails with ErrLeaderNotFound, mirroring VerifyLeader above:
// this stub models a follower node that cannot serve linearizable reads.
func (c *followerS3Coordinator) LinearizableRead(_ context.Context) (uint64, error) {
return 0, kv.ErrLeaderNotFound
}

func (c *followerS3Coordinator) RaftLeader() raft.ServerAddress {
return raft.ServerAddress("leader")
}
Expand Down
1 change: 1 addition & 0 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Coordinator interface {
Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
IsLeader() bool
VerifyLeader() error
LinearizableRead(ctx context.Context) (uint64, error)
RaftLeader() raft.ServerAddress
IsLeaderForKey(key []byte) bool
VerifyLeaderForKey(key []byte) error
Expand Down
Loading