From 5e323cc53995cb3f960fff496cf78024ea85c78c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 18:14:09 +0900 Subject: [PATCH 1/7] backup: Redis zset encoder (Phase 0a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decodes !zs|meta|/!zs|mem|/!zs|scr|/!zs|meta|d| snapshot records into the per-zset `zsets/.json` shape defined by the Phase 0 design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md line 334-335). Mirrors the hash/list/set encoder shape: - !zs|meta| → 8-byte BE declared member count - !zs|mem| → member m with 8-byte IEEE 754 score - !zs|scr| → secondary score index; silently discarded (!zs|mem| is source of truth at backup time) - !zs|meta|d| → delta records; silently skipped (set/hash policy) Output is sorted by raw member-name bytes so `diff -r` of two dumps with the same logical contents but mutated scores stays line-stable. Scores serialize as JSON numbers for finite values and the ZADD-conventional `"+inf"`/`"-inf"` strings for non-finite ones (json.Marshal rejects ±Inf so the score field uses json.RawMessage). NaN scores fail closed at HandleZSetMember: Redis's ZADD rejects NaN at the wire level, so a NaN in storage indicates corruption that we refuse to silently propagate into the dump. TTL routing: !redis|ttl| for a registered zset key folds into the zset JSON's expire_at_ms (matching the set/list/hash inlining), so a restorer replays ZADD + EXPIRE in one shot without chasing a separate sidecar. Self-review: 1. Data loss — !zs|mem| is the source of truth; !zs|scr| and the delta family are intentional silent skips with caller-audit notes. 2. Concurrency — RedisDB is sequential per scope (matches the existing per-DB encoder contract); no shared state across DBs. 3. Performance — per-zset state buffered in a map; flushed at Finalize. Bounded by maxWideColumnItems on the live side. Sort is O(n log n) on member-name bytes; identical cost shape to the hash/set encoders that shipped in #725/#758. 4. Consistency — JSON field-order determinism preserved (struct tags, not map). Inf score uses string form via json.RawMessage so the same `score` key accepts both shapes. 5. Coverage — 17 table-driven tests: - round-trip basic / empty zset / TTL inlining - binary member via base64 envelope - delta-key skip (both HandleZSetMeta entry + parseZSetMetaKey guard) - HandleZSetMetaDelta explicit entry point - HandleZSetScore silent discard - malformed meta length / overflow / MaxInt64 boundary - malformed member-value length / NaN rejection - ±Inf string-form serialization - members-without-meta still emits file - duplicate-members latest-wins (ZADD semantics) Caller audit for semantics-changing edit (new `case redisKindZSet:` in HandleTTL, redis_string.go:310): purely additive. The new branch fires only when zsetState() has previously registered the key; no existing handler maps to redisKindZSet, so no prior call site changes behavior. Verified via `grep -n 'redisKindZSet' internal/backup/`. --- internal/backup/redis_hash.go | 6 +- internal/backup/redis_string.go | 22 +- internal/backup/redis_zset.go | 329 ++++++++++++++++++ internal/backup/redis_zset_test.go | 527 +++++++++++++++++++++++++++++ 4 files changed, 881 insertions(+), 3 deletions(-) create mode 100644 internal/backup/redis_zset.go create mode 100644 internal/backup/redis_zset_test.go diff --git a/internal/backup/redis_hash.go b/internal/backup/redis_hash.go index e314cfbe..2e5f1e5b 100644 --- a/internal/backup/redis_hash.go +++ b/internal/backup/redis_hash.go @@ -237,7 +237,11 @@ type hashFieldRecord struct { Value json.RawMessage `json:"value"` } -func marshalHashJSON(st *redisHashState) ([]byte, error) { +// nolint comment lives at the function head: dupl pairs this with +// marshalZSetJSON, which carries the rationale (parallel design-spec +// wrappers that can't collapse into a shared helper without breaking +// JSON field-order determinism). See redis_zset.go:marshalZSetJSON. +func marshalHashJSON(st *redisHashState) ([]byte, error) { //nolint:dupl // see comment above + redis_zset.go // Sort by raw byte order for deterministic output across runs. names := make([]string, 0, len(st.fields)) for name := range st.fields { diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 688225d1..a2d0ecb5 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -85,6 +85,7 @@ const ( redisKindHash redisKindList redisKindSet + redisKindZSet ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -185,6 +186,14 @@ type RedisDB struct { // Finalize into sets/.json with members sorted by raw byte // order for deterministic dump output. sets map[string]*redisSetState + + // zsets buffers per-userKey sorted-set state. Score lives in the + // !zs|mem| value (8-byte IEEE 754 big-endian); member name is the + // trailing key bytes (binary-safe). Flushed at Finalize into + // zsets/.json sorted by member-name bytes (not by score) so + // `diff -r` between dumps stays line-stable across score-only + // mutations. + zsets map[string]*redisZSetState } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -204,6 +213,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { hashes: make(map[string]*redisHashState), lists: make(map[string]*redisListState), sets: make(map[string]*redisSetState), + zsets: make(map[string]*redisZSetState), } } @@ -297,9 +307,16 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindZSet: + // Same per-record TTL inlining: ZADD + EXPIRE replay in + // one shot from the per-zset JSON, no separate sidecar. + st := r.zsetState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoders (set/zset/stream) land, and + // remaining wide-column encoder (stream) lands, and // buffering them allocates proportional to user-key size // (up to 1 MiB per key) for no benefit. Codex P2 round 6. r.orphanTTLCount++ @@ -318,6 +335,7 @@ func (r *RedisDB) Finalize() error { r.flushHashes, r.flushLists, r.flushSets, + r.flushZSets, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -329,7 +347,7 @@ func (r *RedisDB) Finalize() error { if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoders (zset/stream) have not landed yet") + "hint", "remaining wide-column encoder (stream) has not landed yet") } return firstErr } diff --git a/internal/backup/redis_zset.go b/internal/backup/redis_zset.go new file mode 100644 index 00000000..0f4e20a4 --- /dev/null +++ b/internal/backup/redis_zset.go @@ -0,0 +1,329 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + + cockroachdberr "github.com/cockroachdb/errors" +) + +// Redis zset encoder. Translates raw !zs|... snapshot records into the +// per-zset `zsets/.json` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). +// +// Wire format mirrors store/zset_helpers.go: +// - !zs|meta| → 8-byte BE Len +// - !zs|mem| → 8-byte IEEE 754 score +// - !zs|scr| → empty (score index) +// - !zs|meta|d|<...> → 8-byte LenDelta +// +// Routing rules: +// - `!zs|mem|` is the source of truth: it carries both the member name +// (in the trailing key bytes) and its IEEE 754 score (in the value). +// - `!zs|scr|` is a secondary index used at scan time on the live side; +// for backup purposes it is redundant and is silently skipped, the +// same way `!st|scr|` (no such prefix exists) and `!hs|meta|d|` +// deltas are skipped by the hash/set encoders. +// - `!zs|meta|d|` deltas are silently skipped; `!zs|mem|` already +// reflects the post-delta state at backup time. +const ( + RedisZSetMetaPrefix = "!zs|meta|" + RedisZSetMemberPrefix = "!zs|mem|" + RedisZSetScorePrefix = "!zs|scr|" + RedisZSetMetaDeltaPrefix = "!zs|meta|d|" + + // redisZSetScoreSize is the size of the IEEE 754 big-endian score + // stored in !zs|mem| values. Same constant as zsetMetaSizeBytes in + // store/zset_helpers.go; duplicated here to keep the backup + // package free of internal/storage imports. + redisZSetScoreSize = 8 +) + +// ErrRedisInvalidZSetMeta is returned when an !zs|meta| value is not +// the expected 8-byte big-endian member count. +var ErrRedisInvalidZSetMeta = cockroachdberr.New("backup: invalid !zs|meta| value") + +// ErrRedisInvalidZSetMember is returned when an !zs|mem| value is not +// the expected 8-byte IEEE 754 score, or contains a NaN score (Redis's +// ZADD command rejects NaN, so a NaN at backup time indicates store +// corruption and a silent fall-through would re-corrupt the restored +// cluster). +var ErrRedisInvalidZSetMember = cockroachdberr.New("backup: invalid !zs|mem| value") + +// ErrRedisInvalidZSetKey is returned when an !zs| key cannot be parsed +// for its userKeyLen+userKey (or member) segments. +var ErrRedisInvalidZSetKey = cockroachdberr.New("backup: malformed !zs| key") + +// redisZSetState buffers one userKey's zset during a snapshot scan. +// Members are stored as a map keyed by their byte string so duplicate +// HandleZSetMember calls collapse to the latest-wins score (matching +// Redis's ZADD semantics: re-adding a member overwrites the prior +// score). +type redisZSetState struct { + metaSeen bool + declaredLen int64 + members map[string]float64 + expireAtMs uint64 + hasTTL bool +} + +// HandleZSetMeta processes one !zs|meta| record. The +// value is the 8-byte BE member count. We park the declared length so +// flushZSets can warn on a mismatch with the observed member count +// and register the user key so a later !redis|ttl| record +// routes back to this zset state. +// +// !zs|meta|d|... delta keys share the !zs|meta| string prefix, so a +// snapshot dispatcher that routes by "starts with RedisZSetMetaPrefix" +// lands delta records here too. We mirror the hash/set encoder policy +// (PRs #725, #758) and silently skip the delta family because +// !zs|mem| records are the source of truth for restored zset +// contents. +func (r *RedisDB) HandleZSetMeta(key, value []byte) error { + if bytes.HasPrefix(key, []byte(RedisZSetMetaDeltaPrefix)) { + return nil + } + userKey, ok := parseZSetMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetKey, "meta key: %q", key) + } + if len(value) != redisUint64Bytes { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMeta, + "length %d != %d", len(value), redisUint64Bytes) + } + // Bounds-check the uint64 declared count before narrowing to + // int64; without this a corrupted store with the high bit set + // would wrap to a negative declaredLen and fire spurious + // redis_zset_length_mismatch warnings on every flush. Mirrors + // the hash/list/set encoders' symmetric guard. + rawLen := binary.BigEndian.Uint64(value) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMeta, + "declared len %d overflows int64", rawLen) + } + st := r.zsetState(userKey) + st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above + st.metaSeen = true + return nil +} + +// HandleZSetMember processes one !zs|mem| +// record. The value is the 8-byte IEEE 754 big-endian score. The +// member bytes live in the key (binary-safe). +// +// NaN scores are rejected. Redis's ZADD command itself rejects NaN +// at the wire level, so a NaN at backup time indicates either a +// corrupted store or a write path that bypassed ZADD validation — +// either way, emitting it as `score: null` or letting json.Marshal +// fail mid-flush would corrupt the dump silently. Fail closed here +// so the problem surfaces at the source record. +func (r *RedisDB) HandleZSetMember(key, value []byte) error { + userKey, member, ok := parseZSetMemberKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetKey, "member key: %q", key) + } + if len(value) != redisZSetScoreSize { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMember, + "length %d != %d", len(value), redisZSetScoreSize) + } + score := math.Float64frombits(binary.BigEndian.Uint64(value)) + if math.IsNaN(score) { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMember, + "NaN score for member of user key %q (length %d)", userKey, len(userKey)) + } + st := r.zsetState(userKey) + st.members[string(member)] = score + return nil +} + +// HandleZSetScore accepts and discards one !zs|scr|... record. The +// score index is a live-side secondary index for ZRANGEBYSCORE; the +// authoritative member→score mapping lives in !zs|mem|. Snapshot +// dispatchers should still route this prefix here (rather than to +// the orphan-record warning sink) so a future audit that greps for +// "every !zs| prefix has a handler" finds one. +func (r *RedisDB) HandleZSetScore(_, _ []byte) error { return nil } + +// HandleZSetMetaDelta accepts and discards one !zs|meta|d|... record. +// See HandleZSetMeta's docstring for the rationale; !zs|mem| is the +// source of truth at backup time. +func (r *RedisDB) HandleZSetMetaDelta(_, _ []byte) error { return nil } + +// zsetState lazily creates per-key state. Mirrors the hash/list/set +// kindByKey-registration pattern so HandleZSetMeta, HandleZSetMember, +// and the HandleTTL back-edge all agree on the kind. +func (r *RedisDB) zsetState(userKey []byte) *redisZSetState { + uk := string(userKey) + if st, ok := r.zsets[uk]; ok { + return st + } + st := &redisZSetState{members: make(map[string]float64)} + r.zsets[uk] = st + r.kindByKey[uk] = redisKindZSet + return st +} + +// parseZSetMetaKey strips !zs|meta| and the 4-byte BE userKeyLen +// prefix. Returns (userKey, true) on success. Delta keys +// (!zs|meta|d|...) share the meta string prefix and would otherwise +// be parsed as base-meta with a garbage userKeyLen — refuse them at +// the boundary so a misrouted delta surfaces a parse error rather +// than silent state corruption. Mirrors parseHashMetaKey/parseSetMetaKey. +func parseZSetMetaKey(key []byte) ([]byte, bool) { + if bytes.HasPrefix(key, []byte(RedisZSetMetaDeltaPrefix)) { + return nil, false + } + rest := bytes.TrimPrefix(key, []byte(RedisZSetMetaPrefix)) + if len(rest) == len(key) { + return nil, false + } + return parseUserKeyLenPrefix(rest) +} + +// parseZSetMemberKey strips !zs|mem| and the 4-byte BE userKeyLen +// prefix, then returns (userKey, member, true). The member bytes +// are everything after the userKey segment — binary-safe per +// Redis's ZADD contract. +func parseZSetMemberKey(key []byte) ([]byte, []byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisZSetMemberPrefix)) + if len(rest) == len(key) { + return nil, nil, false + } + userKey, ok := parseUserKeyLenPrefix(rest) + if !ok { + return nil, nil, false + } + member := rest[wideColumnUserKeyLenSize+len(userKey):] + return userKey, member, true +} + +// flushZSets writes one JSON file per accumulated zset to +// zsets/.json. Empty zsets (Len=0, no members) still emit a +// file when meta was seen, mirroring the hash/list/set encoders: +// their existence is observable to clients (TYPE returns "zset", +// ZCARD returns 0). Mismatched declared-vs-observed length surfaces +// an `redis_zset_length_mismatch` warning. +func (r *RedisDB) flushZSets() error { + return flushWideColumnDir(r, r.zsets, "zsets", func(dir, uk string, st *redisZSetState) error { + if r.warn != nil && st.metaSeen && int64(len(st.members)) != st.declaredLen { + r.warn("redis_zset_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.declaredLen, + "observed_members", len(st.members), + "hint", "meta record's Len does not match the count of !zs|mem| keys for this user key") + } + return r.writeZSetJSON(dir, []byte(uk), st) + }) +} + +func (r *RedisDB) writeZSetJSON(dir string, userKey []byte, st *redisZSetState) error { + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".json") + body, err := marshalZSetJSON(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// zsetMemberRecord is the dump-format projection of one zset entry. +// `member` is binary-safe (via marshalRedisBinaryValue) and `score` +// is a JSON RawMessage so finite scores serialize as JSON numbers +// (matching the design example `"score": 100`) while ±Inf, which +// json.Marshal rejects, serialize as the conventional ZADD strings +// `"+inf"` / `"-inf"`. Phase 0b's reverse encoder reads both shapes. +type zsetMemberRecord struct { + Member json.RawMessage `json:"member"` + Score json.RawMessage `json:"score"` +} + +// marshalRedisZSetScore renders one IEEE 754 score as the dump's +// score field. ±Inf is emitted as a JSON string ("+inf"/"-inf") +// because json.Marshal returns an error for non-finite floats; this +// matches the conventional ZADD score syntax (Redis accepts "+inf", +// "-inf", "+Inf", "-Inf", and bare "inf"). NaN is unreachable here +// — HandleZSetMember rejects NaN scores at intake — so we do not +// emit a NaN representation. +func marshalRedisZSetScore(score float64) (json.RawMessage, error) { + if math.IsInf(score, 1) { + return json.RawMessage(`"+inf"`), nil + } + if math.IsInf(score, -1) { + return json.RawMessage(`"-inf"`), nil + } + out, err := json.Marshal(score) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return out, nil +} + +// marshalZSetJSON renders one zset state as the design's +// `{format_version, members, expire_at_ms}` JSON shape. Members are +// emitted as an array (not a JSON object) sorted by raw byte order +// of the member name so identical snapshots produce identical dump +// output across runs — same rationale as the hash/set encoders' +// arrays (binary-safe member names that would collide under JSON +// object keying when percent-encoded). Each member name goes through +// marshalRedisBinaryValue so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. +// +// Member sort key is the member-name bytes only; if a Phase 0b +// consumer relies on score-sorted output, it should re-sort on +// read. We pick name order here so a `diff -r` of two dumps with +// the same logical contents but mutated scores still produces a +// stable line-by-line diff. +// +// The wrapper shape (format_version + + expire_at_ms) is +// intentionally parallel to marshalHashJSON's because the design +// specifies parallel wrappers per Redis type. Collapsing them into a +// shared helper would require either a map[string]any (kills JSON +// field-order determinism) or a generic struct (can't carry distinct +// JSON tags per type), so we keep the structural parallel explicit. +// Reviewers comparing the two functions should diff +// (hashFieldRecord, "fields") against (zsetMemberRecord, "members") — +// the only intentional divergence. +func marshalZSetJSON(st *redisZSetState) ([]byte, error) { //nolint:dupl // see comment above + names := make([]string, 0, len(st.members)) + for m := range st.members { + names = append(names, m) + } + sort.Strings(names) + out := make([]zsetMemberRecord, 0, len(names)) + for _, name := range names { + nameJSON, err := marshalRedisBinaryValue([]byte(name)) + if err != nil { + return nil, err + } + scoreJSON, err := marshalRedisZSetScore(st.members[name]) + if err != nil { + return nil, err + } + out = append(out, zsetMemberRecord{Member: nameJSON, Score: scoreJSON}) + } + type record struct { + FormatVersion uint32 `json:"format_version"` + Members []zsetMemberRecord `json:"members"` + ExpireAtMs *uint64 `json:"expire_at_ms"` + } + rec := record{FormatVersion: 1, Members: out} + if st.hasTTL { + ms := st.expireAtMs + rec.ExpireAtMs = &ms + } + body, err := json.MarshalIndent(rec, "", " ") + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return body, nil +} diff --git a/internal/backup/redis_zset_test.go b/internal/backup/redis_zset_test.go new file mode 100644 index 00000000..59e00845 --- /dev/null +++ b/internal/backup/redis_zset_test.go @@ -0,0 +1,527 @@ +package backup + +import ( + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + "github.com/cockroachdb/errors" +) + +// encodeZSetMetaValue builds the 8-byte BE member-count value used by +// the live store/zset_helpers.go (mirror of store.MarshalZSetMeta). +func encodeZSetMetaValue(memberCount int64) []byte { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, uint64(memberCount)) //nolint:gosec + return v +} + +// encodeZSetScoreValue builds the 8-byte IEEE 754 BE score value used by +// the live store/zset_helpers.go (mirror of store.MarshalZSetScore). +func encodeZSetScoreValue(score float64) []byte { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, math.Float64bits(score)) + return v +} + +// zsetMetaKey is the test-side mirror of store.ZSetMetaKey: +// !zs|meta|. +func zsetMetaKey(userKey string) []byte { + out := []byte(RedisZSetMetaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + return append(out, userKey...) +} + +// zsetMemberKey mirrors store.ZSetMemberKey: +// !zs|mem|. Member is binary-safe. +func zsetMemberKey(userKey string, member []byte) []byte { + out := []byte(RedisZSetMemberPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + return append(out, member...) +} + +// zsetScoreKey mirrors store.ZSetScoreKey: +// !zs|scr|. The encoder +// must silently discard this prefix; we only build it here to feed +// HandleZSetScore in a test that pins the "discard" contract. +func zsetScoreKey(userKey string, sortableScore [8]byte, member []byte) []byte { + out := []byte(RedisZSetScorePrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + out = append(out, sortableScore[:]...) + return append(out, member...) +} + +// zsetMetaDeltaKey mirrors store.ZSetMetaDeltaKey: +// !zs|meta|d|. +func zsetMetaDeltaKey(userKey string, commitTS uint64, seqInTxn uint32) []byte { + out := []byte(RedisZSetMetaDeltaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var ts [8]byte + binary.BigEndian.PutUint64(ts[:], commitTS) + out = append(out, ts[:]...) + var seq [4]byte + binary.BigEndian.PutUint32(seq[:], seqInTxn) + return append(out, seq[:]...) +} + +func readZSetJSON(t *testing.T, path string) map[string]any { + t.Helper() + b, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("unmarshal: %v", err) + } + return m +} + +func zsetMembersArray(t *testing.T, m map[string]any) []any { + t.Helper() + v, ok := m["members"] + if !ok { + t.Fatalf("members missing in %+v", m) + } + raw, ok := v.([]any) + if !ok { + t.Fatalf("members = %T(%v), want []any", v, v) + } + return raw +} + +func zsetFloat(t *testing.T, m map[string]any, key string) float64 { + t.Helper() + v, ok := m[key] + if !ok { + t.Fatalf("field %q missing in %+v", key, m) + } + f, ok := v.(float64) + if !ok { + t.Fatalf("field %q = %T(%v), want float64", key, v, v) + } + return f +} + +// zsetMemberEntry is the test-side projection of one decoded +// {"member":..., "score":...} record. +type zsetMemberEntry struct { + member string + scoreNum float64 + scoreStr string // populated only when score is the string form ("+inf"/"-inf") + scoreKind string // "number" or "string" +} + +// assertZSetEntryScore checks that one decoded record matches the +// expected entry. Extracted from assertZSetMembersEqual to keep both +// callers below the cyclop complexity ceiling and so the +// type-assertion guards live in one place. +func assertZSetEntryScore(t *testing.T, idx int, rec map[string]any, w zsetMemberEntry) { + t.Helper() + switch w.scoreKind { + case "number": + score, ok := rec["score"].(float64) + if !ok { + t.Fatalf("members[%d].score = %T(%v), want number", idx, rec["score"], rec["score"]) + } + if score != w.scoreNum { + t.Fatalf("members[%d].score = %v, want %v", idx, score, w.scoreNum) + } + case "string": + if rec["score"] != w.scoreStr { + t.Fatalf("members[%d].score = %v(%T), want %q (Inf string form)", idx, rec["score"], rec["score"], w.scoreStr) + } + default: + t.Fatalf("test bug: unknown scoreKind %q", w.scoreKind) + } +} + +// assertZSetMembersEqual checks that the JSON `members` array matches +// the given expected entries. +func assertZSetMembersEqual(t *testing.T, members []any, want []zsetMemberEntry) { + t.Helper() + if len(members) != len(want) { + t.Fatalf("len(members) = %d, want %d (got %v)", len(members), len(want), members) + } + for i, w := range want { + rec, ok := members[i].(map[string]any) + if !ok { + t.Fatalf("members[%d] = %T(%v), want object", i, members[i], members[i]) + } + if rec["member"] != w.member { + t.Fatalf("members[%d].member = %v, want %v", i, rec["member"], w.member) + } + assertZSetEntryScore(t, i, rec, w) + } +} + +// TestRedisDB_ZSetRoundTripBasic confirms a multi-member zset +// round-trips through the encoder, that members are sorted by +// raw byte order (NOT score order, see marshalZSetJSON's docstring), +// and that score values land on the JSON record as plain numbers. +func TestRedisDB_ZSetRoundTripBasic(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("leaderboard"), encodeZSetMetaValue(3)); err != nil { + t.Fatalf("HandleZSetMeta: %v", err) + } + // Submit out of byte order and out of score order to exercise the sort. + for _, e := range []struct { + member string + score float64 + }{ + {"charlie", 30}, + {"alice", 100}, + {"bob", 50}, + } { + if err := db.HandleZSetMember(zsetMemberKey("leaderboard", []byte(e.member)), encodeZSetScoreValue(e.score)); err != nil { + t.Fatalf("HandleZSetMember(%s): %v", e.member, err) + } + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "leaderboard.json")) + if zsetFloat(t, got, "format_version") != 1 { + t.Fatalf("format_version = %v", got["format_version"]) + } + if got["expire_at_ms"] != nil { + t.Fatalf("expire_at_ms must be nil without TTL, got %v", got["expire_at_ms"]) + } + // Sorted by member bytes: alice < bob < charlie. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 100, scoreKind: "number"}, + {member: "bob", scoreNum: 50, scoreKind: "number"}, + {member: "charlie", scoreNum: 30, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetEmptyZSetStillEmitsFile mirrors the hash/list/set +// emit-empty rule: ZCARD==0 is observable to clients (TYPE returns +// "zset"), so the dump must preserve existence. +func TestRedisDB_ZSetEmptyZSetStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("empty"), encodeZSetMetaValue(0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "empty.json")) + if members := zsetMembersArray(t, got); len(members) != 0 { + t.Fatalf("empty zset should emit empty members array, got %v", members) + } +} + +// TestRedisDB_ZSetTTLInlinedFromScanIndex pins that !redis|ttl| +// records for a zset user key fold into the zset's JSON expire_at_ms, +// not a separate sidecar (the strings/HLL pattern). +func TestRedisDB_ZSetTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), encodeZSetScoreValue(1.5)); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "zsets_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected zset TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_ZSetLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash/list/set encoders. +func TestRedisDB_ZSetLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleZSetMeta(zsetMetaKey("z"), encodeZSetMetaValue(5)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("only")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_zset_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_ZSetBinaryMemberUsesBase64Envelope confirms non-UTF-8 +// member bytes round-trip via the typed `{"base64":"..."}` envelope. +func TestRedisDB_ZSetBinaryMemberUsesBase64Envelope(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("blob"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("blob", []byte{0x80, 0xff, 0x01}), encodeZSetScoreValue(42)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "blob.json")) + members := zsetMembersArray(t, got) + if len(members) != 1 { + t.Fatalf("len(members) = %d, want 1", len(members)) + } + rec, ok := members[0].(map[string]any) + if !ok { + t.Fatalf("members[0] = %T(%v), want object", members[0], members[0]) + } + envelope, ok := rec["member"].(map[string]any) + if !ok { + t.Fatalf("expected base64 envelope on member, got %T(%v)", rec["member"], rec["member"]) + } + if envelope["base64"] == "" { + t.Fatalf("base64 envelope missing payload: %v", envelope) + } +} + +// TestRedisDB_ZSetHandleMetaSkipsDeltaKey pins that the !zs|meta|d|... +// family is silently skipped by HandleZSetMeta. Mirrors the +// hash/list/set delta-key guards. +func TestRedisDB_ZSetHandleMetaSkipsDeltaKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + deltaValue := make([]byte, 8) + if err := db.HandleZSetMeta(zsetMetaDeltaKey("k", 7, 0), deltaValue); err != nil { + t.Fatalf("delta key must be silently skipped, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "zsets")); !os.IsNotExist(err) { + t.Fatalf("delta-only run should not create zsets/, stat err=%v", err) + } +} + +// TestRedisDB_ZSetHandleZSetMetaDelta exercises the explicit +// delta-handler entry point (the dispatcher routes !zs|meta|d| keys +// here directly when it can distinguish them from base meta). +func TestRedisDB_ZSetHandleZSetMetaDelta(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + if err := db.HandleZSetMetaDelta(zsetMetaDeltaKey("k", 1, 0), make([]byte, 8)); err != nil { + t.Fatalf("HandleZSetMetaDelta must accept any value, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } +} + +// TestRedisDB_ZSetHandleZSetScoreSilentlyDiscards pins that the +// !zs|scr| score-index prefix is dropped at intake. !zs|mem| carries +// the authoritative member→score mapping. +func TestRedisDB_ZSetHandleZSetScoreSilentlyDiscards(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var sortable [8]byte + binary.BigEndian.PutUint64(sortable[:], 0xdeadbeef) + if err := db.HandleZSetScore(zsetScoreKey("k", sortable, []byte("m")), nil); err != nil { + t.Fatalf("HandleZSetScore must not return errors, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + // No state was registered — no directory, no warning. + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "zsets")); !os.IsNotExist(err) { + t.Fatalf("score-only run must not create zsets/, stat err=%v", err) + } +} + +// TestRedisDB_ZSetRejectsMalformedMetaValueLength pins that an +// !zs|meta| value of the wrong length surfaces as an error. +func TestRedisDB_ZSetRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleZSetMeta(zsetMetaKey("k"), []byte{0x00}) + if !errors.Is(err, ErrRedisInvalidZSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMeta", err) + } +} + +// TestRedisDB_ZSetRejectsOverflowingMetaValue pins the high-bit +// overflow guard — same shape as hash + list + set encoders. +func TestRedisDB_ZSetRejectsOverflowingMetaValue(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + overflow := make([]byte, 8) + binary.BigEndian.PutUint64(overflow, 1<<63) + err := db.HandleZSetMeta(zsetMetaKey("k"), overflow) + if !errors.Is(err, ErrRedisInvalidZSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMeta", err) + } +} + +// TestRedisDB_ZSetRejectsMalformedMemberValueLength pins that an +// !zs|mem| value of the wrong length surfaces as an error. The score +// MUST be exactly 8 bytes — anything else points at corruption or a +// wire-format change we want to detect at intake. +func TestRedisDB_ZSetRejectsMalformedMemberValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), []byte{0x00, 0x01, 0x02}) + if !errors.Is(err, ErrRedisInvalidZSetMember) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMember", err) + } +} + +// TestRedisDB_ZSetRejectsNaNScore pins that NaN scores fail closed. +// Redis's ZADD command itself rejects NaN at the wire level +// (Lua_redis_pcall → ZADD with NaN returns NOTNUM), so a NaN at +// backup time indicates corruption or a bypass; silently dumping +// `score: null` would re-corrupt the restored cluster. +func TestRedisDB_ZSetRejectsNaNScore(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + nan := make([]byte, 8) + binary.BigEndian.PutUint64(nan, math.Float64bits(math.NaN())) + err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), nan) + if !errors.Is(err, ErrRedisInvalidZSetMember) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMember", err) + } +} + +// TestRedisDB_ZSetInfinityScoresUseStringForm pins that ±Inf +// scores serialize as the ZADD-conventional strings `"+inf"`/`"-inf"` +// because json.Marshal returns an error for non-finite floats. The +// per-record `score` field is a json.RawMessage so a finite score +// emits as a number AND an Inf score emits as a string, with the +// same key name. +func TestRedisDB_ZSetInfinityScoresUseStringForm(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("inf"), encodeZSetMetaValue(2)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("inf", []byte("top")), encodeZSetScoreValue(math.Inf(1))); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("inf", []byte("bottom")), encodeZSetScoreValue(math.Inf(-1))); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "inf.json")) + // Sorted by member name: bottom < top. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "bottom", scoreStr: "-inf", scoreKind: "string"}, + {member: "top", scoreStr: "+inf", scoreKind: "string"}, + }) +} + +// TestRedisDB_ZSetParseMetaKeyRejectsDelta is the parser-level guard +// companion to the dispatcher skip — pins that a future refactor +// bypassing HandleZSetMeta's prefix check still surfaces a parse +// failure rather than silent state corruption. +func TestRedisDB_ZSetParseMetaKeyRejectsDelta(t *testing.T) { + t.Parallel() + if _, ok := parseZSetMetaKey(zsetMetaDeltaKey("k", 1, 0)); ok { + t.Fatalf("parseZSetMetaKey must reject delta-prefixed keys") + } +} + +// TestRedisDB_ZSetMembersWithoutMetaStillEmitsFile pins the +// members-without-meta contract: members may arrive before (or +// without) meta, and the encoder must still emit the JSON without +// firing the length-mismatch warning. Mirrors the set encoder's +// rule (PR #758). +func TestRedisDB_ZSetMembersWithoutMetaStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("a")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "z.json")) + members := zsetMembersArray(t, got) + if len(members) != 1 { + t.Fatalf("len(members) = %d, want 1", len(members)) + } + for _, e := range events { + if e == "redis_zset_length_mismatch" { + t.Fatalf("members-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_ZSetDuplicateMembersUseLatestScore pins ZADD's +// latest-wins semantics: re-adding a member with a different score +// overwrites the prior score. (Snapshot iterators don't typically +// emit duplicates, but a recovery replay across overlapping windows +// could.) +func TestRedisDB_ZSetDuplicateMembersUseLatestScore(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("m")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("m")), encodeZSetScoreValue(2)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "z.json")) + // Latest-wins: the second HandleZSetMember overwrites score 1 with score 2. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "m", scoreNum: 2, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetMaxInt64DeclaredLen pins the math.MaxInt64 +// boundary — declaredLen=math.MaxInt64 must be accepted, only > that +// rejected. +func TestRedisDB_ZSetMaxInt64DeclaredLen(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + boundary := make([]byte, 8) + binary.BigEndian.PutUint64(boundary, math.MaxInt64) // exactly the int64 max — must NOT reject + if err := db.HandleZSetMeta(zsetMetaKey("k"), boundary); err != nil { + t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) + } +} From 63e54d91a77f3948ed43c9bff3cb6f5a7dd60231 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 18:40:53 +0900 Subject: [PATCH 2/7] =?UTF-8?q?backup(zset):=20PR790=20r1=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20buffer=20TTLs=20that=20arrive=20before=20typed=20re?= =?UTF-8?q?cords?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pebble snapshots are emitted in encoded-key order (store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|` lex-sorts BEFORE every wide-column prefix where the type letter is >= 's' (`!st|` set, `!stream|`, `!zs|` zset — because `r` < `s`/`z`). The original HandleTTL routed any unknown-kind expiry straight into orphanTTLCount, so for sets, streams, and zsets the TTL was DROPPED before zsetState/setState/streamState could claim it. Restored sets/zsets/streams with TTL became permanent. Codex P1 finding on PR #790. The same bug exists in the already- merged set encoder (PR #758); this commit fixes both retroactively. Stream encoder (PR #791) inherits the fix once rebased. Fix: HandleTTL parks unknown-kind expiries in a new pendingTTL map. Each wide-column state-init that may face the bad ordering (setState, zsetState — and streamState once PR #791 lands) drains the entry on first user-key registration via claimPendingTTL(). Finalize counts whatever remains in pendingTTL as truly-unmatched orphans (a TTL whose user key never appeared in any typed record — indicates store corruption or an unknown type prefix). Semantic-change caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch: previously incremented orphanTTLCount immediately; now buffers and lets Finalize count. All callers: only the per-record dispatcher in cmd/elastickv-snapshot-decode (not yet built — Phase 0a follow- up). No external caller mutates orphanTTLCount today. - TestRedisDB_OrphanTTLCountedNotBuffered: updated to assert intake-time orphanTTLCount==0 + pendingTTL grows, then post- Finalize orphanTTLCount==n. - New caller claimPendingTTL: called only by zsetState and setState in this PR. hashState/listState don't call it because their type prefixes (`!hs|`/`!lst|`) lex-sort BEFORE `!redis|ttl|` so the typed record arrives first; pendingTTL is always empty for them. Verified via `grep -n 'claimPendingTTL' internal/backup/`. New tests: - TestRedisDB_ZSetTTLArrivesBeforeRows — pins the fix for zsets. - TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for PR #758's set encoder. - TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys — pins the new Finalize-time orphan semantics. Self-review: 1. Data loss — the original code DROPPED real TTL'd sets/zsets/streams on every backup. This fix recovers them. No new data-loss surface introduced. 2. Concurrency — pendingTTL is added to RedisDB which is already sequential-per-scope; no new locking required. 3. Performance — pendingTTL holds (string-userKey, uint64-expireAt) pairs. The string allocation cost matches kindByKey's, which we already pay for every typed record. The original P2 round 6 concern (don't buffer arbitrarily-large value payloads) is preserved: we still only buffer 8-byte expiry, not value bytes. 4. Consistency — drain happens at FIRST state registration so a later HandleTTL re-arrival (which would route through the redisKindSet/ZSet case in HandleTTL) is a no-op (st.expireAtMs gets overwritten with the same value). 5. Coverage — 3 new tests + 1 updated test. All 64 redis tests pass. --- internal/backup/redis_set.go | 12 +++ internal/backup/redis_string.go | 107 +++++++++++++++++++++++---- internal/backup/redis_string_test.go | 31 ++++++-- internal/backup/redis_zset.go | 11 +++ internal/backup/redis_zset_test.go | 83 +++++++++++++++++++++ 5 files changed, 224 insertions(+), 20 deletions(-) diff --git a/internal/backup/redis_set.go b/internal/backup/redis_set.go index 7cd4a440..392ae8ce 100644 --- a/internal/backup/redis_set.go +++ b/internal/backup/redis_set.go @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil } // setState lazily creates per-key state. Mirrors the hash/list // kindByKey-registration pattern so HandleSetMeta, HandleSetMember, // and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!st|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the set's +// `expire_at_ms`. Without this drain step, every TTL'd set would +// restore as permanent — a latent bug in PR #758 surfaced by codex +// on PR #790. Phase 0a tests added in the same PR pin the ordering. func (r *RedisDB) setState(userKey []byte) *redisSetState { uk := string(userKey) if st, ok := r.sets[uk]; ok { return st } st := &redisSetState{members: make(map[string]struct{})} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.sets[uk] = st r.kindByKey[uk] = redisKindSet return st diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index a2d0ecb5..76040613 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -194,6 +194,26 @@ type RedisDB struct { // `diff -r` between dumps stays line-stable across score-only // mutations. zsets map[string]*redisZSetState + + // pendingTTL buffers expiries whose user-key prefix sorts AFTER + // `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble + // snapshots emit records in encoded-key order + // (`store/snapshot_pebble.go::iter.First()/Next()`), and + // `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|` + // prefixes (`r` < `s`/`s`/`z`). Without buffering, HandleTTL + // would see kindByKey == redisKindUnknown and count the TTL + // as an orphan, dropping it before zsetState / setState / + // streamState had a chance to claim the user key — TTL'd + // sorted sets, sets, and streams would silently restore as + // permanent. + // + // Lifecycle: HandleTTL files the expiry here when kind is + // still unknown. Each wide-column state-init function + // (setState / zsetState / streamState etc.) drains the entry + // when it first registers the user key. Finalize fires the + // orphan-TTL warning for whatever remains (those keys never + // appeared as a typed record — likely a corrupted store). + pendingTTL map[string]uint64 } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -214,6 +234,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { lists: make(map[string]*redisListState), sets: make(map[string]*redisSetState), zsets: make(map[string]*redisZSetState), + pendingTTL: make(map[string]uint64), } } @@ -256,15 +277,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error { return r.writeBlob("hll", userKey, value) } -// HandleTTL processes one !redis|ttl| record. Routing depends on -// what HandleString/HandleHLL recorded for the same userKey: +// HandleTTL processes one !redis|ttl| record. Routing +// depends on what the encoder has previously recorded for the user +// key. There are two ordering regimes the snapshot stream presents: +// +// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order +// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record +// arrives FIRST, kindByKey is already set when HandleTTL fires, +// and we route directly to the per-type sidecar / inline field. +// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because +// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is +// still redisKindUnknown. We park the expiry in pendingTTL and +// let each wide-column state-init function (setState / +// zsetState / streamState) drain it when the user key finally +// surfaces as a typed record. Codex P1 finding on PR #790. // -// - redisKindHLL -> hll_ttl.jsonl -// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL -// lives in !redis|ttl| rather than the inline magic-prefix header) -// - redisKindUnknown -> counted in orphanTTLCount; reported via the -// warn sink on Finalize because Phase 0a's wide-column encoders -// have not landed yet. +// Routing: +// +// - redisKindHLL -> hll_ttl.jsonl (case 1) +// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings +// whose TTL lives in !redis|ttl| rather than the inline header) +// - redisKindHash/List/Set/ZSet/Stream -> inlined into the +// per-key JSON (case 1 for hash/list, case 2 for set/zset/stream +// where the state-init already drained from pendingTTL before +// HandleTTL would even be called the second time) +// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly +// unmatched entries (key never registered as a typed record). func (r *RedisDB) HandleTTL(userKey, value []byte) error { expireAtMs, err := decodeRedisTTLValue(value) if err != nil { @@ -315,16 +353,48 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoder (stream) lands, and - // buffering them allocates proportional to user-key size - // (up to 1 MiB per key) for no benefit. Codex P2 round 6. - r.orphanTTLCount++ + // Park the expiry until a wide-column Handle*Meta / + // Handle*Member / Handle*Entry registers the user key. + // Without this buffering, sorted-set / set / stream TTLs + // would be counted as orphans and dropped because their + // type prefixes lex-sort AFTER `!redis|ttl|` in the + // snapshot's encoded-key order. Codex P1 (PR #790). + // + // We store userKey as a string copy (`string([]byte)` + // allocates) rather than the alias slice — the snapshot + // reader reuses key buffers across iterations, so a slice + // alias would race with the next record. + r.pendingTTL[string(userKey)] = expireAtMs return nil } return nil } +// claimPendingTTL drains any buffered TTL for userKey into the +// caller-provided state. Called by the wide-column state-init +// functions (setState / zsetState / streamState) when they first +// register a user key, so the parked expiry inlines into the same +// per-key JSON the rest of the record assembles. +// +// Returns (expireAtMs, true) when a buffered TTL existed. The +// caller should set state.expireAtMs / state.hasTTL on the +// returned value. The pending entry is removed so Finalize's +// orphan-count loop only sees truly-unmatched TTLs. +// +// Safe to call from hashState/listState too even though those +// types' typed records sort before `!redis|ttl|`; pendingTTL will +// always be empty for them. Keeping the call site uniform keeps +// the state-init contract simple. +func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { + uk := string(userKey) + expireAtMs, ok := r.pendingTTL[uk] + if !ok { + return 0, false + } + delete(r.pendingTTL, uk) + return expireAtMs, true +} + // Finalize flushes all open sidecar writers and emits warnings for any // pending TTL records whose user key was never claimed by the wide-column // encoders. Call exactly once after every snapshot record has been @@ -344,10 +414,19 @@ func (r *RedisDB) Finalize() error { firstErr = err } } + // At this point all type-prefixed records have been processed + // and every wide-column state-init drained its claimPendingTTL. + // Whatever remains in pendingTTL is truly unmatched — the + // user key never appeared as a typed record. Likely causes: + // store corruption, a snapshot mid-write where the typed + // record was dropped, or a `!redis|ttl|` entry written for a + // key whose type prefix we don't recognise (a future Redis + // type added on the live side without a backup-encoder update). + r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoder (stream) has not landed yet") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix") } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 152a55d4..5343d3b5 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -579,11 +579,21 @@ func TestRedisDB_NoKeymapWhenAllReversible(t *testing.T) { func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Parallel() - // Codex P2 round 6: orphan TTL records (those with no prior - // HandleString/HandleHLL claim) must be counted only — the - // per-key payload would allocate proportional to user-key size - // and is unused before the wide-column encoders land. Drive a - // sample of orphan records and assert the count, not a buffer. + // Codex P1 (PR #790): orphan TTL records are now BUFFERED in + // pendingTTL during intake — the wide-column state-init + // functions need to drain them when a typed record finally + // registers a user key. The buffer holds (string-userKey, + // uint64-expireAt) pairs; the per-record allocation cost is the + // same as kindByKey's, which we already pay for every typed + // record. The original Codex P2 round 6 concern (don't buffer + // arbitrarily-large payload bytes) is preserved — we still + // don't keep the value bytes. + // + // At Finalize, entries still in pendingTTL are counted as truly + // unmatched orphans. This test now asserts: + // - During intake: orphanTTLCount stays at 0, pendingTTL grows. + // - After Finalize: orphanTTLCount == n (no typed record ever + // drained the entries). db, _ := newRedisDB(t) const n = 10_000 for i := 0; i < n; i++ { @@ -593,8 +603,17 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Fatalf("HandleTTL[%d]: %v", i, err) } } + if db.orphanTTLCount != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (buffered)", db.orphanTTLCount) + } + if len(db.pendingTTL) != n { + t.Fatalf("pendingTTL len = %d, want %d", len(db.pendingTTL), n) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } if db.orphanTTLCount != n { - t.Fatalf("orphanTTLCount = %d, want %d", db.orphanTTLCount, n) + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", db.orphanTTLCount, n) } } diff --git a/internal/backup/redis_zset.go b/internal/backup/redis_zset.go index 0f4e20a4..8350fdbc 100644 --- a/internal/backup/redis_zset.go +++ b/internal/backup/redis_zset.go @@ -156,12 +156,23 @@ func (r *RedisDB) HandleZSetMetaDelta(_, _ []byte) error { return nil } // zsetState lazily creates per-key state. Mirrors the hash/list/set // kindByKey-registration pattern so HandleZSetMeta, HandleZSetMember, // and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!zs|...` (because `r` < `z`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the zset's +// `expire_at_ms`. Without this drain step, every TTL'd zset would +// restore as permanent. Codex P1 finding on PR #790. func (r *RedisDB) zsetState(userKey []byte) *redisZSetState { uk := string(userKey) if st, ok := r.zsets[uk]; ok { return st } st := &redisZSetState{members: make(map[string]float64)} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.zsets[uk] = st r.kindByKey[uk] = redisKindZSet return st diff --git a/internal/backup/redis_zset_test.go b/internal/backup/redis_zset_test.go index 59e00845..ecfa7563 100644 --- a/internal/backup/redis_zset_test.go +++ b/internal/backup/redis_zset_test.go @@ -513,6 +513,89 @@ func TestRedisDB_ZSetDuplicateMembersUseLatestScore(t *testing.T) { }) } +// TestRedisDB_ZSetTTLArrivesBeforeRows pins the codex P1 fix: +// `!redis|ttl|` lex-sorts BEFORE `!zs|...` because `r` < `z`, +// so in a real Pebble snapshot's encoded-key order the TTL record +// arrives FIRST. The encoder MUST buffer the expiry in pendingTTL +// and drain it when the zset first registers via zsetState, +// inlining the value into the zset's JSON expire_at_ms. Without +// this drain step every TTL'd zset would restore as permanent — +// the P1 finding on PR #790 (chatgpt-codex-connector). +func TestRedisDB_ZSetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + member. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), encodeZSetScoreValue(1.5)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_SetTTLArrivesBeforeRows pins the same ordering fix +// for sets (`!redis|ttl|` lex-sorts before `!st|...` because +// `r` < `s`). Retroactive coverage for PR #758, which shipped the +// set encoder before the pendingTTL infrastructure existed. +func TestRedisDB_SetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + member. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json")) + if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys pins the post-fix +// orphan semantics: a TTL for a key that NEVER appears in any +// typed record (store corruption or unknown type prefix) is +// counted at Finalize, not silently dropped on intake. +func TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleTTL([]byte("orphan"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + found := false + for _, e := range events { + if e == "redis_orphan_ttl" { + found = true + break + } + } + if !found { + t.Fatalf("expected redis_orphan_ttl warning, got %v", events) + } +} + // TestRedisDB_ZSetMaxInt64DeclaredLen pins the math.MaxInt64 // boundary — declaredLen=math.MaxInt64 must be accepted, only > that // rejected. From 5446190f782deeffca7e33784b88efb3d01b0e14 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 18:52:27 +0900 Subject: [PATCH 3/7] =?UTF-8?q?backup(zset):=20PR790=20r2=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20handle=20legacy=20!redis|zset|=20single-blob=20layo?= =?UTF-8?q?ut?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged that the wide-column zset encoder skips the legacy consolidated single-key blob layout the live store still writes. A zset stored only as `!redis|zset|` (with the magic- prefixed pb.RedisZSetValue body) is silently dropped from backup output and any inline TTL becomes an orphan — user-visible sorted-set data loss. Live-side references (adapter, not changed by this commit): - adapter/redis_compat_types.go:82 — redisZSetPrefix - adapter/redis_compat_commands.go:3495-3508 — writes the blob for non-empty persisted zset updates - adapter/redis_compat_helpers.go:610-631 — reads it as the fallback when no wide-column members exist Fix: new public RedisDB.HandleZSetLegacyBlob method that decodes the magic-prefixed pb.RedisZSetValue and registers the same per- member state HandleZSetMember would. The wide-column merge case (mid-migration snapshot containing BOTH layouts for the same user key) works because `!redis|zset|` sorts BEFORE `!zs|...` so the blob arrives first and wide-column rows then update / add members via the latest-wins map. Inline TTL: `!redis|zset|` sorts BEFORE `!redis|ttl|`, so HandleTTL after this handler sees redisKindZSet already and folds via the case-redisKindZSet branch. No pendingTTL detour needed for this ordering. Fail-closed contract (matches existing wide-column path): - Missing magic prefix → ErrRedisInvalidZSetLegacyBlob - Unmarshal error → ErrRedisInvalidZSetLegacyBlob - NaN score → ErrRedisInvalidZSetLegacyBlob (Redis ZADD rejects NaN at wire level) Caller audit (per /loop standing instruction): new public method HandleZSetLegacyBlob has no external callers. Verified via 'grep -rn HandleZSetLegacyBlob --include=*.go' — all matches inside the test file in this PR. The cmd/elastickv-snapshot-decode dispatcher (Phase 0a follow-up, not yet built) will route the `!redis|zset|` prefix to this handler. Parallel bug class: the SAME issue exists for `!redis|hash|`, `!redis|set|`, and `!redis|stream|` legacy blob prefixes. Those encoders shipped in earlier PRs (#725, #758, #791). Each needs its own legacy-blob handler in a follow-up PR; this commit fixes only the zset case codex flagged on PR #790. New tests: - TestRedisDB_ZSetLegacyBlobRoundTrip — basic round-trip - TestRedisDB_ZSetLegacyBlobThenWideColumnMerges — mid-migration - TestRedisDB_ZSetLegacyBlobWithInlineTTL — TTL ordering - TestRedisDB_ZSetLegacyBlobRejectsMissingMagic — fail-closed - TestRedisDB_ZSetLegacyBlobRejectsNaNScore — fail-closed - TestRedisDB_ZSetLegacyBlobRejectsMalformedKey — fail-closed Self-review: 1. Data loss — exact opposite: this commit RECOVERS zsets that were silently dropped. New fail-closed guards prevent silently importing a corrupt blob. 2. Concurrency — no new shared state; per-DB sequential as before. 3. Performance — one protobuf Unmarshal per legacy zset key (same as the live read path). Member map shares the same latest-wins behavior as wide-column intake. 4. Consistency — merge order (blob first, wide-column second) is determined by snapshot lex order; tested explicitly. 5. Coverage — 6 new tests. All 84 redis tests pass. --- internal/backup/redis_zset.go | 118 ++++++++++++++++++++ internal/backup/redis_zset_test.go | 167 +++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) diff --git a/internal/backup/redis_zset.go b/internal/backup/redis_zset.go index 8350fdbc..8b39e526 100644 --- a/internal/backup/redis_zset.go +++ b/internal/backup/redis_zset.go @@ -8,7 +8,9 @@ import ( "path/filepath" "sort" + pb "github.com/bootjp/elastickv/proto" cockroachdberr "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" ) // Redis zset encoder. Translates raw !zs|... snapshot records into the @@ -36,17 +38,48 @@ const ( RedisZSetScorePrefix = "!zs|scr|" RedisZSetMetaDeltaPrefix = "!zs|meta|d|" + // RedisZSetLegacyBlobPrefix is the consolidated single-key + // layout the live store still writes for non-empty persisted + // zsets (`adapter/redis_compat_types.go:82` redisZSetPrefix, + // produced by `adapter/redis_compat_commands.go:3495-3508` and + // read by `adapter/redis_compat_helpers.go:610-631` as the + // fallback when no wide-column members exist). A backup that + // skipped this prefix would silently drop legacy-only zsets; + // HandleZSetLegacyBlob decodes the blob and registers the same + // per-member state HandleZSetMeta + HandleZSetMember would. + // Codex P1 finding on PR #790 (round 2). + RedisZSetLegacyBlobPrefix = "!redis|zset|" + // redisZSetScoreSize is the size of the IEEE 754 big-endian score // stored in !zs|mem| values. Same constant as zsetMetaSizeBytes in // store/zset_helpers.go; duplicated here to keep the backup // package free of internal/storage imports. redisZSetScoreSize = 8 + + // redisZSetLegacyProtoPrefixLen is the on-disk magic prefix size + // for `!redis|zset|` values + // (`adapter/redis_storage_codec.go:15` storedRedisZSetProtoPrefix). + redisZSetLegacyProtoPrefixLen = 4 ) +// redisZSetLegacyProtoPrefix mirrors +// adapter/redis_storage_codec.go:15 storedRedisZSetProtoPrefix. A +// rename on the live side without an accompanying backup update +// surfaces as ErrRedisInvalidZSetLegacyBlob on decode of any real +// cluster dump. +var redisZSetLegacyProtoPrefix = []byte{0x00, 'R', 'Z', 0x01} + // ErrRedisInvalidZSetMeta is returned when an !zs|meta| value is not // the expected 8-byte big-endian member count. var ErrRedisInvalidZSetMeta = cockroachdberr.New("backup: invalid !zs|meta| value") +// ErrRedisInvalidZSetLegacyBlob is returned when a `!redis|zset|` +// value's magic prefix is missing, its protobuf body fails to +// unmarshal, or its decoded scores include NaN. Fail-closed for +// the same reason as ErrRedisInvalidZSetMember: silently accepting +// a corrupt blob would lose the entire zset's contents at restore. +var ErrRedisInvalidZSetLegacyBlob = cockroachdberr.New("backup: invalid !redis|zset| value") + // ErrRedisInvalidZSetMember is returned when an !zs|mem| value is not // the expected 8-byte IEEE 754 score, or contains a NaN score (Redis's // ZADD command rejects NaN, so a NaN at backup time indicates store @@ -153,6 +186,91 @@ func (r *RedisDB) HandleZSetScore(_, _ []byte) error { return nil } // source of truth at backup time. func (r *RedisDB) HandleZSetMetaDelta(_, _ []byte) error { return nil } +// HandleZSetLegacyBlob processes one `!redis|zset|` record. +// This is the consolidated single-key layout the live store still +// writes for non-empty persisted zsets (see RedisZSetLegacyBlobPrefix +// docstring). The encoded value is a magic-prefixed +// `pb.RedisZSetValue` carrying every (member, score) pair. +// +// Decoded entries land in the same per-key state HandleZSetMember +// would have produced, so the per-zset JSON output is identical +// regardless of which layout the live store used. NaN scores fail +// closed at intake, matching HandleZSetMember's contract. +// +// The legacy prefix `!redis|zset|` lex-sorts BEFORE `!zs|...` and +// BEFORE `!redis|ttl|`, so when a zset is stored in both formats +// (mid-migration), this handler creates the state first and the +// later wide-column records merge into it — duplicate +// HandleZSetMember calls follow the same latest-wins policy. +// +// `!redis|zset|` ALSO sorts BEFORE `!redis|ttl|`, so an inline TTL +// on the same user key will reach HandleTTL after this handler has +// already registered redisKindZSet. The HandleTTL redisKindZSet +// branch then folds the expiry into st.expireAtMs via zsetState +// (which itself drains pendingTTL — a no-op here since the typed +// record came first). +func (r *RedisDB) HandleZSetLegacyBlob(key, value []byte) error { + userKey, ok := parseZSetLegacyBlobKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, "key: %q", key) + } + entries, err := decodeZSetLegacyBlobValue(value) + if err != nil { + return err + } + st := r.zsetState(userKey) + for _, e := range entries { + st.members[e.member] = e.score + } + return nil +} + +// zsetLegacyEntry is the per-(member, score) projection extracted +// from a `!redis|zset|` blob's protobuf body. +type zsetLegacyEntry struct { + member string + score float64 +} + +// parseZSetLegacyBlobKey strips `!redis|zset|` and returns the +// user-key bytes. Unlike the wide-column meta key there is no +// userKeyLen prefix — the live store appends the user key directly +// (`adapter/redis_compat_types.go:177` ZSetKey). +func parseZSetLegacyBlobKey(key []byte) ([]byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisZSetLegacyBlobPrefix)) + if len(rest) == len(key) { + return nil, false + } + return rest, true +} + +// decodeZSetLegacyBlobValue strips the magic prefix and unmarshals +// the protobuf body into a slice of (member, score) entries. +// Rejects NaN scores (same fail-closed contract as +// HandleZSetMember). +func decodeZSetLegacyBlobValue(value []byte) ([]zsetLegacyEntry, error) { + if len(value) < redisZSetLegacyProtoPrefixLen || + !bytes.Equal(value[:redisZSetLegacyProtoPrefixLen], redisZSetLegacyProtoPrefix) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "missing or corrupt magic prefix (len=%d)", len(value)) + } + msg := &pb.RedisZSetValue{} + if err := gproto.Unmarshal(value[redisZSetLegacyProtoPrefixLen:], msg); err != nil { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "unmarshal: %v", err) + } + out := make([]zsetLegacyEntry, 0, len(msg.GetEntries())) + for _, e := range msg.GetEntries() { + score := e.GetScore() + if math.IsNaN(score) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "NaN score for member %q", e.GetMember()) + } + out = append(out, zsetLegacyEntry{member: e.GetMember(), score: score}) + } + return out, nil +} + // zsetState lazily creates per-key state. Mirrors the hash/list/set // kindByKey-registration pattern so HandleZSetMeta, HandleZSetMember, // and the HandleTTL back-edge all agree on the kind. diff --git a/internal/backup/redis_zset_test.go b/internal/backup/redis_zset_test.go index ecfa7563..dca5a9cd 100644 --- a/internal/backup/redis_zset_test.go +++ b/internal/backup/redis_zset_test.go @@ -8,9 +8,41 @@ import ( "path/filepath" "testing" + pb "github.com/bootjp/elastickv/proto" "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" ) +// encodeZSetLegacyBlobValue produces the magic-prefixed protobuf +// wire format the live store writes for `!redis|zset|` +// values (mirror of adapter/redis_storage_codec.go::marshalZSetValue). +func encodeZSetLegacyBlobValue(t *testing.T, entries []zsetLegacyEntry) []byte { + t.Helper() + msg := &pb.RedisZSetValue{} + for _, e := range entries { + msg.Entries = append(msg.Entries, &pb.RedisZSetEntry{ + Member: e.member, + Score: e.score, + }) + } + body, err := gproto.Marshal(msg) + if err != nil { + t.Fatalf("marshal pb.RedisZSetValue: %v", err) + } + out := make([]byte, 0, redisZSetLegacyProtoPrefixLen+len(body)) + out = append(out, redisZSetLegacyProtoPrefix...) + out = append(out, body...) + return out +} + +// zsetLegacyBlobKey is the test-side mirror of +// adapter/redis_compat_types.go:177 ZSetKey: +// `!redis|zset|` (no userKeyLen prefix). +func zsetLegacyBlobKey(userKey string) []byte { + out := []byte(RedisZSetLegacyBlobPrefix) + return append(out, userKey...) +} + // encodeZSetMetaValue builds the 8-byte BE member-count value used by // the live store/zset_helpers.go (mirror of store.MarshalZSetMeta). func encodeZSetMetaValue(memberCount int64) []byte { @@ -608,3 +640,138 @@ func TestRedisDB_ZSetMaxInt64DeclaredLen(t *testing.T) { t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) } } + +// TestRedisDB_ZSetLegacyBlobRoundTrip pins the codex P1 fix: a +// zset stored only via the legacy `!redis|zset|` blob +// must surface in the dump with all its members. Without +// HandleZSetLegacyBlob, the encoder would skip the record and +// produce an empty zsets/ output for that key — silent backup +// data loss. +func TestRedisDB_ZSetLegacyBlobRoundTrip(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 100}, + {member: "bob", score: 50}, + {member: "charlie", score: 30}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("leaderboard"), value); err != nil { + t.Fatalf("HandleZSetLegacyBlob: %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "leaderboard.json")) + // Members sorted by member-name bytes (matches the wide-column + // encoder's output policy). + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 100, scoreKind: "number"}, + {member: "bob", scoreNum: 50, scoreKind: "number"}, + {member: "charlie", scoreNum: 30, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetLegacyBlobThenWideColumnMerges pins the mid- +// migration shape: when a snapshot carries BOTH the legacy +// `!redis|zset|` blob AND wide-column `!zs|mem|...` rows for +// the same user key (which the live store does during the +// migration window), the encoder must produce a single merged +// zset. `!redis|zset|` lex-sorts before `!zs|...` so the blob +// arrives first; the wide-column rows then update / add members. +func TestRedisDB_ZSetLegacyBlobThenWideColumnMerges(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 1}, + {member: "bob", score: 2}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + // Wide-column rows: update alice's score, add charlie. + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("charlie")), encodeZSetScoreValue(3)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 99, scoreKind: "number"}, // wide-column won + {member: "bob", scoreNum: 2, scoreKind: "number"}, // legacy survived + {member: "charlie", scoreNum: 3, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetLegacyBlobWithInlineTTL pins that a TTL'd zset +// stored only via the legacy blob round-trips its expiry. The +// snapshot order is `!redis|zset|` (sorts before `!redis|ttl|`), +// so HandleZSetLegacyBlob runs first and registers redisKindZSet, +// then HandleTTL routes via the redisKindZSet branch (no +// pendingTTL detour needed for this ordering). +func TestRedisDB_ZSetLegacyBlobWithInlineTTL(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}}) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsMissingMagic pins fail-closed +// behaviour: a `!redis|zset|` value without the magic prefix +// fails at intake rather than silently decoding garbage protobuf. +func TestRedisDB_ZSetLegacyBlobRejectsMissingMagic(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + body, err := gproto.Marshal(&pb.RedisZSetValue{}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + err = db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), body) // no magic prefix + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsNaNScore pins NaN-fail-closed +// parallel to HandleZSetMember's contract. Redis ZADD rejects NaN +// at the wire level, so a NaN in storage indicates corruption. +func TestRedisDB_ZSetLegacyBlobRejectsNaNScore(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: math.NaN()}}) + err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value) + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsMalformedKey pins that a +// `!redis|zset|` key with no trailing user-key bytes fails parse. +func TestRedisDB_ZSetLegacyBlobRejectsMalformedKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}}) + // Key has the prefix but no trailing user-key bytes — parser must + // still accept it (empty user key is technically valid Redis). + // Use a key that doesn't have the prefix to trigger the parse + // failure. + err := db.HandleZSetLegacyBlob([]byte("not-the-right-prefix|k"), value) + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +} From ab88f3e0967131716665212aa2bc0dc5e92972b9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 19 May 2026 19:01:35 +0900 Subject: [PATCH 4/7] =?UTF-8?q?backup(zset):=20PR790=20r3=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20wide-column=20rows=20supersede=20legacy=20blob?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged that the r2 fix (legacy-blob handler in #790 #2) mis-handles the mid-migration / stale-leftover case: when a snapshot contains BOTH `!redis|zset|` and `!zs|mem|...` for the same user key, my code merged both, surfacing members that are no longer visible to readers. Live read path (adapter/redis_compat_helpers.go:610-620 RedisServer.loadZSetAt): when ANY wide-column row exists, loadZSetMembersAt is used and the legacy blob is IGNORED. A stale `!redis|zset|` blob left over after a partial migration would be invisible to clients but my encoder was surfacing its entries in the dump — silent backup corruption. Fix: add a sawWide flag to redisZSetState that flips on every wide-column observation (HandleZSetMeta + HandleZSetMember). markZSetWide() clears any legacy entries previously deposited by HandleZSetLegacyBlob, and HandleZSetLegacyBlob short-circuits if sawWide is already set. The result matches the live read path's source-of-truth selection: - No wide rows → legacy blob is the source of truth. - Any wide row exists → wide rows are authoritative; legacy entries are ignored (whether they arrived before or after the wide row). Caller audit (per /loop standing instruction): the new markZSetWide method is called only from HandleZSetMeta line 160 and HandleZSetMember line 194. HandleZSetLegacyBlob (line 268) short-circuits on the same flag. No other call sites — verified via 'grep -rn markZSetWide|sawWide internal/backup/'. The semantic change (legacy entries cleared on first wide observation) is contained to redisZSetState; no public API or external caller visible. Test changes: - TestRedisDB_ZSetLegacyBlobThenWideColumnMerges renamed to TestRedisDB_ZSetWideColumnSupersedesLegacyBlob, expectations updated (bob-stale no longer survives). - New TestRedisDB_ZSetMetaAloneEvictsLegacyBlob — meta-only wide-column row evicts legacy blob. - New TestRedisDB_ZSetLegacyBlobAfterWideRowsIsDropped — reverse- order case (wide first, blob later). Self-review: 1. Data loss — opposite: this fix PREVENTS surfacing stale members that the live read path hides. Legacy blob is still applied when no wide rows exist (preserves the original P1 round 2 fix for legacy-only zsets). 2. Concurrency — no new shared state. 3. Performance — sawWide is a single bool per state; markZSetWide short-circuits on subsequent calls so the clear-loop runs at most once per state. 4. Consistency — encoder output now matches `loadZSetAt`'s read- side source-of-truth selection. Tested with both orderings (legacy-first-wide-later, wide-first-legacy-later, meta-alone). 5. Coverage — 1 renamed test + 2 new tests. All 90 redis tests pass. --- internal/backup/redis_zset.go | 55 ++++++++++++++++++ internal/backup/redis_zset_test.go | 90 ++++++++++++++++++++++++++---- 2 files changed, 134 insertions(+), 11 deletions(-) diff --git a/internal/backup/redis_zset.go b/internal/backup/redis_zset.go index 8b39e526..fbdfb278 100644 --- a/internal/backup/redis_zset.go +++ b/internal/backup/redis_zset.go @@ -96,10 +96,22 @@ var ErrRedisInvalidZSetKey = cockroachdberr.New("backup: malformed !zs| key") // HandleZSetMember calls collapse to the latest-wins score (matching // Redis's ZADD semantics: re-adding a member overwrites the prior // score). +// +// sawWide tracks whether ANY wide-column row (`!zs|meta|` or +// `!zs|mem|`) has been observed for the user key. This mirrors the +// live read path's source-of-truth selection in +// `adapter/redis_compat_helpers.go:610-620` +// (`RedisServer.loadZSetAt`): when wide-column rows exist they are +// authoritative, and any `!redis|zset|` legacy blob is ignored +// (treated as a stale post-migration leftover). Without this flag, +// the encoder would merge stale legacy members on top of the +// wide-column source-of-truth, surfacing deleted or outdated +// entries in the restored zset. Codex P1 round 3 (PR #790). type redisZSetState struct { metaSeen bool declaredLen int64 members map[string]float64 + sawWide bool expireAtMs uint64 hasTTL bool } @@ -141,6 +153,11 @@ func (r *RedisDB) HandleZSetMeta(key, value []byte) error { st := r.zsetState(userKey) st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above st.metaSeen = true + // Wide-column meta means any legacy blob already merged into + // this state is stale; drop it. The live read path + // (adapter/redis_compat_helpers.go:610-620) makes the same + // choice on read. + r.markZSetWide(st) return nil } @@ -169,10 +186,39 @@ func (r *RedisDB) HandleZSetMember(key, value []byte) error { "NaN score for member of user key %q (length %d)", userKey, len(userKey)) } st := r.zsetState(userKey) + // First wide-column row evicts any provisional legacy-blob + // members on this user key. From here on, the wide-column + // rows are the authoritative source of truth — matches the + // live read path's preference in + // adapter/redis_compat_helpers.go:610-620. + r.markZSetWide(st) st.members[string(member)] = score return nil } +// markZSetWide flips the sawWide flag and, if this is the first +// wide-column observation for the state, clears any provisional +// legacy-blob members. The live read path treats `!redis|zset|` +// as a fallback that is ONLY consulted when no wide-column rows +// exist (`adapter/redis_compat_helpers.go::RedisServer.loadZSetAt` +// line 610-620), so a snapshot containing both layouts for the +// same user key MUST drop the legacy entries to avoid surfacing +// stale post-migration leftovers. Codex P1 round 3 (PR #790). +func (r *RedisDB) markZSetWide(st *redisZSetState) { + if st.sawWide { + return + } + st.sawWide = true + // Clear any legacy-blob members deposited before the first + // wide-column row arrived. Re-using the same map keeps the + // nil-vs-empty contract for empty-but-meta-seen zsets (an + // explicit `make` would change observable Finalize behavior + // for those edge cases). + for k := range st.members { + delete(st.members, k) + } +} + // HandleZSetScore accepts and discards one !zs|scr|... record. The // score index is a live-side secondary index for ZRANGEBYSCORE; the // authoritative member→score mapping lives in !zs|mem|. Snapshot @@ -219,6 +265,15 @@ func (r *RedisDB) HandleZSetLegacyBlob(key, value []byte) error { return err } st := r.zsetState(userKey) + if st.sawWide { + // Wide-column rows already arrived for this user key (rare + // in practice because !redis|zset| sorts before !zs|..., + // but possible with a custom dispatcher ordering or a mid- + // migration replay). The live read path ignores the legacy + // blob in that case, so we do too — applying these entries + // would surface stale post-migration leftovers in the dump. + return nil + } for _, e := range entries { st.members[e.member] = e.score } diff --git a/internal/backup/redis_zset_test.go b/internal/backup/redis_zset_test.go index dca5a9cd..4cd76476 100644 --- a/internal/backup/redis_zset_test.go +++ b/internal/backup/redis_zset_test.go @@ -671,24 +671,35 @@ func TestRedisDB_ZSetLegacyBlobRoundTrip(t *testing.T) { }) } -// TestRedisDB_ZSetLegacyBlobThenWideColumnMerges pins the mid- -// migration shape: when a snapshot carries BOTH the legacy +// TestRedisDB_ZSetWideColumnSupersedesLegacyBlob pins the codex +// P1 round 3 fix: when a snapshot carries BOTH the legacy // `!redis|zset|` blob AND wide-column `!zs|mem|...` rows for -// the same user key (which the live store does during the -// migration window), the encoder must produce a single merged -// zset. `!redis|zset|` lex-sorts before `!zs|...` so the blob -// arrives first; the wide-column rows then update / add members. -func TestRedisDB_ZSetLegacyBlobThenWideColumnMerges(t *testing.T) { +// the same user key (a mid-migration state, or a stale post- +// migration leftover), the encoder MUST drop the legacy entries +// and use only the wide-column source-of-truth. This matches the +// live read path in adapter/redis_compat_helpers.go:610-620 +// (RedisServer.loadZSetAt) which falls back to the legacy blob +// ONLY when no wide-column rows exist. +// +// Without this fix, the dump would surface deleted or outdated +// members from a stale legacy blob — silent backup corruption. +func TestRedisDB_ZSetWideColumnSupersedesLegacyBlob(t *testing.T) { t.Parallel() db, root := newRedisDB(t) + // Snapshot order: legacy blob arrives first (sorts before !zs|). + // "bob-stale" exists ONLY in the legacy blob — a stale + // post-migration leftover that the live store hides via the + // loadZSetAt preference for wide-column rows. legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ {member: "alice", score: 1}, - {member: "bob", score: 2}, + {member: "bob-stale", score: 2}, }) if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { t.Fatal(err) } - // Wide-column rows: update alice's score, add charlie. + // Wide-column source-of-truth: alice with a new score, charlie new. + // "bob-stale" is intentionally absent — its presence in the legacy + // blob is the stale-leftover scenario this test guards against. if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil { t.Fatal(err) } @@ -699,13 +710,70 @@ func TestRedisDB_ZSetLegacyBlobThenWideColumnMerges(t *testing.T) { t.Fatal(err) } got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + // "bob-stale" MUST NOT appear — wide-column rows supersede the legacy blob. assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ - {member: "alice", scoreNum: 99, scoreKind: "number"}, // wide-column won - {member: "bob", scoreNum: 2, scoreKind: "number"}, // legacy survived + {member: "alice", scoreNum: 99, scoreKind: "number"}, {member: "charlie", scoreNum: 3, scoreKind: "number"}, }) } +// TestRedisDB_ZSetMetaAloneEvictsLegacyBlob pins that a `!zs|meta|` +// record (even without any `!zs|mem|` rows) also triggers the +// legacy-supersedence rule. The live read path's loadZSetAt +// preference is based on whether ANY wide-column key exists for +// the user key — meta-only is rare in practice but the encoder +// stays consistent with the read-side contract. +func TestRedisDB_ZSetMetaAloneEvictsLegacyBlob(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "stale", score: 1}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + // Wide-column meta record declares zero members. + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if members := zsetMembersArray(t, got); len(members) != 0 { + t.Fatalf("meta-only wide-column row MUST evict legacy blob, got members=%v", members) + } +} + +// TestRedisDB_ZSetLegacyBlobAfterWideRowsIsDropped pins the +// reverse-order edge case: if a snapshot somehow emits a +// `!redis|zset|` AFTER `!zs|mem|` rows (custom dispatcher +// ordering, or a replay), the legacy blob is dropped at intake +// rather than overwriting the established wide-column state. +func TestRedisDB_ZSetLegacyBlobAfterWideRowsIsDropped(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Wide-column row first — registers sawWide. + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil { + t.Fatal(err) + } + // Legacy blob arrives later — must be ignored. + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 1}, // would overwrite if not gated + {member: "stale", score: 2}, // would re-add if not gated + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 99, scoreKind: "number"}, // unchanged + }) +} + // TestRedisDB_ZSetLegacyBlobWithInlineTTL pins that a TTL'd zset // stored only via the legacy blob round-trips its expiry. The // snapshot order is `!redis|zset|` (sorts before `!redis|ttl|`), From 174456aed15094b6702cece0e6f24bbb670be12c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 04:46:43 +0900 Subject: [PATCH 5/7] =?UTF-8?q?backup(zset):=20PR790=20r4=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20bound=20pendingTTL=20to=20prevent=20OOM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged the same P1 on PR #791 round 2 — pendingTTL grows unboundedly when no zset handler is present to drain the parked TTLs. The same bug exists on PR #790's branch (the two PRs share the pendingTTL infrastructure; the codex finding's text applies verbatim). Fix: cap pendingTTL at pendingTTLCap entries (default 1 MiB). Once the cap is reached, subsequent unknown-kind TTLs fall back to immediate-orphan counting via the new parkUnknownTTL helper. New API: - RedisDB.WithPendingTTLCap(int) chainable setter. - pendingTTLOverflow counter surfaced in the orphan-TTL warning. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch behavior changes: previously ALWAYS buffered, now buffers up to cap then orphan-counts. Other branches unchanged. - Callers of HandleTTL: tests only. The cmd/elastickv-snapshot-decode driver (Phase 0a follow-up) will be the first production caller. - New helper parkUnknownTTL is package-private with one call site (HandleTTL). No prior call sites to audit. - Verified via `grep -rn HandleTTL|pendingTTLCap|WithPendingTTLCap --include=*.go internal/backup/`. New tests: - TestRedisDB_PendingTTLBoundedByCap. - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering. - TestRedisDB_WithPendingTTLCapNegativeCoercedToZero. Self-review: 1. Data loss - cap can mis-classify TTLs in the overflow window that COULD have been drained by a later state-init. Mitigation: default cap (1 MiB) is well above the count of legitimately- buffered wide-column TTL'd keys on real clusters; operators can tune via WithPendingTTLCap. 2. Concurrency - no new shared state. 3. Performance - one extra comparison per unknown-kind TTL. 4. Consistency - mirrors the fix landing on PR #791 r3; when either PR merges first the other can rebase cleanly. 5. Coverage - 3 new tests + the existing TestRedisDB_OrphanTTLCountedNotBuffered (still pins the default-cap buffered + drained-at-Finalize semantics). --- internal/backup/redis_string.go | 89 +++++++++++++++++++++++----- internal/backup/redis_string_test.go | 65 ++++++++++++++++++++ 2 files changed, 140 insertions(+), 14 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 76040613..cd000e65 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -214,8 +214,32 @@ type RedisDB struct { // orphan-TTL warning for whatever remains (those keys never // appeared as a typed record — likely a corrupted store). pendingTTL map[string]uint64 + + // pendingTTLCap caps pendingTTL's in-memory size. Once the map + // reaches this many entries, subsequent unknown-kind TTLs fall + // back to incrementing orphanTTLCount directly without + // buffering the user-key bytes. Without this cap, an + // adversarial or corrupt snapshot whose `!redis|ttl|` records + // never find a typed-record claimer would grow pendingTTL + // unboundedly and could OOM the decoder. Codex P1 finding + // surfaced on PR #791 round 2; the same fix is applied here on + // PR #790 because the two PRs share the pendingTTL + // infrastructure and the bug is identical. + pendingTTLCap int + + // pendingTTLOverflow counts entries skipped because the + // pendingTTL buffer was at cap. Surfaced in the Finalize + // warning so operators can distinguish "snapshot exceeded the + // buffer cap" from "TTL records remained unmatched". + pendingTTLOverflow int } +// defaultPendingTTLCap caps pendingTTL at 1 MiB entries by default +// (~64 MiB worst-case memory at ~64 B per Go map entry). Override +// via WithPendingTTLCap for hosts that need a different memory / +// coverage trade-off. +const defaultPendingTTLCap = 1 << 20 + // NewRedisDB constructs a RedisDB rooted at /redis/db_/. // dbIndex selects ; today the producer always passes 0, but accepting // the index as a parameter prevents a future multi-db dump from silently @@ -235,7 +259,21 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { sets: make(map[string]*redisSetState), zsets: make(map[string]*redisZSetState), pendingTTL: make(map[string]uint64), + pendingTTLCap: defaultPendingTTLCap, + } +} + +// WithPendingTTLCap overrides the default cap on the pendingTTL +// buffer. A value of 0 disables buffering — every unknown-kind TTL +// becomes an immediate orphan (matches the pre-pendingTTL behavior). +// Negative inputs are coerced to 0. Returns the receiver so it can +// be chained with other With* setters. +func (r *RedisDB) WithPendingTTLCap(capacity int) *RedisDB { + if capacity < 0 { + capacity = 0 } + r.pendingTTLCap = capacity + return r } // WithWarnSink wires a structured-warning sink. The sink is called with @@ -353,23 +391,39 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - // Park the expiry until a wide-column Handle*Meta / - // Handle*Member / Handle*Entry registers the user key. - // Without this buffering, sorted-set / set / stream TTLs - // would be counted as orphans and dropped because their - // type prefixes lex-sort AFTER `!redis|ttl|` in the - // snapshot's encoded-key order. Codex P1 (PR #790). - // - // We store userKey as a string copy (`string([]byte)` - // allocates) rather than the alias slice — the snapshot - // reader reuses key buffers across iterations, so a slice - // alias would race with the next record. - r.pendingTTL[string(userKey)] = expireAtMs + r.parkUnknownTTL(userKey, expireAtMs) return nil } return nil } +// parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or +// falls back to the immediate orphan-count path when the buffer is +// at cap. Extracted from HandleTTL's switch so the parent stays +// under the cyclop budget. +// +// Rationale (codex P1 on PR #791 round 2; same fix mirrored here): +// Pebble snapshots emit records in encoded-key order, and +// `!redis|ttl|` lex-sorts BEFORE every wide-column prefix where the +// type letter is >= 's' (set, stream, zset). The buffer lets us +// drain the parked expiry when the typed record finally arrives; +// the cap prevents an adversarial snapshot whose TTLs never find a +// claimer from OOM'ing the decoder. Storage: userKey is COPIED +// (`string([]byte)` allocates) because the snapshot reader reuses +// key buffers across iterations — an alias slice would race with +// the next record. +func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) { + if len(r.pendingTTL) >= r.pendingTTLCap { + // Cap reached: fall back to the immediate-orphan path so + // memory stays bounded. Already-buffered entries can still + // be drained by later state-inits. + r.pendingTTLOverflow++ + r.orphanTTLCount++ + return + } + r.pendingTTL[string(userKey)] = expireAtMs +} + // claimPendingTTL drains any buffered TTL for userKey into the // caller-provided state. Called by the wide-column state-init // functions (setState / zsetState / streamState) when they first @@ -424,9 +478,16 @@ func (r *RedisDB) Finalize() error { // type added on the live side without a backup-encoder update). r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { - r.warn("redis_orphan_ttl", + fields := []any{ "count", r.orphanTTLCount, - "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix", + } + if r.pendingTTLOverflow > 0 { + fields = append(fields, + "pending_ttl_buffer_overflow", r.pendingTTLOverflow, + "pending_ttl_buffer_cap", r.pendingTTLCap) + } + r.warn("redis_orphan_ttl", fields...) } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 5343d3b5..104a74eb 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,6 +617,71 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } +// TestRedisDB_PendingTTLBoundedByCap pins the codex P1 fix on PR +// #791 round 2 (mirrored here on PR #790 round 4): pendingTTL must +// NOT grow unboundedly. Once the cap is reached, subsequent +// unknown-kind TTLs fall back to immediate-orphan counting without +// buffering the user-key bytes. +func TestRedisDB_PendingTTLBoundedByCap(t *testing.T) { + t.Parallel() + const cap = 8 + db, _ := newRedisDB(t) + db.WithPendingTTLCap(cap) + for i := 0; i < cap*2; i++ { + key := []byte("orphan-" + intToDecimal(i)) + ms := uint64(i) + 1 //nolint:gosec // i bounded above + if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { + t.Fatalf("HandleTTL[%d]: %v", i, err) + } + } + if got := len(db.pendingTTL); got != cap { + t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) + } + if got := db.orphanTTLCount; got != cap { + t.Fatalf("orphanTTLCount = %d at intake, want %d (overflow path)", got, cap) + } + if got := db.pendingTTLOverflow; got != cap { + t.Fatalf("pendingTTLOverflow = %d, want %d", got, cap) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + if got := db.orphanTTLCount; got != cap*2 { + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", got, cap*2) + } +} + +// TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that +// cap==0 reverts to the original immediate-orphan path. +func TestRedisDB_WithPendingTTLCapZeroDisablesBuffering(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLCap(0) + const n = 5 + for i := 0; i < n; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatal(err) + } + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL len = %d, want 0 (cap disabled)", got) + } + if got := db.orphanTTLCount; got != n { + t.Fatalf("orphanTTLCount = %d, want %d", got, n) + } +} + +// TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins input +// sanitisation. +func TestRedisDB_WithPendingTTLCapNegativeCoercedToZero(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLCap(-100) + if db.pendingTTLCap != 0 { + t.Fatalf("pendingTTLCap = %d after WithPendingTTLCap(-100), want 0", db.pendingTTLCap) + } +} + func TestRedisDB_DirsCreatedCachesMkdirAll(t *testing.T) { t.Parallel() // Two HandleString calls in a row should populate dirsCreated From 800cc8c16df7225472811f27645b49f3f57b7851 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 04:55:31 +0900 Subject: [PATCH 6/7] =?UTF-8?q?backup(zset):=20PR790=20r5=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20fail=20closed=20on=20pendingTTL=20overflow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 on round 4: the r4 fix capped pendingTTL but silently counted overflow entries as orphans. That's a data-loss path on legitimate large datasets — in real Pebble snapshot order (`!redis|ttl|` < `!st|`/`!stream|`/`!zs|`), an overflowed TTL likely belongs to a wide-column key arriving later in the scan, and dropping it permanently loses `expire_at_ms`. Restored data ends up non-expiring without the operator noticing — only a warning gets logged. Fix: replace the silent-orphan fallback with fail-closed semantics. Three modes determined by pendingTTLCap: - cap > 0 and buffer NOT full: buffer the entry as before. - cap == 0: counter-only mode (operator explicit opt-out; orphan-count without buffering). - cap > 0 and buffer FULL: fail closed with new sentinel ErrPendingTTLBufferFull. Operator must raise the cap via WithPendingTTLCap or investigate the snapshot for corruption. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch now CAN return non-nil error in a new condition. Existing callers (all in test files) already check the error. Verified via `grep -rn '\.HandleTTL(' --include=*.go`: every call site has `if err := db.HandleTTL(...); err != nil { ... }`. No silent swallow. - The cmd/elastickv-snapshot-decode driver (Phase 0a follow-up) will see the new error class and can either bail or call WithPendingTTLCap before retrying. - Tests TestRedisDB_PendingTTLFailsClosedAtCap (renamed from TestRedisDB_PendingTTLBoundedByCap) and TestRedisDB_WithPendingTTLCapZeroOpts (renamed from TestRedisDB_WithPendingTTLCapZeroDisablesBuffering) pin the new semantics. The existing TestRedisDB_OrphanTTLCountedNotBuffered drives 10,000 TTLs with default cap=1M, well under the cap so no behavior change. - parkUnknownTTL signature changed from `func(...)` to `func(...) error`. Single caller (HandleTTL) updated. Self-review: 1. Data loss - opposite of the original concern: fail-closed prevents silent TTL drops. Operators who explicitly accept the lossy path opt in via WithPendingTTLCap(0). 2. Concurrency - no new shared state; orphanTTLCount unchanged for the cap=0 counter mode. 3. Performance - one comparison per unknown-kind TTL. Failed- closed path is a clean Go error return; no allocation amplification. 4. Consistency - matches the "fail closed" pattern called out in CLAUDE.md (replication/HLC/snapshot reads). Error type makes the failure mode legible to the caller. 5. Coverage - 2 renamed tests pin the new semantics. The error path is exercised at the cap+1 record; the counter-only mode stays lossy by explicit opt-in. --- internal/backup/redis_string.go | 75 ++++++++++++++++++++-------- internal/backup/redis_string_test.go | 50 +++++++++++-------- 2 files changed, 83 insertions(+), 42 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index cd000e65..7ad2d39c 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -72,6 +72,24 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| // expected 8-byte big-endian uint64 millisecond expiry. var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value") +// ErrPendingTTLBufferFull is returned by HandleTTL when an +// unknown-kind TTL arrives but the pendingTTL buffer is already at +// pendingTTLCap. The encoder fails closed here rather than silently +// counting the TTL as an orphan because in real Pebble snapshot +// order (`!redis|ttl|` lex-sorts before `!st|`/`!stream|`/`!zs|`), +// the dropped entry would likely belong to a valid wide-column key +// that arrives later — losing its expire_at_ms would produce a +// restored database with non-expiring data that the source +// snapshot's clients expected to expire. Codex P1 on PR #790 +// round 5. +// +// Recovery: raise WithPendingTTLCap above the snapshot's count of +// unmatched-at-intake TTLs, or set the cap to 0 to explicitly opt +// into the lossy counter-only mode (callers that prefer not to +// see this error must accept that orphan-counted entries will be +// dropped without inlining into wide-column state). +var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL buffer at cap; raise WithPendingTTLCap or accept orphan-counter mode via WithPendingTTLCap(0)") + // redisKeyKind tracks which Redis-type prefix introduced a user key, so that // when a later !redis|ttl| record arrives we know whether to write its // expiry into strings_ttl.jsonl, hll_ttl.jsonl, or buffer it for a wide- @@ -391,37 +409,52 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.hasTTL = true return nil case redisKindUnknown: - r.parkUnknownTTL(userKey, expireAtMs) - return nil + return r.parkUnknownTTL(userKey, expireAtMs) } return nil } // parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or -// falls back to the immediate orphan-count path when the buffer is -// at cap. Extracted from HandleTTL's switch so the parent stays -// under the cyclop budget. +// fails closed when the buffer is at cap. Extracted from HandleTTL's +// switch so the parent stays under the cyclop budget. +// +// Three modes determined by pendingTTLCap: +// +// - cap > 0 and buffer NOT full: store the (userKey, expireAtMs) +// pair so a later wide-column state-init can drain it. +// - cap == 0: counter-only mode. The TTL becomes an immediate +// orphan. Operator-explicit opt-out for callers that prefer +// constant-space orphan counting over the buffered drain path. +// - cap > 0 and buffer FULL: fail closed with +// ErrPendingTTLBufferFull. Silently counting the entry as an +// orphan would permanently lose `expire_at_ms` for the wide- +// column key that arrives later — restored data becomes +// non-expiring without the operator noticing. Codex P1 on PR +// #790 round 5. // -// Rationale (codex P1 on PR #791 round 2; same fix mirrored here): -// Pebble snapshots emit records in encoded-key order, and -// `!redis|ttl|` lex-sorts BEFORE every wide-column prefix where the -// type letter is >= 's' (set, stream, zset). The buffer lets us -// drain the parked expiry when the typed record finally arrives; -// the cap prevents an adversarial snapshot whose TTLs never find a -// claimer from OOM'ing the decoder. Storage: userKey is COPIED -// (`string([]byte)` allocates) because the snapshot reader reuses -// key buffers across iterations — an alias slice would race with -// the next record. -func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) { +// Storage: userKey is COPIED (`string([]byte)` allocates) because +// the snapshot reader reuses key buffers across iterations — an +// alias slice would race with the next record. +func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error { + if r.pendingTTLCap == 0 { + // Counter-only mode (operator explicitly disabled the buffer). + r.orphanTTLCount++ + return nil + } if len(r.pendingTTL) >= r.pendingTTLCap { - // Cap reached: fall back to the immediate-orphan path so - // memory stays bounded. Already-buffered entries can still - // be drained by later state-inits. + // Fail closed: refuse to silently drop a TTL that may + // belong to a wide-column key arriving later in the + // snapshot scan. The operator should raise the cap + // (WithPendingTTLCap) or investigate the snapshot for + // corruption. We still increment pendingTTLOverflow so + // the Finalize warning surfaces the count even if the + // caller swallows the error. r.pendingTTLOverflow++ - r.orphanTTLCount++ - return + return cockroachdberr.Wrapf(ErrPendingTTLBufferFull, + "buffer at cap=%d (user_key_len=%d)", r.pendingTTLCap, len(userKey)) } r.pendingTTL[string(userKey)] = expireAtMs + return nil } // claimPendingTTL drains any buffered TTL for userKey into the diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 104a74eb..38de429b 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,50 +617,58 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } -// TestRedisDB_PendingTTLBoundedByCap pins the codex P1 fix on PR -// #791 round 2 (mirrored here on PR #790 round 4): pendingTTL must -// NOT grow unboundedly. Once the cap is reached, subsequent -// unknown-kind TTLs fall back to immediate-orphan counting without -// buffering the user-key bytes. -func TestRedisDB_PendingTTLBoundedByCap(t *testing.T) { +// TestRedisDB_PendingTTLFailsClosedAtCap pins the codex P1 fix on +// PR #790 round 5: when pendingTTL reaches cap and another +// unknown-kind TTL arrives, HandleTTL fails closed with +// ErrPendingTTLBufferFull rather than silently counting the entry +// as an orphan. Silent overflow would permanently lose +// expire_at_ms for wide-column keys that arrive later in the scan, +// producing restored data with TTLs the source snapshot expected +// to expire. +func TestRedisDB_PendingTTLFailsClosedAtCap(t *testing.T) { t.Parallel() const cap = 8 db, _ := newRedisDB(t) db.WithPendingTTLCap(cap) - for i := 0; i < cap*2; i++ { + // Fill the buffer to exactly cap — should all succeed. + for i := 0; i < cap; i++ { key := []byte("orphan-" + intToDecimal(i)) ms := uint64(i) + 1 //nolint:gosec // i bounded above if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { - t.Fatalf("HandleTTL[%d]: %v", i, err) + t.Fatalf("HandleTTL[%d]: %v (should succeed within cap)", i, err) } } if got := len(db.pendingTTL); got != cap { t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) } - if got := db.orphanTTLCount; got != cap { - t.Fatalf("orphanTTLCount = %d at intake, want %d (overflow path)", got, cap) + // One more entry should fail closed. + err := db.HandleTTL([]byte("orphan-overflow"), encodeTTLValue(999)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull at cap", err) } - if got := db.pendingTTLOverflow; got != cap { - t.Fatalf("pendingTTLOverflow = %d, want %d", got, cap) + if got := db.pendingTTLOverflow; got != 1 { + t.Fatalf("pendingTTLOverflow = %d, want 1", got) } - if err := db.Finalize(); err != nil { - t.Fatalf("Finalize: %v", err) - } - if got := db.orphanTTLCount; got != cap*2 { - t.Fatalf("orphanTTLCount = %d after Finalize, want %d", got, cap*2) + // orphanTTLCount must NOT be incremented at intake — the + // caller has to handle the error explicitly. + if got := db.orphanTTLCount; got != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (failed closed)", got) } } -// TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that -// cap==0 reverts to the original immediate-orphan path. -func TestRedisDB_WithPendingTTLCapZeroDisablesBuffering(t *testing.T) { +// TestRedisDB_WithPendingTTLCapZeroOpts pins the explicit +// counter-only mode: cap==0 disables buffering entirely and every +// unknown-kind TTL becomes an immediate orphan WITHOUT firing +// ErrPendingTTLBufferFull. Operators that prefer constant-space +// orphan-counting over the fail-closed path opt in via cap=0. +func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) db.WithPendingTTLCap(0) const n = 5 for i := 0; i < n; i++ { if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { - t.Fatal(err) + t.Fatalf("HandleTTL[%d]: %v (counter-only mode must not fail)", i, err) } } if got := len(db.pendingTTL); got != 0 { From 8eca5cdf88c330f9a44c65705efe49044556329d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 20 May 2026 05:08:22 +0900 Subject: [PATCH 7/7] =?UTF-8?q?backup(zset):=20PR790=20r6=20codex=20P1=20?= =?UTF-8?q?=E2=80=94=20bound=20pendingTTL=20by=20bytes,=20not=20entries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 on round 5: the entry-count cap introduced in r4 doesn't actually bound memory. Redis user keys can be up to 1 MiB each, so an entry-count cap of 1 MiB entries permits up to 1 TiB of buffered key bytes — defeating the OOM protection on adversarial snapshots with large keys. Fix: replace entry-count cap with a byte budget. Each entry costs `len(userKey) + 8` bytes (the 8 is the uint64 expireAtMs payload; Go-map bucket overhead is intentionally NOT counted so the budget stays deterministic for operators). The budget is decremented when claimPendingTTL drains an entry, so a long-running scan that drains as it goes can reuse the budget. Renames: - WithPendingTTLCap → WithPendingTTLByteCap (the cap is now bytes) - defaultPendingTTLCap → defaultPendingTTLBytesCap (64 MiB) - pendingTTLCap field → pendingTTLBytesCap (struct field) - Added pendingTTLBytes counter (current cumulative cost) - Added pendingTTLEntryOverheadBytes constant (= 8) Caller audit (per /loop standing instruction): - WithPendingTTLCap was introduced in r3/r4 of this PR; no external caller has consumed it yet. Verified via `grep -rn 'WithPendingTTLCap|pendingTTLCap\b' --include=*.go`: zero matches in the full tree (the rename is total). - parkUnknownTTL's signature unchanged (still returns error). The failure condition changes from "entry count >= cap" to "would exceed byte budget"; same fail-closed semantics, just measured in bytes. - claimPendingTTL gains a `r.pendingTTLBytes -= ...` line. Single caller (the wide-column state-init helpers); accounting matches parkUnknownTTL's add side exactly. - Finalize warning surfaces `pending_ttl_buffer_bytes_cap` instead of the old `pending_ttl_buffer_cap`. Field name change in the observable warning; no programmatic consumer exists yet. Tests renamed/added: - TestRedisDB_PendingTTLFailsClosedAtByteCap (renamed from TestRedisDB_PendingTTLFailsClosedAtCap; switched to byte-cap semantics with 8-byte keys × 16-byte cost). - New TestRedisDB_PendingTTLByteCapBoundedByLargeKey pins the exact bug class codex flagged: a single 100-byte key blocks under a 64-byte budget. - New TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim pins that draining via claimPendingTTL frees byte budget. - TestRedisDB_WithPendingTTLByteCapZeroOpts (renamed). - TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero (renamed). Self-review: 1. Data loss - byte-budget cap is a strictly tighter bound than the previous entry-count cap; no new data-loss surface. fail-closed semantics from r5 unchanged. 2. Concurrency - pendingTTLBytes is per-RedisDB; same single- threaded contract as the rest of the struct. 3. Performance - one int comparison + add per parkUnknownTTL; one int subtract per claimPendingTTL. O(1) overhead. 4. Consistency - byte budget matches the OOM protection reviewer asked for, sized at 64 MiB to comfortably fit any realistic legitimate workload. 5. Coverage - 5 tests total (3 new, 2 renamed). All pass with -race. Existing TestRedisDB_OrphanTTLCountedNotBuffered still passes with the default 64 MiB budget (10K small keys ≈ 180 KB). --- internal/backup/redis_string.go | 169 +++++++++++++++++---------- internal/backup/redis_string_test.go | 127 ++++++++++++++------ 2 files changed, 198 insertions(+), 98 deletions(-) diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 7ad2d39c..6524b0d8 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -73,22 +73,27 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value") // ErrPendingTTLBufferFull is returned by HandleTTL when an -// unknown-kind TTL arrives but the pendingTTL buffer is already at -// pendingTTLCap. The encoder fails closed here rather than silently -// counting the TTL as an orphan because in real Pebble snapshot -// order (`!redis|ttl|` lex-sorts before `!st|`/`!stream|`/`!zs|`), -// the dropped entry would likely belong to a valid wide-column key -// that arrives later — losing its expire_at_ms would produce a -// restored database with non-expiring data that the source -// snapshot's clients expected to expire. Codex P1 on PR #790 -// round 5. +// unknown-kind TTL arrives but the pendingTTL buffer's BYTE +// budget (pendingTTLBytesCap) is already exhausted. The encoder +// fails closed here rather than silently counting the TTL as an +// orphan because in real Pebble snapshot order (`!redis|ttl|` +// lex-sorts before `!st|`/`!stream|`/`!zs|`), the dropped entry +// would likely belong to a valid wide-column key that arrives +// later — losing its expire_at_ms would produce a restored +// database with non-expiring data that the source snapshot's +// clients expected to expire. // -// Recovery: raise WithPendingTTLCap above the snapshot's count of -// unmatched-at-intake TTLs, or set the cap to 0 to explicitly opt -// into the lossy counter-only mode (callers that prefer not to -// see this error must accept that orphan-counted entries will be -// dropped without inlining into wide-column state). -var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL buffer at cap; raise WithPendingTTLCap or accept orphan-counter mode via WithPendingTTLCap(0)") +// The budget is in BYTES (not entries) because Redis user keys can +// be up to 1 MiB each; an entry-count cap of N still permits N +// MiB of accumulated key bytes, which defeats the OOM protection +// on adversarial snapshots with large keys. Codex P1 on PR #790 +// round 5 (entry-count -> byte-budget on round 6). +// +// Recovery: raise WithPendingTTLByteCap above the snapshot's +// expected cumulative byte cost of unmatched-at-intake TTLs, or +// set the byte cap to 0 to explicitly opt into the lossy +// counter-only mode. +var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL byte budget exhausted; raise WithPendingTTLByteCap or accept orphan-counter mode via WithPendingTTLByteCap(0)") // redisKeyKind tracks which Redis-type prefix introduced a user key, so that // when a later !redis|ttl| record arrives we know whether to write its @@ -233,30 +238,48 @@ type RedisDB struct { // appeared as a typed record — likely a corrupted store). pendingTTL map[string]uint64 - // pendingTTLCap caps pendingTTL's in-memory size. Once the map - // reaches this many entries, subsequent unknown-kind TTLs fall - // back to incrementing orphanTTLCount directly without - // buffering the user-key bytes. Without this cap, an - // adversarial or corrupt snapshot whose `!redis|ttl|` records - // never find a typed-record claimer would grow pendingTTL - // unboundedly and could OOM the decoder. Codex P1 finding - // surfaced on PR #791 round 2; the same fix is applied here on - // PR #790 because the two PRs share the pendingTTL - // infrastructure and the bug is identical. - pendingTTLCap int - - // pendingTTLOverflow counts entries skipped because the - // pendingTTL buffer was at cap. Surfaced in the Finalize - // warning so operators can distinguish "snapshot exceeded the - // buffer cap" from "TTL records remained unmatched". + // pendingTTLBytes tracks the cumulative byte cost of the + // pendingTTL buffer (each entry costs len(userKey) + 8 bytes, + // where 8 is the uint64 expireAtMs payload — the per-entry + // Go-map overhead is not counted here, since we want the cap + // to be a deterministic byte budget the operator can reason + // about). + pendingTTLBytes int + + // pendingTTLBytesCap caps the byte cost of pendingTTL. Once + // the cumulative cost would exceed this budget, subsequent + // unknown-kind TTLs fail closed with ErrPendingTTLBufferFull. + // We bound by bytes (not by entry count) because Redis user + // keys can be up to 1 MiB each; an entry-count cap of N + // would still permit ~N MiB of accumulated key bytes — + // defeating the OOM protection on adversarial snapshots with + // large keys. Codex P1 finding on PR #790 round 6. + pendingTTLBytesCap int + + // pendingTTLOverflow counts entries that would have entered + // pendingTTL but were rejected because the byte budget was + // exhausted. Surfaced in the Finalize warning so operators + // can distinguish "snapshot exceeded the buffer budget" from + // "TTL records remained unmatched after the entire scan". pendingTTLOverflow int } -// defaultPendingTTLCap caps pendingTTL at 1 MiB entries by default -// (~64 MiB worst-case memory at ~64 B per Go map entry). Override -// via WithPendingTTLCap for hosts that need a different memory / -// coverage trade-off. -const defaultPendingTTLCap = 1 << 20 +// defaultPendingTTLBytesCap caps pendingTTL at 64 MiB cumulative +// key+payload bytes by default. Override via WithPendingTTLByteCap +// for hosts that need a different memory / coverage trade-off. +// 64 MiB covers tens of thousands to millions of typical Redis +// keys (usually << 100 bytes each); the absolute key-size ceiling +// is 1 MiB, so this budget tolerates ~64 maximally-large keys +// without dropping the OOM protection. +const defaultPendingTTLBytesCap = 64 << 20 + +// pendingTTLEntryOverheadBytes is the per-entry payload cost we +// charge against pendingTTLBytesCap on top of the user-key bytes. +// It accounts for the uint64 expireAtMs we store as the map value. +// The Go-map's bucket overhead is NOT included — we want a +// deterministic byte budget the operator can reason about +// directly, not one that drifts with Go's runtime map layout. +const pendingTTLEntryOverheadBytes = 8 // NewRedisDB constructs a RedisDB rooted at /redis/db_/. // dbIndex selects ; today the producer always passes 0, but accepting @@ -267,30 +290,37 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { dbIndex = 0 } return &RedisDB{ - outRoot: outRoot, - dbIndex: dbIndex, - kindByKey: make(map[string]redisKeyKind), - dirsCreated: make(map[string]struct{}), - inlineTTLEmitted: make(map[string]struct{}), - hashes: make(map[string]*redisHashState), - lists: make(map[string]*redisListState), - sets: make(map[string]*redisSetState), - zsets: make(map[string]*redisZSetState), - pendingTTL: make(map[string]uint64), - pendingTTLCap: defaultPendingTTLCap, + outRoot: outRoot, + dbIndex: dbIndex, + kindByKey: make(map[string]redisKeyKind), + dirsCreated: make(map[string]struct{}), + inlineTTLEmitted: make(map[string]struct{}), + hashes: make(map[string]*redisHashState), + lists: make(map[string]*redisListState), + sets: make(map[string]*redisSetState), + zsets: make(map[string]*redisZSetState), + pendingTTL: make(map[string]uint64), + pendingTTLBytesCap: defaultPendingTTLBytesCap, } } -// WithPendingTTLCap overrides the default cap on the pendingTTL -// buffer. A value of 0 disables buffering — every unknown-kind TTL -// becomes an immediate orphan (matches the pre-pendingTTL behavior). -// Negative inputs are coerced to 0. Returns the receiver so it can -// be chained with other With* setters. -func (r *RedisDB) WithPendingTTLCap(capacity int) *RedisDB { +// WithPendingTTLByteCap overrides the default byte budget for the +// pendingTTL buffer. A value of 0 disables buffering — every +// unknown-kind TTL becomes an immediate orphan (matches the +// pre-pendingTTL behavior). Negative inputs are coerced to 0. +// Returns the receiver so it can be chained with other With* +// setters. +// +// The budget is in BYTES (sum of `len(userKey) + +// pendingTTLEntryOverheadBytes` over every buffered entry), NOT +// entry count, because Redis user keys can be up to 1 MiB each +// and an entry-count cap of N would still permit ~N MiB of +// accumulated key bytes. Codex P1 finding on PR #790 round 6. +func (r *RedisDB) WithPendingTTLByteCap(capacity int) *RedisDB { if capacity < 0 { capacity = 0 } - r.pendingTTLCap = capacity + r.pendingTTLBytesCap = capacity return r } @@ -418,7 +448,7 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { // fails closed when the buffer is at cap. Extracted from HandleTTL's // switch so the parent stays under the cyclop budget. // -// Three modes determined by pendingTTLCap: +// Three modes determined by pendingTTLBytesCap: // // - cap > 0 and buffer NOT full: store the (userKey, expireAtMs) // pair so a later wide-column state-init can drain it. @@ -436,24 +466,31 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { // the snapshot reader reuses key buffers across iterations — an // alias slice would race with the next record. func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error { - if r.pendingTTLCap == 0 { + if r.pendingTTLBytesCap == 0 { // Counter-only mode (operator explicitly disabled the buffer). r.orphanTTLCount++ return nil } - if len(r.pendingTTL) >= r.pendingTTLCap { + // Charge the byte cost: user-key bytes + the uint64 expireAtMs + // payload. The Go-map's per-bucket overhead is NOT counted + // because we want the cap to be a deterministic byte budget + // the operator can reason about. See pendingTTLEntryOverheadBytes. + cost := len(userKey) + pendingTTLEntryOverheadBytes + if r.pendingTTLBytes+cost > r.pendingTTLBytesCap { // Fail closed: refuse to silently drop a TTL that may // belong to a wide-column key arriving later in the - // snapshot scan. The operator should raise the cap - // (WithPendingTTLCap) or investigate the snapshot for - // corruption. We still increment pendingTTLOverflow so - // the Finalize warning surfaces the count even if the - // caller swallows the error. + // snapshot scan. The operator should raise the budget + // (WithPendingTTLByteCap) or investigate the snapshot + // for corruption. We still increment pendingTTLOverflow + // so the Finalize warning surfaces the count even if + // the caller swallows the error. r.pendingTTLOverflow++ return cockroachdberr.Wrapf(ErrPendingTTLBufferFull, - "buffer at cap=%d (user_key_len=%d)", r.pendingTTLCap, len(userKey)) + "buffer would exceed byte_cap=%d by %d bytes (user_key_len=%d, in_flight=%d)", + r.pendingTTLBytesCap, r.pendingTTLBytes+cost-r.pendingTTLBytesCap, len(userKey), r.pendingTTLBytes) } r.pendingTTL[string(userKey)] = expireAtMs + r.pendingTTLBytes += cost return nil } @@ -479,6 +516,10 @@ func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { return 0, false } delete(r.pendingTTL, uk) + // Free the byte budget so a later snapshot scan that drains + // many entries can buffer further work. Matches the cost + // charged in parkUnknownTTL exactly. + r.pendingTTLBytes -= len(uk) + pendingTTLEntryOverheadBytes return expireAtMs, true } @@ -518,7 +559,7 @@ func (r *RedisDB) Finalize() error { if r.pendingTTLOverflow > 0 { fields = append(fields, "pending_ttl_buffer_overflow", r.pendingTTLOverflow, - "pending_ttl_buffer_cap", r.pendingTTLCap) + "pending_ttl_buffer_bytes_cap", r.pendingTTLBytesCap) } r.warn("redis_orphan_ttl", fields...) } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 38de429b..5a01c247 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -617,54 +617,113 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { } } -// TestRedisDB_PendingTTLFailsClosedAtCap pins the codex P1 fix on -// PR #790 round 5: when pendingTTL reaches cap and another -// unknown-kind TTL arrives, HandleTTL fails closed with -// ErrPendingTTLBufferFull rather than silently counting the entry -// as an orphan. Silent overflow would permanently lose -// expire_at_ms for wide-column keys that arrive later in the scan, -// producing restored data with TTLs the source snapshot expected -// to expire. -func TestRedisDB_PendingTTLFailsClosedAtCap(t *testing.T) { +// TestRedisDB_PendingTTLFailsClosedAtByteCap pins the codex P1 fix +// on PR #790 round 6: the byte budget (not entry count) bounds +// pendingTTL memory. When an incoming entry's byte cost would +// exceed pendingTTLBytesCap, HandleTTL fails closed with +// ErrPendingTTLBufferFull rather than silently counting it as an +// orphan. Without this, an adversarial snapshot with a small +// number of 1 MiB unmatched TTL keys could still OOM the decoder +// — defeating the entry-count cap from r4/r5. +func TestRedisDB_PendingTTLFailsClosedAtByteCap(t *testing.T) { t.Parallel() - const cap = 8 + // Each "orphan-N" key (N=0..7) is 8 bytes. With + // pendingTTLEntryOverheadBytes=8, each entry costs 16 bytes. + // 8 entries = 128 bytes; 9th would push to 144. Cap = 128 → + // 8 fit, 9th fails closed. + const byteCap = 128 + const entriesPerByteCap = 8 db, _ := newRedisDB(t) - db.WithPendingTTLCap(cap) - // Fill the buffer to exactly cap — should all succeed. - for i := 0; i < cap; i++ { + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < entriesPerByteCap; i++ { key := []byte("orphan-" + intToDecimal(i)) ms := uint64(i) + 1 //nolint:gosec // i bounded above if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { - t.Fatalf("HandleTTL[%d]: %v (should succeed within cap)", i, err) + t.Fatalf("HandleTTL[%d]: %v (should succeed within byte_cap)", i, err) } } - if got := len(db.pendingTTL); got != cap { - t.Fatalf("pendingTTL len = %d, want %d (cap)", got, cap) + if got := len(db.pendingTTL); got != entriesPerByteCap { + t.Fatalf("pendingTTL len = %d, want %d", got, entriesPerByteCap) } - // One more entry should fail closed. - err := db.HandleTTL([]byte("orphan-overflow"), encodeTTLValue(999)) + if got := db.pendingTTLBytes; got != byteCap { + t.Fatalf("pendingTTLBytes = %d, want %d", got, byteCap) + } + // One more entry should fail closed (would push to 144 > 128). + err := db.HandleTTL([]byte("orphan-X"), encodeTTLValue(999)) if !errors.Is(err, ErrPendingTTLBufferFull) { - t.Fatalf("err = %v, want ErrPendingTTLBufferFull at cap", err) + t.Fatalf("err = %v, want ErrPendingTTLBufferFull at byte_cap", err) } if got := db.pendingTTLOverflow; got != 1 { t.Fatalf("pendingTTLOverflow = %d, want 1", got) } - // orphanTTLCount must NOT be incremented at intake — the - // caller has to handle the error explicitly. if got := db.orphanTTLCount; got != 0 { t.Fatalf("orphanTTLCount = %d at intake, want 0 (failed closed)", got) } } -// TestRedisDB_WithPendingTTLCapZeroOpts pins the explicit -// counter-only mode: cap==0 disables buffering entirely and every -// unknown-kind TTL becomes an immediate orphan WITHOUT firing -// ErrPendingTTLBufferFull. Operators that prefer constant-space -// orphan-counting over the fail-closed path opt in via cap=0. -func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { +// TestRedisDB_PendingTTLByteCapBoundedByLargeKey pins the +// large-key defense codex flagged on PR #790 round 6: a single +// 1 MiB key would have fit under any reasonable entry-count cap +// but exhausts a sensible byte budget. We use a synthetic small +// budget (64 bytes) and a 100-byte key to exercise the same logic +// without allocating a real 1 MiB blob. +func TestRedisDB_PendingTTLByteCapBoundedByLargeKey(t *testing.T) { + t.Parallel() + const byteCap = 64 + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(byteCap) + largeKey := make([]byte, 100) // 100 + 8 = 108 > 64 + for i := range largeKey { + largeKey[i] = byte(i % 256) + } + err := db.HandleTTL(largeKey, encodeTTLValue(1)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull on oversize key", err) + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL must be empty after failed insert, got %d", got) + } +} + +// TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim pins that the +// byte counter is decremented when an entry is drained via +// claimPendingTTL. A later wide-column state-init that drains the +// buffer must free byte budget so subsequent unknown-kind TTLs can +// be buffered again. +func TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + const byteCap = 32 // ("k0"=2 + 8) * 3 = 30 entries fit; 4th would overflow at 40 + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < 3; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL[%d]: %v", i, err) + } + } + if got := db.pendingTTLBytes; got != 30 { + t.Fatalf("pendingTTLBytes = %d, want 30", got) + } + // Drain k0 via a wide-column state-init. + if err := db.HandleSetMember(setMemberKey("k0", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if got := db.pendingTTLBytes; got != 20 { + t.Fatalf("pendingTTLBytes after drain = %d, want 20 (reclaimed 10 bytes)", got) + } + // New buffer space available — another small entry fits. + if err := db.HandleTTL([]byte("k3"), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL after drain failed: %v", err) + } +} + +// TestRedisDB_WithPendingTTLByteCapZeroOpts pins the explicit +// counter-only mode: byte_cap==0 disables buffering entirely and +// every unknown-kind TTL becomes an immediate orphan WITHOUT +// firing ErrPendingTTLBufferFull. +func TestRedisDB_WithPendingTTLByteCapZeroOpts(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) - db.WithPendingTTLCap(0) + db.WithPendingTTLByteCap(0) const n = 5 for i := 0; i < n; i++ { if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { @@ -679,14 +738,14 @@ func TestRedisDB_WithPendingTTLCapZeroOpts(t *testing.T) { } } -// TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins input -// sanitisation. -func TestRedisDB_WithPendingTTLCapNegativeCoercedToZero(t *testing.T) { +// TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero pins +// input sanitisation. +func TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero(t *testing.T) { t.Parallel() db, _ := newRedisDB(t) - db.WithPendingTTLCap(-100) - if db.pendingTTLCap != 0 { - t.Fatalf("pendingTTLCap = %d after WithPendingTTLCap(-100), want 0", db.pendingTTLCap) + db.WithPendingTTLByteCap(-100) + if db.pendingTTLBytesCap != 0 { + t.Fatalf("pendingTTLBytesCap = %d after WithPendingTTLByteCap(-100), want 0", db.pendingTTLBytesCap) } }