Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions ccproxy/streaming/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,16 @@ async def _parse_collected_stream(
continue
event_type = payload.get("type")
if isinstance(event_type, str) and stream_accumulator is not None:
with contextlib.suppress(Exception):
try:
stream_accumulator.accumulate(event_type, payload)
except Exception as exc: # pragma: no cover - defensive logging
logger.warning(
"streaming_buffer_accumulate_failed",
event_type=event_type,
error=str(exc),
Copy link

Copilot AI Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new warning log on accumulator failures drops the traceback (exc_info), which makes diagnosing upstream event/schema drift harder in production. Consider logging exc_info=exc (or equivalent) alongside error=str(exc) so the stack/context is retained.

Suggested change
error=str(exc),
error=str(exc),
exc_info=exc,

Copilot uses AI. Check for mistakes.
request_id=getattr(request_context, "request_id", None),
category="streaming",
)
if event_type == "response.reasoning_summary_part.added":
part = payload.get("part")
if isinstance(part, dict):
Expand All @@ -584,7 +592,10 @@ async def _parse_collected_stream(
response_obj.setdefault("created_at", 0)
response_obj.setdefault("status", "completed")
response_obj.setdefault("model", response_obj.get("model") or "")
response_obj.setdefault("output", response_obj.get("output") or {})
# ResponseObject.output must be a list; coerce stray dicts/None to [].
existing_output = response_obj.get("output")
if not isinstance(existing_output, list):
response_obj["output"] = []
response_obj.setdefault(
"parallel_tool_calls", response_obj.get("parallel_tool_calls", False)
)
Expand All @@ -602,8 +613,16 @@ async def _parse_collected_stream(
continue
event_type = payload.get("type")
if isinstance(event_type, str):
with contextlib.suppress(Exception):
try:
accumulator_for_rebuild.accumulate(event_type, payload)
except Exception as exc: # pragma: no cover - defensive logging
logger.warning(
"streaming_buffer_rebuild_accumulate_failed",
event_type=event_type,
error=str(exc),
request_id=getattr(request_context, "request_id", None),
Comment on lines +619 to +623
Copy link

Copilot AI Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: this rebuild-accumulate warning omits exc_info, which limits observability when events fail validation/coercion. Including exc_info=exc would make these warnings actionable without having to reproduce locally.

Copilot uses AI. Check for mistakes.
category="streaming",
)

if accumulator_for_rebuild is not None:
completed_payload = accumulator_for_rebuild.get_completed_response()
Expand Down Expand Up @@ -635,12 +654,21 @@ async def _parse_collected_stream(
request_id=getattr(request_context, "request_id", None),
)
except Exception as exc: # pragma: no cover - defensive logging
logger.debug(
logger.warning(
"response_rebuild_failed",
error=str(exc),
request_id=getattr(request_context, "request_id", None),
category="streaming",
Copy link

Copilot AI Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response_rebuild_failed was upgraded to warning, but it still logs only error=str(exc) without exc_info. Adding exc_info=exc would preserve the traceback and help pinpoint the specific field/event causing rebuild failures.

Suggested change
category="streaming",
category="streaming",
exc_info=exc,

Copilot uses AI. Check for mistakes.
)

# Final safety net: ResponseObject.output is required and must be a
# list. If upstream event schema drift caused the accumulator to drop
# the completed event and rebuild to leave a non-list output behind,
# coerce it so downstream format chain validation doesn't explode
# with a bare "Field required" error.
if not isinstance(response_obj.get("output"), list):
response_obj["output"] = []

if not response_obj.get("usage"):
usage = self._extract_usage_from_chunks(chunks)
if usage:
Expand Down
213 changes: 213 additions & 0 deletions tests/unit/streaming/test_buffer_parse_responses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""Regression tests for StreamingBufferService._parse_collected_stream.

Covers issue #55: when the Codex Responses API SSE stream has drifted away
from the shape ResponsesAccumulator knows about, the buffer must still emit
a dict whose ``output`` field is a list so the downstream format chain's
ResponseObject validation does not fail with ``Field required``.
"""

from __future__ import annotations

import json
from collections.abc import AsyncIterator
from typing import Any

import httpx
import pytest

from ccproxy.llms.models import openai as openai_models
from ccproxy.llms.streaming.accumulators import ResponsesAccumulator
from ccproxy.streaming.buffer import StreamingBufferService


class _Ctx:
    """Minimal request-context stub for ``_parse_collected_stream``.

    The buffer reads ``request_id`` via ``getattr(request_context,
    "request_id", None)`` when building log fields; ``_tool_accumulator_class``
    presumably tells the buffer which accumulator to instantiate for the
    rebuild pass — confirm against StreamingBufferService.
    """

    request_id = "test-req"
    _tool_accumulator_class = ResponsesAccumulator


def _sse(event_type: str, payload: dict[str, Any]) -> bytes:
body = {"type": event_type, **payload}
return f"event: {event_type}\ndata: {json.dumps(body)}\n\n".encode()


@pytest.fixture
async def buffer() -> AsyncIterator[StreamingBufferService]:
    """Yield a StreamingBufferService backed by a short-lived AsyncClient.

    The previous version returned the service without ever closing the
    underlying ``httpx.AsyncClient``, which leaks connections and can emit
    unclosed-client warnings across the test session. Closing it in
    ``finally`` guarantees teardown even when a test fails.

    NOTE(review): assumes pytest-asyncio resolves this async-generator
    fixture (the tests use ``@pytest.mark.asyncio``) — confirm the plugin's
    ``asyncio_mode`` setting.
    """
    client = httpx.AsyncClient()
    try:
        yield StreamingBufferService(http_client=client)
    finally:
        await client.aclose()

@pytest.mark.asyncio
async def test_parse_collected_stream_output_is_always_a_list(
    buffer: StreamingBufferService,
) -> None:
    """Regression for issue #55: even when the completed event carries a
    shape the accumulator does not recognize, the parsed payload must expose
    ``output`` as a list so ResponseObject validation succeeds."""

    skeleton = {
        "id": "resp_abc",
        "object": "response",
        "model": "gpt-5-codex",
        "parallel_tool_calls": True,
        "top_logprobs": 0,
    }
    stream = [
        _sse("response.created", {"sequence_number": 1, "response": skeleton}),
        _sse("response.in_progress", {"sequence_number": 2, "response": skeleton}),
        _sse(
            "response.completed",
            {
                "sequence_number": 3,
                "response": {**skeleton, "unexpected_future_field": True},
            },
        ),
    ]

    parsed = await buffer._parse_collected_stream(
        chunks=stream,
        handler_config=None,  # type: ignore[arg-type]
        request_context=_Ctx(),  # type: ignore[arg-type]
    )

    assert parsed is not None
    assert isinstance(parsed.get("output"), list), (
        f"output must be list for ResponseObject validation, got {type(parsed.get('output'))}"
    )
    openai_models.ResponseObject.model_validate(parsed)


@pytest.mark.asyncio
async def test_parse_collected_stream_coerces_non_list_output(
    buffer: StreamingBufferService,
) -> None:
    """Upstream may send ``output`` as a bare dict; the buffer must coerce
    it to a list before the payload reaches ResponseObject validation."""

    seed_response: dict[str, Any] = {
        "id": "resp_xyz",
        "object": "response",
        "model": "gpt-5-codex",
        "parallel_tool_calls": False,
        "output": {},
    }
    stream = [
        _sse("response.created", {"sequence_number": 1, "response": seed_response}),
    ]

    parsed = await buffer._parse_collected_stream(
        chunks=stream,
        handler_config=None,  # type: ignore[arg-type]
        request_context=_Ctx(),  # type: ignore[arg-type]
    )

    assert parsed is not None
    assert isinstance(parsed.get("output"), list)
    openai_models.ResponseObject.model_validate(parsed)


@pytest.mark.asyncio
async def test_parse_collected_stream_preserves_rebuilt_output(
    buffer: StreamingBufferService,
) -> None:
    """Outputs rebuilt by the accumulator from well-formed item/text events
    must survive into the parsed payload and validate as ResponseObject."""

    base: dict[str, Any] = {
        "id": "resp_done",
        "object": "response",
        "model": "gpt-5-codex",
        "parallel_tool_calls": False,
        "output": [],
    }
    # The same assistant message, before and after completion.
    pending_item = {
        "type": "message",
        "id": "msg_1",
        "status": "in_progress",
        "role": "assistant",
        "content": [],
    }
    finished_item = {
        "type": "message",
        "id": "msg_1",
        "status": "completed",
        "role": "assistant",
        "content": [{"type": "output_text", "text": "hello"}],
    }
    stream = [
        _sse("response.created", {"sequence_number": 1, "response": base}),
        _sse(
            "response.output_item.added",
            {"sequence_number": 2, "output_index": 0, "item": pending_item},
        ),
        _sse(
            "response.output_text.delta",
            {
                "sequence_number": 3,
                "item_id": "msg_1",
                "output_index": 0,
                "content_index": 0,
                "delta": "hello",
            },
        ),
        _sse(
            "response.output_text.done",
            {
                "sequence_number": 4,
                "item_id": "msg_1",
                "output_index": 0,
                "content_index": 0,
                "text": "hello",
            },
        ),
        _sse(
            "response.output_item.done",
            {"sequence_number": 5, "output_index": 0, "item": finished_item},
        ),
        _sse(
            "response.completed",
            {
                "sequence_number": 6,
                "response": {
                    **base,
                    "status": "completed",
                    "output": [finished_item],
                },
            },
        ),
    ]

    parsed = await buffer._parse_collected_stream(
        chunks=stream,
        handler_config=None,  # type: ignore[arg-type]
        request_context=_Ctx(),  # type: ignore[arg-type]
    )

    assert parsed is not None
    rebuilt = parsed.get("output")
    assert isinstance(rebuilt, list) and rebuilt, (
        "output should contain the rebuilt message"
    )
    validated = openai_models.ResponseObject.model_validate(parsed)
    assert validated.output
Loading