Skip to content

Commit

Permalink
fix(tracing): do not raise exception if partial flush is triggered without any spans (#9349)
Browse files Browse the repository at this point in the history

Adds a guard so that `on_span_finish()`, when partial flushing is enabled,
does not run into an `IndexError` when there are no spans to flush (which
may happen if `tracer.configure()` was called between the time a span was
created and the time it was finished).

In practice, this turns into:
```
>>> import ddtrace
>>> with ddtrace.tracer.trace("regression"):
...     ddtrace.tracer.configure(partial_flush_min_spans=1)
...
Partial flush triggered but no spans to flush (was tracer reconfigured?)
```

This also refactors the test for our `os.fork()` wrapper so that the
child process unpatches `coverage` (just in case, since it occasionally
causes exceptions on exit) and exits cleanly (otherwise it would continue
running the remaining tests, which is not what we want).

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
Co-authored-by: Federico Mon <federico.mon@datadoghq.com>
Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com>
(cherry picked from commit fffab01)
  • Loading branch information
romainkomorndatadog authored and github-actions[bot] committed Jun 18, 2024
1 parent faa3ce9 commit d05583e
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 12 deletions.
35 changes: 28 additions & 7 deletions ddtrace/_trace/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ class _Trace(object):
type=Dict[str, DefaultDict],
)

def on_span_start(self, span):
# type: (Span) -> None
def on_span_start(self, span: Span) -> None:
with self._lock:
trace = self._traces[span.trace_id]
trace.spans.append(span)
Expand All @@ -304,6 +303,17 @@ def on_span_finish(self, span):
# type: (Span) -> None
with self._lock:
self._span_metrics["spans_finished"][span._span_api] += 1

# Calling finish on a span that we did not see the start for
# DEV: This can occur if the SpanAggregator is recreated while there is a span in progress
# e.g. `tracer.configure()` is called after starting a span
if span.trace_id not in self._traces:
log_msg = "finished span not connected to a trace"
if config._telemetry_enabled:
telemetry.telemetry_writer.add_log("ERROR", log_msg)
log.debug("%s: %s", log_msg, span)
return

trace = self._traces[span.trace_id]
trace.num_finished += 1
should_partial_flush = self._partial_flush_enabled and trace.num_finished >= self._partial_flush_min_spans
Expand All @@ -321,16 +331,27 @@ def on_span_finish(self, span):
finished = trace_spans

num_finished = len(finished)
trace.num_finished -= num_finished
if trace.num_finished != 0:
log_msg = "unexpected finished span count"
if config._telemetry_enabled:
telemetry.telemetry_writer.add_log("ERROR", log_msg)
log.debug("%s (%s) for span %s", log_msg, num_finished, span)
trace.num_finished = 0

# If we have removed all spans from this trace, then delete the trace from the traces dict
if len(trace.spans) == 0:
del self._traces[span.trace_id]

# No spans to process, return early
if not finished:
return

# Set partial flush tag on the first span
if should_partial_flush:
log.debug("Partially flushing %d spans for trace %d", num_finished, span.trace_id)
finished[0].set_metric("_dd.py.partial_flush", num_finished)

trace.num_finished -= num_finished

if len(trace.spans) == 0:
del self._traces[span.trace_id]

spans = finished # type: Optional[List[Span]]
for tp in self._trace_processors:
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing: fixes a potential crash where using partial flushes and ``tracer.configure()`` could result in an IndexError
8 changes: 7 additions & 1 deletion tests/contrib/subprocess/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ def test_fork(tracer):
with tracer.trace("ossystem_test"):
pid = os.fork()
if pid == 0:
return
# Exit, otherwise the rest of this process will continue to be pytest
from ddtrace.contrib.coverage import unpatch

unpatch()
import pytest

pytest.exit("in forked child", returncode=0)

spans = tracer.pop()
assert spans
Expand Down
25 changes: 21 additions & 4 deletions tests/tracer/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,9 @@ def test_changing_tracer_sampler_changes_tracesamplingprocessor_sampler():
tracer = Tracer()
# get processor
for aggregator in tracer._deferred_processors:
if type(aggregator) == SpanAggregator:
if type(aggregator) is SpanAggregator:
for processor in aggregator._trace_processors:
if type(processor) == TraceSamplingProcessor:
if type(processor) is TraceSamplingProcessor:
sampling_processor = processor

assert sampling_processor.sampler is tracer._sampler
Expand Down Expand Up @@ -588,11 +588,11 @@ def assert_span_sampling_decision_tags(

def switch_out_trace_sampling_processor(tracer, sampling_processor):
for aggregator in tracer._deferred_processors:
if type(aggregator) == SpanAggregator:
if type(aggregator) is SpanAggregator:
i = 0
while i < len(aggregator._trace_processors):
processor = aggregator._trace_processors[i]
if type(processor) == TraceSamplingProcessor:
if type(processor) is TraceSamplingProcessor:
aggregator._trace_processors[i] = sampling_processor
break
i += 1
Expand Down Expand Up @@ -692,3 +692,20 @@ def on_span_finish(self, span):
with tracer.trace("test") as span:
assert span.get_tag("on_start") is None
assert span.get_tag("on_finish") is None


def _stderr_contains_log(stderr: str) -> bool:
return "finished span not connected to a trace" in stderr


@pytest.mark.subprocess(
    err=_stderr_contains_log,
    env={
        "DD_TRACE_DEBUG": "true",
        "DD_API_KEY": "test",
        "DD_CIVISIBILITY_AGENTLESS_ENABLED": None,
    },
)
def test_tracer_reconfigured_with_active_span_does_not_crash():
    # Regression test: finishing a span after the tracer was reconfigured
    # mid-span must not raise; stderr is checked by _stderr_contains_log.
    import ddtrace

    with ddtrace.tracer.trace("regression1") as orphaned_span:
        # Reconfiguring the tracer clears active traces, so the span opened
        # above is no longer known by the time it is finished.
        ddtrace.tracer.configure(partial_flush_enabled=True, partial_flush_min_spans=1)
        # Calling .finish() manually bypasses the code that catches the exception
        orphaned_span.finish()

0 comments on commit d05583e

Please sign in to comment.