refactor(producer): extract captureStreamingStage (single-machine fusion)#731
Conversation
…ion)
Move the streaming encode fusion path (`useStreamingEncode === true` with
successful encoder spawn) out of `executeRenderJob` into
`services/render/stages/captureStreamingStage.ts`. The stage owns:
- `spawnStreamingEncoder` invocation, including the abort-rethrow vs.
graceful-fallback handling.
- Parallel + sequential capture-to-stdin loops (Stage 4 absorbs Stage 5
for streaming renders).
- The streaming encoder's `close()` + result check.
- Defensive cleanup of the streaming encoder in the stage's own
`try/finally`.
The stage returns either `{ success: true, ... }` (sequencer skips the
disk path AND inline Stage 5) or `{ success: false }` (sequencer falls
back to the disk path). The sequencer's `useStreamingEncode` flag is
no longer flipped imperatively — the result type makes the branch
selection explicit.
Hard constraints preserved verbatim:
- `probeSession` is closed at the same code points (parallel: after
capture; sequential: in session finally). The local binding nulls
via the returned result.
- `lastBrowserConsole` is set to the buffer of whichever session was
active last (probe close path or sequential session finally).
- `job.framesRendered` is updated per-frame; `Streaming frame N/M
[(K workers)]` `updateJobStatus` payloads fire at the same 30-frame
and completion checkpoints (parallel) or every frame (sequential),
with the same percentage math `25 + frameProgress * 55`.
- `Streaming encode failed: <err>` still throws on the encoder's
`success: false` close result.
- The defensive `try/finally` close-on-throw is preserved, now inside
the stage instead of the orchestrator.
- `perfStages.captureMs` is still set by the sequencer from
`stage4Start`; the stage also returns `encodeMs` for the encoder's
overlapped duration (assigned to `perfStages.encodeMs`).
Removes the orphaned `createFrameReorderBuffer` and
`prepareCaptureSessionForReuse` imports from the orchestrator after
the streaming code moved.
Verified inside `Dockerfile.test`:
- 5/5 fixtures PASS (font-variant-numeric, many-cuts, variables-prod,
sub-composition-video, gsap-letters-render-compat).
- `gsap-letters-render-compat` (single-worker render, 4s duration)
exercises the new streaming stage end-to-end —
`streaming-encode gate enabled=true` confirmed in the log.
- The other 4 fixtures exercise the disk path (workerCount > 1).
Known follow-up: same runtime import cycle situation as captureStage —
the stage imports `updateJobStatus` and types from
`renderOrchestrator.ts`, which imports the stage back. Safe (deferred
to runtime); a future PR will flatten this once all 8 stages are
extracted.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
vanceingalls
left a comment
There was a problem hiding this comment.
Verdict: approve.
Extraction of the single-machine fusion (streaming capture → encode) path. Sibling to #730: the SDR disk path is the else branch in the sequencer; this is the if (useStreamingEncode) branch.
Verified the extraction is byte-clean:
- The spawn-failure flow preserves the exact
useStreamingEncode = falsefall-through. The sequencer now flips the flag based onstreamingRes.successinstead of in-place, but the observable behavior is identical: ifspawnStreamingEncoderthrows (non-abort), the stage returns{ success: false }and the sequencer takes the disk path. If the encoder spawned successfully and the rest threw (capture failure, broken pipe, abort), the stage's outerfinallycloses the encoder defensively and the error propagates upward — same as pre-#731. lastBrowserConsoleassignment ordering preserved: probe close path in the parallel branch, sequential session'sfinallyotherwise.reorderBuffersemantics +writeFramecall sites identical. ThecurrentEncoder: StreamingEncoder = streamingEncoderconst right after spawn is preserved verbatim — that was load-bearing for TypeScript narrowing past thelet streamingEncoder: StreamingEncoder | null = null;declaration.- The
Streaming encode failed: <err>throw onencodeResult.success === falseis preserved.
Stage interface consistency with #730: both stages take fileServer, workDir, framesDir, job, totalFrames, cfg, log, workerCount, probeSession, buildCaptureOptions, createRenderVideoFrameInjector, abortSignal, assertNotAborted, onProgress. Good — the capture-family shares a shape. captureStreamingStage adds videoOnlyPath, outputFormat, streamingEncoderOptions; captureStage adds needsAlpha, captureAttempts. The divergence is justified — streaming owns the encoder spawn and needs its options + output path; disk owns the retry array.
Important (non-blocking, cross-stack):
captureStreamingStage.ts:208's sequential streaming message wasStreaming frame ${i + 1}/${job.totalFrames}pre-extraction — the new code switched toStreaming frame ${i + 1}/${totalFrames}(line 268 in the new stage), which is the same value becausetotalFramesis the narrowedjob.totalFrames. Worth a tiny sanity test pinning the string format — same audit-all-sites pattern aslastBrowserConsole: every place that constructs a progress payload should agree on which symbol it reads from, and a regression test guards future drift.
Nits:
CaptureStreamingStageResult.captureDurationMsis in the result but #735 immediately removes it because the sequencer always recomputesDate.now() - stage4StartforperfStages.captureMs. Worth dropping in this PR instead of carrying it to #735.- Same runtime cycle pattern as #730 (imports
updateJobStatusfrom the orchestrator). #737 resolves it.
Praise: the explicit success: true | false discriminated union for the result is the right shape for a fallback-friendly stage — much better than a nullable encodeResult or a thrown-from-spawn pattern that the sequencer would have to catch. The defensive-close finally block with the streamingEncoderClosed gate is preserved exactly and the JSDoc spells out why it's idempotent — easy to verify, easy to maintain.
— Vai
miguel-heygen
left a comment
There was a problem hiding this comment.
Clean mechanical extraction — no behavior changes, no introduced bugs. Verified imports, error handling, and cleanup invariants are preserved. LGTM. — Magi

