From b2d0b82c745ed8427930184cd4da928e12a8a16c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 18:24:01 +0900 Subject: [PATCH 1/6] backup: Redis stream encoder (Phase 0a) Decodes !stream|meta|/!stream|entry| snapshot records into per-stream streams/.jsonl files per the Phase 0 design (lines 336-344). Mirrors the hash/list/set/zset encoders (#725/#755/#758/#790). Wire format: - !stream|meta| -> 24-byte BE Length(8) || LastMs(8) || LastSeq(8) - !stream|entry| -> magic-prefixed pb.RedisStreamEntry protobuf (0x00 'R' 'X' 'E' 0x01 || pb.Marshal(...)) Output is JSONL -- one record per line, sorted by (ms, seq) -- plus a trailing _meta terminator that captures length, last_ms, last_seq, and expire_at_ms (the design's pattern at line 338-339). The interleaved (name, value) field list from the protobuf decodes into the "fields" JSON object matching the design example. Per-line JSONL was chosen over per-entry files because real streams routinely hold tens of thousands of entries (one file per entry would dominate tar + find runtime by inode pressure). Fail-closed behavior: - Magic-prefix missing on an entry value -> ErrRedisInvalidStreamEntry. The live store always writes the prefix; its absence indicates corruption or a stale legacy value. Decoding raw protobuf without the prefix would either silently misparse or panic inside protobuf. - Odd field count -> ErrRedisInvalidStreamEntry. Live XADD enforces even arity at the wire level; an odd count at backup time would silently drop the dangling field if accepted. - Meta value of wrong length / overflow -> ErrRedisInvalidStreamMeta (same shape as the hash/list/set/zset overflow guards). - Entry key without the trailing 16-byte StreamID -> ErrRedisInvalidStreamKey. TTL routing: !redis|ttl| for a registered stream key folds into the JSONL _meta terminator's expire_at_ms field, matching the design's line 341-344 explicit requirement. Without this routing, a TTL'd stream would silently restore as permanent. Self-review: 1. Data loss -- magic-prefix, even-arity, and overflow guards all fail closed. Entry value is cloned (cloneStringSlice on the protobuf output) so a follow-up edit to the snapshot buffer cannot mutate emitted state. 2. Concurrency -- RedisDB is sequential per scope; no shared state. 3. Performance -- per-stream state in a slice (not a map) so XADD- order accumulation costs O(n); sort at flush is O(n log n) on (ms, seq). JSONL output is streamed via bytes.Buffer (single allocation grow). Matches list/zset cost shape. 4. Consistency -- entries sorted by (ms, seq) tuple, NOT by formatted string (sorting "10-0" vs "2-0" lexicographically would emit them out of XADD order); _meta last_ms/last_seq preserved verbatim so a restorer keeps XADD '*' monotonicity. JSONL terminator marker _meta:true is on a dedicated line so streaming consumers can detect end-of-stream without reading the whole file. 5. Coverage -- 14 table-driven tests under redis_stream_test.go: - round-trip basic (out-of-order entries, sorted at flush) - fields decoded to JSON object (design example match) - empty stream still emits file - TTL inlining into _meta.expire_at_ms - length-mismatch warning - malformed meta length / overflow / MaxInt64 boundary - missing magic prefix rejection - odd field count rejection - malformed entry key (wrong StreamID suffix length) - entries-without-meta still emit file - ID wire format ("-" decimal) - multi-stream user-key sort order Caller audit for semantics-changing edit (new case redisKindStream branch in HandleTTL, redis_string.go:309): purely additive -- the new branch fires only when streamState() has previously registered the key. No prior call site changes behavior. Verified via 'grep -n redisKindStream internal/backup/': three refs, all new in this PR. --- internal/backup/redis_stream.go | 371 +++++++++++++++++++++ internal/backup/redis_stream_test.go | 461 +++++++++++++++++++++++++++ internal/backup/redis_string.go | 22 +- 3 files changed, 852 insertions(+), 2 deletions(-) create mode 100644 internal/backup/redis_stream.go create mode 100644 internal/backup/redis_stream_test.go diff --git a/internal/backup/redis_stream.go b/internal/backup/redis_stream.go new file mode 100644 index 00000000..dcfb8d2d --- /dev/null +++ b/internal/backup/redis_stream.go @@ -0,0 +1,371 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + "strconv" + + pb "github.com/bootjp/elastickv/proto" + cockroachdberr "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// Redis stream encoder. Translates raw !stream|... snapshot records +// into the per-stream `streams/.jsonl` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md, lines +// 336-344). +// +// Wire format mirrors store/stream_helpers.go and +// adapter/redis_storage_codec.go: +// - !stream|meta| +// → 24-byte BE Length(8) || LastMs(8) || LastSeq(8) +// - !stream|entry| +// → magic-prefixed pb.RedisStreamEntry protobuf with fields +// {id string, fields []string} where Fields is the +// interleaved (name1, value1, name2, value2, ...) XADD +// field list. +// +// The protobuf entry value carries a magic prefix +// `0x00 'R' 'X' 'E' 0x01` (mirror of +// adapter/redis_storage_codec.go:17 storedRedisStreamEntryProtoPrefix); +// re-declared here so this package stays adapter-independent. +// +// Output is JSONL (one record per line) plus a trailing `_meta` +// terminator line that captures length, last_ms, last_seq, and TTL. +// Per the design line 336-339: +// +// {"id":"1714400000000-0","fields":{"event":"login","user":"alice"}} +// {"_meta":true,"length":2,"last_ms":1714400000001,"last_seq":0, +// "expire_at_ms":null} +// +// JSONL was chosen for streams over per-entry files because real +// streams routinely hold tens of thousands of entries and per-entry +// inode pressure would dominate `tar`/`find` runtime. +const ( + RedisStreamMetaPrefix = "!stream|meta|" + RedisStreamEntryPrefix = "!stream|entry|" + + // redisStreamMetaSize is the on-disk size of one !stream|meta| + // value: Length(8) || LastMs(8) || LastSeq(8). Mirrors + // store.streamMetaBinarySize; duplicated here to keep the backup + // package free of `store` imports. + redisStreamMetaSize = 24 + + // redisStreamIDBytes is the per-entry-key suffix size: ms(8) + // || seq(8). Mirrors store.StreamIDBytes. + redisStreamIDBytes = 16 + + // redisStreamProtoPrefix is the magic byte prefix on the stored + // pb.RedisStreamEntry serialization. Mirrors + // adapter/redis_storage_codec.go:storedRedisStreamEntryProtoPrefix. + // A live-side rename here without an accompanying backup update + // would surface as ErrRedisInvalidStreamEntry on decode of any + // real cluster dump — caught at the property tests. + redisStreamProtoPrefixLen = 5 +) + +var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01} + +// ErrRedisInvalidStreamMeta is returned when an !stream|meta| value +// is not the expected 24 bytes or carries a negative length. +var ErrRedisInvalidStreamMeta = cockroachdberr.New("backup: invalid !stream|meta| value") + +// ErrRedisInvalidStreamEntry is returned when an !stream|entry| +// value's magic prefix is missing or its protobuf body fails to +// unmarshal. +var ErrRedisInvalidStreamEntry = cockroachdberr.New("backup: invalid !stream|entry| value") + +// ErrRedisInvalidStreamKey is returned when a !stream| key cannot +// be parsed for its userKeyLen+userKey (or trailing ID) segments. +var ErrRedisInvalidStreamKey = cockroachdberr.New("backup: malformed !stream| key") + +// redisStreamEntry buffers one decoded XADD entry while the encoder +// assembles the per-stream JSONL output. We keep ms+seq separately +// alongside the formatted string ID so flushStreams can sort by +// (ms, seq) deterministically; sorting by the formatted "ms-seq" +// string would put "10-0" before "2-0". +type redisStreamEntry struct { + ms uint64 + seq uint64 + fields []string // interleaved (name, value) pairs, XADD order +} + +// redisStreamState buffers one userKey's stream during a snapshot +// scan. Like the hash/list/set/zset encoders we accumulate per-key +// state in memory; a single stream is bounded by maxWideColumnItems +// on the live side, so this remains tractable. +type redisStreamState struct { + metaSeen bool + length int64 + lastMs uint64 + lastSeq uint64 + entries []redisStreamEntry + expireAtMs uint64 + hasTTL bool +} + +// HandleStreamMeta processes one !stream|meta| record. +// Value layout: Length(8) || LastMs(8) || LastSeq(8). The encoder +// uses the meta's last_ms / last_seq verbatim in the JSONL _meta +// terminator so a restorer can replay them into the same XADD '*' +// monotonicity window. Length mismatches against the observed +// entry count surface as `redis_stream_length_mismatch` at flush +// time. +func (r *RedisDB) HandleStreamMeta(key, value []byte) error { + userKey, ok := parseStreamMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamKey, "meta key: %q", key) + } + if len(value) != redisStreamMetaSize { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamMeta, + "length %d != %d", len(value), redisStreamMetaSize) + } + rawLen := binary.BigEndian.Uint64(value[0:8]) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamMeta, + "declared length %d overflows int64", rawLen) + } + st := r.streamState(userKey) + st.length = int64(rawLen) //nolint:gosec // bounded above + st.lastMs = binary.BigEndian.Uint64(value[8:16]) + st.lastSeq = binary.BigEndian.Uint64(value[16:24]) + st.metaSeen = true + return nil +} + +// HandleStreamEntry processes one !stream|entry| +// record. The ID is recovered from the trailing 16 bytes of the +// key; the value is the magic-prefixed `pb.RedisStreamEntry` +// protobuf carrying the entry's interleaved (name, value) field +// list. +func (r *RedisDB) HandleStreamEntry(key, value []byte) error { + userKey, ms, seq, ok := parseStreamEntryKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamKey, "entry key: %q", key) + } + fields, err := decodeStreamEntryValue(value) + if err != nil { + return err + } + st := r.streamState(userKey) + st.entries = append(st.entries, redisStreamEntry{ms: ms, seq: seq, fields: fields}) + return nil +} + +// streamState lazily creates per-key state. Mirrors the +// hash/list/set/zset kindByKey-registration pattern so HandleStreamMeta, +// HandleStreamEntry, and the HandleTTL back-edge all agree on the +// kind. +func (r *RedisDB) streamState(userKey []byte) *redisStreamState { + uk := string(userKey) + if st, ok := r.streams[uk]; ok { + return st + } + st := &redisStreamState{} + r.streams[uk] = st + r.kindByKey[uk] = redisKindStream + return st +} + +// parseStreamMetaKey strips !stream|meta| and the 4-byte BE +// userKeyLen prefix. Returns (userKey, true) on success. Unlike +// the hash/set encoders there is no `!stream|meta|d|...` delta +// family — streams update meta in-place rather than via per-XADD +// deltas — so we do not need a delta-skip guard here. +func parseStreamMetaKey(key []byte) ([]byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisStreamMetaPrefix)) + if len(rest) == len(key) { + return nil, false + } + return parseUserKeyLenPrefix(rest) +} + +// parseStreamEntryKey strips !stream|entry| and the 4-byte BE +// userKeyLen prefix, then peels off the trailing 16-byte StreamID +// (ms || seq). Returns (userKey, ms, seq, true) on success. +func parseStreamEntryKey(key []byte) ([]byte, uint64, uint64, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisStreamEntryPrefix)) + if len(rest) == len(key) { + return nil, 0, 0, false + } + userKey, ok := parseUserKeyLenPrefix(rest) + if !ok { + return nil, 0, 0, false + } + // After (userKeyLen(4) + userKey), exactly StreamIDBytes must remain. + tail := rest[wideColumnUserKeyLenSize+len(userKey):] + if len(tail) != redisStreamIDBytes { + return nil, 0, 0, false + } + ms := binary.BigEndian.Uint64(tail[0:8]) + seq := binary.BigEndian.Uint64(tail[8:16]) + return userKey, ms, seq, true +} + +// decodeStreamEntryValue strips the magic prefix and protobuf-decodes +// the entry payload. Returns the interleaved field list (name1, +// value1, name2, value2, ...) used by every Redis stream consumer. +func decodeStreamEntryValue(value []byte) ([]string, error) { + if len(value) < redisStreamProtoPrefixLen || + !bytes.Equal(value[:redisStreamProtoPrefixLen], redisStreamProtoPrefix) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "missing or corrupt magic prefix (len=%d)", len(value)) + } + msg := &pb.RedisStreamEntry{} + if err := gproto.Unmarshal(value[redisStreamProtoPrefixLen:], msg); err != nil { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "unmarshal: %v", err) + } + if len(msg.GetFields())%2 != 0 { + // Live XADD enforces even arity (name/value pairs). An odd + // field count at backup time indicates corruption that would + // silently lose the dangling field if we accepted it — fail + // closed. + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "odd field count %d (XADD enforces name/value pairs)", len(msg.GetFields())) + } + return cloneStringSlice(msg.GetFields()), nil +} + +func cloneStringSlice(src []string) []string { + if src == nil { + return nil + } + out := make([]string, len(src)) + copy(out, src) + return out +} + +// flushStreams writes one JSONL file per accumulated stream to +// streams/.jsonl. Empty streams (Length==0, no entries) +// still emit a file when meta was seen, mirroring the wide-column +// encoders' policy: their existence is observable to clients (TYPE +// returns "stream", XLEN returns 0). Mismatched declared-vs-observed +// length surfaces an `redis_stream_length_mismatch` warning. +func (r *RedisDB) flushStreams() error { + if len(r.streams) == 0 { + return nil + } + dir := filepath.Join(r.dbDir(), "streams") + if err := r.ensureDir(dir); err != nil { + return err + } + userKeys := make([]string, 0, len(r.streams)) + for k := range r.streams { + userKeys = append(userKeys, k) + } + sort.Strings(userKeys) + for _, uk := range userKeys { + st := r.streams[uk] + if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length { + r.warn("redis_stream_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.length, + "observed_entries", len(st.entries), + "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key") + } + if err := r.writeStreamJSONL(dir, []byte(uk), st); err != nil { + return err + } + } + return nil +} + +func (r *RedisDB) writeStreamJSONL(dir string, userKey []byte, st *redisStreamState) error { + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".jsonl") + body, err := marshalStreamJSONL(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// streamEntryJSON is the dump-format projection of one stream entry. +// Fields is emitted as a JSON object keyed by name (matching the +// design's `"fields": {"event":"login","user":"alice"}` example) +// because XADD itself enforces name/value pair shape and Redis +// stream field names are user-controlled strings rather than +// binary-safe bytes. A future format-version bump can switch to a +// `[{"name":..., "value":...}]` array if reviewers find names that +// collide under JSON-object keying. +type streamEntryJSON struct { + ID string `json:"id"` + Fields map[string]string `json:"fields"` +} + +// streamMetaJSON is the dump-format projection of the final _meta +// terminator line. +type streamMetaJSON struct { + Meta bool `json:"_meta"` + Length int64 `json:"length"` + LastMs uint64 `json:"last_ms"` + LastSeq uint64 `json:"last_seq"` + ExpireAtMs *uint64 `json:"expire_at_ms"` +} + +// marshalStreamJSONL renders one stream state as JSONL. Entries are +// sorted by (ms, seq) so identical snapshots produce identical +// output across runs regardless of XADD insertion order. Each line +// uses encoding/json (compact, no MarshalIndent) so the format is +// stable enough for `diff -r`. +func marshalStreamJSONL(st *redisStreamState) ([]byte, error) { + sort.Slice(st.entries, func(i, j int) bool { + a, b := st.entries[i], st.entries[j] + if a.ms != b.ms { + return a.ms < b.ms + } + return a.seq < b.seq + }) + var buf bytes.Buffer + const xaddPairWidth = 2 // (name, value) — XADD enforces even arity + for _, e := range st.entries { + fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth) + for i := 0; i+1 < len(e.fields); i += xaddPairWidth { + fieldsMap[e.fields[i]] = e.fields[i+1] + } + rec := streamEntryJSON{ + ID: formatStreamID(e.ms, e.seq), + Fields: fieldsMap, + } + line, err := json.Marshal(rec) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + buf.Write(line) + buf.WriteByte('\n') + } + meta := streamMetaJSON{ + Meta: true, + Length: st.length, + LastMs: st.lastMs, + LastSeq: st.lastSeq, + } + if st.hasTTL { + ms := st.expireAtMs + meta.ExpireAtMs = &ms + } + line, err := json.Marshal(meta) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + buf.Write(line) + buf.WriteByte('\n') + return buf.Bytes(), nil +} + +// formatStreamID emits a stream ID in Redis's "ms-seq" wire format +// (the same shape XADD/XRANGE clients exchange on the wire). +func formatStreamID(ms, seq uint64) string { + return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10) +} diff --git a/internal/backup/redis_stream_test.go b/internal/backup/redis_stream_test.go new file mode 100644 index 00000000..018ca98b --- /dev/null +++ b/internal/backup/redis_stream_test.go @@ -0,0 +1,461 @@ +package backup + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// streamMetaKey is the test-side mirror of store.StreamMetaKey. +func streamMetaKey(userKey string) []byte { + out := []byte(RedisStreamMetaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + return append(out, userKey...) +} + +// streamEntryKey is the test-side mirror of store.StreamEntryKey. +func streamEntryKey(userKey string, ms, seq uint64) []byte { + out := []byte(RedisStreamEntryPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var id [16]byte + binary.BigEndian.PutUint64(id[0:8], ms) + binary.BigEndian.PutUint64(id[8:16], seq) + return append(out, id[:]...) +} + +// encodeStreamMetaValue mirrors store.MarshalStreamMeta: +// Length(8) || LastMs(8) || LastSeq(8), all big-endian. +func encodeStreamMetaValue(length int64, lastMs, lastSeq uint64) []byte { + v := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(v[0:8], uint64(length)) //nolint:gosec + binary.BigEndian.PutUint64(v[8:16], lastMs) + binary.BigEndian.PutUint64(v[16:24], lastSeq) + return v +} + +// encodeStreamEntryValue produces the magic-prefixed protobuf wire +// format the live store writes for !stream|entry| values. +func encodeStreamEntryValue(t *testing.T, id string, fields []string) []byte { + t.Helper() + body, err := gproto.Marshal(&pb.RedisStreamEntry{Id: id, Fields: fields}) + if err != nil { + t.Fatalf("marshal pb.RedisStreamEntry: %v", err) + } + out := make([]byte, 0, redisStreamProtoPrefixLen+len(body)) + out = append(out, redisStreamProtoPrefix...) + out = append(out, body...) + return out +} + +// readStreamJSONL parses the JSONL output (one record per line) and +// splits it into entries (no `_meta` key) plus the trailing meta +// terminator. The terminator is always the last line per spec. +func readStreamJSONL(t *testing.T, path string) (entries []map[string]any, meta map[string]any) { + t.Helper() + f, err := os.Open(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("open %s: %v", path, err) + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Bytes() + if len(bytes.TrimSpace(line)) == 0 { + continue + } + var rec map[string]any + if err := json.Unmarshal(line, &rec); err != nil { + t.Fatalf("unmarshal %q: %v", line, err) + } + if _, ok := rec["_meta"]; ok { + meta = rec + continue + } + entries = append(entries, rec) + } + if err := scanner.Err(); err != nil { + t.Fatalf("scan: %v", err) + } + if meta == nil { + t.Fatalf("no _meta terminator in %s", path) + } + return entries, meta +} + +func streamFloat(t *testing.T, m map[string]any, key string) float64 { + t.Helper() + v, ok := m[key] + if !ok { + t.Fatalf("field %q missing in %+v", key, m) + } + f, ok := v.(float64) + if !ok { + t.Fatalf("field %q = %T(%v), want float64", key, v, v) + } + return f +} + +// assertStreamMetaTerminator pins the trailing `_meta` line shape. +// Extracted from the round-trip tests to keep the per-test bodies +// below the cyclop ceiling. +func assertStreamMetaTerminator(t *testing.T, meta map[string]any, wantLen int64, wantLastMs, wantLastSeq uint64) { + t.Helper() + if streamFloat(t, meta, "length") != float64(wantLen) { + t.Fatalf("meta.length = %v want %d", meta["length"], wantLen) + } + if streamFloat(t, meta, "last_ms") != float64(wantLastMs) { + t.Fatalf("meta.last_ms = %v want %d", meta["last_ms"], wantLastMs) + } + if streamFloat(t, meta, "last_seq") != float64(wantLastSeq) { + t.Fatalf("meta.last_seq = %v want %d", meta["last_seq"], wantLastSeq) + } + if meta["expire_at_ms"] != nil { + t.Fatalf("meta.expire_at_ms must be nil without TTL, got %v", meta["expire_at_ms"]) + } +} + +// TestRedisDB_StreamRoundTripBasic confirms a multi-entry stream +// round-trips through the encoder in (ms, seq) order, with the +// fields decoded from the protobuf entry value and the trailing +// _meta line preserving length / last_ms / last_seq. +func TestRedisDB_StreamRoundTripBasic(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("events"), encodeStreamMetaValue(3, 1714400000002, 0)); err != nil { + t.Fatalf("HandleStreamMeta: %v", err) + } + // Submit out of (ms, seq) order — encoder must sort. + for _, e := range []struct { + ms, seq uint64 + id string + fields []string + }{ + {1714400000002, 0, "1714400000002-0", []string{"event", "logout", "user", "alice"}}, + {1714400000000, 0, "1714400000000-0", []string{"event", "login", "user", "alice"}}, + {1714400000001, 0, "1714400000001-0", []string{"event", "click", "user", "alice"}}, + } { + key := streamEntryKey("events", e.ms, e.seq) + val := encodeStreamEntryValue(t, e.id, e.fields) + if err := db.HandleStreamEntry(key, val); err != nil { + t.Fatalf("HandleStreamEntry(%s): %v", e.id, err) + } + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + entries, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "events.jsonl")) + if len(entries) != 3 { + t.Fatalf("entries = %d, want 3 (got %v)", len(entries), entries) + } + for i, w := range []string{"1714400000000-0", "1714400000001-0", "1714400000002-0"} { + if entries[i]["id"] != w { + t.Fatalf("entries[%d].id = %v, want %v", i, entries[i]["id"], w) + } + } + assertStreamMetaTerminator(t, meta, 3, 1714400000002, 0) +} + +// TestRedisDB_StreamFieldsDecodedToObject confirms that interleaved +// `[name1, value1, name2, value2, ...]` arrays from the protobuf +// land as `{"name1": "value1", "name2": "value2"}` JSON objects per +// the design example (line 338). +func TestRedisDB_StreamFieldsDecodedToObject(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 100, 5)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-5", []string{"event", "login", "user", "alice", "ip", "10.0.0.1"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 100, 5), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if len(entries) != 1 { + t.Fatalf("entries = %d, want 1", len(entries)) + } + fields, ok := entries[0]["fields"].(map[string]any) + if !ok { + t.Fatalf("entries[0].fields = %T(%v), want object", entries[0]["fields"], entries[0]["fields"]) + } + want := map[string]any{"event": "login", "user": "alice", "ip": "10.0.0.1"} + if len(fields) != len(want) { + t.Fatalf("fields = %v, want %v", fields, want) + } + for k, v := range want { + if fields[k] != v { + t.Fatalf("fields[%q] = %v, want %v", k, fields[k], v) + } + } +} + +// TestRedisDB_StreamEmptyStreamStillEmitsFile mirrors the other +// wide-column encoders: an empty stream (Length=0) must still emit +// a file because TYPE returns "stream" and XLEN returns 0 to clients. +func TestRedisDB_StreamEmptyStreamStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("empty"), encodeStreamMetaValue(0, 0, 0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "empty.jsonl")) + if len(entries) != 0 { + t.Fatalf("empty stream should emit no entry lines, got %v", entries) + } + if streamFloat(t, meta, "length") != 0 { + t.Fatalf("meta.length = %v want 0", meta["length"]) + } +} + +// TestRedisDB_StreamTTLInlinedFromScanIndex pins that +// !redis|ttl| records for a stream user key fold into the +// _meta terminator's expire_at_ms field — design line 341-344. +// Without this routing, restoring a TTL'd stream would silently +// drop the TTL. +func TestRedisDB_StreamTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("k"), encodeStreamMetaValue(1, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"event", "login"}) + if err := db.HandleStreamEntry(streamEntryKey("k", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + _, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "k.jsonl")) + if streamFloat(t, meta, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("meta.expire_at_ms = %v want %d", meta["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "streams_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected stream TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_StreamLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash/list/set/zset encoders. +func TestRedisDB_StreamLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(5, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_stream_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_StreamRejectsMalformedMetaValueLength pins that a +// !stream|meta| value of the wrong length surfaces as an error. +// The 24-byte fixed shape is the wire-format contract — any other +// length means the on-disk record is corrupt. +func TestRedisDB_StreamRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleStreamMeta(streamMetaKey("k"), []byte{0x00, 0x01, 0x02}) + if !errors.Is(err, ErrRedisInvalidStreamMeta) { + t.Fatalf("err=%v want ErrRedisInvalidStreamMeta", err) + } +} + +// TestRedisDB_StreamRejectsOverflowingMetaLength pins the high-bit +// overflow guard — same shape as hash/list/set/zset encoders. +func TestRedisDB_StreamRejectsOverflowingMetaLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + overflow := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(overflow[0:8], 1<<63) + err := db.HandleStreamMeta(streamMetaKey("k"), overflow) + if !errors.Is(err, ErrRedisInvalidStreamMeta) { + t.Fatalf("err=%v want ErrRedisInvalidStreamMeta", err) + } +} + +// TestRedisDB_StreamMaxInt64MetaLength pins the math.MaxInt64 +// boundary — declaredLen=math.MaxInt64 must be accepted, only > that +// rejected. +func TestRedisDB_StreamMaxInt64MetaLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + boundary := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(boundary[0:8], math.MaxInt64) + if err := db.HandleStreamMeta(streamMetaKey("k"), boundary); err != nil { + t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) + } +} + +// TestRedisDB_StreamRejectsEntryWithMissingMagic pins that a value +// missing the `0x00 'R' 'X' 'E' 0x01` magic prefix fails closed. +// The live store always writes this prefix; its absence indicates +// the value came from a stale legacy format or from a corrupted +// store, and decoding raw protobuf bytes without the prefix would +// either silently misparse or panic deep inside the proto library. +func TestRedisDB_StreamRejectsEntryWithMissingMagic(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + body, err := gproto.Marshal(&pb.RedisStreamEntry{Id: "1-0", Fields: []string{"k", "v"}}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + // Pass the raw body without the magic prefix. + err = db.HandleStreamEntry(streamEntryKey("k", 1, 0), body) + if !errors.Is(err, ErrRedisInvalidStreamEntry) { + t.Fatalf("err=%v want ErrRedisInvalidStreamEntry", err) + } +} + +// TestRedisDB_StreamRejectsEntryWithOddFieldCount pins the +// even-arity invariant. Live XADD enforces name/value pairs at the +// wire level (XADD rejects odd argument counts), so an odd count +// at backup time indicates corruption. Silently emitting the +// dangling field as `{"": null}` would re-corrupt the +// restored cluster. +func TestRedisDB_StreamRejectsEntryWithOddFieldCount(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + val := encodeStreamEntryValue(t, "1-0", []string{"event"}) // 1 element, missing value + err := db.HandleStreamEntry(streamEntryKey("k", 1, 0), val) + if !errors.Is(err, ErrRedisInvalidStreamEntry) { + t.Fatalf("err=%v want ErrRedisInvalidStreamEntry", err) + } +} + +// TestRedisDB_StreamRejectsMalformedEntryKey pins that an entry key +// without the trailing 16-byte StreamID fails parse cleanly. +func TestRedisDB_StreamRejectsMalformedEntryKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // Construct a key with the entry prefix + userKeyLen + userKey + // but only a 4-byte trailing suffix (should be 16 for StreamID). + out := []byte(RedisStreamEntryPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], 1) + out = append(out, l[:]...) + out = append(out, 'k') + out = append(out, 0x00, 0x00, 0x00, 0x01) + val := encodeStreamEntryValue(t, "1-0", []string{"a", "b"}) + err := db.HandleStreamEntry(out, val) + if !errors.Is(err, ErrRedisInvalidStreamKey) { + t.Fatalf("err=%v want ErrRedisInvalidStreamKey", err) + } +} + +// TestRedisDB_StreamEntriesWithoutMetaStillEmitFile pins the +// entries-without-meta contract: stream entries may arrive before +// (or without) meta, and the encoder must still emit the JSONL +// without firing the length-mismatch warning. Mirrors the +// items-without-meta rule from list (#755 round 2) and set (#758). +func TestRedisDB_StreamEntriesWithoutMetaStillEmitFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + val := encodeStreamEntryValue(t, "1-0", []string{"a", "b"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if len(entries) != 1 { + t.Fatalf("entries = %v, want 1", entries) + } + for _, e := range events { + if e == "redis_stream_length_mismatch" { + t.Fatalf("entries-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_StreamIDFormatMatchesRedisWire pins the wire format +// of the JSON `id` field: `-` decimal, matching what +// XADD/XRANGE expose to clients. A future encoder bug that emitted +// hex or base10-with-leading-zeros would silently corrupt the +// stream-restore replay path. +func TestRedisDB_StreamIDFormatMatchesRedisWire(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 1714400000000, 7)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1714400000000-7", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1714400000000, 7), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if entries[0]["id"] != "1714400000000-7" { + t.Fatalf("id = %v want %q", entries[0]["id"], "1714400000000-7") + } +} + +// TestRedisDB_StreamMultipleStreamsSortedByUserKey pins that +// flushStreams iterates user keys in sorted byte order so two +// runs producing the same logical content produce identical +// directory output. +func TestRedisDB_StreamMultipleStreamsSortedByUserKey(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + for _, uk := range []string{"zeta", "alpha", "mango"} { + if err := db.HandleStreamMeta(streamMetaKey(uk), encodeStreamMetaValue(1, 1, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1-0", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey(uk, 1, 0), val); err != nil { + t.Fatal(err) + } + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + for _, uk := range []string{"alpha", "mango", "zeta"} { + path := filepath.Join(root, "redis", "db_0", "streams", uk+".jsonl") + if _, err := os.Stat(path); err != nil { + t.Fatalf("stat %s: %v", path, err) + } + } +} diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 688225d1..1b6d0b61 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -85,6 +85,7 @@ const ( redisKindHash redisKindList redisKindSet + redisKindStream ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -185,6 +186,13 @@ type RedisDB struct { // Finalize into sets/.json with members sorted by raw byte // order for deterministic dump output. sets map[string]*redisSetState + + // streams buffers per-userKey stream state (meta + entry list). + // Flushed at Finalize into streams/.jsonl as one record per + // entry plus a trailing `_meta` line. Streams are bounded by + // maxWideColumnItems on the live side, so a single in-memory + // slice per stream is tractable. + streams map[string]*redisStreamState } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -204,6 +212,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { hashes: make(map[string]*redisHashState), lists: make(map[string]*redisListState), sets: make(map[string]*redisSetState), + streams: make(map[string]*redisStreamState), } } @@ -297,9 +306,17 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindStream: + // Same per-record TTL inlining: XADD + EXPIRE replay in + // one shot from the per-stream JSONL `_meta` terminator, + // no separate sidecar. + st := r.streamState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoders (set/zset/stream) land, and + // remaining wide-column encoder (zset) lands, and // buffering them allocates proportional to user-key size // (up to 1 MiB per key) for no benefit. Codex P2 round 6. r.orphanTTLCount++ @@ -318,6 +335,7 @@ func (r *RedisDB) Finalize() error { r.flushHashes, r.flushLists, r.flushSets, + r.flushStreams, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -329,7 +347,7 @@ func (r *RedisDB) Finalize() error { if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoders (zset/stream) have not landed yet") + "hint", "remaining wide-column encoder (zset) has not landed yet — orphan TTLs may also indicate a corrupted store or a !redis|ttl| record whose user-key prefix dispatcher missed routing to a wide-column Handler") } return firstErr } From d412194bb8be4d40afa983640ea71f44a2c3b93d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 18:47:03 +0900 Subject: [PATCH 2/6] =?UTF-8?q?backup(stream):=20PR791=20r1=20codex=20P1?= =?UTF-8?q?=20x2=20=E2=80=94=20pendingTTL=20drain=20+=20duplicate-field=20?= =?UTF-8?q?preservation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two P1 findings from chatgpt-codex on PR #791: P1a: Buffer stream TTLs that arrive before stream rows Pebble snapshots emit records in encoded-key order (store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < `s`. In real snapshot order the TTL arrives FIRST, kindByKey is still redisKindUnknown when HandleTTL fires, and the original code counted the TTL as an orphan and dropped it — every TTL'd stream restored as permanent. Same root cause as the set encoder's latent bug in PR #758. This commit adds a pendingTTL infrastructure (matching the parallel fix on PR #790) so the expiry parks during the redisKindUnknown window and drains when streamState first registers the user key. The set encoder gets the same retroactive drain. P1b: Preserve duplicate stream fields instead of map-collapsing XADD permits duplicate field names within one entry (e.g. `XADD s * f v1 f v2`). The protobuf entry stores the interleaved slice verbatim, but my marshalStreamJSONL collapsed pairs into `map[string]string`, silently dropping every duplicate. A restore of such an entry would lose the second (and later) pair. Fix: emit `fields` as a JSON ARRAY of `{name, value}` records (streamFieldJSON). Order is the protobuf's interleaved order so a restore can replay the original XADD argv exactly. The design example at docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338 showed object form. That representation was unsafe for streams (though fine for hashes where the wire-level encoder normalises field names earlier). The format is owned by Phase 0 — adjusted in this PR before the format ships any consumers. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch: same semantic change as PR #790's r1 — previously incremented orphanTTLCount on intake; now buffers in pendingTTL and lets Finalize count at end. Same audit conclusion: no external callers of orphanTTLCount; TestRedisDB_OrphanTTLCountedNotBuffered updated to assert the new intake/Finalize split. - streamEntryJSON.Fields type change `map → slice`: only marshalled by encoding/json; the only reader is the test suite, which is updated in this commit. No on-disk format compatibility concerns because Phase 0 has not shipped a consumer yet. - New caller claimPendingTTL: called by setState (retroactive) and streamState (new) in this PR. hashState/listState don't call it because their type prefixes lex-sort BEFORE `!redis|ttl|`. Verified via `grep -n 'claimPendingTTL' internal/backup/`. New tests: - TestRedisDB_StreamDuplicateFieldsPreserved — pins P1b fix. - TestRedisDB_StreamTTLArrivesBeforeRows — pins P1a fix for streams. - TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for PR #758's set encoder (same root cause as the stream bug). - TestRedisDB_StreamFieldsDecodedToArray (renamed from ToObject) — updated to match the array shape. Self-review: 1. Data loss — the original code DROPPED real TTL'd streams on every backup AND dropped duplicate-field entries' later pairs. This fix recovers both. No new data-loss surface introduced. 2. Concurrency — pendingTTL added to RedisDB which is already sequential-per-scope; no new locking required. 3. Performance — pendingTTL holds (string-userKey, uint64-expireAt) pairs; same allocation shape as kindByKey. Fields slice replaces a map of the same logical size — slightly cheaper actually (no hash overhead). 4. Consistency — drain happens at FIRST state registration. The array form preserves insertion order from the protobuf so the restored XADD argv matches. 5. Coverage — 4 new tests + 2 updated. All 78 redis tests pass. --- internal/backup/redis_set.go | 12 +++ internal/backup/redis_stream.go | 51 ++++++--- internal/backup/redis_stream_test.go | 153 ++++++++++++++++++++++++--- internal/backup/redis_string.go | 106 ++++++++++++++++--- internal/backup/redis_string_test.go | 31 ++++-- 5 files changed, 308 insertions(+), 45 deletions(-) diff --git a/internal/backup/redis_set.go b/internal/backup/redis_set.go index 7cd4a440..f6f80975 100644 --- a/internal/backup/redis_set.go +++ b/internal/backup/redis_set.go @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil } // setState lazily creates per-key state. Mirrors the hash/list // kindByKey-registration pattern so HandleSetMeta, HandleSetMember, // and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!st|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the set's +// `expire_at_ms`. Without this drain step, every TTL'd set would +// restore as permanent — a latent bug in PR #758 surfaced by codex +// on PR #791. Phase 0a tests added in the same PR pin the ordering. func (r *RedisDB) setState(userKey []byte) *redisSetState { uk := string(userKey) if st, ok := r.sets[uk]; ok { return st } st := &redisSetState{members: make(map[string]struct{})} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.sets[uk] = st r.kindByKey[uk] = redisKindSet return st diff --git a/internal/backup/redis_stream.go b/internal/backup/redis_stream.go index dcfb8d2d..46250d25 100644 --- a/internal/backup/redis_stream.go +++ b/internal/backup/redis_stream.go @@ -160,12 +160,23 @@ func (r *RedisDB) HandleStreamEntry(key, value []byte) error { // hash/list/set/zset kindByKey-registration pattern so HandleStreamMeta, // HandleStreamEntry, and the HandleTTL back-edge all agree on the // kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!stream|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the stream's +// JSONL `_meta.expire_at_ms`. Without this drain step, every TTL'd +// stream would restore as permanent. Codex P1 finding on PR #791. func (r *RedisDB) streamState(userKey []byte) *redisStreamState { uk := string(userKey) if st, ok := r.streams[uk]; ok { return st } st := &redisStreamState{} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.streams[uk] = st r.kindByKey[uk] = redisKindStream return st @@ -291,17 +302,30 @@ func (r *RedisDB) writeStreamJSONL(dir string, userKey []byte, st *redisStreamSt return nil } -// streamEntryJSON is the dump-format projection of one stream entry. -// Fields is emitted as a JSON object keyed by name (matching the -// design's `"fields": {"event":"login","user":"alice"}` example) -// because XADD itself enforces name/value pair shape and Redis -// stream field names are user-controlled strings rather than -// binary-safe bytes. A future format-version bump can switch to a -// `[{"name":..., "value":...}]` array if reviewers find names that -// collide under JSON-object keying. +// streamFieldJSON is the dump-format projection of one (name, value) +// pair from a stream entry. We emit a list of name/value records +// rather than a JSON object because XADD permits duplicate field +// names within one entry — e.g. `XADD s * f v1 f v2` records BOTH +// (f, v1) and (f, v2) as distinct interleaved entries in the +// stored protobuf's Fields slice. The design example at +// docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338 +// showed an object shape, but that representation silently drops +// duplicates and a restore would not reproduce the original +// stream entry. Codex P1 (PR #791) — switched to a name/value +// record array. +type streamFieldJSON struct { + Name string `json:"name"` + Value string `json:"value"` +} + +// streamEntryJSON is the dump-format projection of one stream +// entry. Fields is an ARRAY so duplicate field names within a +// single XADD round-trip correctly. The array preserves the +// interleaved insertion order from the protobuf so consumers can +// re-assemble the original XADD argv. type streamEntryJSON struct { ID string `json:"id"` - Fields map[string]string `json:"fields"` + Fields []streamFieldJSON `json:"fields"` } // streamMetaJSON is the dump-format projection of the final _meta @@ -330,13 +354,16 @@ func marshalStreamJSONL(st *redisStreamState) ([]byte, error) { var buf bytes.Buffer const xaddPairWidth = 2 // (name, value) — XADD enforces even arity for _, e := range st.entries { - fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth) + fields := make([]streamFieldJSON, 0, len(e.fields)/xaddPairWidth) for i := 0; i+1 < len(e.fields); i += xaddPairWidth { - fieldsMap[e.fields[i]] = e.fields[i+1] + fields = append(fields, streamFieldJSON{ + Name: e.fields[i], + Value: e.fields[i+1], + }) } rec := streamEntryJSON{ ID: formatStreamID(e.ms, e.seq), - Fields: fieldsMap, + Fields: fields, } line, err := json.Marshal(rec) if err != nil { diff --git a/internal/backup/redis_stream_test.go b/internal/backup/redis_stream_test.go index 018ca98b..69797783 100644 --- a/internal/backup/redis_stream_test.go +++ b/internal/backup/redis_stream_test.go @@ -169,11 +169,62 @@ func TestRedisDB_StreamRoundTripBasic(t *testing.T) { assertStreamMetaTerminator(t, meta, 3, 1714400000002, 0) } -// TestRedisDB_StreamFieldsDecodedToObject confirms that interleaved +// streamFieldsPair is the decoded counterpart of streamFieldJSON used +// in assertions. +type streamFieldsPair struct{ name, value string } + +// extractStreamFieldsAsPairs pulls the `fields` array out of a +// decoded JSONL entry and returns it as a slice of (name, value) +// pairs. Centralises the type assertions so the per-test bodies +// stay below the cyclop ceiling and forcetypeassert lints don't +// fire at every call site. +func extractStreamFieldsAsPairs(t *testing.T, entry map[string]any) []streamFieldsPair { + t.Helper() + raw, ok := entry["fields"].([]any) + if !ok { + t.Fatalf("entry.fields = %T(%v), want array", entry["fields"], entry["fields"]) + } + out := make([]streamFieldsPair, 0, len(raw)) + for i, r := range raw { + rec, ok := r.(map[string]any) + if !ok { + t.Fatalf("entry.fields[%d] = %T(%v), want object", i, r, r) + } + name, ok := rec["name"].(string) + if !ok { + t.Fatalf("entry.fields[%d].name = %T(%v), want string", i, rec["name"], rec["name"]) + } + value, ok := rec["value"].(string) + if !ok { + t.Fatalf("entry.fields[%d].value = %T(%v), want string", i, rec["value"], rec["value"]) + } + out = append(out, streamFieldsPair{name: name, value: value}) + } + return out +} + +// assertStreamFieldsEqual checks that the decoded fields array +// matches the expected ordered list of (name, value) pairs. +func assertStreamFieldsEqual(t *testing.T, got []streamFieldsPair, want []streamFieldsPair) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("len(fields) = %d, want %d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("fields[%d] = %v, want %v", i, got[i], want[i]) + } + } +} + +// TestRedisDB_StreamFieldsDecodedToArray confirms that interleaved // `[name1, value1, name2, value2, ...]` arrays from the protobuf -// land as `{"name1": "value1", "name2": "value2"}` JSON objects per -// the design example (line 338). -func TestRedisDB_StreamFieldsDecodedToObject(t *testing.T) { +// land as a JSONL `[{"name":...,"value":...}]` array. Switched +// from the design's original map shape (line 338) in response to +// codex's P1 on PR #791: XADD allows duplicate field names within +// one entry (e.g. `XADD s * f v1 f v2`) and the map representation +// silently collapsed them. +func TestRedisDB_StreamFieldsDecodedToArray(t *testing.T) { t.Parallel() db, root := newRedisDB(t) if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 100, 5)); err != nil { @@ -190,18 +241,92 @@ func TestRedisDB_StreamFieldsDecodedToObject(t *testing.T) { if len(entries) != 1 { t.Fatalf("entries = %d, want 1", len(entries)) } - fields, ok := entries[0]["fields"].(map[string]any) - if !ok { - t.Fatalf("entries[0].fields = %T(%v), want object", entries[0]["fields"], entries[0]["fields"]) + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{ + {"event", "login"}, + {"user", "alice"}, + {"ip", "10.0.0.1"}, + }) +} + +// TestRedisDB_StreamDuplicateFieldsPreserved pins the codex P1 fix: +// XADD permits duplicate field names within one entry (e.g. +// `XADD s * f v1 f v2` stores both pairs verbatim in the +// protobuf's Fields slice). The original map-based projection +// silently collapsed duplicates; the array shape preserves them +// and a restore can replay the original XADD argv exactly. +func TestRedisDB_StreamDuplicateFieldsPreserved(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 1, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1-0", []string{"f", "v1", "f", "v2", "g", "v3"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1, 0), val); err != nil { + t.Fatal(err) } - want := map[string]any{"event": "login", "user": "alice", "ip": "10.0.0.1"} - if len(fields) != len(want) { - t.Fatalf("fields = %v, want %v", fields, want) + if err := db.Finalize(); err != nil { + t.Fatal(err) } - for k, v := range want { - if fields[k] != v { - t.Fatalf("fields[%q] = %v, want %v", k, fields[k], v) - } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{ + {"f", "v1"}, + {"f", "v2"}, + {"g", "v3"}, + }) +} + +// TestRedisDB_StreamTTLArrivesBeforeRows pins the codex P1 fix: +// `!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < +// `s`, so in real Pebble snapshot order the TTL arrives FIRST. +// The encoder MUST buffer the expiry in pendingTTL and drain it +// when streamState first registers the user key, inlining the +// value into the JSONL `_meta.expire_at_ms`. Without this drain +// every TTL'd stream would restore as permanent. +func TestRedisDB_StreamTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + entry. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleStreamMeta(streamMetaKey("k"), encodeStreamMetaValue(1, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"event", "login"}) + if err := db.HandleStreamEntry(streamEntryKey("k", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + _, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "k.jsonl")) + if streamFloat(t, meta, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("meta.expire_at_ms = %v want %d — pendingTTL drain failed", meta["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_SetTTLArrivesBeforeRows pins the same ordering fix +// for sets (`!redis|ttl|` lex-sorts before `!st|...` because +// `r` < `s`). Retroactive coverage for PR #758, which shipped the +// set encoder before the pendingTTL infrastructure existed. +func TestRedisDB_SetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json")) + if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) } } diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 1b6d0b61..f9adeb69 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -193,6 +193,26 @@ type RedisDB struct { // maxWideColumnItems on the live side, so a single in-memory // slice per stream is tractable. streams map[string]*redisStreamState + + // pendingTTL buffers expiries whose user-key prefix sorts AFTER + // `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble + // snapshots emit records in encoded-key order + // (`store/snapshot_pebble.go::iter.First()/Next()`), and + // `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|` + // prefixes (`r` < `s`/`s`/`z`). Without buffering, HandleTTL + // would see kindByKey == redisKindUnknown and count the TTL + // as an orphan, dropping it before setState / streamState / + // zsetState had a chance to claim the user key — TTL'd + // sets, streams, and sorted sets would silently restore as + // permanent. + // + // Lifecycle: HandleTTL files the expiry here when kind is + // still unknown. Each wide-column state-init function + // (setState / streamState / zsetState) drains the entry when + // it first registers the user key. Finalize fires the + // orphan-TTL warning for whatever remains (those keys never + // appeared as a typed record — likely a corrupted store). + pendingTTL map[string]uint64 } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -213,6 +233,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { lists: make(map[string]*redisListState), sets: make(map[string]*redisSetState), streams: make(map[string]*redisStreamState), + pendingTTL: make(map[string]uint64), } } @@ -256,14 +277,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error { } // HandleTTL processes one !redis|ttl| record. Routing depends on -// what HandleString/HandleHLL recorded for the same userKey: +// what HandleString/HandleHLL recorded for the same userKey. +// +// There are two ordering regimes the snapshot stream presents: +// +// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order +// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record +// arrives FIRST, kindByKey is already set when HandleTTL fires, +// and we route directly to the per-type sidecar / inline field. +// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because +// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is +// still redisKindUnknown. We park the expiry in pendingTTL and +// let each wide-column state-init function (setState / +// streamState / zsetState) drain it when the user key finally +// surfaces as a typed record. Codex P1 finding on PR #791. +// +// Routing: // -// - redisKindHLL -> hll_ttl.jsonl -// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL -// lives in !redis|ttl| rather than the inline magic-prefix header) -// - redisKindUnknown -> counted in orphanTTLCount; reported via the -// warn sink on Finalize because Phase 0a's wide-column encoders -// have not landed yet. +// - redisKindHLL -> hll_ttl.jsonl (case 1) +// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings +// whose TTL lives in !redis|ttl| rather than the inline header) +// - redisKindHash/List/Set/Stream/ZSet -> inlined into the +// per-key JSON (case 1 for hash/list, case 2 for set/stream/zset +// where the state-init already drained from pendingTTL before +// HandleTTL would even be called the second time) +// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly +// unmatched entries (key never registered as a typed record). func (r *RedisDB) HandleTTL(userKey, value []byte) error { expireAtMs, err := decodeRedisTTLValue(value) if err != nil { @@ -315,16 +354,48 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoder (zset) lands, and - // buffering them allocates proportional to user-key size - // (up to 1 MiB per key) for no benefit. Codex P2 round 6. - r.orphanTTLCount++ + // Park the expiry until a wide-column Handle*Meta / + // Handle*Member / Handle*Entry registers the user key. + // Without this buffering, sorted-set / set / stream TTLs + // would be counted as orphans and dropped because their + // type prefixes lex-sort AFTER `!redis|ttl|` in the + // snapshot's encoded-key order. Codex P1 (PR #791). + // + // We store userKey as a string copy (`string([]byte)` + // allocates) rather than the alias slice — the snapshot + // reader reuses key buffers across iterations, so a slice + // alias would race with the next record. + r.pendingTTL[string(userKey)] = expireAtMs return nil } return nil } +// claimPendingTTL drains any buffered TTL for userKey into the +// caller-provided state. Called by the wide-column state-init +// functions (setState / streamState / zsetState) when they first +// register a user key, so the parked expiry inlines into the same +// per-key JSON the rest of the record assembles. +// +// Returns (expireAtMs, true) when a buffered TTL existed. The +// caller should set state.expireAtMs / state.hasTTL on the +// returned value. The pending entry is removed so Finalize's +// orphan-count loop only sees truly-unmatched TTLs. +// +// Safe to call from hashState/listState too even though those +// types' typed records sort before `!redis|ttl|`; pendingTTL will +// always be empty for them. Keeping the call site uniform keeps +// the state-init contract simple. +func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { + uk := string(userKey) + expireAtMs, ok := r.pendingTTL[uk] + if !ok { + return 0, false + } + delete(r.pendingTTL, uk) + return expireAtMs, true +} + // Finalize flushes all open sidecar writers and emits warnings for any // pending TTL records whose user key was never claimed by the wide-column // encoders. Call exactly once after every snapshot record has been @@ -344,10 +415,19 @@ func (r *RedisDB) Finalize() error { firstErr = err } } + // At this point all type-prefixed records have been processed + // and every wide-column state-init drained its claimPendingTTL. + // Whatever remains in pendingTTL is truly unmatched — the + // user key never appeared as a typed record. Likely causes: + // store corruption, a snapshot mid-write where the typed + // record was dropped, or a `!redis|ttl|` entry written for a + // key whose type prefix we don't recognise (a future Redis + // type added on the live side without a backup-encoder update). + r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoder (zset) has not landed yet — orphan TTLs may also indicate a corrupted store or a !redis|ttl| record whose user-key prefix dispatcher missed routing to a wide-column Handler") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix") } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 152a55d4..4654a9d5 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -579,11 +579,21 @@ func TestRedisDB_NoKeymapWhenAllReversible(t *testing.T) { func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Parallel() - // Codex P2 round 6: orphan TTL records (those with no prior - // HandleString/HandleHLL claim) must be counted only — the - // per-key payload would allocate proportional to user-key size - // and is unused before the wide-column encoders land. Drive a - // sample of orphan records and assert the count, not a buffer. + // Codex P1 (PR #791): orphan TTL records are now BUFFERED in + // pendingTTL during intake — the wide-column state-init + // functions need to drain them when a typed record finally + // registers a user key. The buffer holds (string-userKey, + // uint64-expireAt) pairs; the per-record allocation cost is + // the same as kindByKey's, which we already pay for every + // typed record. The original Codex P2 round 6 concern (don't + // buffer arbitrarily-large payload bytes) is preserved — we + // still don't keep the value bytes. + // + // At Finalize, entries still in pendingTTL are counted as + // truly unmatched orphans. This test asserts: + // - During intake: orphanTTLCount stays 0, pendingTTL grows. + // - After Finalize: orphanTTLCount == n (no typed record + // ever drained the entries). db, _ := newRedisDB(t) const n = 10_000 for i := 0; i < n; i++ { @@ -593,8 +603,17 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Fatalf("HandleTTL[%d]: %v", i, err) } } + if db.orphanTTLCount != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (buffered)", db.orphanTTLCount) + } + if len(db.pendingTTL) != n { + t.Fatalf("pendingTTL len = %d, want %d", len(db.pendingTTL), n) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } if db.orphanTTLCount != n { - t.Fatalf("orphanTTLCount = %d, want %d", db.orphanTTLCount, n) + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", db.orphanTTLCount, n) } } From e5bd65048febf8a788f21d9579a6ba31707e453c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 04:28:34 +0900 Subject: [PATCH 3/6] =?UTF-8?q?backup(stream):=20PR791=20r2=20claude=20Cri?= =?UTF-8?q?tical=20=E2=80=94=20preserve=20non-UTF-8=20stream=20fields?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude-bot Critical finding (PR #791 round 2): the stream encoder emitted field names and values as plain Go strings, and `json.Marshal(string)` silently substitutes U+FFFD for every ill-formed UTF-8 byte sequence. A future schema migration switching `pb.RedisStreamEntry.Fields` from `repeated string` to `bytes`, or a code path that bypasses the proto marshaler's UTF-8 validation, would surface as silent backup corruption of binary stream payloads (sensor data, legacy encodings, raw bytes). The hash encoder already solves this via `marshalRedisBinaryValue` + `json.RawMessage`, which emits non-UTF-8 bytes as `{"base64":"..."}` envelopes (redis_hash.go:235-238). This commit applies the same projection to streams. Changes: - streamFieldJSON.Name/Value: string -> json.RawMessage. - New buildStreamFieldRecords helper: extracted the per-pair marshaling out of marshalStreamJSONL so the projection is unit-testable independently of the protobuf pipeline (proto3 string fields enforce UTF-8 at gproto.Marshal so we cannot push binary bytes through that path). - extractStreamFieldsAsPairs test helper now accepts EITHER a plain JSON string OR the `{"base64":"..."}` envelope, hiding the per-pair envelope detection from per-test assertions. A new decodeRedisBinaryEnvelope helper handles the base64url reversal. Caller audit (per /loop standing instruction): buildStreamFieldRecords is private to the package. streamFieldJSON's two-field shape did not change publicly visible behavior (the JSON output for UTF-8 fields is byte-identical to before; only non-UTF-8 fields now route through the envelope). The only consumer of streamFieldJSON is marshalStreamJSONL (line 369). Verified via `grep -rn buildStreamFieldRecords|streamFieldJSON internal/backup/` - all matches inside the same file or the test file. New test: - TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip exercises buildStreamFieldRecords directly with non-UTF-8 input. Marshals one streamEntryJSON, re-parses, and asserts byte-identical round-trip via the binary envelope. Existing TestRedisDB_StreamFieldsDecodedToArray and TestRedisDB_StreamDuplicateFieldsPreserved still pin the UTF-8 plain-string path. Self-review: 1. Data loss - opposite of the original concern: this commit preserves binary stream bytes that the previous code would have mangled into U+FFFD. The protobuf wire format prevents the bad bytes from reaching us today, but the projection now matches the hash encoder's defensive shape. 2. Concurrency - no new shared state. 3. Performance - one extra allocation per (name, value) pair for the marshaled RawMessage. Matches hash encoder's cost shape; bounded by maxWideColumnItems on the live side. 4. Consistency - stream encoder's binary-safe handling now matches the hash encoder's. A future format-version bump can canonicalize the JSON projection across all wide-column types. 5. Coverage - 1 new test pinning the projection; existing round-trip and duplicate-fields tests confirm no UTF-8 regression. --- internal/backup/redis_stream.go | 50 ++++++++++++--- internal/backup/redis_stream_test.go | 94 +++++++++++++++++++++++++--- 2 files changed, 127 insertions(+), 17 deletions(-) diff --git a/internal/backup/redis_stream.go b/internal/backup/redis_stream.go index 46250d25..a4cd6d17 100644 --- a/internal/backup/redis_stream.go +++ b/internal/backup/redis_stream.go @@ -313,9 +313,21 @@ func (r *RedisDB) writeStreamJSONL(dir string, userKey []byte, st *redisStreamSt // duplicates and a restore would not reproduce the original // stream entry. Codex P1 (PR #791) — switched to a name/value // record array. +// +// Name and Value are `json.RawMessage` populated via +// `marshalRedisBinaryValue` so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. Without this, `json.Marshal` of a +// plain `string` carrying invalid UTF-8 silently substitutes U+FFFD +// for each ill-formed byte sequence, and the restored stream entry +// would carry the replacement-character mangle instead of the +// original bytes. Redis stream field names and values are +// binary-safe (the live store keeps them as protobuf `bytes` +// despite the wire-format `repeated string` shape), so the +// projection must preserve every byte. Mirrors hashFieldRecord +// (redis_hash.go:235-238). Claude bot Critical (PR #791 round 2). type streamFieldJSON struct { - Name string `json:"name"` - Value string `json:"value"` + Name json.RawMessage `json:"name"` + Value json.RawMessage `json:"value"` } // streamEntryJSON is the dump-format projection of one stream @@ -354,12 +366,9 @@ func marshalStreamJSONL(st *redisStreamState) ([]byte, error) { var buf bytes.Buffer const xaddPairWidth = 2 // (name, value) — XADD enforces even arity for _, e := range st.entries { - fields := make([]streamFieldJSON, 0, len(e.fields)/xaddPairWidth) - for i := 0; i+1 < len(e.fields); i += xaddPairWidth { - fields = append(fields, streamFieldJSON{ - Name: e.fields[i], - Value: e.fields[i+1], - }) + fields, err := buildStreamFieldRecords(e.fields, xaddPairWidth) + if err != nil { + return nil, err } rec := streamEntryJSON{ ID: formatStreamID(e.ms, e.seq), @@ -396,3 +405,28 @@ func marshalStreamJSONL(st *redisStreamState) ([]byte, error) { func formatStreamID(ms, seq uint64) string { return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10) } + +// buildStreamFieldRecords converts one entry's interleaved +// (name1, value1, name2, value2, ...) field slice into a +// streamFieldJSON array. Each name/value goes through +// marshalRedisBinaryValue so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. Without this projection, plain +// `string` fields would surrender every ill-formed UTF-8 byte to +// json.Marshal's silent U+FFFD substitution and the restored +// stream entry would not be byte-identical to the source. Claude +// bot Critical (PR #791 round 2). +func buildStreamFieldRecords(fields []string, pairWidth int) ([]streamFieldJSON, error) { + out := make([]streamFieldJSON, 0, len(fields)/pairWidth) + for i := 0; i+1 < len(fields); i += pairWidth { + nameJSON, err := marshalRedisBinaryValue([]byte(fields[i])) + if err != nil { + return nil, err + } + valueJSON, err := marshalRedisBinaryValue([]byte(fields[i+1])) + if err != nil { + return nil, err + } + out = append(out, streamFieldJSON{Name: nameJSON, Value: valueJSON}) + } + return out, nil +} diff --git a/internal/backup/redis_stream_test.go b/internal/backup/redis_stream_test.go index 69797783..234faebe 100644 --- a/internal/backup/redis_stream_test.go +++ b/internal/backup/redis_stream_test.go @@ -3,6 +3,7 @@ package backup import ( "bufio" "bytes" + "encoding/base64" "encoding/binary" "encoding/json" "math" @@ -178,6 +179,12 @@ type streamFieldsPair struct{ name, value string } // pairs. Centralises the type assertions so the per-test bodies // stay below the cyclop ceiling and forcetypeassert lints don't // fire at every call site. +// +// Each name/value can be EITHER a plain JSON string (UTF-8 content) +// or a `{"base64":"..."}` envelope object (non-UTF-8 binary bytes). +// The fields are emitted via marshalRedisBinaryValue so binary +// stream payloads round-trip byte-identical; this helper hides the +// per-pair envelope detection from the per-test assertions. func extractStreamFieldsAsPairs(t *testing.T, entry map[string]any) []streamFieldsPair { t.Helper() raw, ok := entry["fields"].([]any) @@ -190,19 +197,40 @@ func extractStreamFieldsAsPairs(t *testing.T, entry map[string]any) []streamFiel if !ok { t.Fatalf("entry.fields[%d] = %T(%v), want object", i, r, r) } - name, ok := rec["name"].(string) - if !ok { - t.Fatalf("entry.fields[%d].name = %T(%v), want string", i, rec["name"], rec["name"]) - } - value, ok := rec["value"].(string) - if !ok { - t.Fatalf("entry.fields[%d].value = %T(%v), want string", i, rec["value"], rec["value"]) - } - out = append(out, streamFieldsPair{name: name, value: value}) + out = append(out, streamFieldsPair{ + name: decodeRedisBinaryEnvelope(t, "name", rec["name"]), + value: decodeRedisBinaryEnvelope(t, "value", rec["value"]), + }) } return out } +// decodeRedisBinaryEnvelope reverses marshalRedisBinaryValue for +// tests: a plain JSON string round-trips as a string; a +// `{"base64":"..."}` envelope decodes via base64url back to the +// original byte string. Returns the recovered bytes as a Go +// string so the test assertions can compare against the input +// regardless of which projection the encoder chose. +func decodeRedisBinaryEnvelope(t *testing.T, label string, raw any) string { + t.Helper() + if s, ok := raw.(string); ok { + return s + } + env, ok := raw.(map[string]any) + if !ok { + t.Fatalf("%s = %T(%v), want string or base64 envelope", label, raw, raw) + } + encoded, ok := env["base64"].(string) + if !ok { + t.Fatalf("%s base64 envelope missing payload: %v", label, env) + } + decoded, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("%s base64 decode: %v (payload %q)", label, err, encoded) + } + return string(decoded) +} + // assertStreamFieldsEqual checks that the decoded fields array // matches the expected ordered list of (name, value) pairs. func assertStreamFieldsEqual(t *testing.T, got []streamFieldsPair, want []streamFieldsPair) { @@ -248,6 +276,54 @@ func TestRedisDB_StreamFieldsDecodedToArray(t *testing.T) { }) } +// TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip pins the +// claude-bot Critical fix on PR #791: stream field names and +// values are binary-safe in Redis. Previously +// streamFieldJSON.Name/Value were plain Go `string` and went +// through json.Marshal, which silently substitutes U+FFFD for +// every ill-formed UTF-8 byte sequence — a stream entry carrying +// raw binary would be silently corrupted in the dump. The fix +// routes both fields through marshalRedisBinaryValue so non-UTF-8 +// bytes emit as `{"base64":"..."}` and round-trip byte-identical. +// +// The protobuf wire format itself enforces UTF-8 on `string` fields +// (proto3 `string` is by spec UTF-8, and `gproto.Marshal` rejects +// invalid bytes), so the path "live store → snapshot → decoder" +// cannot actually carry non-UTF-8 stream fields today; it's a +// defensive invariant in case a future schema migration switches +// `Fields` to `bytes`, or a code path bypasses the proto marshaler. +// We pin the projection's behavior directly on +// buildStreamFieldRecords + extractStreamFieldsAsPairs rather than +// trying to push bytes through a gproto.Marshal step that would +// reject them. +func TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip(t *testing.T) { + t.Parallel() + const xaddPairWidth = 2 + binaryName := "\xff\xfe\x80" + binaryValue := "\x00\x01\xc3\x28\x02" + records, err := buildStreamFieldRecords([]string{binaryName, binaryValue, "utf8-key", "utf8-val"}, xaddPairWidth) + if err != nil { + t.Fatalf("buildStreamFieldRecords: %v", err) + } + if len(records) != xaddPairWidth { // we passed 2 pairs + t.Fatalf("len(records) = %d, want %d", len(records), xaddPairWidth) + } + // Marshal one entry to JSONL bytes, parse it back, and assert + // that the recovered pairs match the input byte-identical. + body, err := json.Marshal(streamEntryJSON{ID: "1-0", Fields: records}) + if err != nil { + t.Fatalf("json.Marshal streamEntryJSON: %v", err) + } + var parsed map[string]any + if err := json.Unmarshal(body, &parsed); err != nil { + t.Fatalf("json.Unmarshal: %v", err) + } + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, parsed), []streamFieldsPair{ + {binaryName, binaryValue}, + {"utf8-key", "utf8-val"}, + }) +} + // TestRedisDB_StreamDuplicateFieldsPreserved pins the codex P1 fix: // XADD permits duplicate field names within one entry (e.g. // `XADD s * f v1 f v2` stores both pairs verbatim in the From f59acc07920683243920974f3fbb02eda629efd7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 04:43:40 +0900 Subject: [PATCH 4/6] =?UTF-8?q?backup(stream):=20PR791=20r3=20codex=20P1?= =?UTF-8?q?=20=E2=80=94=20bound=20pendingTTL=20to=20prevent=20OOM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 on round 2: the pendingTTL buffer introduced in r1 grew without bound. An adversarial or corrupt snapshot whose `!redis|ttl|` records never find a typed-record claimer would accumulate (string-userKey, uint64-expireAt) entries until the decoder runs out of memory. The reviewer notes that this PR's branch has no zset handler yet, so every zset TTL accumulates for the whole run; on a real cluster's snapshot that's potentially millions of buffered keys. Fix: cap pendingTTL at pendingTTLCap entries (default 1 MiB entries, ~64 MiB worst-case memory). Once the cap is reached, subsequent unknown-kind TTLs fall back to the original immediate- orphan-count path without buffering the user-key bytes. Already- buffered entries can still be drained by later state-inits; new entries beyond the cap are counted but not buffered. API: - new RedisDB.WithPendingTTLCap(int) chainable setter. - new pendingTTLOverflow counter (surfaced in the orphan-TTL warning). - Finalize's warning includes the overflow count and the cap so an operator can detect a snapshot that exceeded the buffer budget. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch behavior changes: previously ALWAYS buffered, now buffers up to cap then orphan-counts. Other branches unchanged. - Callers of HandleTTL: only test files and the future cmd/elastickv-snapshot-decode driver. No external caller depends on the unbounded-buffering shape; the cap default (1 MiB) covers every realistic legitimate workload, and the WithPendingTTLCap override lets callers tune to their host budget. - New helper parkUnknownTTL is package-private with one call site (HandleTTL line 401). No prior call sites to audit. - Verified via `grep -rn HandleTTL|pendingTTLCap|WithPendingTTLCap --include=*.go internal/backup/`. New tests: - TestRedisDB_PendingTTLBoundedByCap drives 2*cap unknown TTLs and asserts: pendingTTL stays at cap, orphanTTLCount tracks the overflow at intake, post-Finalize total == 2*cap. - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that cap==0 reverts to the original immediate-orphan path. - TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins input sanitisation. Self-review: 1. Data loss - the cap can mis-classify TTLs that arrive in the overflow window AND would have been drained by a later state-init. Mitigation: the default cap (1 MiB entries) is well above the count of legitimately-buffered TTL'd wide-column keys on any real cluster; operators with abnormal workloads can raise the cap via WithPendingTTLCap. 2. Concurrency - no new shared state. 3. Performance - one extra comparison per unknown-kind TTL. Bounded memory is the win. 4. Consistency - parkUnknownTTL keeps the userKey-copy contract (allocates string) when buffering, matches the prior intent. 5. Coverage - 3 new tests + 1 updated test (TestRedisDB_OrphanTTLCountedNotBuffered already pins the buffered + drained-at-Finalize semantics under the default cap). --- internal/backup/redis_string.go | 100 +++++++++++++++++++++++---- internal/backup/redis_string_test.go | 77 +++++++++++++++++++++ 2 files changed, 163 insertions(+), 14 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index f9adeb69..19400dd3 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -213,8 +213,36 @@ type RedisDB struct { // orphan-TTL warning for whatever remains (those keys never // appeared as a typed record — likely a corrupted store). pendingTTL map[string]uint64 + + // pendingTTLCap caps the in-memory size of pendingTTL. Once the + // map reaches this many entries, subsequent unknown-kind TTLs + // fall back to incrementing orphanTTLCount directly without + // buffering the user-key bytes. Without this cap, an adversarial + // or corrupt snapshot whose `!redis|ttl|` records never find a + // typed-record claimer would grow pendingTTL unboundedly and + // could OOM the decoder before backup completes. Codex P1 on + // PR #791 round 2. + // + // Default: defaultPendingTTLCap (1 << 20 == 1 MiB entries). At + // ~64 bytes per Go map entry that bounds the worst-case buffer + // at ~64 MiB — well above the count of legitimately-buffered + // wide-column TTL'd keys on any real cluster, but well below + // the OOM threshold for an operator decoder host. + pendingTTLCap int + + // pendingTTLOverflow counts entries that would have entered + // pendingTTL but were skipped because the buffer was at cap. + // Surfaced in the Finalize warning so operators can detect a + // snapshot whose TTL-vs-type cardinality exceeded the buffer. + pendingTTLOverflow int } +// defaultPendingTTLCap caps pendingTTL at 1 MiB entries by default. +// The cap is overridable via WithPendingTTLCap for callers running +// on host classes where a different memory/coverage trade-off is +// appropriate (e.g., embedded decoders or single-key forensic dumps). +const defaultPendingTTLCap = 1 << 20 + // NewRedisDB constructs a RedisDB rooted at /redis/db_/. // dbIndex selects ; today the producer always passes 0, but accepting // the index as a parameter prevents a future multi-db dump from silently @@ -234,9 +262,23 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { sets: make(map[string]*redisSetState), streams: make(map[string]*redisStreamState), pendingTTL: make(map[string]uint64), + pendingTTLCap: defaultPendingTTLCap, } } +// WithPendingTTLCap overrides the default cap on the pendingTTL +// buffer. A value of 0 disables buffering entirely — every +// unknown-kind TTL becomes an immediate orphan (matches the pre- +// PR #791-r1 behavior). Negative inputs are coerced to 0. Returns +// the receiver so it can be chained with other With* setters. +func (r *RedisDB) WithPendingTTLCap(capacity int) *RedisDB { + if capacity < 0 { + capacity = 0 + } + r.pendingTTLCap = capacity + return r +} + // WithWarnSink wires a structured-warning sink. The sink is called with // stable event names ("redis_orphan_ttl", etc.) and key=value pairs. func (r *RedisDB) WithWarnSink(fn func(event string, fields ...any)) *RedisDB { @@ -354,23 +396,40 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - // Park the expiry until a wide-column Handle*Meta / - // Handle*Member / Handle*Entry registers the user key. - // Without this buffering, sorted-set / set / stream TTLs - // would be counted as orphans and dropped because their - // type prefixes lex-sort AFTER `!redis|ttl|` in the - // snapshot's encoded-key order. Codex P1 (PR #791). - // - // We store userKey as a string copy (`string([]byte)` - // allocates) rather than the alias slice — the snapshot - // reader reuses key buffers across iterations, so a slice - // alias would race with the next record. - r.pendingTTL[string(userKey)] = expireAtMs + r.parkUnknownTTL(userKey, expireAtMs) return nil } return nil } +// parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or +// falls back to the immediate orphan-count path when the buffer is +// at cap. Extracted from HandleTTL's switch so the parent stays +// under the cyclop budget. +// +// Rationale (codex P1 PR #791 round 2): Pebble snapshots emit +// records in encoded-key order, and `!redis|ttl|` lex-sorts BEFORE +// every wide-column prefix where the type letter is >= 's' (set, +// stream, zset). The buffer lets us drain the parked expiry when +// the typed record finally arrives; the cap prevents an adversarial +// snapshot whose TTLs never find a claimer from OOM'ing the decoder. +// +// Storage: userKey is COPIED (`string([]byte)` allocates) because +// the snapshot reader reuses key buffers across iterations — an +// alias slice would race with the next record. +func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) { + if len(r.pendingTTL) >= r.pendingTTLCap { + // Cap reached: fall back to the original orphan-count path + // so memory stays bounded. Already-buffered entries can + // still be drained by later state-inits, but new entries + // beyond the cap become immediate orphans. + r.pendingTTLOverflow++ + r.orphanTTLCount++ + return + } + r.pendingTTL[string(userKey)] = expireAtMs +} + // claimPendingTTL drains any buffered TTL for userKey into the // caller-provided state. Called by the wide-column state-init // functions (setState / streamState / zsetState) when they first @@ -425,9 +484,22 @@ func (r *RedisDB) Finalize() error { // type added on the live side without a backup-encoder update). r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { - r.warn("redis_orphan_ttl", + fields := []any{ "count", r.orphanTTLCount, - "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix", + } + if r.pendingTTLOverflow > 0 { + // Surface buffer-overflow separately so operators can + // distinguish "snapshot had more unmatched TTLs than + // the buffer cap" (potential miss-classifier — keys + // at cap WERE drainable by later state-inits, but + // overflow keys were skipped before they got a chance) + // from "TTL records remained unmatched at Finalize". + fields = append(fields, + "pending_ttl_buffer_overflow", r.pendingTTLOverflow, + "pending_ttl_buffer_cap", r.pendingTTLCap) + } + r.warn("redis_orphan_ttl", fields...) } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 4654a9d5..1e115538 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,6 +617,83 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } +// TestRedisDB_PendingTTLBoundedByCap pins the codex P1 fix on PR +// #791 round 2: pendingTTL must NOT grow unboundedly. Once the cap +// is reached, subsequent unknown-kind TTLs fall back to immediate- +// orphan counting without buffering the user-key bytes. An +// adversarial snapshot with millions of unmatched `!redis|ttl|` +// records would otherwise OOM the decoder. +func TestRedisDB_PendingTTLBoundedByCap(t *testing.T) { + t.Parallel() + const cap = 8 + db, _ := newRedisDB(t) + db.WithPendingTTLCap(cap) + // Drive 2*cap unknown-kind TTLs. + for i := 0; i < cap*2; i++ { + key := []byte("orphan-" + intToDecimal(i)) + ms := uint64(i) + 1 //nolint:gosec // i bounded above + if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { + t.Fatalf("HandleTTL[%d]: %v", i, err) + } + } + // At intake: pendingTTL is exactly at cap, the excess went + // straight to orphanTTLCount via the overflow path. + if got := len(db.pendingTTL); got != cap { + t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) + } + if got := db.orphanTTLCount; got != cap { + t.Fatalf("orphanTTLCount = %d at intake, want %d (overflow path)", got, cap) + } + if got := db.pendingTTLOverflow; got != cap { + t.Fatalf("pendingTTLOverflow = %d, want %d", got, cap) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + // After Finalize: the remaining buffered entries are added to + // orphanTTLCount, bringing the total to 2*cap (the actual count + // of unmatched TTLs we received). + if got := db.orphanTTLCount; got != cap*2 { + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", got, cap*2) + } +} + +// TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that a +// cap of 0 disables buffering entirely — every unknown-kind TTL +// becomes an immediate orphan. Matches the pre-PR #791-r1 +// behavior, useful for callers that prefer constant-space orphan +// counting over the buffered drain path. +func TestRedisDB_WithPendingTTLCapZeroDisablesBuffering(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLCap(0) + const n = 5 + for i := 0; i < n; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatal(err) + } + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL len = %d, want 0 (cap disabled)", got) + } + if got := db.orphanTTLCount; got != n { + t.Fatalf("orphanTTLCount = %d, want %d", got, n) + } +} + +// TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins the input +// sanitisation: WithPendingTTLCap(-1) must behave like cap=0 +// (disable buffering) rather than panicking or leaking the negative +// value into the comparison. +func TestRedisDB_WithPendingTTLCapNegativeCoercedToZero(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLCap(-100) + if db.pendingTTLCap != 0 { + t.Fatalf("pendingTTLCap = %d after WithPendingTTLCap(-100), want 0", db.pendingTTLCap) + } +} + func TestRedisDB_DirsCreatedCachesMkdirAll(t *testing.T) { t.Parallel() // Two HandleString calls in a row should populate dirsCreated From be430fe29d7b9c3091e157886a30c86145265799 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 04:58:07 +0900 Subject: [PATCH 5/6] =?UTF-8?q?backup(stream):=20PR791=20r4=20codex=20P1?= =?UTF-8?q?=20=E2=80=94=20fail=20closed=20on=20pendingTTL=20overflow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror of PR #790 round 5 codex P1 fix: the cap-with-silent-orphan fallback introduced in r3 is itself a data-loss path. In real Pebble snapshot order (`!redis|ttl|` < `!st|`/`!stream|`/`!zs|`), an overflowed TTL likely belongs to a wide-column key arriving later in the scan; dropping it permanently loses `expire_at_ms`. Fix: replace the silent-orphan fallback with fail-closed semantics. Three modes: - cap > 0, buffer NOT full: buffer the entry as before. - cap == 0: counter-only mode (operator explicit opt-out). - cap > 0, buffer FULL: return ErrPendingTTLBufferFull. Operator must raise WithPendingTTLCap or accept counter-only via cap=0. Caller audit (per /loop standing instruction): same as the parallel PR #790 r5 fix. HandleTTL's redisKindUnknown branch CAN return a non-nil error in a new condition. All call sites check the error (grep -rn '\.HandleTTL(' --include=*.go). parkUnknownTTL changes signature from `func(...)` to `func(...) error`; single caller (HandleTTL) updated. Test renames: - TestRedisDB_PendingTTLBoundedByCap → TestRedisDB_PendingTTLFailsClosedAtCap - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering → TestRedisDB_WithPendingTTLCapZeroOpts Self-review: same shape as PR #790 r5 commit's review. Data loss avoidance via fail-closed; semantics match CLAUDE.md's pattern for replication/HLC/snapshot invariants. --- internal/backup/redis_string.go | 65 ++++++++++++++++++++-------- internal/backup/redis_string_test.go | 54 ++++++++++------------- 2 files changed, 70 insertions(+), 49 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 19400dd3..d135ebef 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -72,6 +72,22 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| // expected 8-byte big-endian uint64 millisecond expiry. var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value") +// ErrPendingTTLBufferFull is returned by HandleTTL when an +// unknown-kind TTL arrives but the pendingTTL buffer is already at +// pendingTTLCap. The encoder fails closed here rather than silently +// counting the TTL as an orphan because in real Pebble snapshot +// order (`!redis|ttl|` lex-sorts before `!st|`/`!stream|`/`!zs|`), +// the dropped entry would likely belong to a valid wide-column key +// that arrives later — losing its expire_at_ms would produce a +// restored database with non-expiring data that the source +// snapshot's clients expected to expire. Codex P1 on PR #790 round +// 5 (same fix mirrored here on PR #791). +// +// Recovery: raise WithPendingTTLCap above the snapshot's count of +// unmatched-at-intake TTLs, or set the cap to 0 to explicitly opt +// into the lossy counter-only mode. +var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL buffer at cap; raise WithPendingTTLCap or accept orphan-counter mode via WithPendingTTLCap(0)") + // redisKeyKind tracks which Redis-type prefix introduced a user key, so that // when a later !redis|ttl| record arrives we know whether to write its // expiry into strings_ttl.jsonl, hll_ttl.jsonl, or buffer it for a wide- @@ -396,38 +412,51 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - r.parkUnknownTTL(userKey, expireAtMs) - return nil + return r.parkUnknownTTL(userKey, expireAtMs) } return nil } // parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or -// falls back to the immediate orphan-count path when the buffer is -// at cap. Extracted from HandleTTL's switch so the parent stays -// under the cyclop budget. +// fails closed when the buffer is at cap. Extracted from HandleTTL's +// switch so the parent stays under the cyclop budget. // -// Rationale (codex P1 PR #791 round 2): Pebble snapshots emit -// records in encoded-key order, and `!redis|ttl|` lex-sorts BEFORE -// every wide-column prefix where the type letter is >= 's' (set, -// stream, zset). The buffer lets us drain the parked expiry when -// the typed record finally arrives; the cap prevents an adversarial -// snapshot whose TTLs never find a claimer from OOM'ing the decoder. +// Three modes determined by pendingTTLCap: +// +// - cap > 0 and buffer NOT full: store the (userKey, expireAtMs) +// pair so a later wide-column state-init can drain it. +// - cap == 0: counter-only mode. The TTL becomes an immediate +// orphan. Operator-explicit opt-out for callers that prefer +// constant-space orphan counting over the buffered drain path. +// - cap > 0 and buffer FULL: fail closed with +// ErrPendingTTLBufferFull. Silently counting the entry as an +// orphan would permanently lose `expire_at_ms` for the wide- +// column key that arrives later. Codex P1 on PR #790 round 5 +// (mirrored here on PR #791). // // Storage: userKey is COPIED (`string([]byte)` allocates) because // the snapshot reader reuses key buffers across iterations — an // alias slice would race with the next record. -func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) { +func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error { + if r.pendingTTLCap == 0 { + // Counter-only mode (operator explicitly disabled the buffer). + r.orphanTTLCount++ + return nil + } if len(r.pendingTTL) >= r.pendingTTLCap { - // Cap reached: fall back to the original orphan-count path - // so memory stays bounded. Already-buffered entries can - // still be drained by later state-inits, but new entries - // beyond the cap become immediate orphans. + // Fail closed: refuse to silently drop a TTL that may + // belong to a wide-column key arriving later in the + // snapshot scan. The operator should raise the cap + // (WithPendingTTLCap) or investigate the snapshot for + // corruption. We still increment pendingTTLOverflow so + // the Finalize warning surfaces the count even if the + // caller swallows the error. r.pendingTTLOverflow++ - r.orphanTTLCount++ - return + return cockroachdberr.Wrapf(ErrPendingTTLBufferFull, + "buffer at cap=%d (user_key_len=%d)", r.pendingTTLCap, len(userKey)) } r.pendingTTL[string(userKey)] = expireAtMs + return nil } // claimPendingTTL drains any buffered TTL for userKey into the diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 1e115538..07c7bba3 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,60 +617,52 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } -// TestRedisDB_PendingTTLBoundedByCap pins the codex P1 fix on PR -// #791 round 2: pendingTTL must NOT grow unboundedly. Once the cap -// is reached, subsequent unknown-kind TTLs fall back to immediate- -// orphan counting without buffering the user-key bytes. An -// adversarial snapshot with millions of unmatched `!redis|ttl|` -// records would otherwise OOM the decoder. -func TestRedisDB_PendingTTLBoundedByCap(t *testing.T) { +// TestRedisDB_PendingTTLFailsClosedAtCap pins the codex P1 fix on +// PR #790 round 5 (mirrored here on PR #791 round 4): when +// pendingTTL reaches cap and another unknown-kind TTL arrives, +// HandleTTL fails closed with ErrPendingTTLBufferFull rather than +// silently counting the entry as an orphan. Silent overflow would +// permanently lose expire_at_ms for wide-column keys arriving +// later in the scan. +func TestRedisDB_PendingTTLFailsClosedAtCap(t *testing.T) { t.Parallel() const cap = 8 db, _ := newRedisDB(t) db.WithPendingTTLCap(cap) - // Drive 2*cap unknown-kind TTLs. - for i := 0; i < cap*2; i++ { + for i := 0; i < cap; i++ { key := []byte("orphan-" + intToDecimal(i)) ms := uint64(i) + 1 //nolint:gosec // i bounded above if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { - t.Fatalf("HandleTTL[%d]: %v", i, err) + t.Fatalf("HandleTTL[%d]: %v (should succeed within cap)", i, err) } } - // At intake: pendingTTL is exactly at cap, the excess went - // straight to orphanTTLCount via the overflow path. if got := len(db.pendingTTL); got != cap { t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) } - if got := db.orphanTTLCount; got != cap { - t.Fatalf("orphanTTLCount = %d at intake, want %d (overflow path)", got, cap) + err := db.HandleTTL([]byte("orphan-overflow"), encodeTTLValue(999)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull at cap", err) } - if got := db.pendingTTLOverflow; got != cap { - t.Fatalf("pendingTTLOverflow = %d, want %d", got, cap) + if got := db.pendingTTLOverflow; got != 1 { + t.Fatalf("pendingTTLOverflow = %d, want 1", got) } - if err := db.Finalize(); err != nil { - t.Fatalf("Finalize: %v", err) - } - // After Finalize: the remaining buffered entries are added to - // orphanTTLCount, bringing the total to 2*cap (the actual count - // of unmatched TTLs we received). - if got := db.orphanTTLCount; got != cap*2 { - t.Fatalf("orphanTTLCount = %d after Finalize, want %d", got, cap*2) + if got := db.orphanTTLCount; got != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (failed closed)", got) } } -// TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that a -// cap of 0 disables buffering entirely — every unknown-kind TTL -// becomes an immediate orphan. Matches the pre-PR #791-r1 -// behavior, useful for callers that prefer constant-space orphan -// counting over the buffered drain path. -func TestRedisDB_WithPendingTTLCapZeroDisablesBuffering(t *testing.T) { +// TestRedisDB_WithPendingTTLCapZeroOpts pins the explicit +// counter-only mode: cap==0 disables buffering entirely and every +// unknown-kind TTL becomes an immediate orphan WITHOUT firing +// ErrPendingTTLBufferFull. +func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) db.WithPendingTTLCap(0) const n = 5 for i := 0; i < n; i++ { if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { - t.Fatal(err) + t.Fatalf("HandleTTL[%d]: %v (counter-only mode must not fail)", i, err) } } if got := len(db.pendingTTL); got != 0 { From 421254e4636689264eec5259ca7a434d80d4e112 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 05:12:09 +0900 Subject: [PATCH 6/6] =?UTF-8?q?backup(stream):=20PR791=20r5=20codex=20P1?= =?UTF-8?q?=20=E2=80=94=20bound=20pendingTTL=20by=20bytes,=20not=20entries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror of PR #790 round 6 codex P1 fix. The entry-count cap from r3/r4 doesn't actually bound memory (Redis user keys up to 1 MiB each → entry-count cap permits N × 1 MiB of buffered key bytes). Fix: replace entry-count cap with a byte budget (pendingTTLBytesCap, default 64 MiB). claimPendingTTL decrements the byte counter so a long-running scan that drains as it goes can reuse the budget. Renames (no external callers existed — symbols introduced in r3/r4 of this PR): - WithPendingTTLCap → WithPendingTTLByteCap - pendingTTLCap → pendingTTLBytesCap - defaultPendingTTLCap → defaultPendingTTLBytesCap (64 MiB) - Added pendingTTLBytes counter - Added pendingTTLEntryOverheadBytes (= 8) Caller audit (per /loop standing instruction): WithPendingTTLCap had zero external callers in this branch's tree (`grep -rn 'WithPendingTTLCap|pendingTTLCap\b' --include=*.go` returns nothing post-rename). parkUnknownTTL keeps its error return signature; only the comparison shifts from entry count to byte budget. claimPendingTTL gains a counter-decrement matching parkUnknownTTL's add side. Tests renamed/added (same as PR #790 r6): - TestRedisDB_PendingTTLFailsClosedAtByteCap (renamed) - TestRedisDB_PendingTTLByteCapBoundedByLargeKey (new) - TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim (new) - TestRedisDB_WithPendingTTLByteCapZeroOpts (renamed) - TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero (renamed) Self-review: 1. Data loss - byte budget is a strictly tighter memory bound than the previous entry-count cap. 2. Concurrency - no new shared state. 3. Performance - one int comparison + add per parkUnknownTTL; one subtract per claimPendingTTL. 4. Consistency - matches PR #790 r6 exactly; the two PRs share the pendingTTL infrastructure and the fix is identical. 5. Coverage - 5 tests (3 new, 2 renamed). All pass with -race. --- internal/backup/redis_string.go | 152 ++++++++++++++++----------- internal/backup/redis_string_test.go | 106 +++++++++++++------ 2 files changed, 167 insertions(+), 91 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index d135ebef..ca803012 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -73,20 +73,28 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value") // ErrPendingTTLBufferFull is returned by HandleTTL when an -// unknown-kind TTL arrives but the pendingTTL buffer is already at -// pendingTTLCap. The encoder fails closed here rather than silently -// counting the TTL as an orphan because in real Pebble snapshot -// order (`!redis|ttl|` lex-sorts before `!st|`/`!stream|`/`!zs|`), -// the dropped entry would likely belong to a valid wide-column key -// that arrives later — losing its expire_at_ms would produce a -// restored database with non-expiring data that the source -// snapshot's clients expected to expire. Codex P1 on PR #790 round -// 5 (same fix mirrored here on PR #791). +// unknown-kind TTL arrives but the pendingTTL buffer's BYTE budget +// (pendingTTLBytesCap) is already exhausted. The encoder fails +// closed here rather than silently counting the TTL as an orphan +// because in real Pebble snapshot order (`!redis|ttl|` lex-sorts +// before `!st|`/`!stream|`/`!zs|`), the dropped entry would likely +// belong to a valid wide-column key that arrives later — losing +// its expire_at_ms would produce a restored database with non- +// expiring data that the source snapshot's clients expected to +// expire. // -// Recovery: raise WithPendingTTLCap above the snapshot's count of -// unmatched-at-intake TTLs, or set the cap to 0 to explicitly opt -// into the lossy counter-only mode. -var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL buffer at cap; raise WithPendingTTLCap or accept orphan-counter mode via WithPendingTTLCap(0)") +// The budget is in BYTES (not entries) because Redis user keys can +// be up to 1 MiB each; an entry-count cap of N still permits ~N MiB +// of accumulated key bytes, which defeats the OOM protection on +// adversarial snapshots with large keys. Codex P1 on PR #790 round +// 5 (entry-count) and round 6 (entry-count → byte-budget); the +// same fix is mirrored here on PR #791. +// +// Recovery: raise WithPendingTTLByteCap above the snapshot's +// expected cumulative byte cost of unmatched-at-intake TTLs, or +// set the byte cap to 0 to explicitly opt into the lossy +// counter-only mode. +var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL byte budget exhausted; raise WithPendingTTLByteCap or accept orphan-counter mode via WithPendingTTLByteCap(0)") // redisKeyKind tracks which Redis-type prefix introduced a user key, so that // when a later !redis|ttl| record arrives we know whether to write its @@ -230,34 +238,43 @@ type RedisDB struct { // appeared as a typed record — likely a corrupted store). pendingTTL map[string]uint64 - // pendingTTLCap caps the in-memory size of pendingTTL. Once the - // map reaches this many entries, subsequent unknown-kind TTLs - // fall back to incrementing orphanTTLCount directly without - // buffering the user-key bytes. Without this cap, an adversarial - // or corrupt snapshot whose `!redis|ttl|` records never find a - // typed-record claimer would grow pendingTTL unboundedly and - // could OOM the decoder before backup completes. Codex P1 on - // PR #791 round 2. - // - // Default: defaultPendingTTLCap (1 << 20 == 1 MiB entries). At - // ~64 bytes per Go map entry that bounds the worst-case buffer - // at ~64 MiB — well above the count of legitimately-buffered - // wide-column TTL'd keys on any real cluster, but well below - // the OOM threshold for an operator decoder host. - pendingTTLCap int + // pendingTTLBytes tracks the cumulative byte cost of the + // pendingTTL buffer (each entry costs len(userKey) + 8 bytes, + // where 8 is the uint64 expireAtMs payload). Per-bucket Go-map + // overhead is NOT counted — we want the cap to be a + // deterministic byte budget the operator can reason about. + pendingTTLBytes int + + // pendingTTLBytesCap caps the byte cost of pendingTTL. Once + // the cumulative cost would exceed this budget, subsequent + // unknown-kind TTLs fail closed with ErrPendingTTLBufferFull. + // We bound by bytes (not by entry count) because Redis user + // keys can be up to 1 MiB each; an entry-count cap of N + // would still permit ~N MiB of accumulated key bytes — + // defeating the OOM protection on adversarial snapshots with + // large keys. Codex P1 finding on PR #790 round 6 (mirrored + // here on PR #791). + pendingTTLBytesCap int // pendingTTLOverflow counts entries that would have entered - // pendingTTL but were skipped because the buffer was at cap. - // Surfaced in the Finalize warning so operators can detect a - // snapshot whose TTL-vs-type cardinality exceeded the buffer. + // pendingTTL but were rejected because the byte budget was + // exhausted. Surfaced in the Finalize warning so operators + // can detect a snapshot whose TTL-vs-type cardinality exceeded + // the buffer. pendingTTLOverflow int } -// defaultPendingTTLCap caps pendingTTL at 1 MiB entries by default. -// The cap is overridable via WithPendingTTLCap for callers running -// on host classes where a different memory/coverage trade-off is -// appropriate (e.g., embedded decoders or single-key forensic dumps). -const defaultPendingTTLCap = 1 << 20 +// defaultPendingTTLBytesCap caps pendingTTL at 64 MiB cumulative +// key+payload bytes by default. Override via WithPendingTTLByteCap +// for hosts that need a different memory/coverage trade-off. +const defaultPendingTTLBytesCap = 64 << 20 + +// pendingTTLEntryOverheadBytes is the per-entry payload cost we +// charge against pendingTTLBytesCap on top of the user-key bytes. +// It accounts for the uint64 expireAtMs we store as the map value. +// The Go-map's bucket overhead is NOT included — we want a +// deterministic byte budget the operator can reason about. +const pendingTTLEntryOverheadBytes = 8 // NewRedisDB constructs a RedisDB rooted at /redis/db_/. // dbIndex selects ; today the producer always passes 0, but accepting @@ -268,30 +285,36 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { dbIndex = 0 } return &RedisDB{ - outRoot: outRoot, - dbIndex: dbIndex, - kindByKey: make(map[string]redisKeyKind), - dirsCreated: make(map[string]struct{}), - inlineTTLEmitted: make(map[string]struct{}), - hashes: make(map[string]*redisHashState), - lists: make(map[string]*redisListState), - sets: make(map[string]*redisSetState), - streams: make(map[string]*redisStreamState), - pendingTTL: make(map[string]uint64), - pendingTTLCap: defaultPendingTTLCap, + outRoot: outRoot, + dbIndex: dbIndex, + kindByKey: make(map[string]redisKeyKind), + dirsCreated: make(map[string]struct{}), + inlineTTLEmitted: make(map[string]struct{}), + hashes: make(map[string]*redisHashState), + lists: make(map[string]*redisListState), + sets: make(map[string]*redisSetState), + streams: make(map[string]*redisStreamState), + pendingTTL: make(map[string]uint64), + pendingTTLBytesCap: defaultPendingTTLBytesCap, } } -// WithPendingTTLCap overrides the default cap on the pendingTTL -// buffer. A value of 0 disables buffering entirely — every +// WithPendingTTLByteCap overrides the default byte budget for the +// pendingTTL buffer. A value of 0 disables buffering — every // unknown-kind TTL becomes an immediate orphan (matches the pre- // PR #791-r1 behavior). Negative inputs are coerced to 0. Returns // the receiver so it can be chained with other With* setters. -func (r *RedisDB) WithPendingTTLCap(capacity int) *RedisDB { +// +// The budget is in BYTES (sum of `len(userKey) + +// pendingTTLEntryOverheadBytes` over every buffered entry), NOT +// entry count, because Redis user keys can be up to 1 MiB each +// and an entry-count cap of N would still permit ~N MiB of +// accumulated key bytes. Codex P1 finding on PR #790 round 6. +func (r *RedisDB) WithPendingTTLByteCap(capacity int) *RedisDB { if capacity < 0 { capacity = 0 } - r.pendingTTLCap = capacity + r.pendingTTLBytesCap = capacity return r } @@ -421,7 +444,7 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { // fails closed when the buffer is at cap. Extracted from HandleTTL's // switch so the parent stays under the cyclop budget. // -// Three modes determined by pendingTTLCap: +// Three modes determined by pendingTTLBytesCap: // // - cap > 0 and buffer NOT full: store the (userKey, expireAtMs) // pair so a later wide-column state-init can drain it. @@ -438,24 +461,27 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { // the snapshot reader reuses key buffers across iterations — an // alias slice would race with the next record. func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error { - if r.pendingTTLCap == 0 { + if r.pendingTTLBytesCap == 0 { // Counter-only mode (operator explicitly disabled the buffer). r.orphanTTLCount++ return nil } - if len(r.pendingTTL) >= r.pendingTTLCap { + cost := len(userKey) + pendingTTLEntryOverheadBytes + if r.pendingTTLBytes+cost > r.pendingTTLBytesCap { // Fail closed: refuse to silently drop a TTL that may // belong to a wide-column key arriving later in the - // snapshot scan. The operator should raise the cap - // (WithPendingTTLCap) or investigate the snapshot for - // corruption. We still increment pendingTTLOverflow so - // the Finalize warning surfaces the count even if the - // caller swallows the error. + // snapshot scan. The operator should raise the budget + // (WithPendingTTLByteCap) or investigate the snapshot + // for corruption. We still increment pendingTTLOverflow + // so the Finalize warning surfaces the count even if + // the caller swallows the error. r.pendingTTLOverflow++ return cockroachdberr.Wrapf(ErrPendingTTLBufferFull, - "buffer at cap=%d (user_key_len=%d)", r.pendingTTLCap, len(userKey)) + "buffer would exceed byte_cap=%d by %d bytes (user_key_len=%d, in_flight=%d)", + r.pendingTTLBytesCap, r.pendingTTLBytes+cost-r.pendingTTLBytesCap, len(userKey), r.pendingTTLBytes) } r.pendingTTL[string(userKey)] = expireAtMs + r.pendingTTLBytes += cost return nil } @@ -481,6 +507,10 @@ func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { return 0, false } delete(r.pendingTTL, uk) + // Free the byte budget so a later snapshot scan that drains + // many entries can buffer further work. Matches the cost + // charged in parkUnknownTTL exactly. + r.pendingTTLBytes -= len(uk) + pendingTTLEntryOverheadBytes return expireAtMs, true } @@ -526,7 +556,7 @@ func (r *RedisDB) Finalize() error { // from "TTL records remained unmatched at Finalize". fields = append(fields, "pending_ttl_buffer_overflow", r.pendingTTLOverflow, - "pending_ttl_buffer_cap", r.pendingTTLCap) + "pending_ttl_buffer_bytes_cap", r.pendingTTLBytesCap) } r.warn("redis_orphan_ttl", fields...) } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 07c7bba3..82d1cecb 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,31 +617,33 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } -// TestRedisDB_PendingTTLFailsClosedAtCap pins the codex P1 fix on -// PR #790 round 5 (mirrored here on PR #791 round 4): when -// pendingTTL reaches cap and another unknown-kind TTL arrives, -// HandleTTL fails closed with ErrPendingTTLBufferFull rather than -// silently counting the entry as an orphan. Silent overflow would -// permanently lose expire_at_ms for wide-column keys arriving -// later in the scan. -func TestRedisDB_PendingTTLFailsClosedAtCap(t *testing.T) { +// TestRedisDB_PendingTTLFailsClosedAtByteCap pins the codex P1 +// fix on PR #790 round 6 (mirrored here on PR #791 round 5): the +// byte budget bounds pendingTTL memory. When an incoming entry's +// byte cost would exceed pendingTTLBytesCap, HandleTTL fails +// closed with ErrPendingTTLBufferFull. +func TestRedisDB_PendingTTLFailsClosedAtByteCap(t *testing.T) { t.Parallel() - const cap = 8 + const byteCap = 128 + const entriesPerByteCap = 8 db, _ := newRedisDB(t) - db.WithPendingTTLCap(cap) - for i := 0; i < cap; i++ { + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < entriesPerByteCap; i++ { key := []byte("orphan-" + intToDecimal(i)) ms := uint64(i) + 1 //nolint:gosec // i bounded above if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { - t.Fatalf("HandleTTL[%d]: %v (should succeed within cap)", i, err) + t.Fatalf("HandleTTL[%d]: %v (should succeed within byte_cap)", i, err) } } - if got := len(db.pendingTTL); got != cap { - t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) + if got := len(db.pendingTTL); got != entriesPerByteCap { + t.Fatalf("pendingTTL len = %d, want %d", got, entriesPerByteCap) } - err := db.HandleTTL([]byte("orphan-overflow"), encodeTTLValue(999)) + if got := db.pendingTTLBytes; got != byteCap { + t.Fatalf("pendingTTLBytes = %d, want %d", got, byteCap) + } + err := db.HandleTTL([]byte("orphan-X"), encodeTTLValue(999)) if !errors.Is(err, ErrPendingTTLBufferFull) { - t.Fatalf("err = %v, want ErrPendingTTLBufferFull at cap", err) + t.Fatalf("err = %v, want ErrPendingTTLBufferFull at byte_cap", err) } if got := db.pendingTTLOverflow; got != 1 { t.Fatalf("pendingTTLOverflow = %d, want 1", got) @@ -651,14 +653,60 @@ func TestRedisDB_PendingTTLFailsClosedAtCap(t *testing.T) { } } -// TestRedisDB_WithPendingTTLCapZeroOpts pins the explicit -// counter-only mode: cap==0 disables buffering entirely and every -// unknown-kind TTL becomes an immediate orphan WITHOUT firing -// ErrPendingTTLBufferFull. -func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { +// TestRedisDB_PendingTTLByteCapBoundedByLargeKey pins the large- +// key defense: a single oversized key fails closed under a small +// byte budget even though it's a single entry. +func TestRedisDB_PendingTTLByteCapBoundedByLargeKey(t *testing.T) { + t.Parallel() + const byteCap = 64 + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(byteCap) + largeKey := make([]byte, 100) // 100 + 8 = 108 > 64 + for i := range largeKey { + largeKey[i] = byte(i % 256) + } + err := db.HandleTTL(largeKey, encodeTTLValue(1)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull on oversize key", err) + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL must be empty after failed insert, got %d", got) + } +} + +// TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim pins that the +// byte counter is decremented when an entry is drained via +// claimPendingTTL. +func TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + const byteCap = 32 + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < 3; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL[%d]: %v", i, err) + } + } + if got := db.pendingTTLBytes; got != 30 { + t.Fatalf("pendingTTLBytes = %d, want 30", got) + } + if err := db.HandleSetMember(setMemberKey("k0", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if got := db.pendingTTLBytes; got != 20 { + t.Fatalf("pendingTTLBytes after drain = %d, want 20 (reclaimed 10 bytes)", got) + } + if err := db.HandleTTL([]byte("k3"), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL after drain failed: %v", err) + } +} + +// TestRedisDB_WithPendingTTLByteCapZeroOpts pins the explicit +// counter-only mode: byte_cap==0 disables buffering entirely. +func TestRedisDB_WithPendingTTLByteCapZeroOpts(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) - db.WithPendingTTLCap(0) + db.WithPendingTTLByteCap(0) const n = 5 for i := 0; i < n; i++ { if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { @@ -673,16 +721,14 @@ func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { } } -// TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins the input -// sanitisation: WithPendingTTLCap(-1) must behave like cap=0 -// (disable buffering) rather than panicking or leaking the negative -// value into the comparison. -func TestRedisDB_WithPendingTTLCapNegativeCoercedToZero(t *testing.T) { +// TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero pins +// input sanitisation. +func TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) - db.WithPendingTTLCap(-100) - if db.pendingTTLCap != 0 { - t.Fatalf("pendingTTLCap = %d after WithPendingTTLCap(-100), want 0", db.pendingTTLCap) + db.WithPendingTTLByteCap(-100) + if db.pendingTTLBytesCap != 0 { + t.Fatalf("pendingTTLBytesCap = %d after WithPendingTTLByteCap(-100), want 0", db.pendingTTLBytesCap) } }