Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(llmobs): support in-code config for llmobs [backport 2.9] #9366

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from ddtrace import config
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import BedrockIntegration
from ddtrace.settings.config import Config
from ddtrace.vendor import wrapt
Expand Down Expand Up @@ -87,9 +86,6 @@ def patch():
return
botocore.client._datadog_patch = True

if config._llmobs_enabled:
LLMObs.enable()

botocore._datadog_integration = BedrockIntegration(integration_config=config.botocore)
wrapt.wrap_function_wrapper("botocore.client", "BaseClient._make_api_call", patched_api_call(botocore))
Pin(service="aws").onto(botocore.client.BaseClient)
Expand All @@ -104,8 +100,6 @@ def unpatch():
botocore.client._datadog_patch = False
unwrap(botocore.parsers.ResponseParser, "parse")
unwrap(botocore.client.BaseClient, "_make_api_call")
if LLMObs.enabled:
LLMObs.disable()


def patch_submodules(submodules):
Expand Down
7 changes: 0 additions & 7 deletions ddtrace/contrib/langchain/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.pin import Pin
from ddtrace.vendor import wrapt
Expand Down Expand Up @@ -885,9 +884,6 @@ def patch():
if getattr(langchain, "_datadog_patch", False):
return

if config._llmobs_enabled:
LLMObs.enable()

langchain._datadog_patch = True

Pin().onto(langchain)
Expand Down Expand Up @@ -995,9 +991,6 @@ def unpatch():
if not getattr(langchain, "_datadog_patch", False):
return

if LLMObs.enabled:
LLMObs.disable()

langchain._datadog_patch = False

if SHOULD_PATCH_LANGCHAIN_COMMUNITY:
Expand Down
7 changes: 1 addition & 6 deletions ddtrace/contrib/openai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.internal.wrapping import wrap
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import OpenAIIntegration

from ...pin import Pin
Expand Down Expand Up @@ -127,9 +126,6 @@ def patch():
if getattr(openai, "__datadog_patch", False):
return

if config._llmobs_enabled:
LLMObs.enable()

Pin().onto(openai)
integration = OpenAIIntegration(integration_config=config.openai, openai=openai)

Expand Down Expand Up @@ -175,8 +171,7 @@ def patch():
def unpatch():
# FIXME: add unpatching. The current wrapping.unwrap method requires
# the wrapper function to be provided which we don't keep a reference to.
if LLMObs.enabled:
LLMObs.disable()
pass


def _patched_client_init(openai, integration):
Expand Down
6 changes: 4 additions & 2 deletions ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ def __init__(self, integration_config: IntegrationConfig) -> None:
)
self._log_pc_sampler = RateSampler(sample_rate=integration_config.log_prompt_completion_sample_rate)
self.start_log_writer()
if self.llmobs_enabled:
self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate)
self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate)

@property
def metrics_enabled(self) -> bool:
    """Return whether submitting metrics is enabled for this integration.

    Evaluation order:

    1. If ``DD_<INTEGRATION>_METRICS_ENABLED`` is not truthy and
       ``DD_LLMOBS_AGENTLESS_ENABLED`` is truthy, metrics are reported as disabled.
    2. Otherwise the ``metrics_enabled`` attribute of the integration config decides, when present.
    3. Otherwise metrics are reported as disabled.
    """
    env_var_name = "DD_{}_METRICS_ENABLED".format(self._integration_name.upper())
    enabled_via_env = asbool(os.getenv(env_var_name))
    if not enabled_via_env:
        # Without an explicit per-integration opt-in, agentless mode turns metrics off.
        if asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED")):
            return False
    if not hasattr(self.integration_config, "metrics_enabled"):
        return False
    return asbool(self.integration_config.metrics_enabled)
Expand Down
93 changes: 88 additions & 5 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import ddtrace
from ddtrace import Span
from ddtrace import config
from ddtrace import patch
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal import telemetry
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import Service
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.utils.formats import asbool
Expand Down Expand Up @@ -43,6 +47,13 @@
log = get_logger(__name__)


