Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions adapter/redis_compat_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -1779,6 +1780,256 @@ func (r *RedisServer) zsetMemberFastScore(ctx context.Context, key, member []byt
return score, true, !expired, nil
}

// zsetRangeByScoreFast streams the score index for key over the
// caller-supplied [startKey, endKey) byte range, returning the
// decoded entries up to offset+limit. This replaces the
// load-the-whole-zset path used by cmdZRangeByScore / cmdZRevRangeByScore
// when the caller has no script-local mutations and the zset is in
// wide-column form. For a delay-queue poll ("next 10 jobs due by
// now") the cost goes from O(N) member GetAts to O(range_width +
// offset + limit) score-index entries.
//
// hit=false means the fast path cannot safely answer (legacy-blob
// zset present, string-encoding corruption, or empty-result case
// where we cannot distinguish "zset is empty in this range" from
// "key exists as another type / is missing"). Callers MUST take
// the slow path on hit=false so keyTypeAt disambiguation fires.
//
// scoreInRange filter is applied post-scan for exclusive bound
// edge cases; the caller supplies precomputed scan bounds that
// over-approximate toward INclusive and lets this helper filter.
func (r *RedisServer) zsetRangeByScoreFast(
ctx context.Context,
key, startKey, endKey []byte,
reverse bool,
offset, limit int,
scoreFilter func(float64) bool,
readTS uint64,
) ([]redisZSetEntry, bool, error) {
if eligible, err := r.zsetFastPathEligible(ctx, key, readTS); err != nil || !eligible {
return nil, false, err
}
// Large-offset short-circuit: once offset >= maxWideScanLimit,
// the fast path can only scan maxWideScanLimit rows then skip all
// of them -- guaranteed wasted I/O. Defer to the slow path
// immediately so it can answer from the full member load without
// the redundant score-index scan.
if offset >= maxWideScanLimit {
return nil, false, nil
}
scanLimit := zsetFastScanLimit(offset, limit)
if scanLimit <= 0 || bytes.Compare(startKey, endKey) >= 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When limit is 0, the fast path can avoid the score index scan entirely and jump straight to the existence check and TTL probe. This is a minor efficiency gain for queries that only care about existence or type validation.

Suggested change
if scanLimit <= 0 || bytes.Compare(startKey, endKey) >= 0 {
if limit == 0 || scanLimit <= 0 || bytes.Compare(startKey, endKey) >= 0 {

return r.zsetRangeEmptyFastResult(ctx, key, readTS)
}
kvs, err := r.zsetScoreScan(ctx, startKey, endKey, scanLimit, reverse, readTS)
if err != nil {
return nil, false, err
}
return r.finalizeZSetFastRange(ctx, key, kvs, offset, limit, scanLimit, scoreFilter, readTS)
}

// finalizeZSetFastRange runs the post-scan priority guard, decodes
// the candidate score rows into redisZSetEntry, and applies the TTL
// filter. Factored out so zsetRangeByScoreFast stays under the
// cyclomatic-complexity cap.
//
// Takes scanLimit so we can detect a saturated scan: if the scanner
// returned exactly scanLimit rows AND the caller's request is not
// satisfied (unbounded limit, or collected fewer entries than limit),
// there MAY be more entries beyond the scan window. In that case we
// return hit=false so the slow path can produce the authoritative
// answer -- the fast path MUST NOT silently truncate.
func (r *RedisServer) finalizeZSetFastRange(
ctx context.Context, key []byte, kvs []*store.KVPair,
offset, limit, scanLimit int, scoreFilter func(float64) bool, readTS uint64,
) ([]redisZSetEntry, bool, error) {
Comment on lines +1842 to +1845
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the function signature to accept scanLimit for truncation detection.

Suggested change
func (r *RedisServer) finalizeZSetFastRange(
ctx context.Context, key []byte, kvs []*store.KVPair,
offset, limit int, scoreFilter func(float64) bool, readTS uint64,
) ([]redisZSetEntry, bool, error) {
func (r *RedisServer) finalizeZSetFastRange(
ctx context.Context, key []byte, kvs []*store.KVPair,
offset, limit, scanLimit int, scoreFilter func(float64) bool, readTS uint64,
) ([]redisZSetEntry, bool, error) {

// Priority guard runs after a candidate hit (mirrors post-PR #565
// ordering). Skip it on empty result -- the empty-result tail
// handles disambiguation.
if len(kvs) > 0 {
if higher, hErr := r.hasHigherPriorityStringEncoding(ctx, key, readTS); hErr != nil {
return nil, false, hErr
} else if higher {
return nil, false, nil
}
}
entries := decodeZSetScoreRange(key, kvs, offset, limit, scoreFilter)
// Truncation guard: the raw scanner hit its cap AND the caller did
// not get a satisfied result. Entries beyond the window may
// exist; defer to the slow path for correctness.
if zsetFastPathTruncated(len(kvs), scanLimit, len(entries), limit) {
return nil, false, nil
}
if len(entries) == 0 {
return r.zsetRangeEmptyFastResult(ctx, key, readTS)
}
expired, expErr := r.hasExpired(ctx, key, readTS, true)
if expErr != nil {
return nil, false, cockerrors.WithStack(expErr)
}
if expired {
return nil, true, nil
}
return entries, true, nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The fast path must return hit=false if the scan limit was reached but the request was not fully satisfied. This ensures the system does not silently drop entries or return incomplete results, maintaining state consistency between the fast and slow paths.

isComplete := len(kvs) < scanLimit || (limit >= 0 && len(entries) >= limit)
return entries, isComplete, nil
References
  1. Avoid silently dropping entries during serialization or normalization. If placeholder or incomplete entries are valid in some contexts, provide a specific, more lenient handling path for them to ensure they are preserved on disk and avoid state inconsistencies.

}

// zsetFastPathTruncated reports whether the bounded score-index scan
// may have dropped rows the request would otherwise include. That is
// the case exactly when the scanner used its full quota
// (scannedRows == scanLimit) while the request is still unsatisfied:
// either the Redis-level limit is unbounded (negative), or fewer than
// limit entries were collected. On true the caller must fall back to
// the slow full-load path for the authoritative answer.
func zsetFastPathTruncated(scannedRows, scanLimit, collectedEntries, limit int) bool {
	saturated := scannedRows >= scanLimit
	if !saturated {
		// The scan saw the end of the range; nothing was dropped.
		return false
	}
	unbounded := limit < 0
	return unbounded || collectedEntries < limit
}

// zsetFastPathEligible reports whether the score-index fast path may
// serve key. It returns false (without error) when a legacy-blob zset
// row exists, since then the caller must take the slow path so
// ensureZSetLoaded / blob decoding runs.
func (r *RedisServer) zsetFastPathEligible(ctx context.Context, key []byte, readTS uint64) (bool, error) {
	blobPresent, err := r.store.ExistsAt(ctx, redisZSetKey(key), readTS)
	if err != nil {
		return false, cockerrors.WithStack(err)
	}
	return !blobPresent, nil
}

// zsetFastScanLimit clamps offset+limit to maxWideScanLimit so an
// unbounded or malicious LIMIT cannot force an O(N) scan of a large
// zset. A negative limit means "unbounded" at the Redis level; cap it
// at the collection OOM limit.
//
// Check bounds BEFORE adding to avoid signed-integer overflow on
// hostile input (e.g. a Lua script passing offset=limit=math.MaxInt).
// A wrap would produce a negative scanLimit and cause the caller's
// `scanLimit <= 0` branch to misroute a live zset into the
// empty-result tail.
func zsetFastScanLimit(offset, limit int) int {
// limit == 0: the caller wants zero entries regardless of offset.
// Return 0 so the caller's `scanLimit <= 0` branch routes to the
// empty-result tail (which still runs resolveZSetMeta for proper
// WRONGTYPE / existence disambiguation) instead of a pointless
// full-quota scan.
if limit == 0 {
return 0
}
if limit < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When limit is 0, the caller expects an empty result regardless of the offset. In this case, we can avoid scanning the score index entirely by returning a scanLimit of 0. This will trigger the short-circuit in zsetRangeByScoreFast and call zsetRangeEmptyFastResult, which correctly handles existence and type checks without the overhead of a prefix scan.

Suggested change
if limit < 0 {
if limit == 0 {
return 0
}
if limit < 0 {

return maxWideScanLimit
}
if offset >= maxWideScanLimit {
return maxWideScanLimit
}
if limit > maxWideScanLimit-offset {
return maxWideScanLimit
}
return offset + limit
}
Comment on lines +1914 to +1933
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The zsetFastScanLimit function correctly clamps the scan window to maxWideScanLimit to prevent OOM, which aligns with the rule to apply fixed bounds to collections that can grow from external requests. However, when offset is very large, the scan will still fetch up to maxWideScanLimit rows only to have them all skipped. Consider if we should short-circuit and return hit=false immediately if offset exceeds a certain threshold to avoid wasted resource consumption, or if we should rely on the positional iteration follow-up mentioned in the PR description to manage this complexity.

References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
  2. During large refactorings, limit the scope of a single pull request. Defer related but wide-reaching API changes to dedicated follow-up pull requests to manage complexity and avoid scope creep.


// zsetScoreScan runs a bounded scan over the score index, choosing the
// forward or reverse store primitive based on direction.
func (r *RedisServer) zsetScoreScan(
	ctx context.Context, startKey, endKey []byte, scanLimit int, reverse bool, readTS uint64,
) ([]*store.KVPair, error) {
	var (
		kvs []*store.KVPair
		err error
	)
	if reverse {
		kvs, err = r.store.ReverseScanAt(ctx, startKey, endKey, scanLimit, readTS)
	} else {
		kvs, err = r.store.ScanAt(ctx, startKey, endKey, scanLimit, readTS)
	}
	return kvs, cockerrors.WithStack(err)
}

// zsetDecodeAllocSize returns a tight upper bound on how many entries
// decodeZSetScoreRange can collect: the rows left after the offset
// skip, further capped by a non-negative limit, never below zero.
// Using this instead of make([]..., 0, len(kvs)) avoids wasting up to
// maxWideScanLimit entries of slice capacity when the caller asked for
// a small window at a large offset.
func zsetDecodeAllocSize(kvLen, offset, limit int) int {
	remaining := kvLen - offset
	if remaining <= 0 {
		return 0
	}
	if limit >= 0 && remaining > limit {
		return limit
	}
	return remaining
}

// decodeZSetScoreRange decodes score-index scan results into
// redisZSetEntry, applying the post-scan score filter (exclusive
// bound edges) and the offset / limit pagination. Entries that fail
// to decode are silently dropped -- they can only appear under data
// corruption.
func decodeZSetScoreRange(
key []byte, kvs []*store.KVPair, offset, limit int, scoreFilter func(float64) bool,
) []redisZSetEntry {
entries := make([]redisZSetEntry, 0, zsetDecodeAllocSize(len(kvs), offset, limit))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The capacity allocation for the entries slice can be optimized when withScores is true. Currently, zsetDecodeAllocSize returns the number of members, but if scores are included, the final luaReply array will be twice that size. Pre-allocating the correct capacity here or in zsetRangeReply would reduce slice growth overhead. Ensure a strictly enforced maximum size limit is checked before allocation to prevent OOM issues.

References
  1. When deserializing data with a size header, pre-allocating a buffer is acceptable if a strictly enforced maximum size limit is checked before allocation and the entire data must be materialized in memory.

skipped := 0
for _, kv := range kvs {
score, member, ok := store.ExtractZSetScoreAndMember(kv.Key, key)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

store.ExtractZSetScoreAndMember re-allocates the scan prefix on every iteration of the loop. Since the prefix is constant for the entire scan, it should be calculated once outside the loop to avoid O(N) allocations.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Allocating entries with len(kvs) capacity can be wasteful if offset is large and limit is small, as len(kvs) can be up to 100,001. Calculating a tighter capacity based on the requested range and the available data reduces memory pressure while maintaining the efficiency of pre-allocation.

	if offset >= len(kvs) {
		return nil
	}
	allocSize := len(kvs) - offset
	if limit >= 0 && limit < allocSize {
		allocSize = limit
	}
	entries := make([]redisZSetEntry, 0, allocSize)
References
  1. Pre-allocating a buffer is acceptable if a strictly enforced maximum size limit is checked before allocation and the entire data must be materialized in memory.

if !ok {
continue
}
if scoreFilter != nil && !scoreFilter(score) {
continue
}
// Check limit saturation BEFORE the offset skip so a small
// limit with a large offset exits immediately instead of
// burning offset iterations on the skip branch. Correct for
// any (offset, limit): once len(entries) >= limit we are done
// regardless of remaining skip budget.
if limit >= 0 && len(entries) >= limit {
break
}
if skipped < offset {
skipped++
continue
}
entries = append(entries, redisZSetEntry{Member: string(member), Score: score})
}
return entries
}

// zsetRangeEmptyFastResult is the fast path's empty-result tail.
// Either the score range is genuinely empty on a live zset (empty
// result, hit=true) or the zset does not exist in wide-column form
// (hit=false, so the caller's slow path handles WRONGTYPE / missing
// disambiguation).
//
// resolveZSetMeta — rather than a plain ExistsAt on the meta key — is
// used so delta-only wide zsets (fresh zsets with only delta rows, no
// persisted base meta) are still detected as existing; ExistsAt would
// miss them and force the slow path needlessly. The string-priority
// guard runs as well so a corrupted redisStrKey + zset meta surfaces
// WRONGTYPE via the slow path instead of an empty array.
func (r *RedisServer) zsetRangeEmptyFastResult(ctx context.Context, key []byte, readTS uint64) ([]redisZSetEntry, bool, error) {
	_, exists, err := r.resolveZSetMeta(ctx, key, readTS)
	switch {
	case err != nil:
		return nil, false, cockerrors.WithStack(err)
	case !exists:
		return nil, false, nil
	}
	higher, hErr := r.hasHigherPriorityStringEncoding(ctx, key, readTS)
	if hErr != nil {
		return nil, false, hErr
	}
	if higher {
		return nil, false, nil
	}
	// hasExpired is invoked only to surface storage errors during TTL
	// resolution: expired or not, a live zset with no members in range
	// yields an empty hit=true result.
	if _, expErr := r.hasExpired(ctx, key, readTS, true); expErr != nil {
		return nil, false, cockerrors.WithStack(expErr)
	}
	return nil, true, nil
}

// hgetSlow falls back to the type-probing path when hashFieldFastLookup
// misses. Handles legacy-blob hashes and nil / WRONGTYPE disambiguation.
func (r *RedisServer) hgetSlow(conn redcon.Conn, ctx context.Context, key, field []byte, readTS uint64) {
Expand Down
Loading
Loading