Skip to content

Commit c99daef

Browse files
authored
[Fix]: /v1/messages - return streaming usage statistics when using litellm with bedrock models (#11469)
* fix: using litellm with claude code bedrock * fix: usage for bedrock with /messages * fix: bedrock_sse_wrapper * tests: test for test_chunk_parser_usage_transformation * test fix
1 parent f0cb80e commit c99daef

File tree

2 files changed

+109
-7
lines changed

2 files changed

+109
-7
lines changed

litellm/llms/bedrock/messages/invoke_transformations/anthropic_claude3_transformation.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional, Tuple, Union
23

34
import httpx
@@ -13,6 +14,7 @@
1314
AmazonInvokeConfig,
1415
)
1516
from litellm.types.router import GenericLiteLLMParams
17+
from litellm.types.utils import GenericStreamingChunk
1618
from litellm.types.utils import GenericStreamingChunk as GChunk
1719
from litellm.types.utils import ModelResponseStream
1820

@@ -113,9 +115,9 @@ def transform_anthropic_messages_request(
113115

114116
# 1. anthropic_version is required for all claude models
115117
if "anthropic_version" not in anthropic_messages_request:
116-
anthropic_messages_request[
117-
"anthropic_version"
118-
] = self.DEFAULT_BEDROCK_ANTHROPIC_API_VERSION
118+
anthropic_messages_request["anthropic_version"] = (
119+
self.DEFAULT_BEDROCK_ANTHROPIC_API_VERSION
120+
)
119121

120122
# 2. `stream` is not allowed in request body for bedrock invoke
121123
if "stream" in anthropic_messages_request:
@@ -139,7 +141,26 @@ def get_async_streaming_response_iterator(
139141
completion_stream = aws_decoder.aiter_bytes(
140142
httpx_response.aiter_bytes(chunk_size=aws_decoder.DEFAULT_CHUNK_SIZE)
141143
)
142-
return completion_stream
144+
# Convert decoded Bedrock events to Server-Sent Events expected by Anthropic clients.
145+
return self.bedrock_sse_wrapper(completion_stream)
146+
147+
async def bedrock_sse_wrapper(
    self,
    completion_stream: AsyncIterator[
        Union[bytes, GenericStreamingChunk, ModelResponseStream, dict]
    ],
):
    """
    Re-emit a decoded Bedrock invoke stream, framing dict chunks as Server-Sent Events.

    Bedrock invoke does not return SSE formatted data, so every ``dict`` chunk is
    serialized into an ``event:`` line followed by a ``data:`` line holding the
    JSON-encoded chunk, terminated by a blank line, and yielded as encoded bytes.
    The event name is the chunk's ``"type"`` value, or ``"message"`` when the
    key is absent.

    Chunks that are not dicts are yielded exactly as received.
    """
    async for item in completion_stream:
        if not isinstance(item, dict):
            # Pass non-dict chunks through untouched so callers can still use
            # the richer Python objects if they wish.
            yield item
            continue

        sse_event: str = str(item.get("type", "message"))
        sse_data = json.dumps(item)
        frame = f"event: {sse_event}\ndata: {sse_data}\n\n"
        yield frame.encode()
143164

144165

145166
class AmazonAnthropicClaudeMessagesStreamDecoder(AWSEventStreamDecoder):
@@ -159,8 +180,22 @@ def _chunk_parser(
159180
"""
160181
Parse the chunk data into anthropic /messages format
161182
162-
No transformation is needed for anthropic /messages format
163-
164-
since bedrock invoke returns the response in the correct format
183+
Bedrock returns usage metrics using camelCase keys. Convert these to
184+
the Anthropic `/v1/messages` specification so callers receive a
185+
consistent response shape when streaming.
165186
"""
187+
amazon_bedrock_invocation_metrics = chunk_data.pop(
188+
"amazon-bedrock-invocationMetrics", {}
189+
)
190+
if amazon_bedrock_invocation_metrics:
191+
anthropic_usage = {}
192+
if "inputTokenCount" in amazon_bedrock_invocation_metrics:
193+
anthropic_usage["input_tokens"] = amazon_bedrock_invocation_metrics[
194+
"inputTokenCount"
195+
]
196+
if "outputTokenCount" in amazon_bedrock_invocation_metrics:
197+
anthropic_usage["output_tokens"] = amazon_bedrock_invocation_metrics[
198+
"outputTokenCount"
199+
]
200+
chunk_data["usage"] = anthropic_usage
166201
return chunk_data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import asyncio
2+
import json
3+
import os
4+
import sys
5+
6+
import pytest
7+
8+
# Put the project root on the import path so `litellm` can be imported.
9+
# NOTE: the relative path below is resolved against the current working directory.
10+
sys.path.insert(0, os.path.abspath("../../../../../.."))
11+
12+
from litellm.llms.bedrock.messages.invoke_transformations.anthropic_claude3_transformation import (
13+
AmazonAnthropicClaude3MessagesConfig,
14+
AmazonAnthropicClaudeMessagesStreamDecoder,
15+
)
16+
17+
18+
@pytest.mark.asyncio
async def test_bedrock_sse_wrapper_encodes_dict_chunks():
    """Verify that `bedrock_sse_wrapper` converts dictionary chunks to properly formatted Server-Sent Events and forwards non-dict chunks unchanged."""

    cfg = AmazonAnthropicClaude3MessagesConfig()
    dict_chunk = {"type": "message_delta", "text": "hello"}

    async def _dummy_stream():  # type: ignore[return-type]
        yield dict_chunk
        yield b"raw-bytes"

    # Collect all chunks returned by the wrapper
    collected = [chunk async for chunk in cfg.bedrock_sse_wrapper(_dummy_stream())]

    # Exactly one output per input chunk: nothing dropped, nothing duplicated.
    assert len(collected) == 2, collected

    # First chunk should be SSE encoded: an `event:` line, a `data:` line,
    # then a blank line terminating the event.
    first_chunk = collected[0]
    assert isinstance(first_chunk, bytes), first_chunk
    assert first_chunk.endswith(b"\n\n"), first_chunk

    event_line, data_line = first_chunk.decode().rstrip("\n").split("\n")
    assert event_line == "event: message_delta", first_chunk
    assert data_line.startswith("data: "), first_chunk
    # The data line must carry the full chunk as valid JSON, not just a
    # fragment that happens to contain the text.
    assert json.loads(data_line[len("data: "):]) == dict_chunk

    # Second chunk should be forwarded unchanged
    assert collected[1] == b"raw-bytes"
44+
45+
46+
def test_chunk_parser_usage_transformation():
    """Bedrock's camelCase invocation metrics must come back as Anthropic-style `usage` keys."""

    raw_event = {
        "type": "message_delta",
        "amazon-bedrock-invocationMetrics": {
            "inputTokenCount": 10,
            "outputTokenCount": 5,
        },
    }
    stream_decoder = AmazonAnthropicClaudeMessagesStreamDecoder(
        model="bedrock/invoke/anthropic.claude-3-sonnet-20240229-v1:0"
    )

    # Hand the parser a shallow copy so the fixture above keeps its metrics key.
    result = stream_decoder._chunk_parser(dict(raw_event))

    # The Bedrock-specific key is dropped in favour of `usage`.
    assert "amazon-bedrock-invocationMetrics" not in result
    assert "usage" in result

    usage = result["usage"]
    assert usage["input_tokens"] == 10
    assert usage["output_tokens"] == 5

0 commit comments

Comments
 (0)