Merged
Changes from all commits
Commits
105 commits
8287871
fix history
ZStriker19 Feb 20, 2024
cf4f3ee
fix botocore tests due to missing tracer pass in
ZStriker19 Feb 20, 2024
8d8efb3
sample before dbm propagation
ZStriker19 Feb 20, 2024
4d9b315
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 20, 2024
5b571e6
just import tracer into propagator rather than pass in at integration…
ZStriker19 Feb 20, 2024
a19b577
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 21, 2024
65def8b
chore(trace): handle import of ddtrace.tracer
majorgreys Feb 21, 2024
67784e9
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 22, 2024
62e80ec
Merge branch 'majorgreys/fix-ddtrace-tracer-import' into zachg/extend…
ZStriker19 Feb 22, 2024
5c41da7
change tracer import
ZStriker19 Feb 22, 2024
b0767ad
sample correctly before database propagation
ZStriker19 Feb 22, 2024
b3c9353
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 22, 2024
fbfaabd
add span into inject for httppropagator
ZStriker19 Feb 22, 2024
4bb7b17
update tags in sampler
ZStriker19 Feb 26, 2024
ae58cb7
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 26, 2024
3dcbfa2
fix dbm inject to sample before generating comment
ZStriker19 Feb 26, 2024
60789ee
fix remote config updating sampler when resetting to user original se…
ZStriker19 Feb 26, 2024
7434573
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 27, 2024
33f1777
run sampler before generating dbm comments in tests
ZStriker19 Feb 27, 2024
d7f0f8d
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 27, 2024
b087291
fix otel tests
ZStriker19 Feb 27, 2024
6629dac
fix merge conflict
ZStriker19 Feb 27, 2024
840564b
fix tests after merge of .sampled change and fix otel context test
ZStriker19 Feb 27, 2024
7dfc689
run sampler before forking in integration-snapshot tests
ZStriker19 Feb 27, 2024
b38d646
fix logic for openai checking if sampled
ZStriker19 Feb 27, 2024
7e873ba
fix other integration_snapshot test by sampling before forking
ZStriker19 Feb 27, 2024
1d70f3a
if HTTPPropagator.inject called without span, get current root span t…
ZStriker19 Feb 28, 2024
4350138
use setter for tracer._sampler to always update the processor that us…
ZStriker19 Feb 28, 2024
4567753
sample before forking
ZStriker19 Feb 28, 2024
299b77e
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Feb 28, 2024
f2beeea
add before fork sampling
ZStriker19 Mar 1, 2024
efcf424
manually sample before multiprocessing.Process tests
ZStriker19 Mar 1, 2024
bfc0c63
check if there is an active span before calling sample for fork
ZStriker19 Mar 1, 2024
bba6b97
make sure we check that sampling has not already run, before trying t…
ZStriker19 Mar 1, 2024
570acc3
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 1, 2024
fab0b27
check that context.sampling_priority is None, not just is
ZStriker19 Mar 1, 2024
d547f54
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 4, 2024
05d819a
update propagation tests where sampling runs due to inject now
ZStriker19 Mar 4, 2024
b4d624c
fix merge conflict in forksafe
ZStriker19 Mar 4, 2024
0a7ec87
default to span being sampled when sampling has yet to run for appsec
ZStriker19 Mar 4, 2024
7799193
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 4, 2024
ccc56fd
first run at rn and docs updates
ZStriker19 Mar 4, 2024
f061878
change popen snapshot and test names, add flakey fork test
ZStriker19 Mar 4, 2024
568ad31
clean up
ZStriker19 Mar 4, 2024
1be0fb8
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 4, 2024
e24e9ba
add to rn
ZStriker19 Mar 4, 2024
68e32c8
fix merge conflict
ZStriker19 Mar 7, 2024
d762f74
fix merge conflict
ZStriker19 Mar 7, 2024
b2b02cd
fix merge conflict now that tracer.sampler is public
ZStriker19 Mar 8, 2024
6090cea
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 18, 2024
af8e17e
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 18, 2024
3807fcb
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 18, 2024
f85789a
fix merge conflict
ZStriker19 Mar 18, 2024
c0ee71e
decrease flakiness of test_priority_sampling tests
ZStriker19 Mar 18, 2024
0e4881b
decrease flakiness of test_settings tests
ZStriker19 Mar 18, 2024
f4998d5
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 18, 2024
40cd8b2
add wrong_metric_types snapshot that must've been missed in merge con…
ZStriker19 Mar 18, 2024
0b43db8
fix flakey extended sampling snapshot test
ZStriker19 Mar 18, 2024
a311d0b
remove flakey snapshot directly calling fork after realizing pOpen te…
ZStriker19 Mar 19, 2024
b07e402
decrease flakiness of propagation multispan test
ZStriker19 Mar 19, 2024
2cfc44e
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 19, 2024
7b13543
Merge branch 'main' into zachg/test_lazy_against_updated_system-tests
ZStriker19 Mar 19, 2024
8d563c5
Update releasenotes/notes/lazy_sampling-93057adeaccbb46f.yaml
ZStriker19 Mar 19, 2024
5e824a0
Update ddtrace/propagation/http.py
ZStriker19 Mar 19, 2024
94534c9
Update docs/advanced_usage.rst
ZStriker19 Mar 19, 2024
f5e0163
Update ddtrace/contrib/elasticsearch/patch.py
ZStriker19 Mar 19, 2024
c916d22
Update ddtrace/contrib/algoliasearch/patch.py
ZStriker19 Mar 19, 2024
b4e4106
Update docs/advanced_usage.rst
ZStriker19 Mar 19, 2024
bff056a
Update ddtrace/_trace/processor/__init__.py
ZStriker19 Mar 19, 2024
3164297
update before fork naming
ZStriker19 Mar 19, 2024
ebfc350
Update ddtrace/propagation/http.py
ZStriker19 Mar 19, 2024
6100df4
simplify telemetry config items return
ZStriker19 Mar 19, 2024
eb7fe91
remove unneeded method to update tags in processor, update tags in sam…
ZStriker19 Mar 19, 2024
dfeb2cb
update system-tests ref
ZStriker19 Mar 19, 2024
a80df8b
Update ddtrace/_trace/tracer.py
ZStriker19 Mar 19, 2024
bd07f2d
nit
ZStriker19 Mar 19, 2024
4838639
change otel propagation test to test automatic sampling and injection
ZStriker19 Mar 20, 2024
9628a9c
merge tracer.sample PR and remove span from inject signature
ZStriker19 Mar 20, 2024
1fc48f3
clean up
ZStriker19 Mar 20, 2024
070ee9b
add test for changing tracer's sampler also changes the processor's s…
ZStriker19 Mar 20, 2024
50ff37d
clean up
ZStriker19 Mar 20, 2024
4827e90
Merge branch 'zachg/extended_sampling_api' into zachg/test_lazy_again…
ZStriker19 Mar 20, 2024
ebb1e22
refactor forksafe hook running code
ZStriker19 Mar 20, 2024
416b2c0
Merge branch 'zachg/extended_sampling_api' into zachg/test_lazy_again…
ZStriker19 Mar 21, 2024
0e6cd3e
Merge branch 'main' into zachg/test_lazy_against_updated_system-tests
ZStriker19 Mar 21, 2024
15328e0
more forksafe refactoring
ZStriker19 Mar 21, 2024
1da9cc7
more forksafe refactoring
ZStriker19 Mar 21, 2024
2df6f37
add span arg back to inject method
ZStriker19 Mar 21, 2024
d9d1a4b
Merge branch 'zachg/extended_sampling_api' into zachg/test_lazy_again…
ZStriker19 Mar 21, 2024
24b05fc
refactor span sampling span processor into tracesampling processor so…
ZStriker19 Mar 21, 2024
f024603
add logic for picking root_span in inject method
ZStriker19 Mar 22, 2024
72b33ba
reset system-tests run to target main
ZStriker19 Mar 22, 2024
312b7d0
switch out spansampling processor for tracesamplingprocessor in tests
ZStriker19 Mar 22, 2024
d361632
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 22, 2024
ae5d4dd
cover Munir's nits and comments
ZStriker19 Mar 24, 2024
3ccd8ae
add special casing for floats that have non-zero decimal
ZStriker19 Mar 24, 2024
689c593
add special float case tests
ZStriker19 Mar 24, 2024
1419524
fix forksafe for profiling by copying list
ZStriker19 Mar 25, 2024
8bee0a8
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 25, 2024
3659dab
Update ddtrace/propagation/http.py
ZStriker19 Mar 25, 2024
056a146
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 25, 2024
860f4ee
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 25, 2024
16ee92a
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 25, 2024
98994f1
Update docs/advanced_usage.rst
ZStriker19 Mar 25, 2024
6797470
Merge branch 'main' into zachg/extended_sampling_api
ZStriker19 Mar 25, 2024
70 changes: 31 additions & 39 deletions ddtrace/_trace/processor/__init__.py
@@ -135,30 +135,55 @@ def unregister(self):

@attr.s
class TraceSamplingProcessor(TraceProcessor):
"""Processor that keeps traces that have sampled spans. If all spans
are unsampled then ``None`` is returned.
"""Processor that runs both trace and span sampling rules.

Note that this processor is only effective if complete traces are sent. If
the spans of a trace are divided in separate lists then it's possible that
parts of the trace are unsampled when the whole trace should be sampled.
* Span sampling must be applied after trace sampling priority has been set.
* Span sampling rules are specified with a sample rate or rate limit as well as glob patterns
for matching spans on service and name.
* If the span sampling decision is to keep the span, then span sampling metrics are added to the span.
* If a dropped trace includes a span that had been kept by a span sampling rule, then the span is sent to the
Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
"""

_compute_stats_enabled = attr.ib(type=bool)
sampler = attr.ib()
single_span_rules = attr.ib(type=List[SpanSamplingRule])

def process_trace(self, trace):
# type: (List[Span]) -> Optional[List[Span]]

if trace:
chunk_root = trace[0]
root_ctx = chunk_root._context

# only trace sample if we haven't already sampled
if root_ctx and root_ctx.sampling_priority is None:
self.sampler.sample(trace[0])
# When stats computation is enabled in the tracer then we can
# safely drop the traces.
if self._compute_stats_enabled:
priority = trace[0]._context.sampling_priority if trace[0]._context is not None else None
priority = root_ctx.sampling_priority if root_ctx is not None else None
if priority is not None and priority <= 0:
# When any span is marked as keep by a single span sampling
# decision then we still send all and only those spans.
single_spans = [_ for _ in trace if is_single_span_sampled(_)]

return single_spans or None

# single span sampling rules are applied after trace sampling
if self.single_span_rules:
for span in trace:
if span.context.sampling_priority is not None and span.context.sampling_priority <= 0:
for rule in self.single_span_rules:
if rule.match(span):
rule.sample(span)
# If stats computation is enabled, we won't send all spans to the agent.
# In order to ensure that the agent does not update priority sampling rates
# due to single spans sampling, we set all of these spans to manual keep.
if config._trace_compute_stats:
span.set_metric(SAMPLING_PRIORITY_KEY, USER_KEEP)
break

return trace

log.debug("dropping trace %d with %d spans", trace[0].trace_id, len(trace))
@@ -360,36 +385,3 @@ def _queue_span_count_metrics(self, metric_name, tag_name, min_count=100):
TELEMETRY_NAMESPACE_TAG_TRACER, metric_name, count, tags=((tag_name, tag_value),)
)
self._span_metrics[metric_name] = defaultdict(int)


@attr.s
class SpanSamplingProcessor(SpanProcessor):
"""SpanProcessor for sampling single spans:

* Span sampling must be applied after trace sampling priority has been set.
* Span sampling rules are specified with a sample rate or rate limit as well as glob patterns
for matching spans on service and name.
* If the span sampling decision is to keep the span, then span sampling metrics are added to the span.
* If a dropped trace includes a span that had been kept by a span sampling rule, then the span is sent to the
Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
"""

rules = attr.ib(type=List[SpanSamplingRule])

def on_span_start(self, span):
# type: (Span) -> None
pass

def on_span_finish(self, span):
# type: (Span) -> None
# only sample if the span isn't already going to be sampled by trace sampler
if span.context.sampling_priority is not None and span.context.sampling_priority <= 0:
for rule in self.rules:
if rule.match(span):
rule.sample(span)
# If stats computation is enabled, we won't send all spans to the agent.
# In order to ensure that the agent does not update priority sampling rates
# due to single spans sampling, we set all of these spans to manual keep.
if config._trace_compute_stats:
span.set_metric(SAMPLING_PRIORITY_KEY, USER_KEEP)
break
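For readers skimming the diff: the single-span sampling rules that `TraceSamplingProcessor` now applies are the ones users define to keep individual spans out of dropped traces. A minimal sketch of how such rules are typically supplied (via the documented `DD_SPAN_SAMPLING_RULES` environment variable; the service and operation names below are made up for illustration):

```python
import json
import os

# Assumed: DD_SPAN_SAMPLING_RULES is read by ddtrace at startup; the keys below
# (service, name, sample_rate, max_per_second) follow the documented rule format.
rules = [
    {"service": "payments", "name": "flask.request", "sample_rate": 1.0, "max_per_second": 50},
]
os.environ["DD_SPAN_SAMPLING_RULES"] = json.dumps(rules)

# With rules like these in place, a span matching the glob patterns can still be
# sent to the Agent by TraceSamplingProcessor even when its trace is dropped.
```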
50 changes: 40 additions & 10 deletions ddtrace/_trace/tracer.py
@@ -21,7 +21,6 @@
from ddtrace._trace.context import Context
from ddtrace._trace.processor import SpanAggregator
from ddtrace._trace.processor import SpanProcessor
from ddtrace._trace.processor import SpanSamplingProcessor
from ddtrace._trace.processor import TopLevelSpanProcessor
from ddtrace._trace.processor import TraceProcessor
from ddtrace._trace.processor import TraceSamplingProcessor
@@ -114,13 +113,18 @@ def _default_span_processors_factory(
compute_stats_enabled: bool,
single_span_sampling_rules: List[SpanSamplingRule],
agent_url: str,
trace_sampler: BaseSampler,
profiling_span_processor: EndpointCallCounterProcessor,
) -> Tuple[List[SpanProcessor], Optional[Any], List[SpanProcessor]]:
# FIXME: type should be AppsecSpanProcessor but we have a cyclic import here
"""Construct the default list of span processors to use."""
trace_processors: List[TraceProcessor] = []
trace_processors += [TraceTagsProcessor(), PeerServiceProcessor(_ps_config), BaseServiceProcessor()]
trace_processors += [TraceSamplingProcessor(compute_stats_enabled)]
trace_processors += [
PeerServiceProcessor(_ps_config),
BaseServiceProcessor(),
TraceSamplingProcessor(compute_stats_enabled, trace_sampler, single_span_sampling_rules),
TraceTagsProcessor(),
]
trace_processors += trace_filters

span_processors: List[SpanProcessor] = []
@@ -167,9 +171,6 @@ def _default_span_processors_factory(

span_processors.append(profiling_span_processor)

if single_span_sampling_rules:
span_processors.append(SpanSamplingProcessor(single_span_sampling_rules))

# These need to run after all the other processors
deferred_processors: List[SpanProcessor] = [
SpanAggregator(
@@ -266,6 +267,7 @@ def __init__(
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._sampler,
self._endpoint_call_counter_span_processor,
)
if config._data_streams_enabled:
@@ -278,6 +280,7 @@

self._hooks = _hooks.Hooks()
atexit.register(self._atexit)
forksafe.register_before_fork(self._sample_before_fork)
forksafe.register(self._child_after_fork)

self._shutdown_lock = RLock()
@@ -298,7 +301,10 @@ def _atexit(self) -> None:
self.shutdown(timeout=self.SHUTDOWN_TIMEOUT)

def sample(self, span):
self._sampler.sample(span)
if self._sampler is not None:
self._sampler.sample(span)
else:
log.error("No sampler available to sample span")

@property
def sampler(self):
@@ -341,6 +347,29 @@ def deregister_on_start_span(self, func: Callable) -> Callable:
self._hooks.deregister(self.__class__.start_span, func)
return func

def _sample_before_fork(self) -> None:
span = self.current_root_span()
if span is not None and span.context.sampling_priority is None:
self.sample(span)

@property
def _sampler(self):
return self._sampler_current

@_sampler.setter
def _sampler(self, value):
self._sampler_current = value
# we need to update the processor that uses the sampler
if getattr(self, "_deferred_processors", None):
for aggregator in self._deferred_processors:
if type(aggregator) == SpanAggregator:
for processor in aggregator._trace_processors:
if type(processor) == TraceSamplingProcessor:
processor.sampler = value
break
else:
log.debug("No TraceSamplingProcessor available to update sampling rate")

@property
def debug_logging(self):
return log.isEnabledFor(logging.DEBUG)
@@ -525,6 +554,7 @@ def configure(
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._sampler,
self._endpoint_call_counter_span_processor,
)

@@ -590,6 +620,7 @@ def _child_after_fork(self):
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._sampler,
self._endpoint_call_counter_span_processor,
)

@@ -775,9 +806,6 @@ def _start_span(
if service and service not in self._services and self._is_span_internal(span):
self._services.add(service)

if not trace_id:
self.sample(span)

# Only call span processors if the tracer is enabled
if self.enabled:
for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
@@ -1042,6 +1070,7 @@ def shutdown(self, timeout: Optional[float] = None) -> None:

atexit.unregister(self._atexit)
forksafe.unregister(self._child_after_fork)
forksafe.unregister_before_fork(self._sample_before_fork)

self.start_span = self._start_span_after_shutdown # type: ignore[assignment]

@@ -1103,6 +1132,7 @@ def _on_global_config_update(self, cfg, items):
sample_rate = cfg._trace_sample_rate
else:
sample_rate = None

sampler = DatadogSampler(default_sample_rate=sample_rate)
self._sampler = sampler

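The net effect of the tracer changes is that the sampling decision is now lazy: it is no longer made in `_start_span`, but on first use — header or DBM injection, just before a fork, or when the trace reaches `TraceSamplingProcessor` at flush. A hedged sketch of what that means for code that reads the priority directly (method names are the ones shown in this diff; the public surface may differ between releases):

```python
from ddtrace import tracer

with tracer.trace("parent.operation") as span:
    # With lazy sampling, no priority has necessarily been assigned yet.
    if span.context.sampling_priority is None:
        # Force a decision now, mirroring what _sample_before_fork and
        # HTTPPropagator.inject do internally.
        tracer.sample(span)
    assert span.context.sampling_priority is not None
```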
6 changes: 5 additions & 1 deletion ddtrace/appsec/_api_security/api_manager.py
@@ -128,8 +128,12 @@ def _schema_callback(self, env):

try:
# check both current span and root span for sampling priority
# if sampling has not yet run for the span, we default to treating it as sampled
if root.context.sampling_priority is None and env.span.context.sampling_priority is None:
priorities = (1,)
else:
priorities = (root.context.sampling_priority or 0, env.span.context.sampling_priority or 0)
# if any of them is set to USER_KEEP or USER_REJECT, we should respect it
priorities = (root.context.sampling_priority or 0, env.span.context.sampling_priority or 0)
if constants.USER_KEEP in priorities:
priority = constants.USER_KEEP
elif constants.USER_REJECT in priorities:
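To make the API Security change above easier to follow, here is the decision it implements, pulled out as a standalone, hypothetical helper — the final fallback branch is cut off by the hunk and is assumed here:

```python
from ddtrace import constants

def effective_priority(root_priority, span_priority):
    # If sampling has not run yet on either span, default to treating the
    # trace as kept so schema collection is not skipped.
    if root_priority is None and span_priority is None:
        return 1  # AUTO_KEEP
    priorities = (root_priority or 0, span_priority or 0)
    # An explicit user decision on either span wins.
    if constants.USER_KEEP in priorities:
        return constants.USER_KEEP
    if constants.USER_REJECT in priorities:
        return constants.USER_REJECT
    return max(priorities)  # branch not visible in this hunk; assumed
```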
2 changes: 1 addition & 1 deletion ddtrace/contrib/algoliasearch/patch.py
@@ -125,7 +125,7 @@ def _patched_search(func, instance, wrapt_args, wrapt_kwargs):
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)

span.set_tag(SPAN_MEASURED_KEY)
if span.context.sampling_priority is None or span.context.sampling_priority <= 0:
if span.context.sampling_priority is not None and span.context.sampling_priority <= 0:
return func(*wrapt_args, **wrapt_kwargs)

if config.algoliasearch.collect_query_text:
2 changes: 1 addition & 1 deletion ddtrace/contrib/elasticsearch/patch.py
@@ -143,7 +143,7 @@ def _perform_request(func, instance, args, kwargs):
span.set_tag(SPAN_MEASURED_KEY)

# Only instrument if trace is sampled or if we haven't tried to sample yet
if span.context.sampling_priority is None or span.context.sampling_priority <= 0:
if span.context.sampling_priority is not None and span.context.sampling_priority <= 0:
yield func(*args, **kwargs)
return

41 changes: 24 additions & 17 deletions ddtrace/internal/forksafe.py
@@ -1,6 +1,7 @@
"""
An API to provide fork-safe functions.
"""
import functools
import logging
import os
import threading
@@ -14,6 +15,7 @@


_registry = [] # type: typing.List[typing.Callable[[], None]]
_registry_before_fork = [] # type: typing.List[typing.Callable[[], None]]

# Some integrations might require after-fork hooks to be executed after the
# actual call to os.fork with earlier versions of Python (<= 3.6), else issues
@@ -36,29 +38,27 @@ def has_forked():
return _forked


def ddtrace_after_in_child():
# type: () -> None
global _registry

# DEV: we make a copy of the registry to prevent hook execution from
# introducing new hooks, potentially causing an infinite loop.
for hook in list(_registry):
def run_hooks(registry):
# type: (typing.List[typing.Callable[[], None]]) -> None
for hook in list(registry):
try:
hook()
except Exception:
# Mimic the behaviour of Python's fork hooks.
log.exception("Exception ignored in forksafe hook %r", hook)


def register(after_in_child):
# type: (typing.Callable[[], None]) -> typing.Callable[[], None]
"""Register a function to be called after fork in the child process.
ddtrace_before_fork = functools.partial(run_hooks, _registry_before_fork)
ddtrace_after_in_child = functools.partial(run_hooks, _registry)

Note that ``after_in_child`` will be called in all child processes across
multiple forks unless it is unregistered.
"""
_registry.append(after_in_child)
return after_in_child

def register_hook(registry, hook):
registry.append(hook)
return hook


register_before_fork = functools.partial(register_hook, _registry_before_fork)
register = functools.partial(register_hook, _registry)


def unregister(after_in_child):
@@ -69,9 +69,16 @@ def unregister(after_in_child):
log.info("after_in_child hook %s was unregistered without first being registered", after_in_child.__name__)


# should always be true on unix systems with Python 3.7+. This check is for if we're on Windows
def unregister_before_fork(before_fork):
# type: (typing.Callable[[], None]) -> None
try:
_registry_before_fork.remove(before_fork)
except ValueError:
log.info("before_in_child hook %s was unregistered without first being registered", before_fork.__name__)


if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=ddtrace_after_in_child, after_in_parent=set_forked)
os.register_at_fork(before=ddtrace_before_fork, after_in_child=ddtrace_after_in_child, after_in_parent=set_forked)

_resetable_objects = weakref.WeakSet() # type: weakref.WeakSet[ResetObject]

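The forksafe refactor above generalizes the existing after-fork registry into a shared `run_hooks`/`register_hook` pair and adds a before-fork registry. A small usage sketch against the internal API as it appears in this diff (internal module, so the exact surface may change):

```python
from ddtrace.internal import forksafe

def _flush_runtime_state():
    # hypothetical hook body, for illustration only
    pass

# New in this change: run in the parent immediately before os.fork().
forksafe.register_before_fork(_flush_runtime_state)
# Pre-existing behaviour: run in the child right after the fork.
forksafe.register(_flush_runtime_state)

# Both registries can be cleaned up symmetrically.
forksafe.unregister_before_fork(_flush_runtime_state)
forksafe.unregister(_flush_runtime_state)
```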
12 changes: 8 additions & 4 deletions ddtrace/llmobs/_integrations/base.py
@@ -74,13 +74,17 @@ def llmobs_enabled(self) -> bool:
return config._llmobs_enabled

def is_pc_sampled_span(self, span: Span) -> bool:
if span.context.sampling_priority is None or span.context.sampling_priority <= 0:
return False
if span.context.sampling_priority is not None:
if span.context.sampling_priority <= 0:
return False
return self._span_pc_sampler.sample(span)

def is_pc_sampled_log(self, span: Span) -> bool:
sampled = span.context.sampling_priority is not None or span.context.sampling_priority <= 0 # type: ignore
if not self.logs_enabled or not sampled:
if span.context.sampling_priority is not None:
if span.context.sampling_priority <= 0:
return False

if not self.logs_enabled:
return False
return self._log_pc_sampler.sample(span)

8 changes: 8 additions & 0 deletions ddtrace/propagation/_database_monitoring.py
@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING # noqa:F401
from typing import Union # noqa:F401

import ddtrace
from ddtrace.internal.logger import get_logger
from ddtrace.settings.peer_service import PeerServiceConfig
from ddtrace.vendor.sqlcommenter import generate_sql_comment as _generate_sql_comment
@@ -62,6 +63,13 @@ def __init__(
self.peer_db_name_tag = peer_db_name_tag

def inject(self, dbspan, args, kwargs):
# run sampling before injection to propagate correct sampling priority
if hasattr(ddtrace, "tracer") and hasattr(ddtrace.tracer, "sample"):
if dbspan.context.sampling_priority is None:
ddtrace.tracer.sample(dbspan._local_root)
else:
log.error("ddtrace.tracer.sample is not available, unable to sample span.")

dbm_comment = self._get_dbm_comment(dbspan)
if dbm_comment is None:
# injection_mode is disabled
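Context for the `_database_monitoring` change: the DBM SQL comment embeds a traceparent, so the sampling decision has to exist before the comment is generated, otherwise the propagated sampled flag could be wrong. A sketch of how this surfaces for users (the `DD_DBM_PROPAGATION_MODE` setting is the documented switch for DBM comment injection and is an assumption here, not something introduced by this PR):

```python
import os

# Assumed setting name/value; must be set before ddtrace initializes.
os.environ["DD_DBM_PROPAGATION_MODE"] = "full"

import ddtrace  # noqa: E402

# For each instrumented query, _DBM_Propagator.inject() now:
#   1. checks dbspan.context.sampling_priority
#   2. calls ddtrace.tracer.sample(dbspan._local_root) if it is still None
#   3. builds the SQL comment, whose traceparent carries the final decision
```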