Skip to content

Commit

Permalink
feat(llmobs): support in-code config for llmobs (#9172)
Browse files Browse the repository at this point in the history
Support in-code configuration for LLMObs users, to enable LLMObs and
specify the following configurations that currently require environment
variable configuration.

- ml_app
- list of integrations to patch (will patch all LLMObs integrations by
default)
- dd_llmobs_no_apm (turn off APM, telemetry, remote config, metrics)
- DD site, DD env, DD service (will override config/env vars)

```
from ddtrace.llmobs import LLMObs

LLMObs.enable(
    ml_app="comms/langchain", 
    integrations=["openai"],
    llmobs_agentless_enabled=True,
    # api_key =...
    # site=...
    # env=...
    # service=...
    # _tracer=None
)
```
Allowing in-code setup also improves the dev experience for people
tracing experimental apps with LLMObs. It also abstracts away a long
list of environment variables non-APM customers are required to set to
turn off all APM related features.

This PR should not break _any_ previous way of setting up the Python SDK
(e.g. using env vars and `ddtrace-run`).

Arguments passed to enable() should take precedence over environment
variables, with the exception of `DD_LLMOBS_ENABLED`.

This PR also does a couple minor things:
- If `DD_LLMOBS_NO_APM` env var is detected or configured through
LLMObs.enable(), the OpenAI and LangChain integrations will disable
submitting metrics unless the corresponding env vars
`DD_{OPENAI,LANGCHAIN}_METRICS_ENABLED` is set to True.
- We also automatically disable both telemetry writer and remote config
pollers if `DD_LLMOBS_NO_APM` is detected or configured through
LLMObs.enable().
- We automatically patch the LLMObs integrations on LLMObs.enable().
- Removes all LLMObs.enable() references in individual integration patch
code (openai, botocore, langchain)

Note:
- This change (only for LLMObs users) will override `config.service,
config.env` if these are passed in to `LLMObs.enable()`.
- If a user runs via `ddtrace-run`, they cannot use `LLMObs.enable()` to
configure their settings.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: lievan <evan.li@datadoqhq.com>
Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com>
Co-authored-by: Yun Kim <yun.kim@datadoghq.com>
  • Loading branch information
4 people authored May 17, 2024
1 parent 3b08d60 commit bf858f7
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 46 deletions.
6 changes: 0 additions & 6 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from ddtrace import config
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import BedrockIntegration
from ddtrace.settings.config import Config
from ddtrace.vendor import wrapt
Expand Down Expand Up @@ -86,9 +85,6 @@ def patch():
return
botocore.client._datadog_patch = True

if config._llmobs_enabled:
LLMObs.enable()

botocore._datadog_integration = BedrockIntegration(integration_config=config.botocore)
wrapt.wrap_function_wrapper("botocore.client", "BaseClient._make_api_call", patched_api_call(botocore))
Pin(service="aws").onto(botocore.client.BaseClient)
Expand All @@ -103,8 +99,6 @@ def unpatch():
botocore.client._datadog_patch = False
unwrap(botocore.parsers.ResponseParser, "parse")
unwrap(botocore.client.BaseClient, "_make_api_call")
if LLMObs.enabled:
LLMObs.disable()


def patch_submodules(submodules):
Expand Down
7 changes: 0 additions & 7 deletions ddtrace/contrib/langchain/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import LangChainIntegration
from ddtrace.pin import Pin
from ddtrace.vendor import wrapt
Expand Down Expand Up @@ -885,9 +884,6 @@ def patch():
if getattr(langchain, "_datadog_patch", False):
return

if config._llmobs_enabled:
LLMObs.enable()

langchain._datadog_patch = True

Pin().onto(langchain)
Expand Down Expand Up @@ -995,9 +991,6 @@ def unpatch():
if not getattr(langchain, "_datadog_patch", False):
return

if LLMObs.enabled:
LLMObs.disable()

langchain._datadog_patch = False

if SHOULD_PATCH_LANGCHAIN_COMMUNITY:
Expand Down
7 changes: 1 addition & 6 deletions ddtrace/contrib/openai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from ddtrace.internal.utils.formats import deep_getattr
from ddtrace.internal.utils.version import parse_version
from ddtrace.internal.wrapping import wrap
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._integrations import OpenAIIntegration

from ...pin import Pin
Expand Down Expand Up @@ -148,9 +147,6 @@ def patch():
if getattr(openai, "__datadog_patch", False):
return

if config._llmobs_enabled:
LLMObs.enable()

Pin().onto(openai)
integration = OpenAIIntegration(integration_config=config.openai, openai=openai)

Expand Down Expand Up @@ -202,8 +198,7 @@ def patch():
def unpatch():
# FIXME: add unpatching. The current wrapping.unwrap method requires
# the wrapper function to be provided which we don't keep a reference to.
if LLMObs.enabled:
LLMObs.disable()
pass


def _patched_client_init(openai, integration):
Expand Down
6 changes: 4 additions & 2 deletions ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ def __init__(self, integration_config: IntegrationConfig) -> None:
)
self._log_pc_sampler = RateSampler(sample_rate=integration_config.log_prompt_completion_sample_rate)
self.start_log_writer()
if self.llmobs_enabled:
self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate)
self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate)

