/
provider.py
74 lines (61 loc) · 2.79 KB
/
provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
from ...provider import BaseContextProvider
from ...provider import DatadogContextMixin
from ...span import Span
class AsyncioContextProvider(BaseContextProvider, DatadogContextMixin):
    """Manages the active context for asyncio execution.

    Framework instrumentation that is built on top of the ``asyncio``
    library should use this provider when contextvars are not available
    (Python versions less than 3.7).

    The active context is stored as an attribute on the current
    ``asyncio.Task``. When no event loop can be resolved for the calling
    thread (for example when the ``Context`` is propagated to a different
    thread than the one that is running the async loop), every operation is
    delegated to the parent implementation through ``super()``.

    NOTE(review): the parent fallback is expected to be a thread-local
    storage; confirm against ``BaseContextProvider``/``DatadogContextMixin``.
    """

    # Task attribute used to set/get the context
    _CONTEXT_ATTR = "__datadog_context"

    @staticmethod
    def _current_task(loop):
        """Return the task currently running on ``loop`` (or ``None``).

        ``asyncio.Task.current_task`` was deprecated in Python 3.7 and
        removed in Python 3.9, so the module-level ``asyncio.current_task``
        is preferred whenever it exists; the legacy classmethod is only used
        on interpreters that predate it.

        :param loop: the event loop to inspect; callers always pass a
            resolved (non-``None``) loop.
        """
        current_task = getattr(asyncio, "current_task", None)
        if current_task is None:
            # Python < 3.7: only the Task classmethod is available
            current_task = asyncio.Task.current_task
        return current_task(loop=loop)

    def activate(self, context, loop=None):
        """Sets the scoped ``Context`` for the current running ``Task``.

        :param context: the object to store as the active context.
        :param loop: optional event loop; resolved with ``_get_loop`` when
            omitted.
        :returns: the ``context`` argument, unchanged.
        """
        loop = self._get_loop(loop)
        if not loop:
            # No loop in this thread: delegate to the parent provider
            super(AsyncioContextProvider, self).activate(context)
            return context

        # the current unit of work (if tasks are used)
        task = self._current_task(loop)
        if task:
            setattr(task, self._CONTEXT_ATTR, context)
        return context

    def _get_loop(self, loop=None):
        """Helper to try and resolve the current loop.

        :param loop: an explicit loop, returned as-is when truthy.
        :returns: the given loop, the loop from ``asyncio.get_event_loop()``,
            or ``None`` when that call raises ``RuntimeError``.
        """
        try:
            return loop or asyncio.get_event_loop()
        except RuntimeError:
            # Detects if a loop is available in the current thread;
            # DEV: This happens when a new thread is created from the one that is running the async loop
            # DEV: It's possible that a different Executor is handling a different Thread that
            #      works with blocking code. In that case, we fallback to a thread-local Context.
            pass
        return None

    def _has_active_context(self, loop=None):
        """Helper to determine if we have a currently active context.

        :param loop: optional event loop; resolved with ``_get_loop`` when
            omitted.
        :returns: the parent's answer when no loop is available; otherwise
            ``True`` only if the current task carries a non-``None`` context.
        """
        loop = self._get_loop(loop=loop)
        if loop is None:
            return super(AsyncioContextProvider, self)._has_active_context()

        # the current unit of work (if tasks are used)
        task = self._current_task(loop)
        if task is None:
            return False

        ctx = getattr(task, self._CONTEXT_ATTR, None)
        return ctx is not None

    def active(self, loop=None):
        """Returns the active context for the execution.

        :param loop: optional event loop; resolved with ``_get_loop`` when
            omitted.
        :returns: the parent's active context when no loop is available,
            ``None`` when no task is running on the loop, otherwise the
            object stored on the current task (``None`` if nothing was
            stored). A stored ``Span`` is passed through
            ``self._update_active`` before being returned.
        """
        loop = self._get_loop(loop=loop)
        if not loop:
            return super(AsyncioContextProvider, self).active()

        # the current unit of work (if tasks are used)
        task = self._current_task(loop)
        if task is None:
            return None

        ctx = getattr(task, self._CONTEXT_ATTR, None)
        if isinstance(ctx, Span):
            # ``_update_active`` is not defined in this class; it is
            # presumably provided by ``DatadogContextMixin`` -- verify.
            return self._update_active(ctx)
        return ctx