
m broker <-> n threads #679 (Closed)

Conversation

@janmejay (Contributor) commented Jun 1, 2016

  • create a pre-configured number of broker threads (these threads do everything broker threads used to do in the 1:1 broker-to-thread model). The way they pace computation is different, though, because they are now multiplexing many brokers.
  • new entity called rd_kafka_broker_thread_t (rkbt), which gets brokers assigned in a way that tries to balance the number of brokers assigned per thread.
  • termination and freeing of rkb is performed on the rkbt.
  • rkbt has a separate rkb array owned by the thread, which is re-populated every time a broker is added or removed. Addition and removal take the rkbt broker-assignment lock, but the array itself is exclusively owned by the rkbt: the rkb pointers are copied into this array and then used lock-lessly.
  • introduces a new config parameter, 'broker.thread.count' (defaults to 1), which sets the number of threads the user intends to run for all broker-related work, allowing the user to configure n (thread count) independently of m (broker count).

Note: I have tried to fix Windows support in the transport (the WSAPoll call), but I'm not sure that is the right thing to do. That area needs a very close review.

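A rough sketch of the rkbt-owned broker array described above (hypothetical, heavily simplified types; the real rd_kafka_broker_thread_t and rd_kafka_broker_t carry far more state):

```c
/* Sketch only, not the actual librdkafka code: additions/removals take
 * the broker-assignment lock and repopulate the array, while the owning
 * broker thread iterates the array without locking. */
#include <assert.h>
#include <pthread.h>

#define MAX_BROKERS 64

typedef struct rkb_s { int id; } rkb_t;

typedef struct rkbt_s {
        pthread_mutex_t assign_lock; /* guards assignment changes */
        int rkb_cnt;
        rkb_t *rkbs[MAX_BROKERS];    /* exclusively owned by this thread */
} rkbt_t;

/* Called from any thread: add a broker under the assignment lock and
 * repopulate the thread-owned snapshot array. */
static void rkbt_broker_add(rkbt_t *rkbt, rkb_t *rkb) {
        pthread_mutex_lock(&rkbt->assign_lock);
        rkbt->rkbs[rkbt->rkb_cnt++] = rkb;
        pthread_mutex_unlock(&rkbt->assign_lock);
}

/* Called only from the owning broker thread: iterate the snapshot
 * lock-lessly to serve all assigned brokers. */
static int rkbt_serve_all(rkbt_t *rkbt) {
        int served = 0;
        for (int i = 0; i < rkbt->rkb_cnt; i++)
                served += (rkbt->rkbs[i] != NULL);
        return served;
}
```

The split matters because the serve loop runs on every iteration of the broker thread, while assignment changes are rare; paying the lock cost only on add/remove keeps the hot path cheap.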
@janmejay (Contributor, Author) commented Jun 1, 2016

@edenhill here is the status of tests with this change:

  • all tests pass (bare)
  • all valgrind tests except 0014 pass (0014 is failing on master too, haven't checked why yet; it fails in this PR as well)
  • all valgrind tests pass with this patch over HEAD as of Apr 30 (https://github.com/janmejay/librdkafka/commits/multiple_brokers_per_thread)
  • all helgrind tests pass with this patch over HEAD as of Apr 30 (same branch)
  • some helgrind tests fail as of now on this rebased patch (the same set of tests fails on master too, without this patch)

I haven't tested it on Windows, and because I have changed the WSAPoll call, that area requires a deeper review.

Also found a problem in trivup (kafka_path is hardcoded for your dev env, I think). I had to change it to match my dev env. Other than that, it worked well.

@janmejay (Contributor, Author) commented Jun 1, 2016

TEST 0038 takes the same time on master and on this patch; both take 9 seconds. This is with the default of broker.thread.count = 1 when using this patch.

@janmejay (Contributor, Author) commented Jun 1, 2016

BTW, this is with trivup pointed to 0.9.0.1. I'll do another run with 0.10 and post the results here.

@janmejay (Contributor, Author) commented Jun 1, 2016

0.10.0.0 comes out clean too (both bare and valgrind)

@janmejay (Contributor, Author) commented Jun 2, 2016

Everything comes out clean on 0.8.2.1 as well (haven't tested other 0.8.x.y versions).

BTW, 0014 with valgrind fails intermittently. Haven't had a chance to dig in more yet.

I'll roll this out in a test cluster and plan to perform large-cluster benchmarks with it after a validation run.

Will keep you posted.

@janmejay (Contributor, Author) commented Jun 10, 2016

This has been deployed (0.9.1 + patch) to a production env and has been working well for a few days now. In this scenario it is being used as a publisher that auto-partitions across 49 brokers (each broker hosting tens of partitions). This is with the default broker.thread.count (which is 1). Each producer node is pushing ~100 Mbps, and there are 66 producer nodes.

@janmejay (Contributor, Author) commented Jun 20, 2016

The benchmark with 1 or 2 broker threads does extremely well (no CPU burn, no visible run-queue load). This is with 300 brokers.

Here is a profile with the change (2 broker threads, 300 brokers):

[screenshot from 2016-06-17 17-04-54]

Here is the old one for comparison (using the old thread-per-broker model), which I shared with you over mail when we started talking about this:

[screenshot-20160129 170249]

@janmejay (Contributor, Author) commented Jun 20, 2016

In the second screenshot (the old one), the section marked "Production" was running 49 brokers (which is why the problem does not show up). The one marked "Benchmark" was running 300 brokers (AFAIR), which clearly shows the problem.

@edenhill (Owner) commented Jul 5, 2016

Sorry for not getting back to you sooner, this is a substantial change and I need time to go through it.

I have some questions:

  • How are brokers balanced among the available threads?
  • When are brokers rebalanced?
  • How is throughput affected? (e.g., using rdkafka_performance)

@janmejay (Contributor, Author) commented Jul 5, 2016

Hi Magnus, no problem. Answers:

Q: How are brokers balanced among the available threads?
A: All broker threads start with 0 brokers assigned. When a new broker is added, it is assigned to the broker thread with the lowest number of assigned brokers (tracked by a per-broker-thread counter).

Q: When are brokers rebalanced?
A: As of now, never. But this is fairly simple to do: intuitively, it requires the same level of exclusive access as broker addition. We can basically take all brokers out and add them back according to the balancing criterion.

Q: How is throughput affected? (e.g., using rdkafka_performance)
A: Data from separate load tests:

  • With 2 broker threads, 300 brokers, 150 producers, and an aggregate throughput of 6M messages per second, with each message between 1.5 and 1.8 KB. This was with snappy. Aggregate throughput per producer: 64 Mbps (compressed).
  • With 1 broker thread, 100 Mbps per producer, compressed (snappy), in a cluster of 49 brokers receiving from 66 producers.

Intuitively, because the work dispatched on the broker thread hasn't changed, I don't anticipate any drop in performance in small clusters (small clusters can afford a higher thread:broker ratio too). Large clusters, such as in the first test above, show remarkable improvement (see the screenshots). With the 1:1 model, throughput used to grind to a halt due to CPU burn. Even on a 49-node Kafka cluster (49 brokers), the 1:1 model used to burn at least 30% more CPU, so the lower compute overhead should lead to better throughput.

Every load-testing environment is different, so I think it may be worth running a perf test in a controlled environment. Because the number of broker threads is configurable, smaller clusters can choose the 1:1 model (the balancing criterion will ensure 1:1 if the number of broker threads equals the number of brokers).
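The assignment and rebalance policy described in the first two answers can be sketched as follows (illustrative code; rkbt_pick_least_loaded and rkbt_rebalance are hypothetical names, not functions from the patch):

```c
/* Sketch of least-loaded broker-to-thread assignment and of the
 * proposed rebalance: drain all assignments, then re-add every broker
 * under the same criterion. Hypothetical simplified types. */
#include <assert.h>

typedef struct { int broker_cnt; } rkbt_t;

/* Pick the broker thread with the lowest number of assigned brokers. */
static int rkbt_pick_least_loaded(const rkbt_t *t, int n) {
        int best = 0;
        for (int i = 1; i < n; i++)
                if (t[i].broker_cnt < t[best].broker_cnt)
                        best = i;
        return best;
}

/* Rebalance: take all brokers out and add them back per the
 * balancing criterion (requires the same exclusive access as
 * broker addition). */
static void rkbt_rebalance(rkbt_t *t, int n, int total_brokers) {
        for (int i = 0; i < n; i++)
                t[i].broker_cnt = 0;
        for (int b = 0; b < total_brokers; b++)
                t[rkbt_pick_least_loaded(t, n)].broker_cnt++;
}
```

Note that with thread count equal to broker count, the least-loaded pick degenerates to exactly one broker per thread, which is the 1:1 model mentioned above.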

…o unset/clear it if connect was not successful by the first poll call. So this was a race condition between connect finishing successfully and the first poll call returning. Sometimes the poll call returned first, which led to the POLLOUT event mask being cleared from pollfd, causing the broker to remain in RD_KAFKA_BROKER_STATE_CONNECT perpetually. This fixes it by not trying to set state while connecting, but instead recognizing before poll that the RD_KAFKA_BROKER_STATE_CONNECT state requires POLLOUT enabled for connect completion to be recorded and hence the state transition to UP.

@janmejay (Contributor, Author) commented Aug 24, 2016

Fixed a bug (a race condition between connect and poll) that left brokers in the RD_KAFKA_BROKER_STATE_CONNECT state if establishing the connection took long enough.

This is not related to the m broker <-> n threads patch; it was an existing, unrelated bug.
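The fix can be illustrated with a minimal non-blocking connect sketch in plain POSIX sockets (not the librdkafka transport code; wait_connect_done is a hypothetical helper). The point is that the fd keeps POLLOUT armed while the broker is in the CONNECT state, and POLLOUT firing with SO_ERROR == 0 is what records connect completion and drives the transition to UP:

```c
/* Minimal sketch of non-blocking connect completion via poll():
 * POLLOUT must stay enabled on a connecting fd until the connect
 * finishes, otherwise completion is never observed. */
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Wait for a non-blocking connect on fd to complete.
 * Returns 0 on success, non-zero on error or timeout. */
static int wait_connect_done(int fd, int timeout_ms) {
        struct pollfd pfd = { .fd = fd, .events = POLLOUT };
        if (poll(&pfd, 1, timeout_ms) != 1)
                return -1;               /* timeout or poll error */
        int err = 0;
        socklen_t len = sizeof(err);
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1)
                return -1;
        return err;                      /* 0 => connected (CONNECT -> UP) */
}
```

If POLLOUT is cleared from the pollfd before the connect completes (the race described above), poll() never reports the completion and the fd stays in the connecting state forever.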

@ghost commented Nov 2, 2016

@edenhill Hi Magnus, can this patch get merged into trunk? The Kafka Java client already does this, and it would help us run librdkafka against clusters with a large number of broker machines.

@ljackson commented Feb 7, 2017

Bump? What's the status of this change with respect to master? We may need this optimization soon.

@edenhill (Owner) commented Feb 7, 2017

I really appreciate the effort in this PR, and I'll make use of some parts of it, but the way forward is to split up broker threads into IO threads (low-latency) and partition threads (high concurrency).
The target is the June release.

@ljackson Can you tell me what problems you are seeing?

@ljackson commented Feb 7, 2017

Nothing as of yet, but we are just ramping up the replacement of Sarama with the librdkafka/golang wrapper, and we have large Kafka clusters and will be adding more. Mainly I wanted to understand how you were addressing the potential one-thread-per-broker issues. Thx

@edenhill (Owner) commented Feb 7, 2017

Happy to hear that.
What number of brokers, topics, and typical partition counts do you reckon you'll see over the coming 12 months?

@janmejay (Contributor, Author) commented Feb 8, 2017

@edenhill can you describe the design you have in mind in more detail?

@edenhill (Owner) commented Feb 8, 2017

@janmejay This design stems from solving the latency issue of waiting for condvars and IO simultaneously, which is not possible on most platforms:

  • have one (or more) IO threads that use the most appropriate IO event mechanism for the platform (epoll on Linux, kqueue on OS X, ...). They wait only on IO, not buffer queues; it is up to the other threads to enable POLLOUT on fds when they have something to send, i.e., IO-based wakeups rather than condvar-based. This solves the latency issue: transmits will be immediate. One IO thread should be enough.
  • have a set of partition worker threads that do batching, (de)compression, protocol parsing, etc. This concurrency is for performance. How this thread pool is scaled (automatically, statically, weighted, ...) has not been decided yet.
  • the broker control plane is moved to the main rdkafka thread, with some parts (fetch decisions) moved to partition threads.
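The IO-based wakeup in the first bullet can be sketched with Linux epoll (an illustrative fragment, not the planned implementation; arm_pollout and io_thread_wait are hypothetical helpers). A worker thread that has queued data re-arms EPOLLOUT on the broker's fd, so the IO thread wakes from epoll_wait() without any condvar:

```c
/* Sketch of condvar-free wakeups: the IO thread blocks only in
 * epoll_wait(), and producers wake it by arming EPOLLOUT on the
 * fd they queued data for. Linux-specific; kqueue would be the
 * OS X analogue. */
#include <assert.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Worker side: after queuing data for this broker fd, arm EPOLLOUT so
 * the IO thread's epoll_wait() returns and can transmit immediately. */
static void arm_pollout(int epfd, int fd) {
        struct epoll_event ev = { .events = EPOLLIN | EPOLLOUT,
                                  .data = { .fd = fd } };
        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* IO-thread side: wait only on IO; a return here means either readable
 * data arrived or a worker armed EPOLLOUT as a send wakeup. */
static int io_thread_wait(int epfd, struct epoll_event *ev, int timeout_ms) {
        return epoll_wait(epfd, ev, 1, timeout_ms);
}
```

The design point is that a writable socket plus an armed EPOLLOUT is itself the wakeup signal, so waiting on IO and waiting for queued data collapse into a single blocking call.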

@rgerhards (Contributor) commented Jan 4, 2018

Just for the record: we are also getting questions on Kafka tuning from high-performance rsyslog users.

@edenhill (Owner) commented Jan 4, 2018

We're looking to address this issue in Q2.

@edenhill (Owner) commented Jan 4, 2018

@qduyang commented Aug 2, 2018

Hi, there are too many threads if I have multiple consumers, and the thread context switching will cause a performance issue.
May I know when we could have a feature to reduce the thread count?

@rnpridgeon (Collaborator) commented Aug 2, 2018

@qduyang commented Aug 3, 2018

@rnpridgeon Thanks for the information, but that solution isn't going to solve my problem, because I have to consume messages from multiple sources in parallel. For example, if I have 10 separate Kafka message sources (each source having 3 brokers), there would be ~40 threads in the background, which would cause heavy CPU contention.

@edenhill closed this Jul 12, 2019

@rgerhards (Contributor) commented Jul 12, 2019

@edenhill does the "close" mean this issue has now been addressed?

@edenhill (Owner) commented Jul 12, 2019

@rgerhards No, we're still spawning one thread per broker.
We'll eventually look into fixing this, but not in the near term.
