
feat: trigger urgent single-key compaction on ErrDeltaScanTruncated #517

Merged: bootjp merged 7 commits into main from feat/collection-metadata-delta-phase-4 on Apr 16, 2026
Conversation

@bootjp (Owner) commented Apr 16, 2026

When a hot key accumulates more than MaxDeltaScanLimit (1024) delta keys, reads like HLEN/SCARD/ZCARD/LLEN fail with ErrDeltaScanTruncated. Previously the only remedy was to wait for the next 30s compaction tick.

Now:

  • DeltaCompactor gains an urgentCh (buffered 64) and TriggerUrgentCompaction.
  • Run() selects on urgentCh between regular ticks; compactUrgentKey performs a targeted single-key scan+compact without waiting for the full keyspace pass.
  • Each collectionDeltaHandler carries a deltaKeyPrefixFn for per-key scans.
  • resolveHashMeta/resolveSetMeta/resolveZSetMeta/resolveListMeta call triggerUrgentCompaction when ErrDeltaScanTruncated is returned.
  • RedisServer gains a compactor field wired via WithRedisCompactor option.

Excess urgent signals (channel full) are dropped silently; the regular tick acts as the fallback. This bounds latency for hot keys under high write load.
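The drop-on-full signalling described above is the standard non-blocking send on a buffered Go channel. A minimal, self-contained sketch of the pattern (the `compactor`/`request` types and buffer size here are illustrative stand-ins, not the project's actual code):

```go
package main

import "fmt"

type request struct{ key string }

type compactor struct {
	urgentCh chan request // buffered; the real compactor uses capacity 64
}

// triggerUrgent performs a non-blocking send: if the buffer is full the
// signal is silently dropped and the periodic compaction tick acts as the
// fallback, so writers are never blocked by a slow compactor.
func (c *compactor) triggerUrgent(key string) bool {
	select {
	case c.urgentCh <- request{key: key}:
		return true
	default:
		return false // buffer full; dropped
	}
}

func main() {
	c := &compactor{urgentCh: make(chan request, 2)}
	fmt.Println(c.triggerUrgent("hot:a")) // true
	fmt.Println(c.triggerUrgent("hot:b")) // true
	fmt.Println(c.triggerUrgent("hot:c")) // false: buffer full, signal dropped
}
```

The `select` with a `default` branch is what makes the send lossy rather than blocking; dropped signals are safe because the regular tick eventually compacts the same key.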

bootjp requested a review from Copilot April 16, 2026 13:46
@gemini-code-assist (Contributor)

Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Copilot AI (Contributor) left a comment

Pull request overview

Adds an “urgent” single-key delta compaction path to reduce the time hot collection keys remain unreadable after exceeding store.MaxDeltaScanLimit (1024) uncompacted deltas, integrating it with Redis metadata resolution so reads can proactively request compaction.

Changes:

  • Add an urgent compaction request channel to DeltaCompactor and expose TriggerUrgentCompaction for single-key targeted compaction.
  • Extend collection delta handlers with deltaKeyPrefixFn to enable per-key delta scans for urgent compaction.
  • Trigger urgent compaction from resolveListMeta/resolveHashMeta/resolveSetMeta/resolveZSetMeta when ErrDeltaScanTruncated is hit, and add wiring support on RedisServer via WithRedisCompactor.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

Reviewed files:

  • adapter/redis_delta_compactor.go: Adds urgent-compaction queue + targeted single-key compaction logic; extends handler config with per-key delta prefix function.
  • adapter/redis_compat_helpers.go: Triggers urgent compaction on ErrDeltaScanTruncated during metadata resolution for list/hash/set/zset.
  • adapter/redis.go: Adds RedisServer.compactor, WithRedisCompactor, and a helper to signal urgent compaction.

Comment thread adapter/redis_delta_compactor.go Outdated
Comment on lines +195 to +221
```go
// Scan all delta keys for this specific user key.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
// Scan one extra beyond MaxDeltaScanLimit to collect all accumulated deltas.
kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if len(kvs) == 0 {
	return
}

elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
	"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
```

Copilot AI Apr 16, 2026


compactUrgentKey scans with limit MaxDeltaScanLimit+1, so for keys with far more than MaxDeltaScanLimit deltas it will only compact a single page (1025 deltas) and reads can still fail with ErrDeltaScanTruncated (e.g., >2049 deltas). Consider looping/paginating: rescan after dispatch and keep compacting pages until the remaining delta count is <= MaxDeltaScanLimit or the timeout context expires; also adjust the comment that claims it collects “all accumulated deltas”.

Suggested change, from:

```go
// Scan all delta keys for this specific user key.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
// Scan one extra beyond MaxDeltaScanLimit to collect all accumulated deltas.
kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if len(kvs) == 0 {
	return
}
elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
	"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
```

to:

```go
// Scan and compact this specific user key in pages until the remaining
// delta count is within MaxDeltaScanLimit or the timeout expires.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
for {
	if err := tickCtx.Err(); err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: timed out before completion",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	// Scan one extra beyond MaxDeltaScanLimit to detect whether more deltas
	// remain after this compaction pass.
	kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
	if err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	if len(kvs) == 0 {
		return
	}
	elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
	if err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
		"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
	if len(kvs) <= store.MaxDeltaScanLimit {
		return
	}
}
```

@bootjp (Owner, Author)
Fixed in 6c2a775: compactUrgentKey now loops via compactUrgentKeyBatch until the delta count drops to or below MaxDeltaScanLimit. Each iteration gets a fresh readTS after dispatch so previously committed deletes are visible. Two passes for 1034 deltas (1025 + 9) are verified by TestDeltaCompactor_UrgentCompactionPagination.
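The loop-until-under-limit behavior can be modeled in isolation. A self-contained sketch, where `maxScanLimit` and the `deltas` slice are stand-ins for `store.MaxDeltaScanLimit` (1024) and the scanned delta keys (this is not the project's actual code):

```go
package main

import "fmt"

const maxScanLimit = 4 // stand-in for store.MaxDeltaScanLimit (1024)

// compactInPasses models the paginated urgent compaction: each pass scans at
// most maxScanLimit+1 deltas, compacts (removes) them, and stops once a pass
// sees no more than maxScanLimit deltas, i.e. reads would no longer truncate.
func compactInPasses(deltas []int) (passes int) {
	for len(deltas) > 0 {
		page := len(deltas)
		if page > maxScanLimit+1 {
			page = maxScanLimit + 1
		}
		deltas = deltas[page:] // this page has been folded into base metadata
		passes++
		if page <= maxScanLimit {
			break
		}
	}
	return passes
}

func main() {
	// Analogue of the 1034-delta test: one full page plus a small remainder.
	fmt.Println(compactInPasses(make([]int, 2*maxScanLimit))) // prints 2
}
```

With the small limit of 4, eight deltas need two passes (5 then 3), mirroring the 1025 + 9 split the test verifies at full scale.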

Comment thread adapter/redis.go
Comment on lines +290 to +296
```go
// WithRedisCompactor wires a DeltaCompactor to the RedisServer so that urgent
// single-key compaction can be triggered when ErrDeltaScanTruncated is hit.
func WithRedisCompactor(c *DeltaCompactor) RedisServerOption {
	return func(r *RedisServer) {
		r.compactor = c
	}
}
```

Copilot AI Apr 16, 2026


WithRedisCompactor is defined, but there are no call sites wiring a DeltaCompactor into RedisServer, so triggerUrgentCompaction will always be a no-op and urgent compaction will never actually run in the current codebase. Please update the Redis server setup (where NewRedisServer is called) to pass WithRedisCompactor(deltaCompactor), or otherwise ensure the compactor gets injected.

@bootjp (Owner, Author)
Fixed in 6c2a775: WithRedisCompactor(deltaCompactor) is now passed to NewRedisServer in all three call sites — cmd/server/demo.go (setupRedis), main.go (startRedisServer), and adapter/test_util.go (setupNodes). In main.go the compactor's Run() is also wired into the errgroup.
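The wiring relies on Go's functional-options pattern, as quoted in the thread above. A self-contained sketch of the shape (the types here are minimal stand-ins for the real RedisServer and DeltaCompactor):

```go
package main

import "fmt"

type DeltaCompactor struct{}

type RedisServer struct {
	compactor *DeltaCompactor
}

// RedisServerOption mutates a RedisServer during construction.
type RedisServerOption func(*RedisServer)

// WithRedisCompactor mirrors the option quoted above: it closes over the
// compactor and assigns it when NewRedisServer applies its options.
func WithRedisCompactor(c *DeltaCompactor) RedisServerOption {
	return func(r *RedisServer) { r.compactor = c }
}

func NewRedisServer(opts ...RedisServerOption) *RedisServer {
	r := &RedisServer{}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	dc := &DeltaCompactor{}
	srv := NewRedisServer(WithRedisCompactor(dc))
	fmt.Println(srv.compactor == dc) // prints true: compactor wired
}
```

The review comment's point is that an option like this is inert unless every construction site actually passes it, which is exactly what the fix adds.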

Comment on lines +154 to +158
```go
// TriggerUrgentCompaction queues an immediate single-key compaction for a key
// whose delta count has exceeded MaxDeltaScanLimit. The request is dropped
// silently when the channel is full (the regular tick will catch it).
func (c *DeltaCompactor) TriggerUrgentCompaction(typeName string, userKey []byte) {
	if c == nil {
```

Copilot AI Apr 16, 2026


New urgent compaction behavior (TriggerUrgentCompaction / compactUrgentKey) isn’t covered by tests. Since this package already has DeltaCompactor unit tests, please add coverage for at least: urgent requests compact a targeted key (delta keys deleted, base meta updated), and a key with >MaxDeltaScanLimit deltas becomes readable again (no ErrDeltaScanTruncated) after urgent compaction runs.

@bootjp (Owner, Author)

Fixed in 6c2a775: added two tests to adapter/redis_delta_compactor_test.go:

  • TestDeltaCompactor_UrgentCompactionTriggeredByChannel: verifies that a request queued via TriggerUrgentCompaction is processed by the Run loop (uses a large maxCount so the regular tick skips the key, confirming the urgent path compacts it).
  • TestDeltaCompactor_UrgentCompactionPagination: writes MaxDeltaScanLimit+10 = 1034 delta keys and verifies all are compacted across multiple passes, leaving no delta keys behind.

bootjp added 2 commits April 16, 2026 23:11

n is int64, but make's capacity argument should be an int for clarity; n is
already bounded by maxWideColumnItems (100_000), so the conversion is safe.

A panic inside compactUrgentKey (e.g. in buildElems) would propagate to
the Run loop goroutine and crash the server. Add the same defer/recover
guard that the SyncOnce handler goroutines already have.
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment thread adapter/redis_delta_compactor.go Outdated
Comment on lines +200 to +230
```go
readTS := snapshotTS(c.coord.Clock(), c.st)
tickCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

// Scan all delta keys for this specific user key.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
// Scan one extra beyond MaxDeltaScanLimit to collect all accumulated deltas.
kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if len(kvs) == 0 {
	return
}

elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
	"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
```

Copilot AI Apr 16, 2026


compactUrgentKey caps the scan at store.MaxDeltaScanLimit+1, but the whole point of urgent compaction is to recover when a key has more than MaxDeltaScanLimit deltas. If the hot key has far more than 1024 deltas, this will compact only the first chunk and the key can remain above the limit, so reads will continue returning ErrDeltaScanTruncated. Consider looping scan+dispatch until the remaining delta count is <= MaxDeltaScanLimit (re-snapshotting after each commit), or otherwise ensuring you compact enough deltas to reliably drop below the truncation threshold; also update the comment that currently claims this collects “all accumulated deltas”.

Suggested change, from:

```go
readTS := snapshotTS(c.coord.Clock(), c.st)
tickCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
// Scan all delta keys for this specific user key.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
// Scan one extra beyond MaxDeltaScanLimit to collect all accumulated deltas.
kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if len(kvs) == 0 {
	return
}
elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
if err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
	c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
		"type", req.typeName, "key", string(req.userKey), "error", err)
	return
}
c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
	"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
```

to:

```go
tickCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
// Repeatedly scan and compact this user key until the remaining delta count
// is at or below MaxDeltaScanLimit. Each pass uses a fresh snapshot so it
// observes the latest committed compaction result.
prefix := h.deltaKeyPrefixFn(req.userKey)
end := store.PrefixScanEnd(prefix)
for {
	readTS := snapshotTS(c.coord.Clock(), c.st)
	// Scan one extra beyond MaxDeltaScanLimit so we can detect whether
	// another urgent compaction pass is still needed.
	kvs, err := c.st.ScanAt(tickCtx, prefix, end, store.MaxDeltaScanLimit+1, readTS)
	if err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: scan failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	if len(kvs) == 0 {
		return
	}
	elems, err := h.buildElems(tickCtx, req.userKey, kvs, readTS)
	if err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: buildElems failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	if err := c.dispatchCompaction(tickCtx, readTS, elems); err != nil {
		c.logger.WarnContext(tickCtx, "delta compactor urgent: dispatch failed",
			"type", req.typeName, "key", string(req.userKey), "error", err)
		return
	}
	c.logger.InfoContext(tickCtx, "delta compactor urgent: compacted key",
		"type", req.typeName, "key", string(req.userKey), "delta_count", len(kvs))
	if len(kvs) <= store.MaxDeltaScanLimit {
		return
	}
}
```

@bootjp (Owner, Author)

Same pagination fix applied — compactUrgentKeyBatch loop in 6c2a775.

Comment on lines +154 to +166
```go
// TriggerUrgentCompaction queues an immediate single-key compaction for a key
// whose delta count has exceeded MaxDeltaScanLimit. The request is dropped
// silently when the channel is full (the regular tick will catch it).
func (c *DeltaCompactor) TriggerUrgentCompaction(typeName string, userKey []byte) {
	if c == nil {
		return
	}
	req := urgentCompactionRequest{typeName: typeName, userKey: bytes.Clone(userKey)}
	select {
	case c.urgentCh <- req:
	default: // channel full; the regular tick will catch it
	}
}
```

Copilot AI Apr 16, 2026


New urgent-compaction behavior (TriggerUrgentCompaction + compactUrgentKey + the new per-key prefix functions) doesn’t appear to be covered by tests. Since adapter/redis_delta_compactor_test.go already covers SyncOnce behavior, it would be good to add a focused test that enqueues an urgent request for a key with >MaxDeltaScanLimit deltas and asserts the compaction reduces deltas below the truncation threshold (and ideally a small adapter-level test that resolve*Meta triggers the request when ErrDeltaScanTruncated is hit).

@bootjp (Owner, Author)

Covered by the same two new tests in 6c2a775: TestDeltaCompactor_UrgentCompactionTriggeredByChannel and TestDeltaCompactor_UrgentCompactionPagination.

… tests

- compactUrgentKey now loops until delta count drops to or below MaxDeltaScanLimit
  so keys with >1024 deltas become readable after a single urgent compaction call
- Wire WithRedisCompactor(deltaCompactor) in all three NewRedisServer call sites
  (cmd/server/demo.go, main.go, adapter/test_util.go) so TriggerUrgentCompaction
  is reachable from the Redis handler layer
- Add TestDeltaCompactor_UrgentCompactionTriggeredByChannel to verify the channel
  mechanism delivers a queued request to the Run loop
- Add TestDeltaCompactor_UrgentCompactionPagination to verify >MaxDeltaScanLimit
  delta keys are fully compacted across multiple passes
bootjp commented Apr 16, 2026

/gemini review


Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Comment thread adapter/redis_delta_compactor.go Outdated
```go
			slog.Any("panic", rec))
		}
	}()
	if !c.coord.IsLeader() {
```

Copilot AI Apr 16, 2026


DeltaCompactor uses coord.IsLeader() to gate urgent compaction (and SyncOnce also gates the whole compaction pass on IsLeader()). With kv.ShardedCoordinator, IsLeader() refers to the default-group leader and can be false even when this node is the leader for the specific shard/key being served (Redis commands proxy using IsLeaderForKey(key)). This can cause urgent compaction requests (and regular compaction) to be skipped on per-key leaders, leaving hot keys stuck returning ErrDeltaScanTruncated. Consider switching to per-key leadership checks (IsLeaderForKey/VerifyLeaderForKey) and/or filtering compaction work to keys whose shard this node leads.

Suggested change:

```diff
-	if !c.coord.IsLeader() {
+	if !c.coord.IsLeaderForKey(req.userKey) {
```
@bootjp (Owner, Author)

Fixed in b85690d:

  • compactUrgentKey: now uses IsLeaderForKey(req.userKey) instead of IsLeader() so urgent compaction runs on the per-key shard leader in sharded deployments.
  • SyncOnce: removed the blanket IsLeader() guard; per-key leadership is enforced inside buildBatchElems via IsLeaderForKey(userKey).
  • buildBatchElems: added IsLeaderForKey(userKey) check to skip keys whose shard this node does not lead.
  • Updated stubAdapterCoordinator.IsLeaderForKey to respect leaderSet/leader fields, and strengthened TestDeltaCompactor_NonLeaderSkips to use localAdapterCoordinator (real writes) so the assertion is meaningful.

…and SyncOnce

In sharded deployments (ShardedCoordinator), IsLeader() checks only the
default Raft group leader, causing compactUrgentKey and SyncOnce to skip
compaction even when this node leads the shard for the target key.

Changes:
- compactUrgentKey: IsLeader() → IsLeaderForKey(req.userKey) so urgent
  compaction runs on the per-key leader regardless of default-group leadership
- SyncOnce: remove the blanket IsLeader() guard; per-key leadership is now
  enforced inside buildBatchElems via IsLeaderForKey, allowing nodes to
  compact the shards they lead
- buildBatchElems: add IsLeaderForKey(userKey) check to skip keys whose shard
  this node does not lead
- stubAdapterCoordinator.IsLeaderForKey: respect leaderSet/leader fields (same
  as IsLeader) so the non-leader test exercises the new code path
- TestDeltaCompactor_NonLeaderSkips: use localAdapterCoordinator so Dispatch
  actually writes — makes the assertion meaningful with the new per-key check
bootjp commented Apr 16, 2026

/gemini review


Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

adapter/redis_delta_compactor.go:45

  • The DeltaCompactor type-level doc comment still says it “runs as a background goroutine on the Raft leader” and that “Non-leaders skip each tick silently”, but SyncOnce/Run no longer gate on coord.IsLeader(). Please update this comment to reflect the new per-key leadership behavior (scan can run on all nodes; dispatch/compaction only occurs on IsLeaderForKey).
```go
// DeltaCompactor folds accumulated delta keys into their corresponding base
// metadata keys for all wide-column collection types (List, Hash, Set, ZSet).
//
// It runs as a background goroutine on the Raft leader. Non-leaders skip each
// tick silently. Compaction is performed as an OCC transaction so concurrent
// writers never conflict with the compactor.
```

Comment thread adapter/redis_delta_compactor.go Outdated
Comment on lines 260 to 270
```diff
 // SyncOnce runs one compaction pass. Keys for shards this node does not lead
 // are skipped individually inside buildBatchElems via IsLeaderForKey, so the
 // scan proceeds on all nodes but only the shard leader dispatches each batch.
 // Each collection-type handler runs in its own goroutine so that a slow
 // handler (e.g. one with many list deltas) does not delay Hash/Set/ZSet
 // compaction. All goroutines share the same per-tick timeout context.
 func (c *DeltaCompactor) SyncOnce(ctx context.Context) error {
-	if c.coord == nil || !c.coord.IsLeader() {
+	if c.coord == nil {
 		return nil
 	}
 	readTS := snapshotTS(c.coord.Clock(), c.st)
```

Copilot AI Apr 16, 2026


SyncOnce now runs on every node (no coord.IsLeader() guard). When the compactor is backed by kv.ShardStore, ScanAt proxies to shard leaders for shards this node doesn’t lead, so running this loop on all nodes can multiply cross-node scan traffic each interval. Consider adding a guard to avoid scanning when the process isn’t leader for any shard, or restricting scans to locally-led shard ranges (so followers don’t proxy full-prefix scans).

@bootjp (Owner, Author)

Fixed in a0d2dc0: restored the IsLeader() guard in SyncOnce to prevent full-prefix delta-key scans on follower nodes. On a ShardStore backend, ScanAt proxies cross-node for shards this node doesn't lead, so running the scan on all nodes would multiply scan traffic by cluster size.

The guard ensures only the default-group leader does the full scan. The IsLeaderForKey filter added to buildBatchElems remains as a correctness layer so the default-group leader never dispatches mutations for shards it does not own. Keys for non-default shard groups continue to be handled by compactUrgentKey which uses IsLeaderForKey for per-key routing.

…s on followers

Removing the guard in the previous commit caused every node to run the
full delta-key prefix scan every 30 s. On a ShardStore backend ScanAt
proxies cross-node for shards this node does not lead, multiplying scan
traffic proportional to cluster size.

Restore the IsLeader() guard so only the default-group leader performs the
full scan. The per-key IsLeaderForKey filter added to buildBatchElems remains
as a correctness layer ensuring the default-group leader never dispatches
for shards it does not own. Keys for non-default shard groups are still
handled correctly by compactUrgentKey which uses IsLeaderForKey.
bootjp commented Apr 16, 2026

/gemini review


Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.

bootjp merged commit 613d308 into main Apr 16, 2026
11 checks passed
bootjp deleted the feat/collection-metadata-delta-phase-4 branch April 16, 2026 16:46

