diff --git a/internal/backup/redis_hash.go b/internal/backup/redis_hash.go index e314cfbe..2e5f1e5b 100644 --- a/internal/backup/redis_hash.go +++ b/internal/backup/redis_hash.go @@ -237,7 +237,11 @@ type hashFieldRecord struct { Value json.RawMessage `json:"value"` } -func marshalHashJSON(st *redisHashState) ([]byte, error) { +// nolint comment lives at the function head: dupl pairs this with +// marshalZSetJSON, which carries the rationale (parallel design-spec +// wrappers that can't collapse into a shared helper without breaking +// JSON field-order determinism). See redis_zset.go:marshalZSetJSON. +func marshalHashJSON(st *redisHashState) ([]byte, error) { //nolint:dupl // see comment above + redis_zset.go // Sort by raw byte order for deterministic output across runs. names := make([]string, 0, len(st.fields)) for name := range st.fields { diff --git a/internal/backup/redis_set.go b/internal/backup/redis_set.go index 7cd4a440..392ae8ce 100644 --- a/internal/backup/redis_set.go +++ b/internal/backup/redis_set.go @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil } // setState lazily creates per-key state. Mirrors the hash/list // kindByKey-registration pattern so HandleSetMeta, HandleSetMember, // and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!st|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the set's +// `expire_at_ms`. Without this drain step, every TTL'd set would +// restore as permanent — a latent bug in PR #758 surfaced by codex +// on PR #790. Phase 0a tests added in the same PR pin the ordering. func (r *RedisDB) setState(userKey []byte) *redisSetState { uk := string(userKey) if st, ok := r.sets[uk]; ok { return st } st := &redisSetState{members: make(map[string]struct{})} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.sets[uk] = st r.kindByKey[uk] = redisKindSet return st diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 688225d1..6524b0d8 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -72,6 +72,29 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str| // expected 8-byte big-endian uint64 millisecond expiry. var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value") +// ErrPendingTTLBufferFull is returned by HandleTTL when an +// unknown-kind TTL arrives but the pendingTTL buffer's BYTE +// budget (pendingTTLBytesCap) is already exhausted. The encoder +// fails closed here rather than silently counting the TTL as an +// orphan because in real Pebble snapshot order (`!redis|ttl|` +// lex-sorts before `!st|`/`!stream|`/`!zs|`), the dropped entry +// would likely belong to a valid wide-column key that arrives +// later — losing its expire_at_ms would produce a restored +// database with non-expiring data that the source snapshot's +// clients expected to expire. +// +// The budget is in BYTES (not entries) because Redis user keys can +// be up to 1 MiB each; an entry-count cap of N still permits N +// MiB of accumulated key bytes, which defeats the OOM protection +// on adversarial snapshots with large keys. Codex P1 on PR #790 +// round 5 (entry-count -> byte-budget on round 6). +// +// Recovery: raise WithPendingTTLByteCap above the snapshot's +// expected cumulative byte cost of unmatched-at-intake TTLs, or +// set the byte cap to 0 to explicitly opt into the lossy +// counter-only mode. +var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL byte budget exhausted; raise WithPendingTTLByteCap or accept orphan-counter mode via WithPendingTTLByteCap(0)") + // redisKeyKind tracks which Redis-type prefix introduced a user key, so that // when a later !redis|ttl| record arrives we know whether to write its // expiry into strings_ttl.jsonl, hll_ttl.jsonl, or buffer it for a wide- @@ -85,6 +108,7 @@ const ( redisKindHash redisKindList redisKindSet + redisKindZSet ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -185,8 +209,78 @@ type RedisDB struct { // Finalize into sets/.json with members sorted by raw byte // order for deterministic dump output. sets map[string]*redisSetState + + // zsets buffers per-userKey sorted-set state. Score lives in the + // !zs|mem| value (8-byte IEEE 754 big-endian); member name is the + // trailing key bytes (binary-safe). Flushed at Finalize into + // zsets/.json sorted by member-name bytes (not by score) so + // `diff -r` between dumps stays line-stable across score-only + // mutations. + zsets map[string]*redisZSetState + + // pendingTTL buffers expiries whose user-key prefix sorts AFTER + // `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble + // snapshots emit records in encoded-key order + // (`store/snapshot_pebble.go::iter.First()/Next()`), and + // `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|` + // prefixes (`r` < `s`/`s`/`z`). Without buffering, HandleTTL + // would see kindByKey == redisKindUnknown and count the TTL + // as an orphan, dropping it before zsetState / setState / + // streamState had a chance to claim the user key — TTL'd + // sorted sets, sets, and streams would silently restore as + // permanent. + // + // Lifecycle: HandleTTL files the expiry here when kind is + // still unknown. Each wide-column state-init function + // (setState / zsetState / streamState etc.) drains the entry + // when it first registers the user key. Finalize fires the + // orphan-TTL warning for whatever remains (those keys never + // appeared as a typed record — likely a corrupted store). + pendingTTL map[string]uint64 + + // pendingTTLBytes tracks the cumulative byte cost of the + // pendingTTL buffer (each entry costs len(userKey) + 8 bytes, + // where 8 is the uint64 expireAtMs payload — the per-entry + // Go-map overhead is not counted here, since we want the cap + // to be a deterministic byte budget the operator can reason + // about). + pendingTTLBytes int + + // pendingTTLBytesCap caps the byte cost of pendingTTL. Once + // the cumulative cost would exceed this budget, subsequent + // unknown-kind TTLs fail closed with ErrPendingTTLBufferFull. + // We bound by bytes (not by entry count) because Redis user + // keys can be up to 1 MiB each; an entry-count cap of N + // would still permit ~N MiB of accumulated key bytes — + // defeating the OOM protection on adversarial snapshots with + // large keys. Codex P1 finding on PR #790 round 6. + pendingTTLBytesCap int + + // pendingTTLOverflow counts entries that would have entered + // pendingTTL but were rejected because the byte budget was + // exhausted. Surfaced in the Finalize warning so operators + // can distinguish "snapshot exceeded the buffer budget" from + // "TTL records remained unmatched after the entire scan". + pendingTTLOverflow int } +// defaultPendingTTLBytesCap caps pendingTTL at 64 MiB cumulative +// key+payload bytes by default. Override via WithPendingTTLByteCap +// for hosts that need a different memory / coverage trade-off. +// 64 MiB covers tens of thousands to millions of typical Redis +// keys (usually << 100 bytes each); the absolute key-size ceiling +// is 1 MiB, so this budget tolerates ~64 maximally-large keys +// without dropping the OOM protection. +const defaultPendingTTLBytesCap = 64 << 20 + +// pendingTTLEntryOverheadBytes is the per-entry payload cost we +// charge against pendingTTLBytesCap on top of the user-key bytes. +// It accounts for the uint64 expireAtMs we store as the map value. +// The Go-map's bucket overhead is NOT included — we want a +// deterministic byte budget the operator can reason about +// directly, not one that drifts with Go's runtime map layout. +const pendingTTLEntryOverheadBytes = 8 + // NewRedisDB constructs a RedisDB rooted at /redis/db_/. // dbIndex selects ; today the producer always passes 0, but accepting // the index as a parameter prevents a future multi-db dump from silently @@ -196,17 +290,40 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { dbIndex = 0 } return &RedisDB{ - outRoot: outRoot, - dbIndex: dbIndex, - kindByKey: make(map[string]redisKeyKind), - dirsCreated: make(map[string]struct{}), - inlineTTLEmitted: make(map[string]struct{}), - hashes: make(map[string]*redisHashState), - lists: make(map[string]*redisListState), - sets: make(map[string]*redisSetState), + outRoot: outRoot, + dbIndex: dbIndex, + kindByKey: make(map[string]redisKeyKind), + dirsCreated: make(map[string]struct{}), + inlineTTLEmitted: make(map[string]struct{}), + hashes: make(map[string]*redisHashState), + lists: make(map[string]*redisListState), + sets: make(map[string]*redisSetState), + zsets: make(map[string]*redisZSetState), + pendingTTL: make(map[string]uint64), + pendingTTLBytesCap: defaultPendingTTLBytesCap, } } +// WithPendingTTLByteCap overrides the default byte budget for the +// pendingTTL buffer. A value of 0 disables buffering — every +// unknown-kind TTL becomes an immediate orphan (matches the +// pre-pendingTTL behavior). Negative inputs are coerced to 0. +// Returns the receiver so it can be chained with other With* +// setters. +// +// The budget is in BYTES (sum of `len(userKey) + +// pendingTTLEntryOverheadBytes` over every buffered entry), NOT +// entry count, because Redis user keys can be up to 1 MiB each +// and an entry-count cap of N would still permit ~N MiB of +// accumulated key bytes. Codex P1 finding on PR #790 round 6. +func (r *RedisDB) WithPendingTTLByteCap(capacity int) *RedisDB { + if capacity < 0 { + capacity = 0 + } + r.pendingTTLBytesCap = capacity + return r +} + // WithWarnSink wires a structured-warning sink. The sink is called with // stable event names ("redis_orphan_ttl", etc.) and key=value pairs. func (r *RedisDB) WithWarnSink(fn func(event string, fields ...any)) *RedisDB { @@ -246,15 +363,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error { return r.writeBlob("hll", userKey, value) } -// HandleTTL processes one !redis|ttl| record. Routing depends on -// what HandleString/HandleHLL recorded for the same userKey: +// HandleTTL processes one !redis|ttl| record. Routing +// depends on what the encoder has previously recorded for the user +// key. There are two ordering regimes the snapshot stream presents: +// +// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order +// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record +// arrives FIRST, kindByKey is already set when HandleTTL fires, +// and we route directly to the per-type sidecar / inline field. +// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because +// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is +// still redisKindUnknown. We park the expiry in pendingTTL and +// let each wide-column state-init function (setState / +// zsetState / streamState) drain it when the user key finally +// surfaces as a typed record. Codex P1 finding on PR #790. +// +// Routing: // -// - redisKindHLL -> hll_ttl.jsonl -// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL -// lives in !redis|ttl| rather than the inline magic-prefix header) -// - redisKindUnknown -> counted in orphanTTLCount; reported via the -// warn sink on Finalize because Phase 0a's wide-column encoders -// have not landed yet. +// - redisKindHLL -> hll_ttl.jsonl (case 1) +// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings +// whose TTL lives in !redis|ttl| rather than the inline header) +// - redisKindHash/List/Set/ZSet/Stream -> inlined into the +// per-key JSON (case 1 for hash/list, case 2 for set/zset/stream +// where the state-init already drained from pendingTTL before +// HandleTTL would even be called the second time) +// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly +// unmatched entries (key never registered as a typed record). func (r *RedisDB) HandleTTL(userKey, value []byte) error { expireAtMs, err := decodeRedisTTLValue(value) if err != nil { @@ -297,17 +431,98 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindZSet: + // Same per-record TTL inlining: ZADD + EXPIRE replay in + // one shot from the per-zset JSON, no separate sidecar. + st := r.zsetState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: - // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoders (set/zset/stream) land, and - // buffering them allocates proportional to user-key size - // (up to 1 MiB per key) for no benefit. Codex P2 round 6. + return r.parkUnknownTTL(userKey, expireAtMs) + } + return nil +} + +// parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or +// fails closed when the buffer is at cap. Extracted from HandleTTL's +// switch so the parent stays under the cyclop budget. +// +// Three modes determined by pendingTTLBytesCap: +// +// - cap > 0 and buffer NOT full: store the (userKey, expireAtMs) +// pair so a later wide-column state-init can drain it. +// - cap == 0: counter-only mode. The TTL becomes an immediate +// orphan. Operator-explicit opt-out for callers that prefer +// constant-space orphan counting over the buffered drain path. +// - cap > 0 and buffer FULL: fail closed with +// ErrPendingTTLBufferFull. Silently counting the entry as an +// orphan would permanently lose `expire_at_ms` for the wide- +// column key that arrives later — restored data becomes +// non-expiring without the operator noticing. Codex P1 on PR +// #790 round 5. +// +// Storage: userKey is COPIED (`string([]byte)` allocates) because +// the snapshot reader reuses key buffers across iterations — an +// alias slice would race with the next record. +func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error { + if r.pendingTTLBytesCap == 0 { + // Counter-only mode (operator explicitly disabled the buffer). r.orphanTTLCount++ return nil } + // Charge the byte cost: user-key bytes + the uint64 expireAtMs + // payload. The Go-map's per-bucket overhead is NOT counted + // because we want the cap to be a deterministic byte budget + // the operator can reason about. See pendingTTLEntryOverheadBytes. + cost := len(userKey) + pendingTTLEntryOverheadBytes + if r.pendingTTLBytes+cost > r.pendingTTLBytesCap { + // Fail closed: refuse to silently drop a TTL that may + // belong to a wide-column key arriving later in the + // snapshot scan. The operator should raise the budget + // (WithPendingTTLByteCap) or investigate the snapshot + // for corruption. We still increment pendingTTLOverflow + // so the Finalize warning surfaces the count even if + // the caller swallows the error. + r.pendingTTLOverflow++ + return cockroachdberr.Wrapf(ErrPendingTTLBufferFull, + "buffer would exceed byte_cap=%d by %d bytes (user_key_len=%d, in_flight=%d)", + r.pendingTTLBytesCap, r.pendingTTLBytes+cost-r.pendingTTLBytesCap, len(userKey), r.pendingTTLBytes) + } + r.pendingTTL[string(userKey)] = expireAtMs + r.pendingTTLBytes += cost return nil } +// claimPendingTTL drains any buffered TTL for userKey into the +// caller-provided state. Called by the wide-column state-init +// functions (setState / zsetState / streamState) when they first +// register a user key, so the parked expiry inlines into the same +// per-key JSON the rest of the record assembles. +// +// Returns (expireAtMs, true) when a buffered TTL existed. The +// caller should set state.expireAtMs / state.hasTTL on the +// returned value. The pending entry is removed so Finalize's +// orphan-count loop only sees truly-unmatched TTLs. +// +// Safe to call from hashState/listState too even though those +// types' typed records sort before `!redis|ttl|`; pendingTTL will +// always be empty for them. Keeping the call site uniform keeps +// the state-init contract simple. +func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { + uk := string(userKey) + expireAtMs, ok := r.pendingTTL[uk] + if !ok { + return 0, false + } + delete(r.pendingTTL, uk) + // Free the byte budget so a later snapshot scan that drains + // many entries can buffer further work. Matches the cost + // charged in parkUnknownTTL exactly. + r.pendingTTLBytes -= len(uk) + pendingTTLEntryOverheadBytes + return expireAtMs, true +} + // Finalize flushes all open sidecar writers and emits warnings for any // pending TTL records whose user key was never claimed by the wide-column // encoders. Call exactly once after every snapshot record has been @@ -318,6 +533,7 @@ func (r *RedisDB) Finalize() error { r.flushHashes, r.flushLists, r.flushSets, + r.flushZSets, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -326,10 +542,26 @@ func (r *RedisDB) Finalize() error { firstErr = err } } + // At this point all type-prefixed records have been processed + // and every wide-column state-init drained its claimPendingTTL. + // Whatever remains in pendingTTL is truly unmatched — the + // user key never appeared as a typed record. Likely causes: + // store corruption, a snapshot mid-write where the typed + // record was dropped, or a `!redis|ttl|` entry written for a + // key whose type prefix we don't recognise (a future Redis + // type added on the live side without a backup-encoder update). + r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { - r.warn("redis_orphan_ttl", + fields := []any{ "count", r.orphanTTLCount, - "hint", "remaining wide-column encoders (zset/stream) have not landed yet") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix", + } + if r.pendingTTLOverflow > 0 { + fields = append(fields, + "pending_ttl_buffer_overflow", r.pendingTTLOverflow, + "pending_ttl_buffer_bytes_cap", r.pendingTTLBytesCap) + } + r.warn("redis_orphan_ttl", fields...) } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 152a55d4..5a01c247 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -579,11 +579,21 @@ func TestRedisDB_NoKeymapWhenAllReversible(t *testing.T) { func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Parallel() - // Codex P2 round 6: orphan TTL records (those with no prior - // HandleString/HandleHLL claim) must be counted only — the - // per-key payload would allocate proportional to user-key size - // and is unused before the wide-column encoders land. Drive a - // sample of orphan records and assert the count, not a buffer. + // Codex P1 (PR #790): orphan TTL records are now BUFFERED in + // pendingTTL during intake — the wide-column state-init + // functions need to drain them when a typed record finally + // registers a user key. The buffer holds (string-userKey, + // uint64-expireAt) pairs; the per-record allocation cost is the + // same as kindByKey's, which we already pay for every typed + // record. The original Codex P2 round 6 concern (don't buffer + // arbitrarily-large payload bytes) is preserved — we still + // don't keep the value bytes. + // + // At Finalize, entries still in pendingTTL are counted as truly + // unmatched orphans. This test now asserts: + // - During intake: orphanTTLCount stays at 0, pendingTTL grows. + // - After Finalize: orphanTTLCount == n (no typed record ever + // drained the entries). db, _ := newRedisDB(t) const n = 10_000 for i := 0; i < n; i++ { @@ -593,8 +603,149 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Fatalf("HandleTTL[%d]: %v", i, err) } } + if db.orphanTTLCount != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (buffered)", db.orphanTTLCount) + } + if len(db.pendingTTL) != n { + t.Fatalf("pendingTTL len = %d, want %d", len(db.pendingTTL), n) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } if db.orphanTTLCount != n { - t.Fatalf("orphanTTLCount = %d, want %d", db.orphanTTLCount, n) + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", db.orphanTTLCount, n) + } +} + +// TestRedisDB_PendingTTLFailsClosedAtByteCap pins the codex P1 fix +// on PR #790 round 6: the byte budget (not entry count) bounds +// pendingTTL memory. When an incoming entry's byte cost would +// exceed pendingTTLBytesCap, HandleTTL fails closed with +// ErrPendingTTLBufferFull rather than silently counting it as an +// orphan. Without this, an adversarial snapshot with a small +// number of 1 MiB unmatched TTL keys could still OOM the decoder +// — defeating the entry-count cap from r4/r5. +func TestRedisDB_PendingTTLFailsClosedAtByteCap(t *testing.T) { + t.Parallel() + // Each "orphan-N" key (N=0..7) is 8 bytes. With + // pendingTTLEntryOverheadBytes=8, each entry costs 16 bytes. + // 8 entries = 128 bytes; 9th would push to 144. Cap = 128 → + // 8 fit, 9th fails closed. + const byteCap = 128 + const entriesPerByteCap = 8 + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < entriesPerByteCap; i++ { + key := []byte("orphan-" + intToDecimal(i)) + ms := uint64(i) + 1 //nolint:gosec // i bounded above + if err := db.HandleTTL(key, encodeTTLValue(ms)); err != nil { + t.Fatalf("HandleTTL[%d]: %v (should succeed within byte_cap)", i, err) + } + } + if got := len(db.pendingTTL); got != entriesPerByteCap { + t.Fatalf("pendingTTL len = %d, want %d", got, entriesPerByteCap) + } + if got := db.pendingTTLBytes; got != byteCap { + t.Fatalf("pendingTTLBytes = %d, want %d", got, byteCap) + } + // One more entry should fail closed (would push to 144 > 128). + err := db.HandleTTL([]byte("orphan-X"), encodeTTLValue(999)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull at byte_cap", err) + } + if got := db.pendingTTLOverflow; got != 1 { + t.Fatalf("pendingTTLOverflow = %d, want 1", got) + } + if got := db.orphanTTLCount; got != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (failed closed)", got) + } +} + +// TestRedisDB_PendingTTLByteCapBoundedByLargeKey pins the +// large-key defense codex flagged on PR #790 round 6: a single +// 1 MiB key would have fit under any reasonable entry-count cap +// but exhausts a sensible byte budget. We use a synthetic small +// budget (64 bytes) and a 100-byte key to exercise the same logic +// without allocating a real 1 MiB blob. +func TestRedisDB_PendingTTLByteCapBoundedByLargeKey(t *testing.T) { + t.Parallel() + const byteCap = 64 + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(byteCap) + largeKey := make([]byte, 100) // 100 + 8 = 108 > 64 + for i := range largeKey { + largeKey[i] = byte(i % 256) + } + err := db.HandleTTL(largeKey, encodeTTLValue(1)) + if !errors.Is(err, ErrPendingTTLBufferFull) { + t.Fatalf("err = %v, want ErrPendingTTLBufferFull on oversize key", err) + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL must be empty after failed insert, got %d", got) + } +} + +// TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim pins that the +// byte counter is decremented when an entry is drained via +// claimPendingTTL. A later wide-column state-init that drains the +// buffer must free byte budget so subsequent unknown-kind TTLs can +// be buffered again. +func TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + const byteCap = 32 // ("k0"=2 + 8) * 3 = 30 entries fit; 4th would overflow at 40 + db.WithPendingTTLByteCap(byteCap) + for i := 0; i < 3; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL[%d]: %v", i, err) + } + } + if got := db.pendingTTLBytes; got != 30 { + t.Fatalf("pendingTTLBytes = %d, want 30", got) + } + // Drain k0 via a wide-column state-init. + if err := db.HandleSetMember(setMemberKey("k0", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if got := db.pendingTTLBytes; got != 20 { + t.Fatalf("pendingTTLBytes after drain = %d, want 20 (reclaimed 10 bytes)", got) + } + // New buffer space available — another small entry fits. + if err := db.HandleTTL([]byte("k3"), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL after drain failed: %v", err) + } +} + +// TestRedisDB_WithPendingTTLByteCapZeroOpts pins the explicit +// counter-only mode: byte_cap==0 disables buffering entirely and +// every unknown-kind TTL becomes an immediate orphan WITHOUT +// firing ErrPendingTTLBufferFull. +func TestRedisDB_WithPendingTTLByteCapZeroOpts(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(0) + const n = 5 + for i := 0; i < n; i++ { + if err := db.HandleTTL([]byte("k"+intToDecimal(i)), encodeTTLValue(1)); err != nil { + t.Fatalf("HandleTTL[%d]: %v (counter-only mode must not fail)", i, err) + } + } + if got := len(db.pendingTTL); got != 0 { + t.Fatalf("pendingTTL len = %d, want 0 (cap disabled)", got) + } + if got := db.orphanTTLCount; got != n { + t.Fatalf("orphanTTLCount = %d, want %d", got, n) + } +} + +// TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero pins +// input sanitisation. +func TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + db.WithPendingTTLByteCap(-100) + if db.pendingTTLBytesCap != 0 { + t.Fatalf("pendingTTLBytesCap = %d after WithPendingTTLByteCap(-100), want 0", db.pendingTTLBytesCap) } } diff --git a/internal/backup/redis_zset.go b/internal/backup/redis_zset.go new file mode 100644 index 00000000..fbdfb278 --- /dev/null +++ b/internal/backup/redis_zset.go @@ -0,0 +1,513 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + + pb "github.com/bootjp/elastickv/proto" + cockroachdberr "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// Redis zset encoder. Translates raw !zs|... snapshot records into the +// per-zset `zsets/.json` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). +// +// Wire format mirrors store/zset_helpers.go: +// - !zs|meta| → 8-byte BE Len +// - !zs|mem| → 8-byte IEEE 754 score +// - !zs|scr| → empty (score index) +// - !zs|meta|d|<...> → 8-byte LenDelta +// +// Routing rules: +// - `!zs|mem|` is the source of truth: it carries both the member name +// (in the trailing key bytes) and its IEEE 754 score (in the value). +// - `!zs|scr|` is a secondary index used at scan time on the live side; +// for backup purposes it is redundant and is silently skipped, the +// same way `!st|scr|` (no such prefix exists) and `!hs|meta|d|` +// deltas are skipped by the hash/set encoders. +// - `!zs|meta|d|` deltas are silently skipped; `!zs|mem|` already +// reflects the post-delta state at backup time. +const ( + RedisZSetMetaPrefix = "!zs|meta|" + RedisZSetMemberPrefix = "!zs|mem|" + RedisZSetScorePrefix = "!zs|scr|" + RedisZSetMetaDeltaPrefix = "!zs|meta|d|" + + // RedisZSetLegacyBlobPrefix is the consolidated single-key + // layout the live store still writes for non-empty persisted + // zsets (`adapter/redis_compat_types.go:82` redisZSetPrefix, + // produced by `adapter/redis_compat_commands.go:3495-3508` and + // read by `adapter/redis_compat_helpers.go:610-631` as the + // fallback when no wide-column members exist). A backup that + // skipped this prefix would silently drop legacy-only zsets; + // HandleZSetLegacyBlob decodes the blob and registers the same + // per-member state HandleZSetMeta + HandleZSetMember would. + // Codex P1 finding on PR #790 (round 2). + RedisZSetLegacyBlobPrefix = "!redis|zset|" + + // redisZSetScoreSize is the size of the IEEE 754 big-endian score + // stored in !zs|mem| values. Same constant as zsetMetaSizeBytes in + // store/zset_helpers.go; duplicated here to keep the backup + // package free of internal/storage imports. + redisZSetScoreSize = 8 + + // redisZSetLegacyProtoPrefixLen is the on-disk magic prefix size + // for `!redis|zset|` values + // (`adapter/redis_storage_codec.go:15` storedRedisZSetProtoPrefix). + redisZSetLegacyProtoPrefixLen = 4 +) + +// redisZSetLegacyProtoPrefix mirrors +// adapter/redis_storage_codec.go:15 storedRedisZSetProtoPrefix. A +// rename on the live side without an accompanying backup update +// surfaces as ErrRedisInvalidZSetLegacyBlob on decode of any real +// cluster dump. +var redisZSetLegacyProtoPrefix = []byte{0x00, 'R', 'Z', 0x01} + +// ErrRedisInvalidZSetMeta is returned when an !zs|meta| value is not +// the expected 8-byte big-endian member count. +var ErrRedisInvalidZSetMeta = cockroachdberr.New("backup: invalid !zs|meta| value") + +// ErrRedisInvalidZSetLegacyBlob is returned when a `!redis|zset|` +// value's magic prefix is missing, its protobuf body fails to +// unmarshal, or its decoded scores include NaN. Fail-closed for +// the same reason as ErrRedisInvalidZSetMember: silently accepting +// a corrupt blob would lose the entire zset's contents at restore. +var ErrRedisInvalidZSetLegacyBlob = cockroachdberr.New("backup: invalid !redis|zset| value") + +// ErrRedisInvalidZSetMember is returned when an !zs|mem| value is not +// the expected 8-byte IEEE 754 score, or contains a NaN score (Redis's +// ZADD command rejects NaN, so a NaN at backup time indicates store +// corruption and a silent fall-through would re-corrupt the restored +// cluster). +var ErrRedisInvalidZSetMember = cockroachdberr.New("backup: invalid !zs|mem| value") + +// ErrRedisInvalidZSetKey is returned when an !zs| key cannot be parsed +// for its userKeyLen+userKey (or member) segments. +var ErrRedisInvalidZSetKey = cockroachdberr.New("backup: malformed !zs| key") + +// redisZSetState buffers one userKey's zset during a snapshot scan. +// Members are stored as a map keyed by their byte string so duplicate +// HandleZSetMember calls collapse to the latest-wins score (matching +// Redis's ZADD semantics: re-adding a member overwrites the prior +// score). +// +// sawWide tracks whether ANY wide-column row (`!zs|meta|` or +// `!zs|mem|`) has been observed for the user key. This mirrors the +// live read path's source-of-truth selection in +// `adapter/redis_compat_helpers.go:610-620` +// (`RedisServer.loadZSetAt`): when wide-column rows exist they are +// authoritative, and any `!redis|zset|` legacy blob is ignored +// (treated as a stale post-migration leftover). Without this flag, +// the encoder would merge stale legacy members on top of the +// wide-column source-of-truth, surfacing deleted or outdated +// entries in the restored zset. Codex P1 round 3 (PR #790). +type redisZSetState struct { + metaSeen bool + declaredLen int64 + members map[string]float64 + sawWide bool + expireAtMs uint64 + hasTTL bool +} + +// HandleZSetMeta processes one !zs|meta| record. The +// value is the 8-byte BE member count. We park the declared length so +// flushZSets can warn on a mismatch with the observed member count +// and register the user key so a later !redis|ttl| record +// routes back to this zset state. +// +// !zs|meta|d|... delta keys share the !zs|meta| string prefix, so a +// snapshot dispatcher that routes by "starts with RedisZSetMetaPrefix" +// lands delta records here too. We mirror the hash/set encoder policy +// (PRs #725, #758) and silently skip the delta family because +// !zs|mem| records are the source of truth for restored zset +// contents. +func (r *RedisDB) HandleZSetMeta(key, value []byte) error { + if bytes.HasPrefix(key, []byte(RedisZSetMetaDeltaPrefix)) { + return nil + } + userKey, ok := parseZSetMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetKey, "meta key: %q", key) + } + if len(value) != redisUint64Bytes { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMeta, + "length %d != %d", len(value), redisUint64Bytes) + } + // Bounds-check the uint64 declared count before narrowing to + // int64; without this a corrupted store with the high bit set + // would wrap to a negative declaredLen and fire spurious + // redis_zset_length_mismatch warnings on every flush. Mirrors + // the hash/list/set encoders' symmetric guard. + rawLen := binary.BigEndian.Uint64(value) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMeta, + "declared len %d overflows int64", rawLen) + } + st := r.zsetState(userKey) + st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above + st.metaSeen = true + // Wide-column meta means any legacy blob already merged into + // this state is stale; drop it. The live read path + // (adapter/redis_compat_helpers.go:610-620) makes the same + // choice on read. + r.markZSetWide(st) + return nil +} + +// HandleZSetMember processes one !zs|mem| +// record. The value is the 8-byte IEEE 754 big-endian score. The +// member bytes live in the key (binary-safe). +// +// NaN scores are rejected. Redis's ZADD command itself rejects NaN +// at the wire level, so a NaN at backup time indicates either a +// corrupted store or a write path that bypassed ZADD validation — +// either way, emitting it as `score: null` or letting json.Marshal +// fail mid-flush would corrupt the dump silently. Fail closed here +// so the problem surfaces at the source record. +func (r *RedisDB) HandleZSetMember(key, value []byte) error { + userKey, member, ok := parseZSetMemberKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetKey, "member key: %q", key) + } + if len(value) != redisZSetScoreSize { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMember, + "length %d != %d", len(value), redisZSetScoreSize) + } + score := math.Float64frombits(binary.BigEndian.Uint64(value)) + if math.IsNaN(score) { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetMember, + "NaN score for member of user key %q (length %d)", userKey, len(userKey)) + } + st := r.zsetState(userKey) + // First wide-column row evicts any provisional legacy-blob + // members on this user key. From here on, the wide-column + // rows are the authoritative source of truth — matches the + // live read path's preference in + // adapter/redis_compat_helpers.go:610-620. + r.markZSetWide(st) + st.members[string(member)] = score + return nil +} + +// markZSetWide flips the sawWide flag and, if this is the first +// wide-column observation for the state, clears any provisional +// legacy-blob members. The live read path treats `!redis|zset|` +// as a fallback that is ONLY consulted when no wide-column rows +// exist (`adapter/redis_compat_helpers.go::RedisServer.loadZSetAt` +// line 610-620), so a snapshot containing both layouts for the +// same user key MUST drop the legacy entries to avoid surfacing +// stale post-migration leftovers. Codex P1 round 3 (PR #790). +func (r *RedisDB) markZSetWide(st *redisZSetState) { + if st.sawWide { + return + } + st.sawWide = true + // Clear any legacy-blob members deposited before the first + // wide-column row arrived. Re-using the same map keeps the + // nil-vs-empty contract for empty-but-meta-seen zsets (an + // explicit `make` would change observable Finalize behavior + // for those edge cases). + for k := range st.members { + delete(st.members, k) + } +} + +// HandleZSetScore accepts and discards one !zs|scr|... record. The +// score index is a live-side secondary index for ZRANGEBYSCORE; the +// authoritative member→score mapping lives in !zs|mem|. Snapshot +// dispatchers should still route this prefix here (rather than to +// the orphan-record warning sink) so a future audit that greps for +// "every !zs| prefix has a handler" finds one. +func (r *RedisDB) HandleZSetScore(_, _ []byte) error { return nil } + +// HandleZSetMetaDelta accepts and discards one !zs|meta|d|... record. +// See HandleZSetMeta's docstring for the rationale; !zs|mem| is the +// source of truth at backup time. +func (r *RedisDB) HandleZSetMetaDelta(_, _ []byte) error { return nil } + +// HandleZSetLegacyBlob processes one `!redis|zset|` record. +// This is the consolidated single-key layout the live store still +// writes for non-empty persisted zsets (see RedisZSetLegacyBlobPrefix +// docstring). The encoded value is a magic-prefixed +// `pb.RedisZSetValue` carrying every (member, score) pair. +// +// Decoded entries land in the same per-key state HandleZSetMember +// would have produced, so the per-zset JSON output is identical +// regardless of which layout the live store used. NaN scores fail +// closed at intake, matching HandleZSetMember's contract. +// +// The legacy prefix `!redis|zset|` lex-sorts BEFORE `!zs|...` and +// BEFORE `!redis|ttl|`, so when a zset is stored in both formats +// (mid-migration), this handler creates the state first and the +// later wide-column records merge into it — duplicate +// HandleZSetMember calls follow the same latest-wins policy. +// +// `!redis|zset|` ALSO sorts BEFORE `!redis|ttl|`, so an inline TTL +// on the same user key will reach HandleTTL after this handler has +// already registered redisKindZSet. The HandleTTL redisKindZSet +// branch then folds the expiry into st.expireAtMs via zsetState +// (which itself drains pendingTTL — a no-op here since the typed +// record came first). +func (r *RedisDB) HandleZSetLegacyBlob(key, value []byte) error { + userKey, ok := parseZSetLegacyBlobKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, "key: %q", key) + } + entries, err := decodeZSetLegacyBlobValue(value) + if err != nil { + return err + } + st := r.zsetState(userKey) + if st.sawWide { + // Wide-column rows already arrived for this user key (rare + // in practice because !redis|zset| sorts before !zs|..., + // but possible with a custom dispatcher ordering or a mid- + // migration replay). The live read path ignores the legacy + // blob in that case, so we do too — applying these entries + // would surface stale post-migration leftovers in the dump. + return nil + } + for _, e := range entries { + st.members[e.member] = e.score + } + return nil +} + +// zsetLegacyEntry is the per-(member, score) projection extracted +// from a `!redis|zset|` blob's protobuf body. +type zsetLegacyEntry struct { + member string + score float64 +} + +// parseZSetLegacyBlobKey strips `!redis|zset|` and returns the +// user-key bytes. Unlike the wide-column meta key there is no +// userKeyLen prefix — the live store appends the user key directly +// (`adapter/redis_compat_types.go:177` ZSetKey). +func parseZSetLegacyBlobKey(key []byte) ([]byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisZSetLegacyBlobPrefix)) + if len(rest) == len(key) { + return nil, false + } + return rest, true +} + +// decodeZSetLegacyBlobValue strips the magic prefix and unmarshals +// the protobuf body into a slice of (member, score) entries. +// Rejects NaN scores (same fail-closed contract as +// HandleZSetMember). +func decodeZSetLegacyBlobValue(value []byte) ([]zsetLegacyEntry, error) { + if len(value) < redisZSetLegacyProtoPrefixLen || + !bytes.Equal(value[:redisZSetLegacyProtoPrefixLen], redisZSetLegacyProtoPrefix) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "missing or corrupt magic prefix (len=%d)", len(value)) + } + msg := &pb.RedisZSetValue{} + if err := gproto.Unmarshal(value[redisZSetLegacyProtoPrefixLen:], msg); err != nil { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "unmarshal: %v", err) + } + out := make([]zsetLegacyEntry, 0, len(msg.GetEntries())) + for _, e := range msg.GetEntries() { + score := e.GetScore() + if math.IsNaN(score) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, + "NaN score for member %q", e.GetMember()) + } + out = append(out, zsetLegacyEntry{member: e.GetMember(), score: score}) + } + return out, nil +} + +// zsetState lazily creates per-key state. Mirrors the hash/list/set +// kindByKey-registration pattern so HandleZSetMeta, HandleZSetMember, +// and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!zs|...` (because `r` < `z`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the zset's +// `expire_at_ms`. Without this drain step, every TTL'd zset would +// restore as permanent. Codex P1 finding on PR #790. +func (r *RedisDB) zsetState(userKey []byte) *redisZSetState { + uk := string(userKey) + if st, ok := r.zsets[uk]; ok { + return st + } + st := &redisZSetState{members: make(map[string]float64)} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } + r.zsets[uk] = st + r.kindByKey[uk] = redisKindZSet + return st +} + +// parseZSetMetaKey strips !zs|meta| and the 4-byte BE userKeyLen +// prefix. Returns (userKey, true) on success. Delta keys +// (!zs|meta|d|...) share the meta string prefix and would otherwise +// be parsed as base-meta with a garbage userKeyLen — refuse them at +// the boundary so a misrouted delta surfaces a parse error rather +// than silent state corruption. Mirrors parseHashMetaKey/parseSetMetaKey. +func parseZSetMetaKey(key []byte) ([]byte, bool) { + if bytes.HasPrefix(key, []byte(RedisZSetMetaDeltaPrefix)) { + return nil, false + } + rest := bytes.TrimPrefix(key, []byte(RedisZSetMetaPrefix)) + if len(rest) == len(key) { + return nil, false + } + return parseUserKeyLenPrefix(rest) +} + +// parseZSetMemberKey strips !zs|mem| and the 4-byte BE userKeyLen +// prefix, then returns (userKey, member, true). The member bytes +// are everything after the userKey segment — binary-safe per +// Redis's ZADD contract. +func parseZSetMemberKey(key []byte) ([]byte, []byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisZSetMemberPrefix)) + if len(rest) == len(key) { + return nil, nil, false + } + userKey, ok := parseUserKeyLenPrefix(rest) + if !ok { + return nil, nil, false + } + member := rest[wideColumnUserKeyLenSize+len(userKey):] + return userKey, member, true +} + +// flushZSets writes one JSON file per accumulated zset to +// zsets/.json. Empty zsets (Len=0, no members) still emit a +// file when meta was seen, mirroring the hash/list/set encoders: +// their existence is observable to clients (TYPE returns "zset", +// ZCARD returns 0). Mismatched declared-vs-observed length surfaces +// an `redis_zset_length_mismatch` warning. +func (r *RedisDB) flushZSets() error { + return flushWideColumnDir(r, r.zsets, "zsets", func(dir, uk string, st *redisZSetState) error { + if r.warn != nil && st.metaSeen && int64(len(st.members)) != st.declaredLen { + r.warn("redis_zset_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.declaredLen, + "observed_members", len(st.members), + "hint", "meta record's Len does not match the count of !zs|mem| keys for this user key") + } + return r.writeZSetJSON(dir, []byte(uk), st) + }) +} + +func (r *RedisDB) writeZSetJSON(dir string, userKey []byte, st *redisZSetState) error { + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".json") + body, err := marshalZSetJSON(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// zsetMemberRecord is the dump-format projection of one zset entry. +// `member` is binary-safe (via marshalRedisBinaryValue) and `score` +// is a JSON RawMessage so finite scores serialize as JSON numbers +// (matching the design example `"score": 100`) while ±Inf, which +// json.Marshal rejects, serialize as the conventional ZADD strings +// `"+inf"` / `"-inf"`. Phase 0b's reverse encoder reads both shapes. +type zsetMemberRecord struct { + Member json.RawMessage `json:"member"` + Score json.RawMessage `json:"score"` +} + +// marshalRedisZSetScore renders one IEEE 754 score as the dump's +// score field. ±Inf is emitted as a JSON string ("+inf"/"-inf") +// because json.Marshal returns an error for non-finite floats; this +// matches the conventional ZADD score syntax (Redis accepts "+inf", +// "-inf", "+Inf", "-Inf", and bare "inf"). NaN is unreachable here +// — HandleZSetMember rejects NaN scores at intake — so we do not +// emit a NaN representation. +func marshalRedisZSetScore(score float64) (json.RawMessage, error) { + if math.IsInf(score, 1) { + return json.RawMessage(`"+inf"`), nil + } + if math.IsInf(score, -1) { + return json.RawMessage(`"-inf"`), nil + } + out, err := json.Marshal(score) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return out, nil +} + +// marshalZSetJSON renders one zset state as the design's +// `{format_version, members, expire_at_ms}` JSON shape. Members are +// emitted as an array (not a JSON object) sorted by raw byte order +// of the member name so identical snapshots produce identical dump +// output across runs — same rationale as the hash/set encoders' +// arrays (binary-safe member names that would collide under JSON +// object keying when percent-encoded). Each member name goes through +// marshalRedisBinaryValue so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. +// +// Member sort key is the member-name bytes only; if a Phase 0b +// consumer relies on score-sorted output, it should re-sort on +// read. We pick name order here so a `diff -r` of two dumps with +// the same logical contents but mutated scores still produces a +// stable line-by-line diff. +// +// The wrapper shape (format_version + + expire_at_ms) is +// intentionally parallel to marshalHashJSON's because the design +// specifies parallel wrappers per Redis type. Collapsing them into a +// shared helper would require either a map[string]any (kills JSON +// field-order determinism) or a generic struct (can't carry distinct +// JSON tags per type), so we keep the structural parallel explicit. +// Reviewers comparing the two functions should diff +// (hashFieldRecord, "fields") against (zsetMemberRecord, "members") — +// the only intentional divergence. +func marshalZSetJSON(st *redisZSetState) ([]byte, error) { //nolint:dupl // see comment above + names := make([]string, 0, len(st.members)) + for m := range st.members { + names = append(names, m) + } + sort.Strings(names) + out := make([]zsetMemberRecord, 0, len(names)) + for _, name := range names { + nameJSON, err := marshalRedisBinaryValue([]byte(name)) + if err != nil { + return nil, err + } + scoreJSON, err := marshalRedisZSetScore(st.members[name]) + if err != nil { + return nil, err + } + out = append(out, zsetMemberRecord{Member: nameJSON, Score: scoreJSON}) + } + type record struct { + FormatVersion uint32 `json:"format_version"` + Members []zsetMemberRecord `json:"members"` + ExpireAtMs *uint64 `json:"expire_at_ms"` + } + rec := record{FormatVersion: 1, Members: out} + if st.hasTTL { + ms := st.expireAtMs + rec.ExpireAtMs = &ms + } + body, err := json.MarshalIndent(rec, "", " ") + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return body, nil +} diff --git a/internal/backup/redis_zset_test.go b/internal/backup/redis_zset_test.go new file mode 100644 index 00000000..4cd76476 --- /dev/null +++ b/internal/backup/redis_zset_test.go @@ -0,0 +1,845 @@ +package backup + +import ( + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// encodeZSetLegacyBlobValue produces the magic-prefixed protobuf +// wire format the live store writes for `!redis|zset|` +// values (mirror of adapter/redis_storage_codec.go::marshalZSetValue). +func encodeZSetLegacyBlobValue(t *testing.T, entries []zsetLegacyEntry) []byte { + t.Helper() + msg := &pb.RedisZSetValue{} + for _, e := range entries { + msg.Entries = append(msg.Entries, &pb.RedisZSetEntry{ + Member: e.member, + Score: e.score, + }) + } + body, err := gproto.Marshal(msg) + if err != nil { + t.Fatalf("marshal pb.RedisZSetValue: %v", err) + } + out := make([]byte, 0, redisZSetLegacyProtoPrefixLen+len(body)) + out = append(out, redisZSetLegacyProtoPrefix...) + out = append(out, body...) + return out +} + +// zsetLegacyBlobKey is the test-side mirror of +// adapter/redis_compat_types.go:177 ZSetKey: +// `!redis|zset|` (no userKeyLen prefix). +func zsetLegacyBlobKey(userKey string) []byte { + out := []byte(RedisZSetLegacyBlobPrefix) + return append(out, userKey...) +} + +// encodeZSetMetaValue builds the 8-byte BE member-count value used by +// the live store/zset_helpers.go (mirror of store.MarshalZSetMeta). +func encodeZSetMetaValue(memberCount int64) []byte { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, uint64(memberCount)) //nolint:gosec + return v +} + +// encodeZSetScoreValue builds the 8-byte IEEE 754 BE score value used by +// the live store/zset_helpers.go (mirror of store.MarshalZSetScore). +func encodeZSetScoreValue(score float64) []byte { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, math.Float64bits(score)) + return v +} + +// zsetMetaKey is the test-side mirror of store.ZSetMetaKey: +// !zs|meta|. +func zsetMetaKey(userKey string) []byte { + out := []byte(RedisZSetMetaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + return append(out, userKey...) +} + +// zsetMemberKey mirrors store.ZSetMemberKey: +// !zs|mem|. Member is binary-safe. +func zsetMemberKey(userKey string, member []byte) []byte { + out := []byte(RedisZSetMemberPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + return append(out, member...) +} + +// zsetScoreKey mirrors store.ZSetScoreKey: +// !zs|scr|. The encoder +// must silently discard this prefix; we only build it here to feed +// HandleZSetScore in a test that pins the "discard" contract. +func zsetScoreKey(userKey string, sortableScore [8]byte, member []byte) []byte { + out := []byte(RedisZSetScorePrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + out = append(out, sortableScore[:]...) + return append(out, member...) +} + +// zsetMetaDeltaKey mirrors store.ZSetMetaDeltaKey: +// !zs|meta|d|. +func zsetMetaDeltaKey(userKey string, commitTS uint64, seqInTxn uint32) []byte { + out := []byte(RedisZSetMetaDeltaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var ts [8]byte + binary.BigEndian.PutUint64(ts[:], commitTS) + out = append(out, ts[:]...) + var seq [4]byte + binary.BigEndian.PutUint32(seq[:], seqInTxn) + return append(out, seq[:]...) +} + +func readZSetJSON(t *testing.T, path string) map[string]any { + t.Helper() + b, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("unmarshal: %v", err) + } + return m +} + +func zsetMembersArray(t *testing.T, m map[string]any) []any { + t.Helper() + v, ok := m["members"] + if !ok { + t.Fatalf("members missing in %+v", m) + } + raw, ok := v.([]any) + if !ok { + t.Fatalf("members = %T(%v), want []any", v, v) + } + return raw +} + +func zsetFloat(t *testing.T, m map[string]any, key string) float64 { + t.Helper() + v, ok := m[key] + if !ok { + t.Fatalf("field %q missing in %+v", key, m) + } + f, ok := v.(float64) + if !ok { + t.Fatalf("field %q = %T(%v), want float64", key, v, v) + } + return f +} + +// zsetMemberEntry is the test-side projection of one decoded +// {"member":..., "score":...} record. +type zsetMemberEntry struct { + member string + scoreNum float64 + scoreStr string // populated only when score is the string form ("+inf"/"-inf") + scoreKind string // "number" or "string" +} + +// assertZSetEntryScore checks that one decoded record matches the +// expected entry. Extracted from assertZSetMembersEqual to keep both +// callers below the cyclop complexity ceiling and so the +// type-assertion guards live in one place. +func assertZSetEntryScore(t *testing.T, idx int, rec map[string]any, w zsetMemberEntry) { + t.Helper() + switch w.scoreKind { + case "number": + score, ok := rec["score"].(float64) + if !ok { + t.Fatalf("members[%d].score = %T(%v), want number", idx, rec["score"], rec["score"]) + } + if score != w.scoreNum { + t.Fatalf("members[%d].score = %v, want %v", idx, score, w.scoreNum) + } + case "string": + if rec["score"] != w.scoreStr { + t.Fatalf("members[%d].score = %v(%T), want %q (Inf string form)", idx, rec["score"], rec["score"], w.scoreStr) + } + default: + t.Fatalf("test bug: unknown scoreKind %q", w.scoreKind) + } +} + +// assertZSetMembersEqual checks that the JSON `members` array matches +// the given expected entries. +func assertZSetMembersEqual(t *testing.T, members []any, want []zsetMemberEntry) { + t.Helper() + if len(members) != len(want) { + t.Fatalf("len(members) = %d, want %d (got %v)", len(members), len(want), members) + } + for i, w := range want { + rec, ok := members[i].(map[string]any) + if !ok { + t.Fatalf("members[%d] = %T(%v), want object", i, members[i], members[i]) + } + if rec["member"] != w.member { + t.Fatalf("members[%d].member = %v, want %v", i, rec["member"], w.member) + } + assertZSetEntryScore(t, i, rec, w) + } +} + +// TestRedisDB_ZSetRoundTripBasic confirms a multi-member zset +// round-trips through the encoder, that members are sorted by +// raw byte order (NOT score order, see marshalZSetJSON's docstring), +// and that score values land on the JSON record as plain numbers. +func TestRedisDB_ZSetRoundTripBasic(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("leaderboard"), encodeZSetMetaValue(3)); err != nil { + t.Fatalf("HandleZSetMeta: %v", err) + } + // Submit out of byte order and out of score order to exercise the sort. + for _, e := range []struct { + member string + score float64 + }{ + {"charlie", 30}, + {"alice", 100}, + {"bob", 50}, + } { + if err := db.HandleZSetMember(zsetMemberKey("leaderboard", []byte(e.member)), encodeZSetScoreValue(e.score)); err != nil { + t.Fatalf("HandleZSetMember(%s): %v", e.member, err) + } + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "leaderboard.json")) + if zsetFloat(t, got, "format_version") != 1 { + t.Fatalf("format_version = %v", got["format_version"]) + } + if got["expire_at_ms"] != nil { + t.Fatalf("expire_at_ms must be nil without TTL, got %v", got["expire_at_ms"]) + } + // Sorted by member bytes: alice < bob < charlie. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 100, scoreKind: "number"}, + {member: "bob", scoreNum: 50, scoreKind: "number"}, + {member: "charlie", scoreNum: 30, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetEmptyZSetStillEmitsFile mirrors the hash/list/set +// emit-empty rule: ZCARD==0 is observable to clients (TYPE returns +// "zset"), so the dump must preserve existence. +func TestRedisDB_ZSetEmptyZSetStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("empty"), encodeZSetMetaValue(0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "empty.json")) + if members := zsetMembersArray(t, got); len(members) != 0 { + t.Fatalf("empty zset should emit empty members array, got %v", members) + } +} + +// TestRedisDB_ZSetTTLInlinedFromScanIndex pins that !redis|ttl| +// records for a zset user key fold into the zset's JSON expire_at_ms, +// not a separate sidecar (the strings/HLL pattern). +func TestRedisDB_ZSetTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), encodeZSetScoreValue(1.5)); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "zsets_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected zset TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_ZSetLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash/list/set encoders. +func TestRedisDB_ZSetLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleZSetMeta(zsetMetaKey("z"), encodeZSetMetaValue(5)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("only")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_zset_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_ZSetBinaryMemberUsesBase64Envelope confirms non-UTF-8 +// member bytes round-trip via the typed `{"base64":"..."}` envelope. +func TestRedisDB_ZSetBinaryMemberUsesBase64Envelope(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("blob"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("blob", []byte{0x80, 0xff, 0x01}), encodeZSetScoreValue(42)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "blob.json")) + members := zsetMembersArray(t, got) + if len(members) != 1 { + t.Fatalf("len(members) = %d, want 1", len(members)) + } + rec, ok := members[0].(map[string]any) + if !ok { + t.Fatalf("members[0] = %T(%v), want object", members[0], members[0]) + } + envelope, ok := rec["member"].(map[string]any) + if !ok { + t.Fatalf("expected base64 envelope on member, got %T(%v)", rec["member"], rec["member"]) + } + if envelope["base64"] == "" { + t.Fatalf("base64 envelope missing payload: %v", envelope) + } +} + +// TestRedisDB_ZSetHandleMetaSkipsDeltaKey pins that the !zs|meta|d|... +// family is silently skipped by HandleZSetMeta. Mirrors the +// hash/list/set delta-key guards. +func TestRedisDB_ZSetHandleMetaSkipsDeltaKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + deltaValue := make([]byte, 8) + if err := db.HandleZSetMeta(zsetMetaDeltaKey("k", 7, 0), deltaValue); err != nil { + t.Fatalf("delta key must be silently skipped, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "zsets")); !os.IsNotExist(err) { + t.Fatalf("delta-only run should not create zsets/, stat err=%v", err) + } +} + +// TestRedisDB_ZSetHandleZSetMetaDelta exercises the explicit +// delta-handler entry point (the dispatcher routes !zs|meta|d| keys +// here directly when it can distinguish them from base meta). +func TestRedisDB_ZSetHandleZSetMetaDelta(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + if err := db.HandleZSetMetaDelta(zsetMetaDeltaKey("k", 1, 0), make([]byte, 8)); err != nil { + t.Fatalf("HandleZSetMetaDelta must accept any value, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } +} + +// TestRedisDB_ZSetHandleZSetScoreSilentlyDiscards pins that the +// !zs|scr| score-index prefix is dropped at intake. !zs|mem| carries +// the authoritative member→score mapping. +func TestRedisDB_ZSetHandleZSetScoreSilentlyDiscards(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var sortable [8]byte + binary.BigEndian.PutUint64(sortable[:], 0xdeadbeef) + if err := db.HandleZSetScore(zsetScoreKey("k", sortable, []byte("m")), nil); err != nil { + t.Fatalf("HandleZSetScore must not return errors, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + // No state was registered — no directory, no warning. + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "zsets")); !os.IsNotExist(err) { + t.Fatalf("score-only run must not create zsets/, stat err=%v", err) + } +} + +// TestRedisDB_ZSetRejectsMalformedMetaValueLength pins that an +// !zs|meta| value of the wrong length surfaces as an error. +func TestRedisDB_ZSetRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleZSetMeta(zsetMetaKey("k"), []byte{0x00}) + if !errors.Is(err, ErrRedisInvalidZSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMeta", err) + } +} + +// TestRedisDB_ZSetRejectsOverflowingMetaValue pins the high-bit +// overflow guard — same shape as hash + list + set encoders. +func TestRedisDB_ZSetRejectsOverflowingMetaValue(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + overflow := make([]byte, 8) + binary.BigEndian.PutUint64(overflow, 1<<63) + err := db.HandleZSetMeta(zsetMetaKey("k"), overflow) + if !errors.Is(err, ErrRedisInvalidZSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMeta", err) + } +} + +// TestRedisDB_ZSetRejectsMalformedMemberValueLength pins that an +// !zs|mem| value of the wrong length surfaces as an error. The score +// MUST be exactly 8 bytes — anything else points at corruption or a +// wire-format change we want to detect at intake. +func TestRedisDB_ZSetRejectsMalformedMemberValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), []byte{0x00, 0x01, 0x02}) + if !errors.Is(err, ErrRedisInvalidZSetMember) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMember", err) + } +} + +// TestRedisDB_ZSetRejectsNaNScore pins that NaN scores fail closed. +// Redis's ZADD command itself rejects NaN at the wire level +// (Lua_redis_pcall → ZADD with NaN returns NOTNUM), so a NaN at +// backup time indicates corruption or a bypass; silently dumping +// `score: null` would re-corrupt the restored cluster. +func TestRedisDB_ZSetRejectsNaNScore(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + nan := make([]byte, 8) + binary.BigEndian.PutUint64(nan, math.Float64bits(math.NaN())) + err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), nan) + if !errors.Is(err, ErrRedisInvalidZSetMember) { + t.Fatalf("err=%v want ErrRedisInvalidZSetMember", err) + } +} + +// TestRedisDB_ZSetInfinityScoresUseStringForm pins that ±Inf +// scores serialize as the ZADD-conventional strings `"+inf"`/`"-inf"` +// because json.Marshal returns an error for non-finite floats. The +// per-record `score` field is a json.RawMessage so a finite score +// emits as a number AND an Inf score emits as a string, with the +// same key name. +func TestRedisDB_ZSetInfinityScoresUseStringForm(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMeta(zsetMetaKey("inf"), encodeZSetMetaValue(2)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("inf", []byte("top")), encodeZSetScoreValue(math.Inf(1))); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("inf", []byte("bottom")), encodeZSetScoreValue(math.Inf(-1))); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "inf.json")) + // Sorted by member name: bottom < top. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "bottom", scoreStr: "-inf", scoreKind: "string"}, + {member: "top", scoreStr: "+inf", scoreKind: "string"}, + }) +} + +// TestRedisDB_ZSetParseMetaKeyRejectsDelta is the parser-level guard +// companion to the dispatcher skip — pins that a future refactor +// bypassing HandleZSetMeta's prefix check still surfaces a parse +// failure rather than silent state corruption. +func TestRedisDB_ZSetParseMetaKeyRejectsDelta(t *testing.T) { + t.Parallel() + if _, ok := parseZSetMetaKey(zsetMetaDeltaKey("k", 1, 0)); ok { + t.Fatalf("parseZSetMetaKey must reject delta-prefixed keys") + } +} + +// TestRedisDB_ZSetMembersWithoutMetaStillEmitsFile pins the +// members-without-meta contract: members may arrive before (or +// without) meta, and the encoder must still emit the JSON without +// firing the length-mismatch warning. Mirrors the set encoder's +// rule (PR #758). +func TestRedisDB_ZSetMembersWithoutMetaStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("a")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "z.json")) + members := zsetMembersArray(t, got) + if len(members) != 1 { + t.Fatalf("len(members) = %d, want 1", len(members)) + } + for _, e := range events { + if e == "redis_zset_length_mismatch" { + t.Fatalf("members-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_ZSetDuplicateMembersUseLatestScore pins ZADD's +// latest-wins semantics: re-adding a member with a different score +// overwrites the prior score. (Snapshot iterators don't typically +// emit duplicates, but a recovery replay across overlapping windows +// could.) +func TestRedisDB_ZSetDuplicateMembersUseLatestScore(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("m")), encodeZSetScoreValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("z", []byte("m")), encodeZSetScoreValue(2)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "z.json")) + // Latest-wins: the second HandleZSetMember overwrites score 1 with score 2. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "m", scoreNum: 2, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetTTLArrivesBeforeRows pins the codex P1 fix: +// `!redis|ttl|` lex-sorts BEFORE `!zs|...` because `r` < `z`, +// so in a real Pebble snapshot's encoded-key order the TTL record +// arrives FIRST. The encoder MUST buffer the expiry in pendingTTL +// and drain it when the zset first registers via zsetState, +// inlining the value into the zset's JSON expire_at_ms. Without +// this drain step every TTL'd zset would restore as permanent — +// the P1 finding on PR #790 (chatgpt-codex-connector). +func TestRedisDB_ZSetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + member. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("m")), encodeZSetScoreValue(1.5)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_SetTTLArrivesBeforeRows pins the same ordering fix +// for sets (`!redis|ttl|` lex-sorts before `!st|...` because +// `r` < `s`). Retroactive coverage for PR #758, which shipped the +// set encoder before the pendingTTL infrastructure existed. +func TestRedisDB_SetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + member. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json")) + if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys pins the post-fix +// orphan semantics: a TTL for a key that NEVER appears in any +// typed record (store corruption or unknown type prefix) is +// counted at Finalize, not silently dropped on intake. +func TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleTTL([]byte("orphan"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + found := false + for _, e := range events { + if e == "redis_orphan_ttl" { + found = true + break + } + } + if !found { + t.Fatalf("expected redis_orphan_ttl warning, got %v", events) + } +} + +// TestRedisDB_ZSetMaxInt64DeclaredLen pins the math.MaxInt64 +// boundary — declaredLen=math.MaxInt64 must be accepted, only > that +// rejected. +func TestRedisDB_ZSetMaxInt64DeclaredLen(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + boundary := make([]byte, 8) + binary.BigEndian.PutUint64(boundary, math.MaxInt64) // exactly the int64 max — must NOT reject + if err := db.HandleZSetMeta(zsetMetaKey("k"), boundary); err != nil { + t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) + } +} + +// TestRedisDB_ZSetLegacyBlobRoundTrip pins the codex P1 fix: a +// zset stored only via the legacy `!redis|zset|` blob +// must surface in the dump with all its members. Without +// HandleZSetLegacyBlob, the encoder would skip the record and +// produce an empty zsets/ output for that key — silent backup +// data loss. +func TestRedisDB_ZSetLegacyBlobRoundTrip(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 100}, + {member: "bob", score: 50}, + {member: "charlie", score: 30}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("leaderboard"), value); err != nil { + t.Fatalf("HandleZSetLegacyBlob: %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "leaderboard.json")) + // Members sorted by member-name bytes (matches the wide-column + // encoder's output policy). + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 100, scoreKind: "number"}, + {member: "bob", scoreNum: 50, scoreKind: "number"}, + {member: "charlie", scoreNum: 30, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetWideColumnSupersedesLegacyBlob pins the codex +// P1 round 3 fix: when a snapshot carries BOTH the legacy +// `!redis|zset|` blob AND wide-column `!zs|mem|...` rows for +// the same user key (a mid-migration state, or a stale post- +// migration leftover), the encoder MUST drop the legacy entries +// and use only the wide-column source-of-truth. This matches the +// live read path in adapter/redis_compat_helpers.go:610-620 +// (RedisServer.loadZSetAt) which falls back to the legacy blob +// ONLY when no wide-column rows exist. +// +// Without this fix, the dump would surface deleted or outdated +// members from a stale legacy blob — silent backup corruption. +func TestRedisDB_ZSetWideColumnSupersedesLegacyBlob(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: legacy blob arrives first (sorts before !zs|). + // "bob-stale" exists ONLY in the legacy blob — a stale + // post-migration leftover that the live store hides via the + // loadZSetAt preference for wide-column rows. + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 1}, + {member: "bob-stale", score: 2}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + // Wide-column source-of-truth: alice with a new score, charlie new. + // "bob-stale" is intentionally absent — its presence in the legacy + // blob is the stale-leftover scenario this test guards against. + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil { + t.Fatal(err) + } + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("charlie")), encodeZSetScoreValue(3)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + // "bob-stale" MUST NOT appear — wide-column rows supersede the legacy blob. + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 99, scoreKind: "number"}, + {member: "charlie", scoreNum: 3, scoreKind: "number"}, + }) +} + +// TestRedisDB_ZSetMetaAloneEvictsLegacyBlob pins that a `!zs|meta|` +// record (even without any `!zs|mem|` rows) also triggers the +// legacy-supersedence rule. The live read path's loadZSetAt +// preference is based on whether ANY wide-column key exists for +// the user key — meta-only is rare in practice but the encoder +// stays consistent with the read-side contract. +func TestRedisDB_ZSetMetaAloneEvictsLegacyBlob(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "stale", score: 1}, + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + // Wide-column meta record declares zero members. + if err := db.HandleZSetMeta(zsetMetaKey("k"), encodeZSetMetaValue(0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if members := zsetMembersArray(t, got); len(members) != 0 { + t.Fatalf("meta-only wide-column row MUST evict legacy blob, got members=%v", members) + } +} + +// TestRedisDB_ZSetLegacyBlobAfterWideRowsIsDropped pins the +// reverse-order edge case: if a snapshot somehow emits a +// `!redis|zset|` AFTER `!zs|mem|` rows (custom dispatcher +// ordering, or a replay), the legacy blob is dropped at intake +// rather than overwriting the established wide-column state. +func TestRedisDB_ZSetLegacyBlobAfterWideRowsIsDropped(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Wide-column row first — registers sawWide. + if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil { + t.Fatal(err) + } + // Legacy blob arrives later — must be ignored. + legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{ + {member: "alice", score: 1}, // would overwrite if not gated + {member: "stale", score: 2}, // would re-add if not gated + }) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{ + {member: "alice", scoreNum: 99, scoreKind: "number"}, // unchanged + }) +} + +// TestRedisDB_ZSetLegacyBlobWithInlineTTL pins that a TTL'd zset +// stored only via the legacy blob round-trips its expiry. The +// snapshot order is `!redis|zset|` (sorts before `!redis|ttl|`), +// so HandleZSetLegacyBlob runs first and registers redisKindZSet, +// then HandleTTL routes via the redisKindZSet branch (no +// pendingTTL detour needed for this ordering). +func TestRedisDB_ZSetLegacyBlobWithInlineTTL(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}}) + if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json")) + if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsMissingMagic pins fail-closed +// behaviour: a `!redis|zset|` value without the magic prefix +// fails at intake rather than silently decoding garbage protobuf. +func TestRedisDB_ZSetLegacyBlobRejectsMissingMagic(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + body, err := gproto.Marshal(&pb.RedisZSetValue{}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + err = db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), body) // no magic prefix + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsNaNScore pins NaN-fail-closed +// parallel to HandleZSetMember's contract. Redis ZADD rejects NaN +// at the wire level, so a NaN in storage indicates corruption. +func TestRedisDB_ZSetLegacyBlobRejectsNaNScore(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: math.NaN()}}) + err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value) + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +} + +// TestRedisDB_ZSetLegacyBlobRejectsMalformedKey pins that a +// `!redis|zset|` key with no trailing user-key bytes fails parse. +func TestRedisDB_ZSetLegacyBlobRejectsMalformedKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}}) + // Key has the prefix but no trailing user-key bytes — parser must + // still accept it (empty user key is technically valid Redis). + // Use a key that doesn't have the prefix to trigger the parse + // failure. + err := db.HandleZSetLegacyBlob([]byte("not-the-right-prefix|k"), value) + if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) { + t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err) + } +}