Skip to content
24 changes: 23 additions & 1 deletion adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ type RedisServer struct {
leaderClientsMu sync.RWMutex
leaderClients map[string]*redis.Client

// compactor is the background DeltaCompactor for this node. When set,
// urgent compaction is triggered on ErrDeltaScanTruncated to unblock
// reads on hot keys faster than the regular compaction interval.
compactor *DeltaCompactor

route map[string]func(conn redcon.Conn, cmd redcon.Command)
}

Expand All @@ -282,6 +287,14 @@ func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisSe
}
}

// WithRedisCompactor wires a DeltaCompactor to the RedisServer so that urgent
// single-key compaction can be triggered when ErrDeltaScanTruncated is hit.
func WithRedisCompactor(c *DeltaCompactor) RedisServerOption {
return func(r *RedisServer) {
r.compactor = c
}
}
Comment on lines +290 to +296
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithRedisCompactor is defined, but there are no call sites wiring a DeltaCompactor into RedisServer, so triggerUrgentCompaction will always be a no-op and urgent compaction will never actually run in the current codebase. Please update the Redis server setup (where NewRedisServer is called) to pass WithRedisCompactor(deltaCompactor), or otherwise ensure the compactor gets injected.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6c2a775: WithRedisCompactor(deltaCompactor) is now passed to NewRedisServer in all three call sites — cmd/server/demo.go (setupRedis), main.go (startRedisServer), and adapter/test_util.go (setupNodes). In main.go the compactor's Run() is also wired into the errgroup.


// WithRedisRequestObserver enables Prometheus-compatible request metrics.
func WithRedisRequestObserver(observer monitoring.RedisRequestObserver) RedisServerOption {
return func(r *RedisServer) {
Expand Down Expand Up @@ -467,6 +480,14 @@ func (r *RedisServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
return r.readTracker.Pin(ts)
}

// triggerUrgentCompaction signals the DeltaCompactor to immediately compact
// the given key, bypassing the regular interval. No-op when no compactor is wired.
func (r *RedisServer) triggerUrgentCompaction(typeName string, key []byte) {
if r.compactor != nil {
r.compactor.TriggerUrgentCompaction(typeName, key)
}
}

func (r *RedisServer) dispatchCommand(conn redcon.Conn, name string, handler func(redcon.Conn, redcon.Command), cmd redcon.Command, start time.Time) {
switch {
case r.requestObserver != nil:
Expand Down Expand Up @@ -2682,7 +2703,8 @@ func (r *RedisServer) buildListPopElems(ctx context.Context, key []byte, meta st
claimEnd = meta.Tail
}
// Capacity: n claim keys + n Del(item) for found items + 1 for the delta key appended by caller.
elems := make([]*kv.Elem[kv.OP], 0, n+int64(len(kvps))+listPopDeltaOverhead)
// n is bounded by maxWideColumnItems (100_000) so the int conversion is safe.
elems := make([]*kv.Elem[kv.OP], 0, int(n)+len(kvps)+listPopDeltaOverhead)
for seq := claimStart; seq < claimEnd; seq++ {
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: store.ListClaimKey(key, seq), Value: []byte{}})
}
Expand Down
21 changes: 18 additions & 3 deletions adapter/redis_compat_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,9 @@ func (r *RedisServer) resolveListMeta(ctx context.Context, key []byte, readTS ui
return d.LenDelta, errors.WithStack(unmarshalErr)
})
if err != nil {
if errors.Is(err, ErrDeltaScanTruncated) {
r.triggerUrgentCompaction("list", key)
}
return store.ListMeta{}, false, err
}
baseMeta.Len += lenSum
Expand Down Expand Up @@ -797,7 +800,7 @@ func (r *RedisServer) resolveCollectionLen(

// resolveHashMeta aggregates the base hash metadata with all uncompacted Delta keys.
func (r *RedisServer) resolveHashMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
return r.resolveCollectionLen(
n, exists, err := r.resolveCollectionLen(
ctx, key, readTS,
store.HashMetaKey(key),
store.HashMetaDeltaScanPrefix(key),
Expand All @@ -811,11 +814,15 @@ func (r *RedisServer) resolveHashMeta(ctx context.Context, key []byte, readTS ui
},
"resolveHashMeta: clamping negative Len to 0",
)
if errors.Is(err, ErrDeltaScanTruncated) {
r.triggerUrgentCompaction("hash", key)
}
return n, exists, err
}

// resolveSetMeta aggregates the base set metadata with all uncompacted Delta keys.
func (r *RedisServer) resolveSetMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
return r.resolveCollectionLen(
n, exists, err := r.resolveCollectionLen(
ctx, key, readTS,
store.SetMetaKey(key),
store.SetMetaDeltaScanPrefix(key),
Expand All @@ -829,11 +836,15 @@ func (r *RedisServer) resolveSetMeta(ctx context.Context, key []byte, readTS uin
},
"resolveSetMeta: clamping negative Len to 0",
)
if errors.Is(err, ErrDeltaScanTruncated) {
r.triggerUrgentCompaction("set", key)
}
return n, exists, err
}

// resolveZSetMeta aggregates the base sorted set metadata with all uncompacted Delta keys.
func (r *RedisServer) resolveZSetMeta(ctx context.Context, key []byte, readTS uint64) (int64, bool, error) {
return r.resolveCollectionLen(
n, exists, err := r.resolveCollectionLen(
ctx, key, readTS,
store.ZSetMetaKey(key),
store.ZSetMetaDeltaScanPrefix(key),
Expand All @@ -847,4 +858,8 @@ func (r *RedisServer) resolveZSetMeta(ctx context.Context, key []byte, readTS ui
},
"resolveZSetMeta: clamping negative Len to 0",
)
if errors.Is(err, ErrDeltaScanTruncated) {
r.triggerUrgentCompaction("zset", key)
}
return n, exists, err
}
Loading
Loading