Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass topology options to the producer #100

Closed
josevalim opened this issue Aug 24, 2019 · 12 comments
Closed

Pass topology options to the producer #100

josevalim opened this issue Aug 24, 2019 · 12 comments

Comments

@josevalim
Copy link
Member

Today all Broadway producers projects are in need of receiving more options about the topology. SQS and GCPubSub would benefit from knowing the topology to configure a connection pool. Kafka needs it for reasons I can't remember. And RabbitMQ would need it for the index: dashbitco/broadway_rabbitmq#36

One important thing to mention is also that Broadway does not respect the child_spec contract. We will always call the init function (and never child_spec and start_link) since the child specification is configured externally by the topology. Also remember that Broadway already has an optional callback, called prepare_for_draining.

In any case, we need to introduce a new callback. My suggestion is to call it init_for_broadway. It will receive init_for_broadway(options, {name, index}, topology). This callback is optional, if it is not defined, we will call the regular init callback. The name is the name given in the topology. And the index is index >= 0 with the position of the process in the supervision tree.

/cc @msaraiva @mcrumm @whatyouhide

@whatyouhide
Copy link
Collaborator

@josevalim is there a reason for going with a different name instead of init/3?

@josevalim
Copy link
Member Author

josevalim commented Aug 24, 2019 via email

@whatyouhide
Copy link
Collaborator

@josevalim okay, fair enough. I would suggest the name init_with_broadway_info/3 then, but good to go on my side :)

@mcrumm
Copy link
Collaborator

mcrumm commented Aug 26, 2019

init_for_broadway/3 would still return {:producer, state}, correct?

Is there any value in an abstraction requires less knowledge of GenStage?

defmodule MyProducer do
  use Broadway.Producer

  def init(options, {name, index}, topology) do
    {:ok, state}
  end
end

@josevalim
Copy link
Member Author

init_for_broadway/3 would still return {:producer, state}, correct?

Yes.

Is there any value in an abstraction requires less knowledge of GenStage?

It may have and we are slightly walking towards it, it seems, but we are not there yet. The important thing to note is, if we introduce an abstraction that abstracts away GenStage, then we may no longer support existing GenStage producers (which is a feature of the current design.

@mcrumm
Copy link
Collaborator

mcrumm commented Aug 26, 2019

It may have and we are slightly walking towards it, it seems, but we are not there yet.

In that case, init_for_broadway/3 works for me 👍

@msaraiva
Copy link
Collaborator

Kafka needs it for reasons I can't remember

Yup, we need this for kafka to make sure the producer always sends messages from the same partition to the same processor so we can guarantee the order of messages and have safe acknowledgements. However, we need to guarantee that throughout the pipeline, so as far as I can remember, we'll also need something similar for batcher consumers.

@josevalim
Copy link
Member Author

Ah, in the Kafka case we need a way to modify all options before we even start the producers. 🤔

@josevalim
Copy link
Member Author

So I talked to @msaraiva and we will call this prepare_options and it will only receive producer options and return updated producer options. The partition-ing or not will be a producer configuration.

@josevalim
Copy link
Member Author

For GCPubSub and Amazon SQS, I have thought more about it, and I think it doesn't make sense to couple the pool size to the topology, because we never know if the acking is happening on processors or batchers, especially in face of failures and the upcoming ack_immediately. So I think the best option for the connection pool size is to make it the double of number of producers by default, but make it overall configurable.

@D4no0
Copy link

D4no0 commented Apr 14, 2021

Is multiple producers support added? In documentation it is still stated:

:producer - Required. A keyword list of options. See "Producers options" section below. Only a single producer is allowed.

@josevalim
Copy link
Member Author

Not allowed and likely won't ever be.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants