Add a processing pipeline to AsyncWorker #303
Conversation
This makes it possible to do some processing/filtering of the traces before sending them to the agent. Add a FilterRequestsOnUrl processor to remove traces of incoming requests that match a regexp.
@bmermet thanks for the PR! Will take a look soon, in the meantime can you fix the
thanks a lot!
Good job! Left some comments, feel free to ping me if you need more details!
It will help a lot other developers! Thanks!
ddtrace/tracer.py
Outdated
self.writer = AgentWriter(hostname or self.DEFAULT_HOSTNAME, port or self.DEFAULT_PORT)
processing_pipeline = None
if settings is not None and PP_KEY in settings:
    processing_pipeline = settings[PP_KEY]
you can use settings.get(PP_KEY) so you don't need the PP_KEY in settings condition.
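The suggested simplification can be sketched as follows (PP_KEY matches the constant used in this PR; the helper name read_pipeline is made up for illustration):

```python
PP_KEY = 'PROCESSING_PIPELINE'

def read_pipeline(settings):
    # dict.get() already returns None for a missing key, so no separate
    # "PP_KEY in settings" membership check is needed.
    processing_pipeline = None
    if settings is not None:
        processing_pipeline = settings.get(PP_KEY)
    return processing_pipeline
```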
ddtrace/settings.py
Outdated
@@ -0,0 +1,4 @@
PROCESSING_PIPELINE_KEY = "PROCESSING_PIPELINE"
we may call that module constants.py since it contains (at least for now) only key strings. Also, in general, prefer ' over " for consistency with the majority of modules in this project (at some point I will enforce one or the other via a tool!).
ddtrace/tracer.py
Outdated
@@ -7,6 +7,7 @@
from .sampler import AllSampler
from .writer import AgentWriter
from .span import Span
from .settings import PP_KEY
instead of doing that, I'd suggest removing PP_KEY from the previous file and using:

    from .constants import PROCESSING_PIPELINE_KEY as PP_KEY
ddtrace/writer.py
Outdated
    traces = self._apply_processing_pipeline(traces)
except Exception as err:
    log.error("error while processing traces:{0}".format(err))
if traces:
I think you can simply put everything inside the same if traces, while keeping the try-except separated so a failure in the pipeline will not prevent traces from being sent. Generally speaking, this is correct now and it's the best we can do without introducing a possibly huge change.
I kept two if traces: checks because self._apply_processing_pipeline may return no traces if all of them are filtered out by the pipeline.
oh right, we don't have any further check; let's keep it that way then!
ddtrace/writer.py
Outdated
@@ -155,6 +168,19 @@ def _log_error_status(self, result, result_name):
            getattr(result, "status", None), getattr(result, "reason", None),
            getattr(result, "msg", None))

def _apply_processing_pipeline(self, traces):
I'd say to add a docstring here to briefly explain how the pipeline works. What I'm more interested in, though, is specifying that traces is owned by the AsyncWorker thread, so it can be freely modified without using a mutex.
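One possible shape for the requested docstring, in a minimal stand-in for the AsyncWorker (the loop mirrors the code under review; the docstring wording is a suggestion, not the merged text):

```python
class AsyncWorkerSketch(object):
    def __init__(self, processing_pipeline):
        self._processing_pipeline = processing_pipeline

    def _apply_processing_pipeline(self, traces):
        """Return `traces` after running them through the pipeline.

        Each processor's process_trace() returns the (possibly
        modified) trace, or None to drop it; a dropped trace skips
        the remaining processors. `traces` is owned by the
        AsyncWorker thread, so it can be modified freely without
        holding a mutex.
        """
        processed = []
        for trace in traces:
            for processor in self._processing_pipeline:
                trace = processor.process_trace(trace)
                if trace is None:
                    break
            if trace is not None:
                processed.append(trace)
        return processed
```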
ddtrace/processors.py
Outdated
from .ext import http


class FilterRequestsOnUrl():
For sanity in the Python 2 MRO, always inherit from object:

    class FilterRequestsOnUrl(object):
        # ...
docs/index.rst
Outdated
**Write a custom processor**

Creating your own processors is as simple as implementing a class with a process_trace method and adding it to the processing pipeline parameter of Tracer.configure. process_trace should either return a trace to be fed to the next step of the pipeline or None if the trace should be discarded. (see processors.py for example implementations)
I'd say to add a really small example instead of asking developers to check the processors.py module. What about something like:

    class ProcessorExample(object):
        def process_trace(self, trace):
            # write here your logic to return the `trace` or None;
            # the `trace` instance is owned by the thread and you can
            # alter each single span if needed

Or something similar, because I still prefer a really concise snippet rather than long sentences. What do you think?
You're right, it's quicker to grasp.
ddtrace/writer.py
Outdated
for processor in self._processing_pipeline:
    trace = processor.process_trace(trace)
    if trace is None:
        break
Can we add a test for the case when the pipeline is short-circuited? You can have another test with two processors where the first one returns None, so that the second is not invoked. If it helps, you can use Mock on the second one, such as:

    processor = FilterRequestsOnUrl(r'http://example\.com/health')
    processor = mock.Mock(processor, wraps=processor)
    eq_(processor.process_trace.call_count, 0)
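A self-contained version of the suggested test might look like this (unittest.mock stands in for the mock package; the processors and the apply_pipeline helper are simplified stand-ins, not the PR's actual code):

```python
import re
from unittest import mock

class FilterRequestsOnUrl(object):
    """Simplified stand-in exposing the process_trace interface."""
    def __init__(self, pattern):
        self._regexp = re.compile(pattern)

    def process_trace(self, trace):
        return trace  # the real filter inspects the request url tag

class DropAll(object):
    def process_trace(self, trace):
        return None  # None drops the trace

def apply_pipeline(trace, processors):
    for processor in processors:
        trace = processor.process_trace(trace)
        if trace is None:
            break  # short-circuit: later processors never run
    return trace

second = FilterRequestsOnUrl(r'http://example\.com/health')
second = mock.Mock(second, wraps=second)
result = apply_pipeline(['span'], [DropAll(), second])
assert result is None
assert second.process_trace.call_count == 0  # never invoked
```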
done in tests_writer.py
Nice!
ddtrace/tracer.py
Outdated
    processing_pipeline = settings[PP_KEY]

if hostname is not None or port is not None or processing_pipeline is not None:
    self.writer = AgentWriter(
I think we have some indentation issues here. Don't know why flake8 didn't catch that!
Force-pushed from 34082d4 to 4f88de8
docs/index.rst
Outdated
configuring the tracer with a processing pipeline. For instance to filter out
all traces of incoming requests to a specific url::

    processing_pipeline = [FilterRequestsOnUrl(r'http://test\.example\.com')]
I think our example code can become:

    Tracer.configure(settings={
        'PROCESSING_PIPELINE': [
            FilterRequestsOnUrl(r'http://test\.example\.com'),
        ],
    })

just to make it simpler. Also, as we discussed, this is a WIP functionality, so very likely PROCESSING_PIPELINE will be named differently.
ddtrace/constants.py
Outdated
@@ -0,0 +1 @@
PROCESSING_PIPELINE_KEY = 'PROCESSING_PIPELINE'
@bmermet what about calling it just FILTERS? Considering that the main usage of the pipeline is to filter traces (or spans).
ddtrace/tracer.py
Outdated
if settings is not None:
    processing_pipeline = settings.get(PP_KEY)

if hostname is not None or port is not None or processing_pipeline is not None:
Not something we have to change now, but I think the configure() method must be changed into something else. I don't like the idea of initializing the AgentWriter again, especially because if you set the processing_pipeline and then change the hostname in another call, you end up with the right hostname but the wrong pipeline (and vice versa). Keeping this minor refactoring in another PR so that it doesn't impact this one.
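A hedged sketch of the problem described above: if configure() rebuilds the writer from only the arguments it receives, a later call that sets the hostname silently drops the previously configured pipeline. All names here are illustrative stand-ins, not the real ddtrace API.

```python
class Writer(object):
    def __init__(self, hostname='localhost', processing_pipeline=None):
        self.hostname = hostname
        self.processing_pipeline = processing_pipeline

class Tracer(object):
    def configure(self, hostname=None, processing_pipeline=None):
        # naive: always builds a fresh writer from this call's args only
        self.writer = Writer(hostname or 'localhost', processing_pipeline)

tracer = Tracer()
tracer.configure(processing_pipeline=[object()])
tracer.configure(hostname='agent.local')  # second call...
assert tracer.writer.processing_pipeline is None  # ...lost the pipeline
```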
LGTM! We can ship that in the next major release
For instance filtering out all traces of incoming requests to http://test.example.com can be done simply by configuring the tracer with:
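Based on the review discussion, the configuration might look like the following, shown here as a self-contained sketch (stub classes stand in for ddtrace's Tracer and FilterRequestsOnUrl, and the 'PROCESSING_PIPELINE' key may still be renamed, e.g. to FILTERS):

```python
import re

class FilterRequestsOnUrl(object):
    """Drop traces whose root span's http.url matches the regexp."""
    def __init__(self, pattern):
        self._regexp = re.compile(pattern)

    def process_trace(self, trace):
        # assumes the root span carries its url under an 'http.url' tag
        url = trace[0].get('http.url', '')
        return None if self._regexp.match(url) else trace

class Tracer(object):
    """Stub with the configure(settings=...) shape from this PR."""
    def configure(self, settings=None):
        self._pipeline = (settings or {}).get('PROCESSING_PIPELINE') or []

tracer = Tracer()
tracer.configure(settings={
    'PROCESSING_PIPELINE': [
        FilterRequestsOnUrl(r'http://test\.example\.com'),
    ],
})
```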