# Integration name -> zero-argument callable that patches the matching library.
# Each ``patch`` call is wrapped in a lambda so nothing is patched until the
# entry is actually invoked. Note that "bedrock" is patched through botocore.
SUPPORTED_INTEGRATIONS = {
    "bedrock": lambda: patch(botocore=True),
    "langchain": lambda: patch(langchain=True),
    "openai": lambda: patch(openai=True),
}


class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
Expand Down Expand Up @@ -88,7 +99,29 @@ def _stop_service(self) -> None:
log.warning("Failed to shutdown tracer", exc_info=True)

@classmethod
def enable(cls, tracer=None):
def enable(
cls,
ml_app: Optional[str] = None,
integrations: Optional[List[str]] = None,
agentless_enabled: bool = False,
site: Optional[str] = None,
api_key: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
_tracer=None,
):
"""
Enable LLM Observability tracing.

:param str ml_app: The name of your ml application.
:param List[str] integrations: A list of integrations to enable auto-tracing for.
Must be subset of ("openai", "langchain", "bedrock")
:param bool agentless_enabled: Set to `true` to disable sending data that requires a Datadog Agent.
:param str site: Your datadog site.
:param str api_key: Your datadog api key.
:param str env: Your environment name.
:param str service: Your service name.
"""
if cls.enabled:
log.debug("%s already enabled", cls.__name__)
return
Expand All @@ -97,23 +130,47 @@ def enable(cls, tracer=None):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return

# grab required values for LLMObs
config._dd_site = site or config._dd_site
config._dd_api_key = api_key or config._dd_api_key
config._llmobs_ml_app = ml_app or config._llmobs_ml_app
config.env = env or config.env
config.service = service or config.service

