
Undesirable resource usage related to producer concurrency #132

Closed
oliveigah opened this issue Oct 6, 2023 · 8 comments

Comments

@oliveigah (Contributor) commented Oct 6, 2023

Context

I've noticed that the current implementation of BroadwayKafka.BrodClient.setup/4 always starts a new :brod client, as follows:

def setup(stage_pid, client_id, callback_module, config) do
  with :ok <- :brod.start_client(config.hosts, client_id, config.client_config),
       {:ok, group_coordinator} <-
         start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
    Process.monitor(client_id)
    ref = Process.monitor(group_coordinator)
    Process.unlink(group_coordinator)
    {:ok, group_coordinator, ref}
  end
end

The problem is that this function is called for every new BroadwayKafka.Producer, which may be initialized multiple times if producer concurrency is set to a number greater than one.

To my current understanding, in order to achieve maximum parallelism, the number of Broadway producers we need is the minimum of the number of schedulers online and the total number of partitions across all topics.

But with the current implementation this leads to a new TCP connection to each broker of the cluster for each one of the producers, which is undesirable.
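For illustration (a hypothetical pipeline; the module and host names are made up), a configuration like this currently opens four independent :brod clients, each holding its own TCP connection to every broker:

```elixir
# With concurrency: 4, BroadwayKafka.Producer.init/1 runs four times and
# each call goes through BrodClient.setup/4, starting 4 :brod clients,
# i.e. 4 TCP connections per broker in the cluster.
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayKafka.Producer, [
      hosts: [localhost: 9092],
      group_id: "group_1",
      topics: ["test"]
    ]},
    concurrency: 4
  ],
  processors: [default: [concurrency: 10]]
)
```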

Proposal

Since 1 brod client is enough to handle most workloads, we could offer a new client option called max_concurrency, defaulting to :infinity, that would control how many brod clients are started.

From my first look at the code, I think the best approach would be to start all brod clients before any producer and select a random client for each one on initialization.

The general approach consists of the following changes:

  • On BroadwayKafka.Producer.init/1, call a function maybe_start_clients(opts) that will return a list of tuples {client_id, group_coordinator}, then select a random tuple to use as the producer's internal state

  • maybe_start_clients(opts): if max_concurrency is :infinity, start a single client and return a single tuple. If it is a positive integer N, start N clients (if they are not yet started) and return them. It will save the information about the started clients in a shared resource such as an ETS table or persistent_term

This proposal is very broad and I'll probably need to refine it during development considering possible side effects.
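A minimal sketch of the proposed maybe_start_clients/1 (the module and function names here are hypothetical, not part of broadway_kafka), using :persistent_term as the shared resource:

```elixir
defmodule BroadwayKafka.SharedClients do
  @moduledoc """
  Sketch of the proposed client sharing. In a real implementation each id
  would be passed to :brod.start_client/3, and the check-then-put below
  would need to be serialized (e.g. behind a GenServer) to avoid races
  between concurrently initializing producers.
  """

  # Returns the list of shared client ids, starting them on first call.
  def maybe_start_clients(opts) do
    key = {__MODULE__, opts[:group_id]}

    case :persistent_term.get(key, nil) do
      nil ->
        clients = start_clients(opts)
        :persistent_term.put(key, clients)
        clients

      clients ->
        clients
    end
  end

  # Each producer picks a random client on initialization.
  def pick_client(clients), do: Enum.random(clients)

  defp start_clients(opts) do
    n =
      case opts[:max_concurrency] do
        :infinity -> 1
        n when is_integer(n) and n > 0 -> n
      end

    for i <- 1..n, do: :"#{opts[:group_id]}_client_#{i}"
  end
end
```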

Closing thoughts

Let me know if all this makes sense to you, or if I misunderstood something about the problem, or if there's another way to solve this with the current features we have.

If it all makes sense, I'll start working on the PR for this. Just let me know! Thanks! 😃

@oliveigah changed the title from Undeseried resource usage for BroadwayKafka.BrodClient related to producer_concurrency to Undesirable resource usage related to producer concurrency Oct 7, 2023
@josevalim (Member)

I am ok with sharing the client, but I think your proposed solution would be non-trivial. Perhaps the best is to have a shared_client: boolean() configuration. If true, you create the client in the supervision tree and share it; otherwise it is one per producer. WDYT? If you are happy with it, I'd love to review a PR. :)
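Sketched usage of that option (hypothetical shape, before the actual PR settled the details):

```elixir
# Hypothetical producer configuration with the proposed shared_client flag.
# When true, a single :brod client is started in the supervision tree and
# shared by all producers; when false, the current behaviour is kept
# (one client per producer).
producer: [
  module: {BroadwayKafka.Producer, [
    hosts: [localhost: 9092],
    group_id: "group_1",
    topics: ["test"],
    shared_client: true
  ]},
  concurrency: 4
]
```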

@oliveigah (Contributor, Author)

Yeah, that sounds simpler indeed. The only problem I can see is that it's an all-or-nothing solution.

Maybe receive an integer to create multiple clients and select a random one on producer initialization?

What is the part you consider non-trivial? Multiple (but not all) clients, or starting them inside the producer's init?

Gonna start working on a PR that receives a boolean flag at first, and we can decide about the integer later.

Thanks!

@v0idpwn (Contributor) commented Oct 7, 2023

I'm not sure if I follow the use-case: why would you have a producer concurrency higher than the number of brod clients required for maximum parallelism?

i.e.: assume you have 4 partitions, why would you start more than 4 producers? And if doing so has some benefit, wouldn't those benefits be lost by sharing the same brod client/group coordinator?

@oliveigah (Contributor, Author)

assume you have 4 partitions, why'd you start more than 4 producers?

Yes, AFAICT we do not need more producer concurrency than 4, but that is not the problem here. The problem is that each Broadway producer initializes a new brod client.

With the current behaviour, if you set up this pipeline you will end up with 4 brod clients and 4 TCP connections (assuming just 1 broker).

The point is that you could reuse the brod client instead of starting a new one per producer, because the parallelism needed for the producers scales differently from the parallelism needed for brod clients.

And if it has some benefit, wouldn't those benefits be lost by sharing the same brod client/group coordinator?

The only scenario I can see where starting more brod clients would help is if the bottleneck of your pipeline is the TCP connection between the application and the broker, which is almost never the case if you are batching properly and the connection uses non-blocking IO.

As the Kafka protocol guide (https://kafka.apache.org/protocol.html#protocol_network) says:

The client will likely need to maintain a connection to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data. However it should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.

Considering that, I think most cases would not be negatively impacted by sharing clients, but since it may be a problem for some specific scenarios we should keep the current behaviour as the default.

Does that make sense to you, @v0idpwn?

@v0idpwn (Contributor) commented Oct 7, 2023

Absolutely, thanks for the thoughtful explanation!

@oliveigah (Contributor, Author) commented Oct 7, 2023

@josevalim, after my first pass at the code I have some considerations.

Since we only have access to the client config when Broadway.start_link is called at runtime, I can't see a way to start the clients directly in the broadway_kafka supervision tree without some changes to the configuration interface.

The only way I could start the client before Broadway.start_link is called is by adding a new configuration section just for shared clients, like this:

client_1 = %{
  id: :my_shared_client_1,
  hosts: ["host1", "host2"],
  group_config_options: foo,
  client_config_options: bar,
  fetch_config_options: baz
}

config :broadway_kafka, :shared_clients, [client_1, client_2, client_3]

And then accept shared_client_id as an option for BroadwayKafka.Producer, like this:

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayKafka.Producer, [
      shared_client_id: :my_shared_client_1,
      topics: ["test"]
    ]},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 10
    ]
  ]
)

Was that what you had in mind? Seems like a bigger change than what you had proposed, am I missing something?

@josevalim (Member)

The prepare_for_start callback should allow you to specify more children that are added to the supervision tree: https://github.com/dashbitco/broadway/blob/ebee2a94ffa6f16bc14ffa6dbc20d3c2f7b5bb73/lib/broadway/producer.ex#L114
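A sketch of that approach (a hypothetical module, not the actual broadway_kafka patch): prepare_for_start/2 lets the producer module return extra child specs that Broadway starts in its own supervision tree, before any producer process runs.

```elixir
defmodule MySharedClientProducer do
  @behaviour Broadway.Producer

  @impl true
  def prepare_for_start(_module, broadway_opts) do
    {_mod, producer_opts} = broadway_opts[:producer][:module]

    if producer_opts[:shared_client] do
      # Illustrative client id; the real PR picks its own naming scheme.
      client_id = :shared_brod_client

      child_spec = %{
        id: client_id,
        start:
          {:brod, :start_link_client,
           [producer_opts[:hosts], client_id, producer_opts[:client_config] || []]}
      }

      # The shared client is now supervised by Broadway and started before
      # the producers, so each producer can reuse client_id instead of
      # calling :brod.start_client/3 in setup/4.
      {[child_spec], broadway_opts}
    else
      {[], broadway_opts}
    end
  end
end
```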

@oliveigah (Contributor, Author)

I've tested the changes in the sandbox environment of a real-world system we have here and the results are great so far.

Memory usage decreased by 1.6 GB and port usage decreased by 500 ports. Some results from :observer_cli:

shared_client = true
[observer_cli screenshot]

shared_client = false
[observer_cli screenshot]

Given that, I think we can close this issue! Thanks for the help and feedback. 😄
