Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d76a11d
chore(internal): add process_tags to first span of each payload
dubloom Nov 4, 2025
a3643d8
tests(process_tags): add tests
dubloom Nov 5, 2025
a52e8cc
lint
dubloom Nov 5, 2025
f943f2a
fix: suitespec
dubloom Nov 5, 2025
660bd64
fix: telemetry test
dubloom Nov 5, 2025
ace7fae
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 5, 2025
78dd521
fix telemetry 2
dubloom Nov 5, 2025
dd58490
simplify process_tags (brett review)
dubloom Nov 6, 2025
f47539e
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 6, 2025
184ef53
update python version
dubloom Nov 6, 2025
be2973e
put tests within internal suite
dubloom Nov 6, 2025
c6b4d7f
remove sys hack
dubloom Nov 6, 2025
c6cb1be
make tests compatible with CI
dubloom Nov 7, 2025
974b474
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 7, 2025
7416466
lint
dubloom Nov 7, 2025
0428dcd
brett review
dubloom Nov 10, 2025
b66d6a4
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 10, 2025
71b5ed7
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 12, 2025
a123350
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 12, 2025
32ddf35
improve tag normalization
dubloom Nov 14, 2025
ee77b0e
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 14, 2025
f5c3eee
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 17, 2025
7cf4143
gab review
dubloom Nov 17, 2025
ae20207
improving normalization
dubloom Nov 18, 2025
f70b7ed
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 18, 2025
efe28fa
remove print
dubloom Nov 18, 2025
e33db20
chore(rc): add process tags
dubloom Nov 7, 2025
8c6a18a
add tests
dubloom Nov 17, 2025
f957552
Update tests/internal/test_process_tags.py
dubloom Nov 18, 2025
86f04db
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 19, 2025
914d52a
add a test that activates the feature with env variable
dubloom Nov 19, 2025
609b353
Merge branch 'dubloom/process-tags-collection' into dubloom/process-t…
dubloom Nov 19, 2025
d1f3d33
tests fix attempt
dubloom Nov 19, 2025
34d86f5
change patch to patch.object
dubloom Nov 19, 2025
8e92add
Merge branch 'main' into dubloom/process-tags-remote-config
dubloom Nov 20, 2025
43d8697
lint
dubloom Nov 20, 2025
6bad34f
chore(dsm): add process tags to dsm
dubloom Nov 19, 2025
c012b48
Merge branch 'main' into dubloom/process-tags-dsm
dubloom Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ddtrace/internal/datastreams/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Union # noqa:F401

from ddtrace.internal import compat
from ddtrace.internal import process_tags
from ddtrace.internal.atexit import register_on_exit_signal
from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
from ddtrace.internal.native import DDSketch
Expand Down Expand Up @@ -288,6 +289,8 @@ def periodic(self):
raw_payload["Env"] = compat.ensure_text(config.env)
if config.version:
raw_payload["Version"] = compat.ensure_text(config.version)
if p_tags := process_tags.process_tags:
raw_payload["ProcessTags"] = compat.ensure_text(p_tags)

payload = packb(raw_payload)
compressed = gzip_compress(payload)
Expand Down
60 changes: 60 additions & 0 deletions tests/datastreams/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import gzip
import os
import time

import mock
import msgpack
import pytest

from ddtrace.internal.datastreams.processor import PROPAGATION_KEY
from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64
Expand All @@ -15,6 +18,63 @@
mocked_time = 1642544540


def _decode_datastreams_payload(payload):
decompressed = gzip.decompress(payload)
decoded = msgpack.unpackb(decompressed, raw=False, strict_map_key=False)

return decoded


def test_periodic_payload_tags():
processor = DataStreamsProcessor("http://localhost:8126")
try:
captured_payloads = []
with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append):
processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], mocked_time, 1, 1)
processor.periodic()

assert captured_payloads, "expected periodic to send a payload"
decoded = _decode_datastreams_payload(captured_payloads[0])
assert decoded["Service"] == processor._service
assert decoded["TracerVersion"] == processor._version
assert decoded["Lang"] == "python"
assert decoded["Hostname"] == processor._hostname
assert "ProcessTags" not in decoded
finally:
processor.stop()
processor.join()


@pytest.mark.subprocess(
env=dict(
DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="true",
)
)
def test_periodic_payload_process_tags():
import mock

from ddtrace.internal.datastreams.processor import DataStreamsProcessor
from tests.datastreams.test_processor import _decode_datastreams_payload

processor = DataStreamsProcessor("http://localhost:8126")
try:
captured_payloads = []
with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append):
processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], 1642544540, 1, 1)
processor.periodic()

assert captured_payloads, "expected periodic to send a payload"
decoded = _decode_datastreams_payload(captured_payloads[0])
assert decoded["Service"] == processor._service
assert decoded["TracerVersion"] == processor._version
assert decoded["Lang"] == "python"
assert decoded["Hostname"] == processor._hostname
assert "ProcessTags" in decoded
finally:
processor.stop()
processor.join()


def test_data_streams_processor():
now = time.time()
processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], now, 1, 1)
Expand Down
Loading