# validate required values for LLMObs
if not config._dd_api_key:
raise ValueError(
"DD_API_KEY is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)
if not config._dd_site:
raise ValueError(
"DD_SITE is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)
if not config._llmobs_ml_app:
raise ValueError(
"DD_LLMOBS_APP_NAME is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)

# override the default _instance with a new tracer
cls._instance = cls(tracer=tracer)
if agentless_enabled or asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", "false")):
os.environ["DD_LLMOBS_AGENTLESS_ENABLED"] = "1"

cls.enabled = True
if not os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED"):
config._telemetry_enabled = False
log.debug("Telemetry disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.")
telemetry.telemetry_writer.disable()

if not os.getenv("DD_REMOTE_CONFIG_ENABLED"):
config._remote_config_enabled = False
log.debug("Remote configuration disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.")
remoteconfig_poller.disable()

# turn on llmobs trace processing
cls._patch_integrations(integrations)
# override the default _instance with a new tracer
cls._instance = cls(tracer=_tracer)
cls.enabled = True
cls._instance.start()

atexit.register(cls.disable)
Expand Down Expand Up @@ -146,6 +203,32 @@ def flush(cls):
except Exception:
log.warning("Failed to flush LLMObs spans and evaluation metrics.", exc_info=True)

@staticmethod
def _patch_integrations(integrations: Optional[List[str]] = None):
    """
    Patch LLM integrations based on a list of integrations passed in. Patch all supported integrations by default.

    :param List[str] integrations: Names of the integrations to patch, matched case-insensitively
        against ``SUPPORTED_INTEGRATIONS``. ``None`` patches every supported integration; an empty
        list patches nothing. Unsupported names are skipped with a warning, and duplicates are
        patched only once.
    """
    if integrations is None:
        names_to_patch = list(SUPPORTED_INTEGRATIONS)
    else:
        names_to_patch = []
        for name in integrations:
            name = name.lower()
            if name not in SUPPORTED_INTEGRATIONS:
                # Render the supported names as a plain comma-separated list; str() of a
                # dict view would leak "dict_keys([...])" into the user-facing message.
                log.warning(
                    "%s is unsupported - LLMObs currently supports %s",
                    name,
                    ", ".join(sorted(SUPPORTED_INTEGRATIONS)),
                )
            elif name not in names_to_patch:
                names_to_patch.append(name)
    for name in names_to_patch:
        try:
            SUPPORTED_INTEGRATIONS[name]()
        except Exception:
            # Best effort: a failure to patch one integration must not prevent patching the others.
            log.warning("couldn't patch %s", name, exc_info=True)

@classmethod
def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan]:
"""Returns a simple representation of a span to export its span and trace IDs.
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/llmobs/_trace_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor):

def __init__(self, llmobs_span_writer):
self._span_writer = llmobs_span_writer
self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_NO_APM", False))
self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False))

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
Expand Down
7 changes: 4 additions & 3 deletions tests/contrib/botocore/test_bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def bedrock_client(boto3, request_vcr):
)
bedrock_client = session.client("bedrock-runtime")
yield bedrock_client
LLMObs.disable()


@pytest.fixture
Expand Down Expand Up @@ -487,7 +488,7 @@ def _test_llmobs_invoke(cls, provider, bedrock_client, mock_llmobs_span_writer,
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])

if cassette_name is None:
cassette_name = "%s_invoke.yaml" % provider
Expand Down Expand Up @@ -523,7 +524,7 @@ def _test_llmobs_invoke_stream(
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])

if cassette_name is None:
cassette_name = "%s_invoke_stream.yaml" % provider
Expand Down Expand Up @@ -623,7 +624,7 @@ def test_llmobs_error(self, ddtrace_global_config, bedrock_client, mock_llmobs_s
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])
with pytest.raises(botocore.exceptions.ClientError):
with request_vcr.use_cassette("meta_invoke_error.yaml"):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
Expand Down
2 changes: 2 additions & 0 deletions tests/contrib/langchain/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ddtrace import Pin
from ddtrace.contrib.langchain.patch import patch
from ddtrace.contrib.langchain.patch import unpatch
from ddtrace.llmobs import LLMObs
from tests.utils import DummyTracer
from tests.utils import DummyWriter
from tests.utils import override_config
Expand Down Expand Up @@ -86,6 +87,7 @@ def mock_llmobs_span_writer():
yield m
finally:
patcher.stop()
LLMObs.disable()


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/langchain/test_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ def _test_llmobs_llm_invoke(
different_py39_cassette=False,
):
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

if sys.version_info < (3, 10, 0) and different_py39_cassette:
cassette_name = cassette_name.replace(".yaml", "_39.yaml")
Expand Down Expand Up @@ -1386,7 +1386,7 @@ def _test_llmobs_chain_invoke(
):
# disable the service before re-enabling it, as it was enabled in another test
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

if sys.version_info < (3, 10, 0) and different_py39_cassette:
cassette_name = cassette_name.replace(".yaml", "_39.yaml")
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/langchain/test_langchain_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ def _test_llmobs_llm_invoke(
output_role=None,
):
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

with request_vcr.use_cassette(cassette_name):
generate_trace("Can you explain what an LLM chain is?")
Expand Down Expand Up @@ -1370,7 +1370,7 @@ def _test_llmobs_chain_invoke(
):
# disable the service before re-enabling it, as it was enabled in another test
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

with request_vcr.use_cassette(cassette_name):
generate_trace("Can you explain what an LLM chain is?")
Expand Down
3 changes: 2 additions & 1 deletion tests/contrib/openai/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,10 @@ def mock_tracer(ddtrace_global_config, openai, patch_openai, mock_logs, mock_met
if ddtrace_global_config.get("_llmobs_enabled", False):
# Have to disable and re-enable LLMObs to use to mock tracer.
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["openai"])

yield mock_tracer

mock_logs.reset_mock()
mock_metrics.reset_mock()
LLMObs.disable()
2 changes: 1 addition & 1 deletion tests/llmobs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ def LLMObs(mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_glob
global_config.update(ddtrace_global_config)
with override_global_config(global_config):
dummy_tracer = DummyTracer()
llmobs_service.enable(tracer=dummy_tracer)
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()
Loading
Loading