Skip to content

Commit

Permalink
feat: add support for endpoint counts in profile uploads (#4840)
Browse files Browse the repository at this point in the history
## Description

Add support for uploading endpoint invocations counts along with
profiles.
<!-- Briefly describe the change and why it was required. -->

<!-- If this is a breaking change, explain why it is necessary. Breaking
changes must append `!` after the type/scope. See
https://ddtrace.readthedocs.io/en/stable/contributing.html for more
details. -->

## Checklist
- [x] Followed the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines)
when writing a release note.
- [x] Add additional sections for `feat` and `fix` pull requests.
- [x] [Library
documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs)
and/or [Datadog's documentation
site](https://github.com/DataDog/documentation/) is updated. Link to doc
PR in description.

<!-- Copy and paste the relevant snippet based on the type of pull
request -->

<!-- START feat -->

## Motivation

This is ti support mo in depth profile analysis.

<!-- Expand on why the change is required, include relevant context for
reviewers -->

## Design 
<!-- Include benefits from the change as well as possible drawbacks and
trade-offs -->

## Testing strategy

Use web-based service with profiling enabled. Normalization by endpoint
counts should appear in the UI.
<!-- Describe the automated tests and/or the steps for manual testing.

<!-- END feat -->

## Reviewer Checklist
- [x] Title is accurate.
- [x] Description motivates each change.
- [x] No unnecessary changes were introduced in this PR.
- [x] Avoid breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Tests provided or description of manual testing performed is
included in the code or PR.
- [x] Release note has been added and follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines),
or else `changelog/no-changelog` label added.
- [x] All relevant GitHub issues are correctly linked.
- [x] Backports are identified and tagged with Mergifyio.

Co-authored-by: Tahir H. Butt <tahir.butt@datadoghq.com>
Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
  • Loading branch information
3 people authored and emmettbutler committed Jan 30, 2023
1 parent 99c89a9 commit 7a75af3
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 16 deletions.
44 changes: 44 additions & 0 deletions ddtrace/internal/processor/endpoint_call_counter.py
@@ -0,0 +1,44 @@
import typing

import attr

from ddtrace.ext import SpanTypes
from ddtrace.internal import forksafe
from ddtrace.internal.compat import ensure_str
from ddtrace.internal.processor import SpanProcessor
from ddtrace.span import Span


EndpointCountsType = typing.Dict[str, int]


@attr.s(eq=False)
class EndpointCallCounterProcessor(SpanProcessor):

endpoint_counts = attr.ib(init=False, repr=False, type=EndpointCountsType, factory=lambda: {}, eq=False)
_endpoint_counts_lock = attr.ib(init=False, repr=False, factory=forksafe.Lock, eq=False)
_enabled = attr.ib(default=False, repr=False, eq=False)

def enable(self):
# type: () -> None
self._enabled = True

def on_span_start(self, span):
# type: (Span) -> None
pass

def on_span_finish(self, span):
# type: (Span) -> None
if not self._enabled:
return
if span._local_root == span and span.span_type == SpanTypes.WEB:
resource = ensure_str(span.resource, errors="backslashreplace")
with self._endpoint_counts_lock:
self.endpoint_counts[resource] = self.endpoint_counts.get(resource, 0) + 1

def reset(self):
# type: () -> EndpointCountsType
with self._endpoint_counts_lock:
counts = self.endpoint_counts
self.endpoint_counts = {}
return counts
2 changes: 1 addition & 1 deletion ddtrace/profiling/collector/stack.pyx
Expand Up @@ -448,7 +448,7 @@ class StackCollector(collector.PeriodicCollector):
max_time_usage_pct = attr.ib(factory=attr_utils.from_env("DD_PROFILING_MAX_TIME_USAGE_PCT", 1, float))
nframes = attr.ib(factory=attr_utils.from_env("DD_PROFILING_MAX_FRAMES", 64, int))
ignore_profiler = attr.ib(factory=attr_utils.from_env("DD_PROFILING_IGNORE_PROFILER", False, formats.asbool))
endpoint_collection_enabled = attr.ib(factory=attr_utils.from_env("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", True, formats.asbool))
endpoint_collection_enabled = attr.ib(default=None)
tracer = attr.ib(default=None)
_thread_time = attr.ib(init=False, repr=False, eq=False)
_last_wall_time = attr.ib(init=False, repr=False, eq=False, type=int)
Expand Down
10 changes: 9 additions & 1 deletion ddtrace/profiling/exporter/http.py
Expand Up @@ -7,6 +7,8 @@
import os
import platform
import typing
from typing import Any
from typing import Dict

