Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/backup/redis_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,11 @@ type hashFieldRecord struct {
Value json.RawMessage `json:"value"`
}

func marshalHashJSON(st *redisHashState) ([]byte, error) {
// nolint comment lives at the function head: dupl pairs this with
// marshalZSetJSON, which carries the rationale (parallel design-spec
// wrappers that can't collapse into a shared helper without breaking
// JSON field-order determinism). See redis_zset.go:marshalZSetJSON.
func marshalHashJSON(st *redisHashState) ([]byte, error) { //nolint:dupl // see comment above + redis_zset.go
// Sort by raw byte order for deterministic output across runs.
names := make([]string, 0, len(st.fields))
for name := range st.fields {
Expand Down
12 changes: 12 additions & 0 deletions internal/backup/redis_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil }
// setState lazily creates per-key state. Mirrors the hash/list
// kindByKey-registration pattern so HandleSetMeta, HandleSetMember,
// and the HandleTTL back-edge all agree on the kind.
//
// On first registration we drain any pendingTTL for the user key.
// `!redis|ttl|<k>` lex-sorts BEFORE `!st|...` (because `r` < `s`),
// so in real snapshot order the TTL arrives FIRST; HandleTTL parks
// it in pendingTTL, and this function inlines it into the set's
// `expire_at_ms`. Without this drain step, every TTL'd set would
// restore as permanent — a latent bug in PR #758 surfaced by codex
// on PR #790. Phase 0a tests added in the same PR pin the ordering.
func (r *RedisDB) setState(userKey []byte) *redisSetState {
uk := string(userKey)
if st, ok := r.sets[uk]; ok {
return st
}
st := &redisSetState{members: make(map[string]struct{})}
if expireAtMs, ok := r.claimPendingTTL(userKey); ok {
st.expireAtMs = expireAtMs
st.hasTTL = true
}
r.sets[uk] = st
r.kindByKey[uk] = redisKindSet
return st
Expand Down
219 changes: 205 additions & 14 deletions internal/backup/redis_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ var ErrRedisInvalidStringValue = cockroachdberr.New("backup: invalid !redis|str|
// expected 8-byte big-endian uint64 millisecond expiry.
var ErrRedisInvalidTTLValue = cockroachdberr.New("backup: invalid !redis|ttl| value")

// ErrPendingTTLBufferFull is returned by HandleTTL when an
// unknown-kind TTL arrives but the pendingTTL buffer is already at
// pendingTTLCap. The encoder fails closed here rather than silently
// counting the TTL as an orphan because in real Pebble snapshot
// order (`!redis|ttl|` lex-sorts before `!st|`/`!stream|`/`!zs|`),
// the dropped entry would likely belong to a valid wide-column key
// that arrives later — losing its expire_at_ms would produce a
// restored database with non-expiring data that the source
// snapshot's clients expected to expire. Codex P1 on PR #790
// round 5.
//
// Recovery: raise WithPendingTTLCap above the snapshot's count of
// unmatched-at-intake TTLs, or set the cap to 0 to explicitly opt
// into the lossy counter-only mode (callers that prefer not to
// see this error must accept that orphan-counted entries will be
// dropped without inlining into wide-column state).
var ErrPendingTTLBufferFull = cockroachdberr.New("backup: pendingTTL buffer at cap; raise WithPendingTTLCap or accept orphan-counter mode via WithPendingTTLCap(0)")

// redisKeyKind tracks which Redis-type prefix introduced a user key, so that
// when a later !redis|ttl|<K> record arrives we know whether to write its
// expiry into strings_ttl.jsonl, hll_ttl.jsonl, or buffer it for a wide-
Expand All @@ -85,6 +103,7 @@ const (
redisKindHash
redisKindList
redisKindSet
redisKindZSet
)

// RedisDB encodes one logical Redis database (`redis/db_<n>/`). All
Expand Down Expand Up @@ -185,8 +204,60 @@ type RedisDB struct {
// Finalize into sets/<key>.json with members sorted by raw byte
// order for deterministic dump output.
sets map[string]*redisSetState

// zsets buffers per-userKey sorted-set state. Score lives in the
// !zs|mem| value (8-byte IEEE 754 big-endian); member name is the
// trailing key bytes (binary-safe). Flushed at Finalize into
// zsets/<key>.json sorted by member-name bytes (not by score) so
// `diff -r` between dumps stays line-stable across score-only
// mutations.
zsets map[string]*redisZSetState

// pendingTTL buffers expiries whose user-key prefix sorts AFTER
// `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble
// snapshots emit records in encoded-key order
// (`store/snapshot_pebble.go::iter.First()/Next()`), and
// `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|`
// prefixes (`r` < `s`/`s`/`z`). Without buffering, HandleTTL
// would see kindByKey == redisKindUnknown and count the TTL
// as an orphan, dropping it before zsetState / setState /
// streamState had a chance to claim the user key — TTL'd
// sorted sets, sets, and streams would silently restore as
// permanent.
//
// Lifecycle: HandleTTL files the expiry here when kind is
// still unknown. Each wide-column state-init function
// (setState / zsetState / streamState etc.) drains the entry
// when it first registers the user key. Finalize fires the
// orphan-TTL warning for whatever remains (those keys never
// appeared as a typed record — likely a corrupted store).
pendingTTL map[string]uint64

// pendingTTLCap caps pendingTTL's in-memory size. Once the map
// reaches this many entries, subsequent unknown-kind TTLs fall
// back to incrementing orphanTTLCount directly without
// buffering the user-key bytes. Without this cap, an
// adversarial or corrupt snapshot whose `!redis|ttl|` records
// never find a typed-record claimer would grow pendingTTL
// unboundedly and could OOM the decoder. Codex P1 finding
// surfaced on PR #791 round 2; the same fix is applied here on
// PR #790 because the two PRs share the pendingTTL
// infrastructure and the bug is identical.
pendingTTLCap int

// pendingTTLOverflow counts entries skipped because the
// pendingTTL buffer was at cap. Surfaced in the Finalize
// warning so operators can distinguish "snapshot exceeded the
// buffer cap" from "TTL records remained unmatched".
pendingTTLOverflow int
}

// defaultPendingTTLCap caps pendingTTL at 1 MiB entries by default
// (~64 MiB worst-case memory at ~64 B per Go map entry). Override
// via WithPendingTTLCap for hosts that need a different memory /
// coverage trade-off.
const defaultPendingTTLCap = 1 << 20

// NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/.
// dbIndex selects <n>; today the producer always passes 0, but accepting
// the index as a parameter prevents a future multi-db dump from silently
Expand All @@ -204,9 +275,25 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB {
hashes: make(map[string]*redisHashState),
lists: make(map[string]*redisListState),
sets: make(map[string]*redisSetState),
zsets: make(map[string]*redisZSetState),
pendingTTL: make(map[string]uint64),
pendingTTLCap: defaultPendingTTLCap,
}
}

// WithPendingTTLCap overrides the default cap on the pendingTTL
// buffer. A value of 0 disables buffering — every unknown-kind TTL
// becomes an immediate orphan (matches the pre-pendingTTL behavior).
// Negative inputs are coerced to 0. Returns the receiver so it can
// be chained with other With* setters.
func (r *RedisDB) WithPendingTTLCap(capacity int) *RedisDB {
if capacity < 0 {
capacity = 0
}
r.pendingTTLCap = capacity
return r
}

// WithWarnSink wires a structured-warning sink. The sink is called with
// stable event names ("redis_orphan_ttl", etc.) and key=value pairs.
func (r *RedisDB) WithWarnSink(fn func(event string, fields ...any)) *RedisDB {
Expand Down Expand Up @@ -246,15 +333,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error {
return r.writeBlob("hll", userKey, value)
}

// HandleTTL processes one !redis|ttl|<userKey> record. Routing depends on
// what HandleString/HandleHLL recorded for the same userKey:
// HandleTTL processes one !redis|ttl|<userKey> record. Routing
// depends on what the encoder has previously recorded for the user
// key. There are two ordering regimes the snapshot stream presents:
//
// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order
// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record
// arrives FIRST, kindByKey is already set when HandleTTL fires,
// and we route directly to the per-type sidecar / inline field.
// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because
// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is
// still redisKindUnknown. We park the expiry in pendingTTL and
// let each wide-column state-init function (setState /
// zsetState / streamState) drain it when the user key finally
// surfaces as a typed record. Codex P1 finding on PR #790.
//
// Routing:
//
// - redisKindHLL -> hll_ttl.jsonl
// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL
// lives in !redis|ttl| rather than the inline magic-prefix header)
// - redisKindUnknown -> counted in orphanTTLCount; reported via the
// warn sink on Finalize because Phase 0a's wide-column encoders
// have not landed yet.
// - redisKindHLL -> hll_ttl.jsonl (case 1)
// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings
// whose TTL lives in !redis|ttl| rather than the inline header)
// - redisKindHash/List/Set/ZSet/Stream -> inlined into the
// per-key JSON (case 1 for hash/list, case 2 for set/zset/stream
// where the state-init already drained from pendingTTL before
// HandleTTL would even be called the second time)
// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly
// unmatched entries (key never registered as a typed record).
func (r *RedisDB) HandleTTL(userKey, value []byte) error {
expireAtMs, err := decodeRedisTTLValue(value)
if err != nil {
Expand Down Expand Up @@ -297,17 +401,87 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error {
st.expireAtMs = expireAtMs
st.hasTTL = true
return nil
case redisKindZSet:
// Same per-record TTL inlining: ZADD + EXPIRE replay in
// one shot from the per-zset JSON, no separate sidecar.
st := r.zsetState(userKey)
st.expireAtMs = expireAtMs
st.hasTTL = true
Comment on lines +404 to +409
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Buffer zset TTLs that arrive before zset rows

For expiring sorted sets in real snapshot order, this redisKindZSet branch is not reached: Pebble snapshots are emitted by iterator order (store/snapshot_pebble.go uses iter.First(); iter.Next()), and the TTL key prefix !redis|ttl| sorts before all zset prefixes !zs|.... That means HandleTTL sees redisKindUnknown, increments orphanTTLCount, and discards the expiry before zsetState is created, so every restored zset with a TTL becomes persistent. The zset TTL needs to be buffered/rerouted when the zset row is observed rather than relying on prior type observation.

Useful? React with 👍 / 👎.

return nil
case redisKindUnknown:
// Track orphan TTL counts only — keys are unused before the
// remaining wide-column encoders (set/zset/stream) land, and
// buffering them allocates proportional to user-key size
// (up to 1 MiB per key) for no benefit. Codex P2 round 6.
return r.parkUnknownTTL(userKey, expireAtMs)
}
return nil
}

// parkUnknownTTL buffers a redisKindUnknown TTL into pendingTTL, or
// fails closed when the buffer is at cap. Extracted from HandleTTL's
// switch so the parent stays under the cyclop budget.
//
// Three modes determined by pendingTTLCap:
//
// - cap > 0 and buffer NOT full: store the (userKey, expireAtMs)
// pair so a later wide-column state-init can drain it.
// - cap == 0: counter-only mode. The TTL becomes an immediate
// orphan. Operator-explicit opt-out for callers that prefer
// constant-space orphan counting over the buffered drain path.
// - cap > 0 and buffer FULL: fail closed with
// ErrPendingTTLBufferFull. Silently counting the entry as an
// orphan would permanently lose `expire_at_ms` for the wide-
// column key that arrives later — restored data becomes
// non-expiring without the operator noticing. Codex P1 on PR
// #790 round 5.
//
// Storage: userKey is COPIED (`string([]byte)` allocates) because
// the snapshot reader reuses key buffers across iterations — an
// alias slice would race with the next record.
func (r *RedisDB) parkUnknownTTL(userKey []byte, expireAtMs uint64) error {
if r.pendingTTLCap == 0 {
// Counter-only mode (operator explicitly disabled the buffer).
r.orphanTTLCount++
return nil
}
if len(r.pendingTTL) >= r.pendingTTLCap {
// Fail closed: refuse to silently drop a TTL that may
// belong to a wide-column key arriving later in the
// snapshot scan. The operator should raise the cap
// (WithPendingTTLCap) or investigate the snapshot for
// corruption. We still increment pendingTTLOverflow so
// the Finalize warning surfaces the count even if the
// caller swallows the error.
r.pendingTTLOverflow++
return cockroachdberr.Wrapf(ErrPendingTTLBufferFull,
"buffer at cap=%d (user_key_len=%d)", r.pendingTTLCap, len(userKey))
}
r.pendingTTL[string(userKey)] = expireAtMs
return nil
}

// claimPendingTTL drains any buffered TTL for userKey into the
// caller-provided state. Called by the wide-column state-init
// functions (setState / zsetState / streamState) when they first
// register a user key, so the parked expiry inlines into the same
// per-key JSON the rest of the record assembles.
//
// Returns (expireAtMs, true) when a buffered TTL existed. The
// caller should set state.expireAtMs / state.hasTTL on the
// returned value. The pending entry is removed so Finalize's
// orphan-count loop only sees truly-unmatched TTLs.
//
// Safe to call from hashState/listState too even though those
// types' typed records sort before `!redis|ttl|`; pendingTTL will
// always be empty for them. Keeping the call site uniform keeps
// the state-init contract simple.
func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) {
uk := string(userKey)
expireAtMs, ok := r.pendingTTL[uk]
if !ok {
return 0, false
}
delete(r.pendingTTL, uk)
return expireAtMs, true
}

// Finalize flushes all open sidecar writers and emits warnings for any
// pending TTL records whose user key was never claimed by the wide-column
// encoders. Call exactly once after every snapshot record has been
Expand All @@ -318,6 +492,7 @@ func (r *RedisDB) Finalize() error {
r.flushHashes,
r.flushLists,
r.flushSets,
r.flushZSets,
func() error { return closeJSONL(r.stringsTTL) },
func() error { return closeJSONL(r.hllTTL) },
r.closeKeymap,
Expand All @@ -326,10 +501,26 @@ func (r *RedisDB) Finalize() error {
firstErr = err
}
}
// At this point all type-prefixed records have been processed
// and every wide-column state-init drained its claimPendingTTL.
// Whatever remains in pendingTTL is truly unmatched — the
// user key never appeared as a typed record. Likely causes:
// store corruption, a snapshot mid-write where the typed
// record was dropped, or a `!redis|ttl|` entry written for a
// key whose type prefix we don't recognise (a future Redis
// type added on the live side without a backup-encoder update).
r.orphanTTLCount += len(r.pendingTTL)
if r.warn != nil && r.orphanTTLCount > 0 {
r.warn("redis_orphan_ttl",
fields := []any{
"count", r.orphanTTLCount,
"hint", "remaining wide-column encoders (zset/stream) have not landed yet")
"hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix",
}
if r.pendingTTLOverflow > 0 {
fields = append(fields,
"pending_ttl_buffer_overflow", r.pendingTTLOverflow,
"pending_ttl_buffer_cap", r.pendingTTLCap)
}
r.warn("redis_orphan_ttl", fields...)
}
return firstErr
}
Expand Down
Loading
Loading