
refactor: responses api server-side context management #93

Merged
bzp2010 merged 3 commits into main from bzp/feat-responses-previous-response
May 7, 2026
Conversation


@bzp2010 bzp2010 commented May 7, 2026

Summary by CodeRabbit

  • New Features

    • Conversation replay for OpenAI-format requests (prior turns are included in context)
    • Persistent message history so multi-turn conversations are stored and retrievable
    • Responses API: richer context merging and improved streamed-response accumulation
  • Refactor

    • Lifecycle-aware request handling for more reliable streaming and completion flows
    • Unified, consistent error responses across handlers for clearer upstream payloads
  • Tests

    • Added unit tests covering replay, history persistence, and responses lifecycle behaviors


coderabbitai Bot commented May 7, 2026

📝 Walkthrough

This PR adds async message-history storage and an in-memory implementation; extends FormatHandlerAdapter with per-request lifecycle state and hooks; implements Responses runtime/accumulators to merge/replay/accumulate and optionally persist conversation history (including replay_messages); refactors gateway chat dispatch into chat_from_hub; centralizes OpenAI-style error response helpers; and updates tests, package pin, and app wiring.

Changes

Message History & Lifecycle Implementation

| Layer | File(s) | Summary |
| --- | --- | --- |
| Data Contracts | `crates/aisix-llm/src/types/openai/responses.rs`, `src/proxy/message_history.rs` | `ResponsesApiRequest` gains `replay_messages: Vec<ChatMessage>` (`#[serde(skip)]`); `StoredMessageHistory` and the `MessageHistoryStorage` trait are added. |
| Message History Storage | `src/proxy/message_history.rs` | `InMemoryMessageHistoryStorage` implements async `get_by_response_id` and `put` over an `RwLock`; a test-only delete helper and round-trip tests are included. |
| Format Adapter Trait | `src/proxy/handlers/format_handler.rs` | `FormatHandlerAdapter` adds `type LifecycleState` and lifecycle hooks (`prepare_lifecycle`, `handle_complete_response`, `handle_stream_success`, `handle_stream_failure`), plus `handle_stream_item()` and `lifecycle_error_event`; default no-op implementations are provided. |
| Streaming & Finalization | `src/proxy/handlers/format_handler.rs` | `format_handler` initializes and threads `lifecycle_state` through complete and streaming flows, calls per-chunk `handle_stream_item`, and finalizes via `finalize_stream_success` / `finalize_stream_usage_observation`. |
| Responses Runtime & Accumulator | `src/proxy/handlers/responses/runtime.rs` | Introduces `ResponsesLifecycleState` and `ResponsesStreamAccumulator`; provides `init_lifecycle`, `load_previous_messages`, `build_merged_input_messages`, `accumulate_stream_event`, `accumulate_complete`, `accumulate_stream_success`, `persist_if_enabled`, conversion helpers, and unit tests. |
| Responses Adapter Implementation | `src/proxy/handlers/responses/mod.rs`, `src/proxy/handlers/responses/runtime.rs` | `ResponsesAdapter` implements the lifecycle hooks: it prepares the lifecycle (init + load + merge), sets `request.replay_messages`, accumulates and persists complete and streamed-success snapshots, rewrites response/output IDs, and accumulates stream events per chunk; the runtime module provides the helpers. |
| Gateway Refactor | `crates/aisix-llm/src/gateway.rs` | `Gateway::chat` now converts format requests to a hub `ChatCompletionRequest` + `BridgeContext` and delegates to `chat_from_hub`, which handles streaming vs. non-streaming flows and moves session persistence into the completion branch. |
| App Integration | `src/lib.rs`, `src/proxy/mod.rs` | `AppState` now holds `message_history_storage: Arc<dyn MessageHistoryStorage>`; the `AppState::new` signature is updated; `run_with_provider` constructs an `InMemoryMessageHistoryStorage` and passes it into `AppState::new`; gateway creation is simplified (no explicit session-store wiring). |
| OpenAI Error Helpers & Mapping | `src/proxy/handlers/openai_error.rs`, `src/proxy/handlers/*/types.rs` | Adds `gateway_error_response`, `timeout_response`, and `missing_model_response`; refactors the `IntoResponse` impls in Chat Completions, Embeddings, and Responses to delegate to these helpers; tests are updated to assert the new response shapes. |
| Format Conversions & Tests | `crates/aisix-llm/src/formats/openai/responses.rs`, `src/proxy/handlers/chat_completions/mod.rs`, `src/proxy/handlers/messages/mod.rs`, `crates/aisix-llm/src/lib.rs`, `tests/package.json` | `ResponsesApiFormat::to_hub` appends `request.replay_messages` after the optional system instruction; a new unit test verifies replay ordering; `ChatCompletionsAdapter` and `MessagesAdapter` set `LifecycleState = ()`; the crate root stops exporting the `session` module; the `tests/package.json` `packageManager` pin is updated. |
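The replay ordering verified by that new unit test (optional system instruction first, then `replay_messages`, then the current input) can be sketched as follows. The `ChatMessage` shape and the `build_hub_messages` helper here are simplified, hypothetical stand-ins for the real `to_hub` conversion, not the actual API:

```rust
// Simplified stand-ins; the real conversion lives in ResponsesApiFormat::to_hub.
#[derive(Debug, Clone, PartialEq)]
struct ChatMessage {
    role: String,
    content: String,
}

fn msg(role: &str, content: &str) -> ChatMessage {
    ChatMessage { role: role.to_string(), content: content.to_string() }
}

// Hypothetical helper illustrating the documented ordering: optional system
// instruction, then replayed prior turns, then the new request input.
fn build_hub_messages(
    instructions: Option<&str>,
    replay_messages: &[ChatMessage],
    new_input: &[ChatMessage],
) -> Vec<ChatMessage> {
    let mut out = Vec::new();
    if let Some(sys) = instructions {
        out.push(msg("system", sys));
    }
    out.extend_from_slice(replay_messages); // prior turns replayed verbatim
    out.extend_from_slice(new_input);       // current request input comes last
    out
}

fn main() {
    let replay = vec![msg("user", "hi"), msg("assistant", "hello")];
    let input = vec![msg("user", "and now?")];
    for m in build_hub_messages(Some("be terse"), &replay, &input) {
        println!("{}: {}", m.role, m.content);
    }
}
```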

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • api7/aisix#92: Extends adapter-based handler infrastructure and lifecycle hook changes.
  • api7/aisix#86: Prior work on ResponsesApiFormat and hub translation; related to replay_messages changes.
  • api7/aisix#89: Overlapping Responses flow and session/replay logic changes.

Important

Pre-merge checks failed

Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 1 warning)

- **Security Check**: ❌ Error. Missing authorization check: `prepare_lifecycle` loads prior messages without validating API-key ownership of the referenced `previous_response_id`, allowing any key to access any conversation history. Resolution: add an `api_key_id` field to `StoredMessageHistory`; in `prepare_lifecycle`, validate that the requesting API key matches the stored history owner before returning `cumulative_messages`; add unauthorized-access tests.
- **E2E Test Quality Review**: ⚠️ Warning. E2E test coverage is incomplete: the streaming test and the replay-only test are separate, and no test combines `stream: true` with `previous_response_id`, leaving a critical business-flow path untested. Resolution: add an E2E test combining streaming with `previous_response_id`, plus tests for multi-turn chaining (3+ requests) and edge cases (empty and very long histories); the current tests don't cover the full feature scope required by this PR.
✅ Passed checks (4 passed)

- **Description Check**: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
- **Title Check**: ✅ Passed. The title accurately summarizes the main refactoring work: introducing server-side context management (lifecycle state, message history storage, and persistence) for the Responses API.
- **Linked Issues Check**: ✅ Passed. Check skipped because no linked issues were found for this pull request.
- **Out of Scope Changes Check**: ✅ Passed. Check skipped because no linked issues were found for this pull request.

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/aisix-llm/src/gateway.rs (1)

72-99: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

This drops previous_response_id handling from the public bridged Gateway API.

ResponsesApiFormat::to_hub() only consumes request.replay_messages, and this layer no longer resolves or persists any response history before calling it. As a result, direct aisix_llm::Gateway::chat::<ResponsesApiFormat> callers lose server-side context unless they already inject replay themselves, even though previous_response_id is still part of the request type. Either keep a pluggable history/session hook at the gateway layer, or reject bridged previous_response_id outside the proxy path instead of silently ignoring it.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/aisix-llm/src/gateway.rs` around lines 72 - 99, The bridged Gateway
path in chat_from_hub drops previous_response_id handling
(ResponsesApiFormat::to_hub only consumes replay_messages), so callers to
Gateway::chat::<ResponsesApiFormat> lose server-side context; fix by detecting
and handling request.previous_response_id inside chat_from_hub (or its caller)
before calling ResponsesApiFormat::to_hub/call_chat_hub/call_chat_hub_stream:
either resolve previous_response_id to replay messages from the session/history
store and merge them into hub_request.replay_messages (and persist any new
response IDs after call), or explicitly reject/return an error when
previous_response_id is present for bridged requests to prevent silent ignoring.
Ensure the logic updates the hub_request passed to
call_chat_hub/call_chat_hub_stream and keeps usage/BridgedStream behavior
intact.
🧹 Nitpick comments (2)
src/lib.rs (1)

89-101: Plan for a shared history backend before relying on this in multi-instance deployments.

InMemoryMessageHistoryStorage makes previous_response_id process-local, so chains break after a restart or when the next turn lands on another replica. If this is only meant as a default/dev backend, I'd wire the storage from config or document the sticky-routing/shared-store requirement up front.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib.rs` around lines 89 - 101, The current instantiation of
InMemoryMessageHistoryStorage makes previous_response_id process-local and
breaks multi-instance chains; update the initialization in the
proxy::create_router call so MessageHistoryStorage is pluggable from
configuration (e.g., read config to choose between
proxy::message_history::InMemoryMessageHistoryStorage for dev vs a shared
implementation for production), instantiate the appropriate implementation and
pass that Arc<dyn proxy::message_history::MessageHistoryStorage> into
proxy::AppState::new; if you intend InMemoryMessageHistoryStorage to remain only
a default/dev option, make that explicit in config and documentation (or add a
warning) so deployments requiring sticky routing are configured with a shared
backend.
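One way to make the backend pluggable as this nitpick suggests, sketched with a hypothetical `HistoryBackendConfig` enum and `build_history_storage` factory. The trait here is trimmed to a name method for brevity; the real `MessageHistoryStorage` trait is async:

```rust
use std::sync::Arc;

// Hypothetical config enum; real wiring would parse this from app configuration
// and add a shared-backend variant for multi-replica deployments.
enum HistoryBackendConfig {
    InMemory,
}

// Trimmed to a single method for illustration; the real trait is async.
trait MessageHistoryStorage: Send + Sync {
    fn backend_name(&self) -> &'static str;
}

struct InMemoryMessageHistoryStorage;

impl MessageHistoryStorage for InMemoryMessageHistoryStorage {
    fn backend_name(&self) -> &'static str {
        "in-memory"
    }
}

// Factory that AppState::new could receive instead of a hard-coded backend.
fn build_history_storage(cfg: &HistoryBackendConfig) -> Arc<dyn MessageHistoryStorage> {
    match cfg {
        HistoryBackendConfig::InMemory => {
            // Dev/single-replica only: previous_response_id chains are
            // process-local and break across restarts or replicas.
            eprintln!("warning: using in-memory message history storage");
            Arc::new(InMemoryMessageHistoryStorage)
        }
    }
}

fn main() {
    let storage = build_history_storage(&HistoryBackendConfig::InMemory);
    println!("backend: {}", storage.backend_name());
}
```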
src/proxy/handlers/format_handler.rs (1)

151-155: ⚡ Quick win

Add the repo-standard trace boundary to this public handler.

This is a request-scoped public function, so it should carry #[fastrace::trace] even though it also creates manual child spans.

Suggested change
+#[fastrace::trace]
 pub(crate) async fn format_handler<A>(
     State(state): State<AppState>,
     mut request_ctx: RequestContext,
     Json(mut request_data): Json<AdapterRequest<A>>,

As per coding guidelines "Use #[fastrace::trace] decorator for distributed tracing on public functions".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/format_handler.rs` around lines 151 - 155, The public
request-scoped handler format_handler must be decorated with the
repository-standard tracing attribute; add #[fastrace::trace] immediately above
the pub(crate) async fn format_handler<A>(...) declaration so the function
carries the distributed-tracing boundary while keeping its existing manual child
spans and internal span creation unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/proxy/handlers/format_handler.rs`:
- Around line 381-410: The code currently acknowledges stream completion before
ensuring lifecycle persistence via A::handle_stream_success, causing lost
Responses history; change the flow so you do not detach or return success until
handle_stream_success completes successfully: in the TryRecvError::Empty branch
remove spawn_stream_success_observer and instead await
A::handle_stream_success(state, request_ctx, lifecycle_state, None/Some(&usage)
as appropriate) and if it returns Err propagate/return that error (do not just
log), and likewise in the TryRecvError::Closed branch and the final post-branch
call, ensure failures from A::handle_stream_success are propagated to the caller
rather than only logged so the client is not acknowledged until lifecycle
persistence succeeds (referencing A::handle_stream_success,
spawn_stream_success_observer, TryRecvError::Empty, TryRecvError::Closed,
usage_rx, usage).

In `@src/proxy/handlers/responses/mod.rs`:
- Around line 99-111: prepare_lifecycle is rewriting client-visible response IDs
into gateway-local IDs but leaves request.previous_response_id unchanged,
causing native providers to receive gateway-generated IDs they don't recognize;
fix by ensuring upstream/native IDs are preserved for native flows or by storing
a reverse mapping when you rewrite IDs (in the lifecycle state or a dedicated
store) and translate request.previous_response_id back to the upstream/native ID
before any native dispatch/serialization (check init_lifecycle,
load_previous_messages, build_merged_input_messages, request.replay_messages and
the native Responses serialization path) so chained /v1/responses requests see
the original provider IDs.

In `@src/proxy/handlers/responses/runtime.rs`:
- Around line 423-433: The code in ResponsesContentPart::InputImage currently
returns Err(GatewayError::Validation(...)) when image_url is None, which turns a
non-persistable file_id into a hard request failure; instead, change the branch
to skip persisting that image part or store a safe placeholder: when let
Some(url) = image_url.as_ref() fails, do not return an error—either continue
without adding this ResponsesContentPart to persistence or assign a
placeholder/local-marker derived from file_id.as_deref().unwrap_or("<unknown>")
and proceed; update the handling around image_url, file_id, and detail in the
ResponsesContentPart::InputImage match arm so the request flow continues without
GatewayError::Validation.
- Around line 360-362: The generate_response_id function is using the wrong
prefix ("aresp_") causing init_lifecycle and tests to expect a "resp_" prefix;
update generate_response_id to use "resp_" as the prefix (i.e., change the
format call in fn generate_response_id() -> String to produce "resp_{}") so
generated IDs match the rest of the flow and tests (function name:
generate_response_id, referenced by init_lifecycle).

In `@src/proxy/message_history.rs`:
- Around line 29-58: The in-memory store InMemoryMessageHistoryStorage (fields:
histories, method: put, StoredMessageHistory.cumulative_messages) is unbounded
and stores full conversation snapshots per response, which will OOM; replace or
limit it before using on the request path by either (a) switching to an
external/persistent store (DB, Redis, S3) and moving put/get to that backend, or
(b) implementing a bounded in-process cache (e.g., LRU or size/TTL eviction)
instead of RwLock<HashMap>, and/or stop storing full cumulative_messages by
storing diffs/partial history or truncating messages to a capped depth; ensure
the new implementation keeps the same MessageHistoryStorage async methods
(get_by_response_id, put) and preserves response_id-based lookup while enforcing
eviction/persistence to prevent unbounded growth.
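The bounded-cache option from this prompt could be sketched as below. This synchronous `BoundedHistoryStore` with FIFO eviction (capacity > 0 assumed) only illustrates the eviction policy; it is not the real async `MessageHistoryStorage` implementation, and a production version would likely use a proper LRU crate or TTL-based eviction:

```rust
use std::collections::{HashMap, VecDeque};

// Bounded sketch of the suggested fix: cap entries and evict the oldest on
// insert. FIFO rather than true LRU (reads do not refresh an entry's slot).
struct BoundedHistoryStore {
    capacity: usize,
    order: VecDeque<String>,                 // insertion order, oldest at front
    histories: HashMap<String, Vec<String>>, // response_id -> cumulative messages
}

impl BoundedHistoryStore {
    fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), histories: HashMap::new() }
    }

    fn put(&mut self, response_id: String, messages: Vec<String>) {
        if !self.histories.contains_key(&response_id) {
            if self.order.len() == self.capacity {
                if let Some(oldest) = self.order.pop_front() {
                    self.histories.remove(&oldest); // evict to bound memory
                }
            }
            self.order.push_back(response_id.clone());
        }
        self.histories.insert(response_id, messages);
    }

    fn get_by_response_id(&self, response_id: &str) -> Option<&Vec<String>> {
        self.histories.get(response_id)
    }
}

fn main() {
    let mut store = BoundedHistoryStore::new(2);
    store.put("resp_a".to_string(), vec!["turn 1".to_string()]);
    store.put("resp_b".to_string(), vec!["turn 2".to_string()]);
    store.put("resp_c".to_string(), vec!["turn 3".to_string()]); // evicts resp_a
    println!("resp_a present: {}", store.get_by_response_id("resp_a").is_some());
}
```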


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: db882082-9c2c-4a90-a1df-602b32ad3ee2

📥 Commits

Reviewing files that changed from the base of the PR and between db7317b and 963720d.

📒 Files selected for processing (13)
  • crates/aisix-llm/src/formats/openai/responses.rs
  • crates/aisix-llm/src/gateway.rs
  • crates/aisix-llm/src/lib.rs
  • crates/aisix-llm/src/session.rs
  • crates/aisix-llm/src/types/openai/responses.rs
  • src/lib.rs
  • src/proxy/handlers/chat_completions/mod.rs
  • src/proxy/handlers/format_handler.rs
  • src/proxy/handlers/messages/mod.rs
  • src/proxy/handlers/responses/mod.rs
  • src/proxy/handlers/responses/runtime.rs
  • src/proxy/message_history.rs
  • src/proxy/mod.rs
💤 Files with no reviewable changes (2)
  • crates/aisix-llm/src/lib.rs
  • crates/aisix-llm/src/session.rs


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/proxy/handlers/responses/runtime.rs (1)

113-122: 💤 Low value

Silent data loss if event order is unexpected.

ensure_function_call_item() returns the existing item at output_index regardless of its variant type. If a Message item already exists at that index (due to out-of-order events or protocol bugs), the if let ResponsesOutputItem::FunctionCall pattern silently drops the delta.

Consider logging a warning when the pattern doesn't match, so protocol violations or upstream bugs become visible:

🔧 Proposed observability improvement
 ResponsesApiStreamEvent::FunctionCallArgumentsDelta {
     output_index,
     delta,
 } => {
-    if let ResponsesOutputItem::FunctionCall { arguments, .. } =
-        self.ensure_function_call_item(*output_index)
-    {
-        arguments.push_str(delta);
+    match self.ensure_function_call_item(*output_index) {
+        ResponsesOutputItem::FunctionCall { arguments, .. } => {
+            arguments.push_str(delta);
+        }
+        other => {
+            tracing::warn!(
+                output_index,
+                item_type = ?std::mem::discriminant(other),
+                "FunctionCallArgumentsDelta received for non-FunctionCall item; delta dropped"
+            );
+        }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/responses/runtime.rs` around lines 113 - 122, The
FunctionCallArgumentsDelta branch calls
self.ensure_function_call_item(*output_index) but silently drops deltas if the
returned ResponsesOutputItem is not a ResponsesOutputItem::FunctionCall; update
the handler for ResponsesApiStreamEvent::FunctionCallArgumentsDelta to detect
when the pattern doesn't match and emit a warning (include output_index and the
actual variant debug string) so protocol violations are observable, and decide
whether to append, replace, or reinitialize the item (e.g., create a new
FunctionCall item) after logging; reference ensure_function_call_item,
ResponsesApiStreamEvent::FunctionCallArgumentsDelta, and
ResponsesOutputItem::FunctionCall when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3a66f7c6-f3dc-4587-921d-a0fe806bd3dc

📥 Commits

Reviewing files that changed from the base of the PR and between 963720d and 6839528.

📒 Files selected for processing (6)
  • src/proxy/handlers/chat_completions/types.rs
  • src/proxy/handlers/embeddings/types.rs
  • src/proxy/handlers/mod.rs
  • src/proxy/handlers/openai_error.rs
  • src/proxy/handlers/responses/runtime.rs
  • src/proxy/handlers/responses/types.rs
✅ Files skipped from review due to trivial changes (1)
  • src/proxy/handlers/mod.rs


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/proxy/handlers/format_handler.rs (1)

388-412: 💤 Low value

Consider extracting the unfold state tuple into a named struct.

The 10-element tuple (state, stream, span, request_ctx, should_terminate, saw_item, usage_rx, output_collector, first_output_arrived, lifecycle_state) is unwieldy and error-prone. A named struct would improve readability and reduce the risk of misaligned fields when constructing return tuples.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/proxy/handlers/format_handler.rs` around lines 388 - 412, The 10-element
state tuple passed into futures::stream::unfold is hard to read and fragile;
define a small named struct (e.g., UnfoldState) containing the fields currently
in the tuple (state, stream, span, request_ctx/stream_request_ctx,
should_terminate, saw_item, usage_rx, output_collector/AdapterCollector,
first_output_arrived, lifecycle_state), replace the tuple construction used as
the initial state for sse_stream with an instance of that struct, and update the
closure signature and any places that read or return the tuple to use the
struct's named fields instead; ensure all references to stream_state, usage_rx,
AdapterCollector::<A>::default(), and lifecycle_state are moved into the new
struct so the unfold logic remains identical but much clearer.
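The named-struct refactor this nitpick describes might look like the following. The field names are taken from the comment; the `UnfoldState` name and the trimmed field set are assumptions, not the real handler types:

```rust
// Field names mirror the tuple described in the comment; the trimmed set and
// the UnfoldState name itself are assumptions, not the real handler types.
struct UnfoldState<S, L> {
    stream: S,               // upstream chunk source
    should_terminate: bool,  // set once a terminal event is seen
    saw_item: bool,          // whether any chunk arrived at all
    first_output_arrived: bool,
    lifecycle_state: L,      // adapter lifecycle state threaded per request
}

impl<S, L> UnfoldState<S, L> {
    fn new(stream: S, lifecycle_state: L) -> Self {
        Self {
            stream,
            should_terminate: false,
            saw_item: false,
            first_output_arrived: false,
            lifecycle_state,
        }
    }
}

fn main() {
    // Named fields keep updates readable inside the unfold closure, compared
    // with destructuring and rebuilding a 10-element tuple on every iteration.
    let mut st = UnfoldState::new(vec!["chunk-1", "chunk-2"], ());
    st.saw_item = true;
    println!("saw_item={} terminate={}", st.saw_item, st.should_terminate);
}
```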

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 420e1a24-8666-47e7-a39a-41987816f367

📥 Commits

Reviewing files that changed from the base of the PR and between 6839528 and 0543368.

📒 Files selected for processing (5)
  • src/proxy/handlers/format_handler.rs
  • src/proxy/handlers/responses/mod.rs
  • src/proxy/handlers/responses/runtime.rs
  • src/proxy/message_history.rs
  • tests/package.json
✅ Files skipped from review due to trivial changes (1)
  • tests/package.json

@bzp2010 bzp2010 merged commit bb8c0c9 into main May 7, 2026
2 checks passed