fix: async vision streaming + stateful SSE parser (#6284)#6285
Conversation
Bug 1: _ask_vision_stream blocked the event loop for 5-8s per image chat by using sync openai.chat.completions.create inside an async handler. Convert to AsyncOpenAI client and run producer as asyncio.create_task so chunks stream progressively and other requests aren't starved. Bug 2: SSE parser in shared.dart used a 1024-byte heuristic that failed when TCP segments split SSE lines at arbitrary byte boundaries. Replace with a stateful remainder buffer that only emits complete events delimited by \n\n. Fix applied to both makeStreamingApiCall and makeMultipartStreamingApiCall. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Greptile SummaryThis PR fixes two confirmed bugs: (1)
Confidence Score: 4/5Safe to merge after fixing the producer-exception deadlock in graph.py; all other changes are correct. One P1 issue remains: if the OpenAI stream raises mid-response the consumer loop hangs indefinitely, which is worse than the original sync-blocking behavior it replaces. The Dart SSE fix and the async conversion itself are correct and well-tested. backend/utils/retrieval/graph.py — producer exception path needs a try/finally or sentinel guarantee before merge. Important Files Changed
Sequence DiagramsequenceDiagram
participant Router as FastAPI Router
participant Graph as graph.py
participant Callback as AsyncQueue
participant Producer as _ask_vision_stream
participant OAI as AsyncOpenAI
Router->>Graph: async for chunk in _execute_file_chat_stream()
Graph->>Callback: create AsyncStreamingCallback
Graph->>Producer: asyncio.create_task(_produce())
activate Producer
Producer->>OAI: await chat.completions.create(stream=True)
OAI-->>Producer: async stream
loop each chunk
Producer->>Callback: await callback.put_data(delta)
Callback-->>Graph: queue.get() → yield chunk
Graph-->>Router: yield "data: ..."
end
alt Normal completion
Producer->>Callback: await callback.end() → put(None)
Callback-->>Graph: queue.get() → None → break
Graph->>Producer: await task → answer
else Exception raised (P1 bug)
Producer--xCallback: end() never called
Note over Graph,Callback: Consumer blocks on queue.get() forever
end
Reviews (1): Last reviewed commit: "fix: async vision streaming + stateful S..." | Re-trigger Greptile |
| task = asyncio.create_task(_produce()) | ||
|
|
||
| # Drain the queue concurrently while the producer runs | ||
| while True: | ||
| chunk = await callback.queue.get() | ||
| if chunk: | ||
| yield chunk | ||
| else: | ||
| break | ||
|
|
||
| answer = await task |
There was a problem hiding this comment.
Consumer hangs indefinitely when producer raises without signaling end
If _produce() raises an exception (e.g., OpenAI API error mid-stream, network timeout), _ask_vision_stream propagates the exception before reaching await callback.end(). Because asyncio.create_task stores the exception in the task object — not propagating it to the creator — the consumer loop is stuck at await callback.queue.get() forever: no None sentinel is ever enqueued, the except block is never reached, and the generator hangs indefinitely.
The test test_producer_error_terminates_queue does not cover this case; it manually reads one item then awaits the task, skipping the production while True drain loop entirely.
Fix: guard _produce with a try/finally that guarantees the sentinel:
async def _produce():
try:
return await fc_tool.process_chat_with_file_stream(question, file_ids, callback=callback)
except Exception:
await callback.end() # ensure consumer is always unblocked
raise| file_content = await _async_openai.files.content(file.openai_file_id) | ||
| b64 = base64.b64encode(file_content.read()).decode('utf-8') |
There was a problem hiding this comment.
Large image bytes not freed after base64 encoding
file_content.read() loads the full image into memory as a bytes object. After base64.b64encode() produces b64 (which is ~33% larger), the raw bytes remain live until file_content is garbage-collected. For multi-image chats the loop keeps all raw buffers in memory simultaneously. Per the project memory-management guidelines, large byte arrays should be freed immediately after use.
| file_content = await _async_openai.files.content(file.openai_file_id) | |
| b64 = base64.b64encode(file_content.read()).decode('utf-8') | |
| raw_bytes = file_content.read() | |
| b64 = base64.b64encode(raw_bytes).decode('utf-8') | |
| del raw_bytes |
Context Used: Memory management - free large objects immediately... (source)
Reviewer (CODEx) caught that _ask_vision_stream only called callback.end() on the success path. If OpenAI raises mid-stream, the consumer queue.get() would hang indefinitely. Wrap the streaming loop in try/finally so the end sentinel is always enqueued. Also fix the error-path test to actually verify queue termination with a timeout, not just await the task exception. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move try/finally to cover files.content() and file_content.read() calls, not just the streaming loop. If the early file fetch raises, the consumer queue still receives the end sentinel. Add test for early-failure path (before any stream data). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Reviewer caught that ask_stream (Assistants API path for non-image files) also lacked error-path cleanup. If _fill_question or the stream iteration raises, callback.end_nowait() was never called, leaving the consumer hanging. Wrap in try/finally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move _fill_question inside ask_stream's try/finally so thread message creation failures also terminate the callback. Add try/except around _ensure_thread_and_assistant in process_chat_with_file_stream to ensure the callback sentinel is sent if thread/assistant setup fails. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cover sync callback error patterns (ask_stream, _ensure_thread_and_assistant failure) and producer/consumer callback_data population (success, error, None callback_data). Addresses tester coverage gaps from CP8 review. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CP8 Test Cycle — TESTS_APPROVEDTest Detail Table
Test ResultsBackend (11 tests): App (16 tests): Tester Assessment
by AI for @beastoin |
CP9A — Level 1 Live Test (Build & Run Changed Components Standalone)Backend L1 — Async Vision Streaming
App L1 — SSE Parser Fix
TCP Fragment Proxy (chaos engineering)
Changed-Path Coverage Checklist
EvidenceText chat through fragment proxy (P8): Image chat — async vision path (P1, P7): Backend log trace: No errors in backend logs from our code paths. Pre-existing errors (LangSmith not configured, encryption) are unrelated. L1 SynthesisAll changed paths proven at L1. P1 (async vision) and P7 (producer/consumer) confirmed via image chat that routed through the async by AI for @beastoin |
CP9B — Level 2 Live Test (Service + App Integrated)Infrastructure
Test Scenarios1. Text chat through fragment proxy (P8/P9):
2. Image chat — async vision path (P1/P4/P7):
3. Multi-turn image follow-up (P1/P7 repeat):
Changed-Path Coverage Checklist (L2 update)
EvidenceFollow-up image response (multi-turn, 3 files, async): Backend log trace (L2): L2 SynthesisAll changed paths proven at L2 integrated scope. P1/P4/P7 (async vision + producer/consumer) confirmed via 2 image chat rounds with a total of 4 async OpenAI API calls, all streamed through the TCP fragment proxy and rendered correctly in the app. P8/P9 (SSE parser) confirmed via text chat through the same fragment proxy. Error paths (P5/P6) proven at unit scope. No paths remain UNTESTED. No "server issues" errors observed across any test. by AI for @beastoin |
|
lgtm |
Includes SSE parser fix from BasedHardware#6285 (stateful remainder buffer replaces 1024-byte heuristic for TCP-fragmented responses). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@kodjima33 — both fixes for #6284 are live:
Image chat should no longer show "Looks like we are having issues with the server." by AI for @beastoin |
Includes SSE parser fix from BasedHardware#6285 (stateful remainder buffer replaces 1024-byte heuristic for TCP-fragmented responses). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>



Fixes #6284 — image chat messages fail with "server error" despite 200 OK.
Two confirmed bugs, both reproduced end-to-end with local dev backend + dev app + TCP fragmenting proxy.
Bug 1 — Backend: sync call blocks event loop (5-8s per image chat)
_ask_vision_stream()used syncopenai.chat.completions.create(stream=True)inside an async handler, blocking the event loop for 5-8s per request. Under load, this starves all other requests on the pod.Fix: Convert to
AsyncOpenAIclient._ask_vision_streamandprocess_chat_with_file_streamare now async._execute_file_chat_streamingraph.pyruns the producer asasyncio.create_taskso chunks stream progressively while the queue drains concurrently. All failure paths (files.content, streaming,_ensure_thread_and_assistant,_fill_question) guarantee the callback sentinel is sent via try/finally.Bug 2 — App: SSE parser fails on TCP-fragmented responses
shared.dart:326used a 1024-byte heuristic to decide if a chunk is "complete." When TCP segments split an SSE line at arbitrary byte boundaries, fragments < 1024 bytes pass through unbuffered →parseMessageChunkreturns null →failedMessage()→ "Looks like we are having issues with the server."Fix: Replace the 1024-byte heuristic with a stateful remainder buffer. Accumulate data across TCP reads and only emit complete events delimited by
\n\n. Applied to bothmakeStreamingApiCallandmakeMultipartStreamingApiCall.Deploy
Backend (Bug 1 fix):
gh workflow run "Deploy Backend to Cloud RUN" --repo BasedHardware/omi --ref main -f environment=prodchat_file.pyandgraph.pyare not in pusher's import chainApp (Bug 2 fix):
shared.dartSSE parser fix ships with the next mobile release (no separate deploy needed)No new dependencies, no env vars, no secrets.
Post-deploy: watch Cloud Run 5xx for 30min, especially
/v2/messagesendpoint which hits the vision path.Testing
Backend (11 tests):
_fill_questionfailure →end_nowaitvia try/finallyend_nowait+ partial data_ensure_thread_and_assistant→end_nowaitbefore re-raisecallback_data(answer, memories, nps)callback_data['error']Nonecallback_data doesn't crashApp (16 tests):
da+ta:), payload split, delimiter split, many small fragments, large events (>1024 bytes)done:payload intact and split across fragmentsLive testing (CP9A + CP9B)
flutter build apk --flavor dev --debug, installed on emulator_ask_vision_stream) processed 2 image rounds with 4 async OpenAI calls_async_openai.files.content, all streamed correctlyReview cycle
6 review rounds via CODEx → PR_APPROVED_LGTM. Fixes across rounds:
_ask_vision_streamfor error-path cleanupask_stream(non-vision path) with try/finally_fill_questioninside try/finally, guard_ensure_thread_and_assistantTester cycle: 2 rounds → TESTS_APPROVED. Added 6 error-path/callback_data tests.
Files changed
backend/utils/other/chat_file.py_ask_vision_stream+process_chat_with_file_stream, try/finally on all pathsbackend/utils/retrieval/graph.pyasyncio.create_taskproducer + concurrent queue drainapp/lib/backend/http/shared.dartbackend/tests/unit/test_vision_stream_async.pyapp/test/unit/sse_parser_test.dartbackend/test.shRisks
ask_stream(non-vision file chat via Assistants API) remains sync — separate issue, different code pathAsyncOpenAI()at module level requires OPENAI_API_KEY env var (always set in production)by AI for @beastoin