Consumer removed from group but keeps consuming and committing successfully #2631

Closed
keith-chew opened this issue Nov 20, 2019 · 13 comments

@keith-chew

Description

We have a case where the consumer was removed from the consumer group, but it kept on consuming and committing offsets successfully.

How to reproduce

  • 3-node cluster
  • 2 consumers
  • try to perform a group coordinator change (e.g. a Kafka maintenance upgrade, or ZooKeeper losing connectivity to a Kafka node) by stopping and starting the Kafka nodes
  • the consumer will usually get reassigned to the new group coordinator, but not always
  • repeat the process above until one of the consumers is not reassigned

Checklist

  • librdkafka version (release number or git tag): 1.1.0
  • Apache Kafka version: 2.1.0
  • librdkafka client configuration: standard configuration
  • Operating system: rhel7
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue

From the client logs (the consumer has been removed on the server side but stays connected, with no errors):

{"severity":7,"fac":"HEARTBEAT","message":"[thrd:main]: GroupCoordinator/2: Heartbeat for group \"yyyyy\" generation id 9199"}
{"severity":7,"fac":"HEARTBEAT","message":"[thrd:main]: Group \"yyyyy\" heartbeat error response in state up (join state started, 6 partition(s) assigned): Broker: Not coordinator for group"}

From server logs:

Member xxxxx in group yyyyy has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

From kafka groups:

TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
yyyyy 4          9655839         9667010         11171           -               -               -
yyyyy 10         8920790         8927726         6936            -               -               -
yyyyy 2          8995619         8997003         1384            -               -               -
yyyyy 11         9158346         9188423         30077           -               -               -
yyyyy 3          10581438        10585069        3631            -               -               -
yyyyy 6          9672232         9673972         1740            -               -               -
yyyyy 7          8677966         8679073         1107            -               -               -
yyyyy 0          8955020         8956590         1570            -               -               -
yyyyy 5          9151077         9167532         16455           -               -               -
yyyyy 8          9138402         9159293         20891           -               -               -
yyyyy 9          9650795         9652572         1777            -               -               -
yyyyy 1          8784920         8785103         183             -               -               -

Finally, we confirmed the client is still consuming without being in a consumer group:

Nov 21 07:40:09 at-kafka-dev kafka_node_3: [2019-11-20 18:40:09,562] WARN Attempting to send response via channel for which there is no open connection, connection id 192.168.2.2:9092-x.x.x.x:59427-92 (kafka.network.Processor)

Normally when this happens, the commit would fail and the client would reconnect to Kafka. But in this scenario there are no errors and the client continues to consume. The critical issue is that when the other consumer starts up and joins a proper group, this rogue consumer will be processing duplicate messages! With 2 consumers, that is 50% more duplicate messages and 50% extra resources (CPU usage) used, which can be massive in a high-throughput environment.

@keith-chew changed the title from "Consumer removed from group by keeps consuming and committing" to "Consumer removed from group but keeps consuming and committing successfully" on Nov 20, 2019
@keith-chew
Author

I can further confirm that 2 hours after the consumer went rogue, the commits started to fail. This failure allowed our code to reconnect to Kafka and rejoin the consumer group. Although this is a recovery, 2 hours is not really practical. If someone can suggest a better/faster recovery method, that would be greatly appreciated.

@edenhill
Contributor

When the heartbeat fails with ERR_NOT_COORDINATOR_FOR_GROUP the consumer will re-query for the coordinator but remain in an active consumer state.
If the coordinator query fails with ERR_GROUP_COORDINATOR_NOT_AVAILABLE the consumer will eventually evict itself from the group, but for any other error the coordinator query is simply retried and the consumer state does not change. I think this is what we're seeing here (but we would need debug=cgrp to verify).

The eventual recovery after 2 hours would seem to indicate that the cluster state is somewhat mangled, or that there is an error code that the consumer needs to handle specifically.
Reproducing this with debug=cgrp would really help.
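
For anyone reproducing this, cgrp debugging is enabled through the normal "debug" configuration property. A minimal sketch in the C API (make_debug_conf is just an illustrative helper name; the language bindings expose the same "debug" property):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Build a consumer config with consumer-group ("cgrp") debug logging enabled,
 * so coordinator queries, state changes and heartbeat errors show up in the
 * client logs. */
static rd_kafka_conf_t *make_debug_conf(void) {
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        char errstr[512];

        if (rd_kafka_conf_set(conf, "debug", "cgrp",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "failed to set debug=cgrp: %s\n", errstr);

        return conf;
}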

@keith-chew
Author

Thank you very much for your quick response. Yes, it was a pity I suppressed the other cgrp logs except for HEARTBEAT. Will reproduce with the full cgrp logs and update here.

@keith-chew
Author

Great news: I enabled cgrp logs and managed to get 1 consumer working and 1 consumer failing. I think I can see where the problem is.

After the group coordinator swapped from 2 to 3, the logs show:

{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: intervaled in state up"}
{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)"}
{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: Broker: Not coordinator for group"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka02:9092 id 2"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka02:9092 id 2"}

kafka01 is telling us the GC is at 2 (but it is actually at 3). This goes on for 20 seconds, and during this time HEARTBEAT is getting "Not coordinator for group" since 2 is not the GC. For the next 2.5 minutes, it is happily consuming and committing:

{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [0]: stored offset 14726247, committed offset 14726245: setting stored offset 14726247 for commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [1]: stored offset -1001, committed offset 64474355: not including in commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [2]: stored offset 15717352, committed offset 15717351: setting stored offset 15717352 for commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [3]: stored offset 14864960, committed offset 14864956: setting stored offset 14864960 for commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [4]: stored offset 14370404, committed offset 14370400: setting stored offset 14370404 for commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [5]: stored offset 14903751, committed offset 14903749: setting stored offset 14903751 for commit"}
{"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/2: OffsetCommit for 6 partition(s): manual: returned: Success"}

Then it gets another GC change via kafka01, and the coordinator moves from 2 to 3:

{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: intervaled in state up"}
{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)"}
{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: Broker: Not coordinator for group"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka03:9092 id 3"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: Group \"my-group\" changing coordinator 2 -> 3"}
{"severity":7,"fac":"COORDCLEAR","message":"[thrd:main]: Group \"my-group\" broker kafka02:9092/2 is no longer coordinator"}
{"severity":7,"fac":"COORDSET","message":"[thrd:main]: Group \"my-group\" coordinator set to broker kafka03:9092/3"}
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state up -> wait-broker-transport (v4, join-state started)"}
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state wait-broker-transport -> up (v4, join-state started)"}
{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: intervaled in state up"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka03:9092 id 3"}
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state up -> query-coord (v4, join-state started)"}
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka03:9092 id 3"}
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state query-coord -> wait-broker-transport (v4, join-state started)"}
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state wait-broker-transport -> up (v4, join-state started)"}
{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op OFFSET_COMMIT (v0) in state up (join state started, v4 vs 0)"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [0]: stored offset 14726247, committed offset 14726247: not including in commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [1]: stored offset -1001, committed offset 64474355: not including in commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [2]: stored offset 15717356, committed offset 15717356: not including in commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [3]: stored offset 14864961, committed offset 14864961: not including in commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [4]: stored offset 14370407, committed offset 14370406: setting stored offset 14370407 for commit"}
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic az-realtime-gtfs-tu [5]: stored offset 14903754, committed offset 14903754: not including in commit"}
{"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/3: OffsetCommit for 6 partition(s): manual: returned: Success"}

and it keeps happily consuming and committing, while the heartbeat says it is at the wrong GC. Then it gets interesting. It queries all 3 brokers for the GC and gets different answers: kafka01 and kafka03 say it is 2, but kafka02 says it is 3:

{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka02:9092/2: Group \"my-group\": querying for coordinator: intervaled in state up"}] 
{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)"}] 
{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka03:9092/3: Group \"my-group\": querying for coordinator: Broker: Not coordinator for group"}] 
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka02:9092/2: Group \"my-group\" coordinator is kafka03:9092 id 3"}] 
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka03:9092/3: Group \"my-group\" coordinator is kafka02:9092 id 2"}] 
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: Group \"my-group\" changing coordinator 3 -> 2"}] 
{"severity":7,"fac":"COORDCLEAR","message":"[thrd:main]: Group \"my-group\" broker kafka03:9092/3 is no longer coordinator"}] 
{"severity":7,"fac":"COORDSET","message":"[thrd:main]: Group \"my-group\" coordinator set to broker kafka02:9092/2"}] 
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state up -> wait-broker-transport (v4, join-state started)"}] 
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state wait-broker-transport -> up (v4, join-state started)"}] 
{"severity":7,"fac":"CGRPQUERY","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\": querying for coordinator: intervaled in state up"}] 
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state up -> query-coord (v4, join-state started)"}] 
{"severity":7,"fac":"CGRPCOORD","message":"[thrd:main]: kafka01:9092/1: Group \"my-group\" coordinator is kafka02:9092 id 2"}] 
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state query-coord -> wait-broker-transport (v4, join-state started)"}] 
{"severity":7,"fac":"CGRPSTATE","message":"[thrd:main]: Group \"my-group\" changed state wait-broker-transport -> up (v4, join-state started)"}

This goes on forever, and in the meantime, heartbeat is still reporting the incorrect group coordinator.

Now for the consumer which worked (did not get stuck): it also queried all 3 brokers and got the same conflicting replies as above, but it happened to land on the correct GC and never had any issues after that. In the case above, the consumer landed on the wrong GC in the middle and subsequently got stuck.

Seems like a difficult problem to solve. Our workaround is to trap the HEARTBEAT "Not coordinator for group" condition and, after 45 s, disconnect and reconnect to the broker. It works, but it would be really great if the library could handle this transparently.
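
For reference, a rough sketch of that workaround in the C API, assuming debug includes cgrp so the HEARTBEAT lines reach a log callback; the 45 s threshold mirrors the comment above, and the actual teardown and reconnect is deferred to the consume loop since a log callback should not call back into librdkafka:

#include <signal.h>
#include <string.h>
#include <time.h>
#include <librdkafka/rdkafka.h>

static time_t first_bad_heartbeat = 0;
static volatile sig_atomic_t need_reconnect = 0;

/* Watch the HEARTBEAT debug lines for the "Not coordinator" condition and,
 * once it has persisted for 45 seconds, ask the consume loop to rebuild the
 * consumer, which forces a fresh coordinator query and group rejoin. */
static void log_cb(const rd_kafka_t *rk, int level,
                   const char *fac, const char *buf) {
        (void)rk;
        (void)level;

        if (strcmp(fac, "HEARTBEAT") || !strstr(buf, "Not coordinator"))
                return;

        time_t now = time(NULL);
        if (!first_bad_heartbeat)
                first_bad_heartbeat = now;
        else if (now - first_bad_heartbeat > 45)
                need_reconnect = 1;  /* consume loop: close and recreate consumer */
}

/* Registered during setup with rd_kafka_conf_set_log_cb(conf, log_cb); the
 * consume loop checks need_reconnect, and when set calls
 * rd_kafka_consumer_close() + rd_kafka_destroy(), creates a new consumer,
 * and resets both variables. */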

@keith-chew
Author

keith-chew commented Dec 1, 2019

I have re-tested this scenario with the Kafka Java client, and I can confirm it handles this case properly. Below are the logs:

Dec 01 11:57:08 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka02:9092 (id: 2 rack: null) 
Dec 01 11:57:10 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1575154627978, latencyMs=1130, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=my-client, correlationId=41017), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='NONE', error=NONE, node=kafka03:9092 (id: 3 rack: null))) 
Dec 01 11:57:10 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Discovered group coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:10 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending Heartbeat request to coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending Heartbeat request to coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Group coordinator kafka03:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka03:9092 (id: 3 rack: null) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Coordinator discovery failed, refreshing metadata 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka01:9092 (id: 1 rack: null) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1575154631644, latencyMs=455, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=my-client, correlationId=41027), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='NONE', error=NONE, node=kafka03:9092 (id: 3 rack: null))) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Discovered group coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Group coordinator kafka03:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery 
Dec 01 11:57:13 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka01:9092 (id: 1 rack: null) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1575154632013, latencyMs=321, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=my-client, correlationId=41028), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='NONE', error=NONE, node=kafka03:9092 (id: 3 rack: null))) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Discovered group coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Group coordinator kafka03:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka01:9092 (id: 1 rack: null) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1575154632302, latencyMs=207, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=my-client, correlationId=41029), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='NONE', error=NONE, node=kafka03:9092 (id: 3 rack: null))) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Discovered group coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:14 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending Heartbeat request to coordinator kafka03:9092 (id: 2147483644 rack: null) 
Dec 01 11:57:17 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Group coordinator kafka03:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery 
Dec 01 11:57:17 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka03:9092 (id: 3 rack: null) 
Dec 01 11:57:18 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending FindCoordinator request to broker kafka02:9092 (id: 2 rack: null) 
Dec 01 11:57:18 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received FindCoordinator response ClientResponse(receivedTimeMs=1575154636491, latencyMs=172, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=my-client, correlationId=41037), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='NONE', error=NONE, node=kafka02:9092 (id: 2 rack: null))) 
Dec 01 11:57:18 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Discovered group coordinator kafka02:9092 (id: 2147483645 rack: null) 
Dec 01 11:57:19 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Sending Heartbeat request to coordinator kafka02:9092 (id: 2147483645 rack: null) 
Dec 01 11:57:19 AbstractCoordinator: [Consumer clientId=my-client, groupId=my-group] Received successful Heartbeat response 

You can observe that, like before, both kafka01 and kafka02 responded saying kafka03 is the GC... the client initially accepted this information but needed to confirm it with a heartbeat to kafka03... with an invalid response, it rejected the GC and proceeded to rediscover the correct GC. When it made a call to kafka03, the response said the GC is kafka02, and upon a successful heartbeat the client could confirm the GC is correct.

I have not looked at the librdkafka source, but just wanted to report the findings first, hoping for confirmation of the bug before going any further with the investigation. Based on the logs in the previous comment, I can assume librdkafka is not using the heartbeat response to validate the GC discovery call.
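
To make the difference concrete, here is a toy, self-contained C simulation (not librdkafka or Java client code) of the validation loop these logs suggest, where a FindCoordinator answer is only trusted once a heartbeat to that broker succeeds; find_coordinator() and heartbeat_ok() are made-up stand-ins:

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in: ask a broker who the group coordinator is.
 * It returns a stale answer (broker 3) twice before the cluster settles. */
static int find_coordinator(void) {
        static int calls = 0;
        return (calls++ < 2) ? 3 : 2;
}

/* Hypothetical stand-in: only the real coordinator (broker 2) accepts it. */
static bool heartbeat_ok(int broker_id) {
        return broker_id == 2;
}

int main(void) {
        int coord;
        do {
                coord = find_coordinator();
                printf("discovered coordinator kafka%02d\n", coord);
                if (!heartbeat_ok(coord))
                        printf("coordinator kafka%02d is unavailable or invalid, "
                               "will attempt rediscovery\n", coord);
        } while (!heartbeat_ok(coord));
        printf("received successful heartbeat from kafka%02d\n", coord);
        return 0;
}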

@plachor

plachor commented Dec 16, 2019

Hi @edenhill, what is the status of this issue? We believe we were affected by it on one of our sandbox environments. We are using the Confluent.Kafka dotnet wrapper at version 1.2.0, which targets librdkafka at the same version. Our brokers are at version 2.1.0.

We have Kafka and Zookeeper there in HA (each deployed with 3 nodes). There are multiple consumer groups and topics, and the consumer groups vary in size from 3 to 45 instances. In our case we must guarantee ordering per partition and at-least-once delivery.

There was a maintenance action applied to the virtual hosts of the Kafka and Zookeeper instances, so they were taken down one by one within an interval to perform the maintenance task. It turned out that the action lacked proper monitoring of the cluster state, and a graceful shutdown was not guaranteed to be fully awaited.

Three days after the maintenance action we detected that some messages were processed in parallel, causing race conditions in our application. Investigation showed that over those 3 days there was an active-but-detached consumer group instance that was not active from the Kafka cluster's perspective: it was not listed by the consumer-group API nor by the CLI scripts. Looking at the logs of that instance, it believed it was still active in the group and was still processing its (3-day-old) assignment.

In the logs we found:
Error at offset: "TOPIC_XYZ [[12]] @5915: Broker: Not coordinator for group".
So it seems the library should be able to recognize the issue. I cannot confirm whether this instance could commit offsets successfully on that day, since debug-level logs were not enabled.

We checked the broker logs and found this issue reported:
[2019-12-11 01:49:57,453] INFO [GroupCoordinator 1001]: Member CONSUMER_GROUP_XYZ.ID in group CONSUMER_GROUP_XYZ has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

Sadly we had no option to turn on librdkafka debug logging without restarting the instance, so we cannot provide more details. Could it correlate with the above?

I'm adding hot reload of the logging severity level to our Kafka abstraction layer. Is there an option to hot-swap these logging options after the consumer has been created? And if not, is there a significant performance footprint on your side if I enable your trace-level logs but do not process them on my side unless required?

@plachor

plachor commented Dec 16, 2019

So to clarify the situation: we had a competing-consumers scenario in which two instances with the same consumer-group-id were reading and processing the same partition.

@KrzysztofBranicki

Hi @edenhill, can you give an update on the status of this issue? In our system we strongly rely on the fact that Kafka will not allow multiple consumers that identify themselves with the same consumer group ID to read the same partition. We implement the CQRS pattern, where one microservice owns the Write Model and a separate microservice owns the Read Model. The Read Model is kept up to date by consuming events produced by the Write Model microservice. Our event handlers are idempotent, so we don't mind duplicate events as long as there is no competing-consumer scenario. Problems start when we have a competing-consumer scenario (within one consumer group). Say we have 10 events that indicate state changes of a single entity. The first competing instance processes all 10 events and updates the Read Model to the latest version of the entity. Then the second competing instance starts processing the same 10 events, but after processing 5 events it is restarted or in some other way realizes it should not be consuming the same partition and stops consumption. As a result we have the Read Model in an inconsistent state (version 5 instead of 10). Those inconsistencies are very difficult to spot, especially when we have tens of millions of documents in our Read Model.

@mhowlett we are using the C# client library, which is a wrapper around librdkafka, so I don't know if you want to track this issue only in this repository or also create a separate issue in the C# client repo.

@plachor

plachor commented Jan 20, 2020

Any update? @edenhill

@edenhill
Contributor

Thank you for the detailed analysis of the problem, @keith-chew!

I believe the issue is that librdkafka relies solely on the group coordinator to enforce the group session timeout, so all that happens when the HEARTBEAT fails with ERR_NOT_COORDINATOR_FOR_GROUP is that librdkafka performs a coordinator query (which may point to another broker) and continues the Heartbeats.
librdkafka will need to maintain its own session timer to kick itself out of the group if there hasn't been a successful Heartbeat request in session.timeout.ms.

This is a bug in librdkafka that we will fix for the upcoming v1.4.0 release.

@edenhill added this to the v1.4.0 milestone Jan 20, 2020
@keith-chew
Author

Amazing! Thank you very much for the update @edenhill...!

edenhill added commits that referenced this issue (Jan 29 to Feb 5, 2020):
If no successful Heartbeat has been sent in session.timeout.ms
the consumer will trigger a local rebalance (rebalance callback with
error code set to REVOKE_PARTITIONS).
The consumer will rejoin the group when the rebalance has been handled.
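
For application code, this local rebalance arrives through the standard rebalance callback like any broker-initiated one; a minimal sketch of the usual librdkafka C pattern (not code taken from the fix itself):

#include <stddef.h>
#include <librdkafka/rdkafka.h>

/* Standard rebalance callback: a REVOKE_PARTITIONS event (including the local
 * revoke triggered when no Heartbeat has succeeded within session.timeout.ms)
 * drops the assignment; librdkafka then rejoins the group and a new
 * ASSIGN_PARTITIONS event follows. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        (void)opaque;

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                rd_kafka_assign(rk, partitions);  /* start fetching new assignment */
                break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                /* commit or flush any in-flight work before giving up ownership */
                rd_kafka_assign(rk, NULL);        /* drop the old assignment */
                break;
        default:
                rd_kafka_assign(rk, NULL);
                break;
        }
}

/* Registered during setup with rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb). */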
@edenhill closed this as completed Feb 5, 2020
@gmeena

gmeena commented Jul 17, 2023

@edenhill I am getting the same issue with librdkafka 1.5.3.
The error message is slightly different though: RD_KAFKA_RESP_ERR_NOT_COORDINATOR.

Rebalancing is happening and the partition got assigned to another consumer, but this particular consumer is not able to join the group again.

