fix(streaming): coerce Responses API output field to list (#55)#57
fix(streaming): coerce Responses API output field to list (#55)#57
Conversation
When the Codex upstream SSE stream drifts and the ResponsesAccumulator silently drops response.completed / response.created events, the buffer was leaving response_obj["output"] as an empty dict (or missing), which caused every /codex/v1/chat/completions request to fail downstream with "ResponseObject.output Field required". Fixes: - _parse_collected_stream now always coerces non-list output values to [] both before and after accumulator rebuild, so the payload handed to the format chain always satisfies ResponseObject validation. - Silent contextlib.suppress and debug-only except blocks around accumulator.accumulate and rebuild_response_object are upgraded to warning-level logs so upstream SSE drift is visible in production. Adds regression tests covering: - unrecognized response.completed event shape (the production bug) - upstream sending output as a bare dict - happy path where accumulator successfully rebuilds message outputs
There was a problem hiding this comment.
Pull request overview
This PR fixes Codex streaming failures (#55) by ensuring the reconstructed OpenAI Responses API payload always conforms to ResponseObject validation—specifically, that output is present and always a list—so /codex/v1/chat/completions no longer returns 502s when upstream SSE event shapes drift.
Changes:
- Coerce any non-list
outputto[]during Responses payload reconstruction (both pre- and post-accumulator rebuild). - Promote previously-suppressed/hidden accumulator + rebuild failures to
warninglogs for better visibility into upstream SSE drift. - Add regression tests covering the production failure mode, non-list
output, and the successful rebuild path.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
ccproxy/streaming/buffer.py |
Makes _parse_collected_stream resilient to upstream SSE drift by coercing output to a list and surfacing accumulator/rebuild failures via warnings. |
tests/unit/streaming/test_buffer_parse_responses.py |
Adds unit regression coverage to ensure parsed Responses payloads always validate as ResponseObject, including drift/fallback scenarios. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @pytest.fixture | ||
| def buffer() -> StreamingBufferService: | ||
| return StreamingBufferService(http_client=httpx.AsyncClient()) | ||
|
|
There was a problem hiding this comment.
The test fixture creates an httpx.AsyncClient but never closes it, which can leak connections and trigger unclosed-client warnings/flaky tests. Convert this into an async fixture that yields the StreamingBufferService and ensures the underlying AsyncClient is closed in a teardown (e.g., via async with or aclose() in finally).
| parsed = await buffer._parse_collected_stream( | ||
| chunks=chunks, | ||
| handler_config=None, # type: ignore[arg-type] | ||
| request_context=_Ctx(), # type: ignore[arg-type] | ||
| ) |
There was a problem hiding this comment.
These tests pass handler_config=None (with type ignores) to a method that expects a HandlerConfig and may access handler_config.sse_parser on other code paths. This makes the tests fragile against refactors; prefer passing a minimal stub/real HandlerConfig instance with sse_parser=None so the signature is respected without type: ignore.
| logger.warning( | ||
| "streaming_buffer_accumulate_failed", | ||
| event_type=event_type, | ||
| error=str(exc), |
There was a problem hiding this comment.
The new warning log on accumulator failures drops the traceback (exc_info), which makes diagnosing upstream event/schema drift harder in production. Consider logging exc_info=exc (or equivalent) alongside error=str(exc) so the stack/context is retained.
| error=str(exc), | |
| error=str(exc), | |
| exc_info=exc, |
| logger.warning( | ||
| "streaming_buffer_rebuild_accumulate_failed", | ||
| event_type=event_type, | ||
| error=str(exc), | ||
| request_id=getattr(request_context, "request_id", None), |
There was a problem hiding this comment.
Same as above: this rebuild-accumulate warning omits exc_info, which limits observability when events fail validation/coercion. Including exc_info=exc would make these warnings actionable without having to reproduce locally.
| "response_rebuild_failed", | ||
| error=str(exc), | ||
| request_id=getattr(request_context, "request_id", None), | ||
| category="streaming", |
There was a problem hiding this comment.
response_rebuild_failed was upgraded to warning, but it still logs only error=str(exc) without exc_info. Adding exc_info=exc would preserve the traceback and help pinpoint the specific field/event causing rebuild failures.
| category="streaming", | |
| category="streaming", | |
| exc_info=exc, |
Summary
/codex/v1/chat/completionsrequest was returning HTTP 502 withResponseObject.output Field requiredbecause_parse_collected_streamset the fallbackoutputto{}(a dict) when the upstream SSE stream shape drifted andResponsesAccumulatorsilently droppedresponse.completed/response.createdevents.outputto[]both before and after the accumulator rebuild, so the payload handed to the format chain always validates againstResponseObject.contextlib.suppress(Exception)+debuglogs aroundaccumulator.accumulateandrebuild_response_objectare upgraded towarning, so future upstream SSE drift is visible instead of hidden.chatgpt.com/backend-api/codex/responsesstream. The defensive coercion is enough to stop the 502s in the meantime, and the new warning logs will surface which events are being dropped.Test plan
tests/unit/streaming/test_buffer_parse_responses.pycovering:response.completedshape (the production bug)outputas a bare dictuv run pytest tests/unit/streaming/ tests/unit/llms/streaming/(10 passed)make pre-commit