diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index b4f1a5265ea..b0db100b81e 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -13,7 +13,6 @@ from ddtrace import config from ddtrace.contrib.trace_utils import with_traced_module -from ddtrace.llmobs import LLMObs from ddtrace.llmobs._integrations import BedrockIntegration from ddtrace.settings.config import Config from ddtrace.vendor import wrapt @@ -86,9 +85,6 @@ def patch(): return botocore.client._datadog_patch = True - if config._llmobs_enabled: - LLMObs.enable() - botocore._datadog_integration = BedrockIntegration(integration_config=config.botocore) wrapt.wrap_function_wrapper("botocore.client", "BaseClient._make_api_call", patched_api_call(botocore)) Pin(service="aws").onto(botocore.client.BaseClient) @@ -103,8 +99,6 @@ def unpatch(): botocore.client._datadog_patch = False unwrap(botocore.parsers.ResponseParser, "parse") unwrap(botocore.client.BaseClient, "_make_api_call") - if LLMObs.enabled: - LLMObs.disable() def patch_submodules(submodules): diff --git a/ddtrace/contrib/langchain/patch.py b/ddtrace/contrib/langchain/patch.py index 019b0c489b3..d24d8b53348 100644 --- a/ddtrace/contrib/langchain/patch.py +++ b/ddtrace/contrib/langchain/patch.py @@ -47,7 +47,6 @@ from ddtrace.internal.utils.formats import asbool from ddtrace.internal.utils.formats import deep_getattr from ddtrace.internal.utils.version import parse_version -from ddtrace.llmobs import LLMObs from ddtrace.llmobs._integrations import LangChainIntegration from ddtrace.pin import Pin from ddtrace.vendor import wrapt @@ -885,9 +884,6 @@ def patch(): if getattr(langchain, "_datadog_patch", False): return - if config._llmobs_enabled: - LLMObs.enable() - langchain._datadog_patch = True Pin().onto(langchain) @@ -995,9 +991,6 @@ def unpatch(): if not getattr(langchain, "_datadog_patch", False): return - if LLMObs.enabled: - LLMObs.disable() - langchain._datadog_patch = False if SHOULD_PATCH_LANGCHAIN_COMMUNITY: diff --git a/ddtrace/contrib/openai/patch.py b/ddtrace/contrib/openai/patch.py index 5a504e1258c..aca4693ede2 100644 --- a/ddtrace/contrib/openai/patch.py +++ b/ddtrace/contrib/openai/patch.py @@ -10,7 +10,6 @@ from ddtrace.internal.utils.formats import deep_getattr from ddtrace.internal.utils.version import parse_version from ddtrace.internal.wrapping import wrap -from ddtrace.llmobs import LLMObs from ddtrace.llmobs._integrations import OpenAIIntegration from ...pin import Pin @@ -148,9 +147,6 @@ def patch(): if getattr(openai, "__datadog_patch", False): return - if config._llmobs_enabled: - LLMObs.enable() - Pin().onto(openai) integration = OpenAIIntegration(integration_config=config.openai, openai=openai) @@ -202,8 +198,7 @@ def patch(): def unpatch(): # FIXME: add unpatching. The current wrapping.unwrap method requires # the wrapper function to be provided which we don't keep a reference to. - if LLMObs.enabled: - LLMObs.disable() + pass def _patched_client_init(openai, integration): diff --git a/ddtrace/llmobs/_integrations/base.py b/ddtrace/llmobs/_integrations/base.py index 8e5396baa62..8a7c3f99086 100644 --- a/ddtrace/llmobs/_integrations/base.py +++ b/ddtrace/llmobs/_integrations/base.py @@ -52,12 +52,14 @@ def __init__(self, integration_config: IntegrationConfig) -> None: ) self._log_pc_sampler = RateSampler(sample_rate=integration_config.log_prompt_completion_sample_rate) self.start_log_writer() - if self.llmobs_enabled: - self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate) + self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate) @property def metrics_enabled(self) -> bool: """Return whether submitting metrics is enabled for this integration, or global config if not set.""" + env_metrics_enabled = asbool(os.getenv("DD_{}_METRICS_ENABLED".format(self._integration_name.upper()))) + if not env_metrics_enabled and asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED")): + return False if hasattr(self.integration_config, "metrics_enabled"): return asbool(self.integration_config.metrics_enabled) return False diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 8cd59bdc4af..fef9bdaf5f0 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -2,15 +2,19 @@ import os from typing import Any from typing import Dict +from typing import List from typing import Optional from typing import Union import ddtrace from ddtrace import Span from ddtrace import config +from ddtrace import patch from ddtrace.ext import SpanTypes from ddtrace.internal import atexit +from ddtrace.internal import telemetry from ddtrace.internal.logger import get_logger +from ddtrace.internal.remoteconfig.worker import remoteconfig_poller from ddtrace.internal.service import Service from ddtrace.internal.service import ServiceStatusError from ddtrace.internal.utils.formats import asbool @@ -43,6 +47,13 @@ log = get_logger(__name__) +SUPPORTED_INTEGRATIONS = { + "bedrock": lambda: patch(botocore=True), + "langchain": lambda: patch(langchain=True), + "openai": lambda: patch(openai=True), +} + + class LLMObs(Service): _instance = None # type: LLMObs enabled = False @@ -88,7 +99,29 @@ def _stop_service(self) -> None: log.warning("Failed to shutdown tracer", exc_info=True) @classmethod - def enable(cls, tracer=None): + def enable( + cls, + ml_app: Optional[str] = None, + integrations: Optional[List[str]] = None, + agentless_enabled: bool = False, + site: Optional[str] = None, + api_key: Optional[str] = None, + env: Optional[str] = None, + service: Optional[str] = None, + _tracer=None, + ): + """ + Enable LLM Observability tracing. + + :param str ml_app: The name of your ml application. + :param List[str] integrations: A list of integrations to enable auto-tracing for. + Must be subset of ("openai", "langchain", "bedrock") + :param bool agentless_enabled: Set to `true` to disable sending data that requires a Datadog Agent. + :param str site: Your datadog site. + :param str api_key: Your datadog api key. + :param str env: Your environment name. + :param str service: Your service name. + """ if cls.enabled: log.debug("%s already enabled", cls.__name__) return @@ -97,23 +130,47 @@ def enable(cls, tracer=None): log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service") return + # grab required values for LLMObs + config._dd_site = site or config._dd_site + config._dd_api_key = api_key or config._dd_api_key + config._llmobs_ml_app = ml_app or config._llmobs_ml_app + config.env = env or config.env + config.service = service or config.service + + # validate required values for LLMObs if not config._dd_api_key: raise ValueError( "DD_API_KEY is required for sending LLMObs data. " "Ensure this configuration is set before running your application." ) + if not config._dd_site: + raise ValueError( + "DD_SITE is required for sending LLMObs data. " + "Ensure this configuration is set before running your application." + ) if not config._llmobs_ml_app: raise ValueError( "DD_LLMOBS_APP_NAME is required for sending LLMObs data. " "Ensure this configuration is set before running your application." ) - # override the default _instance with a new tracer - cls._instance = cls(tracer=tracer) + if agentless_enabled or asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", "false")): + os.environ["DD_LLMOBS_AGENTLESS_ENABLED"] = "1" - cls.enabled = True + if not os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED"): + config._telemetry_enabled = False + log.debug("Telemetry disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.") + telemetry.telemetry_writer.disable() + + if not os.getenv("DD_REMOTE_CONFIG_ENABLED"): + config._remote_config_enabled = False + log.debug("Remote configuration disabled because DD_LLMOBS_AGENTLESS_ENABLED is set to true.") + remoteconfig_poller.disable() - # turn on llmobs trace processing + cls._patch_integrations(integrations) + # override the default _instance with a new tracer + cls._instance = cls(tracer=_tracer) + cls.enabled = True cls._instance.start() atexit.register(cls.disable) @@ -146,6 +203,32 @@ def flush(cls): except Exception: log.warning("Failed to flush LLMObs spans and evaluation metrics.", exc_info=True) + @staticmethod + def _patch_integrations(integrations: Optional[List[str]] = None): + """ + Patch LLM integrations based on a list of integrations passed in. Patch all supported integrations by default. + """ + integrations_to_patch = {} + if integrations is None: + integrations_to_patch.update(SUPPORTED_INTEGRATIONS) + else: + for integration in integrations: + integration = integration.lower() + if integration in SUPPORTED_INTEGRATIONS: + integrations_to_patch.update({integration: SUPPORTED_INTEGRATIONS[integration]}) + else: + log.warning( + "%s is unsupported - LLMObs currently supports %s", + integration, + str(SUPPORTED_INTEGRATIONS.keys()), + ) + for integration in integrations_to_patch: + try: + SUPPORTED_INTEGRATIONS[integration]() + except Exception: + log.warning("couldn't patch %s", integration, exc_info=True) + return + @classmethod def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan]: """Returns a simple representation of a span to export its span and trace IDs. diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index ac07cf1d484..815ada53461 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -45,7 +45,7 @@ class LLMObsTraceProcessor(TraceProcessor): def __init__(self, llmobs_span_writer): self._span_writer = llmobs_span_writer - self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_NO_APM", False)) + self._no_apm_traces = asbool(os.getenv("DD_LLMOBS_AGENTLESS_ENABLED", False)) def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if not trace: diff --git a/tests/contrib/botocore/test_bedrock.py b/tests/contrib/botocore/test_bedrock.py index a94699813ed..d2f727f0372 100644 --- a/tests/contrib/botocore/test_bedrock.py +++ b/tests/contrib/botocore/test_bedrock.py @@ -153,6 +153,7 @@ def bedrock_client(boto3, request_vcr): ) bedrock_client = session.client("bedrock-runtime") yield bedrock_client + LLMObs.disable() @pytest.fixture @@ -487,7 +488,7 @@ def _test_llmobs_invoke(cls, provider, bedrock_client, mock_llmobs_span_writer, pin.override(bedrock_client, tracer=mock_tracer) # Need to disable and re-enable LLMObs service to use the mock tracer LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"]) if cassette_name is None: cassette_name = "%s_invoke.yaml" % provider @@ -523,7 +524,7 @@ def _test_llmobs_invoke_stream( pin.override(bedrock_client, tracer=mock_tracer) # Need to disable and re-enable LLMObs service to use the mock tracer LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"]) if cassette_name is None: cassette_name = "%s_invoke_stream.yaml" % provider @@ -623,7 +624,7 @@ def test_llmobs_error(self, ddtrace_global_config, bedrock_client, mock_llmobs_s pin.override(bedrock_client, tracer=mock_tracer) # Need to disable and re-enable LLMObs service to use the mock tracer LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["bedrock"]) with pytest.raises(botocore.exceptions.ClientError): with request_vcr.use_cassette("meta_invoke_error.yaml"): body, model = json.dumps(_REQUEST_BODIES["meta"]), _MODELS["meta"] diff --git a/tests/contrib/langchain/conftest.py b/tests/contrib/langchain/conftest.py index d0df10ec929..c6595e295c0 100644 --- a/tests/contrib/langchain/conftest.py +++ b/tests/contrib/langchain/conftest.py @@ -6,6 +6,7 @@ from ddtrace import Pin from ddtrace.contrib.langchain.patch import patch from ddtrace.contrib.langchain.patch import unpatch +from ddtrace.llmobs import LLMObs from tests.utils import DummyTracer from tests.utils import DummyWriter from tests.utils import override_config @@ -86,6 +87,7 @@ def mock_llmobs_span_writer(): yield m finally: patcher.stop() + LLMObs.disable() @pytest.fixture diff --git a/tests/contrib/langchain/test_langchain.py b/tests/contrib/langchain/test_langchain.py index 3597c2efbee..43d1e6153fd 100644 --- a/tests/contrib/langchain/test_langchain.py +++ b/tests/contrib/langchain/test_langchain.py @@ -1350,7 +1350,7 @@ def _test_llmobs_llm_invoke( different_py39_cassette=False, ): LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"]) if sys.version_info < (3, 10, 0) and different_py39_cassette: cassette_name = cassette_name.replace(".yaml", "_39.yaml") @@ -1386,7 +1386,7 @@ def _test_llmobs_chain_invoke( ): # disable the service before re-enabling it, as it was enabled in another test LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"]) if sys.version_info < (3, 10, 0) and different_py39_cassette: cassette_name = cassette_name.replace(".yaml", "_39.yaml") diff --git a/tests/contrib/langchain/test_langchain_community.py b/tests/contrib/langchain/test_langchain_community.py index e1a59a68703..929e53d6e68 100644 --- a/tests/contrib/langchain/test_langchain_community.py +++ b/tests/contrib/langchain/test_langchain_community.py @@ -1337,7 +1337,7 @@ def _test_llmobs_llm_invoke( output_role=None, ): LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"]) with request_vcr.use_cassette(cassette_name): generate_trace("Can you explain what an LLM chain is?") @@ -1370,7 +1370,7 @@ def _test_llmobs_chain_invoke( ): # disable the service before re-enabling it, as it was enabled in another test LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["langchain"]) with request_vcr.use_cassette(cassette_name): generate_trace("Can you explain what an LLM chain is?") diff --git a/tests/contrib/openai/conftest.py b/tests/contrib/openai/conftest.py index 542c1568236..f07e64787fa 100644 --- a/tests/contrib/openai/conftest.py +++ b/tests/contrib/openai/conftest.py @@ -191,9 +191,10 @@ def mock_tracer(ddtrace_global_config, openai, patch_openai, mock_logs, mock_met if ddtrace_global_config.get("_llmobs_enabled", False): # Have to disable and re-enable LLMObs to use to mock tracer. LLMObs.disable() - LLMObs.enable(tracer=mock_tracer) + LLMObs.enable(_tracer=mock_tracer, integrations=["openai"]) yield mock_tracer mock_logs.reset_mock() mock_metrics.reset_mock() + LLMObs.disable() diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index a0bc2daaec2..4287be56ec9 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -69,6 +69,6 @@ def LLMObs(mock_llmobs_span_writer, mock_llmobs_eval_metric_writer, ddtrace_glob global_config.update(ddtrace_global_config) with override_global_config(global_config): dummy_tracer = DummyTracer() - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) yield llmobs_service llmobs_service.disable() diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 74717cdf47a..75c8174edcd 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -3,6 +3,8 @@ import mock import pytest +from ddtrace._trace.span import Span +from ddtrace.ext import SpanTypes from ddtrace.llmobs import LLMObs as llmobs_service from ddtrace.llmobs._constants import INPUT_DOCUMENTS from ddtrace.llmobs._constants import INPUT_MESSAGES @@ -37,22 +39,48 @@ def mock_logs(): yield mock_logs +def run_llmobs_trace_filter(dummy_tracer): + for trace_filter in dummy_tracer._filters: + if isinstance(trace_filter, LLMObsTraceProcessor): + root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) + root_llm_span.set_tag_str(SPAN_KIND, "llm") + trace1 = [root_llm_span] + return trace_filter.process_trace(trace1) + raise ValueError("LLMObsTraceProcessor not found in tracer filters.") + + def test_service_enable(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) + llmobs_instance = llmobs_service._instance + assert llmobs_instance is not None + assert llmobs_service.enabled + assert llmobs_instance.tracer == dummy_tracer + assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) + assert run_llmobs_trace_filter(dummy_tracer) is not None + + llmobs_service.disable() + + +def test_service_enable_with_apm_disabled(monkeypatch): + with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): + dummy_tracer = DummyTracer() + llmobs_service.enable(_tracer=dummy_tracer, agentless_enabled=True) llmobs_instance = llmobs_service._instance assert llmobs_instance is not None assert llmobs_service.enabled assert llmobs_instance.tracer == dummy_tracer assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) + assert run_llmobs_trace_filter(dummy_tracer) is None + llmobs_service.disable() def test_service_disable(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) llmobs_service.disable() assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" @@ -63,7 +91,7 @@ def test_service_enable_no_api_key(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() with pytest.raises(ValueError): - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" @@ -73,7 +101,7 @@ def test_service_enable_no_ml_app_specified(): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() with pytest.raises(ValueError): - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) assert llmobs_service.enabled is False assert llmobs_service._instance._llmobs_eval_metric_writer.status.value == "stopped" assert llmobs_service._instance._llmobs_span_writer.status.value == "stopped" @@ -82,8 +110,8 @@ def test_service_enable_no_ml_app_specified(): def test_service_enable_already_enabled(mock_logs): with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="")): dummy_tracer = DummyTracer() - llmobs_service.enable(tracer=dummy_tracer) - llmobs_service.enable(tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) + llmobs_service.enable(_tracer=dummy_tracer) llmobs_instance = llmobs_service._instance assert llmobs_instance is not None assert llmobs_service.enabled diff --git a/tests/llmobs/test_llmobs_trace_processor.py b/tests/llmobs/test_llmobs_trace_processor.py index 85d9f0541c2..828dc547219 100644 --- a/tests/llmobs/test_llmobs_trace_processor.py +++ b/tests/llmobs/test_llmobs_trace_processor.py @@ -30,6 +30,7 @@ def mock_logs(): def test_processor_returns_all_traces_by_default(monkeypatch): + monkeypatch.delenv("DD_LLMOBS_AGENTLESS_ENABLED", raising=False) """Test that the LLMObsTraceProcessor returns all traces by default.""" trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) @@ -39,8 +40,8 @@ def test_processor_returns_all_traces_by_default(monkeypatch): def test_processor_returns_all_traces_if_no_apm_env_var_is_false(monkeypatch): - """Test that the LLMObsTraceProcessor returns all traces if DD_LLMOBS_NO_APM is not set to true.""" - monkeypatch.setenv("DD_LLMOBS_NO_APM", "0") + """Test that the LLMObsTraceProcessor returns all traces if DD_LLMOBS_AGENTLESS_ENABLED is not set to true.""" + monkeypatch.setenv("DD_LLMOBS_AGENTLESS_ENABLED", "0") trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) root_llm_span.set_tag_str(SPAN_KIND, "llm") @@ -49,8 +50,8 @@ def test_processor_returns_all_traces_if_no_apm_env_var_is_false(monkeypatch): def test_processor_returns_none_if_no_apm_env_var_is_true(monkeypatch): - """Test that the LLMObsTraceProcessor returns None if DD_LLMOBS_NO_APM is set to true.""" - monkeypatch.setenv("DD_LLMOBS_NO_APM", "1") + """Test that the LLMObsTraceProcessor returns None if DD_LLMOBS_AGENTLESS_ENABLED is set to true.""" + monkeypatch.setenv("DD_LLMOBS_AGENTLESS_ENABLED", "1") trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) root_llm_span.set_tag_str(SPAN_KIND, "llm")