Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 78 additions & 26 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,13 @@ func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
return nil, err
}

// ZSet meta keys use a 4-byte length-prefixed userKey encoding,
// so scan the entire prefix and rely on collectUserKeys for filtering.
zsetMetaPrefix := []byte(store.ZSetMetaPrefix)
if err := mergeScannedKeys(zsetMetaPrefix, prefixScanEnd(zsetMetaPrefix)); err != nil {
return nil, err
}

for _, prefix := range redisInternalPrefixes {
internalStart, internalEnd := listPatternScanBounds(prefix, pattern)
if err := mergeScannedKeys(internalStart, internalEnd); err != nil {
Expand Down Expand Up @@ -1292,6 +1299,9 @@ func redisVisibleUserKey(key []byte) []byte {
if store.IsListMetaKey(key) || store.IsListItemKey(key) {
return store.ExtractListUserKey(key)
}
if store.IsZSetInternalKey(key) {
return store.ExtractZSetUserKey(key)
}
if userKey := extractRedisInternalUserKey(key); userKey != nil {
return userKey
}
Expand Down Expand Up @@ -1386,9 +1396,11 @@ type listTxnState struct {
}

type zsetTxnState struct {
members map[string]float64
exists bool
dirty bool
members map[string]float64
origMembers map[string]float64 // snapshot at load time for diff-based commit
exists bool
dirty bool
fromLegacy bool
}

type ttlTxnState struct {
Expand Down Expand Up @@ -1425,6 +1437,7 @@ func (t *txnContext) trackTypeReadKeys(key []byte) {
redisHashKey(key),
redisSetKey(key),
redisZSetKey(key),
store.ZSetMetaKey(key),
redisStreamKey(key),
redisHLLKey(key),
redisStrKey(key),
Expand All @@ -1435,22 +1448,29 @@ func (t *txnContext) trackTypeReadKeys(key []byte) {
}
}

func (t *txnContext) load(key []byte) (*txnValue, error) {
// If the key is already an internal key (e.g., !redis|hash|...,
// !lst|..., !txn|..., !ddb|..., !s3|..., !dist|...), use it as-is.
// Otherwise, it's a bare user key for a string value — prefix it.
storageKey := key
userKey := extractRedisInternalUserKey(key)
// resolveLoadKeys determines the storage key and user key for txnContext.load.
// For bare user keys (not a known internal prefix), the storage key is prefixed
// and the user key is the original key. For internal keys, the user key is
// extracted via extractRedisInternalUserKey or store.ExtractZSetUserKey.
func resolveLoadKeys(key []byte) (storageKey []byte, userKey []byte, internal bool) {
if !isKnownInternalKey(key) {
storageKey = redisStrKey(key)
userKey = key
return redisStrKey(key), key, false
}
uk := extractRedisInternalUserKey(key)
if uk == nil && store.IsZSetInternalKey(key) {
uk = store.ExtractZSetUserKey(key)
}
return key, uk, true
}

func (t *txnContext) load(key []byte) (*txnValue, error) {
storageKey, userKey, internal := resolveLoadKeys(key)
k := string(storageKey)
if tv, ok := t.working[k]; ok {
return tv, nil
}
t.trackReadKey(storageKey)
if !isKnownInternalKey(key) {
if !internal {
// Track the bare key too for conflict detection on legacy fallback reads.
t.trackReadKey(key)
}
Expand All @@ -1459,7 +1479,7 @@ func (t *txnContext) load(key []byte) (*txnValue, error) {
}
tv := &txnValue{}
var val []byte
if !isKnownInternalKey(key) {
if !internal {
// For bare user string keys, use the fallback-aware reader.
var err error
val, err = t.server.readRedisStringAt(key, t.startTS)
Expand Down Expand Up @@ -1510,27 +1530,34 @@ func (t *txnContext) loadZSetState(key []byte) (*zsetTxnState, error) {
if st, ok := t.zsetStates[k]; ok {
return st, nil
}
t.trackReadKey(redisZSetKey(key))
t.trackReadKey(store.ZSetMetaKey(key))
// Check TTL: treat expired keys as non-existent.
ttlSt, err := t.loadTTLState(key)
if err != nil {
return nil, err
}
if ttlSt.value != nil && !ttlSt.value.After(time.Now()) {
st := &zsetTxnState{
members: map[string]float64{},
exists: false,
members: map[string]float64{},
origMembers: map[string]float64{},
exists: false,
}
t.zsetStates[k] = st
return st, nil
}
value, exists, err := t.server.loadZSetAt(context.Background(), key, t.startTS)
load, err := t.server.loadZSetMembersMap(context.Background(), key, t.startTS)
if err != nil {
return nil, err
}
origMembers := make(map[string]float64, len(load.members))
for mk, mv := range load.members {
origMembers[mk] = mv
}
st := &zsetTxnState{
members: zsetEntriesToMap(value.Entries),
exists: exists,
members: load.members,
origMembers: origMembers,
exists: load.exists,
fromLegacy: load.fromLegacy,
}
t.zsetStates[k] = st
return st, nil
Expand Down Expand Up @@ -2018,6 +2045,33 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
return elems, nil
}

func buildZSetStateElems(keyBytes []byte, st *zsetTxnState) ([]*kv.Elem[kv.OP], error) {
switch {
case len(st.members) == 0 && st.fromLegacy:
// Legacy blob removed: delete the legacy key and TTL key.
return []*kv.Elem[kv.OP]{
{Op: kv.Del, Key: redisZSetKey(keyBytes)},
{Op: kv.Del, Key: redisTTLKey(keyBytes)},
}, nil
case len(st.members) == 0:
// Delete meta + all member/score keys via prefix deletion.
return buildZSetDiffElems(keyBytes, st.origMembers, st.members)
case st.fromLegacy:
// Legacy blob → wide-column migration: full write + delete legacy blob.
elems, err := buildZSetWriteElems(keyBytes, st.members)
if err != nil {
return nil, err
}
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: redisZSetKey(keyBytes)})
return elems, nil
case len(st.origMembers) == 0:
// Brand-new ZSet: full write.
return buildZSetWriteElems(keyBytes, st.members)
default:
return buildZSetDiffElems(keyBytes, st.origMembers, st.members)
}
}

func (t *txnContext) buildZSetElems() ([]*kv.Elem[kv.OP], error) {
keys := make([]string, 0, len(t.zsetStates))
for k := range t.zsetStates {
Expand All @@ -2031,15 +2085,11 @@ func (t *txnContext) buildZSetElems() ([]*kv.Elem[kv.OP], error) {
if !st.dirty {
continue
}
if len(st.members) == 0 {
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: redisZSetKey([]byte(k))})
continue
}
payload, err := marshalZSetValue(redisZSetValue{Entries: zsetMapToEntries(st.members)})
stElems, err := buildZSetStateElems([]byte(k), st)
if err != nil {
return nil, err
}
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: redisZSetKey([]byte(k)), Value: payload})
elems = append(elems, stElems...)
}
return elems, nil
}
Expand Down Expand Up @@ -2611,7 +2661,9 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) {

func (r *RedisServer) readValueAt(key []byte, readTS uint64) ([]byte, error) {
ttlKey := key
if userKey := extractRedisInternalUserKey(key); userKey != nil {
if store.IsZSetInternalKey(key) {
ttlKey = store.ExtractZSetUserKey(key)
} else if userKey := extractRedisInternalUserKey(key); userKey != nil {
ttlKey = userKey
}
expired, err := r.hasExpiredTTLAt(context.Background(), ttlKey, readTS)
Expand Down
Loading
Loading