From 429351a885f324b7a318eee4d0e0b3c9d318368f Mon Sep 17 00:00:00 2001 From: nonoqing Date: Tue, 19 May 2026 17:49:47 +0800 Subject: [PATCH] fix(usage): accept MiniMax-style chat.completion final SSE frame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MiniMax (and possibly other OpenAI-compatible providers) close a streaming response with a non-streaming `chat.completion` frame instead of a true `chat.completion.chunk`. That final frame is the only one carrying the authoritative usage block — earlier chunks send only zero placeholders. Before this fix BitFun dropped the frame at two layers: 1. The weak validator in stream_handler/openai.rs required the object string to be exactly "chat.completion.chunk" and labeled MiniMax's final frame as `skip:non_standard_event`, so it never reached deserialization. 2. Even if the validator had let it through, the choice in that frame uses `message` instead of `delta`, and OpenAISSEData required `Choice.delta` to be present. The net effect: BitFun recorded 0 input, 0 output, 0 cached for every MiniMax call. Fix both layers: - Validator now accepts both `chat.completion.chunk` and `chat.completion` object strings. - `Choice.delta` is now `#[serde(default)]` with `Delta` deriving `Default`, so frames lacking `delta` parse cleanly. We don't need the frame's content (earlier chunks streamed it); we only need top-level usage and finish_reason to propagate. Each fix has a regression test reproducing MiniMax's observed on-the-wire shape captured from a live MiniMax-M2.7-highspeed response. --- .../src/stream/stream_handler/openai.rs | 24 +++++++- .../ai-adapters/src/stream/types/openai.rs | 60 ++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/src/crates/ai-adapters/src/stream/stream_handler/openai.rs b/src/crates/ai-adapters/src/stream/stream_handler/openai.rs index ef670581e..f0e093ff6 100644 --- a/src/crates/ai-adapters/src/stream/stream_handler/openai.rs +++ b/src/crates/ai-adapters/src/stream/stream_handler/openai.rs @@ -12,6 +12,10 @@ use std::time::Duration; use tokio::sync::mpsc; const OPENAI_CHAT_COMPLETION_CHUNK_OBJECT: &str = "chat.completion.chunk"; +/// MiniMax (and possibly other providers) close a streaming response with a +/// non-streaming `chat.completion` frame instead of a true `chunk`. That final +/// frame is the only one carrying authoritative usage, so we accept it too. +const OPENAI_CHAT_COMPLETION_OBJECT: &str = "chat.completion"; const AI_STREAM_RESPONSE_TARGET: &str = "ai::openai_stream_response"; #[derive(Debug)] @@ -42,9 +46,14 @@ impl OpenAIResponseNormalizer { } fn is_valid_chat_completion_chunk_weak(event_json: &Value) -> bool { + // Standard streaming frames use `chat.completion.chunk`. MiniMax's final + // SSE frame, however, switches to the non-streaming `chat.completion` + // shape (choice carries `message` rather than `delta`) and is the ONLY + // chunk that contains the authoritative usage block. Accept both — the + // OpenAISSEData deserialization downstream tolerates either choice shape. matches!( event_json.get("object").and_then(|value| value.as_str()), - Some(OPENAI_CHAT_COMPLETION_CHUNK_OBJECT) + Some(OPENAI_CHAT_COMPLETION_CHUNK_OBJECT) | Some(OPENAI_CHAT_COMPLETION_OBJECT) ) } @@ -261,6 +270,19 @@ mod tests { assert!(!is_valid_chat_completion_chunk_weak(&event)); } + #[test] + fn weak_filter_accepts_minimax_final_chat_completion_object() { + // MiniMax's last SSE frame uses `chat.completion` (non-streaming shape) + // instead of `chat.completion.chunk`. That frame carries the only + // authoritative usage block, so it must NOT be dropped at the gate. + let event = serde_json::json!({ + "object": "chat.completion", + "choices": [{"finish_reason": "stop", "index": 0, "message": {}}], + "usage": {"prompt_tokens": 45, "completion_tokens": 47, "total_tokens": 92} + }); + assert!(is_valid_chat_completion_chunk_weak(&event)); + } + #[test] fn extracts_api_error_message_from_object_shape() { let event = serde_json::json!({ diff --git a/src/crates/ai-adapters/src/stream/types/openai.rs b/src/crates/ai-adapters/src/stream/types/openai.rs index fe7ff5c9d..cbdcff3fe 100644 --- a/src/crates/ai-adapters/src/stream/types/openai.rs +++ b/src/crates/ai-adapters/src/stream/types/openai.rs @@ -51,6 +51,13 @@ impl From for UnifiedTokenUsage { struct Choice { #[allow(dead_code)] index: usize, + /// MiniMax's last SSE frame switches to non-streaming `chat.completion` + /// shape and puts the content under `message` instead of `delta`. We don't + /// need that frame's content (earlier chunks already streamed it), but the + /// frame also carries the only authoritative `usage` block. Default the + /// field so such frames deserialize cleanly and the top-level usage flows + /// through. + #[serde(default)] delta: Delta, finish_reason: Option, #[serde(default, deserialize_with = "deserialize_optional_stringish")] @@ -66,7 +73,7 @@ struct ReasoningDetail { text: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Deserialize)] struct Delta { #[allow(dead_code)] role: Option, @@ -454,6 +461,57 @@ mod tests { assert!(responses[0].tool_call.is_none()); } + #[test] + fn parses_minimax_final_chunk_with_message_field_instead_of_delta() { + // MiniMax's last SSE frame uses non-streaming `chat.completion` shape: + // choice has `message` instead of `delta`, and the real usage lives at + // the top level. Pre-fix this chunk failed to deserialize (`delta` was + // a required field), so the real prompt/completion tokens were silently + // dropped. After the fix, the chunk parses cleanly and usage flows + // through. + let raw = r#"{ + "id": "065b58b7a16cf30f1e20c8f1942efeae", + "created": 1779180983, + "model": "MiniMax-M2.7-highspeed", + "object": "chat.completion", + "choices": [{ + "finish_reason": "stop", + "index": 0, + "message": { + "content": "hi", + "role": "assistant", + "name": "MiniMax AI", + "reasoning_content": "The user wants hi." + } + }], + "usage": { + "total_tokens": 92, + "prompt_tokens": 45, + "completion_tokens": 47, + "completion_tokens_details": {"reasoning_tokens": 45} + } + }"#; + + let sse_data: OpenAISSEData = serde_json::from_str(raw) + .expect("MiniMax final chunk must deserialize even without delta"); + let responses = sse_data.into_unified_responses(); + + // Critical: the usage from this chunk must propagate. + let usage = responses + .iter() + .find_map(|r| r.usage.as_ref()) + .expect("usage from MiniMax final chunk must be preserved"); + assert_eq!(usage.prompt_token_count, 45); + assert_eq!(usage.candidates_token_count, 47); + assert_eq!(usage.total_token_count, 92); + + // finish_reason should also be preserved (lives at choice top level). + assert!( + responses.iter().any(|r| r.finish_reason.as_deref() == Some("stop")), + "finish_reason from MiniMax final chunk must be preserved" + ); + } + #[test] fn handles_empty_choices_without_usage_chunk() { let raw = r#"{