Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@
"METHOD": "anthropic.messages.create",
"ENDPOINT": "/v1/messages",
},
"MESSAGES_STREAM": {
"METHOD": "anthropic.messages.stream",
"ENDPOINT": "/v1/messages",
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper
from typing import Any
from langtrace_python_sdk.instrumentation.anthropic.patch import messages_create
from langtrace_python_sdk.instrumentation.anthropic.patch import messages_create, messages_stream

logging.basicConfig(level=logging.FATAL)

Expand All @@ -46,6 +46,12 @@ def _instrument(self, **kwargs: dict[str, Any]) -> None:
"Messages.create",
messages_create(version, tracer),
)

wrap_function_wrapper(
"anthropic.resources.messages",
"Messages.stream",
messages_stream(version, tracer),
)

def _instrument_module(self, module_name: str) -> None:
pass
Expand Down
155 changes: 151 additions & 4 deletions src/langtrace_python_sdk/instrumentation/anthropic/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ def traced_method(
prompts = kwargs.get("messages", [])
system = kwargs.get("system")
if system:
prompts = [{"role": "system", "content": system}] + kwargs.get(
"messages", []
)
prompts.append({"role": "system", "content": system})
span_attributes = {
**get_langtrace_attributes(version, service_provider),
**get_llm_request_attributes(kwargs, prompts=prompts),
Expand All @@ -72,7 +70,14 @@ def traced_method(
span = tracer.start_span(
name=get_span_name(APIS["MESSAGES_CREATE"]["METHOD"]), kind=SpanKind.CLIENT
)

set_span_attributes(span, attributes)

tools = []
if kwargs.get("tools") is not None and kwargs.get("tools"):
tools.append(json.dumps(kwargs.get("tools")))
set_span_attribute(span, SpanAttributes.LLM_TOOLS, json.dumps(tools))

try:
# Attempt to call the original method
result = wrapped(*args, **kwargs)
Expand Down Expand Up @@ -127,7 +132,149 @@ def set_response_attributes(
span.end()
return result
else:
return StreamWrapper(result, span)
return StreamWrapper(result, span, tool_calls=True)

# return the wrapped method
return traced_method


def messages_stream(version: str, tracer: Tracer) -> Callable[..., Any]:

def traced_method(
wrapped: Callable[..., Any],
instance: Any,
args: List[Any],
kwargs: MessagesCreateKwargs,
) -> Any:
service_provider = SERVICE_PROVIDERS["ANTHROPIC"]

prompts = kwargs.get("messages", [])
system = kwargs.get("system")
if system:
prompts.append({"role": "assistant", "content": system})
span_attributes = {
**get_langtrace_attributes(version, service_provider),
**get_llm_request_attributes(kwargs, prompts=prompts),
**get_llm_url(instance),
SpanAttributes.LLM_PATH: APIS["MESSAGES_STREAM"]["ENDPOINT"],
**get_extra_attributes(),
}

attributes = LLMSpanAttributes(**span_attributes)

span = tracer.start_span(
name=get_span_name(APIS["MESSAGES_STREAM"]["METHOD"]), kind=SpanKind.CLIENT
)

set_span_attributes(span, attributes)

tools = []
if kwargs.get("tools") is not None:
tools.append(json.dumps(kwargs.get("tools")))
set_span_attribute(span, SpanAttributes.LLM_TOOLS, json.dumps(tools))

try:
# Create the original message stream manager
original_stream_manager = wrapped(*args, **kwargs)

# Create a new stream manager that will instrument the stream
# while preserving the stream
class InstrumentedMessageStreamManager:
def __init__(self, original_manager, span):
self.original_manager = original_manager
self.span = span

def __enter__(self):
# Enter the original context manager to get the stream
original_stream = self.original_manager.__enter__()

# Create a wrapper iterator
class InstrumentedStream:
def __init__(self, original_stream, span):
self.original_stream = original_stream
self.span = span
self.message_stop_processed = False

def __iter__(self):
return self

def __next__(self):
try:
chunk = next(self.original_stream)

# Apply instrumentation only once on message_stop
if chunk.type == "message_stop" and not self.message_stop_processed:
self.message_stop_processed = True
response_message = chunk.message

responses = [
{
"role": (
response_message.role
if response_message.role
else "assistant"
),
"content": message.text,
}
for message in response_message.content if message.type == "text"
]

set_event_completion(self.span, responses)

if hasattr(response_message, "usage") and response_message.usage is not None:
set_span_attribute(
self.span,
SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
response_message.usage.input_tokens,
)
set_span_attribute(
self.span,
SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
response_message.usage.output_tokens,
)
set_span_attribute(
self.span,
SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
response_message.usage.input_tokens + response_message.usage.output_tokens,
)

# Forward the chunk
return chunk
except StopIteration:
# End the span when we're done with the stream
self.span.end()
raise
except Exception as err:
self.span.record_exception(err)
self.span.set_status(StatusCode.ERROR, str(err))
self.span.end()
raise

def close(self):
self.original_stream.close()
if not self.message_stop_processed:
self.span.end()

# Return our instrumented stream wrapper
return InstrumentedStream(original_stream, self.span)

def __exit__(self, exc_type, exc_val, exc_tb):
result = self.original_manager.__exit__(exc_type, exc_val, exc_tb)

if exc_type is not None:
self.span.record_exception(exc_val)
self.span.set_status(StatusCode.ERROR, str(exc_val))
self.span.end()

return result

# Return the instrumented stream manager
return InstrumentedMessageStreamManager(original_stream_manager, span)

except Exception as err:
span.record_exception(err)
span.set_status(StatusCode.ERROR, str(err))
span.end()
raise

return traced_method
2 changes: 1 addition & 1 deletion src/langtrace_python_sdk/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.8.16"
__version__ = "3.8.17"
Loading