import attr
import six
Expand All @@ -16,6 +18,7 @@
import ddtrace
from ddtrace.internal import agent
from ddtrace.internal import runtime
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
from ddtrace.internal.runtime import container
from ddtrace.internal.utils import attr as attr_utils
from ddtrace.internal.utils.formats import parse_tags_str
Expand Down Expand Up @@ -60,6 +63,8 @@ class PprofHTTPExporter(pprof.PprofExporter):
_retry_upload = attr.ib(init=False, eq=False)
endpoint_path = attr.ib(default="/profiling/v1/input")

endpoint_call_counter_span_processor = attr.ib(default=None, type=EndpointCallCounterProcessor)

def __attrs_post_init__(self):
if self.max_retry_delay is None:
self.max_retry_delay = self.timeout * 3
Expand Down Expand Up @@ -202,7 +207,10 @@ def export(
"tags_profiler": self._get_tags(service),
"start": (datetime.datetime.utcfromtimestamp(start_time_ns / 1e9).replace(microsecond=0).isoformat() + "Z"),
"end": (datetime.datetime.utcfromtimestamp(end_time_ns / 1e9).replace(microsecond=0).isoformat() + "Z"),
}
} # type: Dict[str, Any]

if self.endpoint_call_counter_span_processor is not None:
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()

content_type, body = self._encode_multipart_formdata(
event=json.dumps(event).encode("utf-8"),
Expand Down
12 changes: 11 additions & 1 deletion ddtrace/profiling/profiler.py
Expand Up @@ -123,6 +123,9 @@ class _ProfilerInstance(service.Service):
factory=attr_utils.from_env("DD_PROFILING_ENABLE_CODE_PROVENANCE", False, formats.asbool),
type=bool,
)
endpoint_collection_enabled = attr.ib(
factory=attr_utils.from_env("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", True, formats.asbool)
)

_recorder = attr.ib(init=False, default=None)
_collectors = attr.ib(init=False, default=None)
Expand Down Expand Up @@ -170,6 +173,10 @@ def _build_default_exporters(self):
if self._lambda_function_name is not None:
self.tags.update({"functionname": self._lambda_function_name})

endpoint_call_counter_span_processor = self.tracer._endpoint_call_counter_span_processor
if self.endpoint_collection_enabled:
endpoint_call_counter_span_processor.enable()

return [
http.PprofHTTPExporter(
service=self.service,
Expand All @@ -180,6 +187,7 @@ def _build_default_exporters(self):
endpoint=endpoint,
endpoint_path=endpoint_path,
enable_code_provenance=self.enable_code_provenance,
endpoint_call_counter_span_processor=endpoint_call_counter_span_processor,
),
]

Expand All @@ -202,7 +210,9 @@ def __attrs_post_init__(self):
)

self._collectors = [
stack.StackCollector(r, tracer=self.tracer), # type: ignore[call-arg]
stack.StackCollector(
r, tracer=self.tracer, endpoint_collection_enabled=self.endpoint_collection_enabled
), # type: ignore[call-arg]
threading.ThreadingLockCollector(r, tracer=self.tracer),
]
if _asyncio.asyncio_available:
Expand Down
9 changes: 9 additions & 0 deletions ddtrace/tracer.py
Expand Up @@ -17,6 +17,7 @@

from ddtrace import config
from ddtrace.filters import TraceFilter
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import get_span_sampling_rules
from ddtrace.vendor import debtcollector
Expand Down Expand Up @@ -101,6 +102,7 @@ def _default_span_processors_factory(
compute_stats_enabled, # type: bool
single_span_sampling_rules, # type: List[SpanSamplingRule]
agent_url, # type: str
profiling_span_processor, # type: EndpointCallCounterProcessor
):
# type: (...) -> List[SpanProcessor]
"""Construct the default list of span processors to use."""
Expand Down Expand Up @@ -145,6 +147,8 @@ def _default_span_processors_factory(
),
)

span_processors.append(profiling_span_processor)

if single_span_sampling_rules:
span_processors.append(SpanSamplingProcessor(single_span_sampling_rules))

