Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions adapter/redis_lua_compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,34 @@ return {moved, redis.call("LLEN", KEYS[1])}
require.NoError(t, err)
require.Greater(t, ttl, time.Duration(0))
}

func TestRedis_LuaDelAndRecreateListNoOrphan(t *testing.T) {
nodes, _, _ := createNode(t, 3)
defer shutdown(nodes)

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
defer func() { _ = rdb.Close() }()

// Pre-populate a list so there are existing storage items.
require.NoError(t, rdb.RPush(ctx, "mylist", "a", "b", "c").Err())

// In a single Lua script: DEL the key, then recreate it as a list.
// After the script, only the new items should be visible; the old
// [a, b, c] items must not be orphaned/leaked in storage.
result, err := rdb.Eval(ctx, `
redis.call("DEL", KEYS[1])
redis.call("RPUSH", KEYS[1], "d")
return redis.call("LRANGE", KEYS[1], 0, -1)
`, []string{"mylist"}).Result()
require.NoError(t, err)

values, ok := result.([]any)
require.True(t, ok)
require.Equal(t, []any{"d"}, values, "list should contain only the newly pushed item")

// Verify via a plain LRANGE that the old items are gone.
final, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
require.NoError(t, err)
require.Equal(t, []string{"d"}, final)
}
39 changes: 25 additions & 14 deletions adapter/redis_lua_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type luaScriptContext struct {
startTS uint64
readPin *kv.ActiveTimestampToken

touched map[string]struct{}
deleted map[string]bool
touched map[string]struct{}
deleted map[string]bool
everDeleted map[string]bool

strings map[string]*luaStringState
lists map[string]*luaListState
Expand Down Expand Up @@ -189,18 +190,19 @@ var luaRenameHandlers = map[redisValueType]luaRenameHandler{
func newLuaScriptContext(server *RedisServer) *luaScriptContext {
startTS := server.readTS()
return &luaScriptContext{
server: server,
startTS: startTS,
readPin: server.pinReadTS(startTS),
touched: map[string]struct{}{},
deleted: map[string]bool{},
strings: map[string]*luaStringState{},
lists: map[string]*luaListState{},
hashes: map[string]*luaHashState{},
sets: map[string]*luaSetState{},
zsets: map[string]*luaZSetState{},
streams: map[string]*luaStreamState{},
ttls: map[string]*luaTTLState{},
server: server,
startTS: startTS,
readPin: server.pinReadTS(startTS),
touched: map[string]struct{}{},
deleted: map[string]bool{},
everDeleted: map[string]bool{},
strings: map[string]*luaStringState{},
lists: map[string]*luaListState{},
hashes: map[string]*luaHashState{},
sets: map[string]*luaSetState{},
zsets: map[string]*luaZSetState{},
streams: map[string]*luaStreamState{},
ttls: map[string]*luaTTLState{},
}
}

Expand Down Expand Up @@ -265,6 +267,7 @@ func (c *luaScriptContext) deleteLogical(key []byte) {
k := string(key)
c.markTouched(key)
c.deleted[k] = true
c.everDeleted[k] = true
c.clearTTL(key)

if st, ok := c.strings[k]; ok {
Expand Down Expand Up @@ -2517,6 +2520,14 @@ func (c *luaScriptContext) listCommitPlan(key string) (luaCommitPlan, error) {
elems, err := c.listCommitElems(key)
return luaCommitPlan{elems: elems}, err
}
// If the key was deleted earlier in this script and later recreated as a
// list, we must perform a full rewrite (preserveExisting=false) so that
// deleteLogicalKeyElems is called and any orphaned storage items from the
// previous incarnation of the key are cleaned up before writing the delta.
if c.everDeleted[key] {
elems, err := c.listCommitElems(key)
return luaCommitPlan{elems: elems}, err
}
elems, err := c.listDeltaCommitElems(key, st)
return luaCommitPlan{preserveExisting: true, elems: elems}, err
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (p *ProxyServer) handleAdmin(conn redcon.Conn, name string, args [][]byte)
// uses a shared connection pool and cannot maintain per-client DB state.
if name == "SELECT" {
// Redis arity: SELECT <db>. Require exactly one DB argument.
if len(args) != 2 {
if len(args) != selectArgCount {
conn.WriteError("ERR wrong number of arguments for 'select' command")
return
}
Expand Down
3 changes: 2 additions & 1 deletion proxy/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
pubsubArrayReply = 3 // ["subscribe"/"unsubscribe", channel, count]
pubsubArrayPong = 2 // ["pong", data]
pubsubMinArgs = 2 // command + at least one channel
selectArgCount = 2 // command name + db index argument

cmdSubscribe = "SUBSCRIBE"
cmdUnsubscribe = "UNSUBSCRIBE"
Expand Down Expand Up @@ -334,7 +335,7 @@ func (s *pubsubSession) handleProxySpecialCommand(name string, args [][]byte) bo
return false
}
// Enforce Redis arity: SELECT requires exactly one DB index argument.
if len(args) != 2 {
if len(args) != selectArgCount {
s.writeError("ERR wrong number of arguments for 'select' command")
return true
}
Expand Down
Loading