Skip to content

Commit

Permalink
feat(dramatiq): add dramatiq integration (#8779)
Browse files Browse the repository at this point in the history
## Overview
This PR adds the
[`dramatiq`](https://github.com/Bogdanp/dramatiq/tree/master) library as
a supported integration for `dd-trace-py`. This addresses
#5043.

### In Scope for this PR
- Instrumenting the `dramatiq.actor.Actor.send_with_options()` method.
This is the method called when a function with the `@dramatiq.actor`
decorator is called asynchronously. See example below:

```python
# app.py
import dramatiq
from flask import Flask

app = Flask(__name__)

@dramatiq.actor
def my_func():
   return "response"

@dramatiq.actor
def my_other_func(a: int, b: int) -> int:
   return a + b

@app.route('/')
def index():
   my_func.send() # this calls send_with_options() under the hood
   my_other_func.send_with_options(args=(1, 1), options={"max_retries": 3})
   return 'hello world'

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=6060)
```

Running the above example Flask app with `ddtrace-run flask run` and
hitting the `/` endpoint would generate a trace in the UI like the
following:

<img width="1052" alt="Screenshot 2024-03-26 at 3 28 21 PM"
src="https://github.com/DataDog/dd-trace-py/assets/153395705/78c6e832-c406-4829-92ac-799821fc2e31">

The detailed span info for `my_other_func.send_with_options(...)` would
look something like this:

```python
{
   actor: {
      name: my_other_func
      options: {"max_retries": 3}
   }
   env: test
   language: python
   span: {
      kind: producer
   }
}
```

#### NOTE
- The duration of the span is of the `send_with_options()` call itself,
and not reflective of the execution duration of the function being
asynchronously completed.
- When calling a function asynchronously with `send()`, it will also
display on the UI as a `send_with_options()` span. This is because
`send()` calls `send_with_options()` with empty options, so tracing both
of these functions would create two spans for every instance that
`send()` is called, with the exact same span information.

### Out of Scope for this PR
- All other `dramatiq` methods.
- Supporting the actual duration of the function executed asynchronously
by `send_with_options()`

The above and additional features can be investigated and added to this
initial iteration at a later time.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.
- [x] If change touches code that signs or publishes builds or packages,
or handles credentials of any kind, I've requested a review from
`@DataDog/security-design-and-guidance`.

## Reviewer Checklist

- [ ] Title is accurate
- [ ] All changes are related to the pull request's stated goal
- [ ] Description motivates each change
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [ ] Testing strategy adequately addresses listed risks
- [ ] Change is maintainable (easy to change, telemetry, documentation)
- [ ] Release note makes sense to a user of the library
- [ ] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
erikayasuda committed Mar 27, 2024
1 parent 1a5ed22 commit be880c1
Show file tree
Hide file tree
Showing 25 changed files with 786 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .circleci/config.templ.yml
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,15 @@ jobs:
snapshot: true
docker_services: "memcached redis"

dramatiq:
<<: *machine_executor
parallelism: 2
steps:
- run_test:
pattern: "dramatiq"
snapshot: true
docker_services: "redis"

fastapi:
<<: *machine_executor
parallelism: 2
Expand Down
21 changes: 21 additions & 0 deletions .riot/requirements/14116fa.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/14116fa.in
#
attrs==23.2.0
coverage[toml]==7.4.4
dramatiq==1.16.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.4.0
prometheus-client==0.20.0
pytest==8.1.1
pytest-cov==5.0.0
pytest-mock==3.14.0
redis==5.0.3
sortedcontainers==2.4.0
24 changes: 24 additions & 0 deletions .riot/requirements/16f33ce.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/16f33ce.in
#
async-timeout==4.0.3
attrs==23.2.0
coverage[toml]==7.4.4
dramatiq==1.16.0
exceptiongroup==1.2.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.4.0
prometheus-client==0.20.0
pytest==8.1.1
pytest-cov==5.0.0
pytest-mock==3.14.0
redis==5.0.3
sortedcontainers==2.4.0
tomli==2.0.1
21 changes: 21 additions & 0 deletions .riot/requirements/19508cd.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/19508cd.in
#
attrs==23.2.0
coverage[toml]==7.4.4
dramatiq==1.16.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.4.0
prometheus-client==0.20.0
pytest==8.1.1
pytest-cov==5.0.0
pytest-mock==3.14.0
redis==5.0.3
sortedcontainers==2.4.0
24 changes: 24 additions & 0 deletions .riot/requirements/638973a.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/638973a.in
#
async-timeout==4.0.3
attrs==23.2.0
coverage[toml]==7.4.4
dramatiq==1.16.0
exceptiongroup==1.2.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.4.0
prometheus-client==0.20.0
pytest==8.1.1
pytest-cov==5.0.0
pytest-mock==3.14.0
redis==5.0.3
sortedcontainers==2.4.0
tomli==2.0.1
27 changes: 27 additions & 0 deletions .riot/requirements/a9dcb3f.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# This file is autogenerated by pip-compile with Python 3.7
# by the following command:
#
# pip-compile --config=pyproject.toml --no-annotate --resolver=backtracking .riot/requirements/a9dcb3f.in
#
async-timeout==4.0.3
attrs==23.2.0
coverage[toml]==7.2.7
dramatiq==1.16.0
exceptiongroup==1.2.0
hypothesis==6.45.0
importlib-metadata==6.7.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.2.0
prometheus-client==0.17.1
pytest==7.4.4
pytest-cov==4.1.0
pytest-mock==3.11.1
redis==5.0.3
sortedcontainers==2.4.0
tomli==2.0.1
typing-extensions==4.7.1
zipp==3.15.0
24 changes: 24 additions & 0 deletions .riot/requirements/ee62ebe.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --no-annotate .riot/requirements/ee62ebe.in
#
async-timeout==4.0.3
attrs==23.2.0
coverage[toml]==7.4.4
dramatiq==1.16.0
exceptiongroup==1.2.0
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.0
pluggy==1.4.0
prometheus-client==0.20.0
pytest==8.1.1
pytest-cov==5.0.0
pytest-mock==3.14.0
redis==5.0.3
sortedcontainers==2.4.0
tomli==2.0.1
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"celery": True,
"consul": True,
"django": True,
"dramatiq": True,
"elasticsearch": True,
"algoliasearch": True,
"futures": True,
Expand Down
42 changes: 42 additions & 0 deletions ddtrace/contrib/dramatiq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""
Enabling
~~~~~~~~
The dramatiq integration will trace background tasks as marked by the @dramatiq.actor
decorator. To trace your dramatiq app, call the patch method:
import dramatiq
from ddtrace import patch
patch(dramatiq=True)
@dramatiq.actor
def my_background_task():
# do something
@dramatiq.actor
def my_other_task(content):
# do something
if __name__ == "__main__":
my_background_task.send()
my_other_task.send("mycontent")
# Can also call the methods with options
# my_other_task.send_with_options(("mycontent"), {"max_retries"=3})
You may also enable dramatiq tracing automatically via ddtrace-run::
ddtrace-run python app.py
"""
from ...internal.utils.importlib import require_modules


required_modules = ["dramatiq"]

with require_modules(required_modules) as missing_modules:
if not missing_modules:
from .patch import get_version
from .patch import patch
from .patch import unpatch

__all__ = ["patch", "unpatch", "get_version"]
72 changes: 72 additions & 0 deletions ddtrace/contrib/dramatiq/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Any
from typing import Callable
from typing import Dict
from typing import Tuple

import dramatiq

from ddtrace import config
from ddtrace import tracer
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.settings.config import Config


def get_version() -> str:
return str(dramatiq.__version__)


def patch() -> None:
"""
Instrument dramatiq so any new Actor is automatically instrumented.
"""
if getattr(dramatiq, "__datadog_patch", False):
return
dramatiq.__datadog_patch = True

trace_utils.wrap("dramatiq", "Actor.send_with_options", _traced_send_with_options_function(config.dramatiq))


def unpatch() -> None:
"""
Disconnect remove tracing capabilities from dramatiq Actors
"""
if not getattr(dramatiq, "__datadog_patch", False):
return
dramatiq.__datadog_patch = False

trace_utils.unwrap(dramatiq.Actor, "send_with_options")


def _traced_send_with_options_function(integration_config: Config) -> Callable[[Any], Any]:
"""
NOTE: This accounts for both the send() and send_with_options() methods,
since send() just wraps around send_with_options() with empty options.
In terms of expected behavior, this traces the send_with_options() calls,
but does not reflect the actual execution time of the background task
itself. The duration of this span is the duration of the send_with_options()
call itself.
"""

def _traced_send_with_options(
func: Callable[[Any], Any], instance: dramatiq.Actor, args: Tuple[Any], kwargs: Dict[Any, Any]
) -> Callable[[Any], Any]:
with tracer.trace(
"dramatiq.Actor.send_with_options",
span_type=SpanTypes.WORKER,
service=trace_utils.ext_service(pin=None, int_config=integration_config),
) as span:
span.set_tags(
{
SPAN_KIND: SpanKind.PRODUCER,
"actor.name": instance.actor_name,
"actor.options": instance.options,
}
)

return func(*args, **kwargs)

return _traced_send_with_options
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
features:
- |
dramatiq: Adds automatic tracing of the ``dramatiq`` library.
10 changes: 10 additions & 0 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,16 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
},
pys=select_pys(min_version="3.8", max_version="3.11"),
),
Venv(
name="dramatiq",
command="pytest {cmdargs} tests/contrib/dramatiq",
venvs=[
Venv(
pys=select_pys(),
pkgs={"dramatiq": latest, "pytest": latest, "redis": latest},
),
],
),
Venv(
name="elasticsearch",
command="pytest {cmdargs} tests/contrib/elasticsearch/test_elasticsearch.py",
Expand Down
12 changes: 12 additions & 0 deletions tests/.suitespec.json
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@
"celery": [
"ddtrace/contrib/celery/*"
],
"dramatiq": [
"ddtrace/contrib/dramatiq/*"
],
"cassandra": [
"ddtrace/contrib/cassandra/*",
"ddtrace/ext/cassandra.py"
Expand Down Expand Up @@ -819,6 +822,15 @@
"@dogpile_cache",
"tests/contrib/dogpile_cache/*"
],
"dramatiq": [
"@bootstrap",
"@core",
"@contrib",
"@tracing",
"@dramatiq",
"tests/contrib/dramatiq/*",
"tests/snapshots/tests.contrib.dramatiq.*"
],
"elasticsearch": [
"@bootstrap",
"@core",
Expand Down
Empty file.
19 changes: 19 additions & 0 deletions tests/contrib/dramatiq/autopatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from ddtrace.vendor import wrapt


if __name__ == "__main__":
# have to import dramatiq in order to have the post-import hooks run
import dramatiq
from dramatiq.brokers.stub import StubBroker

broker = StubBroker()
dramatiq.set_broker(broker)

@dramatiq.actor()
def add_numbers(a: int, b: int):
return a + b

# now dramatiq should be patched
actor = broker.get_actor("add_numbers")
assert isinstance(dramatiq.Actor.send_with_options, wrapt.ObjectProxy)
print("Test success")
22 changes: 22 additions & 0 deletions tests/contrib/dramatiq/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import dramatiq
from dramatiq import Worker
from dramatiq.brokers.stub import StubBroker
import pytest


@pytest.fixture()
def stub_broker():
broker = StubBroker()
broker.emit_after("process_boot")
dramatiq.set_broker(broker)
yield broker
broker.flush_all()
broker.close()


@pytest.fixture()
def stub_worker(stub_broker):
worker = Worker(stub_broker)
worker.start()
yield worker
worker.stop()
Loading

0 comments on commit be880c1

Please sign in to comment.