@property
def metrics_enabled(self) -> bool:
"""Return whether submitting metrics is enabled for this integration, or global config if not set."""
env_metrics_enabled = asbool(os.getenv("DD_{}_METRICS_ENABLED".format(self._integration_name.upper())))
if not env_metrics_enabled and asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED")):
return False
if hasattr(self.integration_config, "metrics_enabled"):
return asbool(self.integration_config.metrics_enabled)
return False
Expand Down
93 changes: 88 additions & 5 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import ddtrace
from ddtrace import Span
from ddtrace import config
from ddtrace import patch
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal import telemetry
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import Service
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.utils.formats import asbool
Expand Down Expand Up @@ -43,6 +47,13 @@
log = get_logger(__name__)


SUPPORTED_INTEGRATIONS = {
"bedrock": lambda: patch(botocore=True),
"langchain": lambda: patch(langchain=True),
"openai": lambda: patch(openai=True),
}


class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
Expand Down Expand Up @@ -88,7 +99,29 @@ def _stop_service(self) -> None:
log.warning("Failed to shutdown tracer", exc_info=True)

@classmethod
def enable(cls, tracer=None):
def enable(
cls,
ml_app: Optional[str] = None,
integrations: Optional[List[str]] = None,
agentless_enabled: bool = False,
site: Optional[str] = None,
api_key: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
_tracer=None,
):
"""
Enable LLM Observability tracing.
:param str ml_app: The name of your ml application.
:param List[str] integrations: A list of integrations to enable auto-tracing for.
Must be subset of ("openai", "langchain", "bedrock")
:param bool agentless_enabled: Set to `true` to disable sending data that requires a Datadog Agent.
:param str site: Your datadog site.
:param str api_key: Your datadog api key.
:param str env: Your environment name.
:param str service: Your service name.
"""
if cls.enabled:
log.debug("%s already enabled", cls.__name__)
return
Expand All @@ -97,23 +130,47 @@ def enable(cls, tracer=None):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return

# grab required values for LLMObs
config._dd_site = site or config._dd_site
config._dd_api_key = api_key or config._dd_api_key
config._llmobs_ml_app = ml_app or config._llmobs_ml_app
config.env = env or config.env
config.service = service or config.service

# validate required values for LLMObs
if not config._dd_api_key:
raise ValueError(
"DD_API_KEY is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)
if not config._dd_site:
raise ValueError(
"DD_SITE is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)
if not config._llmobs_ml_app:
raise ValueError(
"DD_LLMOBS_APP_NAME is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)

# override the default _instance with a new tracer
cls._instance = cls(tracer=tracer)
if agentless_enabled or asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", "false")):
os.environ["DD_LLMOBS_AGENTLESS_ENABLED"] = "1"

cls.enabled = True
if not os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED"):
config._telemetry_enabled = False
log.debug("Telemetry disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.")
telemetry.telemetry_writer.disable()

if not os.getenv("DD_REMOTE_CONFIG_ENABLED"):
config._remote_config_enabled = False
log.debug("Remote configuration disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.")
remoteconfig_poller.disable()

# turn on llmobs trace processing
cls._patch_integrations(integrations)
# override the default _instance with a new tracer
cls._instance = cls(tracer=_tracer)
cls.enabled = True
cls._instance.start()

atexit.register(cls.disable)
Expand Down Expand Up @@ -146,6 +203,32 @@ def flush(cls):
except Exception:
log.warning("Failed to flush LLMObs spans and evaluation metrics.", exc_info=True)

@staticmethod
def _patch_integrations(integrations: Optional[List[str]] = None):
"""
Patch LLM integrations based on a list of integrations passed in. Patch all supported integrations by default.
"""
integrations_to_patch = {}
if integrations is None:
integrations_to_patch.update(SUPPORTED_INTEGRATIONS)
else:
for integration in integrations:
integration = integration.lower()
if integration in SUPPORTED_INTEGRATIONS:
integrations_to_patch.update({integration: SUPPORTED_INTEGRATIONS[integration]})
else:
log.warning(
"%s is unsupported - LLMObs currently supports %s",
integration,
str(SUPPORTED_INTEGRATIONS.keys()),
)
for integration in integrations_to_patch:
try:
SUPPORTED_INTEGRATIONS[integration]()
except Exception:
log.warning("couldn't patch %s", integration, exc_info=True)
return

@classmethod
def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan]:
"""Returns a simple representation of a span to export its span and trace IDs.
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/llmobs/_trace_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor):

def __init__(self, llmobs_span_writer):
self._span_writer = llmobs_span_writer
self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_NO_APM", False))
self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False))

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
Expand Down
7 changes: 4 additions & 3 deletions tests/contrib/botocore/test_bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def bedrock_client(boto3, request_vcr):
)
bedrock_client = session.client("bedrock-runtime")
yield bedrock_client
LLMObs.disable()


@pytest.fixture
Expand Down Expand Up @@ -487,7 +488,7 @@ def _test_llmobs_invoke(cls, provider, bedrock_client, mock_llmobs_span_writer,
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])

if cassette_name is None:
cassette_name = "%s_invoke.yaml" % provider
Expand Down Expand Up @@ -523,7 +524,7 @@ def _test_llmobs_invoke_stream(
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])

if cassette_name is None:
cassette_name = "%s_invoke_stream.yaml" % provider
Expand Down Expand Up @@ -623,7 +624,7 @@ def test_llmobs_error(self, ddtrace_global_config, bedrock_client, mock_llmobs_s
pin.override(bedrock_client, tracer=mock_tracer)
# Need to disable and re-enable LLMObs service to use the mock tracer
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"])
with pytest.raises(botocore.exceptions.ClientError):
with request_vcr.use_cassette("meta_invoke_error.yaml"):
body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"]
Expand Down
2 changes: 2 additions & 0 deletions tests/contrib/langchain/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ddtrace import Pin
from ddtrace.contrib.langchain.patch import patch
from ddtrace.contrib.langchain.patch import unpatch
from ddtrace.llmobs import LLMObs
from tests.utils import DummyTracer
from tests.utils import DummyWriter
from tests.utils import override_config
Expand Down Expand Up @@ -86,6 +87,7 @@ def mock_llmobs_span_writer():
yield m
finally:
patcher.stop()
LLMObs.disable()


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/langchain/test_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ def _test_llmobs_llm_invoke(
different_py39_cassette=False,
):
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

if sys.version_info < (3, 10, 0) and different_py39_cassette:
cassette_name = cassette_name.replace(".yaml", "_39.yaml")
Expand Down Expand Up @@ -1386,7 +1386,7 @@ def _test_llmobs_chain_invoke(
):
# disable the service before re-enabling it, as it was enabled in another test
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

if sys.version_info < (3, 10, 0) and different_py39_cassette:
cassette_name = cassette_name.replace(".yaml", "_39.yaml")
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/langchain/test_langchain_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ def _test_llmobs_llm_invoke(
output_role=None,
):
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

with request_vcr.use_cassette(cassette_name):
generate_trace("Can you explain what an LLM chain is?")
Expand Down Expand Up @@ -1370,7 +1370,7 @@ def _test_llmobs_chain_invoke(
):
# disable the service before re-enabling it, as it was enabled in another test
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"])

with request_vcr.use_cassette(cassette_name):
generate_trace("Can you explain what an LLM chain is?")
Expand Down
3 changes: 2 additions & 1 deletion tests/contrib/openai/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,10 @@ def mock_tracer(ddtrace_global_config, openai, patch_openai, mock_logs, mock_met
if ddtrace_global_config.get("_llmobs_enabled", False):
# Have to disable and re-enable LLMObs to use to mock tracer.
LLMObs.disable()
LLMObs.enable(tracer=mock_tracer)
LLMObs.enable(_tracer=mock_tracer, integrations=["openai"])

yield mock_tracer

mock_logs.reset_mock()
mock_metrics.reset_mock()
LLMObs.disable()
2 changes: 1 addition & 1 deletion tests/llmobs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ def LLMObs(mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_glob
global_config.update(ddtrace_global_config)
with override_global_config(global_config):
dummy_tracer = DummyTracer()
llmobs_service.enable(tracer=dummy_tracer)
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()
Loading

0 comments on commit bf858f7

Please sign in to comment.