In [1]:
import json
from dataclasses import dataclass
from typing import Any, Dict

import pydantic
from opentelemetry.trace import Status, StatusCode, get_current_span

from openinference.instrumentation import (
    get_input_attributes,
    get_output_attributes,
    get_span_kind_attributes,
    get_tool_attributes,
    suppress_tracing,
    using_attributes,
)

Either instrument with `TracerProvider` from `openinference.instrumentation`.

In [2]:
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from openinference.instrumentation import TracerProvider
from openinference.semconv.resource import ResourceAttributes

endpoint = "http://127.0.0.1:6006/v1/traces"
resource = Resource(attributes={ResourceAttributes.PROJECT_NAME: "openinference-tracer"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint)))
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
tracer = tracer_provider.get_tracer(__name__)

Or using `phoenix.otel.register` (in which case, comment out cell above and uncomment this cell).

In [None]:
# from phoenix.otel import register

# tracer_provider = register(protocol="http/protobuf")
# tracer = tracer_provider.get_tracer(__name__)

## LLMs

In [3]:
from typing import List

from anthropic import Anthropic
from openai import OpenAI
from openai.types.chat import (
    ChatCompletionMessageParam,
    ChatCompletionSystemMessageParam,
    ChatCompletionToolMessageParam,
    ChatCompletionToolParam,
    ChatCompletionUserMessageParam,
)

anthropic_client = Anthropic()
openai_client = OpenAI()

### Context Manager

In [None]:
with tracer.start_as_current_span("llm-span", openinference_span_kind="llm") as span:
    invocation_parameters = {
        "temperature": 0.5,
    }
    input_messages: List[ChatCompletionMessageParam] = [
        ChatCompletionSystemMessageParam(
            role="system",
            content="You are a helpful assistant.",
        ),
        ChatCompletionUserMessageParam(
            role="user",
            content="Hello, world!",
        ),
    ]
    span.set_llm(
        provider="openai",
        system="openai",
        input_messages=input_messages,
        model_name="gpt-4o",
        invocation_parameters=invocation_parameters,
    )
    span.set_input(input_messages[-1]["content"])
    message = openai_client.chat.completions.create(  # type: ignore[call-overload]
        messages=input_messages,
        model="gpt-4o",
        **invocation_parameters,
    )
    output_message = message.choices[0].message
    assert (token_usage := message.usage) is not None
    assert isinstance(completion_tokens := token_usage.completion_tokens, int)
    assert isinstance(prompt_tokens := token_usage.prompt_tokens, int)
    span.set_status(Status(StatusCode.OK))
    span.set_llm(
        output_messages=[
            {
                "role": output_message.role,
                "content": output_message.content,
            }
        ],
        token_count={
            "completion": completion_tokens,
            "prompt": prompt_tokens,
        },
    )
    span.set_output(output_message.content)

In [None]:
with tracer.start_as_current_span("llm-span-tool-calls", openinference_span_kind="llm") as span:
    input_messages = [
        ChatCompletionUserMessageParam(
            role="user",
            content="What's the weather like in San Francisco?",
        )
    ]
    invocation_parameters = {
        "temperature": 0.5,
    }
    tools: List[ChatCompletionToolParam] = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "finds the weather for a given city",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {
                            "type": "string",
                            "description": "The city to find the weather for, e.g. 'London'",
                        }
                    },
                    "required": ["city"],
                },
            },
        },
    ]
    span.set_llm(
        provider="openai",
        system="openai",
        input_messages=input_messages,
        model_name="gpt-4o",
        invocation_parameters=invocation_parameters,
    )
    span.set_input(input_messages[-1]["content"])
    message = openai_client.chat.completions.create(  # type: ignore[call-overload]
        model="gpt-4",
        tools=tools,
        messages=input_messages,
        **invocation_parameters,
    )
    span.set_status(Status(StatusCode.OK))
    output_message = message.choices[0].message
    assert (token_usage := message.usage) is not None
    assert isinstance(completion_tokens := token_usage.completion_tokens, int)
    assert isinstance(prompt_tokens := token_usage.prompt_tokens, int)
    assert (tool_calls := output_message.tool_calls)
    assert len(tool_calls) == 1
    tool_call = tool_calls[0]
    span.set_llm(
        output_messages=[
            {
                "role": output_message.role,
                "content": output_message.content,
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "function": {
                            "name": tool_call.function.name,
                            "arguments": tool_call.function.arguments,
                        },
                    }
                ],
            }
        ],
        token_count={
            "completion": completion_tokens,
            "prompt": prompt_tokens,
        },
    )
    span.set_output(f"{tool_call.function.name}({tool_call.function.arguments})")

