-
Notifications
You must be signed in to change notification settings - Fork 395
/
context_manager.py
107 lines (79 loc) · 2.54 KB
/
context_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import abc
import threading
from ddtrace.vendor import six
from .logger import get_logger
from ..context import Context
log = get_logger(__name__)
try:
from contextvars import ContextVar
_DD_CONTEXTVAR = ContextVar("datadog_contextvar", default=None)
CONTEXTVARS_IS_AVAILABLE = True
except ImportError:
CONTEXTVARS_IS_AVAILABLE = False
class BaseContextManager(six.with_metaclass(abc.ABCMeta)):
def __init__(self, reset=True):
if reset:
self.reset()
@abc.abstractmethod
def _has_active_context(self):
pass
@abc.abstractmethod
def set(self, ctx):
pass
@abc.abstractmethod
def get(self):
pass
def reset(self):
pass
class ThreadLocalContext(BaseContextManager):
"""
ThreadLocalContext can be used as a tracer global reference to create
a different ``Context`` for each thread. In synchronous tracer, this
is required to prevent multiple threads sharing the same ``Context``
in different executions.
"""
def __init__(self, reset=True):
# always initialize a new thread-local context holder
super(ThreadLocalContext, self).__init__(reset=True)
def _has_active_context(self):
"""
Determine whether we have a currently active context for this thread
:returns: Whether an active context exists
:rtype: bool
"""
ctx = getattr(self._locals, "context", None)
return ctx is not None
def set(self, ctx):
setattr(self._locals, "context", ctx)
def get(self):
ctx = getattr(self._locals, "context", None)
if not ctx:
# create a new Context if it's not available
ctx = Context()
self._locals.context = ctx
return ctx
def reset(self):
self._locals = threading.local()
class ContextVarContextManager(BaseContextManager):
"""
_ContextVarContext can be used in place of the ThreadLocalContext for Python
3.7 and above to manage different ``Context`` objects for each thread and
async task.
"""
def _has_active_context(self):
ctx = _DD_CONTEXTVAR.get()
return ctx is not None
def set(self, ctx):
_DD_CONTEXTVAR.set(ctx)
def get(self):
ctx = _DD_CONTEXTVAR.get()
if not ctx:
ctx = Context()
self.set(ctx)
return ctx
def reset(self):
_DD_CONTEXTVAR.set(None)
if CONTEXTVARS_IS_AVAILABLE:
DefaultContextManager = ContextVarContextManager
else:
DefaultContextManager = ThreadLocalContext