Consumer rebalance failed in Kafka binding component #1799

Closed
skyao opened this issue Jun 20, 2022 · 7 comments
Labels: kind/bug, P1, pinned

@skyao
Member

skyao commented Jun 20, 2022

Expected Behavior

In our certification tests, we have a consumer rebalance case for Kafka (in both the Kafka pubsub component and the Kafka binding component). The test logic is:

  • two apps (named app-2 and app-3) with a Kafka component (either Kafka pubsub or Kafka binding) subscribe to the same Kafka topic with the same consumer group
  • so each event from this topic should be sent to one of the apps (app-2 or app-3)
  • if we stop one of the apps (app-2 in the cert test case) during the test, the events should be sent to app-3 and none of them should be lost
  • if we count all the events received by app-2 and app-3, the total should match the number of events that were sent to the topic.
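
The final check in the list above can be sketched as a simple tally. This is only an illustration of the idea, not the certification test's code: the function name `missing` and the message IDs are hypothetical.

```go
package main

import "fmt"

// missing returns every sent message that no app received.
// receivedByApp maps an app name (e.g. "app-2") to the messages it saw.
func missing(sent []string, receivedByApp map[string][]string) []string {
	seen := make(map[string]bool)
	for _, msgs := range receivedByApp {
		for _, m := range msgs {
			seen[m] = true
		}
	}
	var lost []string
	for _, m := range sent {
		if !seen[m] {
			lost = append(lost, m)
		}
	}
	return lost
}

func main() {
	sent := []string{"m1", "m2", "m3", "m4"}
	received := map[string][]string{
		"app-2": {"m1", "m2"}, // received before app-2 was stopped
		"app-3": {"m3"},       // received after the rebalance
	}
	fmt.Println(missing(sent, received)) // [m4]
}
```

An empty result means the events received by app-2 and app-3 together cover everything that was sent to the topic.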

Actual Behavior

After app-2 is stopped, no events are sent to app-3.

So the certification test fails because some events are never received:

    flow.go:228: Completed step: wait
    flow.go:220: Running step: assert messages(consumer rebalance)
    watcher.go:390: Timed out with 301 items remaining
    flow.go:228: Completed step: assert messages(consumer rebalance)
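
To make the "Timed out with N items remaining" message easier to read, here is a minimal sketch of how such a watcher could work. It is an assumption-based stand-in, not the actual watcher used by the certification tests: the `Watcher` type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Watcher tracks a set of expected messages and reports how many are
// still outstanding. Hypothetical stand-in for the real test watcher.
type Watcher struct {
	mu        sync.Mutex
	remaining map[string]struct{}
	done      chan struct{}
}

// NewWatcher registers the messages the test expects to receive.
func NewWatcher(expected ...string) *Watcher {
	w := &Watcher{remaining: make(map[string]struct{}), done: make(chan struct{})}
	for _, m := range expected {
		w.remaining[m] = struct{}{}
	}
	if len(w.remaining) == 0 {
		close(w.done)
	}
	return w
}

// Observe marks a message as received; unexpected or duplicate
// messages are ignored.
func (w *Watcher) Observe(msg string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if _, ok := w.remaining[msg]; !ok {
		return
	}
	delete(w.remaining, msg)
	if len(w.remaining) == 0 {
		close(w.done)
	}
}

// Wait blocks until every expected message was observed or the timeout expires.
func (w *Watcher) Wait(timeout time.Duration) error {
	select {
	case <-w.done:
		return nil
	case <-time.After(timeout):
		w.mu.Lock()
		defer w.mu.Unlock()
		return fmt.Errorf("timed out with %d items remaining", len(w.remaining))
	}
}

func main() {
	w := NewWatcher("m1", "m2", "m3")
	w.Observe("m1") // only one of three expected messages arrives
	fmt.Println(w.Wait(50 * time.Millisecond))
}
```

Under this model, a large "items remaining" count means that many expected events never reached any app the watcher was observing.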

Steps to Reproduce the Problem

Our certification test for the Kafka binding component fails because of this issue, see:

https://github.com/dapr/components-contrib/runs/6958628713?check_suite_focus=true
https://github.com/dapr/components-contrib/runs/6965196737?check_suite_focus=true

Release Note

@skyao skyao added the kind/bug Something isn't working label Jun 20, 2022
@skyao
Member Author

skyao commented Jun 20, 2022

First observation about this bug:

Sometimes the certification test passes, but only by luck. It depends on which app is receiving the events before we stop app-2:

  • if the events are being sent to app-3, stopping app-2 has no impact on app-3, so the test case passes
  • if the events are being sent to app-2, then because of this issue app-3 won't receive the events (via Kafka consumer rebalance), so the test case fails.

I will add some logging to show this.

There is also a question about this behavior: if two apps subscribe to the same Kafka topic with the same consumer group, should the events only be sent to one of the two apps? From the test, I found that events are sent to the other app only when the first app is down.
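
As background for the question above: in Kafka, each partition of a topic is assigned to exactly one member of a consumer group, so which app receives events depends on the partition assignment, and a rebalance hands a departed member's partitions to the remaining members. The toy round-robin assignment below illustrates that rule only. It is not Kafka's real assignor and not the code used by the Dapr components; `assign` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// assign distributes partitions round-robin across the sorted member list.
// Every partition ends up owned by exactly one member of the group.
func assign(members []string, partitions int) map[string][]int {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	owned := make(map[string][]int, len(sorted))
	for _, m := range sorted {
		owned[m] = nil
	}
	if len(sorted) == 0 {
		return owned
	}
	for p := 0; p < partitions; p++ {
		m := sorted[p%len(sorted)]
		owned[m] = append(owned[m], p)
	}
	return owned
}

func main() {
	// One partition, two members: one member stays idle.
	fmt.Println(assign([]string{"app-2", "app-3"}, 1)) // map[app-2:[0] app-3:[]]
	// Two partitions, two members: each member owns one partition.
	fmt.Println(assign([]string{"app-2", "app-3"}, 2)) // map[app-2:[0] app-3:[1]]
	// After app-2 leaves, a rebalance gives everything to app-3.
	fmt.Println(assign([]string{"app-3"}, 2)) // map[app-3:[0 1]]
}
```

In this model, an app that owns no partition receives nothing until a rebalance assigns it one, which is the step that did not happen for app-3 in the failing runs.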

@skyao
Member Author

skyao commented Jun 20, 2022

Second observation about this bug: it does not exist in the Kafka pubsub component!

I have tested and verified that with the Kafka pubsub component, if the events are being sent to app-2 and app-2 is stopped, the Kafka consumer rebalance works and app-3 starts to receive the events. This happens within seconds after app-2 (in fact, the Dapr sidecar of app-2) is stopped.

This is why the certification test of the Kafka pubsub component passes.

@skyao
Member Author

skyao commented Jun 20, 2022

Update: I later saw a value of 300, so the count is not fixed at 301, only close to 300.

Original observation: oddly, the number of items remaining always seems to be 301, which I can't explain:

    flow.go:228: Completed step: wait
    flow.go:220: Running step: assert messages(consumer rebalance)
    watcher.go:390: Timed out with 301 items remaining
    flow.go:228: Completed step: assert messages(consumer rebalance)

@ItalyPaleAle
Contributor

I am not a Kafka expert, but normally in Dapr pubsub, messages are delivered to each instance of the application or, when that’s not possible, we set a specific consumer group.

Don’t quote me on that, but I don’t believe there are other instances where we rebalance from “app-2” to “app-3”. I don’t know if we even want that?

@skyao
Member Author

skyao commented Jun 20, 2022

> I am not a Kafka expert, but normally in Dapr pubsub, messages are delivered to each instance of the application or, when that’s not possible, we set a specific consumer group.
>
> Don’t quote me on that, but I don’t believe there are other instances where we rebalance from “app-2” to “app-3”. I don’t know if we even want that?

I couldn't find the root cause for a long time, until I finally added logging and looked at the actual log output.

Here is part of the log after app-2 is stopped; no events are sent to app-3:

    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 464
time="2022-06-20T15:45:56.68836827Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 465
time="2022-06-20T15:45:56.788676919Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 466
time="2022-06-20T15:45:56.888037889Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 467
time="2022-06-20T15:45:56.98805577Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 468
time="2022-06-20T15:45:57.087909192Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 469
time="2022-06-20T15:45:57.188531825Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 470
time="2022-06-20T15:45:57.288259493Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 471
time="2022-06-20T15:45:57.388276102Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 472
time="2022-06-20T15:45:57.488084206Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 473
time="2022-06-20T15:45:57.588489451Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/simulate.go:34: Simulating error 27
time="2022-06-20T15:45:57.58953958Z" level=error msg="Error processing Kafka message: neworder/1/1473 [key=dGVzdA==]. Error: fails to send binding event to http app channel, status code: 500 body: simulated error\n. Retrying..." instance=skyserver scope=dapr.components type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 474
time="2022-06-20T15:45:57.640810192Z" level=info msg="Successfully processed Kafka message after it previously failed: neworder/1/1473 [key=dGVzdA==]" instance=skyserver scope=dapr.components type=log ver=unknown
time="2022-06-20T15:45:57.688217251Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 475
time="2022-06-20T15:45:57.787850135Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 476
time="2022-06-20T15:45:57.887802611Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 477
time="2022-06-20T15:45:57.988724409Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 478
time="2022-06-20T15:45:58.088478848Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 479
time="2022-06-20T15:45:58.187809885Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 480
time="2022-06-20T15:45:58.288633017Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 481
time="2022-06-20T15:45:58.388047425Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 482
time="2022-06-20T15:45:58.487964783Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 483
time="2022-06-20T15:45:58.588153951Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 484
time="2022-06-20T15:45:58.687729645Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 485
time="2022-06-20T15:45:58.78873586Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 486
time="2022-06-20T15:45:58.888542868Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 487
time="2022-06-20T15:45:58.988532627Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 488
time="2022-06-20T15:45:59.087963296Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 489
time="2022-06-20T15:45:59.187947176Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 490
time="2022-06-20T15:45:59.288255927Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 491
time="2022-06-20T15:45:59.388282484Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 492
time="2022-06-20T15:45:59.487517448Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 493
time="2022-06-20T15:45:59.588655314Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 494
time="2022-06-20T15:45:59.6887103Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding" instance=skyserver scope=dapr.runtime.grpc.api-info type=log ver=unknown
    /home/sky/work/code/dapr-fork/components-contrib/tests/certification/bindings/kafka/kafka_retry_test.go:88: ======== app-1 received event: Background message - 495
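
A log like the one above is easier to check with a small script than by eye. The sketch below counts "received event" lines per app, assuming the line format shown in the excerpt; `parseReceived` and `countByApp` are hypothetical helpers, not part of the test suite.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// receivedRe matches the test's "received event" lines as they appear in
// the log excerpt; the format is taken from that excerpt only.
var receivedRe = regexp.MustCompile(`(app-\d+) received event: Background message - (\d+)`)

// parseReceived extracts the app name and message number from one log line.
// It returns ok=false for lines that are not "received event" lines.
func parseReceived(line string) (string, int, bool) {
	m := receivedRe.FindStringSubmatch(line)
	if m == nil {
		return "", 0, false
	}
	num, err := strconv.Atoi(m[2])
	if err != nil {
		return "", 0, false
	}
	return m[1], num, true
}

// countByApp tallies how many events each app reported receiving.
func countByApp(lines []string) map[string]int {
	counts := make(map[string]int)
	for _, l := range lines {
		if app, _, ok := parseReceived(l); ok {
			counts[app]++
		}
	}
	return counts
}

func main() {
	lines := []string{
		`kafka_retry_test.go:88: ======== app-1 received event: Background message - 464`,
		`time="2022-06-20T15:45:56.68836827Z" level=info msg="gRPC API Called: /dapr.proto.runtime.v1.Dapr/InvokeBinding"`,
		`kafka_retry_test.go:88: ======== app-1 received event: Background message - 465`,
	}
	fmt.Println(countByApp(lines)) // map[app-1:2]
}
```

Run over the full log, a tally like this shows at a glance whether app-3 reported any events after app-2 was stopped.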

@artursouza artursouza added P1 pinned Issue does not get stale labels Jun 21, 2022
@artursouza artursouza added this to the v1.9 milestone Jun 21, 2022
@skyao
Member Author

skyao commented Jun 21, 2022

The cause of this bug has been found and should be fixed by PR #1804.

@skyao
Member Author

skyao commented Jun 22, 2022

This bug is fixed by the above PR.

Closing.

@skyao skyao closed this as completed Jun 22, 2022
@artursouza artursouza modified the milestones: v1.9, v1.8 Jun 22, 2022
@artursouza artursouza added do-not-merge PR is not ready for merging and removed do-not-merge PR is not ready for merging labels Jun 22, 2022