Expand Down Expand Up @@ -226,6 +230,8 @@ def __init__(
self._appsec_enabled = config._appsec_enabled
self._iast_enabled = config._iast_enabled

self._endpoint_call_counter_span_processor = EndpointCallCounterProcessor()

self._span_processors = _default_span_processors_factory(
self._filters,
self._writer,
Expand All @@ -236,6 +242,7 @@ def __init__(
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
)

self._hooks = _hooks.Hooks()
Expand Down Expand Up @@ -472,6 +479,7 @@ def configure(
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
)

if context_provider is not None:
Expand Down Expand Up @@ -517,6 +525,7 @@ def _child_after_fork(self):
self._compute_stats,
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
)

self._new_process = True
Expand Down
3 changes: 3 additions & 0 deletions releasenotes/notes/endpoint-counts-6fa40e642b44b78c.yaml
@@ -0,0 +1,3 @@
features:
- |
profiling: Collects endpoint invocation counts.
7 changes: 3 additions & 4 deletions tests/profiling/collector/test_stack.py
Expand Up @@ -270,7 +270,7 @@ def test_repr():
stack.StackCollector,
"StackCollector(status=<ServiceStatus.STOPPED: 'stopped'>, "
"recorder=Recorder(default_max_events=16384, max_events={}), min_interval_time=0.01, max_time_usage_pct=1.0, "
"nframes=64, ignore_profiler=False, endpoint_collection_enabled=True, tracer=None)",
"nframes=64, ignore_profiler=False, endpoint_collection_enabled=None, tracer=None)",
)


Expand Down Expand Up @@ -444,7 +444,7 @@ def test_exception_collection_trace(
@pytest.fixture
def tracer_and_collector(tracer):
r = recorder.Recorder()
c = stack.StackCollector(r, tracer=tracer)
c = stack.StackCollector(r, endpoint_collection_enabled=True, tracer=tracer)
c.start()
try:
yield tracer, c
Expand Down Expand Up @@ -567,9 +567,8 @@ def test_collect_span_resource_after_finish(tracer_and_collector):


def test_resource_not_collected(monkeypatch, tracer):
monkeypatch.setenv("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", "false")
r = recorder.Recorder()
collector = stack.StackCollector(r, tracer=tracer)
collector = stack.StackCollector(r, endpoint_collection_enabled=False, tracer=tracer)
collector.start()
try:
resource = str(uuid.uuid4())
Expand Down
65 changes: 56 additions & 9 deletions tests/profiling/exporter/test_http.py
Expand Up @@ -15,6 +15,7 @@

import ddtrace
from ddtrace.internal import compat
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.profiling import exporter
from ddtrace.profiling.exporter import http
Expand All @@ -30,6 +31,7 @@


_API_KEY = "my-api-key"
_ENDPOINT_COUNTS = {"a": 1, "b": 2}


class _APIEndpointRequestHandlerTest(BaseHTTPServer.BaseHTTPRequestHandler):
Expand All @@ -55,6 +57,10 @@ def _check_tags(tags):
and tags[6] == platform.python_version(),
)

@staticmethod
def _check_endpoints(endpoints):
return endpoints == _ENDPOINT_COUNTS

def _check_event(self, event_json):
event = json.loads(event_json.decode())
for key, check in {
Expand All @@ -63,6 +69,7 @@ def _check_event(self, event_json):
"family": lambda x: x == "python",
"version": lambda x: x == "4",
"tags_profiler": self._check_tags,
"endpoint_counts": self._check_endpoints,
}.items():
if not check(event[key]):
return False
Expand Down Expand Up @@ -175,21 +182,39 @@ def endpoint_test_unknown_server():
thread.join()


def _get_span_processor():
endpoint_call_counter_span_processor = EndpointCallCounterProcessor()
endpoint_call_counter_span_processor.endpoint_counts = _ENDPOINT_COUNTS
return endpoint_call_counter_span_processor


def test_wrong_api_key(endpoint_test_server):
# This is mostly testing our test server, not the exporter
exp = http.PprofHTTPExporter(endpoint=_ENDPOINT, api_key="this is not the right API key", max_retry_delay=2)
exp = http.PprofHTTPExporter(
endpoint=_ENDPOINT,
api_key="this is not the right API key",
max_retry_delay=2,
endpoint_call_counter_span_processor=_get_span_processor(),
)
with pytest.raises(exporter.ExportError) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
assert str(t.value) == "Server returned 400, check your API key"


def test_export(endpoint_test_server):
exp = http.PprofHTTPExporter(endpoint=_ENDPOINT, api_key=_API_KEY)
exp = http.PprofHTTPExporter(
endpoint=_ENDPOINT, api_key=_API_KEY, endpoint_call_counter_span_processor=_get_span_processor()
)
exp.export(test_pprof.TEST_EVENTS, 0, compat.time_ns())


