28 changes: 24 additions & 4 deletions .circleci/config.yml
@@ -63,6 +63,26 @@ jobs:
paths:
- .tox

futures:
docker:
- image: datadog/docker-library:dd_trace_py_1_0_0
steps:
- checkout
- restore_cache:
keys:
- tox-cache-futures-{{ checksum "tox.ini" }}
- run: tox -e '{py27}-threading-futures{30,31,32}' --result-json /tmp/futures.1.results
- run: tox -e '{py34,py35,py36}-threading' --result-json /tmp/futures.2.results
- persist_to_workspace:
root: /tmp
paths:
- futures.1.results
- futures.2.results
- save_cache:
key: tox-cache-futures-{{ checksum "tox.ini" }}
paths:
- .tox

boto:
docker:
- image: datadog/docker-library:dd_trace_py_1_0_0
@@ -164,15 +184,13 @@ jobs:
- restore_cache:
keys:
- tox-cache-tornado-{{ checksum "tox.ini" }}
- run: tox -e '{py27}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.1.results
- run: tox -e '{py27}-tornado{40,41,42,43,44}-futures' --result-json /tmp/tornado.2.results
- run: tox -e '{py34,py35,py36}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.3.results
- run: tox -e '{py27,py34,py35,py36}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.1.results
- run: tox -e '{py27}-tornado{40,41,42,43,44}-futures{30,31,32}' --result-json /tmp/tornado.2.results
- persist_to_workspace:
root: /tmp
paths:
- tornado.1.results
- tornado.2.results
- tornado.3.results
- save_cache:
key: tox-cache-tornado-{{ checksum "tox.ini" }}
paths:
@@ -744,6 +762,7 @@ workflows:
- flake8
- tracer
- integration
- futures
- boto
- ddtracerun
- asyncio
@@ -778,6 +797,7 @@ workflows:
- flake8
- tracer
- integration
- futures
- boto
- ddtracerun
- asyncio
31 changes: 31 additions & 0 deletions ddtrace/contrib/futures/__init__.py
@@ -0,0 +1,31 @@
"""
The ``futures`` integration propagates the currently active Tracing Context
between threads. The integration ensures that when operations are executed
in a new thread, that thread can continue the previously generated trace.

The integration doesn't automatically trace thread execution, so manual
instrumentation or another integration must be active. Thread context
propagation is not enabled by default by the `patch_all()` method and must be
activated as follows:

from ddtrace import patch, patch_all


patch(futures=True)
# or, when instrumenting all libraries
patch_all(futures=True)
"""
from ..util import require_modules


required_modules = ['concurrent.futures']


with require_modules(required_modules) as missing_modules:
if not missing_modules:
from .patch import patch, unpatch

__all__ = [
'patch',
'unpatch',
]
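A minimal end-to-end usage sketch, mirroring the docstring above and the propagation tests added in this PR (the span names are illustrative, not required by the integration):

```python
from concurrent.futures import ThreadPoolExecutor

from ddtrace import patch, tracer

# enable the integration explicitly; patch_all() leaves it disabled by default
patch(futures=True)


def work():
    # the context captured at submit() time is active in this thread, so the
    # span below becomes a child of 'main.thread' instead of a new trace
    with tracer.trace('executor.thread'):
        return 42


with tracer.trace('main.thread'):
    with ThreadPoolExecutor(max_workers=2) as executor:
        result = executor.submit(work).result()
```

Without `patch(futures=True)` the same code produces two unrelated traces, which is exactly what `test_disabled_instrumentation` below asserts.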
24 changes: 24 additions & 0 deletions ddtrace/contrib/futures/patch.py
@@ -0,0 +1,24 @@
from concurrent import futures

from wrapt import wrap_function_wrapper as _w

from .threading import _wrap_submit
from ...util import unwrap as _u


def patch():
"""Enables Context Propagation between threads"""
if getattr(futures, '__datadog_patch', False):
return
setattr(futures, '__datadog_patch', True)

_w('concurrent.futures', 'ThreadPoolExecutor.submit', _wrap_submit)


def unpatch():
"""Disables Context Propagation between threads"""
if not getattr(futures, '__datadog_patch', False):
return
setattr(futures, '__datadog_patch', False)

_u(futures.ThreadPoolExecutor, 'submit')
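`_wrap_submit` is imported from `ddtrace/contrib/futures/threading.py`, which this diff doesn't include. A minimal sketch of what such a wrapper typically does, assuming the tracer exposes `context_provider.active()` and `context_provider.activate()` (an assumption; the actual helper may differ):

```python
import ddtrace


def _wrap_submit(func, instance, args, kwargs):
    # wrapt wrapper for ThreadPoolExecutor.submit(): capture the context
    # that is active in the submitting thread
    current_ctx = ddtrace.tracer.context_provider.active()
    fn, fn_args = args[0], args[1:]
    # submit a shim that re-activates the captured context in the worker
    return func(_wrap_execution, current_ctx, fn, fn_args, kwargs)


def _wrap_execution(ctx, fn, args, kwargs):
    # runs in the worker thread: activate the parent context, then execute
    # the originally submitted callable
    if ctx is not None:
        ddtrace.tracer.context_provider.activate(ctx)
    return fn(*args, **kwargs)
```

Re-activating the captured context before running the callable is what lets the `executor.thread` spans in the tests attach to their `main.thread` parent.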
12 changes: 6 additions & 6 deletions ddtrace/contrib/tornado/patch.py
@@ -3,7 +3,8 @@

from wrapt import wrap_function_wrapper as _w

from . import handlers, application, decorators, template, futures, compat, context_provider
from . import handlers, application, decorators, template, compat, context_provider
from ..futures.threading import _wrap_submit
from ...util import unwrap as _u


@@ -29,11 +30,8 @@ def patch():
_w('tornado.template', 'Template.generate', template.generate)

# patch Python Futures when an Executor pool is used
# TODO: this may be a generic module and should be moved
# in a separate contributions when we want to support multi-threading
# context propagation
if compat.futures_available:
_w('concurrent.futures', 'ThreadPoolExecutor.submit', futures._wrap_submit)
_w('concurrent.futures', 'ThreadPoolExecutor.submit', _wrap_submit)

# configure the global tracer
ddtrace.tracer.configure(
@@ -59,4 +57,6 @@ def unpatch():
_u(tornado.template.Template, 'generate')

if compat.futures_available:
_u('concurrent.futures.ThreadPoolExecutor', 'submit')
from concurrent import futures

_u(futures.ThreadPoolExecutor, 'submit')
1 change: 1 addition & 0 deletions ddtrace/monkey.py
@@ -22,6 +22,7 @@
'cassandra': True,
'celery': True,
'elasticsearch': True,
'futures': False, # experimental propagation
'mongoengine': True,
'mysql': True,
'mysqldb': True,
Empty file.
162 changes: 162 additions & 0 deletions tests/contrib/futures/test_propagation.py
@@ -0,0 +1,162 @@
import time
import concurrent

from unittest import TestCase
from nose.tools import eq_, ok_

from ddtrace.contrib.futures import patch, unpatch

from ...util import override_global_tracer
from ...test_tracer import get_dummy_tracer


class PropagationTestCase(TestCase):
"""Ensures the Context Propagation works between threads
when the ``futures`` library is used, or when the
``concurrent`` module is available (Python 3 only)
"""
def setUp(self):
# instrument ``concurrent``
patch()
self.tracer = get_dummy_tracer()

def tearDown(self):
# remove instrumentation
unpatch()

def test_propagation(self):
# it must propagate the tracing context if available

def fn():
# an active context must be available
ok_(self.tracer.context_provider.active() is not None)
with self.tracer.trace('executor.thread'):
return 42

with override_global_tracer(self.tracer):
with self.tracer.trace('main.thread'):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(fn)
result = future.result()
# assert the right result
eq_(result, 42)

# the trace must be completed
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 1)
eq_(len(traces[0]), 2)
main = traces[0][0]
executor = traces[0][1]

eq_(main.name, 'main.thread')
eq_(executor.name, 'executor.thread')
ok_(executor._parent is main)

def test_propagation_with_params(self):
# the instrumentation must forward positional and keyword arguments

def fn(value, key=None):
# an active context must be available
ok_(self.tracer.context_provider.active() is not None)
with self.tracer.trace('executor.thread'):
return value, key

with override_global_tracer(self.tracer):
with self.tracer.trace('main.thread'):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(fn, 42, 'CheeseShop')
value, key = future.result()
# assert the right result
eq_(value, 42)
eq_(key, 'CheeseShop')

# the trace must be completed
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 1)
eq_(len(traces[0]), 2)
main = traces[0][0]
executor = traces[0][1]

eq_(main.name, 'main.thread')
eq_(executor.name, 'executor.thread')
ok_(executor._parent is main)

def test_disabled_instrumentation(self):
# it must not propagate if the module is disabled
unpatch()

def fn():
# an active context must be available
ok_(self.tracer.context_provider.active() is not None)
with self.tracer.trace('executor.thread'):
return 42

with override_global_tracer(self.tracer):
with self.tracer.trace('main.thread'):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(fn)
result = future.result()
# assert the right result
eq_(result, 42)

# without context propagation, two separate traces are produced
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 2)
eq_(len(traces[0]), 1)
eq_(len(traces[1]), 1)
executor = traces[0][0]
main = traces[1][0]

eq_(main.name, 'main.thread')
eq_(executor.name, 'executor.thread')
ok_(main.parent_id is None)
ok_(executor.parent_id is None)

def test_double_instrumentation(self):
# double instrumentation must not happen
patch()

def fn():
with self.tracer.trace('executor.thread'):
return 42

with override_global_tracer(self.tracer):
with self.tracer.trace('main.thread'):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(fn)
result = future.result()
# assert the right result
eq_(result, 42)

# the trace must be completed
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 1)
eq_(len(traces[0]), 2)

def test_send_trace_when_finished(self):
# it must send the trace only when all threads are finished

def fn():
with self.tracer.trace('executor.thread'):
# wait before returning
time.sleep(0.05)
return 42

with override_global_tracer(self.tracer):
with self.tracer.trace('main.thread'):
# don't wait for the execution
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
future = executor.submit(fn)
time.sleep(0.01)

# the trace must not be sent yet because the worker thread
# hasn't finished processing
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 0)

# then wait for the second thread and send the trace
result = future.result()
eq_(result, 42)
traces = self.tracer.writer.pop_traces()
eq_(len(traces), 1)
eq_(len(traces[0]), 2)
12 changes: 8 additions & 4 deletions tox.ini
@@ -20,6 +20,8 @@
envlist =
flake8
wait
{py27}-threading-futures{30,31,32}
{py34,py35,py36}-threading
{py27,py34}-boto
{py27,py34}-botocore
{py27,py34,py35,py36}-tracer
@@ -28,9 +30,8 @@ envlist =
{py34,py35,py36}-asyncio
{py27}-pylons{096,097,010,10}
{py34,py35,py36}-aiohttp{12,13,20,21,22}-aiohttp_jinja{012,013}-yarl
{py27}-tornado{40,41,42,43,44}
{py27}-tornado{40,41,42,43,44}-futures
{py34,py35,py36}-tornado{40,41,42,43,44}
{py27,py34,py35,py36}-tornado{40,41,42,43,44}
{py27}-tornado{40,41,42,43,44}-futures{30,31,32}
{py27,py34,py35,py36}-bottle{12}-webtest
{py27,py34,py35,py36}-bottle-autopatch{12}-webtest
{py27,py34,py35,py36}-cassandra{35,36,37,38}
@@ -103,7 +104,9 @@ deps =
tornado42: tornado>=4.2,<4.3
tornado43: tornado>=4.3,<4.4
tornado44: tornado>=4.4,<4.5
futures: futures>=3.0,<3.1
futures30: futures>=3.0,<3.1
futures31: futures>=3.1,<3.2
futures32: futures>=3.2,<3.3
aiohttp_jinja012: aiohttp_jinja2>=0.12,<0.13
aiohttp_jinja013: aiohttp_jinja2>=0.13,<0.14
aiohttp_jinja014: aiohttp_jinja2>=0.14,<0.15
@@ -273,6 +276,7 @@ commands =
sqlite3: nosetests {posargs} tests/contrib/sqlite3
requests{200,208,209,210,211,212,213}: nosetests {posargs} tests/contrib/requests
sqlalchemy{10,11}: nosetests {posargs} tests/contrib/sqlalchemy
threading: nosetests {posargs} tests/contrib/futures
ddtracerun: nosetests {posargs} tests/commands/test_runner.py
msgpack{03,04}: nosetests {posargs} tests/test_encoders.py
test_utils: nosetests {posargs} tests/contrib/test_utils.py