feat(tracer): add TracerFlareSubscriber to enable the tracer flare (#9150)

## Overview
This PR adds a remote config subscriber for the tracer flare. The tracer
flare logic was already implemented in a previous PR, but was not yet
wired up to react to the RC products. This PR is the last piece needed
to enable the tracer flare for dd-trace-py.

A tracer flare is a tar file containing tracer logs from (approximately)
the last 5 minutes, as well as a JSON file of the current tracer
configurations. This pair of files will be generated per tracer instance
that is connected to the agent, and the tar containing all of these
files will be sent to a Zendesk ticket. Users can trigger the tracer
flare the same way they trigger agent flares; see the [Agent Flare
documentation](https://docs.datadoghq.com/agent/troubleshooting/send_a_flare/?tab=agentv6v7).

For details on the flare implementation, see
#8961 and
#8969.

## Risks
### `AGENT_CONFIG` doesn't get cleared very frequently
Something I noticed during E2E testing: consecutive tracer flare
requests do not work, because `AGENT_CONFIG` still retains the state
from the previous request. Without a new update/publish event for us to
pick up, we can't trigger another flare until the agent clears that
state (I'm not sure exactly how long that takes). Since the current
implementation depends on the publish event, consecutive tracer flare
requests will not work until the state gets cleared.

### `AGENT_CONFIG` and `AGENT_TASK` are not exclusive to tracer flare use
Currently, the tracer listens for changes to the `AGENT_CONFIG` and
`AGENT_TASK` remote config products. These products were originally
intended for the **agent** flare, not the tracer flare, but for now we
are piggybacking on that signal. Other tracer teams have therefore
flagged that the format/contents of the products may not be guaranteed.
If we start to notice flares not being triggered/generated as expected,
this is a likely place to check for a code fix.
The current expectation for the products is:

`AGENT_CONFIG`
```json
{
   "metadata":[
      {
         "id":"flare-log-level.<log-level>",
         "product_name":"AGENT_CONFIG",
         "sha256_hash":"xxx",
         "length":63,
         "tuf_version":3,
         "apply_state":2,
         "apply_error":"None"
      }
   ],
   "config":[
      {
         "config":{
            "log_level":"<log-level>"
         },
         "name":"flare-log-level.<log-level>"
      }
   ],
   "shared_data_counter":2
}
```
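For illustration, here is a minimal sketch of how a consumer could pull the requested log level out of an `AGENT_CONFIG` payload shaped like the above. This is a hypothetical standalone helper (not part of this PR); the actual logic lives in `_prepare_tracer_flare` below, which also filters on the `flare-log-level` config name prefix:

```python
from typing import Optional


def extract_flare_log_level(payload: dict) -> Optional[str]:
    # Only config entries whose name starts with "flare-log-level"
    # belong to the tracer flare; AGENT_CONFIG carries other configs too.
    for entry in payload.get("config", []):
        if isinstance(entry, dict) and entry.get("name", "").startswith("flare-log-level"):
            return entry.get("config", {}).get("log_level")
    return None
```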

`AGENT_TASK`
```json
{
   "metadata":[
      {
         "id":"id1",
         "product_name":"AGENT_TASK",
         "sha256_hash":"xxx",
         "length":139,
         "tuf_version":4,
         "apply_state":2,
         "apply_error":"None"
      }
   ],
   "config":[
      false,
      {
         "args":{
            "case_id":"111",
            "hostname":"myhostname",
            "user_handle":"user.name@datadoghq.com"
         },
         "task_type":"tracer_flare",
         "uuid":"yyyyyy"
      }
   ],
   "shared_data_counter":5
}
```
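Similarly, a sketch of pulling the Zendesk ticket arguments out of an `AGENT_TASK` payload (hypothetical helper, for illustration only). Note that the `config` array can contain non-dict entries — the literal `false` above — so those must be skipped, as `_generate_tracer_flare` below does:

```python
from typing import Optional


def extract_flare_task_args(payload: dict) -> Optional[dict]:
    # AGENT_TASK is used for multiple purposes; only act on the
    # tracer_flare task, and skip non-dict entries like the literal false.
    for entry in payload.get("config", []):
        if isinstance(entry, dict) and entry.get("task_type") == "tracer_flare":
            return entry.get("args", {})
    return None
```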

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
erikayasuda and brettlangdon committed May 15, 2024
1 parent 5897cab commit b73738e
Showing 9 changed files with 347 additions and 33 deletions.
5 changes: 4 additions & 1 deletion .circleci/config.templ.yml
@@ -16,7 +16,7 @@ mongo_image: &mongo_image mongo:3.6@sha256:19c11a8f1064fd2bb713ef1270f79a742a184
httpbin_image: &httpbin_image kennethreitz/httpbin@sha256:2c7abc4803080c22928265744410173b6fea3b898872c01c5fd0f0f9df4a59fb
vertica_image: &vertica_image vertica/vertica-ce:latest
rabbitmq_image: &rabbitmq_image rabbitmq:3.7-alpine
testagent_image: &testagent_image ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent:v1.16.0
testagent_image: &testagent_image ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent:v1.17.0

parameters:
coverage:
@@ -543,6 +543,9 @@ jobs:

internal:
<<: *contrib_job_small
docker:
- image: *ddtrace_dev_image
- *testagent
steps:
- run_test:
pattern: "internal"
Empty file.
78 changes: 78 additions & 0 deletions ddtrace/internal/flare/_subscribers.py
@@ -0,0 +1,78 @@
from datetime import datetime
import os
from typing import Callable # noqa:F401
from typing import Optional # noqa:F401

from ddtrace.internal.flare.flare import Flare
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector # noqa:F401
from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber


log = get_logger(__name__)

DEFAULT_STALE_FLARE_DURATION_MINS = 20


class TracerFlareSubscriber(RemoteConfigSubscriber):
def __init__(
self,
data_connector: PublisherSubscriberConnector,
callback: Callable,
flare: Flare,
stale_flare_age: int = DEFAULT_STALE_FLARE_DURATION_MINS,
):
super().__init__(data_connector, callback, "TracerFlareConfig")
self.current_request_start: Optional[datetime] = None
self.stale_tracer_flare_num_mins = stale_flare_age
self.flare = flare

def has_stale_flare(self) -> bool:
if self.current_request_start:
curr = datetime.now()
flare_age = (curr - self.current_request_start).total_seconds()
stale_age = self.stale_tracer_flare_num_mins * 60
return flare_age >= stale_age
return False

def _get_data_from_connector_and_exec(self):
if self.has_stale_flare():
log.info(
"Tracer flare request started at %s is stale, reverting "
"logger configurations and cleaning up resources now",
self.current_request_start,
)
self.current_request_start = None
self._callback(self.flare, {}, True)
return

data = self._data_connector.read()
metadata = data.get("metadata")
if not metadata:
log.debug("No metadata received from data connector")
return

for md in metadata:
product_type = md.get("product_name")
if product_type == "AGENT_CONFIG":
# We will only process one tracer flare request at a time
if self.current_request_start is not None:
log.warning(
"There is already a tracer flare job started at %s. Skipping new request.",
str(self.current_request_start),
)
return
self.current_request_start = datetime.now()
elif product_type == "AGENT_TASK":
# Possible edge case where we don't have an existing flare request
# In this case we won't have anything to send, so we log and do nothing
if self.current_request_start is None:
log.warning("There is no tracer flare job to complete. Skipping new request.")
return
self.current_request_start = None
else:
log.debug("Received unexpected product type for tracer flare: %s", product_type)
return
log.debug("[PID %d] %s _exec_callback: %s", os.getpid(), self, str(data)[:50])
self._callback(self.flare, data)
return
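The staleness check in `has_stale_flare` above can be exercised in isolation. A standalone sketch (a simplified reimplementation for illustration — it does not import ddtrace, and the `now` parameter is added here purely for testability):

```python
from datetime import datetime, timedelta
from typing import Optional

DEFAULT_STALE_FLARE_DURATION_MINS = 20


def is_stale(
    request_start: Optional[datetime],
    now: Optional[datetime] = None,
    stale_mins: int = DEFAULT_STALE_FLARE_DURATION_MINS,
) -> bool:
    # Mirrors TracerFlareSubscriber.has_stale_flare: a request older than
    # stale_mins is considered stale and triggers cleanup instead of a send.
    if request_start is None:
        return False
    now = now or datetime.now()
    return (now - request_start).total_seconds() >= stale_mins * 60
```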
50 changes: 32 additions & 18 deletions ddtrace/internal/flare.py → ddtrace/internal/flare/flare.py
@@ -12,7 +12,6 @@
from typing import Optional
from typing import Tuple

from ddtrace import config
from ddtrace._logger import _add_file_handler
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.http import get_connection
@@ -36,14 +35,26 @@ class FlareSendRequest:
source: str = "tracer_python"


class TracerFlareSendError(Exception):
pass


class Flare:
def __init__(self, timeout_sec: int = DEFAULT_TIMEOUT_SECONDS, flare_dir: str = TRACER_FLARE_DIRECTORY):
def __init__(
self,
trace_agent_url: str,
api_key: Optional[str] = None,
timeout_sec: int = DEFAULT_TIMEOUT_SECONDS,
flare_dir: str = TRACER_FLARE_DIRECTORY,
):
self.original_log_level: int = logging.NOTSET
self.timeout: int = timeout_sec
self.flare_dir: pathlib.Path = pathlib.Path(flare_dir)
self.file_handler: Optional[RotatingFileHandler] = None
self.url: str = trace_agent_url
self._api_key: Optional[str] = api_key

def prepare(self, log_level: str):
def prepare(self, config: dict, log_level: str):
"""
Update configurations to start sending tracer logs to a file
to be sent in a flare later.
@@ -77,7 +88,7 @@ def prepare(self, log_level: str):
)

# Create and add config file
self._generate_config_file(pid)
self._generate_config_file(config, pid)

def send(self, flare_send_req: FlareSendRequest):
"""
@@ -96,35 +107,38 @@ def send(self, flare_send_req: FlareSendRequest):
log.error("Failed to create %s file", lock_path)
raise e
try:
client = get_connection(config._trace_agent_url, timeout=self.timeout)
client = get_connection(self.url, timeout=self.timeout)
headers, body = self._generate_payload(flare_send_req.__dict__)
client.request("POST", TRACER_FLARE_ENDPOINT, body, headers)
response = client.getresponse()
if response.status == 200:
log.info("Successfully sent the flare to Zendesk ticket %s", flare_send_req.case_id)
else:
log.error(
"Tracer flare upload to Zendesk ticket %s failed with %s status code:(%s) %s",
flare_send_req.case_id,
msg = "Tracer flare upload responded with status code %s:(%s) %s" % (
response.status,
response.reason,
response.read().decode(),
)
raise TracerFlareSendError(msg)
except Exception as e:
log.error("Failed to send tracer flare to Zendesk ticket %s", flare_send_req.case_id)
log.error("Failed to send tracer flare to Zendesk ticket %s: %s", flare_send_req.case_id, e)
raise e
finally:
client.close()
# Clean up files regardless of success/failure
self.clean_up_files()
return

def _generate_config_file(self, pid: int):
def _generate_config_file(self, config: dict, pid: int):
config_file = self.flare_dir / f"tracer_config_{pid}.json"
try:
with open(config_file, "w") as f:
# Redact API key if present
api_key = config.get("_dd_api_key")
if api_key:
config["_dd_api_key"] = "*" * (len(api_key) - 4) + api_key[-4:]

tracer_configs = {
"configs": config.__dict__,
"configs": config,
}
json.dump(
tracer_configs,
@@ -161,20 +175,20 @@ def _generate_payload(self, params: Dict[str, str]) -> Tuple[dict, bytes]:
encoded_key = key.encode()
encoded_value = value.encode()
body.write(b"--" + boundary + newline)
body.write(b'Content-Disposition: form-data; name="{%s}"{%s}{%s}' % (encoded_key, newline, newline))
body.write(b"{%s}{%s}" % (encoded_value, newline))
body.write(b'Content-Disposition: form-data; name="%s"%s%s' % (encoded_key, newline, newline))
body.write(b"%s%s" % (encoded_value, newline))

body.write(b"--" + boundary + newline)
body.write((b'Content-Disposition: form-data; name="flare_file"; filename="flare.tar"{%s}' % newline))
body.write(b"Content-Type: application/octet-stream{%s}{%s}" % (newline, newline))
body.write((b'Content-Disposition: form-data; name="flare_file"; filename="flare.tar"%s' % newline))
body.write(b"Content-Type: application/octet-stream%s%s" % (newline, newline))
body.write(tar_stream.getvalue() + newline)
body.write(b"--" + boundary + b"--")
headers = {
"Content-Type": b"multipart/form-data; boundary=%s" % boundary,
"Content-Length": body.getbuffer().nbytes,
}
if config._dd_api_key:
headers["DD-API-KEY"] = config._dd_api_key
if self._api_key:
headers["DD-API-KEY"] = self._api_key
return headers, body.getvalue()

def _get_valid_logger_level(self, flare_log_level: int) -> int:
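One of the fixes in `_generate_payload` above replaces literal `{%s}` braces with plain `%s` in the multipart framing (before the fix, headers like `name="{case_id}"` were emitted on the wire). A minimal standalone sketch of the intended `multipart/form-data` body, with a fixed boundary for illustration (hypothetical helper, not the PR's exact code):

```python
import io


def build_multipart(fields: dict, file_bytes: bytes, boundary: bytes = b"83c18a271a42f1d0") -> bytes:
    newline = b"\r\n"
    body = io.BytesIO()
    for key, value in fields.items():
        body.write(b"--" + boundary + newline)
        # Plain %s interpolation: the header must read name="case_id",
        # not name="{case_id}" as the pre-fix code produced.
        body.write(b'Content-Disposition: form-data; name="%s"%s%s' % (key.encode(), newline, newline))
        body.write(b"%s%s" % (value.encode(), newline))
    body.write(b"--" + boundary + newline)
    body.write(b'Content-Disposition: form-data; name="flare_file"; filename="flare.tar"%s' % newline)
    body.write(b"Content-Type: application/octet-stream%s%s" % (newline, newline))
    body.write(file_bytes + newline)
    body.write(b"--" + boundary + b"--")  # closing boundary ends the payload
    return body.getvalue()
```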
90 changes: 90 additions & 0 deletions ddtrace/internal/flare/handler.py
@@ -0,0 +1,90 @@
from typing import Any
from typing import Callable
from typing import List

from ddtrace.internal.flare.flare import Flare
from ddtrace.internal.flare.flare import FlareSendRequest
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def _tracerFlarePubSub():
from ddtrace.internal.flare._subscribers import TracerFlareSubscriber
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
from ddtrace.internal.remoteconfig._pubsub import PubSub

class _TracerFlarePubSub(PubSub):
__publisher_class__ = RemoteConfigPublisher
__subscriber_class__ = TracerFlareSubscriber
__shared_data__ = PublisherSubscriberConnector()

def __init__(self, callback: Callable, flare: Flare):
self._publisher = self.__publisher_class__(self.__shared_data__, None)
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, flare)

return _TracerFlarePubSub


def _handle_tracer_flare(flare: Flare, data: dict, cleanup: bool = False):
if cleanup:
flare.revert_configs()
flare.clean_up_files()
return

if "config" not in data:
log.warning("Unexpected tracer flare RC payload %r", data)
return
if len(data["config"]) == 0:
log.warning("Unexpected number of tracer flare RC payloads %r", data)
return

product_type = data.get("metadata", [{}])[0].get("product_name")
configs = data.get("config", [{}])
if product_type == "AGENT_CONFIG":
_prepare_tracer_flare(flare, configs)
elif product_type == "AGENT_TASK":
_generate_tracer_flare(flare, configs)
else:
log.warning("Received unexpected tracer flare product type: %s", product_type)


def _prepare_tracer_flare(flare: Flare, configs: List[dict]):
"""
Update configurations to start sending tracer logs to a file
to be sent in a flare later.
"""
for c in configs:
# AGENT_CONFIG is currently being used for multiple purposes
# We only want to prepare for a tracer flare if the config name
# starts with 'flare-log-level'
if not c.get("name", "").startswith("flare-log-level"):
continue

flare_log_level = c.get("config", {}).get("log_level").upper()
flare.prepare(c, flare_log_level)
return


def _generate_tracer_flare(flare: Flare, configs: List[Any]):
"""
Revert tracer flare configurations back to original state
before sending the flare.
"""
for c in configs:
# AGENT_TASK is currently being used for multiple purposes
# We only want to generate the tracer flare if the task_type is
# 'tracer_flare'
if type(c) != dict or c.get("task_type") != "tracer_flare":
continue
args = c.get("args", {})
flare_request = FlareSendRequest(
case_id=args.get("case_id"), hostname=args.get("hostname"), email=args.get("user_handle")
)

flare.revert_configs()

flare.send(flare_request)
return
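`_handle_tracer_flare` above routes on the first metadata entry's `product_name`. A simplified standalone sketch of that dispatch, returning a label instead of calling the prepare/generate helpers (illustration only):

```python
def route_flare_payload(data: dict) -> str:
    # Mirrors _handle_tracer_flare's routing: AGENT_CONFIG prepares the
    # flare, AGENT_TASK reverts configs and sends it; anything else is ignored.
    product = data.get("metadata", [{}])[0].get("product_name")
    if product == "AGENT_CONFIG":
        return "prepare"
    if product == "AGENT_TASK":
        return "generate"
    return "ignore"
```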
9 changes: 7 additions & 2 deletions ddtrace/settings/config.py
@@ -809,12 +809,17 @@ def _format_tags(self, tags: List[Union[str, Dict]]) -> Dict[str, str]:
def enable_remote_configuration(self):
# type: () -> None
"""Enable fetching configuration from Datadog."""
from ddtrace.internal.flare.flare import Flare
from ddtrace.internal.flare.handler import _handle_tracer_flare
from ddtrace.internal.flare.handler import _tracerFlarePubSub
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

remoteconfig_pubsub = self._remoteconfigPubSub()(self._handle_remoteconfig)
flare = Flare(trace_agent_url=self._trace_agent_url, api_key=self._dd_api_key)
tracerflare_pubsub = _tracerFlarePubSub()(_handle_tracer_flare, flare)
remoteconfig_poller.register("APM_TRACING", remoteconfig_pubsub)
remoteconfig_poller.register("AGENT_CONFIG", remoteconfig_pubsub)
remoteconfig_poller.register("AGENT_TASK", remoteconfig_pubsub)
remoteconfig_poller.register("AGENT_CONFIG", tracerflare_pubsub)
remoteconfig_poller.register("AGENT_TASK", tracerflare_pubsub)

def _remove_invalid_rules(self, rc_rules: List) -> List:
"""Remove invalid sampling rules from the given list"""
5 changes: 5 additions & 0 deletions releasenotes/notes/add-tracer-flare-65e275bca27631dd.yaml
@@ -0,0 +1,5 @@
---
features:
- |
tracer: This introduces the tracer flare functionality. Currently the tracer flare includes the
tracer logs and tracer configurations.
2 changes: 1 addition & 1 deletion tests/.suitespec.json
@@ -101,7 +101,7 @@
"ddtrace/internal/_utils.*",
"ddtrace/internal/constants.py",
"ddtrace/internal/encoding.py",
"ddtrace/internal/flare.py",
"ddtrace/internal/flare/*",
"ddtrace/internal/pack.h",
"ddtrace/internal/pack_template.h",
"ddtrace/internal/peer_service/*",