Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 73 additions & 9 deletions adapter/redis_compat_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -3702,10 +3702,34 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string,
}
return requested, nil
}
return autoXAddID(safeUnixMilliToUint64(time.Now().UnixMilli()), hasLast, lastMs, lastSeq)
}

nowMs := uint64(time.Now().UnixMilli()) //nolint:gosec // always non-negative
// autoXAddID resolves XADD '*' to a concrete stream ID given a wall-clock
// nowMs. Pulled out of nextXAddID so the auto-ID branch is testable
// without depending on time.Now() — the only un-injectable dependency is
// already isolated in the caller.
//
// Two corner cases the caller cannot rely on the wall clock to avoid:
//
// - nowMs == 0 on a fresh stream (!hasLast). A naive "<nowMs>-0" reply
// yields "0-0", which Redis explicitly rejects as a stream ID and
// which XREAD ... 0 would treat as the empty after-marker. Bump the
// seq to 1 so the first auto-generated entry is "0-1" — strictly
// greater than 0-0 and reachable via XREAD ... 0. (This case fires
// only when safeUnixMilliToUint64 clamped a pre-epoch clock to 0;
// under any sane clock, nowMs is well above 0.)
//
// - nowMs <= lastMs. Advance past lastMs/lastSeq via bumpStreamID so
// the stream stays strictly monotonic even across a backwards clock
// step or a corrupted meta where lastMs is far in the future.
func autoXAddID(nowMs uint64, hasLast bool, lastMs, lastSeq uint64) (string, error) {
if !hasLast || nowMs > lastMs {
return strconv.FormatUint(nowMs, 10) + "-0", nil
seq := uint64(0)
if nowMs == 0 {
seq = 1
}
return strconv.FormatUint(nowMs, 10) + "-" + strconv.FormatUint(seq, 10), nil
}
// Either nowMs == lastMs (same millisecond), or lastMs is in the future
// (monotonic guarantee across a backwards clock step or a corrupted
Expand All @@ -3718,6 +3742,21 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string,
return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10), nil
}

// safeUnixMilliToUint64 returns ms as uint64, clamping any negative value
// (caused by a system clock set before the Unix epoch) to 0. Without this
// clamp, a direct uint64 cast of a negative int64 would yield a value
// near math.MaxUint64, which would then make nextXAddID's "future-ms"
// branch chase that pathological value forever — effectively wedging
// every subsequent XADD '*' on the stream until the clock recovers.
// The lastMs/lastSeq monotonic guarantee carries the stream forward
// from there via bumpStreamID.
func safeUnixMilliToUint64(ms int64) uint64 {
if ms < 0 {
return 0
}
return uint64(ms) //nolint:gosec // negative values handled above
}

// bumpStreamID returns the strictly-greater successor of (ms, seq) within
// the uint64-uint64 stream ID space. Bumps seq; on seq overflow carries
// to ms+1, seq=0; on ms overflow returns an error (no representable
Expand Down Expand Up @@ -4305,11 +4344,14 @@ func (r *RedisServer) resolveXReadAfterIDs(ctx context.Context, req *xreadReques
}

