Broadway suspend pipeline #56
Try using […]
@josevalim if I understand correctly, this wouldn't drain current messages, would it? I'm not sure if the rest of the flow depends on the producer process to be responding.
You are correct. I also just realized that the better option is to call `GenStage.demand/2`.
Personally, I'm all in for this kind of feature to be available upstream on Broadway.
Keep in mind that Broadway draining on shutdown pretty much sets GenStage.demand/2 to halt it, so it should be enough to achieve the desired behaviour. The issue with moving this to Broadway is that the behaviour can be drastically different between producers. For example, with RabbitMQ we would need to ask the server to stop sending data and then ask it to resume. That's not necessary for any of the others.
@josevalim so the best option to "halt + drain" the Broadway Kafka pipeline is to use `GenStage.demand/2`? How would I […]
I think so, but please try it out.
@josevalim just to clarify: you mean using `GenStage.demand/2` and setting the demand mode to `:accumulate`?
Yes.
@josevalim it worked great! I just did something simple like this for reference: […]
Fantastic!
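The poster's actual snippet was not captured above. A minimal sketch of what such a suspend/resume helper might look like, using `Broadway.producer_names/1` to locate the pipeline's producers and `GenStage.demand/2` to toggle their demand mode (`PipelineControl` and `MyPipeline` are hypothetical names, not from the thread):

```elixir
defmodule PipelineControl do
  @moduledoc """
  Suspend/resume a Broadway pipeline by toggling producer demand.
  """

  # Stop producers from asking upstream (e.g. Kafka) for more records.
  # Messages already requested will still flow through and be processed.
  def suspend(broadway_name) do
    for producer <- Broadway.producer_names(broadway_name) do
      GenStage.demand(producer, :accumulate)
    end

    :ok
  end

  # Let producers forward demand again, resuming ingestion.
  def resume(broadway_name) do
    for producer <- Broadway.producer_names(broadway_name) do
      GenStage.demand(producer, :forward)
    end

    :ok
  end
end

# Usage, assuming a pipeline started as `Broadway.start_link(_, name: MyPipeline)`:
#
#     PipelineControl.suspend(MyPipeline)
#     PipelineControl.resume(MyPipeline)
```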
@josevalim I have run into an interesting scenario that stemmed from the changes I made in this thread. The scenario: a pipeline is started, and the Kafka topic exists but does not yet have any data. Shortly after the pipeline is started, the user suspends it. After it has been suspended, data is pushed to the Kafka topic. What happens is that it ingests X amount of data from the Kafka topic even though the GenStage demand mode is set to `:accumulate`.

My question is: what is the best way to handle this scenario? I do not want data ingesting after a user suspends a pipeline; it will confuse them as to why this is happening. Should the GenStage demand mode start as `:accumulate`? Or should I just try to handle this in my solution: when I suspend a pipeline, detect when it is done processing and try to flush any demand from the consumers when done.
@amacciola I may be missing something here, but note that any demand requested before accumulating will still be served. See this example:

```elixir
# Usage: mix run examples/producer_consumer.exs
#
# Hit Ctrl+C twice to stop it.
#
# This is a base example where a producer A emits items,
# which are amplified by a producer consumer B and printed
# by consumer C.
defmodule A do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    Process.send_after(self(), {:produce, demand}, 2000)
    {:noreply, [], counter}
  end

  def handle_info({:produce, demand}, counter) do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule C do
  use GenStage

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

{:ok, a} = GenStage.start_link(A, 0)    # starting from zero
{:ok, c} = GenStage.start_link(C, :ok)  # state does not matter

GenStage.sync_subscribe(c, to: a)
GenStage.demand(a, :accumulate)
Process.sleep(:infinity)
```
@josevalim maybe I am also misunderstanding, but how I thought this worked was: […]
I understand your scenario better. You are correct: if you suspend it and never resume it, then those messages will be there unless you also drain it. If you want to suspend and never resume, why not terminate it? In any case, we can allow setting the demand directly in this project; we only need to return it from the producer init callback. But if you know you will immediately start it as accumulated and never resume it, why start it in the first place?
@josevalim I do want to resume it; a user may resume it whenever they choose. However, when they suspended it the ingested count was 0, so it's very confusing to have the pipeline be in a suspended state and then see the count increase (increase by the initial demand).

I'm trying to figure out if this is a problem that needs to be solved in the library or on my end somewhere, e.g. whether I need to flush the demand each time I suspend the pipeline and detect that no data is there to meet the demand. However, I don't know if there is a way for me to flush the demand from the consumers. This is very much an edge case for me, but one I need to cover.
Suspending a pipeline takes time, and it is only concluded when the pipeline drains all of its contents. So UI-wise you should show it as suspending until everything you eventually requested is consumed.
@josevalim suspending a pipeline is pretty fast in my experience. By suspending I mean I am just changing the GenStage demand mode to `:accumulate`. We show the user a loading/progress bar in the UI while this is happening. The only issue here is that no data had ever been ingested from Kafka in this scenario, so there is nothing actually in the consumers processing, and the user is not actually waiting for something to finish.
You already asked for the data upstream though. It cannot be suspended until that is cancelled or consumed somehow.
@josevalim well, that is what I'm asking, I guess: should it be the case that I am able to start the pipeline with the option of being in `:accumulate` mode? I do feel like this is a problem I need to solve and less one for the library to implement. However, do you know how I would flush the demand of the consumers? Or is that just not something that is possible?
Starting in accumulate should be doable. You need to return `demand: :accumulate` in the producer init tuple, and we can support it as an option in this library. The one with the consumer is more complicated because it needs coordination throughout the pipeline.
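In plain GenStage, returning the demand mode from the init tuple looks like this (a sketch only; BroadwayKafka's producer would still need to thread such an option through, and `AccumulatingProducer` is a hypothetical module name):

```elixir
defmodule AccumulatingProducer do
  use GenStage

  def init(counter) do
    # Start with demand accumulated: incoming demand is buffered and no
    # events are emitted until GenStage.demand(pid, :forward) is called.
    {:producer, counter, demand: :accumulate}
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

# A producer started this way emits nothing, even with subscribers attached,
# until the demand mode is flipped:
#
#     {:ok, pid} = GenStage.start_link(AccumulatingProducer, 0)
#     GenStage.demand(pid, :forward)
```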
@josevalim I will test starting it in `:accumulate` mode in init and only changing it to `:forward` once it meets my params.
Is there a way to suspend a Broadway Kafka pipeline from ingesting data without having to fully kill the GenServer?