
feat: responses api #89

Merged
bzp2010 merged 2 commits into main from bzp/feat-responses-api on May 5, 2026

Conversation

bzp2010 (Collaborator) commented May 4, 2026

Summary by CodeRabbit

  • New Features
    • Added /v1/responses endpoint supporting OpenAI Responses API with streaming and non-streaming request handling
    • Implemented session replay via previous_response_id to maintain conversation history across requests
    • Added authorization and rate-limiting controls for the new endpoint
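The session-replay feature above can be illustrated with a minimal, std-only sketch. The `Sessions` struct, `replay` method, and string-based turns are hypothetical stand-ins for the gateway's actual session types; only the idea (a request carrying `previous_response_id` has prior turns prepended from stored history) comes from the PR.

```rust
use std::collections::HashMap;

// Hypothetical sketch of session replay: `history` maps a response id to the
// turns recorded for that conversation. Names are illustrative, not the
// gateway's actual API.
struct Sessions {
    history: HashMap<String, Vec<String>>,
}

impl Sessions {
    // Rebuild the full conversation for a new request: prior turns first
    // (when previous_response_id resolves to a known session), then the
    // new input.
    fn replay(&self, previous_response_id: Option<&str>, input: &str) -> Vec<String> {
        let mut turns = previous_response_id
            .and_then(|id| self.history.get(id))
            .cloned()
            .unwrap_or_default();
        turns.push(input.to_string());
        turns
    }
}

fn main() {
    let mut sessions = Sessions { history: HashMap::new() };
    sessions.history.insert(
        "resp_1".to_string(),
        vec!["Hi".to_string(), "Hello! How can I help?".to_string()],
    );

    // A follow-up referencing resp_1 sees the earlier turns plus the new one.
    assert_eq!(sessions.replay(Some("resp_1"), "What did I just say?").len(), 3);

    // An absent or unknown id starts a fresh conversation.
    assert_eq!(sessions.replay(None, "New chat").len(), 1);
    println!("ok");
}
```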


coderabbitai Bot commented May 4, 2026

📝 Walkthrough

Walkthrough

This PR introduces a new /v1/responses API endpoint with comprehensive request/response handling, streaming support, observability instrumentation, and error management. It configures the gateway with an explicit in-memory session store and wires the handler into the proxy router. Multiple test modules have import statements reformatted for consistency.

Changes

Responses API Handler

Layer / File(s) Summary
Type Definitions
src/proxy/handlers/responses/types.rs
Defines ResponsesError enum covering authorization, rate-limiting, gateway, timeout, and missing-model errors; implements IntoResponse to map variants into HTTP responses with structured JSON error payloads.
Span Attributes & Telemetry
src/proxy/handlers/responses/span_attributes/{message_attributes,stream_output,telemetry}.rs
Implements observability infrastructure: StreamOutputCollector accumulates streamed events into output messages; telemetry functions build request/response/chunk span properties with OpenInference message attributes, tool definitions, and token usage; message converters translate requests/responses into traced message views.
Core Handler Implementation
src/proxy/handlers/responses/mod.rs
Implements the main responses handler: records observability spans, authorizes requests, applies rate-limiting, resolves model from context, dispatches gateway calls with optional timeout, and routes to either JSON or SSE response handling with streaming usage coordination.
Module Registry & Routing
src/proxy/handlers/mod.rs, src/proxy/mod.rs
Adds pub mod responses; to handlers registry and registers POST /v1/responses endpoint in the proxy router between /v1/messages and /v1/embeddings.
Test Coverage & Import Cleanup
tests/proxy/responses.test.ts, src/proxy/handlers/responses/span_attributes/tests.rs, plus test import reordering across src/config/entities/mod.rs, src/gateway/**/*.rs, src/proxy/**/*.rs, src/utils/instance.rs
Comprehensive E2E tests validating non-streaming/streaming responses, authorization, rate-limiting, session replay with previous_response_id, and invalid prior response handling; unit tests for span attribute generation and event accumulation; test module imports consistently reorganized.
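The core handler row above mentions dispatching gateway calls with an optional timeout. The shape of that logic can be sketched with std channels (a hedged, std-only stand-in: the real handler uses async tokio primitives and the gateway's own request/response types, so all names here are illustrative):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Run `work` on a worker thread and wait for its result, bounded by an
// optional timeout. Stands in for the handler's async gateway dispatch.
fn dispatch_with_timeout<F>(work: F, timeout: Option<Duration>) -> Result<String, &'static str>
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    match timeout {
        // With a configured limit, give up once the limit elapses.
        Some(limit) => rx.recv_timeout(limit).map_err(|_| "gateway timeout"),
        // Without a limit, wait until the worker finishes or drops the sender.
        None => rx.recv().map_err(|_| "gateway dropped"),
    }
}

fn main() {
    // Fast call completes within the limit.
    let ok = dispatch_with_timeout(|| "response".to_string(), Some(Duration::from_secs(1)));
    assert_eq!(ok, Ok("response".to_string()));

    // Slow call trips the timeout.
    let slow = dispatch_with_timeout(
        || {
            thread::sleep(Duration::from_millis(200));
            "late".to_string()
        },
        Some(Duration::from_millis(10)),
    );
    assert_eq!(slow, Err("gateway timeout"));
    println!("ok");
}
```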

Gateway Session Store Configuration

Layer / File(s) Summary
Service Initialization
src/lib.rs
Gateway construction in run_with_provider now explicitly configures an in-memory session store via .with_session_store(Arc::new(gateway::session::InMemorySessionStore::default())) instead of relying on default/unconfigured setup.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5 passed | ❌ 1 failed

❌ Failed checks (1 warning)

E2e Test Quality Review — ⚠️ Warning
Explanation: E2E test coverage is incomplete: missing timeout handling tests, model validation error tests, streaming race condition tests, and request input validation tests despite these being implemented in the handler.
Resolution: Add E2E tests for timeout, missing model, invalid inputs, and concurrent streaming scenarios; fix the unresolved content_index routing issue in StreamOutputCollector to properly handle multi-part content.

✅ Passed checks (5 passed)

Description Check — ✅ Passed: Check skipped because CodeRabbit’s high-level summary is enabled.
Title Check — ✅ Passed: The title 'feat: responses api' directly summarizes the main change: implementation of a new Responses API handler with full feature support.
Linked Issues Check — ✅ Passed: Check skipped because no linked issues were found for this pull request.
Out of Scope Changes Check — ✅ Passed: Check skipped because no linked issues were found for this pull request.
Security Check — ✅ Passed: Authorization checks are properly enforced in the responses handler, sensitive data is not exposed in error responses, span attributes properly isolate message content, and rate limiting is implemented.





@bzp2010 bzp2010 marked this pull request as ready for review May 4, 2026 18:04

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (5)
src/proxy/handlers/responses/types.rs (1)

14-26: ⚡ Quick win

Add a doc comment for the public ResponsesError type.

This enum is part of the public surface in this module and should be documented with /// to keep the proxy error contract explicit.

Suggested patch
+/// Error type returned by the `/v1/responses` proxy handler.
 #[derive(Debug, Error)]
 pub enum ResponsesError {

As per coding guidelines, "Use /// for doc comments on public items in Rust".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/types.rs` around lines 14 - 26, The public enum
ResponsesError lacks a doc comment; add a triple-slash doc comment (///)
immediately above the ResponsesError declaration describing its purpose (it
represents proxy response errors returned by this module) and briefly mention
key variants (AuthorizationError, RateLimitError, GatewayError, Timeout,
MissingModelInContext) so downstream users understand the error contract and
when each variant is used.
src/lib.rs (1)

87-93: 🏗️ Heavy lift

Make the session store backend configurable instead of hard-coding in-memory.

Line 92 currently forces a process-local store for all deployments. That can fragment conversation/session state across replicas and makes memory growth depend entirely on runtime traffic patterns. Consider selecting the store via config (with in-memory as a dev default) so production can use a shared/bounded backend.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib.rs` around lines 87 - 93, The code currently hard-codes an in-memory
session store when building the gateway
(gateway::Gateway::new(...).with_session_store(Arc::new(gateway::session::InMemorySessionStore::default()))),
which prevents shared/bounded backends in production; change this to choose the
session store implementation from configuration (use in-memory as the default
for dev). Add a factory or builder that reads config (env/struct) and returns
Arc<dyn SessionStore> (or a concrete boxed type) and replace the direct
InMemorySessionStore instantiation with that factory so
gateway::with_session_store(...) receives the configured store; ensure the
factory supports at least in-memory and one shared backend (e.g., Redis) and
preserves Arc wrapping and trait compatibility.
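The configurable-backend suggestion can be sketched with a config-driven factory. This is a minimal, std-only sketch under stated assumptions: the `SessionStore` trait, its `put`/`get` methods, and the `SessionStoreBackend` enum are hypothetical stand-ins for the gateway's actual session API, which may differ.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the gateway's session-store trait.
trait SessionStore: Send + Sync {
    fn put(&self, id: &str, payload: String);
    fn get(&self, id: &str) -> Option<String>;
}

#[derive(Default)]
struct InMemorySessionStore {
    inner: Mutex<HashMap<String, String>>,
}

impl SessionStore for InMemorySessionStore {
    fn put(&self, id: &str, payload: String) {
        self.inner.lock().unwrap().insert(id.to_string(), payload);
    }
    fn get(&self, id: &str) -> Option<String> {
        self.inner.lock().unwrap().get(id).cloned()
    }
}

// Backend selection read from configuration; in-memory stays the dev default,
// and a shared backend could be added as another variant later.
enum SessionStoreBackend {
    InMemory,
    // Redis { url: String }, // illustrative: a shared backend for production
}

// Factory returning a trait object, so the gateway builder stays agnostic
// to which backend the configuration selected.
fn build_session_store(backend: &SessionStoreBackend) -> Arc<dyn SessionStore> {
    match backend {
        SessionStoreBackend::InMemory => Arc::new(InMemorySessionStore::default()),
    }
}

fn main() {
    let store = build_session_store(&SessionStoreBackend::InMemory);
    store.put("resp_1", "history".to_string());
    assert_eq!(store.get("resp_1").as_deref(), Some("history"));
    println!("ok");
}
```

The key design point is that `with_session_store(...)` would then receive whatever the factory built, keeping the `Arc<dyn SessionStore>` wrapping and trait compatibility intact.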
src/proxy/handlers/responses/mod.rs (3)

14-21: ⚡ Quick win

Import ordering: external crate tokio is mixed with local imports.

Per coding guidelines, imports should be ordered: standard library first, then external crates (alphabetically), then local modules. The tokio import on line 20 should be grouped with other external crates.

♻️ Suggested reordering
 use fastrace::prelude::{Event as TraceEvent, *};
 use log::error;
+use tokio::sync::{oneshot, oneshot::error::TryRecvError};
+
 use span_attributes::{
     StreamOutputCollector, apply_span_properties, chunk_span_properties, event_starts_output,
     request_span_properties, response_span_properties, usage_span_properties,
 };
-use tokio::sync::{oneshot, oneshot::error::TryRecvError};
 pub use types::ResponsesError;

As per coding guidelines: "Sort imports alphabetically with rustfmt rules: standard library first, then external crates (alphabetical), then local modules".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/mod.rs` around lines 14 - 21, The import block
mixes external crate `tokio` with local imports; reorder imports so external
crates are grouped and alphabetized (standard lib first if present), e.g. place
fastrace, log, span_attributes, tokio (alphabetical) together, then local
modules like `pub use types::ResponsesError;`; update the block containing the
use statements (look for the lines importing fastrace, log, span_attributes,
tokio::sync, and ResponsesError) to follow that ordering and grouping.

218-244: ⚡ Quick win

Duplicated usage collection logic could be extracted.

The try_recv handling for usage_rx in both the error case (lines 218-244) and end-of-stream case (lines 265-291) is nearly identical. Consider extracting this into a helper function to reduce duplication.

♻️ Example extraction
async fn try_collect_usage(
    usage_rx: Option<oneshot::Receiver<Usage>>,
    request_ctx: &mut RequestContext,
    span: &Span,
) {
    let Some(mut usage_rx) = usage_rx else { return };
    
    match usage_rx.try_recv() {
        Ok(usage) => {
            if let Err(err) = hooks::rate_limit::post_check_streaming(request_ctx, &usage).await {
                error!("Rate limit post_check_streaming error: {}", err);
            }
            hooks::observability::record_streaming_usage(request_ctx, &usage).await;
            span.add_properties(|| usage_span_properties(&usage));
        }
        Err(TryRecvError::Empty) => {
            spawn_stream_usage_observer(request_ctx.clone(), usage_rx);
        }
        Err(TryRecvError::Closed) => {
            error!("Failed to receive streaming usage from gateway: channel closed");
        }
    }
}

Also applies to: 265-291

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/mod.rs` around lines 218 - 244, Extract the
duplicated try_recv handling into a helper async function (e.g.,
try_collect_usage) that accepts Option<oneshot::Receiver<Usage>> (or the same
usage_rx type), &mut RequestContext, and &Span; inside it perform the match on
usage_rx.try_recv(), call hooks::rate_limit::post_check_streaming,
hooks::observability::record_streaming_usage,
span.add_properties(usage_span_properties),
spawn_stream_usage_observer(request_ctx.clone(), usage_rx) for the Empty case,
and log the Closed case; then replace the duplicated blocks in the response
handler with calls to this helper (ensure to take() the usage_rx before calling
so the ownership/Option semantics remain correct).

150-168: 💤 Low value

The unfold state tuple has many elements, making it hard to follow.

The 7-element tuple for stream state is unwieldy. While functional, a named struct would improve readability and make the state transitions clearer.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/mod.rs` around lines 150 - 168, The 7-tuple used
as the state for futures::stream::unfold inside handle_stream_request is hard to
read; replace it with a small named struct (e.g., StreamState) that contains
fields for stream: ChatResponseStream<ResponsesApiFormat>, span: Span,
request_ctx: RequestContext, ended: bool, usage_rx:
Option<oneshot::Receiver<Usage>>, collector: StreamOutputCollector, and
is_first_chunk: bool; initialize StreamState::default() or a constructor when
calling unfold, update the closure to destructure and mutate the named fields
instead of tuple indices, and derive/implement Clone/Default or provide explicit
construction so the rest of the logic using stream_request_ctx, usage_rx,
StreamOutputCollector, and span continues to work without changing behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ffaa4ed9-d771-485c-bdc3-2b6cd40fe580

📥 Commits

Reviewing files that changed from the base of the PR and between 0359219 and 6fbf12d.

📒 Files selected for processing (32)
  • src/config/entities/mod.rs
  • src/gateway/provider_instance.rs
  • src/gateway/providers/macros.rs
  • src/gateway/providers/modelscope.rs
  • src/gateway/providers/moonshot.rs
  • src/gateway/providers/openrouter.rs
  • src/gateway/streams/bridged.rs
  • src/gateway/streams/hub.rs
  • src/gateway/streams/native.rs
  • src/gateway/streams/reader/aws_event_stream.rs
  • src/gateway/streams/reader/sse.rs
  • src/gateway/traits/chat_format.rs
  • src/gateway/traits/provider.rs
  • src/gateway/types/common.rs
  • src/lib.rs
  • src/proxy/handlers/messages/types.rs
  • src/proxy/handlers/mod.rs
  • src/proxy/handlers/responses/mod.rs
  • src/proxy/handlers/responses/span_attributes/message_attributes.rs
  • src/proxy/handlers/responses/span_attributes/mod.rs
  • src/proxy/handlers/responses/span_attributes/stream_output.rs
  • src/proxy/handlers/responses/span_attributes/telemetry.rs
  • src/proxy/handlers/responses/span_attributes/tests.rs
  • src/proxy/handlers/responses/types.rs
  • src/proxy/hooks/rate_limit/concurrent/mod.rs
  • src/proxy/hooks/rate_limit/concurrent/utils.rs
  • src/proxy/hooks/rate_limit/ratelimit/local.rs
  • src/proxy/hooks/rate_limit/ratelimit/mod.rs
  • src/proxy/hooks/rate_limit/ratelimit/utils.rs
  • src/proxy/mod.rs
  • src/utils/instance.rs
  • tests/proxy/responses.test.ts

Comment on lines +35 to +68
ResponsesApiStreamEvent::OutputTextDelta {
    output_index,
    delta,
    ..
} => {
    if !delta.is_empty() {
        append_message_text(
            self.items.entry(*output_index).or_insert_with(|| {
                ResponsesOutputItem::Message {
                    id: String::new(),
                    role: "assistant".into(),
                    content: vec![],
                    status: "in_progress".into(),
                }
            }),
            delta,
        );
    }
}
ResponsesApiStreamEvent::OutputTextDone {
    output_index, text, ..
} => {
    set_message_text(
        self.items.entry(*output_index).or_insert_with(|| {
            ResponsesOutputItem::Message {
                id: String::new(),
                role: "assistant".into(),
                content: vec![],
                status: "completed".into(),
            }
        }),
        text,
    );
}

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== ResponsesApiStreamEvent shape =="
rg -n -C3 'enum\s+ResponsesApiStreamEvent|OutputTextDelta|OutputTextDone|content_index|part_index' src

echo
echo "== Collector handling =="
rg -n -C3 'OutputTextDelta|OutputTextDone|append_message_text|set_message_text|first_mut|content.clear' src/proxy/handlers/responses/span_attributes/stream_output.rs

Repository: api7/aisix

Length of output: 16420


Handle multi-part content streams using content_index from events.

Lines 35-39 and 54-55 drop the content_index field via pattern .., while append_message_text (line 136) targets only content.first_mut() and set_message_text (lines 154-157) clears and replaces all parts. Although content_index is currently hardcoded to 0 in all event generation (src/gateway/formats/openai/responses.rs), the event structure supports per-part indices and the handler should respect them. Refactor to route text deltas and completions to the correct content part index rather than always the first.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/span_attributes/stream_output.rs` around lines
35 - 68, The handler currently discards the event's content_index (patterned
with ..) and always updates content.first_mut(); update the match arms for
ResponsesApiStreamEvent::OutputTextDelta and ::OutputTextDone to capture
content_index and use it to target the correct content part: pass content_index
(or use it to index into the message's content Vec) when calling
append_message_text and set_message_text so deltas and completions are applied
to the specified part instead of always the first; adjust
append_message_text/set_message_text callsites/logic to accept or use the
content_index to route updates to the correct
ResponsesOutputItem::Message.content entry.

@bzp2010 bzp2010 merged commit 7b2d7c9 into main May 5, 2026
3 checks passed
@bzp2010 bzp2010 deleted the bzp/feat-responses-api branch May 5, 2026 03:19