diff --git a/ddtrace/internal/datastreams/processor.py b/ddtrace/internal/datastreams/processor.py index a229f48387b..83db2876f44 100644 --- a/ddtrace/internal/datastreams/processor.py +++ b/ddtrace/internal/datastreams/processor.py @@ -16,6 +16,7 @@ from typing import Union # noqa:F401 from ddtrace.internal import compat +from ddtrace.internal import process_tags from ddtrace.internal.atexit import register_on_exit_signal from ddtrace.internal.constants import DEFAULT_SERVICE_NAME from ddtrace.internal.native import DDSketch @@ -288,6 +289,8 @@ def periodic(self): raw_payload["Env"] = compat.ensure_text(config.env) if config.version: raw_payload["Version"] = compat.ensure_text(config.version) + if p_tags := process_tags.process_tags: + raw_payload["ProcessTags"] = compat.ensure_text(p_tags) payload = packb(raw_payload) compressed = gzip_compress(payload) diff --git a/tests/datastreams/test_processor.py b/tests/datastreams/test_processor.py index 03153c4506b..7ffab58f869 100644 --- a/tests/datastreams/test_processor.py +++ b/tests/datastreams/test_processor.py @@ -1,7 +1,10 @@ +import gzip import os import time import mock +import msgpack +import pytest from ddtrace.internal.datastreams.processor import PROPAGATION_KEY from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64 @@ -15,6 +18,63 @@ mocked_time = 1642544540 +def _decode_datastreams_payload(payload): + decompressed = gzip.decompress(payload) + decoded = msgpack.unpackb(decompressed, raw=False, strict_map_key=False) + + return decoded + + +def test_periodic_payload_tags(): + processor = DataStreamsProcessor("http://localhost:8126") + try: + captured_payloads = [] + with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append): + processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], mocked_time, 1, 1) + processor.periodic() + + assert captured_payloads, "expected periodic to send a payload" + decoded = _decode_datastreams_payload(captured_payloads[0]) + assert decoded["Service"] == processor._service + assert decoded["TracerVersion"] == processor._version + assert decoded["Lang"] == "python" + assert decoded["Hostname"] == processor._hostname + assert "ProcessTags" not in decoded + finally: + processor.stop() + processor.join() + + +@pytest.mark.subprocess( + env=dict( + DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="true", + ) +) +def test_periodic_payload_process_tags(): + import mock + + from ddtrace.internal.datastreams.processor import DataStreamsProcessor + from tests.datastreams.test_processor import _decode_datastreams_payload + + processor = DataStreamsProcessor("http://localhost:8126") + try: + captured_payloads = [] + with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append): + processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], 1642544540, 1, 1) + processor.periodic() + + assert captured_payloads, "expected periodic to send a payload" + decoded = _decode_datastreams_payload(captured_payloads[0]) + assert decoded["Service"] == processor._service + assert decoded["TracerVersion"] == processor._version + assert decoded["Lang"] == "python" + assert decoded["Hostname"] == processor._hostname + assert "ProcessTags" in decoded + finally: + processor.stop() + processor.join() + + def test_data_streams_processor(): now = time.time() processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], now, 1, 1)