diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py
index f1aaae6013f..381513f77f0 100644
--- a/ddtrace/tracer.py
+++ b/ddtrace/tracer.py
@@ -215,7 +215,7 @@ def __init__(
         )
         self._single_span_sampling_rules = get_span_sampling_rules()  # type: List[SpanSamplingRule]
         self._writer = writer  # type: TraceWriter
-        self._partial_flush_enabled = asbool(os.getenv("DD_TRACE_PARTIAL_FLUSH_ENABLED", default=False))
+        self._partial_flush_enabled = asbool(os.getenv("DD_TRACE_PARTIAL_FLUSH_ENABLED", default=True))
         self._partial_flush_min_spans = int(os.getenv("DD_TRACE_PARTIAL_FLUSH_MIN_SPANS", default=500))
 
         self._appsec_enabled = config._appsec_enabled
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 08facac7d89..0f0e580303e 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -233,6 +233,12 @@ below:
      - 512
      - The maximum length of ``x-datadog-tags`` header allowed in the Datadog propagation style. Must be a value between 0 to 512. If 0, propagation of ``x-datadog-tags`` is disabled.
 
+   .. _dd-trace-partial-flush-enabled:
+   * - ``DD_TRACE_PARTIAL_FLUSH_ENABLED``
+     - Boolean
+     - True
+     - Prevents large payloads from being sent to APM.
+
    .. _dd-profiling-enabled:
    * - ``DD_PROFILING_ENABLED``
      - Boolean
diff --git a/tests/commands/test_runner.py b/tests/commands/test_runner.py
index cda6e5aec46..286bdf3618d 100644
--- a/tests/commands/test_runner.py
+++ b/tests/commands/test_runner.py
@@ -410,7 +410,7 @@ def test_info_no_configs():
     Log injection enabled: False
     Health metrics enabled: False
     Priority sampling enabled: True
-    Partial flushing enabled: False
+    Partial flushing enabled: True
     Partial flush minimum number of spans: 500
 \x1b[92m\x1b[1mTagging:\x1b[0m
     DD Service: None
diff --git a/tests/contrib/asyncio/test_tracer_safety.py b/tests/contrib/asyncio/test_tracer_safety.py
index 39ccf2929a9..99f093d8a47 100644
--- a/tests/contrib/asyncio/test_tracer_safety.py
+++ b/tests/contrib/asyncio/test_tracer_safety.py
@@ -45,11 +45,12 @@ async def coro():
         with tracer.trace("coroutine"):
             await asyncio.sleep(0.01)
 
-    futures = [asyncio.ensure_future(coro()) for x in range(1000)]
+    # partial flushing is enabled; ensure the number of spans generated is less than 500
+    futures = [asyncio.ensure_future(coro()) for x in range(400)]
     for future in futures:
         await future
 
     # the trace is wrong but the Context is finished
     traces = tracer.pop_traces()
     assert 1 == len(traces)
-    assert 1000 == len(traces[0])
+    assert 400 == len(traces[0])
diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py
index b17581a8350..5bfe02efb7f 100644
--- a/tests/integration/test_integration.py
+++ b/tests/integration/test_integration.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 import itertools
 import logging
 import os
@@ -219,6 +220,47 @@ def test_metrics(encoding, monkeypatch):
 
     with override_global_config(dict(health_metrics_enabled=True)):
         t = Tracer()
+        assert t._partial_flush_min_spans == 500
+        statsd_mock = mock.Mock()
+        t._writer.dogstatsd = statsd_mock
+        assert t._writer._report_metrics
+        with mock.patch("ddtrace.internal.writer.log") as log:
+            for _ in range(5):
+                spans = []
+                for i in range(3000):
+                    spans.append(t.trace("op"))
+                # Since _partial_flush_min_spans is set to 500, we will flush spans in 6 batches;
+                # each batch will contain 500 spans
+                for s in spans:
+                    s.finish()
+
+            t.shutdown()
+            log.warning.assert_not_called()
+            log.error.assert_not_called()
+
+        statsd_mock.distribution.assert_has_calls(
+            [
+                mock.call("datadog.tracer.http.sent.bytes", AnyInt()),
+                mock.call("datadog.tracer.http.sent.traces", 30),
+                mock.call("datadog.tracer.writer.accepted.traces", 30, tags=[]),
+                mock.call("datadog.tracer.buffer.accepted.traces", 30, tags=[]),
+                mock.call("datadog.tracer.buffer.accepted.spans", 15000, tags=[]),
+                mock.call("datadog.tracer.http.requests", 1, tags=[]),
+                mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=[]),
+            ],
+            any_order=True,
+        )
+
+
+@allencodings
+def test_metrics_partial_flush_disabled(encoding, monkeypatch):
+    monkeypatch.setenv("DD_TRACE_API_VERSION", encoding)
+
+    with override_global_config(dict(health_metrics_enabled=True)):
+        t = Tracer()
+        t.configure(
+            partial_flush_enabled=False,
+        )
         statsd_mock = mock.Mock()
         t._writer.dogstatsd = statsd_mock
         assert t._writer._report_metrics
@@ -248,8 +290,37 @@
 @allencodings
 def test_single_trace_too_large(encoding, monkeypatch):
     monkeypatch.setenv("DD_TRACE_API_VERSION", encoding)
+    # set the writer interval to 5 seconds so that the buffer can fit larger traces
+    monkeypatch.setenv("DD_TRACE_WRITER_INTERVAL_SECONDS", "5.0")
 
     t = Tracer()
+    assert t._partial_flush_enabled is True
+    with mock.patch("ddtrace.internal.writer.log") as log:
+        key = "a" * 250
+        with t.trace("huge"):
+            for i in range(200000):
+                with t.trace("operation") as s:
+                    # Need to make the strings unique so that the v0.5 encoding doesn’t compress the data
+                    s.set_tag(key + str(i), key + str(i))
+        t.shutdown()
+        log.warning.assert_any_call(
+            "trace buffer (%s traces %db/%db) cannot fit trace of size %db, dropping",
+            AnyInt(),
+            AnyInt(),
+            AnyInt(),
+            AnyInt(),
+        )
+        log.error.assert_not_called()
+
+
+@allencodings
+def test_single_trace_too_large_partial_flush_disabled(encoding, monkeypatch):
+    monkeypatch.setenv("DD_TRACE_API_VERSION", encoding)
+
+    t = Tracer()
+    t.configure(
+        partial_flush_enabled=False,
+    )
     with mock.patch("ddtrace.internal.writer.log") as log:
         with t.trace("huge"):
             for i in range(200000):
@@ -717,7 +788,6 @@ def test_partial_flush_log(run_python_code_in_subprocess, encoding, monkeypatch)
 
     t = Tracer()
     t.configure(
-        partial_flush_enabled=True,
        partial_flush_min_spans=partial_flush_min_spans,
     )