// resolveXReadDollarID resolves the "$" after-ID for a single stream by
// asking the store for the highest ID ever assigned. New-layout streams
// answer from meta in one read; legacy blobs fall back to a full load.
// Returns streamZeroID for non-existent and empty-never-written streams.
// ctx threads through the caller's cancellation/deadline so the resolve
// step doesn't survive past a BLOCK-window cancel.
// asking the store for the highest ID ever assigned. The new-layout meta
// answers in one read; when meta is absent the stream is treated as
// empty — legacy single-blob data is intentionally ignored under the
// "discard-on-read, delete-on-write" contract documented on
// dollarIDFromState (and matching loadStreamAt). Returns streamZeroID
// for non-existent and empty-never-written streams. ctx threads through
// the caller's cancellation/deadline so the resolve step doesn't survive
// past a BLOCK-window cancel.
func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte) (string, error) {
readTS := r.readTS()
typ, err := r.keyTypeAt(ctx, key, readTS)
Expand Down Expand Up @@ -4567,7 +4609,12 @@ func (r *RedisServer) xread(conn redcon.Conn, cmd redcon.Command) {
// the resolve either succeeds quickly or fails cleanly, leaving
// the BLOCK-window timeout semantics (null on expiry) to the
// busy-poll below.
resolveCtx, resolveCancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
//
// Parent on r.handlerContext() (not context.Background()) so an
// in-flight resolve aborts promptly when the server is shutting
// down — otherwise the per-resolve ScanAt could survive past
// graceful-shutdown's drain window.
resolveCtx, resolveCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
err = r.resolveXReadAfterIDs(resolveCtx, &req)
resolveCancel()
if err != nil {
Expand All @@ -4581,7 +4628,19 @@ func (r *RedisServer) xread(conn redcon.Conn, cmd redcon.Command) {
// xreadBusyPoll runs the BLOCK-window busy-poll loop. Extracted from xread
// so the parent function stays under the cyclop budget.
func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline time.Time) {
handlerCtx := r.handlerContext()
for {
// Server-shutdown short-circuit: if the parent handlerContext
// has been cancelled, abandon the busy-poll immediately rather
// than spin until the BLOCK deadline. iterCtx below is rooted
// in handlerCtx, so it would cancel-on-call too — but routing
// through isXReadIterCtxError silently translates that into an
// empty iteration and the loop would burn CPU at
// redisBusyPollBackoff cadence until the deadline.
if handlerCtx.Err() != nil {
conn.WriteNull()
return
}
// BLOCK-expired before the loop body: respect the Redis contract
// that a BLOCK timeout returns null, not an error. If we fell
// through here without remaining time (very small BLOCK, or
Expand All @@ -4598,7 +4657,12 @@ func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline
if iterTimeout > redisDispatchTimeout {
iterTimeout = redisDispatchTimeout
}
iterCtx, iterCancel := context.WithTimeout(context.Background(), iterTimeout)
// iterCtx is rooted in handlerCtx so its underlying storage
// scans abort promptly on server shutdown rather than running
// until iterTimeout fires. The handlerCtx.Err() guard at the
// top of each iteration prevents the loop from spinning once
// the parent ctx is cancelled.
iterCtx, iterCancel := context.WithTimeout(handlerCtx, iterTimeout)
results, err := r.xreadOnce(iterCtx, req)
iterCancel()
// Per-iteration ctx hitting its deadline (or being cancelled by
Expand Down
62 changes: 62 additions & 0 deletions adapter/redis_compat_commands_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,65 @@ func TestRedis_UserKeyShadowingStreamPrefixSurvivesMultiExec(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "user-value", plain)
}

// TestRedis_StreamXReadShutdownShortCircuits verifies that a blocked
// XREAD honours server shutdown: the busy-poll loop's per-iteration
// context is rooted in r.handlerContext(), so cancelling the server's
// base context must abort the loop promptly with a null reply (the same
// wire-level answer as a natural BLOCK timeout) instead of spinning
// until the BLOCK deadline or hanging the connection.
func TestRedis_StreamXReadShutdownShortCircuits(t *testing.T) {
	t.Parallel()
	nodes, _, _ := createNode(t, 3)
	defer shutdown(nodes)

	rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
	defer func() { _ = rdb.Close() }()
	ctx := context.Background()

	// Seed an entry so the XREAD below blocks waiting for IDs *after*
	// "1-0". Pre-fix, iterCtx was rooted in context.Background() and the
	// call would happily run the full 5-second BLOCK window even after
	// Close(); post-fix the handlerCtx.Err() guard at the top of each
	// loop iteration fires within roughly one redisBusyPollBackoff
	// (10 ms) and the server replies null.
	_, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: "stream-shutdown",
		ID:     "1-0",
		Values: []string{"k", "v"},
	}).Result()
	require.NoError(t, err)

	type blockResult struct {
		streams []redis.XStream
		err     error
	}
	resultCh := make(chan blockResult, 1)
	go func() {
		streams, xerr := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{"stream-shutdown", "1-0"},
			Count:   1,
			Block:   5 * time.Second,
		}).Result()
		resultCh <- blockResult{streams: streams, err: xerr}
	}()

	// Let the XREAD reach the busy-poll loop, then cancel the server's
	// base context via Close. The loop must observe the cancellation and
	// answer well before the 5-second BLOCK deadline.
	time.Sleep(50 * time.Millisecond)
	require.NoError(t, nodes[0].redisServer.Close())

	select {
	case got := <-resultCh:
		// A post-shutdown BLOCK must surface as redis.Nil — identical on
		// the wire to a naturally expired BLOCK. The server must NOT
		// turn the cancellation into a -ERR reply or a hung connection.
		require.True(t, errors.Is(got.err, redis.Nil),
			"BLOCK after shutdown must return redis.Nil, got err=%v streams=%v", got.err, got.streams)
	case <-time.After(2 * time.Second):
		t.Fatal("XREAD did not return within 2 s of server Close — busy-poll did not honour handlerContext cancel")
	}
}
89 changes: 89 additions & 0 deletions adapter/redis_stream_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,92 @@ func TestIsKnownInternalKey_StreamPrefixNarrowed(t *testing.T) {
})
}
}

