Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved in max-pending-bytes mechanism for broker #7406

Merged
merged 3 commits into from
May 2, 2021

Conversation

merlimat
Copy link
Contributor

@merlimat merlimat commented Jul 1, 2020

Motivation

There are few issues with the current implementation of the broker-wide throttling based on max-outstanding bytes in broker that was added in #6178.

  1. The current implementation is over-counting the outstanding bytes when there are >1 producers on a given connection. It's cycling through the producers and adding the per-connection counter one time per each producer.
  2. There is 1 atomic increment and 2 volatile reads per each request
  3. There is a delay for detecting the memory over-commit, due to the background task running periodically
  4. If there is a substantial amount of producers, the tasks that runs every 100ms will consume significant CPU by looping over all the producers (many of which could be idle) all the time

The improvement proposed here is using thread-local counters to avoid contention and CPU overhead.

  1. Use 1 counter per IO thread. Once the counter for that thread exceed 1/N of the quota, throttle all the connections that are pinned to the thread.
  2. This will also ensure that 1 single connection trying to publish too fast can throttle all other connections in the broker. Rather, it will only affect 1/N of connections.
  3. No need for atomic/volatile, just thread-local and local variables, since we're always operating on connections that belongs to the same IO thread.
  4. Precise enforcement with no CPU overhead for idle producers

Regarding the lower limit per each IO thread, in which 1 single connection cannot use the entire memory space, I don't think it's an issue at all.

Once there is a "window" of several MB, there is no throughput gain in having a bigger window. Also, In most scenarios, the only time we'd be filling this window is when the downstream BK is either malfunctioning or under-provisioned for the load. Either case, it's not a performance concern.

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jul 1, 2020
@merlimat merlimat added this to the 2.7.0 milestone Jul 1, 2020
@merlimat merlimat self-assigned this Jul 1, 2020
private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() {
@Override
protected Set<ServerCnx> initialValue() throws Exception {
return Collections.newSetFromMap(new IdentityHashMap<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of this over "new HashSet<>()"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since ServerCnx doesn't have a hashCode() method I thought to be on the safe side to just make sure to use == operator instead of hashing. Honestly, I'm not 100% sure that would make difference in practice.

@@ -2307,4 +2253,16 @@ private boolean isSystemTopic(String topic) {
public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}

public void pausedConnections(int numberOfConnections) {
pausedConnections.addAndGet(numberOfConnections);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this exported via prometheus? It would be better to have separate event counters for pause and resume so that if pausing happens between prometheus pulls we can see it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I just exposed it here to for validation in the tests, though it makes sense to expose the 2 counters.

@codelipenghui
Copy link
Contributor

Regarding the lower limit per each IO thread, in which 1 single connection cannot use the entire memory space, I don't think it's an issue at all.

There are some users who use few producers to publish messages, in order to have a better batch effect to improve throughput. We cannot assume 1 single connection cannot use the entire memory space. Our purpose is to control memory and make effective use of memory. I think split memory according to io thread may decrease the memory utilization.

How about using a LongAdder to record the current pending bytes and remove the check tasks.

@ivankelly
Copy link
Contributor

ivankelly commented Jul 1, 2020

@codelipenghui A LongAdder would still cause contention on every entry, as to sum it has to view all cells. To allow a single thread to use the full buffer when there's little traffic on the other threads, we should use a token bucket, as networking folks have been doing for decades.

@ivankelly ivankelly closed this Jul 1, 2020
@ivankelly ivankelly reopened this Jul 1, 2020
@merlimat
Copy link
Contributor Author

merlimat commented Jul 1, 2020

There are some users who use few producers to publish messages, in order to have a better batch effect to improve throughput.

I have, of course, tested it and as expected once you have few MBs of runway, there's no throughput advantage in allowing it to grow higher.

We cannot assume 1 single connection cannot use the entire memory space.

We should assume that, because allowing 1 single connection to take over the entire quota of the broker and stalling all other connections is bad.

Our purpose is to control memory and make effective use of memory.

Effectively and efficiently, in a scalable manner.

I think split memory according to io thread may decrease the memory utilization.

If there's no negative impact on throughput, that is a good thing. That memory can be used on dispatch side.

How about using a LongAdder to record the current pending bytes and remove the check tasks.

The LongAdder would not allow you to remove the check tasks. The LongAdder only difference, compared to an AtomicLong, is that it's optimized for writes (thread-local counters), but the read path is extra slow and meant to be used infrequently.

That doesn't solve the multiple issue I've outlined above, in particular the need for the check task.

A single broker, can be very easily be serving 100s of thousands of producers at a given point in time. Some of them active, some of them idle.

Frequently looping over all the producers (just to get to the connections) is just burning a lot of CPU.

@codelipenghui
Copy link
Contributor

Frequently looping over all the producers (just to get to the connections) is just burning a lot of CPU.

I agree with this point. I just want to say the pending buffer is sensitive to write throughput, especially on machines with less memory. If we split it into multiple parts, when there are fewer connections, the channel will enable auto-read, disable auto-read frequently, And it is possible that some parts are available pending more messages.

The LongAdder would not allow you to remove the check tasks. The LongAdder only difference, compared to an AtomicLong, is that it's optimized for writes (thread-local counters), but the read path is extra slow and meant to be used infrequently.

The io threads count not much, will we add up to a few numbers each time will become a bottleneck?

@merlimat
Copy link
Contributor Author

merlimat commented Jul 1, 2020

I just want to say the pending buffer is sensitive to write throughput, especially on machines with less memory. If we split it into multiple parts, when there are fewer connections, the channel will enable auto-read, disable auto-read frequently,

How much is less memory? Once you have 10s of MB per thread (or per connection, if you have a single connection), there will be no more improvement in letting it use more memory.

It's the same reason for which the OS network stack doesn't let you grow the TCP window indefinitely. There are OS limits that are in place, because: (1) too much doesn't help and (2) it starve other users.

The current default settings we have is -XX:MaxDirectMemorySize=4g.
That means that with 16 cores, you'd get, by default, 32 IO threads.
With this, each thread gets 128MB of buffer size, which is far greater than any TCP window size you'd get from Linux.

Also, 4G for a VM with 16 cores is very little memory ratio. Such VM would typically be having 64GB or higher.

Reversely, in container environment, mem is usually capped lower, but so is the CPU limit. If you limit the CPUs on the broker container, the default IO threads count will adjust accordingly, balancing the situation.

If we split it into multiple parts, when there are fewer connections, the channel will enable auto-read, disable auto-read frequently,

We're already doing that with the per-connection pendingSendRequest (default 1K) and there's no perf impact.

And it is possible that some parts are available pending more messages.

The io threads count not much, will we add up to a few numbers each time will become a bottleneck?

I'm not sure what you mean here.

@codelipenghui
Copy link
Contributor

The io threads count not much, will we add up to a few numbers each time will become a bottleneck?
I'm not sure what you mean here.

Sorry I didn't make it clear. I mean if we have 32 io thread, for each message we sum 32 numbers, is this will be a bottleneck of the broker? I just want to know the disadvantages of using Longadder.

@merlimat
Copy link
Contributor Author

merlimat commented Jul 6, 2020

Sorry I didn't make it clear. I mean if we have 32 io thread, for each message we sum 32 numbers, is this will be a bottleneck of the broker? I just want to know the disadvantages of using Longadder.

@codelipenghui With longadder you'd still need to run a background thread to do the enforcement, because the increment part is low-contention, but reading the "actual value" is on the slow path.

@jiazhai
Copy link
Member

jiazhai commented Jul 14, 2020

@codelipenghui to help review this pr

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a frequently occurring issue for us and I don't see any review/comment on #7499 .
@merlimat if you have already tested this change then let's merge this approach.
👍

@rdhabalia
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor

@merlimat Could you please rebase to the master branch? So that we can onboard this PR in 2.7.0

@codelipenghui codelipenghui modified the milestones: 2.7.0, 2.8.0 Nov 17, 2020
Copy link
Contributor

@315157973 315157973 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can solve the current OOM problem

@315157973
Copy link
Contributor

Assuming 32 IO threads, 8G direct memory. The buffer of each IO thread is 256MB. Bookie’s write latency is about 10ms. Under stable conditions, 100* 256MB = 25G can be written per second, which is enough to fill bookie.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@codelipenghui
Copy link
Contributor

@merlimat Could you please fix the conflicts so that we can advance this PR.

@hangc0276
Copy link
Contributor

Great job, it can solve the current broker OOM, would you please rebase master branch and resolve the conflicts? @merlimat

Copy link
Contributor

@ronfarkash ronfarkash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you finish this PR, this fixes a major issue in the project that should not be neglected. @merlimat

@eolivelli
Copy link
Contributor

@merlimat do you want to rebase this onto current master ?
it looks like it is a very good improvement

@merlimat
Copy link
Contributor Author

Yes, I’ll get this ready in the next few days. There are also some improvements I want to do to this PR.

@galrose
Copy link
Contributor

galrose commented Apr 29, 2021

@merlimat thats great thank you

@merlimat
Copy link
Contributor Author

In a subsequent PR, I'll be adding a bucket-token based mechanism to handle different usages across multiple threads.

private final long maxMessagePublishBufferBytes;
private final long resumeProducerReadMessagePublishBufferBytes;
private volatile boolean reachMessagePublishBufferThreshold;
private final AtomicInteger pausedConnections = new AtomicInteger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reading frequency here should be very low, only used in unit tests or metrics, why not use longAdder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, changed that.

@merlimat merlimat merged commit 2a522c8 into apache:master May 2, 2021
@merlimat merlimat deleted the max-pending-bytes branch May 2, 2021 17:44
BewareMyPower added a commit to BewareMyPower/kop that referenced this pull request May 11, 2021
…native#488)

This PR upgrades pulsar dependency to 2.8.0-rc-202105092228, which has
two major API changes.

apache/pulsar#10391 changed `LoadManager` API so
that `MetadataCache` is used instead of `ZookeeperCache` in this PR.

apache/pulsar#7406 changed the throttling
strategy. However, currently KoP is different from Pulsar that the
produce and its callback may be in different threads. KoP calls
`PersistentTopic#publishMessages` in a callback of
`KafkaTopicManager#getTopic` if the returned future is not completed
immediately. Otherwise, it's called just in the I/O thread. Therefore,
here we still use a **channel based** publish bytes stats for
throttling, while apache/pulsar#7406 uses a
**thread based** publish bytes stats.

The other refactors are:
1. Change the throttling related fields from `InternalServerCnx` to
   `KafkaRequestHandler`.
2. Use `BrokerService#getPausedConnections` to check if the channel's
   auto read is disabled and modify the tests as well.
jiazhai pushed a commit to streamnative/kop that referenced this pull request May 13, 2021
This PR upgrades pulsar dependency to 2.8.0-rc-202105092228, which has two major API changes.

apache/pulsar#10391 changed `LoadManager` API so that `MetadataCache` is used instead of `ZookeeperCache` in this PR.

apache/pulsar#7406 changed the throttling strategy. However, currently KoP is different from Pulsar that the produce and its callback may be in different threads. KoP calls `PersistentTopic#publishMessages` in a callback of `KafkaTopicManager#getTopic` if the returned future is not completed immediately. Otherwise, it's called just in the I/O thread. Therefore, here we still use a **channel based** publish bytes stats for throttling, while apache/pulsar#7406 uses a **thread based** publish bytes stats.

The other refactors are:
1. Change the throttling related fields from `InternalServerCnx` to `KafkaRequestHandler`.
2. Use `BrokerService#getPausedConnections` to check if the channel's auto read is disabled and modify the tests as well.



* Fix LoadManager interface
* Refactor publish throttling
* Remove ZookeeperCache usage
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants