fix(core): avoid ReadableStream lock after fullStream startup probe#1103
fix(core): avoid ReadableStream lock after fullStream startup probe#1103
Conversation
🦋 Changeset detectedLatest commit: 39abf1d The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
This comment has been minimized.
This comment has been minimized.
|
No actionable comments were generated in the recent review. 🎉 📝 WalkthroughWalkthroughRefactors stream probing in agent.ts to preserve getter-based fullStream teeing behavior, adds helpers to detect teeing-capable streams, and introduces a test and changelog entry verifying that probing does not lock multi-consumer fullStream access. (43 words) Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
packages/core/src/agent/agent.spec.ts (2)
1140-1141: Consider asserting"start"is still present to directly cover the probe-swallowed-event scenarioThe
"start"event is the very chunk the agent's probe reads when detecting stream type. For a non-getter-based stream the probe would consume it, losing it for downstream consumers. Asserting it appears in the re-teed result strengthens the regression signal beyond just checking that content/finish survive.🧪 Suggested assertion addition
expect(emittedTypes).toContain("text-delta"); + expect(emittedTypes).toContain("start"); expect(emittedTypes).toContain("finish");🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/agent/agent.spec.ts` around lines 1140 - 1141, Add an assertion to ensure the "start" event is preserved in the re-teed stream: update the test that collects emittedTypes (the assertion block containing expect(emittedTypes).toContain("text-delta") and expect(emittedTypes).toContain("finish")) to also assert expect(emittedTypes).toContain("start") so the probe-swallowed-event regression is directly covered; locate the emittedTypes variable in the failing test in agent.spec.ts and add the new expectation alongside the existing ones.
1085-1088:teeStream()should beprivateThe method is only ever invoked through the
get fullStream()getter. Leaving it accessible (no modifier =public) exposes internal state-mutation logic to the outer scope unnecessarily.♻️ Proposed fix
- teeStream(): ai.AsyncIterableStream<any> { + private teeStream(): ai.AsyncIterableStream<any> {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/agent/agent.spec.ts` around lines 1085 - 1088, The teeStream method is currently public but only used by the fullStream getter; mark it private to prevent exposing internal state mutation. Update the method declaration for teeStream() to include the private modifier (private teeStream(): ai.AsyncIterableStream<any>) and verify only the fullStream getter calls it; no other API surface should reference teeStream after this change.packages/core/src/agent/agent.ts (1)
5324-5369: Verify unused probed tee branch is cleaned up when preserving getter-based fullStream.If
probeStreamStarttees aReadableStream, returning the original result here leaves theprobedFullStreambranch unconsumed. With standardReadableStream.tee()backpressure, that can stall or leak on long streams. Please confirm AI SDK’s teeing implementation makes this safe; otherwise, consider cancelling/consuming the probed branch when you keep the original result.Possible safeguard (only if tee branches need cleanup)
if (this.usesGetterBasedTeeingFullStream(result)) { + this.discardStream(probedFullStream); // AI SDK stream results expose fullStream via a teeing getter. // Preserving the original instance keeps that multi-consumer behavior intact. return result; }
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/core/src/agent/agent.spec.ts`:
- Around line 1140-1141: Add an assertion to ensure the "start" event is
preserved in the re-teed stream: update the test that collects emittedTypes (the
assertion block containing expect(emittedTypes).toContain("text-delta") and
expect(emittedTypes).toContain("finish")) to also assert
expect(emittedTypes).toContain("start") so the probe-swallowed-event regression
is directly covered; locate the emittedTypes variable in the failing test in
agent.spec.ts and add the new expectation alongside the existing ones.
- Around line 1085-1088: The teeStream method is currently public but only used
by the fullStream getter; mark it private to prevent exposing internal state
mutation. Update the method declaration for teeStream() to include the private
modifier (private teeStream(): ai.AsyncIterableStream<any>) and verify only the
fullStream getter calls it; no other API surface should reference teeStream
after this change.
Deploying voltagent with
|
| Latest commit: |
39abf1d
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://b7e900bb.voltagent.pages.dev |
| Branch Preview URL: | https://fix-core-fullstream-probe-lo.voltagent.pages.dev |
There was a problem hiding this comment.
1 issue found across 3 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="packages/core/src/agent/agent.ts">
<violation number="1" location="packages/core/src/agent/agent.ts:5334">
P0: The unconsumed `probedFullStream` tee branch must be discarded to prevent unbounded memory buffering (memory leak).</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| probedFullStream: TResult["fullStream"], | ||
| ): TResult { | ||
| if (probedFullStream === originalFullStream) { | ||
| return result; |
There was a problem hiding this comment.
P0: The unconsumed probedFullStream tee branch must be discarded to prevent unbounded memory buffering (memory leak).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/core/src/agent/agent.ts, line 5334:
<comment>The unconsumed `probedFullStream` tee branch must be discarded to prevent unbounded memory buffering (memory leak).</comment>
<file context>
@@ -5317,6 +5321,53 @@ export class Agent {
+ probedFullStream: TResult["fullStream"],
+ ): TResult {
+ if (probedFullStream === originalFullStream) {
+ return result;
+ }
+
</file context>
| return result; | |
| this.discardStream(probedFullStream); | |
| return result; |
PR Checklist
Please check if your PR fulfills the following requirements:
Bugs / Features
What is the current behavior?
Agent.streamText()/Agent.streamObject()can replace a provider result's getter-basedfullStreamwith a fixed stream after startup probing.For AI SDK results that rely on tee-based
fullStreamaccessors, this can break multi-consumer behavior and cause:TypeError [ERR_INVALID_STATE]: Invalid state: ReadableStream is lockedwhen SDK consumers iterate
result.fullStreamwhile other accessors (e.g.result.text/ UI stream helpers) are also active.What is the new behavior?
fullStreamresults after probing.fullStreamwhen the result is not getter/tee-based.streamTextandstreamObject.result.fullStream+result.textusage.fixes (issue)
Notes for reviewers
Changes:
packages/core/src/agent/agent.tspackages/core/src/agent/agent.spec.ts.changeset/witty-cameras-smile.mdValidation:
pnpm -C packages/core test -- src/agent/agent.spec.tspnpm -C packages/core test -- src/agent/subagent/index.spec.tsexamples/basewith env loaded):Agent.streamText(...)consumed viaresult.fullStreamandresult.textconcurrently, no lock error.Summary by cubic
Prevents ReadableStream lock errors after startup probing by preserving getter-based tee behavior for fullStream in streamText and streamObject. Enables concurrent use of result.fullStream and result.text without errors.
Written for commit 39abf1d. Summary will update on new commits.
Summary by CodeRabbit
Bug Fixes
Tests
Chores