// TestSafeUnixMilliToUint64 pins the clamp semantics of
// safeUnixMilliToUint64: a system clock set before the Unix epoch makes
// time.Now().UnixMilli() negative, and a naive uint64 cast would wrap
// to a value near math.MaxUint64 — wedging every subsequent XADD '*'
// because nextXAddID's future-ms branch would chase that value forever.
// The helper must clamp at 0 so the lastMs/lastSeq monotonic carry
// takes over instead.
func TestSafeUnixMilliToUint64(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name string
		in   int64
		want uint64
	}{
		{"zero", 0, 0},
		{"positive epoch ms", 1_777_000_000_000, 1_777_000_000_000},
		{"max int64", math.MaxInt64, uint64(math.MaxInt64)},
		// Any negative input means a clock set before 1970-01-01; every
		// such value must clamp to 0.
		{"minus one", -1, 0},
		{"large negative", -1_000_000_000_000, 0},
		{"min int64", math.MinInt64, 0},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			got := safeUnixMilliToUint64(tt.in)
			if got != tt.want {
				t.Fatalf("safeUnixMilliToUint64(%d): want %d, got %d", tt.in, tt.want, got)
			}
		})
	}
}

// TestAutoXAddID exercises the XADD '*' resolution path with synthetic
// nowMs values, including the pre-epoch edge case: when
// safeUnixMilliToUint64 has clamped the clock to 0, a naive "0-0"
// auto-ID would be rejected by Redis (and XREAD ... 0 would treat it as
// the after-marker and skip it), so autoXAddID must answer "0-1" for
// the first auto-generated entry on a fresh stream.
func TestAutoXAddID(t *testing.T) {
	t.Parallel()

	table := []struct {
		name    string
		nowMs   uint64
		hasLast bool
		lastMs  uint64
		lastSeq uint64
		want    string
		wantErr bool
	}{
		// Healthy clock on a fresh stream: seq starts at 0.
		{"fresh stream, sane clock", 1_777_000_000_000, false, 0, 0, "1777000000000-0", false},
		// Clamped pre-epoch clock on a fresh stream: must be 0-1, never 0-0.
		{"fresh stream, clamped clock → 0-1", 0, false, 0, 0, "0-1", false},
		// Clock strictly ahead of the last entry: seq resets to 0.
		{"clock advanced past lastMs", 200, true, 100, 5, "200-0", false},
		// Same millisecond as the last entry: seq carries via bumpStreamID.
		{"same ms as lastMs", 100, true, 100, 5, "100-6", false},
		// Clock stepped backwards: carry from lastMs/lastSeq, not nowMs.
		{"clock behind lastMs", 50, true, 100, 5, "100-6", false},
		// seq overflow carries into ms+1.
		{"seq at max carries", 100, true, 100, math.MaxUint64, "101-0", false},
		// Both coordinates maxed out: the ID space is exhausted.
		{"ID space exhausted", 100, true, math.MaxUint64, math.MaxUint64, "", true},
	}
	for _, tt := range table {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			got, err := autoXAddID(tt.nowMs, tt.hasLast, tt.lastMs, tt.lastSeq)
			switch {
			case tt.wantErr && err == nil:
				t.Fatalf("autoXAddID(%d,%v,%d,%d): expected error, got %q", tt.nowMs, tt.hasLast, tt.lastMs, tt.lastSeq, got)
			case tt.wantErr:
				return
			case err != nil:
				t.Fatalf("autoXAddID(%d,%v,%d,%d): unexpected error %v", tt.nowMs, tt.hasLast, tt.lastMs, tt.lastSeq, err)
			case got != tt.want:
				t.Fatalf("autoXAddID(%d,%v,%d,%d): want %q, got %q", tt.nowMs, tt.hasLast, tt.lastMs, tt.lastSeq, tt.want, got)
			}
		})
	}
}
Loading