Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/crates/ai-adapters/src/stream/stream_handler/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use std::time::Duration;
use tokio::sync::mpsc;

const OPENAI_CHAT_COMPLETION_CHUNK_OBJECT: &str = "chat.completion.chunk";
/// MiniMax (and possibly other providers) close a streaming response with a
/// non-streaming `chat.completion` frame instead of a true `chunk`. That final
/// frame is the only one carrying authoritative usage, so we accept it too.
const OPENAI_CHAT_COMPLETION_OBJECT: &str = "chat.completion";
const AI_STREAM_RESPONSE_TARGET: &str = "ai::openai_stream_response";

#[derive(Debug)]
Expand Down Expand Up @@ -42,9 +46,14 @@ impl OpenAIResponseNormalizer {
}

fn is_valid_chat_completion_chunk_weak(event_json: &Value) -> bool {
// Standard streaming frames use `chat.completion.chunk`. MiniMax's final
// SSE frame, however, switches to the non-streaming `chat.completion`
// shape (choice carries `message` rather than `delta`) and is the ONLY
// chunk that contains the authoritative usage block. Accept both — the
// OpenAISSEData deserialization downstream tolerates either choice shape.
matches!(
event_json.get("object").and_then(|value| value.as_str()),
Some(OPENAI_CHAT_COMPLETION_CHUNK_OBJECT)
Some(OPENAI_CHAT_COMPLETION_CHUNK_OBJECT) | Some(OPENAI_CHAT_COMPLETION_OBJECT)
)
}

Expand Down Expand Up @@ -261,6 +270,19 @@ mod tests {
assert!(!is_valid_chat_completion_chunk_weak(&event));
}

#[test]
fn weak_filter_accepts_minimax_final_chat_completion_object() {
// MiniMax's last SSE frame uses `chat.completion` (non-streaming shape)
// instead of `chat.completion.chunk`. That frame carries the only
// authoritative usage block, so it must NOT be dropped at the gate.
let event = serde_json::json!({
"object": "chat.completion",
"choices": [{"finish_reason": "stop", "index": 0, "message": {}}],
"usage": {"prompt_tokens": 45, "completion_tokens": 47, "total_tokens": 92}
});
assert!(is_valid_chat_completion_chunk_weak(&event));
}

#[test]
fn extracts_api_error_message_from_object_shape() {
let event = serde_json::json!({
Expand Down
60 changes: 59 additions & 1 deletion src/crates/ai-adapters/src/stream/types/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ impl From<OpenAIUsage> for UnifiedTokenUsage {
struct Choice {
#[allow(dead_code)]
index: usize,
/// MiniMax's last SSE frame switches to non-streaming `chat.completion`
/// shape and puts the content under `message` instead of `delta`. We don't
/// need that frame's content (earlier chunks already streamed it), but the
/// frame also carries the only authoritative `usage` block. Default the
/// field so such frames deserialize cleanly and the top-level usage flows
/// through.
#[serde(default)]
delta: Delta,
finish_reason: Option<String>,
#[serde(default, deserialize_with = "deserialize_optional_stringish")]
Expand All @@ -66,7 +73,7 @@ struct ReasoningDetail {
text: Option<String>,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Deserialize)]
struct Delta {
#[allow(dead_code)]
role: Option<String>,
Expand Down Expand Up @@ -454,6 +461,57 @@ mod tests {
assert!(responses[0].tool_call.is_none());
}

#[test]
fn parses_minimax_final_chunk_with_message_field_instead_of_delta() {
// MiniMax's last SSE frame uses non-streaming `chat.completion` shape:
// choice has `message` instead of `delta`, and the real usage lives at
// the top level. Pre-fix this chunk failed to deserialize (`delta` was
// a required field), so the real prompt/completion tokens were silently
// dropped. After the fix, the chunk parses cleanly and usage flows
// through.
let raw = r#"{
"id": "065b58b7a16cf30f1e20c8f1942efeae",
"created": 1779180983,
"model": "MiniMax-M2.7-highspeed",
"object": "chat.completion",
"choices": [{
"finish_reason": "stop",
"index": 0,
"message": {
"content": "hi",
"role": "assistant",
"name": "MiniMax AI",
"reasoning_content": "The user wants hi."
}
}],
"usage": {
"total_tokens": 92,
"prompt_tokens": 45,
"completion_tokens": 47,
"completion_tokens_details": {"reasoning_tokens": 45}
}
}"#;

let sse_data: OpenAISSEData = serde_json::from_str(raw)
.expect("MiniMax final chunk must deserialize even without delta");
let responses = sse_data.into_unified_responses();

// Critical: the usage from this chunk must propagate.
let usage = responses
.iter()
.find_map(|r| r.usage.as_ref())
.expect("usage from MiniMax final chunk must be preserved");
assert_eq!(usage.prompt_token_count, 45);
assert_eq!(usage.candidates_token_count, 47);
assert_eq!(usage.total_token_count, 92);

// finish_reason should also be preserved (lives at choice top level).
assert!(
responses.iter().any(|r| r.finish_reason.as_deref() == Some("stop")),
"finish_reason from MiniMax final chunk must be preserved"
);
}

#[test]
fn handles_empty_choices_without_usage_chunk() {
let raw = r#"{
Expand Down
Loading