Skip to content

feat(provider): add stream core#22

Merged
bzp2010 merged 2 commits intomainfrom
bzp/feat-new-provider-stream-core
Apr 7, 2026
Merged

feat(provider): add stream core#22
bzp2010 merged 2 commits intomainfrom
bzp/feat-new-provider-stream-core

Conversation

@bzp2010
Copy link
Copy Markdown
Collaborator

@bzp2010 bzp2010 commented Apr 7, 2026

As the 8th part of the major refactoring of the provider, add the core components of the stream module.

Summary by CodeRabbit

  • New Features

    • Added stream adapters to process SSE/provider lines into ordered chat completion chunks with buffered emission and token/usage tracking.
  • Documentation

    • New LLM Stream Core docs describing the stream pipeline, adapters, usage tracking, and extended stream state.
  • Tests

    • Added unit tests validating line splitting, EOF flushing, error handling, and chunk buffering/order.

Copilot AI review requested due to automatic review settings April 7, 2026 02:39
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 7, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1ed3a73a-9954-4dca-adcf-0fd4005bc821

📥 Commits

Reviewing files that changed from the base of the PR and between eb476f8 and 227ac71.

📒 Files selected for processing (2)
  • src/gateway/streams/hub.rs
  • src/gateway/streams/reader.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/gateway/streams/hub.rs

📝 Walkthrough

Walkthrough

Adds Layer 3 streaming primitives: an SSE byte→line reader (sse_reader) and a hub adapter (HubChunkStream) that turns provider stream lines into ChatCompletionChunks while tracking token usage in shared ChatStreamState. Also exposes a new gateway::streams module and adds design docs.

Changes

Cohort / File(s) Summary
Documentation
docs/internals/llm-streams.md
New design doc describing the LLM Stream Core, sse_reader, HubChunkStream control flow, usage tracking, and ChatStreamState responsibilities.
Module exports
src/gateway/mod.rs, src/gateway/streams/mod.rs
Added pub mod streams; and a new gateway::streams entry that re-exports HubChunkStream and sse_reader.
SSE line reader
src/gateway/streams/reader.rs
New sse_reader function: accumulates incoming Bytes into a buffer, splits at last newline to emit complete UTF‑8 lines (drops empty separator lines), appends synthetic newline at EOF to flush trailing partial line, emits single error on upstream failure. Includes unit tests.
Hub chunk adapter
src/gateway/streams/hub.rs
New HubChunkStream struct implementing Stream<Item = Result<ChatCompletionChunk>>: drains internal chunk buffer FIFO, polls inner raw-line stream, transforms lines via ProviderCapabilities::transform_stream_chunk(), enqueues extra produced chunks, updates ChatStreamState (token counts, chunk_index, streamed metadata), and returns errors immediately. Includes unit tests.

Sequence Diagram(s)

sequenceDiagram
    participant ByteStream as Provider Byte Stream
    participant SSEReader as sse_reader
    participant HubChunk as HubChunkStream
    participant Transform as ProviderCapabilities
    participant State as ChatStreamState

    ByteStream->>SSEReader: bytes (chunks)
    SSEReader->>SSEReader: buffer & split at newlines
    SSEReader->>HubChunk: complete SSE lines (String)
    HubChunk->>HubChunk: drain buffered ChatCompletionChunk(s) first
    HubChunk->>Transform: transform_stream_chunk(raw_line)
    Transform->>HubChunk: Vec<ChatCompletionChunk>
    HubChunk->>HubChunk: enqueue additional chunks (VecDeque)
    HubChunk->>State: update usage (prompt_tokens, completion_tokens) & chunk_index
    HubChunk->>Client: return first ChatCompletionChunk
    Note over HubChunk,State: subsequent polls emit queued chunks before polling inner stream
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 3 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
E2e Test Quality Review ⚠️ Warning PR contains only unit tests with mocked components; zero E2E tests in tests/ directory. Full stream pipeline (HTTP → SSE parsing → transformation → state accumulation) is never tested end-to-end. Error scenarios including malformed SSE frames, invalid UTF-8, empty streams, and transformation errors lack dedicated test coverage. Add integration/E2E tests in tests/ directory flowing realistic HTTP byte streams through full pipeline, covering success and failure paths. Add unit tests for error scenarios: empty streams, invalid UTF-8, malformed data, transform failures, and edge cases.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: adding stream core components (sse_reader, HubChunkStream, and documentation) as part of the provider refactoring.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch bzp/feat-new-provider-stream-core

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new gateway::streams core module to standardize provider streaming: decoding raw SSE byte streams into lines and converting provider stream lines into hub ChatCompletionChunk items with centralized state/usage tracking.

Changes:

  • Introduce sse_reader to split a reqwest byte stream into SSE lines and flush trailing data at EOF.
  • Introduce HubChunkStream to buffer multi-chunk provider transforms, enforce correct polling order, and accumulate usage into ChatStreamState.
  • Add internal documentation describing the stream core building blocks and intended scope.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
src/gateway/streams/reader.rs Adds SSE byte-stream → line reader plus unit test.
src/gateway/streams/hub.rs Adds line-stream → ChatCompletionChunk adapter with buffering/state updates plus tests.
src/gateway/streams/mod.rs Wires the new streams submodules and re-exports key types.
src/gateway/mod.rs Exposes the new streams module from gateway.
docs/internals/llm-streams.md Documents the new stream-core components and behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/gateway/streams/hub.rs`:
- Around line 17-38: Add Rustdoc comments to the public HubChunkStream type and
its constructor new: document that HubChunkStream buffers ChatCompletionChunk
items in buffer (VecDeque) to preserve ordering across the inner Stream<String>
input, describe the semantics of state mutations via the state field
(ChatStreamState) and any side effects consumers should expect, and note
thread/send guarantees (Send) and that def: Arc<dyn ProviderCapabilities> is
consulted for provider-specific behavior; place these /// comments immediately
above the pub struct HubChunkStream declaration and above impl
HubChunkStream::new so generated API docs clearly state buffering, ordering
guarantees, and state side effects.

In `@src/gateway/streams/reader.rs`:
- Around line 8-10: Add a /// doc comment to the public function sse_reader
describing its contract: explain that sse_reader accepts a byte stream and
yields UTF-8 decoded Strings split on newline boundaries, that it filters out
empty lines (skipping zero-length messages), and that it flushes any partial
buffered line as a final item on EOF; reference the function name sse_reader and
mention the stream input type and the returned Stream<Item = Result<String>> so
callers know the expected behavior and error handling.
- Around line 12-37: The scan closure currently flushes buffered partial data
after a transport Err because the Err arm returns Some(stream) and the earlier
synthetic newline can cause emission of truncated data; change the Err arm in
the scan over buffer to clear the buffer (buffer.clear()) and return
futures::future::ready(None) so the scan terminates and no further items are
emitted after the first error, and add a regression test that exercises the
sequence Ok(partial) -> Err(_) -> EOF to verify no partial line is emitted.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a704503e-44b0-426d-bd2b-167a98ec6439

📥 Commits

Reviewing files that changed from the base of the PR and between 0c7b13c and eb476f8.

📒 Files selected for processing (5)
  • docs/internals/llm-streams.md
  • src/gateway/mod.rs
  • src/gateway/streams/hub.rs
  • src/gateway/streams/mod.rs
  • src/gateway/streams/reader.rs

@bzp2010 bzp2010 merged commit 84a3a3e into main Apr 7, 2026
10 checks passed
@bzp2010 bzp2010 deleted the bzp/feat-new-provider-stream-core branch April 7, 2026 03:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants