Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ telemetry_distro = "elasticotel.sdk.resources:TelemetryDistroResourceDetector"
service_instance = "elasticotel.sdk.resources:ServiceInstanceResourceDetector"
_gcp = "opentelemetry.resourcedetector.gcp_resource_detector._detector:GoogleCloudResourceDetector"

[project.entry-points.opentelemetry_traces_sampler]
experimental_composite_parentbased_traceidratio = "elasticotel.sdk.sampler:DefaultSampler"

[project.scripts]
edot-bootstrap = "elasticotel.instrumentation.bootstrap:run"

Expand Down
5 changes: 3 additions & 2 deletions src/elasticotel/distro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
ELASTIC_OTEL_SYSTEM_METRICS_ENABLED,
)
from elasticotel.distro.resource_detectors import get_cloud_resource_detectors
from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config
from elasticotel.distro.config import opamp_handler, _initialize_config, DEFAULT_SAMPLING_RATE


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -93,6 +93,7 @@ def _configure(self, **kwargs):
HTTPOTLPMetricExporter: otlp_http_exporter_options,
HTTPOTLPSpanExporter: otlp_http_exporter_options,
}

super()._configure(**kwargs)

# set our local config based on environment variables
Expand Down Expand Up @@ -171,7 +172,7 @@ def _configure(self, **kwargs):
os.environ.setdefault(OTEL_METRICS_EXEMPLAR_FILTER, "always_off")
# preference to use DELTA temporality as we can handle only this kind of Histograms
os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA")
os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio")
os.environ.setdefault(OTEL_TRACES_SAMPLER, "experimental_composite_parentbased_traceidratio")
os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE))

base_resource_detectors = [
Expand Down
18 changes: 6 additions & 12 deletions src/elasticotel/distro/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dataclasses import dataclass

from elasticotel.distro.sanitization import _sanitize_headers_env_vars
from elasticotel.sdk.sampler import DefaultSampler
from opentelemetry import trace
from opentelemetry._opamp import messages
from opentelemetry._opamp.agent import OpAMPAgent
Expand All @@ -31,7 +32,6 @@
)
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -160,20 +160,14 @@ def _handle_sampling_rate(remote_config) -> ConfigUpdate:
logger.debug("Cannot get sampler from tracer provider.")
return ConfigUpdate()

# FIXME: this needs to be updated for the consistent probability samplers
if not isinstance(sampler, ParentBasedTraceIdRatio):
if not isinstance(sampler, DefaultSampler):
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
return ConfigUpdate()

# since sampler is parent based we need to update its root sampler
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue]
# we don't have a proper way to update it :)
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
logger.debug("Updated sampler rate to %s", sampling_rate)
if _config:
_config.sampling_rate.update(value=config_sampling_rate)
sampler.set_ratio(sampling_rate)
logger.debug("Updated sampler rate to %s", sampling_rate)
if _config:
_config.sampling_rate.update(value=config_sampling_rate)
return ConfigUpdate()


Expand Down
75 changes: 75 additions & 0 deletions src/elasticotel/sdk/sampler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
from typing import Sequence

from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.sdk.trace._sampling_experimental import (
composite_sampler,
composable_parent_threshold,
composable_traceid_ratio_based,
)
from opentelemetry.util.types import Attributes

logger = logging.getLogger(__name__)


class DefaultSampler(Sampler):
"""The default sampler for EDOT, which is a parent-based ratio sampler with the rate
updatable from central config."""

def __init__(self, ratio_str: str):
try:
ratio = float(ratio_str)
except ValueError:
logger.warning("Invalid sampling rate '%s', defaulting to 1.0", ratio_str)
ratio = 1.0
self._delegate = _new_sampler(ratio)

def should_sample(
self,
parent_context: Context | None,
trace_id: int,
name: str,
kind: SpanKind | None = None,
attributes: Attributes | None = None,
links: Sequence[Link] | None = None,
trace_state: TraceState | None = None,
) -> SamplingResult:
return self._delegate.should_sample(
parent_context,
trace_id,
name,
kind,
attributes,
links,
trace_state,
)

def set_ratio(self, ratio: float):
self._delegate = _new_sampler(ratio)

def get_description(self) -> str:
return self._delegate.get_description()


def _new_sampler(ratio: float):
return composite_sampler(composable_parent_threshold(composable_traceid_ratio_based(ratio)))
51 changes: 40 additions & 11 deletions tests/distro/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from elasticotel.distro import ElasticOpenTelemetryConfigurator, ElasticOpenTelemetryDistro, logger as distro_logger
from elasticotel.distro.config import opamp_handler, logger as config_logger, Config
from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
from elasticotel.sdk.sampler import DefaultSampler
from opentelemetry.environment_variables import (
OTEL_LOGS_EXPORTER,
OTEL_METRICS_EXPORTER,
Expand All @@ -35,11 +36,18 @@
OTEL_TRACES_SAMPLER,
OTEL_TRACES_SAMPLER_ARG,
)
from opentelemetry import trace
from opentelemetry.sdk.trace import sampling
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
from opentelemetry.util._once import Once


class TestDistribution(TestCase):
def setUp(self):
# Hackily reset global trace provider to allow tests to initialize it.
trace._TRACER_PROVIDER = None
trace._TRACER_PROVIDER_SET_ONCE = Once()

@mock.patch.dict("os.environ", {}, clear=True)
def test_default_configuration(self):
distro = ElasticOpenTelemetryDistro()
Expand All @@ -54,21 +62,39 @@ def test_default_configuration(self):
)
self.assertEqual("always_off", os.environ.get(OTEL_METRICS_EXEMPLAR_FILTER))
self.assertEqual("DELTA", os.environ.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE))
self.assertEqual("parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER))
self.assertEqual("experimental_composite_parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER))
self.assertEqual("1.0", os.environ.get(OTEL_TRACES_SAMPLER_ARG))

@mock.patch.dict("os.environ", {}, clear=True)
def test_sampler_configuration(self):
distro = ElasticOpenTelemetryDistro()
distro._configure()
parent_sampler = sampling._get_from_env_or_default()

assert isinstance(parent_sampler, sampling.ParentBasedTraceIdRatio)
ElasticOpenTelemetryDistro()._configure()
ElasticOpenTelemetryConfigurator()._configure()
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
self.assertTrue(isinstance(sampler, DefaultSampler))
self.assertIn(
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=0, ratio=1.0}}",
sampler.get_description(),
)

sampler = parent_sampler._root
@mock.patch.dict("os.environ", {}, clear=True)
def test_sampler_configuration_sampler_arg(self):
os.environ[OTEL_TRACES_SAMPLER_ARG] = "0.0"
ElasticOpenTelemetryDistro()._configure()
ElasticOpenTelemetryConfigurator()._configure()
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
self.assertTrue(isinstance(sampler, DefaultSampler))
self.assertIn(
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=max, ratio=0.0}}",
sampler.get_description(),
)

assert isinstance(sampler, sampling.TraceIdRatioBased)
assert sampler.rate == 1.0
@mock.patch.dict("os.environ", {}, clear=True)
def test_sampler_configuration_user_configured(self):
os.environ[OTEL_TRACES_SAMPLER] = "always_on"
ElasticOpenTelemetryDistro()._configure()
ElasticOpenTelemetryConfigurator()._configure()
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
self.assertTrue(isinstance(sampler, sampling._AlwaysOn))

@mock.patch.dict("os.environ", {}, clear=True)
def test_load_instrumentor_call_with_default_kwargs_for_SystemMetricsInstrumentor(self):
Expand Down Expand Up @@ -517,7 +543,7 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock, ge
@mock.patch("opentelemetry.trace.get_tracer_provider")
def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_mock):
get_config_mock.return_value = Config()
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
sampler = DefaultSampler(1.0)
get_tracer_provider_mock.return_value.sampler = sampler
agent = mock.Mock()
client = mock.Mock()
Expand All @@ -528,7 +554,10 @@ def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
opamp_handler(agent, client, message)

self.assertEqual(sampler._root.rate, 0.5)
self.assertIn(
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=8, ratio=0.5}}",
sampler.get_description(),
)

client._update_remote_config_status.assert_called_once_with(
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
Expand Down