diff --git a/internal/backup/redis_hash.go b/internal/backup/redis_hash.go index 38a9b410..bacdcd78 100644 --- a/internal/backup/redis_hash.go +++ b/internal/backup/redis_hash.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "encoding/binary" "encoding/json" + "math" "path/filepath" "sort" "unicode/utf8" @@ -71,8 +72,21 @@ func (r *RedisDB) HandleHashMeta(key, value []byte) error { return cockroachdberr.Wrapf(ErrRedisInvalidHashMeta, "length %d != %d", len(value), redisUint64Bytes) } + // Bounds-check the uint64 field count before narrowing to int64. + // Without this, a corrupted store value with the high bit set + // would wrap to a negative declaredLen and fire spurious + // `redis_hash_length_mismatch` warnings on every flush. Mirrors + // the list encoder's symmetric guard (redis_list.go) so both + // wide-column encoders fail closed on the same shape of + // corruption. Round-2 review on PR #755 — backported from list + // encoder for cross-encoder consistency. + rawLen := binary.BigEndian.Uint64(value) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidHashMeta, + "declared len %d overflows int64", rawLen) + } st := r.hashState(userKey) - st.declaredLen = int64(binary.BigEndian.Uint64(value)) //nolint:gosec // signed int64 by design + st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above st.metaSeen = true return nil } @@ -180,23 +194,7 @@ func parseUserKeyLenPrefix(b []byte) ([]byte, bool) { // HLEN). Mismatched declared-vs-observed length surfaces an // `redis_hash_length_mismatch` warning. func (r *RedisDB) flushHashes() error { - if len(r.hashes) == 0 { - return nil - } - dir := filepath.Join(r.dbDir(), "hashes") - if err := r.ensureDir(dir); err != nil { - return err - } - // Stable order across runs (Codex pattern from #716): sort by user - // key before flushing so identical snapshots produce identical - // dump output regardless of Go's randomised map iteration. - userKeys := make([]string, 0, len(r.hashes)) - for k := range r.hashes { - userKeys = append(userKeys, k) - } - sort.Strings(userKeys) - for _, uk := range userKeys { - st := r.hashes[uk] + return flushWideColumnDir(r, r.hashes, "hashes", func(dir, uk string, st *redisHashState) error { if r.warn != nil && st.metaSeen && int64(len(st.fields)) != st.declaredLen { r.warn("redis_hash_length_mismatch", "user_key_len", len(uk), @@ -204,11 +202,8 @@ func (r *RedisDB) flushHashes() error { "observed_fields", len(st.fields), "hint", "meta record's Len does not match the count of !hs|fld| keys for this user key") } - if err := r.writeHashJSON(dir, []byte(uk), st); err != nil { - return err - } - } - return nil + return r.writeHashJSON(dir, []byte(uk), st) + }) } func (r *RedisDB) writeHashJSON(dir string, userKey []byte, st *redisHashState) error { diff --git a/internal/backup/redis_hash_test.go b/internal/backup/redis_hash_test.go index b72e8df9..8344fc31 100644 --- a/internal/backup/redis_hash_test.go +++ b/internal/backup/redis_hash_test.go @@ -365,6 +365,28 @@ func TestRedisDB_HashRejectsMalformedMetaValueLength(t *testing.T) { } } +// TestRedisDB_HashRejectsOverflowingMetaValue pins the round-2 PR +// review backport: a uint64 field count with the high bit set wraps +// to a negative int64 declaredLen and would fire spurious +// `redis_hash_length_mismatch` warnings on every flush. The hash +// encoder must now refuse the value before narrowing, mirroring the +// list encoder's symmetric guard so both wide-column encoders fail +// closed on the same shape of corruption. +func TestRedisDB_HashRejectsOverflowingMetaValue(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // High bit set — uint64 value > math.MaxInt64. + overflow := make([]byte, 8) + binary.BigEndian.PutUint64(overflow, 1<<63) + err := db.HandleHashMeta(hashMetaKey("k"), overflow) + if err == nil { + t.Fatalf("expected error for overflowing meta value") + } + if !errors.Is(err, ErrRedisInvalidHashMeta) { + t.Fatalf("err=%v want ErrRedisInvalidHashMeta", err) + } +} + // TestRedisDB_HashHandleHashMetaSkipsDeltaKey is the regression for // Codex P1 round 14 (PR #725 #13): the delta prefix `!hs|meta|d|` // shares the base meta prefix `!hs|meta|`, so a snapshot dispatcher diff --git a/internal/backup/redis_list.go b/internal/backup/redis_list.go new file mode 100644 index 00000000..bcc1bc40 --- /dev/null +++ b/internal/backup/redis_list.go @@ -0,0 +1,274 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + + cockroachdberr "github.com/cockroachdb/errors" +) + +// Redis list encoder. Translates raw !lst|... snapshot records into the +// per-list `lists/.json` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). +// +// Three on-disk key families share the `!lst|` namespace; only two +// carry restorable state: +// +// - !lst|meta| -> 24-byte (Head, Tail, Len) blob +// - !lst|itm| -> raw item bytes (Redis lists +// are binary-safe) +// - !lst|claim|... -> POP tombstone for OCC +// uniqueness. The live read +// path (rangeList → +// fetchListRange in redis.go) +// does NOT consult claims — +// POPs also Del the underlying +// item key in the same OCC +// commit, so a snapshot taken +// after a POP commit has no +// item record for the popped +// seq. The encoder therefore +// skips claim keys entirely. +// - !lst|meta|d|... -> meta delta. The hash encoder +// skips its analogous deltas +// and treats !hs|fld| as the +// source of truth; the list +// encoder mirrors that policy +// — !lst|itm| keys are the +// source of truth and the +// delta arithmetic is not +// replayed at backup time. +const ( + ListMetaPrefix = "!lst|meta|" + ListItemPrefix = "!lst|itm|" + ListMetaDeltaPrefix = "!lst|meta|d|" + ListClaimPrefix = "!lst|claim|" + + // listMetaBinarySize mirrors store/list_helpers.go (24 bytes: + // Head(8) + Tail(8) + Len(8)). Re-declared here rather than + // imported because the backup package is intentionally adapter- + // and store-independent. + listMetaBinarySize = 24 + + // listSeqBytes is the fixed width of the trailing sortable-int64 + // sequence number in an !lst|itm| key. + listSeqBytes = 8 +) + +// ErrRedisInvalidListMeta is returned when an !lst|meta| value is not +// the expected 24-byte (Head, Tail, Len) layout. +var ErrRedisInvalidListMeta = cockroachdberr.New("backup: invalid !lst|meta| value") + +// ErrRedisInvalidListKey is returned when an !lst| key cannot be parsed +// for its userKey + (optional) seq segments. +var ErrRedisInvalidListKey = cockroachdberr.New("backup: malformed !lst| key") + +// redisListState buffers one userKey's list during a snapshot scan. +// items is keyed by signed-int64 sequence so the seq-ordering at +// flush time matches the live store's left-to-right order regardless +// of the order in which !lst|itm| records arrive at the dispatcher. +type redisListState struct { + metaSeen bool + declaredLen int64 + items map[int64][]byte + expireAtMs uint64 + hasTTL bool +} + +// HandleListMeta processes one !lst|meta| record. The value is +// the 24-byte (Head, Tail, Len) layout. We park the declared length so +// flushLists can warn on a mismatch with the observed item count and +// register the user key so a later !redis|ttl| record routes +// back to this list state. +// +// !lst|meta|d|... delta keys share the !lst|meta| string +// prefix, so a snapshot dispatcher that routes by "starts with +// ListMetaPrefix" lands delta records here too. The hash encoder +// solved the analogous problem (Codex P1 round 14 PR #725) by silently +// skipping the delta family; we mirror that policy because !lst|itm| +// records are the source of truth for the restored list contents and +// the delta arithmetic does not need to be replayed at backup time. +func (r *RedisDB) HandleListMeta(key, value []byte) error { + if bytes.HasPrefix(key, []byte(ListMetaDeltaPrefix)) { + return nil + } + userKey, ok := parseListMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidListKey, "meta key: %q", key) + } + if len(value) != listMetaBinarySize { + return cockroachdberr.Wrapf(ErrRedisInvalidListMeta, + "length %d != %d", len(value), listMetaBinarySize) + } + // Length is the only field needed at backup time. Head/Tail are + // recomputable from the observed seqs and the live store's + // invariant (Tail = Head + Len), so we deliberately do not + // persist them. + rawLen := binary.BigEndian.Uint64(value[16:24]) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidListMeta, + "declared len %d overflows int64", rawLen) + } + st := r.listState(userKey) + st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above + st.metaSeen = true + return nil +} + +// HandleListItem processes one !lst|itm| +// record. The value is the raw item bytes (binary-safe). The seq is +// the trailing 8-byte sortable-int64 — sortable encoding flips the +// sign bit so a forward byte-ordered scan yields ascending int64, +// which matches the live store's left-to-right read order. +func (r *RedisDB) HandleListItem(key, value []byte) error { + userKey, seq, ok := parseListItemKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidListKey, "item key: %q", key) + } + st := r.listState(userKey) + st.items[seq] = bytes.Clone(value) + return nil +} + +// HandleListClaim accepts and discards one !lst|claim|... record. See +// the file-level comment: the live read path does not consult claims; +// POP'd item keys are deleted in the same OCC commit. Restored lists +// therefore reflect the post-POP state without any claim replay. +func (r *RedisDB) HandleListClaim(_, _ []byte) error { return nil } + +// HandleListMetaDelta accepts and discards one !lst|meta|d|... record. +// See HandleListMeta's docstring for the rationale; !lst|itm| is the +// source of truth at backup time. +func (r *RedisDB) HandleListMetaDelta(_, _ []byte) error { return nil } + +// listState lazily creates per-key state. Mirrors the hashState +// kindByKey-registration pattern (PR #725 #1/#3) so HandleListMeta, +// HandleListItem, and the HandleTTL back-edge all agree on the kind. +func (r *RedisDB) listState(userKey []byte) *redisListState { + uk := string(userKey) + if st, ok := r.lists[uk]; ok { + return st + } + st := &redisListState{items: make(map[int64][]byte)} + r.lists[uk] = st + r.kindByKey[uk] = redisKindList + return st +} + +// parseListMetaKey strips !lst|meta| from a meta key and returns +// (userKey, true). The list meta key shape is `prefix + userKey` with +// no length prefix (mirror of store.ListMetaKey), so the trimmed +// remainder is the userKey verbatim. Delta keys (!lst|meta|d|...) +// share the meta string prefix and must be rejected here so a +// misrouted delta surfaces a parse failure rather than silent state +// corruption — analogous to parseHashMetaKey's delta guard. +func parseListMetaKey(key []byte) ([]byte, bool) { + if bytes.HasPrefix(key, []byte(ListMetaDeltaPrefix)) { + return nil, false + } + rest := bytes.TrimPrefix(key, []byte(ListMetaPrefix)) + if len(rest) == len(key) { + return nil, false + } + return rest, true +} + +// parseListItemKey strips !lst|itm| and extracts (userKey, seq). The +// list item key shape (mirror of store.ListItemKey) is +// `prefix + userKey + sortableInt64(seq)`, with no userKey length +// prefix; the trailing 8 bytes are always the seq, and everything +// in between is the userKey. The seq is decoded by undoing the +// sign-flip encoding (seq^MinInt64) the live store applies for +// byte-order sortability. +func parseListItemKey(key []byte) ([]byte, int64, bool) { + rest := bytes.TrimPrefix(key, []byte(ListItemPrefix)) + if len(rest) == len(key) { + return nil, 0, false + } + if len(rest) < listSeqBytes { + return nil, 0, false + } + userKey := rest[:len(rest)-listSeqBytes] + rawSeq := binary.BigEndian.Uint64(rest[len(rest)-listSeqBytes:]) + seq := int64(rawSeq) ^ math.MinInt64 //nolint:gosec // sortable-int64 sign-flip; mirrors store.encodeSortableInt64 + return userKey, seq, true +} + +// flushLists writes one JSON file per accumulated list to +// lists/.json. Empty lists (Len=0, no items) still emit a +// file when meta was seen, mirroring the hash encoder: their existence +// is observable to clients (LLEN, TYPE) and a restorer that drops the +// file would silently turn an empty-but-extant list into a +// non-existent key. Mismatched declared-vs-observed length surfaces +// an `redis_list_length_mismatch` warning, again matching the hash +// encoder's contract. +func (r *RedisDB) flushLists() error { + return flushWideColumnDir(r, r.lists, "lists", func(dir, uk string, st *redisListState) error { + if r.warn != nil && st.metaSeen && int64(len(st.items)) != st.declaredLen { + r.warn("redis_list_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.declaredLen, + "observed_items", len(st.items), + "hint", "meta record's Len does not match the count of !lst|itm| keys for this user key") + } + return r.writeListJSON(dir, []byte(uk), st) + }) +} + +func (r *RedisDB) writeListJSON(dir string, userKey []byte, st *redisListState) error { + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".json") + body, err := marshalListJSON(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// marshalListJSON renders one list state as the design's +// `{format_version, items, expire_at_ms}` JSON shape. Items are +// emitted in ascending seq order — which equals the live read path's +// LPUSH-leftmost-to-RPUSH-rightmost contract — and each value is +// projected through marshalRedisBinaryValue so non-UTF-8 items round- +// trip via the `{"base64":"..."}` envelope rather than corrupting on +// the JSON string boundary. +func marshalListJSON(st *redisListState) ([]byte, error) { + seqs := make([]int64, 0, len(st.items)) + for s := range st.items { + seqs = append(seqs, s) + } + sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) + items := make([]json.RawMessage, 0, len(seqs)) + for _, s := range seqs { + v, err := marshalRedisBinaryValue(st.items[s]) + if err != nil { + return nil, err + } + items = append(items, v) + } + type out struct { + FormatVersion uint32 `json:"format_version"` + Items []json.RawMessage `json:"items"` + ExpireAtMs *uint64 `json:"expire_at_ms"` + } + rec := out{FormatVersion: 1, Items: items} + if st.hasTTL { + ms := st.expireAtMs + rec.ExpireAtMs = &ms + } + body, err := json.MarshalIndent(rec, "", " ") + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return body, nil +} diff --git a/internal/backup/redis_list_test.go b/internal/backup/redis_list_test.go new file mode 100644 index 00000000..baa691c2 --- /dev/null +++ b/internal/backup/redis_list_test.go @@ -0,0 +1,394 @@ +package backup + +import ( + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + "github.com/cockroachdb/errors" +) + +// listMetaValue builds the 24-byte (Head, Tail, Len) blob the live +// store/list_helpers.go emits at !lst|meta|. Tail = Head+Len +// is the live store's invariant; we honour it so the decoder sees the +// same shape it would see in a real snapshot. +func listMetaValue(head, length int64) []byte { + buf := make([]byte, 24) + binary.BigEndian.PutUint64(buf[0:8], uint64(head)) //nolint:gosec + binary.BigEndian.PutUint64(buf[8:16], uint64(head+length)) //nolint:gosec + binary.BigEndian.PutUint64(buf[16:24], uint64(length)) //nolint:gosec + return buf +} + +// listMetaKey is the test-side mirror of store.ListMetaKey: +// !lst|meta| (no length prefix). +func listMetaKey(userKey string) []byte { + return append([]byte(ListMetaPrefix), userKey...) +} + +// listItemKey is the test-side mirror of store.ListItemKey: +// !lst|itm|. The sortable encoding flips the +// sign bit so a forward byte-ordered scan yields ascending int64. +func listItemKey(userKey string, seq int64) []byte { + out := append([]byte(ListItemPrefix), userKey...) + var s [8]byte + binary.BigEndian.PutUint64(s[:], uint64(seq^math.MinInt64)) //nolint:gosec // sortable-int64 sign-flip + return append(out, s[:]...) +} + +// listMetaDeltaKey mirrors store.ListMetaDeltaKey: +// !lst|meta|d|. +// The shape is irrelevant to the encoder (it skips deltas), but we +// build a well-formed key here so the dispatcher integration test +// would exercise the same byte sequence the live store emits. +func listMetaDeltaKey(userKey string, commitTS uint64, seqInTxn uint32) []byte { + out := []byte(ListMetaDeltaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var ts [8]byte + binary.BigEndian.PutUint64(ts[:], commitTS) + out = append(out, ts[:]...) + var seq [4]byte + binary.BigEndian.PutUint32(seq[:], seqInTxn) + return append(out, seq[:]...) +} + +// listClaimKey mirrors store.ListClaimKey: +// !lst|claim|. +func listClaimKey(userKey string, seq int64) []byte { + out := []byte(ListClaimPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var s [8]byte + binary.BigEndian.PutUint64(s[:], uint64(seq^math.MinInt64)) //nolint:gosec // sortable-int64 sign-flip + return append(out, s[:]...) +} + +func readListJSON(t *testing.T, path string) map[string]any { + t.Helper() + b, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("unmarshal: %v", err) + } + return m +} + +func listItemsArray(t *testing.T, m map[string]any) []any { + t.Helper() + v, ok := m["items"] + if !ok { + t.Fatalf("items missing in %+v", m) + } + raw, ok := v.([]any) + if !ok { + t.Fatalf("items = %T(%v), want []any", v, v) + } + return raw +} + +// listFloat fetches a numeric JSON field with a type-asserting helper +// that fails loudly with the field name (mirrors hashFloat). +func listFloat(t *testing.T, m map[string]any, key string) float64 { + t.Helper() + v, ok := m[key] + if !ok { + t.Fatalf("field %q missing in %+v", key, m) + } + f, ok := v.(float64) + if !ok { + t.Fatalf("field %q = %T(%v), want float64", key, v, v) + } + return f +} + +// TestRedisDB_ListRoundTripPreservesLPUSHOrder pins the design's +// "Order is left-to-right" contract: LPUSH pushes negative seqs that +// sort first in the ascending-seq output, so the leftmost element is +// the one with the smallest seq. A round-trip with seqs {-2, -1, 0, +// 1} must surface as [b, c, d, e] in items order — regardless of the +// order the dispatcher hands us the !lst|itm| records. +func TestRedisDB_ListRoundTripPreservesLPUSHOrder(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleListMeta(listMetaKey("q"), listMetaValue(-2, 4)); err != nil { + t.Fatalf("HandleListMeta: %v", err) + } + // Deliberately submit items out of order to exercise the + // ascending-seq sort at flush time. + submitListItems(t, db, "q", map[int64]string{1: "e", -1: "c", -2: "b", 0: "d"}) + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "q.json")) + if listFloat(t, got, "format_version") != 1 { + t.Fatalf("format_version = %v", got["format_version"]) + } + if got["expire_at_ms"] != nil { + t.Fatalf("expire_at_ms must be nil without TTL, got %v", got["expire_at_ms"]) + } + assertListItems(t, got, []any{"b", "c", "d", "e"}) +} + +// submitListItems calls HandleListItem for each (seq, value) pair and +// fails the test on first error. Folds the loop boilerplate out of +// the round-trip test so cyclomatic complexity stays under the linter +// ceiling. +func submitListItems(t *testing.T, db *RedisDB, userKey string, items map[int64]string) { + t.Helper() + for seq, v := range items { + if err := db.HandleListItem(listItemKey(userKey, seq), []byte(v)); err != nil { + t.Fatalf("HandleListItem(seq=%d): %v", seq, err) + } + } +} + +// assertListItems checks the decoded items array against the expected +// ordered slice, failing with the full surrounding context. Folded +// out of the round-trip test for the same cyclop reason. +func assertListItems(t *testing.T, got map[string]any, want []any) { + t.Helper() + items := listItemsArray(t, got) + if len(items) != len(want) { + t.Fatalf("len(items) = %d, want %d (got %v)", len(items), len(want), items) + } + for i := range want { + if items[i] != want[i] { + t.Fatalf("items[%d] = %v, want %v (full order: %v)", i, items[i], want[i], items) + } + } +} + +// TestRedisDB_ListEmptyListStillEmitsFile mirrors the hash encoder's +// emit-empty rule: LLEN==0 is observable to clients (TYPE returns +// "list", LLEN returns 0), so the dump must preserve existence. +func TestRedisDB_ListEmptyListStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleListMeta(listMetaKey("empty"), listMetaValue(0, 0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "empty.json")) + if items := listItemsArray(t, got); len(items) != 0 { + t.Fatalf("empty list should emit empty items array, got %v", items) + } +} + +// TestRedisDB_ListTTLInlinedFromScanIndex pins that !redis|ttl| records +// for a list user key fold into the list's JSON `expire_at_ms` rather +// than landing in a separate sidecar (the strings/HLL pattern). A +// list without this field would silently restore as permanent. +func TestRedisDB_ListTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleListMeta(listMetaKey("k"), listMetaValue(0, 1)); err != nil { + t.Fatal(err) + } + if err := db.HandleListItem(listItemKey("k", 0), []byte("v")); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "k.json")) + if listFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "lists_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected list TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_ListLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash encoder's +// redis_hash_length_mismatch. +func TestRedisDB_ListLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleListMeta(listMetaKey("l"), listMetaValue(0, 5)); err != nil { + t.Fatal(err) + } + if err := db.HandleListItem(listItemKey("l", 0), []byte("only")); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_list_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_ListBinaryItemUsesBase64Envelope confirms non-UTF-8 item +// bytes round-trip via the typed `{"base64":"..."}` envelope rather +// than corrupting on the JSON string boundary. +func TestRedisDB_ListBinaryItemUsesBase64Envelope(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleListMeta(listMetaKey("blob"), listMetaValue(0, 1)); err != nil { + t.Fatal(err) + } + if err := db.HandleListItem(listItemKey("blob", 0), []byte{0x80, 0xff, 0x01}); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "blob.json")) + items := listItemsArray(t, got) + if len(items) != 1 { + t.Fatalf("len(items) = %d, want 1", len(items)) + } + envelope, ok := items[0].(map[string]any) + if !ok { + t.Fatalf("expected base64 envelope, got %T(%v)", items[0], items[0]) + } + if envelope["base64"] == "" { + t.Fatalf("base64 envelope missing payload: %v", envelope) + } +} + +// TestRedisDB_ListHandleListMetaSkipsDeltaKey pins that the +// !lst|meta|d|... family is silently skipped by HandleListMeta. Without +// this, parsing the delta's userKeyLen prefix as the start of a +// userKey would corrupt the lists map. Mirrors the hash delta-key +// guard (Codex P1 round 14 PR #725). +func TestRedisDB_ListHandleListMetaSkipsDeltaKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // Delta record carries the binary 16-byte ListMetaDelta payload, + // not the 24-byte meta. The encoder must skip the record without + // consulting its value at all. + deltaValue := make([]byte, 16) + if err := db.HandleListMeta(listMetaDeltaKey("k", 7, 0), deltaValue); err != nil { + t.Fatalf("delta key must be silently skipped, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "lists")); !os.IsNotExist(err) { + t.Fatalf("delta-only run should not create lists/, stat err=%v", err) + } +} + +// TestRedisDB_ListHandleListClaimSkips pins that POP claim records do +// not perturb encoder state. The live read path doesn't consult +// claims (POPs also Del the item key in the same OCC commit), so +// the encoder mirrors that policy. +func TestRedisDB_ListHandleListClaimSkips(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleListMeta(listMetaKey("q"), listMetaValue(0, 2)); err != nil { + t.Fatal(err) + } + if err := db.HandleListItem(listItemKey("q", 0), []byte("a")); err != nil { + t.Fatal(err) + } + if err := db.HandleListItem(listItemKey("q", 1), []byte("b")); err != nil { + t.Fatal(err) + } + // Stray claim for an already-POP'd seq (item key would already be + // deleted in the live store; we're just confirming the encoder + // doesn't blow up or drop unrelated items). + if err := db.HandleListClaim(listClaimKey("q", -1), nil); err != nil { + t.Fatalf("claim must be silently skipped, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "q.json")) + items := listItemsArray(t, got) + if len(items) != 2 || items[0] != "a" || items[1] != "b" { + t.Fatalf("items = %v, want [a b]", items) + } +} + +// TestRedisDB_ListRejectsMalformedMetaValueLength pins that an +// !lst|meta| value of the wrong length surfaces as an error, not +// silent zero-length state. Mirrors the hash meta length check. +func TestRedisDB_ListRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // 8 bytes — too short for the 24-byte (Head, Tail, Len) blob. + err := db.HandleListMeta(listMetaKey("k"), make([]byte, 8)) + if err == nil { + t.Fatalf("expected error for short meta value") + } + if !errors.Is(err, ErrRedisInvalidListMeta) { + t.Fatalf("expected ErrRedisInvalidListMeta, got %v", err) + } +} + +// TestRedisDB_ListItemsWithoutMetaStillEmitsFile is the round-2 PR +// review regression: !lst|itm| records may arrive without a paired +// !lst|meta| in dump order (the snapshot iterator is not required to +// surface meta before items for a given userKey). The encoder must +// still emit the per-list JSON file from items alone, without +// firing the declared-vs-observed length mismatch warning (because +// metaSeen=false means we have no "declared" baseline to compare +// against). Mirrors the items-as-source-of-truth contract that +// makes the !lst|meta|d| delta family safe to skip. +func TestRedisDB_ListItemsWithoutMetaStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleListItem(listItemKey("q", 0), []byte("v")); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readListJSON(t, filepath.Join(root, "redis", "db_0", "lists", "q.json")) + items := listItemsArray(t, got) + if len(items) != 1 || items[0] != "v" { + t.Fatalf("items = %v, want [v]", items) + } + for _, e := range events { + if e == "redis_list_length_mismatch" { + t.Fatalf("items-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_ListRejectsTruncatedItemKey pins that an !lst|itm| key +// missing the 8-byte trailing seq surfaces as a parse error rather +// than silently degenerating into a userKey-only state. +func TestRedisDB_ListRejectsTruncatedItemKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // Item key with no trailing seq bytes — userKey == "" too short. + err := db.HandleListItem([]byte(ListItemPrefix), []byte("v")) + if err == nil { + t.Fatalf("expected error for truncated item key") + } + if !errors.Is(err, ErrRedisInvalidListKey) { + t.Fatalf("expected ErrRedisInvalidListKey, got %v", err) + } +} diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index c7d234e5..56d87f49 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -9,6 +9,7 @@ import ( "math" "os" "path/filepath" + "sort" "strings" cockroachdberr "github.com/cockroachdb/errors" @@ -71,6 +72,7 @@ const ( redisKindString redisKindHLL redisKindHash + redisKindList ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -157,6 +159,14 @@ type RedisDB struct { // each meta record arriving without a key set must still emit // the empty-hash file (HLEN==0, observable to clients). hashes map[string]*redisHashState + + // lists buffers per-userKey list state. The Phase 0a list design + // emits one JSON file per list at Finalize ordered by ascending + // item sequence (LPUSH → most-negative-seq-first, RPUSH → larger + // seqs at the tail). Buffering matches the hash strategy: real- + // world Redis lists are bounded by maxWideColumnItems on the live + // side, and the JSON shape requires the full item slice up front. + lists map[string]*redisListState } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -174,6 +184,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { dirsCreated: make(map[string]struct{}), inlineTTLEmitted: make(map[string]struct{}), hashes: make(map[string]*redisHashState), + lists: make(map[string]*redisListState), } } @@ -252,11 +263,19 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindList: + // Same per-record TTL inlining as hashes: the list JSON + // carries expire_at_ms so a restorer can replay LPUSH + + // EXPIRE in one shot without consulting a sidecar. + st := r.listState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: // Track orphan TTL counts only — keys are unused before the - // wide-column encoders land, and buffering them allocates - // proportional to user-key size (up to 1 MiB per key) for - // no benefit. Codex P2 round 6. + // remaining wide-column encoders (set/zset/stream) land, and + // buffering them allocates proportional to user-key size + // (up to 1 MiB per key) for no benefit. Codex P2 round 6. r.orphanTTLCount++ return nil } @@ -271,6 +290,7 @@ func (r *RedisDB) Finalize() error { var firstErr error for _, step := range []func() error{ r.flushHashes, + r.flushLists, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -282,7 +302,7 @@ func (r *RedisDB) Finalize() error { if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "wide-column type encoders (list/set/zset/stream) have not landed yet") + "hint", "remaining wide-column encoders (set/zset/stream) have not landed yet") } return firstErr } @@ -294,6 +314,48 @@ func (r *RedisDB) dbDir() string { return filepath.Join(r.outRoot, "redis", redisDBSegment(r.dbIndex)) } +// flushWideColumnDir is the shared "create subdir + sort user keys + +// iterate" boilerplate every wide-column encoder needs (hash, list, +// and the future set/zset/stream). The encoder hands in its state +// map plus a per-key flush callback that owns the type-specific +// mismatch warning and JSON write. +// +// Iteration order is sorted by user key so identical snapshots +// produce identical dump output across runs regardless of Go's +// randomised map iteration. Empty maps short-circuit without +// creating the directory so dumps that never observed a given type +// carry no spurious subdirectory. +// +// Error policy is "fail-fast per type": the first per-key flush +// error returns immediately without writing the remaining user keys +// of that type. Finalize continues with other types so a hash error +// does not strand list output, but a partial dump within a single +// type is intentional — a half-written `lists/` directory is easier +// to detect as corrupt than a silently-truncated one that "continue" +// would produce, and the alternative ("collect errors, write what +// we can") trades a noisy hard failure for a quiet soft failure +// that survives `find -name '*.json'` scrutiny. +func flushWideColumnDir[T any](r *RedisDB, states map[string]T, subdir string, flushOne func(dir, userKey string, st T) error) error { + if len(states) == 0 { + return nil + } + dir := filepath.Join(r.dbDir(), subdir) + if err := r.ensureDir(dir); err != nil { + return err + } + userKeys := make([]string, 0, len(states)) + for k := range states { + userKeys = append(userKeys, k) + } + sort.Strings(userKeys) + for _, uk := range userKeys { + if err := flushOne(dir, uk, states[uk]); err != nil { + return err + } + } + return nil +} + func redisDBSegment(idx int) string { if idx < 0 { idx = 0