Drain After Revoke Error #43

Closed
amacciola opened this issue Jan 13, 2021 · 8 comments


@amacciola
Contributor

When stopping and restarting pipelines, I periodically get the errors below.

My pipeline also gets stuck in a rebalancing loop for a while before it recovers. Any insight into why I am seeing these errors after stopping a pipeline? Thanks.

19:37:20.225 [error] GenServer #PID<0.17856.0> terminating
** (stop) exited in: GenServer.call(#PID<0.17844.0>, :drain_after_revoke, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir) lib/gen_server.ex:989: GenServer.call/3
    (broadway_kafka) lib/producer.ex:415: BroadwayKafka.Producer.assignments_revoked/1
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:477: :brod_group_coordinator.stabilize/3
    (brod) /app/deps/brod/src/brod_group_coordinator.erl:391: :brod_group_coordinator.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
@josevalim
Member

Hi @amacciola, can you please describe what you mean by "stopping a pipeline"? Please try to provide clear steps: are you stopping Kafka? With which command? Or are you stopping the Elixir code? What are the other instructions around the cluster? Thank you.

@amacciola
Contributor Author

@josevalim sorry for not including more details.

I am starting the pipelines under a DynamicSupervisor, and then I stop a pipeline by sending it a terminate signal:
DynamicSupervisor.terminate_child(__MODULE__, child_pid), where child_pid is the pid of the pipeline itself.

This happens both when I run it locally and on our k8s cluster, where this specific application has 3 pods, each running a pipeline connected to the same ConsumerGroup.
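
For readers trying to reproduce the setup described above, here is a minimal sketch of starting a child under a DynamicSupervisor and terminating it by pid. The module name `PipelineSupervisor` and its helper functions are hypothetical, and an `Agent` stands in for the real Broadway pipeline, so this only illustrates the supervision calls, not the Kafka behaviour.

```elixir
defmodule PipelineSupervisor do
  # Hypothetical module name; the issue only says the pipelines run
  # under "a DynamicSupervisor".
  use DynamicSupervisor

  def start_link(opts \\ []) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  # Starts a child from any valid child spec and returns {:ok, pid}.
  def start_pipeline(child_spec) do
    DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  # Terminates the child identified by its pid, as in the comment above.
  def stop_pipeline(child_pid) do
    DynamicSupervisor.terminate_child(__MODULE__, child_pid)
  end
end

# Usage: an Agent is used here as a stand-in for the pipeline (assumption).
{:ok, _sup} = PipelineSupervisor.start_link()
{:ok, pid} = PipelineSupervisor.start_pipeline({Agent, fn -> :state end})
:ok = PipelineSupervisor.stop_pipeline(pid)
```

`DynamicSupervisor.terminate_child/2` returns only after the child has exited, so any process that still holds the old pid afterwards (such as a group coordinator issuing a callback) will be talking to a dead process.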

@josevalim
Member

Can you please try this patch?

diff --git a/lib/producer.ex b/lib/producer.ex
index 98f3ee4..bcfc5dc 100644
--- a/lib/producer.ex
+++ b/lib/producer.ex
@@ -412,7 +412,12 @@ defmodule BroadwayKafka.Producer do
 
   @impl :brod_group_member
   def assignments_revoked(producer_pid) do
-    GenStage.call(producer_pid, :drain_after_revoke, :infinity)
+    # If the producer_pid is no longer alive, it means the revoke
+    # is happening due to a shutdown, so ignore it.
+    if Process.alive?(producer_pid) do
+      GenStage.call(producer_pid, :drain_after_revoke, :infinity)
+    end
+
     :ok
   end
 

If it works, please send a PR!
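
A side note on the pattern in the patch above: checking `Process.alive?/1` and then calling leaves a small window in which the process can exit between the check and the call. The sketch below shows one generic way to handle that, by catching the `:noproc` exit from `GenServer.call/3` instead of checking first. It is not the code from this repository or from any of the PRs in this thread; `SafeCall` and `Echo` are hypothetical names used only for illustration.

```elixir
defmodule Echo do
  # Tiny stand-in server (assumption); replies :pong to :ping.
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

defmodule SafeCall do
  # Hypothetical helper, not part of BroadwayKafka or brod.
  # Wraps GenServer.call/3 and turns the "no process" exit into a tuple.
  def call(pid, request, timeout \\ 5_000) do
    {:ok, GenServer.call(pid, request, timeout)}
  catch
    # GenServer.call/3 exits with {:noproc, {GenServer, :call, args}}
    # when the target process is not alive.
    :exit, {:noproc, _} -> {:error, :noproc}
  end
end

# Usage: call a live server, stop it, then call again.
{:ok, pid} = GenServer.start(Echo, nil)
live_result = SafeCall.call(pid, :ping)
:ok = GenServer.stop(pid)
dead_result = SafeCall.call(pid, :ping)
```

This sketch only handles the case where the process is already gone when the call starts. If the process exits while the call is in flight, the exit carries a different reason and would still propagate.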

@amacciola
Contributor Author

@josevalim will do. I'll test it out shortly. Thanks.

@amacciola
Contributor Author

@josevalim tested it out and the errors do not appear anymore.

@amacciola
Contributor Author

#44
PR for fix

@amacciola amacciola reopened this Jan 13, 2021
@amacciola
Contributor Author

amacciola commented Jan 13, 2021

Reopening because the last PR did not completely fix the issue and can still cause a major bug. Will submit a new PR.

@amacciola
Contributor Author

#45
new PR
