182 changes: 151 additions & 31 deletions sentry_sdk/integrations/litellm.py
@@ -6,6 +6,7 @@
from sentry_sdk.ai.monitoring import record_token_usage
from sentry_sdk.ai.utils import (
get_start_span_function,
normalize_message_roles,
set_data_normalized,
truncate_and_annotate_messages,
transform_openai_content_part,
@@ -17,7 +18,7 @@
from sentry_sdk.utils import event_from_exception

if TYPE_CHECKING:
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from datetime import datetime

try:
@@ -39,6 +40,23 @@ def _get_metadata_dict(kwargs: "Dict[str, Any]") -> "Dict[str, Any]":
return metadata


def _read_usage_field(usage: "Any", *names: str) -> "Optional[int]":
"""Read the first non-None field from a usage container.

The usage object can be either a typed Pydantic model (attribute access) or
a plain dict (litellm hands us a dict for the assembled async-streaming
response), so we try both shapes.
Comment on lines +46 to +48
Contributor

Why don't we just read from the dictionary in the asynchronous streaming scenario and otherwise access the attribute on the Pydantic model 😄?

These responses have types, so an isinstance check can tell you which branch you are in.

In the end we're developing against a library with a finite number of return types, and we should just check which case we are handling instead of probing around. Probing around is less robust, since new return types can accidentally trigger hasattr() checks.
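A rough sketch of that shape (illustrative only, not the integration's code; it assumes the plain dict only appears for litellm's assembled async-streaming usage, that the caller already knows which operation it is handling, and it leaves aside the streaming Responses case where litellm normalizes to the chat-completion field names):

from typing import Any, Optional, Tuple

def usage_tokens(usage: Any, operation: str) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    # Branch once on the container type / API instead of probing field names.
    if isinstance(usage, dict):
        # Assumption: only the assembled async-streaming usage is a plain dict,
        # and it carries the chat-completion field names.
        return (
            usage.get("prompt_tokens"),
            usage.get("completion_tokens"),
            usage.get("total_tokens"),
        )
    if operation == "responses":
        # Non-streaming Responses API usage model: input_tokens / output_tokens.
        return (usage.input_tokens, usage.output_tokens, usage.total_tokens)
    # Chat Completions / Embeddings usage models: prompt_tokens / completion_tokens.
    return (usage.prompt_tokens, usage.completion_tokens, usage.total_tokens)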

"""
for name in names:
if isinstance(usage, dict):
value = usage.get(name)
else:
value = getattr(usage, name, None)
if value is not None:
return value
return None


def _convert_message_parts(messages: "List[Dict[str, Any]]") -> "List[Dict[str, Any]]":
"""
Convert the message parts from OpenAI format to the `gen_ai.request.messages` format
@@ -84,16 +102,17 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None:
call_type = kwargs.get("call_type", None)
if call_type == "embedding" or call_type == "aembedding":
operation = "embeddings"
op = consts.OP.GEN_AI_EMBEDDINGS
elif call_type == "responses" or call_type == "aresponses":
operation = "responses"
op = consts.OP.GEN_AI_RESPONSES
else:
operation = "chat"
op = consts.OP.GEN_AI_CHAT

# Start a new span/transaction
span = get_start_span_function()(
op=(
consts.OP.GEN_AI_CHAT
if operation == "chat"
else consts.OP.GEN_AI_EMBEDDINGS
),
op=op,
name=f"{operation} {model}",
origin=LiteLLMIntegration.origin,
)
Expand All @@ -106,14 +125,15 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None:
set_data_normalized(span, SPANDATA.GEN_AI_SYSTEM, provider)
set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, operation)

# Record input/messages if allowed
if should_send_default_pii() and integration.include_prompts:
if operation == "embeddings":
# For embeddings, look for the 'input' parameter
# Per-operation request data. Conversation id (responses) is set
# unconditionally; user-content fields are gated on PII / include_prompts.
record_prompts = should_send_default_pii() and integration.include_prompts
scope = sentry_sdk.get_current_scope()

if operation == "embeddings":
if record_prompts:
embedding_input = kwargs.get("input")
if embedding_input:
scope = sentry_sdk.get_current_scope()
# Normalize to list format
input_list = (
embedding_input
if isinstance(embedding_input, list)
@@ -129,11 +149,50 @@ def _input_callback(kwargs: "Dict[str, Any]") -> None:
messages_data,
unpack=False,
)
else:
# For chat, look for the 'messages' parameter

elif operation == "responses":
# litellm unpacks `extra_body` into the request body, so the
# `conversation` field shows up in additional_args.complete_input_dict
# rather than as a top-level kwarg.
complete_input = (kwargs.get("additional_args") or {}).get(
"complete_input_dict"
) or {}
conversation = complete_input.get("conversation")
if conversation is not None:
conversation_id: "Optional[str]" = None
if isinstance(conversation, str):
conversation_id = conversation
elif isinstance(conversation, dict):
conversation_id = conversation.get("id")
if conversation_id is not None:
set_data_normalized(
span, SPANDATA.GEN_AI_CONVERSATION_ID, conversation_id
)

if record_prompts:
# `input` is either a string or a list of message dicts (same
# shape as OpenAI Responses API).
responses_input = kwargs.get("input")
if responses_input:
if isinstance(responses_input, str):
input_messages = [responses_input]
else:
input_messages = list(responses_input)
normalized = normalize_message_roles(input_messages) # type: ignore[arg-type]
messages_data = truncate_and_annotate_messages(normalized, span, scope)
if messages_data is not None:
set_data_normalized(
Contributor

Based on the marshaling above you know that messages_data is a list. You should just use span.set_data() when you know the type of an attribute (again, removing cognitive overhead by avoiding dead code).
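That would look roughly like this (sketch; assumes messages_data is already the truncated list built above):

if messages_data is not None:
    span.set_data(SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data)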

span,
SPANDATA.GEN_AI_REQUEST_MESSAGES,
messages_data,
unpack=False,
)

else:
# Chat completions.
if record_prompts:
messages = kwargs.get("messages", [])
if messages:
scope = sentry_sdk.get_current_scope()
messages = _convert_message_parts(messages)
messages_data = truncate_and_annotate_messages(messages, span, scope)
if messages_data is not None:
@@ -166,11 +225,24 @@ async def _async_input_callback(kwargs: "Dict[str, Any]") -> None:

def _success_callback(
kwargs: "Dict[str, Any]",
completion_response: "Any",
response: "Any",
start_time: "datetime",
end_time: "datetime",
) -> None:
"""Handle successful completion."""
"""Handle a successful chat completion, embeddings, or Responses API call.

The shape of `response` differs between API paths:
- Chat Completions: ModelResponse with ``.choices[].message`` and
``.usage`` carrying ``prompt_tokens`` / ``completion_tokens``.
- Responses API (non-streaming): ResponsesAPIResponse with ``.output[]``
items (``message`` / ``function_call``) and ``.usage`` carrying
``input_tokens`` / ``output_tokens``.
- Responses API (streaming): a ResponseCompletedEvent wrapper
``{type: "response.completed", response: ResponsesAPIResponse}``,
which we unwrap below.
- Embeddings: CreateEmbeddingResponse with ``.usage`` only (no choices
or output).
"""

metadata = _get_metadata_dict(kwargs)
span = metadata.get("_sentry_span")
@@ -181,18 +253,25 @@ def _success_callback(
if integration is None:
return

# Streaming Responses API: unwrap the ResponseCompletedEvent so the rest of
# the function sees the assembled ResponsesAPIResponse directly.
if getattr(response, "type", None) == "response.completed" and hasattr(
response, "response"
):
response = response.response

try:
# Record model information
if hasattr(completion_response, "model"):
set_data_normalized(
span, SPANDATA.GEN_AI_RESPONSE_MODEL, completion_response.model
)
# `model` is set by all API shapes (chat / responses / embeddings).
if hasattr(response, "model"):
set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_MODEL, response.model)

# Record response content if allowed
# Response content: structure depends on the API shape. Embeddings have
# neither ``choices`` nor ``output``, so we just skip this block.
if should_send_default_pii() and integration.include_prompts:
if hasattr(completion_response, "choices"):
if hasattr(response, "choices"):
# Chat Completions API.
response_messages = []
for choice in completion_response.choices:
for choice in response.choices:
if hasattr(choice, "message"):
if hasattr(choice.message, "model_dump"):
response_messages.append(choice.message.model_dump())
@@ -213,15 +292,56 @@ def _success_callback(
set_data_normalized(
span, SPANDATA.GEN_AI_RESPONSE_TEXT, response_messages
)
elif hasattr(response, "output"):
Contributor

You are adding code here which runs for all possible types of object that have an output field.

As a result the branch can easily be accidentally triggered as litellm evolves. There are multiple approaches to narrowing down whether you have a response in the Chat Completions API schema or a response in the Responses API schema. For example, you can check

isinstance(response, (ResponsesAPIResponse, BaseResponsesAPIStreamingIterator))

based on the signature of the library function

https://github.com/BerriAI/litellm/blob/a67b7a7e87f11bed01f9e073125a7f8f180105a2/litellm/responses/main.py#L449.
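A sketch of that narrowing (the import path is an assumption about litellm's type-module layout and may need adjusting; the helper is hypothetical, for illustration only):

from litellm.types.llms.openai import ResponsesAPIResponse  # assumed import path

def response_schema(response: object) -> str:
    # Classify the response schema explicitly so downstream code is not
    # driven by hasattr() probes.
    if isinstance(response, ResponsesAPIResponse):
        return "responses"
    if hasattr(response, "choices"):
        return "chat"
    return "other"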

# Responses API: split message text from function-call items.
output_text: "List[Any]" = []
tool_calls: "List[Any]" = []
for output in response.output:
output_type = getattr(output, "type", None)
if output_type == "function_call":
if hasattr(output, "model_dump"):
tool_calls.append(output.model_dump())
elif hasattr(output, "dict"):
tool_calls.append(output.dict())
elif output_type == "message":
for content_item in getattr(output, "content", []) or []:
text = getattr(content_item, "text", None)
if text is not None:
output_text.append(text)
Contributor

This has reached a lot of indentation for Python code. Usually you can keep code readable by adding early returns or breaking up into functions where appropriate.
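One way to flatten it, as a sketch (a hypothetical helper that mirrors, slightly simplified, the loop above):

from typing import Any, List, Tuple

def split_responses_output(output_items: List[Any]) -> Tuple[List[Any], List[Any]]:
    # Separate message text from function-call items, using continue as an
    # early exit to keep the nesting shallow.
    output_text: List[Any] = []
    tool_calls: List[Any] = []
    for item in output_items or []:
        item_type = getattr(item, "type", None)
        if item_type == "function_call":
            if hasattr(item, "model_dump"):
                tool_calls.append(item.model_dump())
            continue
        if item_type != "message":
            continue
        for content_item in getattr(item, "content", []) or []:
            text = getattr(content_item, "text", None)
            if text is not None:
                output_text.append(text)
    return output_text, tool_calls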

elif hasattr(content_item, "model_dump"):
output_text.append(content_item.model_dump())
elif hasattr(content_item, "dict"):
output_text.append(content_item.dict())

if tool_calls:
set_data_normalized(
span,
SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
tool_calls,
unpack=False,
)
if output_text:
set_data_normalized(
span, SPANDATA.GEN_AI_RESPONSE_TEXT, output_text
)

# Record token usage
if hasattr(completion_response, "usage"):
usage = completion_response.usage
# Token usage field names differ across APIs:
# Chat Completions / Embeddings: prompt_tokens / completion_tokens
# Responses API (non-streaming): input_tokens / output_tokens
# Responses API (streaming): prompt_tokens / completion_tokens
# (litellm normalizes to chat-completion names when assembling the
# streaming response). For the async-streaming variant, the
# assembled `usage` is a plain dict, not a Pydantic model — hence
# `_read_usage_field` supports both shapes.
if hasattr(response, "usage"):
usage = response.usage
record_token_usage(
span,
input_tokens=getattr(usage, "prompt_tokens", None),
output_tokens=getattr(usage, "completion_tokens", None),
total_tokens=getattr(usage, "total_tokens", None),
input_tokens=_read_usage_field(usage, "prompt_tokens", "input_tokens"),
output_tokens=_read_usage_field(
usage, "completion_tokens", "output_tokens"
),
total_tokens=_read_usage_field(usage, "total_tokens"),
)
Comment on lines +336 to 345
Contributor

We already probe above to determine which API is used.

As a result, reading prompt_tokens or input_tokens is dead code conditioned on knowing which API you are handling (adding cognitive overhead when reading).
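At the call site that might look like the following sketch (is_responses_usage is an assumed flag derived from the schema check suggested above, not an existing variable):

if is_responses_usage:
    record_token_usage(
        span,
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        total_tokens=usage.total_tokens,
    )
else:
    # Chat / embeddings usage models and the assembled streaming dict share
    # the chat-completion field names.
    read = usage.get if isinstance(usage, dict) else lambda name: getattr(usage, name, None)
    record_token_usage(
        span,
        input_tokens=read("prompt_tokens"),
        output_tokens=read("completion_tokens"),
        total_tokens=read("total_tokens"),
    )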


finally: