
[sender] enable buffer pool + concurrent sending threads #95

Merged
truthbk merged 24 commits into jaime/perf from jaime/buffer_pool on Jan 29, 2020

Conversation

@truthbk (Member) commented on Dec 26, 2019:

This PR adds a buffer pool to the sender. The original code used a single pre-allocated buffer that we reused to submit batched dogstatsd metrics. While efficient from a memory perspective, this approach meant we had to stop processing dogstatsd metrics while waiting on IO for every packet submission.

This PR aims to decouple actual packet transmission from dogstatsd message processing and packet assembly. To achieve this we introduce a pool of pre-allocated buffers and an additional queue. Messages are processed and batched into these buffers, blocking if no buffers are available (further improvements are possible here; we could entertain different strategies to deal with buffer unavailability). Once a packet has been filled, or no more messages are waiting, the buffer is placed on an outgoing queue. A worker thread (or threads) then dequeues these buffers and writes them to the socket. The benefit of this approach, beyond the decoupling itself, is that it should allow for better use of the IO-wait time: the OS kernel will likely do the right thing when scheduling the pending work, helping us increase throughput. A rough sketch of the pattern follows the summary below.

TL;DR:

  • decouple processing of incoming dogstatsd messages from packet submission.
  • worker thread(s) for packet submission should allow the OS to better schedule work in the active client threads.
  • finer-grained control via new knobs (queue size, number of sender threads, etc.).
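
To make the flow concrete, here is a rough, illustrative sketch of the pool + outbound-queue pattern (class, field, and method names are placeholders for illustration only, not the actual client internals):

import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only; names are placeholders, not the client's real classes.
final class PooledSenderSketch {
    private final BlockingQueue<ByteBuffer> pool;      // free, pre-allocated buffers
    private final BlockingQueue<ByteBuffer> outbound;  // filled buffers awaiting IO

    PooledSenderSketch(final int poolSize, final int bufferSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        outbound = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Processor side: batch a message into a free buffer, blocking if none is available.
    void enqueue(final byte[] message) throws InterruptedException {
        final ByteBuffer buffer = pool.take();   // blocks while the pool is exhausted
        buffer.clear();
        buffer.put(message);                     // real code keeps batching until the buffer fills up
        buffer.flip();
        outbound.put(buffer);                    // hand the packet off to the sender thread(s)
    }

    // Sender side: each worker thread dequeues filled buffers and writes them to the socket.
    void senderLoop(final DatagramChannel channel) throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            final ByteBuffer buffer = outbound.take();   // blocks until a packet is ready
            channel.write(buffer);                       // IO happens off the processing path
            pool.put(buffer);                            // return the buffer to the pool for reuse
        }
    }
}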

Note: the max perf test is useful for benchmarking but might be worth removing or modifying before merging, or disabling in CI. It also currently conflicts with a similar test in jaime/perf; we can address that later.

Note: this should be merged onto jaime/perf (so before #94)

@ogaca-dd (Contributor) left a comment:

Nice PR!
I added some minor notes; feel free to ignore them.

import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;

@ogaca-dd (Contributor) commented:

Note: the following imports are not used:

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.Callable;

@truthbk (Member, Author) replied:

👌

@@ -0,0 +1,38 @@
package com.timgroup.statsd;

import java.io.IOException;
@ogaca-dd (Contributor) commented:

Note: Not used

pool.put(buffer);
}

int available() {
@ogaca-dd (Contributor) commented:

Note: Unused

@truthbk (Member, Author) replied:

Good catch. I still like having it as a class method, though, so I think I'm going to keep it unless you strongly disagree.

boolean send(final String message) {
if (!shutdown) {
if (qSize.get() < qCapacity) {
messages.offer(message);
@ogaca-dd (Contributor) commented:

The return value of boolean java.util.Queue.offer(String e) should probably be checked.

@truthbk (Member, Author) replied:

This has now been moved to StatsDNonBlockingProcessor; there we use a ConcurrentLinkedQueue, whose offer() never returns false because the queue is unbounded.
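
For reference, that is standard ConcurrentLinkedQueue behavior; a tiny illustration (not the client's actual code):

import java.util.concurrent.ConcurrentLinkedQueue;

class OfferAlwaysTrue {
    public static void main(final String[] args) {
        // ConcurrentLinkedQueue is unbounded, so offer() never rejects an element;
        // capacity therefore has to be enforced separately (hence the qSize/qCapacity check).
        final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
        System.out.println(messages.offer("page.views:1|c"));   // always prints true
    }
}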

}
} catch (final InterruptedException e) {
if (shutdown) {
return;
@ogaca-dd (Contributor) commented:

Do we also want to call handler.handle(e); as in line 76?

@truthbk (Member, Author) replied:

It's a good question; I'm not sure. We're going to shut down gracefully anyway, and handle doesn't do much. 🤔

@truthbk (Member, Author) added:

Let me think about it; I'll address it in #94 if I decide we should handle these errors differently.

@truthbk (Member, Author) commented on Jan 29, 2020:

CI is green; merging into the larger #94. Final changes and fixes will be made there.

Thank you for the review.

@truthbk truthbk merged commit 5b0a99c into jaime/perf Jan 29, 2020
@truthbk truthbk deleted the jaime/buffer_pool branch January 29, 2020 21:04
@truthbk truthbk added this to the 2.10.0 milestone May 5, 2020