Skip to content

feat(transformers): implement the support to emitting events in addition to current behavior #2940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

import logging
from typing import Collection
from opentelemetry.instrumentation.transformers.config import Config
from wrapt import wrap_function_wrapper

from opentelemetry.trace import get_tracer

from opentelemetry._events import get_event_logger
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.instrumentation.transformers.config import Config
from opentelemetry.instrumentation.transformers.text_generation_pipeline_wrapper import (
text_generation_pipeline_wrapper,
)
from opentelemetry.instrumentation.transformers.utils import should_emit_events
from opentelemetry.instrumentation.transformers.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

logger = logging.getLogger(__name__)

Expand All @@ -33,16 +33,28 @@
class TransformersInstrumentor(BaseInstrumentor):
"""An instrumentor for transformers library."""

def __init__(self, exception_logger=None):
def __init__(self, exception_logger=None, use_legacy_attributes=True):
super().__init__()
Config.exception_logger = exception_logger
Config.use_legacy_attributes = use_legacy_attributes

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package specifiers this instrumentation requires (module-level ``_instruments``)."""
        return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

event_logger = None

if should_emit_events():
event_logger_provider = kwargs.get("event_logger_provider")
event_logger = get_event_logger(
__name__,
__version__,
event_logger_provider=event_logger_provider,
)

for wrapped_method in WRAPPED_METHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
Expand All @@ -51,7 +63,7 @@ def _instrument(self, **kwargs):
wrap_function_wrapper(
wrap_package,
f"{wrap_object}.{wrap_method}" if wrap_object else wrap_method,
wrapper(tracer, wrapped_method),
wrapper(tracer, event_logger, wrapped_method),
)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
class Config:
    """Package-wide configuration, populated by ``TransformersInstrumentor.__init__``
    and read as class attributes throughout the instrumentation.
    """

    # Optional callable receiving exceptions raised inside instrumentation
    # code (NOTE(review): presumably consumed by the dont_throw guard in
    # utils — confirm there).
    exception_logger = None
    # True keeps the legacy span-attribute behavior; False switches to event
    # emission (NOTE(review): presumably via should_emit_events() — confirm).
    use_legacy_attributes = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from enum import Enum
from typing import Union

from opentelemetry._events import Event
from opentelemetry.instrumentation.transformers.event_models import (
CompletionEvent,
PromptEvent,
)
from opentelemetry.instrumentation.transformers.utils import (
dont_throw,
should_emit_events,
should_send_prompts,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)


class Roles(Enum):
    """Chat roles recognized by the gen_ai message-event naming scheme."""

    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"


VALID_MESSAGE_ROLES = {member.value for member in Roles}
"""The valid roles for naming the message event."""

# Constant attribute set attached to every emitted event; marks the producing
# gen_ai system as "transformers".
EVENT_ATTRIBUTES = {GenAIAttributes.GEN_AI_SYSTEM: "transformers"}
"""The attributes to be used for the event."""


@dont_throw
def emit_prompt_events(args, kwargs, event_logger) -> None:
    """Emit one gen_ai prompt event (role "user") per input prompt.

    Args:
        args: Positional arguments of the wrapped pipeline call; the first
            one is treated as the prompt(s).
        kwargs: Keyword arguments of the wrapped call.
        event_logger: Event logger to emit through; None disables emission.
    """
    if not should_emit_events() or event_logger is None:
        return

    # NOTE(review): the keyword fallback is literally named "args" — confirm
    # against the wrapped pipeline's call signature.
    prompts_list = args[0] if args else kwargs.get("args")

    # Nothing to emit — previously iterating None raised a TypeError that was
    # only swallowed by @dont_throw.
    if prompts_list is None:
        return

    if isinstance(prompts_list, str):
        prompts_list = [prompts_list]

    for prompt in prompts_list:
        emit_event(PromptEvent(content=prompt, role="user"), event_logger)


@dont_throw
def emit_response_events(response, event_logger) -> None:
    """Emit one gen_ai choice event per generated completion.

    Args:
        response: Sequence of pipeline result dicts (each with a
            "generated_text" entry).
        event_logger: Event logger to emit through; None disables emission.
    """
    # Mirror emit_prompt_events: bail out early when events are disabled or no
    # event logger was configured (previously None.emit would raise and be
    # swallowed by @dont_throw).
    if not should_emit_events() or event_logger is None:
        return

    if not response:
        return

    for i, completion in enumerate(response):
        emit_event(
            CompletionEvent(
                index=i,
                message={
                    "content": completion.get("generated_text"),
                    "role": "assistant",
                },
                finish_reason="unknown",
            ),
            event_logger,
        )


def emit_event(event: Union[PromptEvent, CompletionEvent], event_logger) -> None:
    """Dispatch *event* to the emitter matching its type.

    Args:
        event: The prompt or completion event to emit.
        event_logger: The OpenTelemetry event logger used for emission.

    Raises:
        TypeError: If *event* is neither a PromptEvent nor a CompletionEvent.
    """
    if not should_emit_events():
        return

    if isinstance(event, CompletionEvent):
        _emit_completion_event(event, event_logger)
        return
    if isinstance(event, PromptEvent):
        _emit_prompt_event(event, event_logger)
        return
    raise TypeError("Unsupported event type")


def _emit_prompt_event(event: PromptEvent, event_logger) -> None:
    """Build and emit a gen_ai.<role>.message event for a prompt.

    Args:
        event: The prompt event to serialize.
        event_logger: Logger whose ``emit`` receives the built Event.
    """
    body = {
        "content": event.content,
        "role": event.role,
        "tool_calls": event.tool_calls,
    }

    if event.role in VALID_MESSAGE_ROLES:
        name = "gen_ai.{}.message".format(event.role)
        # According to the semantic conventions, the role is conditionally required if available
        # and not equal to the "role" in the message name. So, remove the role from the body if
        # it is the same as the in the event name.
        body.pop("role", None)
    else:
        name = "gen_ai.user.message"

    # According to the semantic conventions, only the assistant role has tool
    # calls; drop them for every other role (and when absent).
    if event.role != Roles.ASSISTANT.value or event.tool_calls is None:
        body.pop("tool_calls", None)

    if not should_send_prompts():
        body.pop("content", None)
        if body.get("tool_calls") is not None:
            # Copy each tool call before scrubbing "arguments" — the previous
            # in-place pop mutated the caller's event.tool_calls.
            body["tool_calls"] = [
                {
                    **tool_call,
                    "function": {
                        key: value
                        for key, value in tool_call["function"].items()
                        if key != "arguments"
                    },
                }
                for tool_call in body["tool_calls"]
            ]

    event_logger.emit(Event(name=name, body=body, attributes=EVENT_ATTRIBUTES))


def _emit_completion_event(event: CompletionEvent, event_logger) -> None:
    """Build and emit a gen_ai.choice event for a completion.

    Args:
        event: The completion event to serialize.
        event_logger: Logger whose ``emit`` receives the built Event.
    """
    # Copy the message dict: the body previously aliased event.message, so the
    # pops below mutated the caller's event in place.
    message = dict(event.message)
    body = {
        "index": event.index,
        "message": message,
        "finish_reason": event.finish_reason,
        "tool_calls": event.tool_calls,
    }

    # According to the semantic conventions, the role is conditionally required if available
    # and not equal to "assistant", so remove the role from the body if it is "assistant".
    # (.get avoids a KeyError when the message carries no role at all.)
    if message.get("role") == Roles.ASSISTANT.value:
        message.pop("role", None)

    if event.tool_calls is None:
        del body["tool_calls"]

    if not should_send_prompts():
        message.pop("content", None)
        if body.get("tool_calls") is not None:
            # Copy each tool call before scrubbing "arguments" — the previous
            # in-place pop mutated the caller's event.tool_calls.
            body["tool_calls"] = [
                {
                    **tool_call,
                    "function": {
                        key: value
                        for key, value in tool_call["function"].items()
                        if key != "arguments"
                    },
                }
                for tool_call in body["tool_calls"]
            ]

    event_logger.emit(
        Event(name="gen_ai.choice", body=body, attributes=EVENT_ATTRIBUTES)
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Any, List, Literal, Optional, TypedDict


class _FunctionToolCall(TypedDict):
    """Function name and arguments of a single tool call."""

    # Name of the function the model asked to invoke.
    function_name: str
    # Keyword arguments for the call; None when the model supplied none.
    arguments: Optional[dict[str, Any]]


class ToolCall(TypedDict):
    """Represents a tool call in the AI model."""

    # Provider-assigned identifier of this tool call.
    id: str
    # The function invocation payload.
    function: _FunctionToolCall
    # Only function-style tool calls are modeled.
    type: Literal["function"]


class CompletionMessage(TypedDict):
    """Message payload of a single completion choice.

    Keys:
        content: The generated text (or other payload).
        role: Producer of the message; emitters in this package use
            "assistant".
    """

    content: Any
    # TypedDict fields cannot carry default values (PEP 589) — the previous
    # `role: str = "assistant"` was dead at runtime and rejected by type
    # checkers. Callers must pass the role explicitly.
    role: str


@dataclass
class PromptEvent:
    """Represents an input event for the AI model."""

    # The prompt payload sent to the model.
    content: Any
    # Chat role of the prompt author.
    role: str = "user"
    # Tool calls attached to the prompt, if any.
    tool_calls: Optional[List[ToolCall]] = None


@dataclass
class CompletionEvent:
    """Represents a completion event for the AI model."""

    index: int
    message: CompletionMessage
    finish_reason: str = "unknown"
    tool_calls: Optional[List[ToolCall]] = None
    # Token counts were referenced by total_tokens but never declared, so the
    # property always raised AttributeError. Declared here with None defaults
    # to keep construction backward compatible.
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None

    @property
    def total_tokens(self) -> Optional[int]:
        """Return input + output token count, or None if either is unknown."""
        if self.input_tokens is None or self.output_tokens is None:
            return None
        return self.input_tokens + self.output_tokens
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from opentelemetry.instrumentation.transformers.utils import dont_throw
from opentelemetry.semconv_ai import (
SpanAttributes,
)


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


@dont_throw
def set_input_attributes(span, instance, args, kwargs):
    """Record the call's input prompt(s) on *span*; no-op when not recording."""
    if not span.is_recording():
        return

    # First positional argument wins; otherwise fall back to the "args" kwarg.
    prompts = args[0] if args else kwargs.get("args")
    _set_span_prompts(span, prompts)


@dont_throw
def set_model_input_attributes(span, instance):
    """Record model identity and generation parameters on *span*."""
    if not span.is_recording():
        return

    forward_params = instance._forward_params
    model_config = instance.model.config

    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_MODEL, model_config.name_or_path
    )
    _set_span_attribute(span, SpanAttributes.LLM_SYSTEM, model_config.model_type)
    _set_span_attribute(span, SpanAttributes.LLM_REQUEST_TYPE, "completion")

    # Generation parameters are all optional entries of _forward_params.
    for attribute, param_name in (
        (SpanAttributes.LLM_REQUEST_TEMPERATURE, "temperature"),
        (SpanAttributes.LLM_REQUEST_TOP_P, "top_p"),
        (SpanAttributes.LLM_REQUEST_MAX_TOKENS, "max_length"),
        (SpanAttributes.LLM_REQUEST_REPETITION_PENALTY, "repetition_penalty"),
    ):
        _set_span_attribute(span, attribute, forward_params.get(param_name))


@dont_throw
def set_response_attributes(span, response):
    """Record generated completions on *span* when it is recording."""
    if response and span.is_recording() and len(response) > 0:
        _set_span_completions(span, response)


def _set_span_completions(span, completions):
    """Write each completion's generated_text as an indexed span attribute."""
    if completions is None:
        return

    for index, completion in enumerate(completions):
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_COMPLETIONS}.{index}.content",
            completion.get("generated_text"),
        )


def _set_span_prompts(span, messages):
    """Write each prompt as an indexed span attribute; accepts one string or a sequence."""
    if messages is None:
        return

    prompt_list = [messages] if isinstance(messages, str) else messages

    for index, message in enumerate(prompt_list):
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{index}.content",
            message,
        )
Loading