with tracer.start_as_current_span("llm-span-content-blocks", openinference_span_kind="llm") as span:
    input_messages.append(output_message.model_dump())
    input_messages.append(
        ChatCompletionToolMessageParam(content="sunny", role="tool", tool_call_id=tool_call.id)
    )
    span.set_llm(
        provider="openai",
        system="openai",
        input_messages=input_messages,
        model_name="gpt-4o",
        invocation_parameters=invocation_parameters,
    )
    span.set_input(input_messages[-1]["content"])
    message = openai_client.chat.completions.create(  # type: ignore[call-overload]
        model="gpt-4",
        tools=tools,
        messages=input_messages,
        **invocation_parameters,
    )
    span.set_status(Status(StatusCode.OK))
    output_message = message.choices[0].message
    assert (token_usage := message.usage) is not None
    assert isinstance(completion_tokens := token_usage.completion_tokens, int)
    assert isinstance(prompt_tokens := token_usage.prompt_tokens, int)
    span.set_status(Status(StatusCode.OK))
    span.set_llm(
        output_messages=[
            {
                "role": output_message.role,
                "content": output_message.content,
            }
        ],
        token_count={
            "completion": completion_tokens,
            "prompt": prompt_tokens,
        },
    )
    span.set_output(output_message.content)

In [None]:
with tracer.start_as_current_span("llm-span-content-blocks", openinference_span_kind="llm") as span:
    invocation_parameters = {
        "max_tokens": 100,
    }
    span.set_llm(
        provider="anthropic",
        system="anthropic",
        input_messages=[
            {
                "role": "user",
                "contents": [
                    {
                        "type": "image",
                        "image": {
                            "url": "https://upload.wikimedia.org/wikipedia/commons/a/a7/Camponotus_flavomarginatus_ant.jpg",
                        },
                    },
                    {
                        "type": "text",
                        "text": "What is in the above image?",
                    },
                ],
            }
        ],
        model_name="gpt-4o",
        invocation_parameters=invocation_parameters,
    )
    span.set_input("What is in the above image?")
    message = anthropic_client.messages.create(  # type: ignore[call-overload]
        model="claude-3-5-sonnet-20240620",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "url",
                            "url": "https://upload.wikimedia.org/wikipedia/commons/a/a7/Camponotus_flavomarginatus_ant.jpg",
                        },
                    },
                    {"type": "text", "text": "What is in the above image?"},
                ],
            }
        ],
        **invocation_parameters,
    )
    span.set_status(Status(StatusCode.OK))
    token_usage = message.usage
    prompt_tokens = token_usage.input_tokens
    completion_tokens = token_usage.output_tokens
    output_message = message.content[0]
    assert output_message.type == "text"
    output_message_text = output_message.text
    span.set_llm(
        output_messages=[
            {
                "role": message.role,
                "content": output_message_text,
            }
        ],
        token_count={
            "completion": completion_tokens,
            "prompt": prompt_tokens,
        },
    )
    span.set_output(output_message_text)

### Decorator

In [None]:
from anthropic.types import Message as AnthropicMessage
from anthropic.types import MessageParam as AnthropicMessageParam

from openinference.instrumentation import get_llm_attributes
from openinference.instrumentation._types import (
    Image as OIImage,
)
from openinference.instrumentation._types import (
    ImageMessageContent as OIImageMessageContent,
)
from openinference.instrumentation._types import (
    Message as OIMessage,
)
from openinference.instrumentation._types import (
    MessageContent as OIMessageContent,
)
from openinference.instrumentation._types import (
    TextMessageContent as OITextMessageContent,
)


def get_attributes_from_inputs(
    input_messages: List[AnthropicMessageParam],
    invocation_parameters: Dict[str, Any],
) -> Dict[str, Any]:
    oi_input_messages = [to_oi_message(message) for message in input_messages]
    return {
        **get_input_attributes(
            {
                "input_messages": input_messages,
                "invocation_parameters": invocation_parameters,
            }
        ),
        **get_llm_attributes(
            input_messages=oi_input_messages,
            invocation_parameters=invocation_parameters,
        ),
    }


def to_oi_message(message: AnthropicMessageParam) -> OIMessage:
    role = message["role"]
    content = message["content"]
    if isinstance(content, str):
        return OIMessage(role=role, content=content)

    contents: List[OIMessageContent] = []
    for content_block in content:
        if not isinstance(content_block, dict):
            raise NotImplementedError("Only typed dict message params are supported")
        if (content_type := content_block["type"]) == "text":
            assert isinstance(text := content_block.get("text"), str)
            contents.append(OITextMessageContent(type="text", text=text))
        elif content_type == "image":
            assert isinstance(source := content_block.get("source"), dict)
            assert isinstance(url := source.get("url"), str)
            contents.append(
                OIImageMessageContent(
                    type="image",
                    image=OIImage(url=url),
                )
            )
        else:
            raise NotImplementedError("Only text and image message content blocks are supported")
    return OIMessage(role=role, contents=contents)


