
How to stop a Broadway Kafka pipeline? #86

Closed
aravindanck opened this issue Apr 26, 2022 · 3 comments


@aravindanck
Contributor

aravindanck commented Apr 26, 2022

Team, I tried Broadway.stop(__MODULE__) in a broadway_kafka pipeline, but the pipeline gets restarted after a minute by the brod supervisor, :brod_sup. The restart strategy is fixed to permanent in the brod library (see the reference below). Could that be the cause? Is there a way to start my Broadway Kafka producer with the 'restart' arg set to temporary or transient?

    :supervisor: {:local, :brod_sup}
    :started: [
      pid: #PID<0.1456.0>,
      id: TestBroadway.Broadway.Producer_0.Client,
      mfargs: {:brod_client, :start_link,
       [[localhost: 9092], TestBroadway.Broadway.Producer_0.Client, []]},
      restart_type: {:permanent, 10},
      shutdown: 5000,
      child_type: :worker
    ]

Reference: https://github.com/kafka4beam/brod/blob/master/src/brod_sup.erl#L164

@amacciola
Contributor

amacciola commented May 5, 2022

@aravindanck I do this by starting all my Broadway pipelines under a DynamicSupervisor.

Then, when I want to stop one, I just call:

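    # Look up the pipeline's top-level process by its registered name.
    String.to_atom("name_of_pipeline")
    |> Process.whereis()
    |> case do
      nil ->
        # Nothing is registered under that name, so there is nothing to stop.
        {:ok, :success}

      child_pid ->
        # Ask the process to shut down gracefully (reason :normal).
        GenServer.stop(child_pid)
        {:ok, :success}
    end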
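
The DynamicSupervisor approach described above can be sketched roughly as follows. This is only an illustration under stated assumptions: PipelineStandIn is a hypothetical named Agent standing in for a Broadway module so the snippet runs without Kafka, and PipelineManager is a made-up helper, not part of Broadway or broadway_kafka. The stand-in is marked restart: :temporary so the supervisor that owns it does not start it again after it stops.

```elixir
# Hypothetical stand-in for a Broadway pipeline module (assumption: a named
# Agent is used only so the example runs without Broadway or Kafka).
defmodule PipelineStandIn do
  # :temporary children are never restarted by their supervisor.
  use Agent, restart: :temporary

  def start_link(name) when is_atom(name) do
    Agent.start_link(fn -> :running end, name: name)
  end
end

# Hypothetical helper module, not part of Broadway or broadway_kafka.
defmodule PipelineManager do
  @sup __MODULE__.Supervisor

  # Starts the DynamicSupervisor that will own all pipelines.
  def start_supervisor do
    DynamicSupervisor.start_link(strategy: :one_for_one, name: @sup)
  end

  # Starts one pipeline (here: the stand-in) under the DynamicSupervisor.
  def start_pipeline(name) when is_atom(name) do
    DynamicSupervisor.start_child(@sup, {PipelineStandIn, name})
  end

  # Stops a pipeline through its supervisor, looked up by registered name.
  def stop_pipeline(name) when is_atom(name) do
    case Process.whereis(name) do
      nil -> {:error, :not_running}
      pid -> DynamicSupervisor.terminate_child(@sup, pid)
    end
  end
end
```

For a real pipeline module, Supervisor.child_spec/2 can override the :restart value of whatever child spec the module returns, for example Supervisor.child_spec({MyPipeline, []}, restart: :temporary), where MyPipeline is a placeholder name. Note that this only governs the supervisor you own. It does not change how :brod_sup treats the brod client shown in the original report.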

@aravindanck
Contributor Author

@amacciola I believe Broadway's stop API call does the same thing. I'm using it to stop the pipeline, but the pipeline gets restarted automatically after a minute.
Ref: https://hexdocs.pm/broadway/Broadway.html#stop/3

@josevalim
Member

josevalim commented May 29, 2022

@aravindanck this shouldn't happen because we stop the client on the producer shutdown:

    def terminate(_reason, state) do
      %{client: client, group_coordinator: group_coordinator, client_id: client_id} = state
      client.stop_group_coordinator(group_coordinator)
      client.disconnect(client_id)
      :ok
    end

So you need to debug why that line is not being invoked and the client still sticks around. :)
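
One way to start on that debugging is to check which shutdown paths actually reach terminate/2. The sketch below uses a plain GenServer as a stand-in, not the real broadway_kafka producer. ProducerStandIn and TerminateCheck are hypothetical names. It relies on standard OTP behaviour: terminate/2 runs on a graceful GenServer.stop/1, but not when the process is killed with the untrappable :kill signal.

```elixir
# Hypothetical stand-in for a producer process. It reports to `notify`
# whenever its terminate/2 callback runs.
defmodule ProducerStandIn do
  use GenServer

  # Started unlinked so killing it does not take the caller down with it.
  def start(notify), do: GenServer.start(__MODULE__, notify)

  @impl true
  def init(notify), do: {:ok, notify}

  @impl true
  def terminate(reason, notify) do
    send(notify, {:terminated, reason})
    :ok
  end
end

# Hypothetical helper that compares a graceful stop with a brutal kill.
defmodule TerminateCheck do
  def run do
    # Graceful stop: terminate/2 is invoked with reason :normal.
    {:ok, graceful_pid} = ProducerStandIn.start(self())
    :ok = GenServer.stop(graceful_pid)
    graceful = await_terminate()

    # Brutal kill: the process dies without running terminate/2.
    {:ok, killed_pid} = ProducerStandIn.start(self())
    Process.exit(killed_pid, :kill)
    killed = await_terminate()

    {graceful, killed}
  end

  # Waits briefly for the stand-in's report, or gives up.
  defp await_terminate do
    receive do
      {:terminated, reason} -> reason
    after
      500 -> :not_called
    end
  end
end
```

Calling TerminateCheck.run() returns {:normal, :not_called}. If the client still sticks around in a real pipeline, logging from the producer's terminate/2 in the same way would show whether the callback is being skipped or is running and failing partway through.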

@josevalim closed this as not planned on Mar 14, 2023