Exploring alternate approach to queueing & flushing #1127
| @@ -1,34 +1,21 @@ | |||
| package datadog.trace.common.writer; | |||
|
|
|||
| import static datadog.trace.api.Config.DEFAULT_AGENT_HOST; | |||
Please ignore the imports, I'll clean this up in the final pull request.
|
|
||
| private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR = | ||
| new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>() { | ||
| private static final EventTranslatorOneArg<TraceEvent, List<DDSpan>> TRANSLATOR = |
This should probably be factored out into an NFC PR.
Our Event class is only used for a single List and only makes sense for List, so the parameterization is unnecessary.
|
|
||
| @Override | ||
| public void incrementTraceCount() { | ||
| traceCount.incrementAndGet(); |
I haven't tracked down exactly how incrementTraceCount is used. Is it just for testing?
My hope is to move the representative count and other metadata into the DDApi.Request.
| disruptor.start(); | ||
| senderThread.start(); | ||
| running = true; | ||
| scheduleFlush(); |
The senderThread now requests the flushes, so there's no need to schedule once the senderThread is started.
| private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>(); | ||
| private final Disruptor<TraceEvent> disruptor; | ||
|
|
||
| private final BlockingQueue<DDApi.Request> requestQueue = new ArrayBlockingQueue<>(16); |
ArrayBlockingQueue effectively replaces the internal work queue of the ScheduledExecutorService.
This was done because the ScheduledExecutorService work queue is unbounded.
If we want to do a less invasive change, we can introduce a counting semaphore around the ScheduledExecutorService.
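A minimal sketch of that less invasive option, assuming a hypothetical `BoundedScheduler` wrapper (not part of this branch): a counting semaphore caps how many tasks may be queued or running on the executor, and a submission is refused once the cap is reached.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;

/** Hypothetical wrapper that bounds the work outstanding on a ScheduledExecutorService. */
final class BoundedScheduler {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final Semaphore permits;

  BoundedScheduler(final int maxPending) {
    this.permits = new Semaphore(maxPending);
  }

  /** Returns false, without queueing, when maxPending tasks are already outstanding. */
  boolean trySubmit(final Runnable task) {
    if (!permits.tryAcquire()) {
      return false; // the caller decides what to do, e.g. drop the payload and count it
    }
    try {
      executor.execute(
          () -> {
            try {
              task.run();
            } finally {
              permits.release(); // free the slot once the task has finished
            }
          });
      return true;
    } catch (final RuntimeException e) {
      permits.release(); // e.g. RejectedExecutionException after shutdown
      throw e;
    }
  }

  void shutdown() {
    executor.shutdown();
  }
}
```

The permit is held from submission until the task completes, so the bound covers both queued and in-flight work.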
| import datadog.opentracing.DDSpan; | ||
| import datadog.opentracing.DDTraceOTInfo; | ||
| import datadog.trace.common.writer.unixdomainsockets.UnixDomainSocketFactory; | ||
| import lombok.extern.slf4j.Slf4j; |
Again, please ignore the imports for now.
| @Override | ||
| public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) { | ||
| statsd.incrementCounter("queue.accepted"); | ||
| statsd.count("queue.accepted", 1); |
The intention here is to always set the negative indicators to 0, so they stay visible in the UI.
This should definitely be separated into another PR.
| apiPhaser.register(); // Register on behalf of the scheduled executor thread. | ||
|
|
||
| sender = new Sender(flushFrequencySeconds, 5); | ||
| sender.register(); |
The sender registers with the apiPhaser here.
However, as mentioned in the PR description, I think we need to change the implementation of flush and move away from a single shared Phaser.
Potentially superseded by #1161.
This is a work-in-progress branch exploring how we might change the internals of the DDAgentWriter.
I don't intend to merge this pull request as is. Rather I'd like to get opinions on parts of the approach contained within -- and then I'll break this into smaller pull requests.
This pull request contains a few experimental improvements...
The DDApi.Request.Builder replaces the List<byte[]> used to build up a payload for the trace_agent -- although it still contains a List<byte[]> internally.
The intention here is to...
DDAgentWriter's internal threading and queues...
This change keeps the Disruptor, but the serializer/Disruptor thread now builds up a payload using a DDApi.Request.Builder (mentioned above).
The ScheduledExecutor is replaced with a sender thread and an ArrayBlockingQueue<DDApi.Request>. The primary motivation for replacing the ScheduledExecutor is that internally it uses an unbounded queue.
The Disruptor still enqueues a request (now a DDApi.Request) when the request becomes too large or a flush is requested.
However, flushes no longer happen on a strict timer.
Instead, the sender thread does a blocking dequeue with a specified timeout.
Whenever the sender thread times out, it requests a flush from the Disruptor and then attempts to dequeue again.
The overall effect is that the sender effectively operates on the same schedule as before -- except...
The sender thread now performs retries to the agent in the event of a failure.
In the event of a failure, the sender will now back off exponentially before retrying.
But the sender thread also slows down the rate at which it requests a flush from the Disruptor.
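The sender behaviour described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this branch: `SenderSketch`, `Api`, `FlushRequester`, the `byte[]` payload type, and all constructor parameters are hypothetical stand-ins for the Sender, DDApi, the Disruptor flush request, and DDApi.Request.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class SenderSketch {
  interface Api { boolean send(byte[] payload); }       // hypothetical stand-in for DDApi
  interface FlushRequester { void requestFlush(); }     // hypothetical stand-in for the Disruptor

  private final BlockingQueue<byte[]> requestQueue = new ArrayBlockingQueue<>(16); // bounded
  private final Api api;
  private final FlushRequester flusher;
  private final long flushIntervalMillis;
  private final long maxIntervalMillis;
  private final long retryBaseMillis;
  private final int maxRetries;
  private int consecutiveFailures = 0; // touched only by the sender thread

  SenderSketch(Api api, FlushRequester flusher, long flushIntervalMillis,
      long maxIntervalMillis, long retryBaseMillis, int maxRetries) {
    this.api = api;
    this.flusher = flusher;
    this.flushIntervalMillis = flushIntervalMillis;
    this.maxIntervalMillis = maxIntervalMillis;
    this.retryBaseMillis = retryBaseMillis;
    this.maxRetries = maxRetries;
  }

  /** Called by the serializer thread; returns false when the bounded queue is full. */
  boolean enqueue(final byte[] payload) {
    return requestQueue.offer(payload);
  }

  /** Doubles baseMillis once per failure, capped at maxMillis. */
  static long backoff(final long baseMillis, final int failures, final long maxMillis) {
    long delay = baseMillis;
    for (int i = 0; i < failures && delay < maxMillis; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxMillis);
  }

  /** One iteration of the sender loop; returns false if the thread was interrupted. */
  boolean runOnce() {
    try {
      // After failures, wait longer before asking for the next flush.
      final long timeout = backoff(flushIntervalMillis, consecutiveFailures, maxIntervalMillis);
      final byte[] payload = requestQueue.poll(timeout, TimeUnit.MILLISECONDS);
      if (payload == null) {
        flusher.requestFlush(); // timed out: ask the serializer to hand over what it has
        return true;
      }
      for (int attempt = 0; attempt <= maxRetries; attempt++) {
        if (api.send(payload)) {
          consecutiveFailures = 0;
          return true;
        }
        consecutiveFailures++;
        if (attempt < maxRetries) {
          Thread.sleep(backoff(retryBaseMillis, attempt, maxIntervalMillis));
        }
      }
      return true; // retries exhausted; this sketch simply drops the payload
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  void runLoop() {
    while (runOnce()) {
      // keep dequeuing until interrupted
    }
  }
}
```

Because the flush request is driven by the dequeue timeout, a failing agent lengthens both the retry delay and the interval between flush requests, and the bounded queue pushes back on the serializer rather than growing without limit.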
One part not included here, but that I feel also needs to change, is the semantics and implementation of DDAgentWriter.flush.
flush currently cancels any flush that is already in-progress and then waits for the next flush to finish. While I believe this method is only used in tests, the current implementation is inherently problematic.
Multiple threads calling flush at the same time inhibit each other's progress by extending the time until the next flush is complete. A better semantic would be for each thread to only wait until the last span created in its thread is sent to the agent. This could be done by associating a Future with the DDApi.Request being built and then waiting for the Future to resolve.
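A rough sketch of the Future-based semantic, under simplifying assumptions: `FlushSketch` and its nested `Request` are hypothetical, traces are plain `byte[]`, and writes go straight into the request under a lock rather than through the Disruptor, so the real association between a span and its request would need more care.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FlushSketch {
  /** Hypothetical stand-in for DDApi.Request, carrying a Future resolved once it is sent. */
  static final class Request {
    final List<byte[]> traces = new ArrayList<>();
    final CompletableFuture<Void> sent = new CompletableFuture<>();

    /** Called by the sender after the agent has accepted this request. */
    void markSent() {
      sent.complete(null);
    }
  }

  private Request current = new Request();

  // Each thread remembers the Future of the request holding its most recent trace.
  private final ThreadLocal<CompletableFuture<Void>> lastWrite =
      ThreadLocal.withInitial(() -> CompletableFuture.<Void>completedFuture(null));

  synchronized void write(final byte[] trace) {
    current.traces.add(trace);
    lastWrite.set(current.sent);
  }

  /** Hands the request being built to the sender and starts a new one. */
  synchronized Request seal() {
    final Request sealed = current;
    current = new Request();
    return sealed;
  }

  /** Waits only for the request containing the calling thread's last trace. */
  boolean flush(final long timeout, final TimeUnit unit) {
    try {
      lastWrite.get().get(timeout, unit);
      return true;
    } catch (final TimeoutException | ExecutionException e) {
      return false;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With this shape, concurrent callers of flush do not extend each other's wait: each one blocks only on the Future of its own last request.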