
The intention of random subscription in Subscriber does not work as expected #269

Closed · kenspirit opened this issue Oct 4, 2021 · 16 comments

@kenspirit (Contributor) commented Oct 4, 2021

There is one statement in Subscriber's init/1 function:

    # We always subscribe in random order so the load is balanced across consumers.
    names |> Enum.shuffle() |> Enum.each(&subscribe(&1, state))

However, testing shows that the shuffle does not achieve the intended effect.

After reading DockYard's Tuning Broadway RabbitMQ Pipelines for Latency and testing with the broadway_rabbitmq_experiment project they prepared, I found that the demands list in every Producer is initially identical, ordered by the consumers' startup sequence.

As described in my blog, if I add debug messages to GenStage and Broadway and run that experiment project with the pipeline configuration below, we can observe the phenomenon.

My suspicion is that the mailbox of each Producer process is filled in the order the Processors start up. Even though every Processor subscribes to the Producers in random order by shuffling the producer names, each Producer still receives the subscription message from Processor No. 1 first, then No. 2, then No. 3, and finally No. 10.

This phenomenon is most obvious when the batch size is equal to or smaller than the processor count. But even with a much larger batch, say 50, the first 10 messages go to processor No. 1 and the rest are biased toward the processors that started earlier. I think the load only balances out after the system has been running for a while.
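The arrival-order effect can be reproduced with a small, hypothetical simulation (plain Elixir, not Broadway code; it serializes consumer startup, which approximates processors being spawned one after another): even though every consumer shuffles the producer list before subscribing, each producer still receives the subscriptions in consumer startup order.

```elixir
# Hypothetical simulation, not Broadway code: consumers "start" one after
# another, and each shuffles the producer list before subscribing, just as
# Broadway.Topology.Subscriber.init/1 does.
producers = Enum.to_list(1..3)
consumers = Enum.to_list(1..5)

mailboxes =
  Enum.reduce(consumers, %{}, fn consumer, acc ->
    producers
    |> Enum.shuffle()
    |> Enum.reduce(acc, fn producer, acc ->
      # Appending models the subscription message arriving in the
      # producer's mailbox.
      Map.update(acc, producer, [consumer], &(&1 ++ [consumer]))
    end)
  end)

# Despite the shuffle, every producer's mailbox lists consumers in
# startup order: [1, 2, 3, 4, 5].
```

Shuffling changes the order in which one consumer contacts the producers, but it cannot change the relative order in which any single producer hears from different consumers.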

# Pipeline
def start_link(_opts) do
  Broadway.start_link(
    __MODULE__,
    name: __MODULE__,
    producer: [
      module:
        {BroadwayRabbitMQ.Producer,
         queue: Foo.env!(:queue_name),
         connection: Foo.conn_options(),
         qos: [
           # this should never be less than @processor_concurrency
           # or else the processors won't all get messages
           prefetch_count: 1
         ],
         on_failure: :reject},
      # concurrency: 1, # correct behavior
      concurrency: 10 # try this for poor performance
    ],
    processors: [
      default: [
        concurrency: 10,
        max_demand: 1
      ]
    ]
  )
end


# Broadway.Topology.Subscriber.init/1

shuffled_names = names |> Enum.shuffle()
IO.inspect("Processor #{inspect(self())} subscribe to Producer #{inspect(Process.whereis(Enum.at(shuffled_names, 0)))} first")
Enum.each(shuffled_names, &subscribe(&1, state))


# GenStage.DemandDispatcher.subscribe/2

IO.inspect("Subscribed #{inspect(self())} from #{inspect(pid)}")
{:ok, 0, {demands ++ [{0, pid, ref}], pending, max}}


# GenStage.DemandDispatcher.dispatch_demand/3

pids = Enum.map(demands, fn {_, other_pid, _} ->
  other_pid
end)
IO.inspect("Producer #{inspect(self())} sends message to consumer #{inspect(pid)} out of #{inspect(pids)}")

Output logs:

iex(dev@localhost)1> Foo.send_messages(10)
"Producer #PID<0.342.0> got one message."
"Producer #PID<0.340.0> got one message."
"Producer #PID<0.337.0> got one message."
"Producer #PID<0.333.0> got one message."
"Producer #PID<0.334.0> got one message."
"Producer #PID<0.335.0> got one message."
"Producer #PID<0.336.0> got one message."
"Producer #PID<0.338.0> got one message."
"Producer #PID<0.339.0> got one message."
"Producer #PID<0.341.0> got one message."
"Producer #PID<0.342.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.337.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.340.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.333.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.334.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.335.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.336.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.339.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.338.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"Producer #PID<0.341.0> sends message to consumer #PID<0.344.0> out of [#PID<0.345.0>, #PID<0.346.0>, #PID<0.347.0>, #PID<0.348.0>, #PID<0.349.0>, #PID<0.350.0>, #PID<0.351.0>, #PID<0.352.0>, #PID<0.353.0>]"
"processor #PID<0.344.0> got '2'; has 3 message(s) in its mailbox"
"processor #PID<0.344.0> got '3'; has 8 message(s) in its mailbox"
"processor #PID<0.344.0> got '1'; has 7 message(s) in its mailbox"
"processor #PID<0.344.0> got '6'; has 6 message(s) in its mailbox"
"processor #PID<0.344.0> got '4'; has 5 message(s) in its mailbox"
"processor #PID<0.344.0> got '5'; has 4 message(s) in its mailbox"
"processor #PID<0.344.0> got '7'; has 3 message(s) in its mailbox"
"processor #PID<0.344.0> got '10'; has 2 message(s) in its mailbox"
"processor #PID<0.344.0> got '8'; has 1 message(s) in its mailbox"
"processor #PID<0.344.0> got '9'; has 0 message(s) in its mailbox"

Below is the output (sorted) if I send a batch of 50 messages.

"processor #PID<0.344.0> got '1'; has 0 message(s) in its mailbox"
"processor #PID<0.344.0> got '10'; has 3 message(s) in its mailbox"
"processor #PID<0.344.0> got '2'; has 7 message(s) in its mailbox"
"processor #PID<0.344.0> got '3'; has 8 message(s) in its mailbox"
"processor #PID<0.344.0> got '4'; has 6 message(s) in its mailbox"
"processor #PID<0.344.0> got '5'; has 0 message(s) in its mailbox"
"processor #PID<0.344.0> got '6'; has 2 message(s) in its mailbox"
"processor #PID<0.344.0> got '7'; has 5 message(s) in its mailbox"
"processor #PID<0.344.0> got '8'; has 1 message(s) in its mailbox"
"processor #PID<0.344.0> got '9'; has 4 message(s) in its mailbox"
"processor #PID<0.345.0> got '11'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '12'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '14'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '17'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '21'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '26'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '32'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '39'; has 0 message(s) in its mailbox"
"processor #PID<0.345.0> got '47'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '13'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '16'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '18'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '22'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '27'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '33'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '40'; has 0 message(s) in its mailbox"
"processor #PID<0.346.0> got '48'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '15'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '20'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '25'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '28'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '34'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '41'; has 0 message(s) in its mailbox"
"processor #PID<0.347.0> got '49'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '19'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '24'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '31'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '35'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '42'; has 0 message(s) in its mailbox"
"processor #PID<0.348.0> got '50'; has 0 message(s) in its mailbox"
"processor #PID<0.349.0> got '23'; has 0 message(s) in its mailbox"
"processor #PID<0.349.0> got '30'; has 0 message(s) in its mailbox"
"processor #PID<0.349.0> got '38'; has 0 message(s) in its mailbox"
"processor #PID<0.349.0> got '43'; has 0 message(s) in its mailbox"
"processor #PID<0.350.0> got '29'; has 0 message(s) in its mailbox"
"processor #PID<0.350.0> got '36'; has 0 message(s) in its mailbox"
"processor #PID<0.350.0> got '46'; has 0 message(s) in its mailbox"
"processor #PID<0.351.0> got '37'; has 0 message(s) in its mailbox"
"processor #PID<0.351.0> got '44'; has 0 message(s) in its mailbox"
"processor #PID<0.352.0> got '45'; has 0 message(s) in its mailbox"
@josevalim (Member)

Yes, you are correct. We should probably just remove this line from Broadway. The change needs to be added to GenStage instead. A PR for Broadway is welcome.

@kenspirit (Contributor, Author)

@josevalim I'm excited to have found something while learning to read the source. Do you mean that in Broadway we can just subscribe without shuffling?

I also wonder how to make the change in GenStage. The documentation says DemandDispatcher dispatches in FIFO order; if we shuffle the demands list in subscribe/3, that seems inconsistent with it.

@josevalim (Member)

> @josevalim I'm excited to have found something while learning to read the source. Do you mean that in Broadway we can just subscribe without shuffling?

Yes!

> I also wonder how to make the change in GenStage. The documentation says DemandDispatcher dispatches in FIFO order; if we shuffle the demands list in subscribe/3, that seems inconsistent with it.

In the DemandDispatcher, there is a max value that starts as nil. I am thinking it should start as :none or :shuffle and we would shuffle only on the first demand. We would have a flag like :shuffle_demand_on_first_ask which we can set to true in Broadway. WDYT?
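A minimal sketch of that idea (hypothetical module name and simplified state shape, not the actual GenStage implementation): since max == nil already means no consumer has asked yet, the accumulated subscriptions can be shuffled exactly once at that point.

```elixir
defmodule FirstAskSketch do
  # State mirrors DemandDispatcher's {demands, pending, max};
  # max == nil means no consumer has asked yet.
  def subscribe(pid, ref, {demands, pending, max}) do
    {demands ++ [{0, pid, ref}], pending, max}
  end

  # First ask ever: shuffle once so the FIFO ordering imposed by
  # subscription arrival is broken up.
  def ask(counter, {demands, pending, nil}) do
    {Enum.shuffle(demands), pending, counter}
  end

  def ask(_counter, state), do: state
end
```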

kenspirit added a commit to kenspirit/broadway that referenced this issue Oct 5, 2021
@kenspirit (Contributor, Author)

Hmm, on second thought, I wonder whether we really need to maintain FIFO at all, since the main characteristic of DemandDispatcher is to dispatch to whichever consumer has the biggest demand. Once consumers have been consuming events and sending new demand, I don't think they retain any particular order.

I also think library users relying on the subscription pattern should not depend on any ordering in their implementations. So, from my point of view, it seems more suitable, and simpler, to shuffle in the subscribe/3 function and keep load-balancing the demands, while still preserving the main characteristic of dispatching to the biggest demand.

Could you point out any situation I am not aware of that requires FIFO?

@josevalim (Member)

The biggest concern is having the same consumer always come back on top of the queue, although that would only happen if the system is idle, so perhaps it is not a big concern. The other concern is performance: if we need to collect all consumers with equal demand and shuffle them on every dispatch, what is the impact?

Because of that, I think introducing only an initial shuffle is much more contained and will hopefully inject enough entropy into the system.

@kenspirit (Contributor, Author) commented Oct 5, 2021

I am sorry, but I don't quite follow the first concern. Shuffling during subscribe/3 should at most move the same consumer to the top of the queue? And performance should only be affected when a subscription happens, which I assumed occurs only during the startup phase?

  def subscribe(_opts, {pid, ref}, {demands, pending, max}) do
    {:ok, 0, {demands ++ [{0, pid, ref}], pending, max}}
  end

@josevalim (Member)

The demands list is meant to be ordered by demand. We also usually subscribe and then immediately ask, which means that if we want to keep the property of being ordered by demand, the new subscription will be last anyway. That's why I believe the best time to shuffle is the first time we need to distribute demand.

@kenspirit (Contributor, Author)

The add_demand/4 function used by ask/3 places the consumer according to its demand, which keeps the list sorted. So shuffling in subscribe should not break the sorting?

I thought about shuffling on the first ask, but is it guaranteed that all consumers have subscribed by that moment? Couldn't the first ask from the first consumer arrive before the other consumers' subscriptions, just as in this issue?

@josevalim (Member)

IIRC, add_demand inserts into a sorted list; it does not sort the list. All the operations assume the list is sorted.
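For reference, that sorted insertion can be sketched roughly like this (a simplified sketch, not the exact GenStage source):

```elixir
defmodule DemandSketch do
  # Insert {counter, pid, ref} into a list that is already sorted by
  # descending counter. Each call walks to the right position and
  # prepends there; nothing ever re-sorts the whole list.
  def add_demand(counter, pid, ref, [{c, _, _} | _] = demands) when counter > c do
    [{counter, pid, ref} | demands]
  end

  def add_demand(counter, pid, ref, [demand | demands]) do
    [demand | add_demand(counter, pid, ref, demands)]
  end

  def add_demand(counter, pid, ref, []) do
    [{counter, pid, ref}]
  end
end
```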

@kenspirit (Contributor, Author) commented Oct 5, 2021

Oh, yes, it assumes a sorted list. But is my concern about the relative timing of ask and subscribe plausible? I just tested by adding this debug log to the experiment project:

# GenStage.DemandDispatcher
@doc false
def ask(counter, {pid, ref}, {demands, pending, max}) do
  if is_nil(max) do
    IO.inspect("In #{inspect(self())} when max is nil, demands length is #{Enum.count(demands)}")
  end
  # ... rest of ask/3 unchanged

We can see that when the first ask happens, most of the subscription messages from the other consumers have not arrived yet.

iex(dev@localhost)1> "Subscribed #PID<0.336.0> from #PID<0.344.0>"
"Subscribed #PID<0.340.0> from #PID<0.344.0>"
"Subscribed #PID<0.335.0> from #PID<0.344.0>"
"In #PID<0.336.0> when max is nil, demands length is 1"
"In #PID<0.340.0> when max is nil, demands length is 1"
"Subscribed #PID<0.337.0> from #PID<0.344.0>"
"Subscribed #PID<0.341.0> from #PID<0.344.0>"
"In #PID<0.335.0> when max is nil, demands length is 1"
"Subscribed #PID<0.336.0> from #PID<0.345.0>"
"Subscribed #PID<0.340.0> from #PID<0.345.0>"
"Subscribed #PID<0.333.0> from #PID<0.344.0>"
"In #PID<0.337.0> when max is nil, demands length is 1"
"Subscribed #PID<0.342.0> from #PID<0.344.0>"
"Subscribed #PID<0.335.0> from #PID<0.345.0>"
"In #PID<0.341.0> when max is nil, demands length is 1"
"Subscribed #PID<0.340.0> from #PID<0.346.0>"
"Subscribed #PID<0.336.0> from #PID<0.346.0>"
"Subscribed #PID<0.337.0> from #PID<0.345.0>"
"In #PID<0.333.0> when max is nil, demands length is 1"
"Subscribed #PID<0.338.0> from #PID<0.344.0>"
"Subscribed #PID<0.341.0> from #PID<0.345.0>"
"In #PID<0.342.0> when max is nil, demands length is 1"
"Subscribed #PID<0.335.0> from #PID<0.346.0>"
"Subscribed #PID<0.336.0> from #PID<0.347.0>"
"Subscribed #PID<0.340.0> from #PID<0.347.0>"
"Subscribed #PID<0.333.0> from #PID<0.345.0>"
"Subscribed #PID<0.337.0> from #PID<0.346.0>"
"Subscribed #PID<0.339.0> from #PID<0.344.0>"

@josevalim (Member)

You are correct, it needs to be sorted on the first call to dispatch. But I think we would need a new tuple element to track that.

@kenspirit (Contributor, Author) commented Oct 7, 2021

I am trying to write a PR for GenStage. However, due to the entropy introduced by shuffling, I am still struggling with how the unit test should be written (is property-based testing required?), and I also wonder whether the change has unexpected impacts on other modules, as some strange warnings/errors show up if I run the test suite several times.

@josevalim (Member)

I would add a dispatcher unit test. Start a dispatcher without the shuffle flag, with 3 subscriptions, and dispatch to one of them. Then do something like Enum.any?(1..10, fn _ -> deliver_code end), where deliver_code is the same 3-subscriptions-plus-dispatch sequence, and assert that at least one run produces a different output than the run without the shuffle flag.

Another option is to rely on the fact that Enum.shuffle draws from Erlang's :rand module, which has a configurable seed. Set a consistent seed, call the dispatcher once without the shuffle flag and once with the shuffle flag plus the seed, and assert that the results differ.
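For illustration, Enum.shuffle takes its randomness from the process's :rand state, so re-seeding with the same value reproduces the same shuffle:

```elixir
# Seed the current process's random state; :exsss is the default
# algorithm in recent OTP releases.
:rand.seed(:exsss, {100, 101, 102})
first = Enum.shuffle(1..10)

# Re-seeding with the same value reproduces the same "random" order.
:rand.seed(:exsss, {100, 101, 102})
second = Enum.shuffle(1..10)

true = first == second
```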

@kenspirit (Contributor, Author) commented Oct 7, 2021

Sorry, I don't quite get the first suggestion. For the second one, I thought about using a seed, but I wondered whether making the outcome reproducible (fixed) goes against the very property of shuffling. Is the unit test in the PR good enough?

@josevalim (Member)

New GenStage is out, thank you so much!
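For readers arriving later: the change shipped as an opt-in DemandDispatcher option, which I believe is named :shuffle_demands_on_first_dispatch (please verify against the GenStage.DemandDispatcher docs). A producer would enable it roughly like this:

```elixir
# Hypothetical producer sketch; the option name should be verified
# against the GenStage.DemandDispatcher documentation.
def init(opts) do
  {:producer, opts,
   dispatcher: {GenStage.DemandDispatcher, shuffle_demands_on_first_dispatch: true}}
end
```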

@kenspirit (Contributor, Author)

It's my pleasure.
