/
patch.py
121 lines (98 loc) · 4.22 KB
/
patch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 3p
import redis
from ddtrace.vendor.wrapt import wrap_function_wrapper as _w
# project
from ddtrace import config
from ...constants import ANALYTICS_SAMPLE_RATE_KEY
from ...pin import Pin
from ...ext import AppTypes, redis as redisx
from ...utils.wrappers import unwrap
from ..helpers import require_package
from .util import format_command_args, _extract_conn_tags
def patch():
    """Instrument the redis client classes in place.

    The tracing wrappers are installed directly on the redis classes, once
    for each supported version range, which is why the two target tables
    below look so similar. Wrapping Redis and StrictRedis in an ObjectProxy
    would avoid that duplication, but then any "import redis.Redis" would
    not be instrumented.

    Calling patch() again while the redis module is flagged as patched is
    a no-op.
    """
    # the flag lives on the redis module itself and guards against double wrapping
    if getattr(redis, '_datadog_patch', False):
        return
    setattr(redis, '_datadog_patch', True)

    # (module, dotted attribute, wrapper) triples, applied in listed order
    targets_before_3 = (
        ('redis', 'StrictRedis.execute_command', traced_execute_command),
        ('redis', 'StrictRedis.pipeline', traced_pipeline),
        ('redis', 'Redis.pipeline', traced_pipeline),
        ('redis.client', 'BasePipeline.execute', traced_execute_pipeline),
        ('redis.client', 'BasePipeline.immediate_execute_command', traced_execute_command),
    )
    targets_from_3 = (
        ('redis', 'Redis.execute_command', traced_execute_command),
        ('redis', 'Redis.pipeline', traced_pipeline),
        ('redis.client', 'Pipeline.execute', traced_execute_pipeline),
        ('redis.client', 'Pipeline.immediate_execute_command', traced_execute_command),
    )

    # each table is applied only when require_package yields a truthy value
    # for its version specifier
    with require_package('redis<3.0.0') as exists:
        if exists:
            for module, attribute, wrapper in targets_before_3:
                _w(module, attribute, wrapper)

    with require_package('redis>=3.0.0') as exists:
        if exists:
            for module, attribute, wrapper in targets_from_3:
                _w(module, attribute, wrapper)

    # default Pin attached to the class; the wrappers read it back with Pin.get_from
    default_pin = Pin(service=redisx.DEFAULT_SERVICE, app=redisx.APP, app_type=AppTypes.db)
    default_pin.onto(redis.StrictRedis)
def unpatch():
    """Remove the wrappers installed by patch().

    Does nothing unless the redis module is currently flagged as patched.
    The flag is cleared first, then the methods wrapped for the matching
    redis version range are unwrapped in the order patch() listed them.
    """
    if not getattr(redis, '_datadog_patch', False):
        return
    setattr(redis, '_datadog_patch', False)

    with require_package('redis<3.0.0') as exists:
        if exists:
            for owner, method in (
                (redis.StrictRedis, 'execute_command'),
                (redis.StrictRedis, 'pipeline'),
                (redis.Redis, 'pipeline'),
                (redis.client.BasePipeline, 'execute'),
                (redis.client.BasePipeline, 'immediate_execute_command'),
            ):
                unwrap(owner, method)

    with require_package('redis>=3.0.0') as exists:
        if exists:
            for owner, method in (
                (redis.Redis, 'execute_command'),
                (redis.Redis, 'pipeline'),
                (redis.client.Pipeline, 'execute'),
                (redis.client.Pipeline, 'immediate_execute_command'),
            ):
                unwrap(owner, method)
#
# tracing functions
#
def traced_execute_command(func, instance, args, kwargs):
    """Wrapper around execute_command that runs the command inside a span.

    patch() installs this on ``execute_command`` of the client classes and
    on ``immediate_execute_command`` of the pipeline classes.

    :param func: the wrapped (original) method
    :param instance: the object the method was called on
    :param args: positional arguments of the call (the redis command parts)
    :param kwargs: keyword arguments of the call
    :returns: whatever the wrapped method returns
    """
    pin = Pin.get_from(instance)
    if pin and pin.enabled():
        with pin.tracer.trace(redisx.CMD, service=pin.service, span_type=redisx.TYPE) as span:
            # the formatted command doubles as the resource name and the raw command tag
            command = format_command_args(args)
            span.resource = command
            span.set_tag(redisx.RAWCMD, command)

            # pin tags go first, connection tags are applied after them
            if pin.tags:
                span.set_tags(pin.tags)
            span.set_tags(_get_tags(instance))
            span.set_metric(redisx.ARGS_LEN, len(args))

            # set analytics sample rate if enabled
            sample_rate = config.redis.get_analytics_sample_rate()
            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, sample_rate)

            # the command itself runs while the span is still open
            return func(*args, **kwargs)

    # no pin, or tracing disabled for it: call straight through
    return func(*args, **kwargs)
def traced_pipeline(func, instance, args, kwargs):
    """Wrapper around pipeline() that passes the caller's Pin to the new pipeline.

    The pipeline is always created by the wrapped method first; when a Pin
    is found on ``instance`` it is attached onto the pipeline as well.

    :returns: the pipeline object produced by the wrapped method
    """
    created = func(*args, **kwargs)
    parent_pin = Pin.get_from(instance)
    if not parent_pin:
        return created
    parent_pin.onto(created)
    return created
def traced_execute_pipeline(func, instance, args, kwargs):
    """Wrapper around a pipeline's execute() that traces the whole batch as one span.

    The span's resource and raw-command tag are the formatted queued
    commands joined with newlines. Tags are applied the same way as in
    traced_execute_command: the Pin's own tags first (if any), then the
    connection tags, so pipeline spans carry the same user-configured tags
    as single-command spans.

    :param func: the wrapped (original) execute method
    :param instance: the pipeline being executed; its ``command_stack`` is read
    :param args: positional arguments of the call
    :param kwargs: keyword arguments of the call
    :returns: whatever the wrapped method returns
    """
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    # Read the queued commands once, before the wrapped method runs, so the
    # resource and the length metric describe the same snapshot.
    # FIXME[matt] done in the agent. worth it?
    stack = instance.command_stack
    cmds = [format_command_args(c) for c, _ in stack]
    resource = '\n'.join(cmds)

    with pin.tracer.trace(
        redisx.CMD,
        resource=resource,
        service=pin.service,
        span_type=redisx.TYPE,
    ) as s:
        s.set_tag(redisx.RAWCMD, resource)
        # same order as traced_execute_command: pin tags, then connection tags
        if pin.tags:
            s.set_tags(pin.tags)
        s.set_tags(_get_tags(instance))
        s.set_metric(redisx.PIPELINE_LEN, len(stack))

        # set analytics sample rate if enabled
        s.set_tag(
            ANALYTICS_SAMPLE_RATE_KEY,
            config.redis.get_analytics_sample_rate()
        )

        # execute the pipeline while the span is still open
        return func(*args, **kwargs)
def _get_tags(conn):
    """Return the connection tags for ``conn``.

    ``conn`` must expose ``connection_pool.connection_kwargs``; the callers
    in this file pass the wrapped client or pipeline instance. The kwargs
    are handed to ``_extract_conn_tags`` and its result is returned as is.
    """
    pool = conn.connection_pool
    return _extract_conn_tags(pool.connection_kwargs)