def test_export_server_down():
exp = http.PprofHTTPExporter(endpoint="http://localhost:2", api_key=_API_KEY, max_retry_delay=2)
exp = http.PprofHTTPExporter(
endpoint="http://localhost:2",
api_key=_API_KEY,
max_retry_delay=2,
endpoint_call_counter_span_processor=_get_span_processor(),
)
with pytest.raises(http.UploadFailed) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
e = t.value.last_attempt.exception()
Expand All @@ -198,7 +223,13 @@ def test_export_server_down():


def test_export_timeout(endpoint_test_timeout_server):
exp = http.PprofHTTPExporter(endpoint=_TIMEOUT_ENDPOINT, api_key=_API_KEY, timeout=1, max_retry_delay=2)
exp = http.PprofHTTPExporter(
endpoint=_TIMEOUT_ENDPOINT,
api_key=_API_KEY,
timeout=1,
max_retry_delay=2,
endpoint_call_counter_span_processor=_get_span_processor(),
)
with pytest.raises(http.UploadFailed) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
e = t.value.last_attempt.exception()
Expand All @@ -207,7 +238,13 @@ def test_export_timeout(endpoint_test_timeout_server):


def test_export_reset(endpoint_test_reset_server):
exp = http.PprofHTTPExporter(endpoint=_RESET_ENDPOINT, api_key=_API_KEY, timeout=1, max_retry_delay=2)
exp = http.PprofHTTPExporter(
endpoint=_RESET_ENDPOINT,
api_key=_API_KEY,
timeout=1,
max_retry_delay=2,
endpoint_call_counter_span_processor=_get_span_processor(),
)
with pytest.raises(http.UploadFailed) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
e = t.value.last_attempt.exception()
Expand All @@ -219,7 +256,7 @@ def test_export_reset(endpoint_test_reset_server):


def test_export_404_agent(endpoint_test_unknown_server):
exp = http.PprofHTTPExporter(endpoint=_UNKNOWN_ENDPOINT)
exp = http.PprofHTTPExporter(endpoint=_UNKNOWN_ENDPOINT, endpoint_call_counter_span_processor=_get_span_processor())
with pytest.raises(exporter.ExportError) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
assert str(t.value) == (
Expand All @@ -228,7 +265,9 @@ def test_export_404_agent(endpoint_test_unknown_server):


def test_export_404_agentless(endpoint_test_unknown_server):
exp = http.PprofHTTPExporter(endpoint=_UNKNOWN_ENDPOINT, api_key="123", timeout=1)
exp = http.PprofHTTPExporter(
endpoint=_UNKNOWN_ENDPOINT, api_key="123", timeout=1, endpoint_call_counter_span_processor=_get_span_processor()
)
with pytest.raises(exporter.ExportError) as t:
exp.export(test_pprof.TEST_EVENTS, 0, 1)
assert str(t.value) == "HTTP Error 404"
Expand All @@ -237,15 +276,23 @@ def test_export_404_agentless(endpoint_test_unknown_server):
def test_export_tracer_base_path(endpoint_test_server):
# Base path is prepended to the endpoint path because
# it does not start with a slash.
exp = http.PprofHTTPExporter(endpoint=_ENDPOINT + "/profiling/", api_key=_API_KEY, endpoint_path="v1/input")
exp = http.PprofHTTPExporter(
endpoint=_ENDPOINT + "/profiling/",
api_key=_API_KEY,
endpoint_path="v1/input",
endpoint_call_counter_span_processor=_get_span_processor(),
)
exp.export(test_pprof.TEST_EVENTS, 0, compat.time_ns())


def test_export_tracer_base_path_agent_less(endpoint_test_server):
# Base path is ignored by the profiling HTTP exporter
# because the endpoint path starts with a slash.
exp = http.PprofHTTPExporter(
endpoint=_ENDPOINT + "/profiling/", api_key=_API_KEY, endpoint_path="/profiling/v1/input"
endpoint=_ENDPOINT + "/profiling/",
api_key=_API_KEY,
endpoint_path="/profiling/v1/input",
endpoint_call_counter_span_processor=_get_span_processor(),
)
exp.export(test_pprof.TEST_EVENTS, 0, compat.time_ns())

Expand Down

0 comments on commit 7a75af3

Please sign in to comment.