Split TraceConsumer into two different disruptors #1161
| /** Old signature (pre-Monitor) used in tests */ | ||
| private DDAgentWriter(final DDAgentApi api) { | ||
| this(api, new Monitor.Noop()); | ||
| batchWritingDisruptor = |
There was a problem hiding this comment.
I think it would be fine to pass the Spec object down to the disruptors.
There was a problem hiding this comment.
Currently not all of the DDAgentWriter constructors use the Spec.
|
|
||
| if (0 < flushFrequencySeconds) { | ||
| // This provides a steady stream of events to enable flushing with a low throughput. | ||
| final Runnable heartbeat = |
There was a problem hiding this comment.
I'm not crazy about adding an extra executor for this.
Requesting a flush on timeout seems cleaner and lighter-weight to me.
There was a problem hiding this comment.
Not sure what you mean by "on time out". If we don't have any events in the queue, the handler will never be called to trigger a flush.
There was a problem hiding this comment.
If you look at the approach in the experimental branch that I did, it doesn't have a scheduled timer.
Instead the sender thread does something akin to this pseudo-code...
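while ( !Thread.currentThread().isInterrupted() ) {
  try {
    // poll() returns null when the timeout elapses; it does not throw
    DDApi.Request request = queue.poll(flushFrequencySecs, TimeUnit.SECONDS);
    if ( request != null ) {
      send(request);
    } else {
      // timed out with nothing queued: request flush
      flush();
    }
  } catch ( InterruptedException e ) {
    // restore the interrupt flag so the loop condition ends the thread
    Thread.currentThread().interrupt();
  }
}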
I prefer this because it needs one fewer thread, but also because it makes it easier to have the sender back off its schedule.
There was a problem hiding this comment.
To clarify, the heartbeat only ensures a minimum level of events to enable timely reporting in case no traces are sent in a given time window. The actual sending frequency can be adjusted in scheduleNextFlush().
| public volatile boolean shouldFlush = false; | ||
| public volatile T data = null; | ||
| public volatile int representativeCount = 0; | ||
| public volatile CountDownLatch flushLatch = null; |
There was a problem hiding this comment.
I think having a latch per batch is a big improvement in the flush semantics.
There was a problem hiding this comment.
Yes, I also like this much better than the previous phaser approach.
There was a problem hiding this comment.
A CountDownLatch might be overkill. We don't really need to wait for all the flushers to arrive before unblocking the others, but I don't think it is a big deal.
There was a problem hiding this comment.
What would you suggest using instead?
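For readers following the latch discussion, here is a minimal sketch of what "a latch per flush" could look like. It is not the PR's code: a `BlockingQueue` stands in for the disruptor ring buffer, and `FlushEventSketch`, `LatchFlushSketch`, and their methods are hypothetical names. Only the `shouldFlush` and `flushLatch` fields mirror the diff above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical event type; only the two fields relevant to flushing are shown.
final class FlushEventSketch {
  volatile boolean shouldFlush = false;
  volatile CountDownLatch flushLatch = null;
}

// Assumption: a BlockingQueue stands in for the disruptor ring buffer.
final class LatchFlushSketch {
  private final BlockingQueue<FlushEventSketch> queue = new LinkedBlockingQueue<>();
  final AtomicInteger flushCount = new AtomicInteger();

  // Caller side: publish a flush event carrying its own latch, then wait on it.
  boolean flush(long timeoutMillis) {
    FlushEventSketch event = new FlushEventSketch();
    event.shouldFlush = true;
    event.flushLatch = new CountDownLatch(1);
    try {
      queue.put(event);
      return event.flushLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Consumer side: handle one event and release whoever waits on its latch.
  void consumeOne() throws InterruptedException {
    FlushEventSketch event = queue.take();
    if (event.shouldFlush) {
      flushCount.incrementAndGet(); // stand-in for actually sending the batch
      CountDownLatch latch = event.flushLatch;
      if (latch != null) {
        latch.countDown();
      }
    }
  }

  // Starts a daemon thread that handles exactly one event.
  Thread startConsumer() {
    Thread t = new Thread(() -> {
      try {
        consumeOne();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    t.setDaemon(true);
    t.start();
    return t;
  }
}
```

Because each flush request owns its latch, a caller is released as soon as its own event has been handled and never waits for other flushers, which is the property the comments above are weighing against the earlier phaser approach.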
| if (event.data != null) { | ||
| try { | ||
| final byte[] serializedTrace = api.serializeTrace(event.data); | ||
| monitor.onSerialize(writer, event.data, serializedTrace); |
There was a problem hiding this comment.
I think there's a bug here. We shouldn't be calling onSerialize before we know if the publishing was successful.
There was a problem hiding this comment.
I admit, I had a hard time understanding how to translate the monitor calls. That aspect of this change warrants a thorough review.
There was a problem hiding this comment.
The monitor callbacks follow a couple of rules...
1 - The callback happens after something is complete.
2 - The success and failure cases are split -- to force thinking carefully about failure.
So in general, I'd expect the callback to be at the end of a try block.
There was a problem hiding this comment.
I changed the order. Let me know if I'm missing anything else.
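The two rules above can be sketched as follows. This is an illustration only: `MonitorSketch` is a simplified, hypothetical interface (the real `Monitor` callbacks take different arguments), and a `String` stands in for a trace.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified monitor with split success and failure callbacks.
interface MonitorSketch {
  void onSerialize(String trace, byte[] serialized);
  void onFailedSerialize(String trace, Throwable cause);
}

// Records callback invocations so the ordering can be inspected.
final class RecordingMonitorSketch implements MonitorSketch {
  final List<String> calls = new ArrayList<>();

  public void onSerialize(String trace, byte[] serialized) {
    calls.add("ok:" + trace);
  }

  public void onFailedSerialize(String trace, Throwable cause) {
    calls.add("failed:" + trace);
  }
}

final class SerializeHandlerSketch {
  private final MonitorSketch monitor;
  final List<byte[]> published = new ArrayList<>();

  SerializeHandlerSketch(MonitorSketch monitor) {
    this.monitor = monitor;
  }

  void onEvent(String trace) {
    try {
      byte[] serialized = serialize(trace);
      published.add(serialized); // stand-in for publishing to the next stage
      // Rule 1: the success callback is the last statement of the try block,
      // so it only runs after serialization and publishing have completed.
      monitor.onSerialize(trace, serialized);
    } catch (RuntimeException e) {
      // Rule 2: failure is reported through its own callback.
      monitor.onFailedSerialize(trace, e);
    }
  }

  // Toy serializer that rejects empty input to exercise the failure path.
  private static byte[] serialize(String trace) {
    if (trace.isEmpty()) {
      throw new IllegalArgumentException("empty trace");
    }
    return trace.getBytes(StandardCharsets.UTF_8);
  }
}
```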
| private final Monitor monitor; | ||
| private final DDAgentWriter writer; | ||
| private final List<byte[]> serializedTraces = new ArrayList<>(); | ||
| private int representativeCount = 0; |
There was a problem hiding this comment.
It could track not just traces but also spans. We wanted to include that in health metrics, but that wasn't terribly easy in the prior design.
There was a problem hiding this comment.
Do you mean the number of total spans? What's the benefit there?
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Slf4j | ||
| public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> { |
There was a problem hiding this comment.
I have some concerns with this. I'd actually like to see us get away from producing many tiny byte[].
I'd prefer to see us build up one big byte[] instead to reduce the amount of allocation.
DDApi.Request from the experimental branch was built with that in mind. I don't quite see how we do that with this design.
There was a problem hiding this comment.
We talked and came up with a good solution: use a byte[] on the event as a buffer that gets reused and grows to the needed size, then copy its contents into a larger buffer when batching.
This requires moving off Jackson though, so it will be done in a separate PR.
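A rough sketch of the buffer-reuse idea described above, under stated assumptions: `ReusableBufferSketch` and its methods are hypothetical, and `write` copies from an existing array only to keep the example self-contained. In the intended design the serializer would write into the buffer directly.

```java
// Hypothetical per-event buffer that is reused across traces and only grows.
final class ReusableBufferSketch {
  private byte[] buffer = new byte[16];
  private int length = 0;

  // Overwrites the previous contents; allocates only when the data does not fit.
  void write(byte[] src) {
    if (src.length > buffer.length) {
      buffer = new byte[Math.max(src.length, buffer.length * 2)];
    }
    System.arraycopy(src, 0, buffer, 0, src.length);
    length = src.length;
  }

  int length() {
    return length;
  }

  int capacity() {
    return buffer.length;
  }

  // Copies the valid bytes into the batch buffer and returns the next free offset.
  int copyTo(byte[] batch, int offset) {
    System.arraycopy(buffer, 0, batch, offset, length);
    return offset + length;
  }
}
```

With this shape, a steady stream of similarly sized traces would allocate nothing per trace once the buffer has grown to fit the largest one; the only copy is into the batch buffer.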
| } | ||
| } | ||
| }; | ||
| heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
To have more meaningful back-pressure, we probably also need to be able to back off the rate at which we are sending. How would that work with the heartbeatExecutor?
There was a problem hiding this comment.
The heartbeat executor only adds an event to the queue if the queue is empty, and it doesn't influence the frequency of flushing. What it does influence is the maximum delay (beyond the flush frequency) before a flush occurs when the queue is empty (i.e., a flush will be at most 100 ms late relative to the 1/sec rate).
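To make the heartbeat behaviour concrete, here is a small sketch under assumptions: a `BlockingQueue` stands in for the ring buffer, the clock is passed in explicitly so the logic is deterministic, and every name in it is hypothetical rather than taken from the PR.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Assumption: a BlockingQueue stands in for the disruptor ring buffer.
final class HeartbeatSketch {
  static final Object HEARTBEAT = new Object();

  final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  private final long flushFrequencyMillis;
  private long lastFlushMillis;
  int flushCount = 0;

  HeartbeatSketch(long flushFrequencyMillis, long startMillis) {
    this.flushFrequencyMillis = flushFrequencyMillis;
    this.lastFlushMillis = startMillis;
  }

  // Periodic task: publish a marker only when nothing else is pending,
  // so the heartbeat never adds load to a busy queue.
  void heartbeat() {
    if (queue.isEmpty()) {
      queue.offer(HEARTBEAT);
    }
  }

  // Handler: any event, heartbeat or trace, is a chance to check the flush deadline.
  void onEvent(Object event, long nowMillis) {
    if (nowMillis - lastFlushMillis >= flushFrequencyMillis) {
      flushCount++; // stand-in for sending the current batch
      lastFlushMillis = nowMillis;
    }
  }
}
```

In a real setup `heartbeat()` would be scheduled with something like `ScheduledExecutorService.scheduleAtFixedRate(sketch::heartbeat, 100, 100, TimeUnit.MILLISECONDS)`. The heartbeat period then bounds how late an idle flush can be, while the flush frequency itself is decided entirely in the handler.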
|
I have a few concerns about how future changes will fit into this... Finally, it would help to have a comment / diagram that describes the overall publishing pipeline. That would make the code easier to follow in the future.
First disruptor (TraceProcessingDisruptor) does processing, which is currently limited to serialization, but in the future can do other processing such as TraceInterceptor invocation. Second disruptor (BatchWritingDisruptor) takes serialized traces and batches them into groups and flushes them periodically based on size and time.
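As an illustration of "flushes them periodically based on size and time", the second stage's decision could look roughly like the sketch below. The class, its thresholds, and the explicit clock argument are assumptions made for this example; it does not use the disruptor API and is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher: collects serialized traces and flushes on size or age.
final class BatchSketch {
  private final int maxBatchBytes;
  private final long flushFrequencyMillis;
  private final List<byte[]> serializedTraces = new ArrayList<>();
  private int sizeInBytes = 0;
  private long lastFlushMillis;

  // Records how many traces each flush contained, for inspection.
  final List<Integer> flushedBatchSizes = new ArrayList<>();

  BatchSketch(int maxBatchBytes, long flushFrequencyMillis, long startMillis) {
    this.maxBatchBytes = maxBatchBytes;
    this.flushFrequencyMillis = flushFrequencyMillis;
    this.lastFlushMillis = startMillis;
  }

  void onSerializedTrace(byte[] serialized, long nowMillis) {
    serializedTraces.add(serialized);
    sizeInBytes += serialized.length;
    boolean tooBig = sizeInBytes >= maxBatchBytes;
    boolean tooOld = nowMillis - lastFlushMillis >= flushFrequencyMillis;
    if (tooBig || tooOld) {
      flush(nowMillis);
    }
  }

  void flush(long nowMillis) {
    if (!serializedTraces.isEmpty()) {
      flushedBatchSizes.add(serializedTraces.size()); // stand-in for sending to the agent
    }
    serializedTraces.clear();
    sizeInBytes = 0;
    lastFlushMillis = nowMillis;
  }
}
```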
… metrics when traces are sampled.
| // attempt to have agent scale the metrics properly | ||
| ((DDSpan) event.data.get(0).getLocalRootSpan()) | ||
| .context() | ||
| .setMetric("_sample_rate", 1d / event.representativeCount); |
There was a problem hiding this comment.
@gbbr does this look like a legit way of getting our _sample_rate scaling done by the agent to be accurate?
There was a problem hiding this comment.
The agent doesn't do any scaling, it's the backend, so I wouldn't be able to tell. _sample_rate is expected to hold the rate that a local client sampler (the one that doesn't send stuff to the agent at all) is using IIRC. @furmmon is our expert for answering any questions around sampling, maybe you can confirm.
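For context, the arithmetic behind the snippet under discussion can be sketched as below. It assumes the counter is incremented once for every trace that is not published; the `traceCount.getAndSet(0) + 1` and `1d / representativeCount` expressions come from the diff, while the class and method names are hypothetical. Whether the backend interprets `_sample_rate` this way is exactly the open question in this thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing how a published trace can stand in for unpublished ones.
final class RepresentativeCountSketch {
  private final AtomicInteger traceCount = new AtomicInteger(0);

  // Assumption: called once for each trace that is not published.
  void onUnpublishedTrace() {
    traceCount.incrementAndGet();
  }

  // The published trace represents itself plus everything counted since the last publish.
  int onPublishedTrace() {
    return traceCount.getAndSet(0) + 1;
  }

  // Value written to the "_sample_rate" metric in the diff above.
  static double sampleRate(int representativeCount) {
    return 1d / representativeCount;
  }
}
```

For example, three unpublished traces followed by one published trace give a representative count of 4 and a rate of 0.25.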
| if (traceProcessingDisruptor.running) { | ||
| final int representativeCount = traceCount.getAndSet(0) + 1; | ||
| final int representativeCount; | ||
| if (trace.isEmpty() || !(trace.get(0).isRootSpan())) { |
There was a problem hiding this comment.
This might not work if the last span reported isn't the root span. This might be an issue for async traces and for partial flush traces. Any better ideas?
Also rename the builder class on DDTracer to the default name generated by Lombok.
| this.writer = writer; | ||
| } | ||
|
|
||
| // TODO: reduce byte[] garbage by keeping the byte[] on the event and copy before returning. |
There was a problem hiding this comment.
So reducing byte[] remains to be done, I think that's fine for now. We can revisit that after ripping out Jackson.
randomanderson
left a comment
There was a problem hiding this comment.
I ran the performance tests on my local laptop and got nearly identical results (~5,300 traces/s) on master and on this PR.