From 0552136646495c0bbe6b9264b6307e828e7cf9fa Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 16 May 2026 06:03:45 +0900 Subject: [PATCH] backup: Redis set encoder (Phase 0a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the !st|meta| + !st|mem| → sets/[encoded-user-key].json encoder per the Phase 0 design doc (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). Wire format mirrors store/set_helpers.go: - !st|meta|[4-byte BE userKeyLen][userKey] → 8-byte BE Len - !st|mem|[4-byte BE userKeyLen][userKey][member] → empty value (member bytes live in the key, binary-safe) - !st|meta|d|... → skipped silently (same policy as hash and list deltas) Output JSON shape matches the design's other wide-column types: {"format_version": 1, "members": [..., {"base64":"..."}, ...], "expire_at_ms": null | uint64} Members are emitted as a sorted array (not a JSON object) for the same binary-safety reason the hash encoder uses an array for fields: distinct binary member names can collide under JSON's percent-encoded object-key path, and base64-envelope encoding for non-UTF-8 members keeps each record byte-faithful. Duplicate HandleSetMember calls collapse via a map[string]struct{} buffer so a snapshot iterator that re-emits the same !st|mem| key is harmless (Redis sets are mathematical sets, not multisets). TTL records on !redis|ttl| route into the set JSON's expire_at_ms field via the HandleTTL switch — same fold-into-record policy as hash + list encoders; no separate sidecar. The uint64 → int64 overflow guard for the declared-length field is applied symmetrically (matching the hash + list encoders) so a corrupted store with the high bit set fails closed at meta-record ingest rather than silently wrapping to negative declaredLen and firing spurious redis_set_length_mismatch warnings. Refactor: leverages the existing flushWideColumnDir generic helper introduced for the list encoder (PR #755) — no further changes to redis_string.go's shared infra.
Tests cover: sorted byte-order round-trip; empty set still emits a file (SCARD==0 observable); TTL inlined from scan index; length-mismatch warning shape; binary member base64 envelope; meta-delta key silently skipped; malformed meta length rejected; overflow guard rejected; members-without-meta still emits without false-positive mismatch warning; duplicate members collapse idempotently; parser-level delta-key rejection; math.MaxInt64 boundary accepted. Phase 0a remaining after this PR: redis_zset / redis_stream encoders, cmd/elastickv-snapshot-decode CLI, cmd/elastickv-snap-token helper, docs/operations/snapshot_restore.md runbook. --- internal/backup/redis_set.go | 236 +++++++++++++++++++ internal/backup/redis_set_test.go | 363 ++++++++++++++++++++++++++++++ internal/backup/redis_string.go | 18 +- 3 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 internal/backup/redis_set.go create mode 100644 internal/backup/redis_set_test.go diff --git a/internal/backup/redis_set.go b/internal/backup/redis_set.go new file mode 100644 index 00000000..d5027e27 --- /dev/null +++ b/internal/backup/redis_set.go @@ -0,0 +1,236 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + + cockroachdberr "github.com/cockroachdb/errors" +) + +// Redis set encoder. Translates raw !st|... snapshot records into the +// per-set `sets/[encoded-user-key].json` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). +// +// Wire format mirrors store/set_helpers.go: +// - !st|meta|[4-byte BE userKeyLen][userKey] → 8-byte BE Len +// - !st|mem|[4-byte BE userKeyLen][userKey][member] → empty value; the +// member bytes live in the key (binary-safe, per Redis's SADD +// contract). +// - !st|meta|d|... → 8-byte LenDelta; +// skipped silently. Same policy as hash deltas: !st|mem| keys +// are the source of truth at backup time, and delta arithmetic +// does not need to be replayed.
+const ( + RedisSetMetaPrefix = "!st|meta|" + RedisSetMemberPrefix = "!st|mem|" + RedisSetMetaDeltaPrefix = "!st|meta|d|" +) + +// ErrRedisInvalidSetMeta is returned when an !st|meta| value is not +// the expected 8-byte big-endian member count, or when the decoded count exceeds math.MaxInt64. +var ErrRedisInvalidSetMeta = cockroachdberr.New("backup: invalid !st|meta| value") + +// ErrRedisInvalidSetKey is returned when an !st| key cannot be parsed +// for its userKeyLen+userKey (or member) segments. +var ErrRedisInvalidSetKey = cockroachdberr.New("backup: malformed !st| key") + +// redisSetState buffers one userKey's set during a snapshot scan. +// Members are stored as a map keyed by their byte string so duplicate +// HandleSetMember calls collapse idempotently (a snapshot iterator +// that re-emits a member is harmless — Redis sets are mathematical +// sets, not multisets). +type redisSetState struct { + metaSeen bool // true once HandleSetMeta has accepted a base meta record for this key + declaredLen int64 // member count decoded from the meta value; compared with len(members) at flush + members map[string]struct{} // observed members, keyed by the raw member bytes + expireAtMs uint64 // written by HandleTTL; emitted only when hasTTL is true + hasTTL bool // true once HandleTTL has routed a TTL record to this set +} + +// HandleSetMeta processes one !st|meta|[4-byte BE userKeyLen][userKey] record. The +// value is the 8-byte BE member count. We park the declared length +// so flushSets can warn on a mismatch with the observed member +// count and register the user key so a later !redis|ttl| +// record routes back to this set state. +// +// !st|meta|d|... delta keys share the !st|meta| string prefix, so a +// snapshot dispatcher that routes by "starts with RedisSetMetaPrefix" +// lands delta records here too. The hash encoder solved the analogous +// problem (Codex P1 round 14 PR #725) by silently skipping the delta +// family; we mirror that policy because !st|mem| records are the +// source of truth for the restored set contents. Returns ErrRedisInvalidSetKey for an unparseable key and ErrRedisInvalidSetMeta for a value that is not 8 bytes or whose count exceeds math.MaxInt64.
+func (r *RedisDB) HandleSetMeta(key, value []byte) error { + if bytes.HasPrefix(key, []byte(RedisSetMetaDeltaPrefix)) { + return nil + } + userKey, ok := parseSetMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidSetKey, "meta key: %q", key) + } + if len(value) != redisUint64Bytes { + return cockroachdberr.Wrapf(ErrRedisInvalidSetMeta, + "length %d != %d", len(value), redisUint64Bytes) + } + // Bounds-check the uint64 declared count before narrowing to + // int64; without this a corrupted store with the high bit set + // would wrap to a negative declaredLen and fire spurious + // redis_set_length_mismatch warnings on every flush. Mirrors + // the hash + list encoders' symmetric guard. + rawLen := binary.BigEndian.Uint64(value) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidSetMeta, + "declared len %d overflows int64", rawLen) + } + st := r.setState(userKey) + st.declaredLen = int64(rawLen) //nolint:gosec // bounds-checked above + st.metaSeen = true // flushSets only compares declared vs observed length when this is set + return nil +} + +// HandleSetMember processes one !st|mem|[4-byte BE userKeyLen][userKey][member] +// record. The value is empty by design (Redis sets store the member +// bytes in the key, not the value), so HandleSetMember discards the +// value argument; the member bytes are extracted from the key's +// trailing segment. Returns ErrRedisInvalidSetKey when the key cannot be parsed. +func (r *RedisDB) HandleSetMember(key, _ []byte) error { + userKey, member, ok := parseSetMemberKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidSetKey, "member key: %q", key) + } + st := r.setState(userKey) + st.members[string(member)] = struct{}{} // map insert: a repeated member overwrites itself, so duplicates collapse + return nil +} + +// HandleSetMetaDelta accepts and discards one !st|meta|d|... record. +// See HandleSetMeta's docstring for the rationale; !st|mem| is the +// source of truth at backup time. +func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil } + +// setState lazily creates per-key state.
Mirrors the hash/list +// kindByKey-registration pattern so HandleSetMeta, HandleSetMember, +// and the HandleTTL back-edge all agree on the kind. +func (r *RedisDB) setState(userKey []byte) *redisSetState { + uk := string(userKey) + if st, ok := r.sets[uk]; ok { + return st + } + st := &redisSetState{members: make(map[string]struct{})} + r.sets[uk] = st + r.kindByKey[uk] = redisKindSet // registers the key's kind so a later TTL record is folded into this set + return st +} + +// parseSetMetaKey strips !st|meta| and the 4-byte BE userKeyLen +// prefix. Returns (userKey, true) on success. Delta keys +// (!st|meta|d|...) share the meta string prefix and would otherwise +// be parsed as base-meta with a garbage userKeyLen — refuse them +// at the boundary so a misrouted delta surfaces a parse error +// rather than silent state corruption. Mirrors parseHashMetaKey's +// delta guard. +func parseSetMetaKey(key []byte) ([]byte, bool) { + if bytes.HasPrefix(key, []byte(RedisSetMetaDeltaPrefix)) { + return nil, false + } + rest := bytes.TrimPrefix(key, []byte(RedisSetMetaPrefix)) + if len(rest) == len(key) { // TrimPrefix returned the key unchanged, i.e. the meta prefix is absent + return nil, false + } + return parseUserKeyLenPrefix(rest) +} + +// parseSetMemberKey strips !st|mem| and the 4-byte BE userKeyLen +// prefix, then returns (userKey, member, true). The member bytes +// are everything after the userKey segment — binary-safe per +// Redis's SADD contract. Returns (nil, nil, false) when the prefix is missing or the userKey segment does not parse. +func parseSetMemberKey(key []byte) ([]byte, []byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisSetMemberPrefix)) + if len(rest) == len(key) { // TrimPrefix returned the key unchanged, i.e. the member prefix is absent + return nil, nil, false + } + userKey, ok := parseUserKeyLenPrefix(rest) + if !ok { + return nil, nil, false + } + member := rest[hashUserKeyLenSize+len(userKey):] // skip the length prefix and the user key; the remainder (possibly empty) is the member + return userKey, member, true +} + +// flushSets writes one JSON file per accumulated set to +// sets/[encoded-user-key].json. Empty sets (Len=0, no members) still emit a +// file when meta was seen, mirroring the hash/list encoders: their +// existence is observable to clients (TYPE returns "set", SCARD +// returns 0).
Mismatched declared-vs-observed length surfaces an +// `redis_set_length_mismatch` warning; the check runs only when a warn sink is installed and a meta record was seen, so member-only sets never trigger it. +func (r *RedisDB) flushSets() error { + return flushWideColumnDir(r, r.sets, "sets", func(dir, uk string, st *redisSetState) error { + if r.warn != nil && st.metaSeen && int64(len(st.members)) != st.declaredLen { + r.warn("redis_set_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.declaredLen, + "observed_members", len(st.members), + "hint", "meta record's Len does not match the count of !st|mem| keys for this user key") + } + return r.writeSetJSON(dir, []byte(uk), st) // the file is written even after a mismatch warning + }) +} + +func (r *RedisDB) writeSetJSON(dir string, userKey []byte, st *redisSetState) error { // renders st with marshalSetJSON and writes it to dir/EncodeSegment(userKey).json via writeFileAtomic + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".json") + body, err := marshalSetJSON(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// marshalSetJSON renders one set state as the design's +// `{format_version, members, expire_at_ms}` JSON shape. Members are +// emitted as an array (not a JSON object) and sorted by raw byte +// order so identical snapshots produce identical dump output across +// runs — same rationale as the hash encoder's fields array +// (binary-safe member names that would collide under JSON object +// keying when percent-encoded). Each value goes through +// marshalRedisBinaryValue so non-UTF-8 members round-trip via the +// `{"base64":"..."}` envelope. expire_at_ms is null unless the state has a TTL (hasTTL).
+func marshalSetJSON(st *redisSetState) ([]byte, error) { + members := make([]string, 0, len(st.members)) + for m := range st.members { + members = append(members, m) + } + sort.Strings(members) + out := make([]json.RawMessage, 0, len(members)) + for _, m := range members { + v, err := marshalRedisBinaryValue([]byte(m)) + if err != nil { + return nil, err + } + out = append(out, v) + } + type record struct { + FormatVersion uint32 `json:"format_version"` + Members []json.RawMessage `json:"members"` + ExpireAtMs *uint64 `json:"expire_at_ms"` + } + rec := record{FormatVersion: 1, Members: out} + if st.hasTTL { + ms := st.expireAtMs + rec.ExpireAtMs = &ms + } + body, err := json.MarshalIndent(rec, "", " ") + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + return body, nil +} diff --git a/internal/backup/redis_set_test.go b/internal/backup/redis_set_test.go new file mode 100644 index 00000000..d0fd0c4f --- /dev/null +++ b/internal/backup/redis_set_test.go @@ -0,0 +1,363 @@ +package backup + +import ( + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + "github.com/cockroachdb/errors" +) + +// encodeSetMetaValue builds the 8-byte BE member-count value used by +// the live store/set_helpers.go (mirror of store.MarshalSetMeta). +func encodeSetMetaValue(memberCount int64) []byte { + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, uint64(memberCount)) //nolint:gosec + return v +} + +// setMetaKey is the test-side mirror of store.SetMetaKey: +// !st|meta|[4-byte BE userKeyLen][userKey]. +func setMetaKey(userKey string) []byte { + out := []byte(RedisSetMetaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + return append(out, userKey...) +} + +// setMemberKey mirrors store.SetMemberKey: +// !st|mem|[4-byte BE userKeyLen][userKey][member]. Member is binary-safe.
+func setMemberKey(userKey string, member []byte) []byte { + out := []byte(RedisSetMemberPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + return append(out, member...) +} + +// setMetaDeltaKey mirrors store.SetMetaDeltaKey: +// !st|meta|d|[4-byte BE userKeyLen][userKey][8-byte BE commitTS][4-byte BE seqInTxn]. +func setMetaDeltaKey(userKey string, commitTS uint64, seqInTxn uint32) []byte { + out := []byte(RedisSetMetaDeltaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var ts [8]byte + binary.BigEndian.PutUint64(ts[:], commitTS) + out = append(out, ts[:]...) + var seq [4]byte + binary.BigEndian.PutUint32(seq[:], seqInTxn) + return append(out, seq[:]...) +} + +func readSetJSON(t *testing.T, path string) map[string]any { // reads the file at path and decodes it as a generic JSON object; fails the test on any error + t.Helper() + b, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + t.Fatalf("unmarshal: %v", err) + } + return m +} + +func setMembersArray(t *testing.T, m map[string]any) []any { // returns m["members"] as a JSON array; fails the test when it is missing or not an array + t.Helper() + v, ok := m["members"] + if !ok { + t.Fatalf("members missing in %+v", m) + } + raw, ok := v.([]any) + if !ok { + t.Fatalf("members = %T(%v), want []any", v, v) + } + return raw +} + +func setFloat(t *testing.T, m map[string]any, key string) float64 { // returns m[key] as float64, the type encoding/json uses for JSON numbers decoded into any + t.Helper() + v, ok := m[key] + if !ok { + t.Fatalf("field %q missing in %+v", key, m) + } + f, ok := v.(float64) + if !ok { + t.Fatalf("field %q = %T(%v), want float64", key, v, v) + } + return f +} + +// TestRedisDB_SetRoundTripBasic confirms a multi-member set +// round-trips through the encoder in sorted byte order and emits +// the correct JSON shape.
+func TestRedisDB_SetRoundTripBasic(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleSetMeta(setMetaKey("colors"), encodeSetMetaValue(3)); err != nil { + t.Fatalf("HandleSetMeta: %v", err) + } + // Submit out of byte order to exercise the sort at flush time. + for _, m := range []string{"red", "green", "blue"} { + if err := db.HandleSetMember(setMemberKey("colors", []byte(m)), nil); err != nil { + t.Fatalf("HandleSetMember(%s): %v", m, err) + } + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "colors.json")) + if setFloat(t, got, "format_version") != 1 { + t.Fatalf("format_version = %v", got["format_version"]) + } + if got["expire_at_ms"] != nil { // JSON null decodes to a nil interface value + t.Fatalf("expire_at_ms must be nil without TTL, got %v", got["expire_at_ms"]) + } + members := setMembersArray(t, got) + // Sorted byte order: blue < green < red. These members are expected as plain JSON strings, not base64 envelopes. + want := []any{"blue", "green", "red"} + if len(members) != len(want) { + t.Fatalf("len(members) = %d, want %d (got %v)", len(members), len(want), members) + } + for i := range want { + if members[i] != want[i] { + t.Fatalf("members[%d] = %v, want %v (full: %v)", i, members[i], want[i], members) + } + } +} + +// TestRedisDB_SetEmptySetStillEmitsFile mirrors the hash/list +// emit-empty rule: SCARD==0 is observable to clients, so the dump +// must preserve existence. Only a meta record with Len=0 is ingested; no member records.
+func TestRedisDB_SetEmptySetStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleSetMeta(setMetaKey("empty"), encodeSetMetaValue(0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "empty.json")) + if members := setMembersArray(t, got); len(members) != 0 { + t.Fatalf("empty set should emit empty members array, got %v", members) + } +} + +// TestRedisDB_SetTTLInlinedFromScanIndex pins that !redis|ttl| +// records for a set user key fold into the set's JSON expire_at_ms, +// not into a separate sidecar file (sidecars are the strings/HLL pattern). +func TestRedisDB_SetTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { // sent after the meta/member records, so "k" is already registered as a set + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json")) + if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "sets_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected set TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_SetLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash/list encoders. The meta record declares 5 members while only one is ingested.
+func TestRedisDB_SetLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleSetMeta(setMetaKey("s"), encodeSetMetaValue(5)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("s", []byte("only")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_set_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_SetBinaryMemberUsesBase64Envelope confirms non-UTF-8 +// member bytes are emitted via the typed `{"base64":"..."}` envelope with a non-empty string payload. +func TestRedisDB_SetBinaryMemberUsesBase64Envelope(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleSetMeta(setMetaKey("blob"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("blob", []byte{0x80, 0xff, 0x01}), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "blob.json")) + members := setMembersArray(t, got) + if len(members) != 1 { + t.Fatalf("len(members) = %d, want 1", len(members)) + } + envelope, ok := members[0].(map[string]any) + if !ok { + t.Fatalf("expected base64 envelope, got %T(%v)", members[0], members[0]) + } + if payload, isString := envelope["base64"].(string); !isString || payload == "" { // type-assert first: a bare == "" comparison is false for a missing key (nil) or a non-string value, which would let the test pass + t.Fatalf("base64 envelope missing payload: %v", envelope) + } +} + +// TestRedisDB_SetHandleSetMetaSkipsDeltaKey pins that the +// !st|meta|d|... family is silently skipped by HandleSetMeta. +// Mirrors the hash/list delta-key guards.
+func TestRedisDB_SetHandleSetMetaSkipsDeltaKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // Delta value is 8-byte BE LenDelta; encoder must skip without + // consulting the value at all. + deltaValue := make([]byte, 8) + if err := db.HandleSetMeta(setMetaDeltaKey("k", 7, 0), deltaValue); err != nil { + t.Fatalf("delta key must be silently skipped, got %v", err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(db.outRoot, "redis", "db_0", "sets")); !os.IsNotExist(err) { + t.Fatalf("delta-only run should not create sets/, stat err=%v", err) + } +} + +// TestRedisDB_SetRejectsMalformedMetaValueLength pins that an +// !st|meta| value of the wrong length (here 1 byte instead of 8) surfaces as ErrRedisInvalidSetMeta. +func TestRedisDB_SetRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleSetMeta(setMetaKey("k"), []byte{0x00}) + if !errors.Is(err, ErrRedisInvalidSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidSetMeta", err) + } +} + +// TestRedisDB_SetRejectsOverflowingMetaValue pins the high-bit +// overflow guard — same shape as hash + list encoders. +func TestRedisDB_SetRejectsOverflowingMetaValue(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + overflow := make([]byte, 8) + binary.BigEndian.PutUint64(overflow, 1<<63) // 1<<63 == math.MaxInt64+1, the smallest count the guard must reject + err := db.HandleSetMeta(setMetaKey("k"), overflow) + if !errors.Is(err, ErrRedisInvalidSetMeta) { + t.Fatalf("err=%v want ErrRedisInvalidSetMeta", err) + } +} + +// TestRedisDB_SetMembersWithoutMetaStillEmitsFile pins that the +// items-without-meta contract from PR #755 round 2 holds for sets +// too: members may arrive before (or without) meta, and the encoder +// must still emit the JSON without firing the length-mismatch +// warning.
+func TestRedisDB_SetMembersWithoutMetaStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleSetMember(setMemberKey("s", []byte("a")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "s.json")) + members := setMembersArray(t, got) + if len(members) != 1 || members[0] != "a" { + t.Fatalf("members = %v, want [a]", members) + } + for _, e := range events { + if e == "redis_set_length_mismatch" { + t.Fatalf("members-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_SetDuplicateMembersCollapse pins the idempotency +// contract: a snapshot iterator that emits the same !st|mem| key +// twice (rare but legal for some scan replays) must NOT produce a +// duplicate entry in the dump. Redis sets are mathematical sets, so +// {a, a, b} dumps as {a, b} — two entries, matching the declared length of 2.
+func TestRedisDB_SetDuplicateMembersCollapse(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleSetMeta(setMetaKey("s"), encodeSetMetaValue(2)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("s", []byte("a")), nil); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("s", []byte("a")), nil); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("s", []byte("b")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "s.json")) + members := setMembersArray(t, got) + if len(members) != 2 { + t.Fatalf("len(members) = %d, want 2 (duplicates must collapse)", len(members)) + } +} + +// TestRedisDB_SetParseSetMetaKeyRejectsDelta is the parser-level +// guard companion to the dispatcher skip — pins that a future +// refactor bypassing HandleSetMeta's prefix check still surfaces a +// parse failure rather than silent state corruption. +func TestRedisDB_SetParseSetMetaKeyRejectsDelta(t *testing.T) { + t.Parallel() + if _, ok := parseSetMetaKey(setMetaDeltaKey("k", 1, 0)); ok { + t.Fatalf("parseSetMetaKey must reject delta-prefixed keys") + } +} + +// TestRedisDB_SetMembersBytesPreservedThroughInt64Path is a sanity +// check that the overflow guard works at the math.MaxInt64 +// boundary — declaredLen=math.MaxInt64 must be accepted; only larger values are +// rejected. Despite the name, the test ingests only a meta record — no member bytes are involved.
+func TestRedisDB_SetMembersBytesPreservedThroughInt64Path(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + boundary := make([]byte, 8) + binary.BigEndian.PutUint64(boundary, math.MaxInt64) // exactly the int64 max — must NOT reject + if err := db.HandleSetMeta(setMetaKey("k"), boundary); err != nil { + t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) + } +} diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 56d87f49..58de12a1 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -73,6 +73,7 @@ const ( redisKindHLL redisKindHash redisKindList + redisKindSet ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -167,6 +168,12 @@ type RedisDB struct { // world Redis lists are bounded by maxWideColumnItems on the live // side, and the JSON shape requires the full item slice up front. lists map[string]*redisListState + + // sets buffers per-userKey set state. Members live in the !st|mem| + // key bytes (binary-safe), the value is empty. Flushed at + // Finalize into sets/[encoded-user-key].json with members sorted by raw byte + // order for deterministic dump output. + sets map[string]*redisSetState } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -185,6 +192,7 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { inlineTTLEmitted: make(map[string]struct{}), hashes: make(map[string]*redisHashState), lists: make(map[string]*redisListState), + sets: make(map[string]*redisSetState), } } @@ -271,6 +279,13 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindSet: + // Same per-record TTL inlining: SADD + EXPIRE replay in + // one shot from the per-set JSON, no separate sidecar.
+ st := r.setState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: // Track orphan TTL counts only — keys are unused before the // remaining wide-column encoders (set/zset/stream) land, and @@ -291,6 +306,7 @@ func (r *RedisDB) Finalize() error { for _, step := range []func() error{ r.flushHashes, r.flushLists, + r.flushSets, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -302,7 +318,7 @@ func (r *RedisDB) Finalize() error { if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoders (set/zset/stream) have not landed yet") + "hint", "remaining wide-column encoders (zset/stream) have not landed yet") } return firstErr }