backup: Redis stream encoder (Phase 0a)#791
Conversation
Decodes !stream|meta|/!stream|entry| snapshot records into per-stream streams/<key>.jsonl files per the Phase 0 design (lines 336-344). Mirrors the hash/list/set/zset encoders (#725/#755/#758/#790). Wire format: - !stream|meta|<userKeyLen(4)><userKey> -> 24-byte BE Length(8) || LastMs(8) || LastSeq(8) - !stream|entry|<userKeyLen(4)><userKey><ms(8)><seq(8)> -> magic-prefixed pb.RedisStreamEntry protobuf (0x00 'R' 'X' 'E' 0x01 || pb.Marshal(...)) Output is JSONL -- one record per line, sorted by (ms, seq) -- plus a trailing _meta terminator that captures length, last_ms, last_seq, and expire_at_ms (the design's pattern at line 338-339). The interleaved (name, value) field list from the protobuf decodes into the "fields" JSON object matching the design example. Per-line JSONL was chosen over per-entry files because real streams routinely hold tens of thousands of entries (one file per entry would dominate tar + find runtime by inode pressure). Fail-closed behavior: - Magic-prefix missing on an entry value -> ErrRedisInvalidStreamEntry. The live store always writes the prefix; its absence indicates corruption or a stale legacy value. Decoding raw protobuf without the prefix would either silently misparse or panic inside protobuf. - Odd field count -> ErrRedisInvalidStreamEntry. Live XADD enforces even arity at the wire level; an odd count at backup time would silently drop the dangling field if accepted. - Meta value of wrong length / overflow -> ErrRedisInvalidStreamMeta (same shape as the hash/list/set/zset overflow guards). - Entry key without the trailing 16-byte StreamID -> ErrRedisInvalidStreamKey. TTL routing: !redis|ttl|<userKey> for a registered stream key folds into the JSONL _meta terminator's expire_at_ms field, matching the design's line 341-344 explicit requirement. Without this routing, a TTL'd stream would silently restore as permanent. Self-review: 1. Data loss -- magic-prefix, even-arity, and overflow guards all fail closed. Entry value is cloned (cloneStringSlice on the protobuf output) so a follow-up edit to the snapshot buffer cannot mutate emitted state. 2. Concurrency -- RedisDB is sequential per scope; no shared state. 3. Performance -- per-stream state in a slice (not a map) so XADD- order accumulation costs O(n); sort at flush is O(n log n) on (ms, seq). JSONL output is streamed via bytes.Buffer (single allocation grow). Matches list/zset cost shape. 4. Consistency -- entries sorted by (ms, seq) tuple, NOT by formatted string (sorting "10-0" vs "2-0" lexicographically would emit them out of XADD order); _meta last_ms/last_seq preserved verbatim so a restorer keeps XADD '*' monotonicity. JSONL terminator marker _meta:true is on a dedicated line so streaming consumers can detect end-of-stream without reading the whole file. 5. Coverage -- 14 table-driven tests under redis_stream_test.go: - round-trip basic (out-of-order entries, sorted at flush) - fields decoded to JSON object (design example match) - empty stream still emits file - TTL inlining into _meta.expire_at_ms - length-mismatch warning - malformed meta length / overflow / MaxInt64 boundary - missing magic prefix rejection - odd field count rejection - malformed entry key (wrong StreamID suffix length) - entries-without-meta still emit file - ID wire format ("<ms>-<seq>" decimal) - multi-stream user-key sort order Caller audit for semantics-changing edit (new case redisKindStream branch in HandleTTL, redis_string.go:309): purely additive -- the new branch fires only when streamState() has previously registered the key. No prior call site changes behavior. Verified via 'grep -n redisKindStream internal/backup/': three refs, all new in this PR.
|
@claude review |
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (5)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Claude finished @bootjp's task in 3m 45s —— View job Code Review — PR #791: backup: Redis stream encoder (Phase 0a)Tasks
The encoder follows the established hash/list/set pattern cleanly and the fail-closed guards, overflow checks, and TTL routing are all correct. Two data-loss issues need fixing before merge; everything else is minor. Critical — Data loss1. Duplicate field names silently dropped (
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b2d0b82c74
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| case redisKindStream: | ||
| // Same per-record TTL inlining: XADD + EXPIRE replay in | ||
| // one shot from the per-stream JSONL `_meta` terminator, | ||
| // no separate sidecar. | ||
| st := r.streamState(userKey) | ||
| st.expireAtMs = expireAtMs | ||
| st.hasTTL = true |
There was a problem hiding this comment.
Buffer stream TTLs that arrive before stream keys
When processing the sorted snapshot stream, !redis|ttl|<key> sorts before !stream|meta|.../!stream|entry|..., so kindByKey is still redisKindUnknown when HandleTTL sees a TTL for a stream. This new stream case only works if the caller happens to dispatch stream records before the TTL record; in the normal sorted pipeline the TTL is counted as an orphan and the later _meta line is emitted with expire_at_ms:null, so TTL'd streams restore as permanent.
Useful? React with 👍 / 👎.
| fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth) | ||
| for i := 0; i+1 < len(e.fields); i += xaddPairWidth { | ||
| fieldsMap[e.fields[i]] = e.fields[i+1] |
There was a problem hiding this comment.
Preserve duplicate stream fields instead of map-collapsing
For streams created with duplicate field names (e.g. XADD s * f v1 f v2), the adapter stores and returns the interleaved entry.Fields slice verbatim, but this projection overwrites earlier values in a map[string]string. Such entries are valid because parseXAddFields only enforces even arity, so the backup silently drops duplicate field/value pairs and a restore would not reproduce the stream entry.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request implements a Redis stream encoder that translates raw snapshot records into per-stream JSONL files, including support for metadata, entries, and TTL inlining. The reviewer provided several performance-oriented suggestions aimed at reducing memory allocations and GC pressure during scanning. These include pre-allocating prefix byte slices, optimizing map lookups to avoid redundant string conversions, and utilizing json.Encoder with reused maps to process large streams more efficiently.
| redisStreamProtoPrefixLen = 5 | ||
| ) | ||
|
|
||
| var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01} |
There was a problem hiding this comment.
To avoid repeated []byte(string) allocations during snapshot scanning, these prefixes should be pre-allocated as package-level byte slices, following the pattern established in store/stream_helpers.go.
| var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01} | |
| var ( | |
| redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01} | |
| redisStreamMetaPrefixBytes = []byte(RedisStreamMetaPrefix) | |
| redisStreamEntryPrefixBytes = []byte(RedisStreamEntryPrefix) | |
| ) |
| uk := string(userKey) | ||
| if st, ok := r.streams[uk]; ok { | ||
| return st | ||
| } |
There was a problem hiding this comment.
This lookup allocates a new string for every stream entry processed. Since Redis streams can contain tens of thousands of entries, this creates significant GC pressure. Using the m[string(b)] compiler optimization for the initial check avoids allocation for all entries after the first one for each stream.
| uk := string(userKey) | |
| if st, ok := r.streams[uk]; ok { | |
| return st | |
| } | |
| if st, ok := r.streams[string(userKey)]; ok { | |
| return st | |
| } | |
| uk := string(userKey) |
| // family — streams update meta in-place rather than via per-XADD | ||
| // deltas — so we do not need a delta-skip guard here. | ||
| func parseStreamMetaKey(key []byte) ([]byte, bool) { | ||
| rest := bytes.TrimPrefix(key, []byte(RedisStreamMetaPrefix)) |
| // userKeyLen prefix, then peels off the trailing 16-byte StreamID | ||
| // (ms || seq). Returns (userKey, ms, seq, true) on success. | ||
| func parseStreamEntryKey(key []byte) ([]byte, uint64, uint64, bool) { | ||
| rest := bytes.TrimPrefix(key, []byte(RedisStreamEntryPrefix)) |
| var buf bytes.Buffer | ||
| const xaddPairWidth = 2 // (name, value) — XADD enforces even arity | ||
| for _, e := range st.entries { | ||
| fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth) | ||
| for i := 0; i+1 < len(e.fields); i += xaddPairWidth { | ||
| fieldsMap[e.fields[i]] = e.fields[i+1] | ||
| } | ||
| rec := streamEntryJSON{ | ||
| ID: formatStreamID(e.ms, e.seq), | ||
| Fields: fieldsMap, | ||
| } | ||
| line, err := json.Marshal(rec) | ||
| if err != nil { | ||
| return nil, cockroachdberr.WithStack(err) | ||
| } | ||
| buf.Write(line) | ||
| buf.WriteByte('\n') | ||
| } | ||
| meta := streamMetaJSON{ | ||
| Meta: true, | ||
| Length: st.length, | ||
| LastMs: st.lastMs, | ||
| LastSeq: st.lastSeq, | ||
| } | ||
| if st.hasTTL { | ||
| ms := st.expireAtMs | ||
| meta.ExpireAtMs = &ms | ||
| } | ||
| line, err := json.Marshal(meta) | ||
| if err != nil { | ||
| return nil, cockroachdberr.WithStack(err) | ||
| } | ||
| buf.Write(line) | ||
| buf.WriteByte('\n') |
There was a problem hiding this comment.
For large streams, using json.Encoder is more efficient than json.Marshal as it writes directly to the buffer and avoids intermediate byte slice allocations. Additionally, reusing the fieldsMap across entries significantly reduces allocations in the hot loop. Ensure that the number of entries in the stream is bounded to prevent potential OOM issues.
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.SetEscapeHTML(false)
const xaddPairWidth = 2 // (name, value) — XADD enforces even arity
fieldsMap := make(map[string]string)
for _, e := range st.entries {
for k := range fieldsMap {
delete(fieldsMap, k)
}
for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
fieldsMap[e.fields[i]] = e.fields[i+1]
}
rec := streamEntryJSON{
ID: formatStreamID(e.ms, e.seq),
Fields: fieldsMap,
}
if err := enc.Encode(rec); err != nil {
return nil, cockroachdberr.WithStack(err)
}
}
meta := streamMetaJSON{
Meta: true,
Length: st.length,
LastMs: st.lastMs,
LastSeq: st.lastSeq,
}
if st.hasTTL {
ms := st.expireAtMs
meta.ExpireAtMs = &ms
}
if err := enc.Encode(meta); err != nil {
return nil, cockroachdberr.WithStack(err)
}References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.
…d records Pebble snapshots are emitted in encoded-key order (store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|` lex-sorts BEFORE every wide-column prefix where the type letter is >= 's' (`!st|` set, `!stream|`, `!zs|` zset — because `r` < `s`/`z`). The original HandleTTL routed any unknown-kind expiry straight into orphanTTLCount, so for sets, streams, and zsets the TTL was DROPPED before zsetState/setState/streamState could claim it. Restored sets/zsets/streams with TTL became permanent. Codex P1 finding on PR #790. The same bug exists in the already- merged set encoder (PR #758); this commit fixes both retroactively. Stream encoder (PR #791) inherits the fix once rebased. Fix: HandleTTL parks unknown-kind expiries in a new pendingTTL map. Each wide-column state-init that may face the bad ordering (setState, zsetState — and streamState once PR #791 lands) drains the entry on first user-key registration via claimPendingTTL(). Finalize counts whatever remains in pendingTTL as truly-unmatched orphans (a TTL whose user key never appeared in any typed record — indicates store corruption or an unknown type prefix). Semantic-change caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch: previously incremented orphanTTLCount immediately; now buffers and lets Finalize count. All callers: only the per-record dispatcher in cmd/elastickv-snapshot-decode (not yet built — Phase 0a follow- up). No external caller mutates orphanTTLCount today. - TestRedisDB_OrphanTTLCountedNotBuffered: updated to assert intake-time orphanTTLCount==0 + pendingTTL grows, then post- Finalize orphanTTLCount==n. - New caller claimPendingTTL: called only by zsetState and setState in this PR. hashState/listState don't call it because their type prefixes (`!hs|`/`!lst|`) lex-sort BEFORE `!redis|ttl|` so the typed record arrives first; pendingTTL is always empty for them. Verified via `grep -n 'claimPendingTTL' internal/backup/`. New tests: - TestRedisDB_ZSetTTLArrivesBeforeRows — pins the fix for zsets. - TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for PR #758's set encoder. - TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys — pins the new Finalize-time orphan semantics. Self-review: 1. Data loss — the original code DROPPED real TTL'd sets/zsets/streams on every backup. This fix recovers them. No new data-loss surface introduced. 2. Concurrency — pendingTTL is added to RedisDB which is already sequential-per-scope; no new locking required. 3. Performance — pendingTTL holds (string-userKey, uint64-expireAt) pairs. The string allocation cost matches kindByKey's, which we already pay for every typed record. The original P2 round 6 concern (don't buffer arbitrarily-large value payloads) is preserved: we still only buffer 8-byte expiry, not value bytes. 4. Consistency — drain happens at FIRST state registration so a later HandleTTL re-arrival (which would route through the redisKindSet/ZSet case in HandleTTL) is a no-op (st.expireAtMs gets overwritten with the same value). 5. Coverage — 3 new tests + 1 updated test. All 64 redis tests pass.
…ield preservation Two P1 findings from chatgpt-codex on PR #791: P1a: Buffer stream TTLs that arrive before stream rows Pebble snapshots emit records in encoded-key order (store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < `s`. In real snapshot order the TTL arrives FIRST, kindByKey is still redisKindUnknown when HandleTTL fires, and the original code counted the TTL as an orphan and dropped it — every TTL'd stream restored as permanent. Same root cause as the set encoder's latent bug in PR #758. This commit adds a pendingTTL infrastructure (matching the parallel fix on PR #790) so the expiry parks during the redisKindUnknown window and drains when streamState first registers the user key. The set encoder gets the same retroactive drain. P1b: Preserve duplicate stream fields instead of map-collapsing XADD permits duplicate field names within one entry (e.g. `XADD s * f v1 f v2`). The protobuf entry stores the interleaved slice verbatim, but my marshalStreamJSONL collapsed pairs into `map[string]string`, silently dropping every duplicate. A restore of such an entry would lose the second (and later) pair. Fix: emit `fields` as a JSON ARRAY of `{name, value}` records (streamFieldJSON). Order is the protobuf's interleaved order so a restore can replay the original XADD argv exactly. The design example at docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338 showed object form. That representation was unsafe for streams (though fine for hashes where the wire-level encoder normalises field names earlier). The format is owned by Phase 0 — adjusted in this PR before the format ships any consumers. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch: same semantic change as PR #790's r1 — previously incremented orphanTTLCount on intake; now buffers in pendingTTL and lets Finalize count at end. Same audit conclusion: no external callers of orphanTTLCount; TestRedisDB_OrphanTTLCountedNotBuffered updated to assert the new intake/Finalize split. - streamEntryJSON.Fields type change `map → slice`: only marshalled by encoding/json; the only reader is the test suite, which is updated in this commit. No on-disk format compatibility concerns because Phase 0 has not shipped a consumer yet. - New caller claimPendingTTL: called by setState (retroactive) and streamState (new) in this PR. hashState/listState don't call it because their type prefixes lex-sort BEFORE `!redis|ttl|`. Verified via `grep -n 'claimPendingTTL' internal/backup/`. New tests: - TestRedisDB_StreamDuplicateFieldsPreserved — pins P1b fix. - TestRedisDB_StreamTTLArrivesBeforeRows — pins P1a fix for streams. - TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for PR #758's set encoder (same root cause as the stream bug). - TestRedisDB_StreamFieldsDecodedToArray (renamed from ToObject) — updated to match the array shape. Self-review: 1. Data loss — the original code DROPPED real TTL'd streams on every backup AND dropped duplicate-field entries' later pairs. This fix recovers both. No new data-loss surface introduced. 2. Concurrency — pendingTTL added to RedisDB which is already sequential-per-scope; no new locking required. 3. Performance — pendingTTL holds (string-userKey, uint64-expireAt) pairs; same allocation shape as kindByKey. Fields slice replaces a map of the same logical size — slightly cheaper actually (no hash overhead). 4. Consistency — drain happens at FIRST state registration. The array form preserves insertion order from the protobuf so the restored XADD argv matches. 5. Coverage — 4 new tests + 2 updated. All 78 redis tests pass.
|
@claude review |
|
Claude finished @bootjp's task in 2m 35s —— View job Code Review — PR #791: backup: Redis stream encoder (Phase 0a)Tasks
The encoder is well-structured and the two P1 issues from the previous review round have both been fixed and tested. One data-loss issue remains open. Everything else is minor/informational. Previously Fixed — ConfirmedDuplicate field names (prior P1): TTL arrives before stream rows (Codex P1): Critical — Data lossNon-UTF-8 field names and values silently corrupted (
|
…lob layout
Codex flagged that the wide-column zset encoder skips the legacy
consolidated single-key blob layout the live store still writes.
A zset stored only as `!redis|zset|<userKey>` (with the magic-
prefixed pb.RedisZSetValue body) is silently dropped from backup
output and any inline TTL becomes an orphan — user-visible
sorted-set data loss.
Live-side references (adapter, not changed by this commit):
- adapter/redis_compat_types.go:82 — redisZSetPrefix
- adapter/redis_compat_commands.go:3495-3508 — writes the blob for
non-empty persisted zset updates
- adapter/redis_compat_helpers.go:610-631 — reads it as the
fallback when no wide-column members exist
Fix: new public RedisDB.HandleZSetLegacyBlob method that decodes
the magic-prefixed pb.RedisZSetValue and registers the same per-
member state HandleZSetMember would. The wide-column merge case
(mid-migration snapshot containing BOTH layouts for the same user
key) works because `!redis|zset|` sorts BEFORE `!zs|...` so the
blob arrives first and wide-column rows then update / add members
via the latest-wins map.
Inline TTL: `!redis|zset|<k>` sorts BEFORE `!redis|ttl|<k>`, so
HandleTTL after this handler sees redisKindZSet already and
folds via the case-redisKindZSet branch. No pendingTTL detour
needed for this ordering.
Fail-closed contract (matches existing wide-column path):
- Missing magic prefix → ErrRedisInvalidZSetLegacyBlob
- Unmarshal error → ErrRedisInvalidZSetLegacyBlob
- NaN score → ErrRedisInvalidZSetLegacyBlob (Redis ZADD
rejects NaN at wire level)
Caller audit (per /loop standing instruction): new public method
HandleZSetLegacyBlob has no external callers. Verified via
'grep -rn HandleZSetLegacyBlob --include=*.go' — all matches inside
the test file in this PR. The cmd/elastickv-snapshot-decode
dispatcher (Phase 0a follow-up, not yet built) will route the
`!redis|zset|` prefix to this handler.
Parallel bug class: the SAME issue exists for `!redis|hash|`,
`!redis|set|`, and `!redis|stream|` legacy blob prefixes. Those
encoders shipped in earlier PRs (#725, #758, #791). Each needs
its own legacy-blob handler in a follow-up PR; this commit fixes
only the zset case codex flagged on PR #790.
New tests:
- TestRedisDB_ZSetLegacyBlobRoundTrip — basic round-trip
- TestRedisDB_ZSetLegacyBlobThenWideColumnMerges — mid-migration
- TestRedisDB_ZSetLegacyBlobWithInlineTTL — TTL ordering
- TestRedisDB_ZSetLegacyBlobRejectsMissingMagic — fail-closed
- TestRedisDB_ZSetLegacyBlobRejectsNaNScore — fail-closed
- TestRedisDB_ZSetLegacyBlobRejectsMalformedKey — fail-closed
Self-review:
1. Data loss — exact opposite: this commit RECOVERS zsets that
were silently dropped. New fail-closed guards prevent silently
importing a corrupt blob.
2. Concurrency — no new shared state; per-DB sequential as before.
3. Performance — one protobuf Unmarshal per legacy zset key
(same as the live read path). Member map shares the same
latest-wins behavior as wide-column intake.
4. Consistency — merge order (blob first, wide-column second) is
determined by snapshot lex order; tested explicitly.
5. Coverage — 6 new tests. All 84 redis tests pass.
…fields Claude-bot Critical finding (PR #791 round 2): the stream encoder emitted field names and values as plain Go strings, and `json.Marshal(string)` silently substitutes U+FFFD for every ill-formed UTF-8 byte sequence. A future schema migration switching `pb.RedisStreamEntry.Fields` from `repeated string` to `bytes`, or a code path that bypasses the proto marshaler's UTF-8 validation, would surface as silent backup corruption of binary stream payloads (sensor data, legacy encodings, raw bytes). The hash encoder already solves this via `marshalRedisBinaryValue` + `json.RawMessage`, which emits non-UTF-8 bytes as `{"base64":"..."}` envelopes (redis_hash.go:235-238). This commit applies the same projection to streams. Changes: - streamFieldJSON.Name/Value: string -> json.RawMessage. - New buildStreamFieldRecords helper: extracted the per-pair marshaling out of marshalStreamJSONL so the projection is unit-testable independently of the protobuf pipeline (proto3 string fields enforce UTF-8 at gproto.Marshal so we cannot push binary bytes through that path). - extractStreamFieldsAsPairs test helper now accepts EITHER a plain JSON string OR the `{"base64":"..."}` envelope, hiding the per-pair envelope detection from per-test assertions. A new decodeRedisBinaryEnvelope helper handles the base64url reversal. Caller audit (per /loop standing instruction): buildStreamFieldRecords is private to the package. streamFieldJSON's two-field shape did not change publicly visible behavior (the JSON output for UTF-8 fields is byte-identical to before; only non-UTF-8 fields now route through the envelope). The only consumer of streamFieldJSON is marshalStreamJSONL (line 369). Verified via `grep -rn buildStreamFieldRecords|streamFieldJSON internal/backup/` - all matches inside the same file or the test file. New test: - TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip exercises buildStreamFieldRecords directly with non-UTF-8 input. Marshals one streamEntryJSON, re-parses, and asserts byte-identical round-trip via the binary envelope. Existing TestRedisDB_StreamFieldsDecodedToArray and TestRedisDB_StreamDuplicateFieldsPreserved still pin the UTF-8 plain-string path. Self-review: 1. Data loss - opposite of the original concern: this commit preserves binary stream bytes that the previous code would have mangled into U+FFFD. The protobuf wire format prevents the bad bytes from reaching us today, but the projection now matches the hash encoder's defensive shape. 2. Concurrency - no new shared state. 3. Performance - one extra allocation per (name, value) pair for the marshaled RawMessage. Matches hash encoder's cost shape; bounded by maxWideColumnItems on the live side. 4. Consistency - stream encoder's binary-safe handling now matches the hash encoder's. A future format-version bump can canonicalize the JSON projection across all wide-column types. 5. Coverage - 1 new test pinning the projection; existing round-trip and duplicate-fields tests confirm no UTF-8 regression.
|
@claude review |
|
Claude finished @bootjp's task in 3m 6s —— View job Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 3Tasks
All three P1 issues from prior rounds are now fixed and regression-tested. One medium issue from both previous reviews is still open. Everything else is minor. Previously Critical — All Confirmed FixedDuplicate field names — Non-UTF-8 field corruption — TTL arrives before stream rows — Medium — Still Open
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e5bd65048f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| // allocates) rather than the alias slice — the snapshot | ||
| // reader reuses key buffers across iterations, so a slice | ||
| // alias would race with the next record. | ||
| r.pendingTTL[string(userKey)] = expireAtMs |
There was a problem hiding this comment.
Avoid buffering all unknown TTL keys in memory
The new redisKindUnknown branch stores every unmatched TTL in pendingTTL, which keeps a copied string(userKey) alive until Finalize. In this repository snapshot there is still no zset backup handler under internal/backup, so TTLs for expiring sorted sets never get drained and accumulate for the entire run; on large datasets this turns the previous constant-space orphan path into O(number of unknown TTL keys) memory and can exhaust RAM before the backup finishes.
Useful? React with 👍 / 👎.
Codex P1 on round 2: the pendingTTL buffer introduced in r1 grew without bound. An adversarial or corrupt snapshot whose `!redis|ttl|<k>` records never find a typed-record claimer would accumulate (string-userKey, uint64-expireAt) entries until the decoder runs out of memory. The reviewer notes that this PR's branch has no zset handler yet, so every zset TTL accumulates for the whole run; on a real cluster's snapshot that's potentially millions of buffered keys. Fix: cap pendingTTL at pendingTTLCap entries (default 1 MiB entries, ~64 MiB worst-case memory). Once the cap is reached, subsequent unknown-kind TTLs fall back to the original immediate- orphan-count path without buffering the user-key bytes. Already- buffered entries can still be drained by later state-inits; new entries beyond the cap are counted but not buffered. API: - new RedisDB.WithPendingTTLCap(int) chainable setter. - new pendingTTLOverflow counter (surfaced in the orphan-TTL warning). - Finalize's warning includes the overflow count and the cap so an operator can detect a snapshot that exceeded the buffer budget. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch behavior changes: previously ALWAYS buffered, now buffers up to cap then orphan-counts. Other branches unchanged. - Callers of HandleTTL: only test files and the future cmd/elastickv-snapshot-decode driver. No external caller depends on the unbounded-buffering shape; the cap default (1 MiB) covers every realistic legitimate workload, and the WithPendingTTLCap override lets callers tune to their host budget. - New helper parkUnknownTTL is package-private with one call site (HandleTTL line 401). No prior call sites to audit. - Verified via `grep -rn HandleTTL|pendingTTLCap|WithPendingTTLCap --include=*.go internal/backup/`. New tests: - TestRedisDB_PendingTTLBoundedByCap drives 2*cap unknown TTLs and asserts: pendingTTL stays at cap, orphanTTLCount tracks the overflow at intake, post-Finalize total == 2*cap. - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that cap==0 reverts to the original immediate-orphan path. - TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins input sanitisation. Self-review: 1. Data loss - the cap can mis-classify TTLs that arrive in the overflow window AND would have been drained by a later state-init. Mitigation: the default cap (1 MiB entries) is well above the count of legitimately-buffered TTL'd wide-column keys on any real cluster; operators with abnormal workloads can raise the cap via WithPendingTTLCap. 2. Concurrency - no new shared state. 3. Performance - one extra comparison per unknown-kind TTL. Bounded memory is the win. 4. Consistency - parkUnknownTTL keeps the userKey-copy contract (allocates string) when buffering, matches the prior intent. 5. Coverage - 3 new tests + 1 updated test (TestRedisDB_OrphanTTLCountedNotBuffered already pins the buffered + drained-at-Finalize semantics under the default cap).
|
@claude review |
|
Claude finished @bootjp's task in 2m 25s —— View job Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 4Tasks
The pendingTTL OOM fix from the latest commit is clean, well-structured, and fully tested. All three P1 issues from prior rounds remain closed. One medium issue is still open for the fourth consecutive round; two new minor issues surface in the OOM fix. Previously Fixed — All ConfirmedNon-UTF-8 field corruption — Duplicate field names — TTL ordering (streams + sets) — pendingTTL OOM (Codex P1, latest commit) — Medium — Still Open (4th round)
|
Codex flagged the same P1 on PR #791 round 2 — pendingTTL grows unboundedly when no zset handler is present to drain the parked TTLs. The same bug exists on PR #790's branch (the two PRs share the pendingTTL infrastructure; the codex finding's text applies verbatim). Fix: cap pendingTTL at pendingTTLCap entries (default 1 MiB). Once the cap is reached, subsequent unknown-kind TTLs fall back to immediate-orphan counting via the new parkUnknownTTL helper. New API: - RedisDB.WithPendingTTLCap(int) chainable setter. - pendingTTLOverflow counter surfaced in the orphan-TTL warning. Caller audit (per /loop standing instruction): - HandleTTL's redisKindUnknown branch behavior changes: previously ALWAYS buffered, now buffers up to cap then orphan-counts. Other branches unchanged. - Callers of HandleTTL: tests only. The cmd/elastickv-snapshot-decode driver (Phase 0a follow-up) will be the first production caller. - New helper parkUnknownTTL is package-private with one call site (HandleTTL). No prior call sites to audit. - Verified via `grep -rn HandleTTL|pendingTTLCap|WithPendingTTLCap --include=*.go internal/backup/`. New tests: - TestRedisDB_PendingTTLBoundedByCap. - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering. - TestRedisDB_WithPendingTTLCapNegativeCoercedToZero. Self-review: 1. Data loss - cap can mis-classify TTLs in the overflow window that COULD have been drained by a later state-init. Mitigation: default cap (1 MiB) is well above the count of legitimately- buffered wide-column TTL'd keys on real clusters; operators can tune via WithPendingTTLCap. 2. Concurrency - no new shared state. 3. Performance - one extra comparison per unknown-kind TTL. 4. Consistency - mirrors the fix landing on PR #791 r3; when either PR merges first the other can rebase cleanly. 5. Coverage - 3 new tests + the existing TestRedisDB_OrphanTTLCountedNotBuffered (still pins the default-cap buffered + drained-at-Finalize semantics).
Mirror of PR #790 round 5 codex P1 fix: the cap-with-silent-orphan fallback introduced in r3 is itself a data-loss path. In real Pebble snapshot order (`!redis|ttl|` < `!st|`/`!stream|`/`!zs|`), an overflowed TTL likely belongs to a wide-column key arriving later in the scan; dropping it permanently loses `expire_at_ms`. Fix: replace the silent-orphan fallback with fail-closed semantics. Three modes: - cap > 0, buffer NOT full: buffer the entry as before. - cap == 0: counter-only mode (operator explicit opt-out). - cap > 0, buffer FULL: return ErrPendingTTLBufferFull. Operator must raise WithPendingTTLCap or accept counter-only via cap=0. Caller audit (per /loop standing instruction): same as the parallel PR #790 r5 fix. HandleTTL's redisKindUnknown branch CAN return a non-nil error in a new condition. All call sites check the error (grep -rn '\.HandleTTL(' --include=*.go). parkUnknownTTL changes signature from `func(...)` to `func(...) error`; single caller (HandleTTL) updated. Test renames: - TestRedisDB_PendingTTLBoundedByCap → TestRedisDB_PendingTTLFailsClosedAtCap - TestRedisDB_WithPendingTTLCapZeroDisablesBuffering → TestRedisDB_WithPendingTTLCapZeroOpts Self-review: same shape as PR #790 r5 commit's review. Data loss avoidance via fail-closed; semantics match CLAUDE.md's pattern for replication/HLC/snapshot invariants.
|
@claude review |
Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 5
|

Summary
Adds the Redis stream encoder for the Phase 0 logical snapshot
decoder (
docs/design/2026_04_29_proposed_snapshot_logical_decoder.mdlines 336-344). Mirrors the hash/list/set/zset encoders shipped in
#725/#755/#758/#790. After this lands, the only Phase 0a Redis work
remaining is HLL TTL routing (the HLL blob path itself shipped in
#713; what's missing is
hll_ttl.jsonlsidecar wiring per designline 345-348).
Wire format mirrors
store/stream_helpers.goandadapter/redis_storage_codec.go:!stream|meta|<userKeyLen(4)><userKey>→24-byte BE
Length(8) || LastMs(8) || LastSeq(8)!stream|entry|<userKeyLen(4)><userKey><ms(8)><seq(8)>→magic-prefixed
pb.RedisStreamEntryprotobuf(
0x00 'R' 'X' 'E' 0x01 || pb.Marshal(...))Output is JSONL — one record per line, sorted by
(ms, seq)— plus atrailing
_metaterminator:{"id":"1714400000000-0","fields":{"event":"login","user":"alice"}} {"_meta":true,"length":2,"last_ms":1714400000001,"last_seq":0,"expire_at_ms":null}The interleaved
(name, value)protobuf field list decodes into the"fields"JSON object matching the design example at line 338. JSONLwas chosen over per-entry files because real streams routinely hold
tens of thousands of entries — one file per entry would dominate
tar/findruntime by inode pressure.Fail-closed behavior
ErrRedisInvalidStreamEntry.Live store always writes the prefix; absence indicates corruption
or a stale legacy value. Decoding raw protobuf without the prefix
would either silently misparse or panic inside protobuf.
ErrRedisInvalidStreamEntry. Live XADDenforces even arity at the wire level; an odd count at backup time
would silently drop the dangling field if we accepted it.
ErrRedisInvalidStreamMeta(same shape as hash/list/set/zset overflow guards).
ErrRedisInvalidStreamKey.TTL routing
!redis|ttl|<userKey>for a registered stream folds into the JSONL_meta.expire_at_msfield — design line 341-344's explicitrequirement. Without this routing, a TTL'd stream restores as
permanent.
Self-review (5 lenses)
closed. Field slice cloned at decode so snapshot buffer mutations
can't bleed into emitted state.
RedisDBis sequential per scope;no shared state.
accumulation; sort at flush is O(n log n) on
(ms, seq). JSONLvia
bytes.Buffersingle growth. Matches list/zset cost shape.(ms, seq)tuple, notstring ("10-0" < "2-0" lexicographically would emit out of XADD
order).
_meta.last_ms/last_seqpreserved verbatim so XADD*monotonicity survives restore.internal/backup/redis_stream_test.go.Caller audit (per
/loopstanding instruction)Semantics-changing edit: new
case redisKindStream:branch inHandleTTL(redis_string.go:309). Purely additive. Verified:Three references, all new in this PR. No prior caller maps to
redisKindStream.Test plan
go test -race ./internal/backup/→ okgolangci-lint run ./internal/backup/...→ 0 issuesgo build ./...→ okgo vet ./internal/backup/...→ ok