Skip to content

fix(redis,stream): shutdown-aware XREAD ctx + clock-epoch defence (#620 follow-up)#631

Merged
bootjp merged 2 commits intomainfrom
fix/redis-stream-shutdown-ctx
Apr 25, 2026
Merged

fix(redis,stream): shutdown-aware XREAD ctx + clock-epoch defence (#620 follow-up)#631
bootjp merged 2 commits intomainfrom
fix/redis-stream-shutdown-ctx

Conversation

@bootjp
Copy link
Copy Markdown
Owner

@bootjp bootjp commented Apr 25, 2026

Summary

Follow-up to merged PR #620, addressing the Gemini medium-priority review that landed after the merge. Three changes, all with regression tests:

  • safeUnixMilliToUint64 defends nextXAddID's * branch against a system clock set before the Unix epoch (a negative UnixMilli would wrap to a math.MaxUint64-adjacent value and wedge every subsequent XADD '*' chasing that pathological future-ms).
  • xread's $-resolve and xreadBusyPoll's per-iteration ctx now derive from r.handlerContext() instead of context.Background(), so a server Close() aborts in-flight BLOCK XREADs within ~one redisBusyPollBackoff (10 ms) instead of running until the BLOCK deadline. The busy-poll loop also short-circuits at the top of each iteration when handlerCtx is cancelled — necessary because isXReadIterCtxError would otherwise silently translate the per-iteration cancel into "empty iteration" and let the loop spin at backoff cadence.
  • resolveXReadDollarID's doc comment said "legacy blobs fall back to a full load"; PR perf(redis): stream entry-per-key layout for O(new) XREAD #620 deliberately removed that fallback in favour of the "discard-on-read, delete-on-write" contract. Comment now matches the implementation.

Test plan

  • TestSafeUnixMilliToUint64 (unit) — clamp-at-0 for negatives (-1, large negative, MinInt64); pass-through for 0, positive, MaxInt64.
  • TestRedis_StreamXReadShutdownShortCircuits (e2e) — XREAD ... BLOCK 5s in a goroutine, redisServer.Close() after 50 ms, assert reply is redis.Nil within 2 s.
  • Build / vet / lint clean.
  • Existing TestRedis_StreamXReadShortBlockReturnsNullNotError and TestRedis_StreamXReadIterCtxDeadlineReturnsNull still pass — the shutdown short-circuit doesn't perturb the existing BLOCK-timeout-returns-null path.

/gemini review
@codex review

…nce + comment fix

Follow-up to PR #620 (already merged): address Gemini's round-final
medium-priority review (#620 (review)),
which landed after the merge. Each change ships with a regression test.

Defensive cast on time.Now().UnixMilli():

- nextXAddID's "*" branch cast time.Now().UnixMilli() to uint64
  directly. If the system clock is ever set before the Unix epoch
  (1970-01-01) UnixMilli returns a negative int64 and the cast wraps
  to a value near math.MaxUint64, which then makes the future-ms
  branch chase that pathological value forever — wedging every
  subsequent XADD '*' on the stream until the clock recovers.
- New helper safeUnixMilliToUint64(ms int64) uint64 clamps negative
  values to 0; nextXAddID now goes through it. The lastMs/lastSeq
  monotonic guarantee then carries the stream forward via bumpStreamID.

Shutdown-aware XREAD context plumbing:

- xread's $-resolution context (resolveCtx) now derives from
  r.handlerContext() instead of context.Background(), so an
  in-flight resolve aborts promptly on graceful shutdown rather
  than surviving past the drain window.
- xreadBusyPoll's per-iteration context (iterCtx) likewise reroots
  on r.handlerContext(). Without this, a 5-second BLOCK would
  ignore Close() and run to completion. The loop also short-
  circuits at the top of each iteration when handlerCtx is
  cancelled — relying on iterCtx alone would still spin at the
  redisBusyPollBackoff cadence because isXReadIterCtxError
  silently translates the per-iteration cancel into "empty
  iteration".
- Net effect: XREAD ... BLOCK <duration> on a server in shutdown
  returns the standard null-on-timeout reply within ~one
  redisBusyPollBackoff (~10 ms) instead of waiting up to <duration>.

Comment alignment:

- resolveXReadDollarID's doc comment said "legacy blobs fall back
  to a full load", but PR #620 deliberately removed that fallback
  in favour of the "discard-on-read, delete-on-write" contract.
  The comment is now consistent with what dollarIDFromState
  actually does.

Tests:

- TestSafeUnixMilliToUint64 covers the clamp at 0, MaxInt64
  pass-through, and the three negative cases (-1, large negative,
  math.MinInt64).
- TestRedis_StreamXReadShutdownShortCircuits seeds a stream, fires
  XREAD ... BLOCK 5s in a goroutine, calls redisServer.Close()
  ~50 ms later, and asserts the XREAD returns redis.Nil within
  2 s. Pre-fix the test would block for the full 5 s; post-fix it
  unblocks on the next iteration boundary.

Build / vet / lint clean.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 25, 2026

Warning

Rate limit exceeded

@bootjp has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 51 minutes and 26 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 51 minutes and 26 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0d24375e-b385-429e-9473-4a206d417076

📥 Commits

Reviewing files that changed from the base of the PR and between 21cf2c9 and f726981.

📒 Files selected for processing (3)
  • adapter/redis_compat_commands.go
  • adapter/redis_compat_commands_stream_test.go
  • adapter/redis_stream_limit_test.go
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/redis-stream-shutdown-ctx

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request improves the robustness of Redis stream operations by handling edge cases in system clock values and ensuring graceful shutdown. It introduces a safe conversion for Unix milliseconds to prevent pathological stream ID generation when the system clock is set before the Unix epoch. Additionally, it updates XREAD busy-polling to use the server's handler context, allowing blocking reads to abort promptly during shutdown. Feedback was provided to ensure that auto-generated IDs do not result in the forbidden 0-0 value when the clock is at epoch zero.

Comment thread adapter/redis_compat_commands.go Outdated
Comment on lines 3706 to 3709
nowMs := safeUnixMilliToUint64(time.Now().UnixMilli())
if !hasLast || nowMs > lastMs {
return strconv.FormatUint(nowMs, 10) + "-0", nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The auto-generated ID logic can produce 0-0 if the system clock is at or before the Unix epoch (clamped to 0 by safeUnixMilliToUint64) and the stream is empty. Redis explicitly forbids 0-0 as a stream ID. When nowMs is 0 and the stream has no previous entries, the sequence should start at 1 to ensure the ID is greater than 0-0 (i.e., 0-1).

Suggested change
nowMs := safeUnixMilliToUint64(time.Now().UnixMilli())
if !hasLast || nowMs > lastMs {
return strconv.FormatUint(nowMs, 10) + "-0", nil
}
nowMs := safeUnixMilliToUint64(time.Now().UnixMilli())
if !hasLast || nowMs > lastMs {
// Redis forbids 0-0; if the clock is at epoch 0, start at 0-1.
seq := "0"
if nowMs == 0 {
seq = "1"
}
return strconv.FormatUint(nowMs, 10) + "-" + seq, nil
}

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0a969c3d09

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread adapter/redis_compat_commands.go Outdated
Comment on lines 3707 to 3708
if !hasLast || nowMs > lastMs {
return strconv.FormatUint(nowMs, 10) + "-0", nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent XADD '*' from emitting stream ID 0-0

Clamping negative wall-clock milliseconds to 0 means the !hasLast path now returns "0-0" when the system clock is pre-epoch. This conflicts with the same function’s invariant that IDs <= 0-0 are invalid/unreachable via XREAD ... 0, so the first auto-generated entry can become invisible to normal consumers in exactly the edge case this patch targets. In this branch, ensure the generated ID is strictly greater than 0-0 (for example by bumping to 0-1 when nowMs == 0).

Useful? React with 👍 / 👎.

Gemini medium + Codex P2 round 1 on PR #631: the original change
clamped a pre-epoch clock to nowMs=0 via safeUnixMilliToUint64, but on
a fresh stream the auto-ID branch then returned "0-0" — the very ID
the same function rejects as invalid for explicitly-requested IDs and
which XREAD ... 0 treats as the empty after-marker.

Fix: extract the auto-ID branch into autoXAddID(nowMs, hasLast, ...)
so it can be unit-tested without the wall clock; bump seq to 1 when
nowMs is 0 so the first auto-generated entry is "0-1" rather than
"0-0". The existing bumpStreamID-on-collision path is unchanged.

Tests:

- TestAutoXAddID covers the fresh-stream branch (sane clock, clamped
  clock → 0-1), the same-ms collision (bump to seq+1), the
  clock-behind-lastMs case (carry from lastMs/lastSeq, not nowMs),
  the seq-at-MaxUint64 carry to ms+1, and the ID-space-exhausted
  error.

Build / vet / lint clean.
@bootjp
Copy link
Copy Markdown
Owner Author

bootjp commented Apr 25, 2026

Addressed Gemini medium + Codex P2 (両方とも同じ問題) in f726981.

指摘: safeUnixMilliToUint64 でクロックを 0 にクランプ → 鮮なストリームの auto-ID パスが "0-0" を返してしまう。Redis は 0-0 を invalid stream ID として明示的に拒否しており、XREAD ... 00-0 を after-marker として無視する。

修正: auto-ID ブランチを autoXAddID(nowMs, hasLast, lastMs, lastSeq) ヘルパーに分離。nowMs == 0 の場合は seq を 1 にして "0-1" を返す (pre-epoch クロックでクランプされた場合の唯一の発生条件)。time.Now() への依存を caller 側に押し出したので、ヘルパー単体で synthetic な nowMs でユニットテスト可能になった。

TestAutoXAddID で網羅:

  • 鮮なストリーム + 健全なクロック → <nowMs>-0
  • 鮮なストリーム + クランプされたクロック (nowMs=0) → 0-1 (本指摘の核)
  • nowMs > lastMs → seq=0 リセット
  • nowMs == lastMs → bumpStreamID で seq+1
  • nowMs < lastMs (時計が戻った) → lastMs/lastSeq からキャリー
  • seq=MaxUint64 → ms+1, seq=0
  • ms=seq=MaxUint64 → ID space exhausted error

Build / vet / lint clean.

/gemini review
@codex review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request improves the reliability of Redis stream operations by handling edge cases related to system clock inaccuracies and server shutdown. It introduces autoXAddID to prevent invalid "0-0" stream IDs and safeUnixMilliToUint64 to clamp negative timestamps. Furthermore, it updates the XREAD implementation to use the server's handler context, ensuring that blocking operations abort promptly during shutdown rather than waiting for timeouts. I have no feedback to provide.

@chatgpt-codex-connector
Copy link
Copy Markdown

Codex Review: Didn't find any major issues. Another round soon, please!

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@bootjp bootjp merged commit 4db754f into main Apr 25, 2026
8 checks passed
@bootjp bootjp deleted the fix/redis-stream-shutdown-ctx branch April 25, 2026 11:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant