diff --git a/pyproject.toml b/pyproject.toml index f71c06a..95c5d00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,9 @@ telemetry_distro = "elasticotel.sdk.resources:TelemetryDistroResourceDetector" service_instance = "elasticotel.sdk.resources:ServiceInstanceResourceDetector" _gcp = "opentelemetry.resourcedetector.gcp_resource_detector._detector:GoogleCloudResourceDetector" +[project.entry-points.opentelemetry_traces_sampler] +experimental_composite_parentbased_traceidratio = "elasticotel.sdk.sampler:DefaultSampler" + [project.scripts] edot-bootstrap = "elasticotel.instrumentation.bootstrap:run" diff --git a/src/elasticotel/distro/__init__.py b/src/elasticotel/distro/__init__.py index 44162e0..76e0503 100644 --- a/src/elasticotel/distro/__init__.py +++ b/src/elasticotel/distro/__init__.py @@ -62,7 +62,7 @@ ELASTIC_OTEL_SYSTEM_METRICS_ENABLED, ) from elasticotel.distro.resource_detectors import get_cloud_resource_detectors -from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config +from elasticotel.distro.config import opamp_handler, _initialize_config, DEFAULT_SAMPLING_RATE logger = logging.getLogger(__name__) @@ -93,6 +93,7 @@ def _configure(self, **kwargs): HTTPOTLPMetricExporter: otlp_http_exporter_options, HTTPOTLPSpanExporter: otlp_http_exporter_options, } + super()._configure(**kwargs) # set our local config based on environment variables @@ -171,7 +172,7 @@ def _configure(self, **kwargs): os.environ.setdefault(OTEL_METRICS_EXEMPLAR_FILTER, "always_off") # preference to use DELTA temporality as we can handle only this kind of Histograms os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA") - os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio") + os.environ.setdefault(OTEL_TRACES_SAMPLER, "experimental_composite_parentbased_traceidratio") os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE)) base_resource_detectors = [ diff --git a/src/elasticotel/distro/config.py b/src/elasticotel/distro/config.py index 28bc61d..21a99c3 100644 --- a/src/elasticotel/distro/config.py +++ b/src/elasticotel/distro/config.py @@ -21,6 +21,7 @@ from dataclasses import dataclass from elasticotel.distro.sanitization import _sanitize_headers_env_vars +from elasticotel.sdk.sampler import DefaultSampler from opentelemetry import trace from opentelemetry._opamp import messages from opentelemetry._opamp.agent import OpAMPAgent @@ -31,7 +32,6 @@ ) from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG -from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio logger = logging.getLogger(__name__) @@ -160,20 +160,14 @@ def _handle_sampling_rate(remote_config) -> ConfigUpdate: logger.debug("Cannot get sampler from tracer provider.") return ConfigUpdate() - # FIXME: this needs to be updated for the consistent probability samplers - if not isinstance(sampler, ParentBasedTraceIdRatio): + if not isinstance(sampler, DefaultSampler): logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler)) return ConfigUpdate() - # since sampler is parent based we need to update its root sampler - root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue] - if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue] - # we don't have a proper way to update it :) - root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue] - root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue] - logger.debug("Updated sampler rate to %s", sampling_rate) - if _config: - _config.sampling_rate.update(value=config_sampling_rate) + sampler.set_ratio(sampling_rate) + logger.debug("Updated sampler rate to %s", sampling_rate) + if _config: + _config.sampling_rate.update(value=config_sampling_rate) return ConfigUpdate() diff --git a/src/elasticotel/sdk/sampler/__init__.py b/src/elasticotel/sdk/sampler/__init__.py new file mode 100644 index 0000000..ef055b2 --- /dev/null +++ b/src/elasticotel/sdk/sampler/__init__.py @@ -0,0 +1,75 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +from typing import Sequence + +from opentelemetry.context import Context +from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult +from opentelemetry.trace import Link, SpanKind, TraceState +from opentelemetry.sdk.trace._sampling_experimental import ( + composite_sampler, + composable_parent_threshold, + composable_traceid_ratio_based, +) +from opentelemetry.util.types import Attributes + +logger = logging.getLogger(__name__) + + +class DefaultSampler(Sampler): + """The default sampler for EDOT, which is a parent-based ratio sampler with the rate + updatable from central config.""" + + def __init__(self, ratio_str: str): + try: + ratio = float(ratio_str) + except ValueError: + logger.warning("Invalid sampling rate '%s', defaulting to 1.0", ratio_str) + ratio = 1.0 + self._delegate = _new_sampler(ratio) + + def should_sample( + self, + parent_context: Context | None, + trace_id: int, + name: str, + kind: SpanKind | None = None, + attributes: Attributes | None = None, + links: Sequence[Link] | None = None, + trace_state: TraceState | None = None, + ) -> SamplingResult: + return self._delegate.should_sample( + parent_context, + trace_id, + name, + kind, + attributes, + links, + trace_state, + ) + + def set_ratio(self, ratio: float): + self._delegate = _new_sampler(ratio) + + def get_description(self) -> str: + return self._delegate.get_description() + + +def _new_sampler(ratio: float): + return composite_sampler(composable_parent_threshold(composable_traceid_ratio_based(ratio))) diff --git a/tests/distro/test_distro.py b/tests/distro/test_distro.py index 53f3fed..58250da 100644 --- a/tests/distro/test_distro.py +++ b/tests/distro/test_distro.py @@ -22,6 +22,7 @@ from elasticotel.distro import ElasticOpenTelemetryConfigurator, ElasticOpenTelemetryDistro, logger as distro_logger from elasticotel.distro.config import opamp_handler, logger as config_logger, Config from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED +from elasticotel.sdk.sampler import DefaultSampler from opentelemetry.environment_variables import ( OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, @@ -35,11 +36,18 @@ OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG, ) +from opentelemetry import trace from opentelemetry.sdk.trace import sampling from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 +from opentelemetry.util._once import Once class TestDistribution(TestCase): + def setUp(self): + # Hackily reset global trace provider to allow tests to initialize it. + trace._TRACER_PROVIDER = None + trace._TRACER_PROVIDER_SET_ONCE = Once() + @mock.patch.dict("os.environ", {}, clear=True) def test_default_configuration(self): distro = ElasticOpenTelemetryDistro() @@ -54,21 +62,39 @@ def test_default_configuration(self): ) self.assertEqual("always_off", os.environ.get(OTEL_METRICS_EXEMPLAR_FILTER)) self.assertEqual("DELTA", os.environ.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE)) - self.assertEqual("parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER)) + self.assertEqual("experimental_composite_parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER)) self.assertEqual("1.0", os.environ.get(OTEL_TRACES_SAMPLER_ARG)) @mock.patch.dict("os.environ", {}, clear=True) def test_sampler_configuration(self): - distro = ElasticOpenTelemetryDistro() - distro._configure() - parent_sampler = sampling._get_from_env_or_default() - - assert isinstance(parent_sampler, sampling.ParentBasedTraceIdRatio) + ElasticOpenTelemetryDistro()._configure() + ElasticOpenTelemetryConfigurator()._configure() + sampler = getattr(trace.get_tracer_provider(), "sampler", None) + self.assertTrue(isinstance(sampler, DefaultSampler)) + self.assertIn( + "ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=0, ratio=1.0}}", + sampler.get_description(), + ) - sampler = parent_sampler._root + @mock.patch.dict("os.environ", {}, clear=True) + def test_sampler_configuration_sampler_arg(self): + os.environ[OTEL_TRACES_SAMPLER_ARG] = "0.0" + ElasticOpenTelemetryDistro()._configure() + ElasticOpenTelemetryConfigurator()._configure() + sampler = getattr(trace.get_tracer_provider(), "sampler", None) + self.assertTrue(isinstance(sampler, DefaultSampler)) + self.assertIn( + "ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=max, ratio=0.0}}", + sampler.get_description(), + ) - assert isinstance(sampler, sampling.TraceIdRatioBased) - assert sampler.rate == 1.0 + @mock.patch.dict("os.environ", {}, clear=True) + def test_sampler_configuration_user_configured(self): + os.environ[OTEL_TRACES_SAMPLER] = "always_on" + ElasticOpenTelemetryDistro()._configure() + ElasticOpenTelemetryConfigurator()._configure() + sampler = getattr(trace.get_tracer_provider(), "sampler", None) + self.assertTrue(isinstance(sampler, sampling._AlwaysOn)) @mock.patch.dict("os.environ", {}, clear=True) def test_load_instrumentor_call_with_default_kwargs_for_SystemMetricsInstrumentor(self): @@ -517,7 +543,7 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock, ge @mock.patch("opentelemetry.trace.get_tracer_provider") def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_mock): get_config_mock.return_value = Config() - sampler = sampling.ParentBasedTraceIdRatio(rate=1.0) + sampler = DefaultSampler(1.0) get_tracer_provider_mock.return_value.sampler = sampler agent = mock.Mock() client = mock.Mock() @@ -528,7 +554,10 @@ def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_ message = opamp_pb2.ServerToAgent(remote_config=remote_config) opamp_handler(agent, client, message) - self.assertEqual(sampler._root.rate, 0.5) + self.assertIn( + "ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=8, ratio=0.5}}", + sampler.get_description(), + ) client._update_remote_config_status.assert_called_once_with( remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""