-
Notifications
You must be signed in to change notification settings - Fork 212
/
base.py
110 lines (90 loc) · 3.98 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import functools
import logging
import os
from elasticapm.traces import get_transaction
from elasticapm.utils import wrapt
# Module-level logger, named after this module; used for the debug messages
# emitted while instrumenting.
logger = logging.getLogger(__name__)
class AbstractInstrumentedModule(object):
    """
    Base class for instrumentations that wrap third-party callables.

    Subclasses set ``name`` and ``instrument_list`` (or override
    ``get_instrument_list``) and implement ``call``, which is invoked in
    place of the wrapped callable whenever a sampled transaction is active.
    """

    # Identifier of the instrumentation; must be set by subclasses. It is used
    # in log messages and to build the ``SKIP_INSTRUMENT_<NAME>`` variable.
    name = None

    instrument_list = [
        # List of (module, method) pairs to instrument. E.g.:
        # ("requests.sessions", "Session.send"),
    ]

    def __init__(self):
        """
        Set up the bookkeeping used by ``instrument``/``uninstrument``.

        :raises AssertionError: if the subclass did not set ``name``
        """
        # Maps (module, method) -> the original, unwrapped object so that
        # ``uninstrument`` can put it back.
        self.originals = {}
        self.instrumented = False

        assert self.name is not None

    def get_wrapped_name(self, wrapped, instance, fallback_method=None):
        """
        Build a dotted ``ClassName.method_name`` label for a wrapped callable.

        :param wrapped: the wrapped callable; its ``__name__`` is used if present
        :param instance: the object the callable is bound to; its class name
            becomes the first part of the label
        :param fallback_method: ``"Class.method"`` string whose second part is
            used when ``wrapped`` has no ``__name__``; ignored unless it has
            exactly two dot-separated parts
        :return: the available parts joined with ``"."``
        """
        wrapped_name = []
        if hasattr(instance, '__class__') and hasattr(instance.__class__, '__name__'):
            wrapped_name.append(instance.__class__.__name__)

        if hasattr(wrapped, '__name__'):
            wrapped_name.append(wrapped.__name__)
        elif fallback_method:
            attribute = fallback_method.split('.')
            if len(attribute) == 2:
                wrapped_name.append(attribute[1])

        return ".".join(wrapped_name)

    def get_instrument_list(self):
        """
        Return the (module, method) pairs that ``instrument`` should wrap.

        The default implementation returns the ``instrument_list`` attribute.
        """
        return self.instrument_list

    def instrument(self):
        """
        Wrap every target returned by ``get_instrument_list``.

        Does nothing if already instrumented, or if the environment variable
        ``SKIP_INSTRUMENT_<NAME>`` (``name`` upper-cased) is set. Targets that
        cannot be imported or resolved are logged at debug level and skipped.
        """
        if self.instrumented:
            return

        skip_env_var = 'SKIP_INSTRUMENT_' + str(self.name.upper())
        if skip_env_var in os.environ:
            logger.debug("Skipping instrumentation of %s. %s is set.",
                         self.name, skip_env_var)
            return

        try:
            instrument_list = self.get_instrument_list()
            skipped_modules = set()
            for module, method in instrument_list:
                try:
                    # Skip modules we already failed to load
                    if module in skipped_modules:
                        continue
                    # We jump through hoops here to get the original
                    # `module`/`method` in the call to `call_if_sampling`
                    parent, attribute, original = wrapt.resolve_path(module, method)
                    self.originals[(module, method)] = original
                    wrapper = wrapt.FunctionWrapper(
                        original,
                        functools.partial(self.call_if_sampling, module, method),
                    )
                    wrapt.apply_patch(parent, attribute, wrapper)
                except ImportError:
                    # Could not import module
                    logger.debug("Skipping instrumentation of %s."
                                 " Module %s not found",
                                 self.name, module)
                    # Keep track of modules we couldn't load so we don't
                    # try to instrument anything in that module again
                    skipped_modules.add(module)
                except AttributeError as ex:
                    # Could not find thing in module
                    logger.debug("Skipping instrumentation of %s.%s: %s",
                                 module, method, ex)
        except ImportError as ex:
            # Raised outside the per-target handling above, i.e. by
            # `get_instrument_list()` itself.
            logger.debug("Skipping instrumentation of %s. %s",
                         self.name, ex)
        self.instrumented = True

    def uninstrument(self):
        """
        Restore every original recorded by ``instrument``.

        Does nothing if not instrumented or if no originals were recorded.
        """
        if not self.instrumented or not self.originals:
            return

        for module, method in self.get_instrument_list():
            if (module, method) in self.originals:
                # Only the patch location is needed; the object currently
                # installed there is replaced by the saved original.
                parent, attribute, _ = wrapt.resolve_path(module, method)
                wrapt.apply_patch(parent, attribute, self.originals[(module, method)])

        self.instrumented = False
        self.originals = {}

    def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
        """
        Dispatch a call to a wrapped target.

        Calls ``wrapped`` directly when there is no current transaction or it
        is not sampled; otherwise delegates to ``call``.

        :param module: module path of the instrumented target
        :param method: attribute path of the instrumented target
        :param wrapped: the original callable
        :param instance: the object ``wrapped`` is bound to, if any
        :param args: positional arguments of the intercepted call
        :param kwargs: keyword arguments of the intercepted call
        :return: whatever ``wrapped`` or ``call`` returns
        """
        transaction = get_transaction()
        if not transaction or not transaction.is_sampled:
            return wrapped(*args, **kwargs)
        return self.call(module, method, wrapped, instance, args, kwargs)

    def call(self, module, method, wrapped, instance, args, kwargs):
        """
        Handle an intercepted call; must be implemented by subclasses.

        Receives the same arguments as ``call_if_sampling``.

        :raises NotImplementedError: always, in this base class
        """
        # `NotImplemented` is a comparison sentinel, not an exception; raising
        # it produces a TypeError instead of the intended error.
        raise NotImplementedError