What
Phase 1 PR 1.7 of the distributed-render refactor. Moves the single-machine fused capture+encode (streaming) path out of
executeRenderJobintopackages/producer/src/services/render/stages/captureStreamingStage.ts. The sequencer now callsrunCaptureStreamingStagewhenuseStreamingEncode === true; onsuccess: falsethe sequencer falls back to the disk-capture stage (PR 1.6).The HDR layered branch (
useLayeredComposite === true) stays inline — extracted in PR 1.8.Why
Continues the Phase 1 mechanical extraction. The streaming path is the second-largest capture branch and has its own lifecycle (spawn → capture loop → close → defensive cleanup) distinct from the disk path. Extracting it as its own stage keeps both branches readable and gives the eventual distributed renderer a clear "this won't survive across machines" boundary — chunk workers will always use the disk path.
The result type (
success: true | false) replaces the imperativeuseStreamingEncode = false; streamingEncoder = nullfallback flag-flip with an explicit branch on the return value. Slightly clearer; behavior is identical.How
captureStreamingStage.tsexportsrunCaptureStreamingStage(input)returningCaptureStreamingStageResult. Sequencer constructs the typedStreamingEncoderOptionsfrom in-scope preset / dimensions / quality / bitrate fields and passes them through; the stage doesn't reach into the preset's shape.} else {standard-capture block is restructured: whenuseStreamingEncodeis true it callsrunCaptureStreamingStage; onsuccess: trueit sets the timings and skips the disk path; onsuccess: falseit falls back torunCaptureStage(PR 1.6).try { ... } finally { defensive close streamingEncoder }is gone — the defensive cleanup now lives inside the stage.createFrameReorderBufferandprepareCaptureSessionForReuseimports from the orchestrator (oxlint flagged after the move).Preserved invariants
probeSessionis closed at the same code points; the parallel path closes afterexecuteParallelCapturereturns; the sequential path closes in the session'sfinally.lastBrowserConsoleis set to whichever session's buffer was active last.job.framesRenderedis updated every frame;Streaming frame N/M [(K workers)]updateJobStatuspayloads fire at the same 30-frame and completion checkpoints (parallel) or every frame (sequential), with25 + frameProgress * 55.Streaming encode failed: <err>throws on the encoder'ssuccess: falseclose result.streamingEncoderruns in the stage's owntry/finally— gated bystreamingEncoderClosedso it's idempotent.perfStages.captureMsis still computed by the sequencer fromstage4Start(so the window includes the in-sequencer setup).perfStages.encodeMsis taken from the encoder's reporteddurationMs(overlapped with capture).Test plan
bunx oxlint+bunx oxfmt --check— clean.bun run --filter @hyperframes/producer typecheck+build— clean.bun test packages/producer/src/services/— 176 pass, same single pre-existing unrelated failure.docker run hyperframes-producer:test --sequential font-variant-numeric many-cuts variables-prod sub-composition-video gsap-letters-render-compat— 5/5 PASS. Thestreaming-encode gatelog confirms both paths are exercised:gsap-letters-render-compat(single worker, 4s) →enabled=true→ runs the new streaming stage end-to-end.enabled=false→ falls through torunCaptureStage(disk path).regressionworkflow.Known follow-up: import cycle
Same as PR 1.6 — the stage imports
updateJobStatusand types fromrenderOrchestrator.ts(runtime), which importsrunCaptureStreamingStageback. Safe (deferred to runtime); a future PR will consolidate these helpers into a shared module once all 8 stages are extracted.🤖 Generated with Claude Code