feat: add aclose()/close() and async context manager to streaming outputs#5344
feat: add aclose()/close() and async context manager to streaming outputs#5344greysonlalonde merged 7 commits intomainfrom
Conversation
…puts Implements graceful cancellation for CrewStreamingOutput and FlowStreamingOutput so that in-flight LLM tasks are cancelled when the consumer stops iterating (e.g., client disconnect in FastAPI). Closes #5312
iris-clawd
left a comment
There was a problem hiding this comment.
Review: Streaming aclose()/close() + async context manager
Good feature — proper resource cleanup for streaming is important. A few things to address:
🔴 Code duplication between CrewStreamingOutput and FlowStreamingOutput
aclose(), close(), __aenter__, __aexit__, and is_cancelled are copy-pasted identically between both classes. FlowStreamingOutput already inherits from _BaseStreamingOutput, and CrewStreamingOutput does too — this should live in the base class once.
If there's a reason they can't share (different cleanup behavior later), at least extract a mixin. Right now it's 60+ lines of exact duplication that will drift.
🟡 _cancelled not initialized in FlowStreamingOutput.__init__
CrewStreamingOutput.__init__ sets self._cancelled = False, but FlowStreamingOutput.__init__ doesn't. The _BaseStreamingOutput class sets self._cancelled = False? Let me check...
Looking at the diff, _cancelled is added to _BaseStreamingOutput.__init__ (line in the __init__ block). That's fine then — both subclasses inherit it. But the methods themselves should also be on the base class.
🟡 Task cancellation in create_async_chunk_generator swallows all exceptions
try:
await task
except (asyncio.CancelledError, Exception): # noqa: S110
passException already covers CancelledError on Python 3.9+... wait, no — CancelledError inherits from BaseException since Python 3.9. So this is correct to list both. But catching bare Exception here means any unexpected error from the task is silently swallowed. Consider logging at debug level so failures are at least traceable:
except asyncio.CancelledError:
pass
except Exception:
logger.debug("Background streaming task failed", exc_info=True)🟢 Everything else looks solid
- Idempotency via
_cancelledflag — correct task.cancel()beforeawait task— right pattern for prompt cancellation- Context manager delegates to
aclose()— clean - Tests cover both Crew and Flow, sync and async, early exit and idempotency
- Docs updated across all 4 languages
The duplication is the main thing. 💬 190
iris-clawd
left a comment
There was a problem hiding this comment.
Re-review (605567d → d4dfea3)
All three items from my review addressed, plus good additional improvements.
✅ Duplication eliminated
Cancellation methods (aclose, close, __aenter__, __aexit__, is_cancelled) moved to StreamingOutputBase. FlowStreamingOutput is now essentially just a thin subclass with _set_result. Clean.
✅ Exception logging
asyncio.CancelledError and Exception are now separate handlers — cancelled is silently passed, unexpected errors logged at debug level. Exactly what I suggested.
✅ _exhausted flag — nice addition
is_cancelled is now correctly False after normal stream completion. aclose()/close() become no-ops when the stream was fully consumed (_exhausted = True). This is the right semantic distinction — cancellation ≠ completion. Tests updated accordingly.
✅ Handler leak prevention (d4dfea3)
register_cleanup() + _on_cleanup callback ensures the event handler is unregistered even if aclose()/close() is called before iteration starts. Good edge case catch — without this, calling aclose() on a never-iterated stream would leak the handler.
🟢 Self return type
Using typing_extensions.Self for __aenter__ — correct for subclass compatibility.
Solid work across all 6 commits. LGTM 🚀 💬 191
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit d4dfea3. Configure here.
iris-clawd
left a comment
There was a problem hiding this comment.
7f0fd2b — aclose/close also no-op when _error is not None. Makes sense — if the stream already errored, there's nothing to cancel and the cleanup already happened in the exception path. Still LGTM 🚀 💬 192

Summary
aclose(),close(),is_cancelled, and async context manager (async with) support toCrewStreamingOutputandFlowStreamingOutputasyncio.Taskincreate_async_chunk_generator's finally block instead of awaiting it indefinitely, so background LLM work stops promptly on consumer disconnectUsage
Closes #5312
Test plan
Note
Medium Risk
Changes core streaming iteration/cancellation behavior and background task lifecycle; mistakes could lead to leaked handlers, premature completion, or interrupted streaming in production.
Overview
Adds first-class cancellation to streaming results via
aclose()/close(),is_cancelled, andasync withsupport onCrewStreamingOutput/FlowStreamingOutput, centralizing sync/async iteration logic inStreamingOutputBase.Wires streaming kickoff paths (
Crew.kickoff*,Flow.kickoff*, andrun_for_each_async) to register cleanup callbacks so event handlers are unregistered even when consumers stop early, and updatescreate_async_chunk_generatorto cancel the background task infinallyto stop in-flight work promptly on disconnect.Extends streaming docs (EN/AR/KO/PT-BR) with cancellation/cleanup usage patterns and adds a new
TestStreamingCancellationsuite covering idempotency, early-exit cleanup, and both crew/flow sync+async cases.Reviewed by Cursor Bugbot for commit 7f0fd2b. Bugbot is set up for automated code reviews on this repo. Configure here.