Broker: Specified group generation id is not valid #331

Open · rao-mayur opened this issue Jun 18, 2024 · 12 comments

@rao-mayur

Description

I have a streams application running that uses two ForeachAsync() loops for processing. I am seeing the following error/exception logged by the Streamiz library:
external-stream-thread[] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
{
"error.class": "Confluent.Kafka.KafkaException",
"error.message": "Broker: Specified group generation id is not valid",
"error.stack": "\t at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable1 offsets) \n\t at Confluent.Kafka.Consumer2.Commit(IEnumerable`1 offsets) \n\t at Streamiz.Kafka.Net.Processors.ExternalStreamThread.CommitOffsets(Boolean clearBuffer) \n\t at Streamiz.Kafka.Net.Processors.ExternalStreamThread.Commit() \n\t at Streamiz.Kafka.Net.Processors.ExternalStreamThread.<>c__DisplayClass44_1.b__2() \n\t at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) \n\t at Streamiz.Kafka.Net.Processors.ExternalStreamThread.Run()",
"level": "ERROR",
"message": "external-stream-thread[] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:",
}
Warnings like the following are also being logged:
stream-task[] Error with a non-fatal error during committing offset (ignore this, and try to commit during next time): Broker: Specified group generation id is not valid

When this happens I notice rebalance issues: not all consumers rejoin the consumer group until a manual restart. Although not fatal, I see lag build up and overall slow processing.

  1. Can you please review and let me know whether the above is a critical error from Streamiz?
  2. Does it affect rebalancing?
  3. Are there any config settings I need to tweak in the client to overcome this error?

How to reproduce

Not sure if this can be easily reproduced. Build a topology that uses two ForeachAsync() loops (each doing some processing work on the message) and add consumers to an existing group to see whether the rebalance happens correctly.

Config:
AutoOffsetReset = earliest
AutoRegisterSchemas = false
UseLatestVersion = true
MaxPollIntervalMs = 300000
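For context, a minimal sketch of how that configuration might look on a Streamiz StreamConfig. The application id, bootstrap servers and serdes are placeholders, and the schema-registry flags (AutoRegisterSchemas, UseLatestVersion) are assumed to be set on the same config object, as in the snippet above:

    using Confluent.Kafka;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;

    // Sketch only: identifiers and values are placeholders mirroring the reported config.
    var config = new StreamConfig<StringSerDes, StringSerDes>
    {
        ApplicationId = "my-streams-app",           // placeholder
        BootstrapServers = "localhost:9092",        // placeholder
        AutoOffsetReset = AutoOffsetReset.Earliest,
        MaxPollIntervalMs = 300000,                 // 5 minutes, the librdkafka default
        AutoRegisterSchemas = false,                // assumed to live on StreamConfig here
        UseLatestVersion = true
    };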

Checklist

Please provide the following information:

  • Streamiz.Kafka.Net nuget version. - 1.5.1
  • Apache Kafka version. - 3.6
  • Client configuration. - attached above
  • Operating system. - linux
  • Provide logs (in debug mode (log4net and StreamConfig.Debug) as needed in the configuration). - logs provided above
  • Critical issue. - low
@LGouellec
Owner

Hi @rao-mayur ,

Thanks for your ticket.
This kind of issue happens when you take too much time to process messages, so the consumer instance is evicted from the group. Remember that in Kafka you have to poll data regularly, depending on your MaxPollIntervalMs configuration.
So in your application, the external stream thread failed because, by the time it tried to commit the last offsets, the instance had already been evicted from the group.

You can increase MaxPollIntervalMs, and if you would like to keep processing messages in this case, you can override the InnerExceptionHandler to return CONTINUE. The consumer will then stop and another instance will be recreated, but keep in mind that you can get duplicates in the downstream processors.
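A minimal sketch of that mitigation, assuming the usual Streamiz configuration surface (the values are illustrative, and the namespace holding ExceptionHandlerResponse is assumed):

    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.Errors;   // assumed namespace for ExceptionHandlerResponse
    using Streamiz.Kafka.Net.SerDes;

    var config = new StreamConfig<StringSerDes, StringSerDes>();

    // Give slow ForeachAsync processing more headroom before the broker evicts the member.
    config.MaxPollIntervalMs = 600000; // illustrative: 10 minutes instead of the 5-minute default

    // Keep the application running after inner errors such as the invalid-generation commit
    // failure: the failed consumer is closed and a new instance rejoins the group, at the cost
    // of possible duplicates in the downstream processors.
    config.InnerExceptionHandler = e => ExceptionHandlerResponse.CONTINUE;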

@rao-mayur
Author

rao-mayur commented Jun 19, 2024

Hi @LGouellec

Thanks for the response. I have a few follow-up questions:

  1. So more processing time means the consumer instance is assumed dead/evicted, and hence the "generation id is not valid" error is thrown by the broker, right?
  2. I see two clients when I start up my app: one named just 'ConsumerGroupName' and another 'ConsumerGroupName'-external (which I think is for the ForeachAsync). So if time is spent in the ForeachAsync loop, are the consumers under 'ConsumerGroupName'-external the ones evicted/dead? What happens to the ones under 'ConsumerGroupName'?
  3. Regarding InnerExceptionHandler and "the consumer will stop, recreate another instance": does that mean a new stream thread is created (the entire topology is built again)? Like a restart of the app?
  4. I see another option, FAIL, for InnerExceptionHandler. In what cases should it be used? Should I try it to solve this error?

@LGouellec
Owner

@rao-mayur

1- Yes, exactly; there are a lot of blog posts on that. Example: https://medium.com/codex/dealing-with-long-running-jobs-using-apache-kafka-192f053e1691. Streamiz internally has a back-pressure mechanism, but in some cases that doesn't prevent the instance from being kicked out of the group.

2- Yes, exactly: all the async processing is done with a dedicated consumer group ('ConsumerGroupName'-external), so it has no impact on the initial consumer group.

3- Not exactly. If an internal exception is raised (consumer group evicted, for instance), the external consumer is closed properly and another instance of the 'ConsumerGroupName'-external consumer group will be created. See: https://github.com/LGouellec/kafka-streams-dotnet/blob/3e77bb5b3ae670a9e261258faa9adf8c5cd44ca2/core/Processors/ExternalStreamThread.cs#L406.

4- FAIL is the default behavior; it means the thread will die and nothing more, so there is no further external processing in your current application instance.

@rao-mayur
Author

OK, thank you @LGouellec.
I will look into the config setting and the InnerExceptionHandler and see if they help with the error.

@rao-mayur
Author

Hi @LGouellec

I set the InnerExceptionHandler to CONTINUE in the stream config. I got the warning "Detected that the thread throw an inner exception. Your configuration manager has decided to continue running stream processing. So will close out all assigned tasks and rejoin the consumer group", so I know the config setting is being applied.

Even after making the config update, the issue I continue to see is that not all consumers in the group (I have 6 consumers running) rejoin the group as part of the rebalance. This is for the -external group (the ForeachAsync() processor). Processing throughput drops and lag builds up. Manually restarting the processors seems to help, and more consumers join the group.

Do you have any advice for this? How can I be sure that all consumers rejoin the group after InnerExceptionHandler returns CONTINUE?

Thanks,
Mayur.

@LGouellec
Owner

Hi @rao-mayur ,

Can you share your last logs, please?

Best regards,

LGouellec reopened this Aug 6, 2024
@rao-mayur
Author

Hi @LGouellec

Please see file attached.

Basically, those logs show that after the InnerExceptionHandler CONTINUE hits on Jul 31, 2024 14:41:02.428, only one guid is reporting "Processed X total records".

I am running 6 containers, so 6 threads. My understanding is that after the CONTINUE all 6 should restart, and in the logs I should see 6 different guids reporting "Processed X total records", right?

Thank you for your help on this.
issue-331-Logs.txt

@LGouellec
Owner

Hey @rao-mayur ,

I don't understand your logs and use case. So you have 6 pods, and therefore 1 external stream thread per pod.
Is the log you shared a concatenation of all 6 pods, or just 1 pod? And why should all 6 threads restart after the CONTINUE? Only the thread impacted by the error does.
Can you share the logs of each container in 6 different files, please? It will be easier to troubleshoot.

By the way, I'm currently conducting a satisfaction survey to understand how I can serve you better, and I would love to get your feedback on the product.

Your insights are invaluable and will help us shape the future of our product to better meet your needs. The survey will only take a few minutes, and your responses will be completely confidential.

Survey

Thank you for your time and feedback!

@sm003ash

sm003ash commented Sep 25, 2024

@LGouellec I see this error but not because it took too long to process messages; rather the state transitioned BACK from RUNNING to PARTITIONS_ASSIGNED. Here is the timeline:

INFORMATION
Sep 24, 2024 01:41:03.062
stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
 
INFORMATION
Sep 24, 2024 01:41:06.176
stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
 
INFORMATION
Sep 24, 2024 01:42:00.076
external-stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-external-stream-thread] Processed 62 total records in 60000ms

As you can see, the thread processed 62 messages within a minute, well within the default 5-minute MaxPollIntervalMs.
Now these state transitions occur but I am not sure why:


INFORMATION
Sep 24, 2024 01:42:06.317
stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-stream-thread-0] State transition from RUNNING to PARTITIONS_ASSIGNED
 
INFORMATION
Sep 24, 2024 01:42:06.718
stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING

This causes the following error. Note that it's unable to close out the thread because it tries to commit offsets, which isn't going to work:

ERROR
Sep 24, 2024 01:42:06.763
external-stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-external-stream-thread] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
error.message:Broker: Specified group generation id is not valid
error.stack:	   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets) 
	   at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets) 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.CommitOffsets(Boolean clearBuffer) 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.Commit() 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.<>c__DisplayClass44_1.<Run>b__2() 
	   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action) 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.Run()
 
INFORMATION
Sep 24, 2024 01:42:06.792
external-stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-external-stream-thread] Shutting down
 
ERROR
Sep 24, 2024 01:42:06.811
external-stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-external-stream-thread] Failed to close external stream thread due to the following error:
error.message:Broker: Specified group generation id is not valid
error.stack:	   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets) 
	   at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets) 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.CommitOffsets(Boolean clearBuffer) 
	   at Streamiz.Kafka.Net.Processors.ExternalStreamThread.CompleteShutdown()

Is the expectation here to CONTINUE on inner exceptions like you proposed earlier? I don't think increasing MaxPollIntervalMs helps in this particular case.

Also, if the external stream thread is shut down, why isn't a new one launched so that stream processing continues? It seems the stream threads can fail and shut down for several reasons: SSL handshake errors or network errors leading to the member being kicked out, message processing taking too long, state transitions from RUNNING to PARTITIONS_ASSIGNED, and so on.

Moreover, HandleInnerExceptions() makes a call to CommitOffsets(true). Won't this itself throw the same exception as above? I don't think this gets caught anywhere. Please confirm.

@sm003ash

Another case: On startup, task restoration takes > 5 min, which is the default for MaxPollIntervalMs. As a result, Consume() fails:

      stream-task[3|1] Restored and ready to run
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Restoration took 382079ms for all tasks 5-2,5-0,6-1,6-0,3-0,2-1,2-0,5-3,5-1,3-1,6-3,6-2,5-4,6-5,6-4,5-5

fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[digestmanager-363a729f-8fe7-4883-9e04-9795aa3625aa-stream-thread-0] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
      Confluent.Kafka.ConsumeException: Application maximum poll interval (300000ms) exceeded by 441ms
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
         at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout, Int64 maxRecords)
         at Streamiz.Kafka.Net.Processors.StreamThread.<>c__DisplayClass65_0.<Run>b__0()
         at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()

We are able to work around this by changing InnerExceptionHandler to CONTINUE and by increasing MaxPollIntervalMs. Would a better alternative be to create the Consumer instance in StreamThread only after restoration is complete? Is that feasible?

@LGouellec
Owner

external-stream-thread[sessionmanager-ce34557c-bc83-48cf-a569-a801ee06db33-external-stream-thread] Processed 62 total records in 60000ms

Thanks for your input. For your information, I will completely refactor the external stream thread processor in the 1.8.0 release.
Let me check your logs and troubleshoot the origin.

@LGouellec
Owner

Another case: On startup, task restoration takes > 5 min, which is the default for MaxPollIntervalMs. As a result, Consume() fails: [log quoted above]

We are able to work around this by changing InnerExceptionHandler to CONTINUE and by increasing MaxPollIntervalMs.

This is a common error, yes: when task restoration takes longer than the default 5 minutes, the consumer fails because it is considered inactive.
Workaround: set InnerExceptionHandler to CONTINUE and increase MaxPollIntervalMs; a short sketch follows below.
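A sketch of that workaround, continuing the StreamConfig examples earlier in this thread and sizing the value from the ~382 s restoration reported above (the numbers are illustrative):

    // Restoration took ~382,079 ms above, so MaxPollIntervalMs must comfortably exceed it.
    config.MaxPollIntervalMs = 600000;                                     // e.g. 10 minutes
    config.InnerExceptionHandler = e => ExceptionHandlerResponse.CONTINUE; // tolerate the eviction during restore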

Would a better alternative be to create the Consumer instance in StreamThread only after restoration is complete? Is that feasible?
No better alternative.

For your information, I will work on improving the restoration time soon (planned for 1.8.0) by leveraging bulk upserts into RocksDB instead of unitary upserts.
