feat(session-ingest): stream large payloads via R2 + Queue #884
Merged
Conversation
Support payloads up to 5GB by streaming request bodies to R2 (zero worker memory) and processing them asynchronously via a Queue consumer with a streaming JSON parser.

- Ingest route streams the body to R2 and enqueues a processing message (sketched below)
- Queue consumer parses items one at a time via @streamparser/json
- Items > 1.94MB are stored in R2 with a reference in DO SQLite
- Items > 50MB are skipped with a warning (no full materialization)
- A single item failure doesn't stop processing the rest
- Timestamp guard (ingestedAt) prevents stale retries from overwriting newer data
- DO getAll() resolves R2-backed items; new getAllStream() for row-by-row export
- Schema migration adds item_data_r2_key and ingested_at columns
- Integration tests via @cloudflare/vitest-pool-workers
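A minimal sketch of what such an ingest route can look like. The binding names `SESSION_INGEST_R2` and `INGEST_QUEUE` come from the PR; everything else (the staging key scheme, the message shape) is illustrative, and R2 needs a known length for streamed uploads, which the request's Content-Length supplies here.

```ts
// Hypothetical ingest route: pipe the body to R2, then enqueue.
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const stagingKey = `staging/${crypto.randomUUID()}`;

    // request.body is a ReadableStream; handing it to R2 directly means
    // the payload never buffers in the 128MB worker.
    await env.SESSION_INGEST_R2.put(stagingKey, request.body);

    // Hand off to the queue consumer, carrying the timestamp-guard value.
    await env.INGEST_QUEUE.send({ r2Key: stagingKey, ingestedAt: Date.now() });

    return Response.json({ queued: true }, { status: 202 });
  },
};
```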
Remove getAll() and rewrite getAllStream() to stream R2 object bodies chunk-by-chunk instead of buffering via .text(). Build the snapshot JSON incrementally in three phases: scan refs from SQLite, group them into the message/part structure, then stream JSON while piping R2 bodies. Replace the RPC exportSession (32MB DO limit) with a secret-protected HTTP route /internal/session/:sessionId/export called via service-binding fetch; the secret comparison uses crypto.subtle.timingSafeEqual (see the sketch below). Keep createSessionForCloudAgent and deleteSessionForCloudAgent as RPC (small payloads, no streaming needed).
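A sketch of the secret check, using the non-standard `crypto.subtle.timingSafeEqual` extension that the Workers runtime provides. The function name and how the secrets arrive are assumptions; only the library call is the PR's.

```ts
// Constant-time comparison of the export secret. Workers'
// timingSafeEqual throws when the buffers differ in length, so we
// compare lengths first (length itself is not secret here).
function secretMatches(provided: string, expected: string): boolean {
  const enc = new TextEncoder();
  const a = enc.encode(provided);
  const b = enc.encode(expected);
  if (a.byteLength !== b.byteLength) return false;
  return crypto.subtle.timingSafeEqual(a, b);
}
```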
Code Review Summary
Status: 2 Issues Found | Recommendation: Address before merge
Other Observations (not in diff): None. Files Reviewed: 15 files.
Replace the 3-phase approach (scan all refs into memory, group, stream) with a single pass that queries SQLite incrementally and streams JSON directly: the session row first, then cursor-iterate messages one at a time, querying parts per message via a LIKE prefix. Only one row plus one R2 stream is in flight at any time (see the sketch below).
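A condensed sketch of that single pass, eliding the per-message parts query. It assumes the Workers ambient types (`SqlStorage`, `R2Bucket`) and a table shaped like the migration's `items(item_id, item_data, item_data_r2_key)`; the function and column names are illustrative, not the PR's actual code.

```ts
// Streams the snapshot without ever holding more than one row and at
// most one R2 body stream in memory at a time.
async function* streamSnapshot(
  sql: SqlStorage,
  r2: R2Bucket
): AsyncGenerator<string | Uint8Array> {
  yield '{"messages":[';
  let first = true;
  for (const row of sql.exec(
    "SELECT item_data, item_data_r2_key FROM items WHERE item_id LIKE 'message/%' ORDER BY item_id"
  )) {
    if (!first) yield ",";
    first = false;
    if (row.item_data_r2_key) {
      const obj = await r2.get(row.item_data_r2_key as string);
      if (obj) {
        // Pipe the R2 body through chunk-by-chunk instead of obj.text().
        for await (const chunk of obj.body) yield chunk;
        continue;
      }
      console.error("missing R2 blob", row.item_data_r2_key); // diagnostic before fallback
      yield "{}";
      continue;
    }
    yield row.item_data as string;
  }
  yield "]}";
}
```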
Replace the buffered JSONParser with a Tokenizer plus a per-item TokenParser. Items from $.data[] are now processed one at a time between chunk reads instead of being collected into an array first. A per-item byte budget is enforced via offset tracking: if an item exceeds MAX_SINGLE_ITEM_BYTES, its tokens are discarded without ever building a JS object, preventing OOM from oversized items. A fresh TokenParser is created per item so budget violations don't corrupt parser state (sketched below).
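A condensed sketch of the per-item budget idea, assuming the Tokenizer/TokenParser split that @streamparser/json exposes. For brevity it parses a bare top-level array of objects rather than the `$.data[]` wrapper, and every name apart from the library's is illustrative.

```ts
import { Tokenizer, TokenParser, TokenType } from "@streamparser/json";

const MAX_SINGLE_ITEM_BYTES = 50 * 1024 * 1024; // the PR's 50MB cutoff

function makeItemStream(onItem: (item: unknown) => void) {
  const tokenizer = new Tokenizer();
  let parser: TokenParser | null = null; // fresh parser per item
  let itemStart = 0;   // byte offset where the current item began
  let skipping = false; // true while discarding an oversized item
  let depth = 0;        // nesting depth; the outer array is depth 1

  tokenizer.onToken = ({ token, value, offset }) => {
    const opens = token === TokenType.LEFT_BRACE || token === TokenType.LEFT_BRACKET;
    const closes = token === TokenType.RIGHT_BRACE || token === TokenType.RIGHT_BRACKET;

    if (opens && depth === 1) {
      // New array item: reset the budget and build a fresh TokenParser so
      // a prior budget violation cannot corrupt this item's parse state.
      itemStart = offset;
      skipping = false;
      parser = new TokenParser({ paths: ["$"] });
      parser.onValue = ({ value: item }) => onItem(item);
    }
    if (opens) depth++;

    // Budget check from token offsets alone: the oversized item's tokens
    // are dropped and no JS object is ever materialized.
    if (!skipping && depth >= 2 && offset - itemStart > MAX_SINGLE_ITEM_BYTES) {
      skipping = true;
      parser = null;
    }
    if (!skipping && depth >= 2 && parser) parser.write({ token, value, offset });

    if (closes) {
      depth--;
      if (depth === 1) skipping = false; // reset even when the closing token triggered the skip
    }
  };

  // Feed raw chunks here as they arrive from the R2 body stream.
  return (chunk: Uint8Array) => tokenizer.write(chunk);
}
```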
Delete share-output.ts, ingest-batching.ts and their tests. Remove MAX_DO_INGEST_CHUNK_BYTES, byteLengthUtf8, SessionSyncInputSchema.
…st-streaming (Conflicts: cloudflare-session-ingest/package.json)
- Use TextEncoder byte length for the DO SQLite row limit check (non-ASCII safety; see the snippet below)
- Escape LIKE wildcards in msgId for part queries
- Clean up R2 item blobs in SessionIngestDO.clear()
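The byte-length check in one line; the threshold constant is illustrative (1.94MB per the PR description).

```ts
// UTF-16 .length undercounts non-ASCII: "é".length is 1 but it occupies
// 2 bytes in UTF-8, and the DO SQLite row limit is measured in bytes.
const MAX_DO_ROW_BYTES = 1.94 * 1024 * 1024;
const itemBytes = new TextEncoder().encode(itemJson).byteLength;
const storeInR2 = itemBytes > MAX_DO_ROW_BYTES;
```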
…cs comment
- Use sql template with ESCAPE '\' for LIKE wildcard escaping (SQLite requires it; see the sketch below)
- Add comment explaining why R2-backed items show as {} in metrics
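A sketch of the escaped prefix query described above. Without an ESCAPE clause, SQLite treats a backslash in a LIKE pattern literally rather than as an escape character. The table layout and the `part/<msgId>/...` key scheme are illustrative, and plain `exec` stands in for the PR's sql template helper.

```ts
// Escape %, _ and \ in the message id, then match parts by prefix.
const escapedMsgId = msgId.replace(/([\\%_])/g, "\\$1");
const parts = sqlStorage.exec(
  "SELECT item_id, item_data FROM items WHERE item_id LIKE ? ESCAPE '\\'",
  `part/${escapedMsgId}/%`
);
```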
…ard, queue rollback

- Add ingestedAt to oversized-item R2 keys to prevent stale overwrites
- Clean up orphaned R2 blobs when items are replaced or skipped by the timestamp guard
- Wrap the queue send in try/catch to delete the staging R2 object on failure
- Check session existence before processing queued messages to prevent deleted sessions from being repopulated
- Add secrets_store_secrets to the cloud-agent-next dev env (missing binding)
- Surface tokenizer parse errors to trigger queue retry/DLQ instead of silently acking malformed payloads
Cloudflare Workers requires queue() to be a property on the default export, not a named export. Move queue onto the default export object alongside fetch (sketched below).
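The module-worker shape this refers to, assuming @cloudflare/workers-types ambient types; `handleIngest` and `processItem` are hypothetical stand-ins.

```ts
// The runtime looks up handlers on the default export; a named
// `export async function queue(...)` is never invoked.
export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    return handleIngest(request, env, ctx);
  },
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      await processItem(message.body, env);
      message.ack(); // per-message ack so one failure doesn't retry the whole batch
    }
  },
} satisfies ExportedHandler<Env>;
```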
…destructured value

Destructuring the getter property captured the initial null, so parse errors were silently ignored. Replace it with a getParseError() method that is called after streaming completes (see the sketch below).
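Why the destructured getter went stale: destructuring reads the property exactly once, at destructuring time. The names below are illustrative.

```ts
const stream = {
  _parseError: null as Error | null,
  get parseError() { return this._parseError; },
  getParseError() { return this._parseError; }, // the fix: reads at call time
};

const { parseError } = stream;   // invokes the getter now, capturing null forever
stream._parseError = new Error("malformed payload");

console.log(parseError);             // null: the silently-ignored error
console.log(stream.getParseError()); // Error: current state, read after streaming
```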
Resolve merge conflicts between the streaming HTTP export (this branch) and the combined exportSessionWithDiff RPC (main). Keep the streaming approach for session export; move diff extraction client-side into cloud-agent-next via a new extractDiffsFromMessages helper. Remove the server-side RPC export methods and related DO code (readItems, getAll, collectDiffs, etc.). Adapt tests to use fetch mocks with snapshot-embedded diffs.
…on trigger token

When the closing `}` token triggered the byte budget overflow, `skippingItem` was set to `true` and `depth` decremented to `2`, but the flag was never cleared, so the next valid item was silently discarded.
Session deletion (clear()) can race with queued ingests that repopulate the DO after it has been wiped. Add a 'deleted' key to ingestMeta that is set after clear() and checked at the top of ingest() and alarm() to silently reject work on deleted sessions (sketched below).
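A sketch of that tombstone guard inside the DO, assuming Workers ambient types; the storage key layout and class shape are illustrative.

```ts
class SessionIngestDO {
  constructor(private ctx: DurableObjectState, private env: Env) {}

  async clear(): Promise<void> {
    this.ctx.storage.sql.exec("DELETE FROM items");
    await this.ctx.storage.put("ingestMeta:deleted", true); // tombstone, set after the wipe
  }

  async ingest(batch: unknown): Promise<void> {
    // Same check would sit at the top of alarm(): queued work for a
    // deleted session is dropped, never applied.
    if (await this.ctx.storage.get("ingestMeta:deleted")) return;
    // ... normal ingest path
  }
}
```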
Previously, a missing R2 blob during getAllStream() export silently fell
back to `{}`, producing semantically corrupted snapshot data with no
diagnostic signal. Add console.error before the fallback.
…gration tests for deleted flag / timestamp guard
…batch export queries

- ingest() now deletes caller-uploaded R2 blobs when the session is deleted, preventing orphaned oversized-item blobs after clear()
- Export cursor queries are batched with LIMIT 30 instead of LIMIT 1, reducing query count ~30x for large sessions (see the sketch below)
- Fix R2-backed message test: the r2References key was 'message:msg_r2' but getItemIdentity() produces 'message/msg_r2', so the R2 export path was never exercised
- Fix a misleading comment in cloud-agent-next about "avoids buffering"
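A sketch of the batched cursor: keyset pagination pulls 30 rows per query instead of 1, so a 3,000-row session needs roughly 100 queries instead of 3,000. Schema and function names are illustrative.

```ts
function* exportRows(sql: SqlStorage): Generator<string> {
  let lastId = "";
  for (;;) {
    const rows = sql
      .exec(
        "SELECT item_id, item_data FROM items WHERE item_id > ? ORDER BY item_id LIMIT 30",
        lastId
      )
      .toArray();
    if (rows.length === 0) break;
    for (const row of rows) yield row.item_data as string;
    lastId = rows[rows.length - 1].item_id as string; // resume after the last key seen
  }
}
```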
eshurakov approved these changes on Mar 9, 2026

eshurakov added a commit that referenced this pull request on Mar 9, 2026
…908)

## Summary

- Move snapshot download from worker memory to the sandbox via `curl` — the full JSON never materializes in the 128MB worker (sketched below)
- Replace worker-side diff extraction (`extractDiffsFromMessages`) and application (`applySessionDiff`) with a sandbox-side Node script that reads, deduplicates, and applies diffs directly on disk
- Refactor `restoreSessionSnapshot` to accept a file path (curl writes directly) instead of a string payload
- Remove `resolve` and `zod` imports that were only used by deleted methods

Depends on PR #884, which adds the streaming `/api/session/:id/export` endpoint.

## Test plan

- [x] All 601 existing tests pass (updated 4 cold-start tests for the new exec-based flow)
- [x] Typecheck passes
- [ ] Integration test: cold-start resume with a real session to verify curl download + kilo import + diff application
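A sketch of the exec-based flow, assuming an exec-style sandbox API; the `sandbox` object, header, and paths are all illustrative, while the curl flags and the file-path `restoreSessionSnapshot` signature follow the summary above.

```ts
// Download inside the sandbox so the worker never holds the snapshot bytes,
// then restore from the file curl wrote.
const snapshotPath = "/tmp/session-snapshot.json";
await sandbox.exec(
  `curl --fail --silent --show-error ` +
  `-H "Authorization: Bearer ${exportSecret}" ` +
  `-o ${snapshotPath} ` +
  `"${baseUrl}/api/session/${sessionId}/export"`
);
await restoreSessionSnapshot(snapshotPath);
```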
Summary
Support ingest payloads up to 5GB by streaming request bodies to R2 (zero worker memory) and processing asynchronously via Queue consumer with streaming JSON parser.
- Ingest route streams the request body to R2 and enqueues a processing message with an `ingestedAt` timestamp
- Queue consumer (`src/queue-consumer.ts`): reads the R2 object, parses items one at a time via `@streamparser/json`, sends each to the DO individually
- DO: `ingest()` accepts `ingestedAt` + `r2References`; `getAll()` resolves R2-backed items; new `getAllStream()` for row-by-row export
- Schema migration adds `item_data_r2_key` and `ingested_at` columns
- New bindings: R2 bucket (`SESSION_INGEST_R2`), Queue (`INGEST_QUEUE` with a DLQ)

Infrastructure prerequisites
```sh
cd cloudflare-session-ingest
npx wrangler r2 bucket create session-ingest-staging
npx wrangler queues create session-ingest-processing
npx wrangler queues create session-ingest-dlq
```

Test plan
- Integration tests via `@cloudflare/vitest-pool-workers`