From 0a969c3d096bde431a0f032d2cc1c990730a263c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 18:41:14 +0900 Subject: [PATCH 1/2] fix(redis,stream): shutdown-aware XREAD ctx + clock-before-epoch defence + comment fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to PR #620 (already merged): address Gemini's round-final medium-priority review (https://github.com/bootjp/elastickv/pull/620#pullrequestreview-4175384316), which landed after the merge. Each change ships with a regression test. Defensive cast on time.Now().UnixMilli(): - nextXAddID's "*" branch cast time.Now().UnixMilli() to uint64 directly. If the system clock is ever set before the Unix epoch (1970-01-01) UnixMilli returns a negative int64 and the cast wraps to a value near math.MaxUint64, which then makes the future-ms branch chase that pathological value forever — wedging every subsequent XADD '*' on the stream until the clock recovers. - New helper safeUnixMilliToUint64(ms int64) uint64 clamps negative values to 0; nextXAddID now goes through it. The lastMs/lastSeq monotonic guarantee then carries the stream forward via bumpStreamID. Shutdown-aware XREAD context plumbing: - xread's $-resolution context (resolveCtx) now derives from r.handlerContext() instead of context.Background(), so an in-flight resolve aborts promptly on graceful shutdown rather than surviving past the drain window. - xreadBusyPoll's per-iteration context (iterCtx) likewise reroots on r.handlerContext(). Without this, a 5-second BLOCK would ignore Close() and run to completion. The loop also short- circuits at the top of each iteration when handlerCtx is cancelled — relying on iterCtx alone would still spin at the redisBusyPollBackoff cadence because isXReadIterCtxError silently translates the per-iteration cancel into "empty iteration". - Net effect: XREAD ... 
BLOCK on a server in shutdown returns the standard null-on-timeout reply within ~one redisBusyPollBackoff (~10 ms) instead of waiting up to the full BLOCK duration. Comment alignment: - resolveXReadDollarID's doc comment said "legacy blobs fall back to a full load", but PR #620 deliberately removed that fallback in favour of the "discard-on-read, delete-on-write" contract. The comment is now consistent with what dollarIDFromState actually does. Tests: - TestSafeUnixMilliToUint64 covers the clamp at 0, MaxInt64 pass-through, and the three negative cases (-1, large negative, math.MinInt64). - TestRedis_StreamXReadShutdownShortCircuits seeds a stream, fires XREAD ... BLOCK 5s in a goroutine, calls redisServer.Close() ~50 ms later, and asserts the XREAD returns redis.Nil within 2 s. Pre-fix the test would block for the full 5 s; post-fix it unblocks on the next iteration boundary. Build / vet / lint clean. --- adapter/redis_compat_commands.go | 56 +++++++++++++++--- adapter/redis_compat_commands_stream_test.go | 62 ++++++++++++++++++++ adapter/redis_stream_limit_test.go | 34 +++++++++++ 3 files changed, 144 insertions(+), 8 deletions(-) diff --git a/adapter/redis_compat_commands.go b/adapter/redis_compat_commands.go index 09c45d708..43a61b0ac 100644 --- a/adapter/redis_compat_commands.go +++ b/adapter/redis_compat_commands.go @@ -3703,7 +3703,7 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string, return requested, nil } - nowMs := uint64(time.Now().UnixMilli()) //nolint:gosec // always non-negative + nowMs := safeUnixMilliToUint64(time.Now().UnixMilli()) if !hasLast || nowMs > lastMs { return strconv.FormatUint(nowMs, 10) + "-0", nil } @@ -3718,6 +3718,21 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string, return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10), nil } +// safeUnixMilliToUint64 returns ms as uint64, clamping any negative value +// (caused by a system clock set before the Unix epoch) to 0. 
Without this +// clamp, a direct uint64 cast of a negative int64 would yield a value +// near math.MaxUint64, which would then make nextXAddID's "future-ms" +// branch chase that pathological value forever — effectively wedging +// every subsequent XADD '*' on the stream until the clock recovers. +// The lastMs/lastSeq monotonic guarantee carries the stream forward +// from there via bumpStreamID. +func safeUnixMilliToUint64(ms int64) uint64 { + if ms < 0 { + return 0 + } + return uint64(ms) //nolint:gosec // negative values handled above +} + // bumpStreamID returns the strictly-greater successor of (ms, seq) within // the uint64-uint64 stream ID space. Bumps seq; on seq overflow carries // to ms+1, seq=0; on ms overflow returns an error (no representable @@ -4305,11 +4320,14 @@ func (r *RedisServer) resolveXReadAfterIDs(ctx context.Context, req *xreadReques } // resolveXReadDollarID resolves the "$" after-ID for a single stream by -// asking the store for the highest ID ever assigned. New-layout streams -// answer from meta in one read; legacy blobs fall back to a full load. -// Returns streamZeroID for non-existent and empty-never-written streams. -// ctx threads through the caller's cancellation/deadline so the resolve -// step doesn't survive past a BLOCK-window cancel. +// asking the store for the highest ID ever assigned. The new-layout meta +// answers in one read; when meta is absent the stream is treated as +// empty — legacy single-blob data is intentionally ignored under the +// "discard-on-read, delete-on-write" contract documented on +// dollarIDFromState (and matching loadStreamAt). Returns streamZeroID +// for non-existent and empty-never-written streams. ctx threads through +// the caller's cancellation/deadline so the resolve step doesn't survive +// past a BLOCK-window cancel. 
func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte) (string, error) { readTS := r.readTS() typ, err := r.keyTypeAt(ctx, key, readTS) @@ -4567,7 +4585,12 @@ func (r *RedisServer) xread(conn redcon.Conn, cmd redcon.Command) { // the resolve either succeeds quickly or fails cleanly, leaving // the BLOCK-window timeout semantics (null on expiry) to the // busy-poll below. - resolveCtx, resolveCancel := context.WithTimeout(context.Background(), redisDispatchTimeout) + // + // Parent on r.handlerContext() (not context.Background()) so an + // in-flight resolve aborts promptly when the server is shutting + // down — otherwise the per-resolve ScanAt could survive past + // graceful-shutdown's drain window. + resolveCtx, resolveCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout) err = r.resolveXReadAfterIDs(resolveCtx, &req) resolveCancel() if err != nil { @@ -4581,7 +4604,19 @@ func (r *RedisServer) xread(conn redcon.Conn, cmd redcon.Command) { // xreadBusyPoll runs the BLOCK-window busy-poll loop. Extracted from xread // so the parent function stays under the cyclop budget. func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline time.Time) { + handlerCtx := r.handlerContext() for { + // Server-shutdown short-circuit: if the parent handlerContext + // has been cancelled, abandon the busy-poll immediately rather + // than spin until the BLOCK deadline. iterCtx below is rooted + // in handlerCtx, so it would cancel-on-call too — but routing + // through isXReadIterCtxError silently translates that into an + // empty iteration and the loop would burn CPU at + // redisBusyPollBackoff cadence until the deadline. + if handlerCtx.Err() != nil { + conn.WriteNull() + return + } // BLOCK-expired before the loop body: respect the Redis contract // that a BLOCK timeout returns null, not an error. 
If we fell // through here without remaining time (very small BLOCK, or @@ -4598,7 +4633,12 @@ func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline if iterTimeout > redisDispatchTimeout { iterTimeout = redisDispatchTimeout } - iterCtx, iterCancel := context.WithTimeout(context.Background(), iterTimeout) + // iterCtx is rooted in handlerCtx so its underlying storage + // scans abort promptly on server shutdown rather than running + // until iterTimeout fires. The handlerCtx.Err() guard at the + // top of each iteration prevents the loop from spinning once + // the parent ctx is cancelled. + iterCtx, iterCancel := context.WithTimeout(handlerCtx, iterTimeout) results, err := r.xreadOnce(iterCtx, req) iterCancel() // Per-iteration ctx hitting its deadline (or being cancelled by diff --git a/adapter/redis_compat_commands_stream_test.go b/adapter/redis_compat_commands_stream_test.go index 018241799..862417dfa 100644 --- a/adapter/redis_compat_commands_stream_test.go +++ b/adapter/redis_compat_commands_stream_test.go @@ -763,3 +763,65 @@ func TestRedis_UserKeyShadowingStreamPrefixSurvivesMultiExec(t *testing.T) { require.NoError(t, err) require.Equal(t, "user-value", plain) } + +// TestRedis_StreamXReadShutdownShortCircuits guards Gemini's medium +// concern: the XREAD busy-poll loop's per-iteration ctx must be rooted +// in r.handlerContext() so that a server shutdown aborts the loop +// promptly instead of running until the BLOCK deadline. A handlerCtx +// cancellation also short-circuits the loop entry to a null reply, so +// the client sees BLOCK timeout semantics rather than a hung +// connection or a delayed -ERR. 
+func TestRedis_StreamXReadShutdownShortCircuits(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 3) + defer shutdown(nodes) + + rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress}) + defer func() { _ = rdb.Close() }() + ctx := context.Background() + + // Seed one entry and then start an XREAD that will block waiting + // for *new* entries on top of "1-0". With a 5-second BLOCK budget, + // pre-fix this would happily run for the full 5 s after Close() + // because iterCtx was rooted in context.Background(). Post-fix the + // handlerCtx.Err() guard at the top of each loop iteration kicks + // in within ~one redisBusyPollBackoff (10 ms) and we reply null. + _, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream-shutdown", + ID: "1-0", + Values: []string{"k", "v"}, + }).Result() + require.NoError(t, err) + + type xreadOutcome struct { + streams []redis.XStream + err error + } + xreadDone := make(chan xreadOutcome, 1) + go func() { + streams, err := rdb.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"stream-shutdown", "1-0"}, + Count: 1, + Block: 5 * time.Second, + }).Result() + xreadDone <- xreadOutcome{streams: streams, err: err} + }() + + // Give the XREAD a moment to enter the busy-poll loop, then cancel + // the server's base context. The poll loop must observe the + // cancellation and reply null well before the 5 s BLOCK deadline. + time.Sleep(50 * time.Millisecond) + require.NoError(t, nodes[0].redisServer.Close()) + + select { + case res := <-xreadDone: + // BLOCK timeout returns redis.Nil — the same wire-level reply + // the client would have seen if the BLOCK had expired + // naturally. The server must NOT surface the cancellation as a + // -ERR or hang the connection. 
+ require.True(t, errors.Is(res.err, redis.Nil), + "BLOCK after shutdown must return redis.Nil, got err=%v streams=%v", res.err, res.streams) + case <-time.After(2 * time.Second): + t.Fatal("XREAD did not return within 2 s of server Close — busy-poll did not honour handlerContext cancel") + } +} diff --git a/adapter/redis_stream_limit_test.go b/adapter/redis_stream_limit_test.go index 3a0cf855a..3cfc56065 100644 --- a/adapter/redis_stream_limit_test.go +++ b/adapter/redis_stream_limit_test.go @@ -304,3 +304,37 @@ func TestIsKnownInternalKey_StreamPrefixNarrowed(t *testing.T) { }) } } + +// TestSafeUnixMilliToUint64 guards Gemini's medium concern: a system +// clock set before the Unix epoch makes time.Now().UnixMilli() return a +// negative int64; a naive uint64 cast wraps to a value near +// math.MaxUint64 that wedges every subsequent XADD '*' (the future-ms +// branch in nextXAddID would chase that pathological value forever). The +// helper must clamp at 0 so the lastMs/lastSeq monotonic carry takes +// over. +func TestSafeUnixMilliToUint64(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + in int64 + want uint64 + }{ + {"zero", 0, 0}, + {"positive epoch ms", 1_777_000_000_000, 1_777_000_000_000}, + {"max int64", math.MaxInt64, uint64(math.MaxInt64)}, + // Negative values represent a clock set before the Unix epoch + // (1970-01-01). All must clamp at 0. 
+ {"minus one", -1, 0}, + {"large negative", -1_000_000_000_000, 0}, + {"min int64", math.MinInt64, 0}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if got := safeUnixMilliToUint64(tc.in); got != tc.want { + t.Fatalf("safeUnixMilliToUint64(%d): want %d, got %d", tc.in, tc.want, got) + } + }) + } +} From f7269818fee39e2fb2f6a771b1bd8aefcb2671d0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 19:13:19 +0900 Subject: [PATCH 2/2] fix(redis,stream): XADD '*' avoids 0-0 when clock is pre-epoch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gemini medium + Codex P2 round 1 on PR #631: the original change clamped a pre-epoch clock to nowMs=0 via safeUnixMilliToUint64, but on a fresh stream the auto-ID branch then returned "0-0" — the very ID the same function rejects as invalid for explicitly-requested IDs and which XREAD ... 0 treats as the empty after-marker. Fix: extract the auto-ID branch into autoXAddID(nowMs, hasLast, ...) so it can be unit-tested without the wall clock; bump seq to 1 when nowMs is 0 so the first auto-generated entry is "0-1" rather than "0-0". The existing bumpStreamID-on-collision path is unchanged. Tests: - TestAutoXAddID covers the fresh-stream branch (sane clock, clamped clock → 0-1), the same-ms collision (bump to seq+1), the clock-behind-lastMs case (carry from lastMs/lastSeq, not nowMs), the seq-at-MaxUint64 carry to ms+1, and the ID-space-exhausted error. Build / vet / lint clean. 
--- adapter/redis_compat_commands.go | 28 +++++++++++++-- adapter/redis_stream_limit_test.go | 55 ++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/adapter/redis_compat_commands.go b/adapter/redis_compat_commands.go index 43a61b0ac..5ffc9c0a1 100644 --- a/adapter/redis_compat_commands.go +++ b/adapter/redis_compat_commands.go @@ -3702,10 +3702,34 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string, } return requested, nil } + return autoXAddID(safeUnixMilliToUint64(time.Now().UnixMilli()), hasLast, lastMs, lastSeq) +} - nowMs := safeUnixMilliToUint64(time.Now().UnixMilli()) +// autoXAddID resolves XADD '*' to a concrete stream ID given a wall-clock +// nowMs. Pulled out of nextXAddID so the auto-ID branch is testable +// without depending on time.Now() — the only un-injectable dependency is +// already isolated in the caller. +// +// Two corner cases the caller cannot rely on the wall clock to avoid: +// +// - nowMs == 0 on a fresh stream (!hasLast). A naive "-0" reply +// yields "0-0", which Redis explicitly rejects as a stream ID and +// which XREAD ... 0 would treat as the empty after-marker. Bump the +// seq to 1 so the first auto-generated entry is "0-1" — strictly +// greater than 0-0 and reachable via XREAD ... 0. (This case fires +// only when safeUnixMilliToUint64 clamped a pre-epoch clock to 0; +// under any sane clock, nowMs is well above 0.) +// +// - nowMs <= lastMs. Advance past lastMs/lastSeq via bumpStreamID so +// the stream stays strictly monotonic even across a backwards clock +// step or a corrupted meta where lastMs is far in the future. 
+func autoXAddID(nowMs uint64, hasLast bool, lastMs, lastSeq uint64) (string, error) { if !hasLast || nowMs > lastMs { - return strconv.FormatUint(nowMs, 10) + "-0", nil + seq := uint64(0) + if nowMs == 0 { + seq = 1 + } + return strconv.FormatUint(nowMs, 10) + "-" + strconv.FormatUint(seq, 10), nil } // Either nowMs == lastMs (same millisecond), or lastMs is in the future // (monotonic guarantee across a backwards clock step or a corrupted diff --git a/adapter/redis_stream_limit_test.go b/adapter/redis_stream_limit_test.go index 3cfc56065..62ab96c9e 100644 --- a/adapter/redis_stream_limit_test.go +++ b/adapter/redis_stream_limit_test.go @@ -338,3 +338,58 @@ func TestSafeUnixMilliToUint64(t *testing.T) { }) } } + +// TestAutoXAddID covers the XADD '*' path of nextXAddID with synthetic +// nowMs values, including the Codex P2 / Gemini-medium edge case: +// safeUnixMilliToUint64 clamps a pre-epoch clock to 0, and a naive +// "0-0" auto-ID is rejected by Redis (XREAD ... 0 treats it as the +// after-marker and skips it). autoXAddID must bump seq to 1 in that +// case so the first auto-generated entry is "0-1". +func TestAutoXAddID(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + nowMs uint64 + hasLast bool + lastMs uint64 + lastSeq uint64 + want string + wantErr bool + }{ + // Fresh stream, healthy clock: nowMs > 0, seq starts at 0. + {"fresh stream, sane clock", 1_777_000_000_000, false, 0, 0, "1777000000000-0", false}, + // Fresh stream, clock pre-epoch (clamped to 0): MUST yield 0-1 + // rather than 0-0 — the original Codex P2 / Gemini-medium case. + {"fresh stream, clamped clock → 0-1", 0, false, 0, 0, "0-1", false}, + // Existing stream, nowMs strictly greater: seq resets to 0. + {"clock advanced past lastMs", 200, true, 100, 5, "200-0", false}, + // Existing stream, nowMs == lastMs: bumpStreamID seq carry. 
+ {"same ms as lastMs", 100, true, 100, 5, "100-6", false}, + // Existing stream, nowMs < lastMs (clock went backwards): + // bumpStreamID carries from lastMs/lastSeq, NOT from nowMs. + {"clock behind lastMs", 50, true, 100, 5, "100-6", false}, + // seq at MaxUint64 carries to ms+1. + {"seq at max carries", 100, true, 100, ^uint64(0), "101-0", false}, + // Both ms and seq at MaxUint64: ID space exhausted, error. + {"ID space exhausted", 100, true, ^uint64(0), ^uint64(0), "", true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := autoXAddID(tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq) + if tc.wantErr { + if err == nil { + t.Fatalf("autoXAddID(%d,%v,%d,%d): expected error, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, got) + } + return + } + if err != nil { + t.Fatalf("autoXAddID(%d,%v,%d,%d): unexpected error %v", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, err) + } + if got != tc.want { + t.Fatalf("autoXAddID(%d,%v,%d,%d): want %q, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, tc.want, got) + } + }) + } +}