From 5431f887b8d6a888fbe07146c8dd6c481422d701 Mon Sep 17 00:00:00 2001 From: Emanuele Palazzetti Date: Mon, 5 Mar 2018 14:56:18 +0100 Subject: [PATCH] [futures] provide context propagation for `concurrent` module; Tornado uses the `futures` integration --- .circleci/config.yml | 28 ++- ddtrace/contrib/futures/__init__.py | 31 ++++ ddtrace/contrib/futures/patch.py | 24 +++ .../futures.py => futures/threading.py} | 0 ddtrace/contrib/tornado/patch.py | 12 +- ddtrace/monkey.py | 1 + tests/contrib/futures/__init__.py | 0 tests/contrib/futures/test_propagation.py | 162 ++++++++++++++++++ tox.ini | 12 +- 9 files changed, 256 insertions(+), 14 deletions(-) create mode 100644 ddtrace/contrib/futures/__init__.py create mode 100644 ddtrace/contrib/futures/patch.py rename ddtrace/contrib/{tornado/futures.py => futures/threading.py} (100%) create mode 100644 tests/contrib/futures/__init__.py create mode 100644 tests/contrib/futures/test_propagation.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 8c26ffc90ca..2e86efabd26 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -63,6 +63,26 @@ jobs: paths: - .tox + futures: + docker: + - image: datadog/docker-library:dd_trace_py_1_0_0 + steps: + - checkout + - restore_cache: + keys: + - tox-cache-futures-{{ checksum "tox.ini" }} + - run: tox -e '{py27}-threading-futures{30,31,32}' --result-json /tmp/futures.1.results + - run: tox -e '{py34,py35,py36}-threading' --result-json /tmp/futures.2.results + - persist_to_workspace: + root: /tmp + paths: + - futures.1.results + - futures.2.results + - save_cache: + key: tox-cache-futures-{{ checksum "tox.ini" }} + paths: + - .tox + boto: docker: - image: datadog/docker-library:dd_trace_py_1_0_0 @@ -164,15 +184,13 @@ jobs: - restore_cache: keys: - tox-cache-tornado-{{ checksum "tox.ini" }} - - run: tox -e '{py27}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.1.results - - run: tox -e '{py27}-tornado{40,41,42,43,44}-futures' --result-json /tmp/tornado.2.results - - run: tox -e '{py34,py35,py36}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.3.results + - run: tox -e '{py27,py34,py35,py36}-tornado{40,41,42,43,44}' --result-json /tmp/tornado.1.results + - run: tox -e '{py27}-tornado{40,41,42,43,44}-futures{30,31,32}' --result-json /tmp/tornado.2.results - persist_to_workspace: root: /tmp paths: - tornado.1.results - tornado.2.results - - tornado.3.results - save_cache: key: tox-cache-tornado-{{ checksum "tox.ini" }} paths: @@ -744,6 +762,7 @@ workflows: - flake8 - tracer - integration + - futures - boto - ddtracerun - asyncio @@ -778,6 +797,7 @@ workflows: - flake8 - tracer - integration + - futures - boto - ddtracerun - asyncio diff --git a/ddtrace/contrib/futures/__init__.py b/ddtrace/contrib/futures/__init__.py new file mode 100644 index 00000000000..3fb3f29e054 --- /dev/null +++ b/ddtrace/contrib/futures/__init__.py @@ -0,0 +1,31 @@ +""" +The ``futures`` integration propagates the current active Tracing Context +between threads. The integration ensures that when operations are executed +in a new thread, that thread can continue the previously generated trace. + +The integration doesn't trace automatically threads execution, so manual +instrumentation or another integration must be activated. Threads propagation +is not enabled by default with the `patch_all()` method and must be activated +as follows: + + from ddtrace import patch, patch_all + + + patch(futures=True) + # or, when instrumenting all libraries + patch_all(futures=True) +""" +from ..util import require_modules + + +required_modules = ['concurrent.futures'] + + +with require_modules(required_modules) as missing_modules: + if not missing_modules: + from .patch import patch, unpatch + + __all__ = [ + 'patch', + 'unpatch', + ] diff --git a/ddtrace/contrib/futures/patch.py b/ddtrace/contrib/futures/patch.py new file mode 100644 index 00000000000..38c050bcf29 --- /dev/null +++ b/ddtrace/contrib/futures/patch.py @@ -0,0 +1,24 @@ +from concurrent import futures + +from wrapt import wrap_function_wrapper as _w + +from .threading import _wrap_submit +from ...util import unwrap as _u + + +def patch(): + """Enables Context Propagation between threads""" + if getattr(futures, '__datadog_patch', False): + return + setattr(futures, '__datadog_patch', True) + + _w('concurrent.futures', 'ThreadPoolExecutor.submit', _wrap_submit) + + +def unpatch(): + """Disables Context Propagation between threads""" + if not getattr(futures, '__datadog_patch', False): + return + setattr(futures, '__datadog_patch', False) + + _u(futures.ThreadPoolExecutor, 'submit') diff --git a/ddtrace/contrib/tornado/futures.py b/ddtrace/contrib/futures/threading.py similarity index 100% rename from ddtrace/contrib/tornado/futures.py rename to ddtrace/contrib/futures/threading.py diff --git a/ddtrace/contrib/tornado/patch.py b/ddtrace/contrib/tornado/patch.py index 37a15276662..e8a4b916fc5 100644 --- a/ddtrace/contrib/tornado/patch.py +++ b/ddtrace/contrib/tornado/patch.py @@ -3,7 +3,8 @@ from wrapt import wrap_function_wrapper as _w -from . import handlers, application, decorators, template, futures, compat, context_provider +from . import handlers, application, decorators, template, compat, context_provider +from ..futures.threading import _wrap_submit from ...util import unwrap as _u @@ -29,11 +30,8 @@ def patch(): _w('tornado.template', 'Template.generate', template.generate) # patch Python Futures when an Executor pool is used - # TODO: this may be a generic module and should be moved - # in a separate contributions when we want to support multi-threading - # context propagation if compat.futures_available: - _w('concurrent.futures', 'ThreadPoolExecutor.submit', futures._wrap_submit) + _w('concurrent.futures', 'ThreadPoolExecutor.submit', _wrap_submit) # configure the global tracer ddtrace.tracer.configure( @@ -59,4 +57,6 @@ def unpatch(): _u(tornado.template.Template, 'generate') if compat.futures_available: - _u('concurrent.futures.ThreadPoolExecutor', 'submit') + from concurrent import futures + + _u(futures.ThreadPoolExecutor, 'submit') diff --git a/ddtrace/monkey.py b/ddtrace/monkey.py index 2830b732d03..07d39670f5c 100644 --- a/ddtrace/monkey.py +++ b/ddtrace/monkey.py @@ -22,6 +22,7 @@ 'cassandra': True, 'celery': True, 'elasticsearch': True, + 'futures': False, # experimental propagation 'mongoengine': True, 'mysql': True, 'mysqldb': True, diff --git a/tests/contrib/futures/__init__.py b/tests/contrib/futures/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/contrib/futures/test_propagation.py b/tests/contrib/futures/test_propagation.py new file mode 100644 index 00000000000..fc371561efc --- /dev/null +++ b/tests/contrib/futures/test_propagation.py @@ -0,0 +1,162 @@ +import time +import concurrent + +from unittest import TestCase +from nose.tools import eq_, ok_ + +from ddtrace.contrib.futures import patch, unpatch + +from ...util import override_global_tracer +from ...test_tracer import get_dummy_tracer + + +class PropagationTestCase(TestCase): + """Ensures the Context Propagation works between threads + when the ``futures`` library is used, or when the + ``concurrent`` module is available (Python 3 only) + """ + def setUp(self): + # instrument ``concurrent`` + patch() + self.tracer = get_dummy_tracer() + + def tearDown(self): + # remove instrumentation + unpatch() + + def test_propagation(self): + # it must propagate the tracing context if available + + def fn(): + # an active context must be available + ok_(self.tracer.context_provider.active() is not None) + with self.tracer.trace('executor.thread'): + return 42 + + with override_global_tracer(self.tracer): + with self.tracer.trace('main.thread'): + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(fn) + result = future.result() + # assert the right result + eq_(result, 42) + + # the trace must be completed + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 1) + eq_(len(traces[0]), 2) + main = traces[0][0] + executor = traces[0][1] + + eq_(main.name, 'main.thread') + eq_(executor.name, 'executor.thread') + ok_(executor._parent is main) + + def test_propagation_with_params(self): + # instrumentation must proxy arguments if available + + def fn(value, key=None): + # an active context must be available + ok_(self.tracer.context_provider.active() is not None) + with self.tracer.trace('executor.thread'): + return value, key + + with override_global_tracer(self.tracer): + with self.tracer.trace('main.thread'): + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(fn, 42, 'CheeseShop') + value, key = future.result() + # assert the right result + eq_(value, 42) + eq_(key, 'CheeseShop') + + # the trace must be completed + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 1) + eq_(len(traces[0]), 2) + main = traces[0][0] + executor = traces[0][1] + + eq_(main.name, 'main.thread') + eq_(executor.name, 'executor.thread') + ok_(executor._parent is main) + + def test_disabled_instrumentation(self): + # it must not propagate if the module is disabled + unpatch() + + def fn(): + # an active context must be available + ok_(self.tracer.context_provider.active() is not None) + with self.tracer.trace('executor.thread'): + return 42 + + with override_global_tracer(self.tracer): + with self.tracer.trace('main.thread'): + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(fn) + result = future.result() + # assert the right result + eq_(result, 42) + + # we provide two different traces + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 2) + eq_(len(traces[0]), 1) + eq_(len(traces[1]), 1) + executor = traces[0][0] + main = traces[1][0] + + eq_(main.name, 'main.thread') + eq_(executor.name, 'executor.thread') + ok_(main.parent_id is None) + ok_(executor.parent_id is None) + + def test_double_instrumentation(self): + # double instrumentation must not happen + patch() + + def fn(): + with self.tracer.trace('executor.thread'): + return 42 + + with override_global_tracer(self.tracer): + with self.tracer.trace('main.thread'): + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(fn) + result = future.result() + # assert the right result + eq_(result, 42) + + # the trace must be completed + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 1) + eq_(len(traces[0]), 2) + + def test_send_trace_when_finished(self): + # it must send the trace only when all threads are finished + + def fn(): + with self.tracer.trace('executor.thread'): + # wait before returning + time.sleep(0.05) + return 42 + + with override_global_tracer(self.tracer): + with self.tracer.trace('main.thread'): + # don't wait for the execution + executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) + future = executor.submit(fn) + time.sleep(0.01) + + # assert the trace is not sent because the secondary thread + # didn't finish the processing + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 0) + + # then wait for the second thread and send the trace + result = future.result() + eq_(result, 42) + traces = self.tracer.writer.pop_traces() + eq_(len(traces), 1) + eq_(len(traces[0]), 2) diff --git a/tox.ini b/tox.ini index 8bcc83f6ad0..96ef722737a 100644 --- a/tox.ini +++ b/tox.ini @@ -20,6 +20,8 @@ envlist = flake8 wait + {py27}-threading-futures{30,31,32} + {py34,py35,py36}-threading {py27,py34}-boto {py27,py34}-botocore {py27,py34,py35,py36}-tracer @@ -28,9 +30,8 @@ envlist = {py34,py35,py36}-asyncio {py27}-pylons{096,097,010,10} {py34,py35,py36}-aiohttp{12,13,20,21,22}-aiohttp_jinja{012,013}-yarl - {py27}-tornado{40,41,42,43,44} - {py27}-tornado{40,41,42,43,44}-futures - {py34,py35,py36}-tornado{40,41,42,43,44} + {py27,py34,py35,py36}-tornado{40,41,42,43,44} + {py27}-tornado{40,41,42,43,44}-futures{30,31,32} {py27,py34,py35,py36}-bottle{12}-webtest {py27,py34,py35,py36}-bottle-autopatch{12}-webtest {py27,py34,py35,py36}-cassandra{35,36,37,38} @@ -103,7 +104,9 @@ deps = tornado42: tornado>=4.2,<4.3 tornado43: tornado>=4.3,<4.4 tornado44: tornado>=4.4,<4.5 - futures: futures>=3.0,<3.1 + futures30: futures>=3.0,<3.1 + futures31: futures>=3.1,<3.2 + futures32: futures>=3.2,<3.3 aiohttp_jinja012: aiohttp_jinja2>=0.12,<0.13 aiohttp_jinja013: aiohttp_jinja2>=0.13,<0.14 aiohttp_jinja014: aiohttp_jinja2>=0.14,<0.15 @@ -273,6 +276,7 @@ commands = sqlite3: nosetests {posargs} tests/contrib/sqlite3 requests{200,208,209,210,211,212,213}: nosetests {posargs} tests/contrib/requests sqlalchemy{10,11}: nosetests {posargs} tests/contrib/sqlalchemy + threading: nosetests {posargs} tests/contrib/futures ddtracerun: nosetests {posargs} tests/commands/test_runner.py msgpack{03,04}: nosetests {posargs} tests/test_encoders.py test_utils: nosetests {posargs} tests/contrib/test_utils.py