def get_attributes_from_outputs(message: AnthropicMessage) -> Dict[str, Any]:
    assert len(message.content) == 1
    content_block = message.content[0]
    assert content_block.type == "text"
    content_block_text = content_block.text
    return {
        **get_llm_attributes(
            output_messages=[
                {
                    "role": message.role,
                    "content": content_block_text,
                }
            ],
            token_count={
                "completion": message.usage.output_tokens,
                "prompt": message.usage.input_tokens,
            },
        ),
        **get_output_attributes(content_block_text),
    }


@tracer.llm(
    get_attributes_from_inputs=get_attributes_from_inputs,
    get_attributes_from_outputs=get_attributes_from_outputs,
)
def get_image_description(
    input_messages: List[AnthropicMessageParam], invocation_parameters: Dict[str, Any]
) -> AnthropicMessage:
    output_message = anthropic_client.messages.create(
        model="claude-3-5-sonnet-20240620",
        messages=input_messages,
        **invocation_parameters,
    )
    return output_message


response = get_image_description(
    input_messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "url",
                        "url": "https://upload.wikimedia.org/wikipedia/commons/a/a7/Camponotus_flavomarginatus_ant.jpg",
                    },
                },
                {"type": "text", "text": "What is in the above image?"},
            ],
        }
    ],
    invocation_parameters={"temperature": 0.5, "max_tokens": 100},
)

## Chains

In [4]:
with tracer.start_as_current_span(
    "chain-span-with-plain-text-io",
    openinference_span_kind="chain",
) as span:
    span.set_input("input")
    span.set_output("output")
    span.set_status(Status(StatusCode.OK))

### Context Manager

In [5]:
with tracer.start_as_current_span(
    "chain-span-with-json-io",
    openinference_span_kind="chain",
) as span:
    span.set_input(
        {"input-key": "input-value"},
    )
    span.set_output(
        json.dumps({"output-key": "output-value"}),
        mime_type="application/json",
    )
    span.set_status(Status(StatusCode.OK))

In [6]:
with tracer.start_as_current_span(
    "chain-span-with-attribute-getters",
    attributes={
        **get_span_kind_attributes("chain"),
        **get_input_attributes("input"),
    },
) as span:
    span.set_attributes(get_output_attributes("output"))
    span.set_status(Status(StatusCode.OK))

In [7]:
class InputModel(pydantic.BaseModel):
    input: str


@dataclass
class OutputModel:
    output: str


with tracer.start_as_current_span(
    "chain-span-with-pydantic-input-and-dataclass-output",
    openinference_span_kind="chain",
) as span:
    span.set_input(InputModel(input="input"))
    span.set_output(OutputModel(output="output"))
    span.set_status(Status(StatusCode.OK))

### Decorator

In [None]:
@tracer.chain
def decorated_chain_with_plain_text_output(input: str) -> str:
    return "output"


decorated_chain_with_plain_text_output("input")

In [None]:
@tracer.chain
def decorated_chain_with_json_output(input: str) -> Dict[str, Any]:
    return {"output": "output"}


decorated_chain_with_json_output("input")

In [None]:
@tracer.chain()
def decorated_chain_with_no_parameters(input: str) -> Dict[str, Any]:
    return {"output": "output"}


decorated_chain_with_no_parameters("input")

In [None]:
@tracer.chain(name="decorated-chain-with-overriden-name")
def this_name_should_be_overriden(input: str) -> Dict[str, Any]:
    return {"output": "output"}


this_name_should_be_overriden("input")

In [None]:
def chain_with_decorator_applied_as_function(input: str) -> Dict[str, Any]:
    return {"output": "output"}


decorated = tracer.chain(chain_with_decorator_applied_as_function)
decorated("input")

In [None]:
def this_name_should_be_overriden_with_decorator_applied_as_function_with_parameters(
    input: str,
) -> Dict[str, Any]:
    return {"output": "output"}


decorated = tracer.chain(
    name="decorated-chain-with-decorator-applied-as-function-with-overriden-name"
)(this_name_should_be_overriden_with_decorator_applied_as_function_with_parameters)
decorated("input")

In [None]:
@tracer.chain
async def decorated_async_chain(input: str) -> str:
    return "output"


await decorated_async_chain("input")  # type: ignore[top-level-await]

In [None]:
@tracer.chain
def decorated_chain_with_error(input: str) -> str:
    raise ValueError("error")


try:
    decorated_chain_with_error("input")
except ValueError as e:
    print(e)

In [None]:
@tracer.chain
def decorated_chain_with_child_span(input: str) -> str:
    with tracer.start_as_current_span(
        "child-span",
        openinference_span_kind="chain",
        attributes=get_input_attributes("child-span-input"),
    ) as child_span:
        output = "output"
        child_span.set_output(output)
        child_span.set_status(Status(StatusCode.OK))
        return output


decorated_chain_with_child_span("input")

In [None]:
@tracer.chain
def decorated_chain_with_child_span_error(input: str) -> str:
    with tracer.start_as_current_span(
        "child-span",
        openinference_span_kind="chain",
        attributes=get_input_attributes("child-span-input"),
    ):
        raise ValueError("error")


try:
    decorated_chain_with_child_span_error("input")
except ValueError as e:
    print(e)

In [None]:
class ChainRunner:
    @tracer.chain
    def decorated_chain_method(self, input1: str, input2: str) -> str:
        return "output"


chain_runner = ChainRunner()
chain_runner.decorated_chain_method("input1", "input2")

In [None]:
@tracer.chain
def decorated_chain_with_input_and_output_set_inside_the_wrapped_function(input: str) -> str:
    span = get_current_span()
    span.set_input("overridden-input")  # type: ignore[attr-defined]
    span.set_output("overridden-output")  # type: ignore[attr-defined]
    return "output"


decorated_chain_with_input_and_output_set_inside_the_wrapped_function("input")

### Suppress Tracing

In [20]:
with suppress_tracing():
    with tracer.start_as_current_span(
        "THIS-SPAN-SHOULD-NOT-BE-TRACED",
        openinference_span_kind="chain",
    ) as span:
        span.set_input("input")
        span.set_output("output")
        span.set_status(Status(StatusCode.OK))

In [21]:
@tracer.chain
def decorated_chain_with_suppress_tracing(input: str) -> str:
    return "output"


with suppress_tracing():
    decorated_chain_with_suppress_tracing("input")

### Context Attributes

In [22]:
with using_attributes(session_id="123"):
    with tracer.start_as_current_span(
        "chain-span-with-context-attributes",
        openinference_span_kind="chain",
    ) as span:
        span.set_input("input")
        span.set_output("output")
        span.set_status(Status(StatusCode.OK))

In [23]:
@tracer.chain
def decorated_chain_with_context_attributes(input: str) -> str:
    return "output"


with using_attributes(session_id="123"):
    decorated_chain_with_context_attributes("input")

## Agents

### Context Managers

In [24]:
with tracer.start_as_current_span(
    "agent-span-with-plain-text-io",
    openinference_span_kind="agent",
) as span:
    span.set_input("input")
    span.set_output("output")
    span.set_status(Status(StatusCode.OK))

### Decorators

In [None]:
@tracer.agent
def decorated_agent(input: str) -> str:
    return "output"


decorated_agent("input")

## Tools

### Context Managers

In [26]:
with tracer.start_as_current_span(
    "tool-span",
    openinference_span_kind="tool",
) as span:
    span.set_input("input")
    span.set_output("output")
    span.set_tool(
        name="tool-name",
        description="tool-description",
        parameters={"input": "input"},
    )
    span.set_status(Status(StatusCode.OK))

In [27]:
with tracer.start_as_current_span(
    "tool-span-with-getter",
    openinference_span_kind="tool",
) as span:
    span.set_attributes(
        get_tool_attributes(
            name="tool-name",
            description="tool-description",
            parameters={"input": "input"},
        )
    )
    span.set_status(Status(StatusCode.OK))

In [28]:
@tracer.tool
def decorated_tool(input1: str, input2: int) -> None:
    """
    tool-description
    """


decorated_tool("input1", 1)

In [29]:
@tracer.tool
async def decorated_tool_async(input1: str, input2: int) -> None:
    """
    tool-description
    """


await decorated_tool_async("input1", 1)  # type: ignore[top-level-await]

In [30]:
@tracer.tool(
    name="decorated-tool-with-overriden-name",
    description="overriden-tool-description",
)
def this_tool_name_should_be_overriden(input1: str, input2: int) -> None:
    """
    this tool description should be overriden
    """


this_tool_name_should_be_overriden("input1", 1)

In [None]:
@tracer.tool
def tool_with_changes_inside_the_wrapped_function(input1: str, input2: int) -> str:
    span = get_current_span()
    print(type(span))
    span.set_input("inside-input")  # type: ignore[attr-defined]
    span.set_output("inside-output")  # type: ignore[attr-defined]
    span.set_tool(  # type: ignore[attr-defined]
        name="inside-tool-name",
        description="inside-tool-description",
        parameters={"inside-input": "inside-input"},
    )
    return "output"


tool_with_changes_inside_the_wrapped_function("input1", 1)