Increasing consumers in consumer group cause rebalance failure with CommitFailedException because of revoked partition #750

Closed
Maatary opened this issue Mar 16, 2019 · 26 comments

@Maatary

Maatary commented Mar 16, 2019

Hi,

I think this issue is related to #539 but I don't know if it is a bug, or the user is supposed to handle it himself.

I have a consumer group. Whenever I increase the number of consumers in that group, the revocation of partitions causes the following error:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$commit(KafkaConsumerActor.scala:430)
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:210)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.Timers.aroundReceive(Timers.scala:55)
	at akka.actor.Timers.aroundReceive$(Timers.scala:40)
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

This does not happen when I scale down the number of consumers, or at least I have not observed it so far. I assume this is because partitions are not revoked when scaling down; the remaining consumers just get new partitions.

Note that I group messages and commit in batches.

Here is what my code looks like:

val source = Consumer.committableSource(consumerSettings, subscription)
  .async
  // One substream per named graph, batched by size or time window
  .groupBy(Int.MaxValue, computeNamedGraph)
  .groupedWithin(conf.tripleStoreSettings.batchSize, conf.tripleStoreSettings.batchWindowSec seconds)
  .map(toUpdateStatements)
  .async
  .mergeSubstreams
  .map(toHttpRequest)
  .map(p => p.data -> p)
  .via(poolClientFlow)
  .async
  .map { case (response, payload) => Payload(response, payload.offsets) }
  .mapConcat(handleResponse)
  // Offsets are committed in batches once the HTTP response has been handled
  .via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize)))

val (killSwitch, streamResults) = source
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.both)
  .run()

streamResults.onComplete {
  case Success(_) =>
    logger.info("Stream finished")
    system.terminate()
  case Failure(e) =>
    logger.error("Stream failed:", e)
    system.terminate()
}

My decider just does the following:

 private val decider: Supervision.Decider = {
    e => {
      logger.error(s"Stream failed. ${e.getMessage} ${e.getStackTrace.map(_.toString).mkString("\n")}", e)
      Supervision.Stop
    }
  }

Based on my reading of #539, I understand that I have a number of in-flight messages to commit back and I can't because of the revocation. That is, a rebalance that involves revocation happens when the number of consumers is scaled up.

My service is at-least-once, so I don't mind if another consumer reprocesses those messages. We don't have an at-most-once delivery constraint.

My question is: until the library handles this situation natively, how can I go about committing them anyway whenever a revoke occurs or, better yet, just discard them, so that the consumer that gets assigned the partitions they belong to will reprocess them?

Any suggestions? I checked the BalanceListener but I am not sure how to go about using it for this situation.

Note: my timeout configs are:

val subscription = Subscriptions.topicPattern(conf.kafkaConsumer.sourceTopic)
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers(conf.kafkaBroker.bootstrapServers)
  .withGroupId(conf.kafkaConsumer.groupId)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
  .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000000")
  .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")

@Maatary changed the title from "Increasing consumers in consumer group cause rebalance failure with CommitFailException because of revoked partition" to "Increasing consumers in consumer group cause rebalance failure with CommitFailedException because of revoked partition" on Mar 16, 2019
@laymain

laymain commented Mar 19, 2019

We have the same problem here and I think we have found the explanation.
It is easily reproducible using the groupedWithin feature. Here is a case:

  • groupedWithin() configured for groups of 1000 records or 30 seconds
  • poll() returns 500 records of partitions 0 and 1
  • group is partially filled, conditions (size or time) are not met
  • a new consumer joins the group -> rebalancing
  • next poll() returns 0 records until the rebalance is finished (since KIP-266)
  • partition 1 is revoked
  • poll() returns 500 records of partition 0
  • group conditions are met
  • committing offsets -> CommitFailedException because group contains records from a revoked partition (records from first poll())
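
The sequence above can be sketched as a toy model in plain Scala. This is only an illustration: `Record`, `Coordinator` and `run` are hypothetical names and not part of the Kafka client or Alpakka Kafka, and the sketch assumes (as described in this thread) that a commit is rejected as a whole when it contains a partition the consumer no longer owns.

```scala
// Toy model only: these types are hypothetical and are not the Kafka client API.
object RevokedCommitSketch {
  final case class Record(partition: Int, offset: Long)

  // Stand-in for the group coordinator: it only tracks which partitions
  // this consumer currently owns.
  final class Coordinator(initial: Set[Int]) {
    private var owned: Set[Int] = initial

    def revoke(partition: Int): Unit = owned -= partition

    // Assumption taken from the thread: the commit is accepted or rejected as a whole.
    def commit(batch: Seq[Record]): Either[String, Map[Int, Long]] = {
      val stale = batch.map(_.partition).toSet -- owned
      if (stale.nonEmpty)
        Left(s"CommitFailed: partitions ${stale.toSeq.sorted.mkString(",")} no longer owned")
      else
        Right(batch.groupBy(_.partition).map { case (p, rs) => p -> rs.map(_.offset).max })
    }
  }

  def run(): Either[String, Map[Int, Long]] = {
    val coordinator = new Coordinator(Set(0, 1))
    // First poll: records from partitions 0 and 1, the group is not yet full.
    val firstPoll = Seq(Record(0, 0L), Record(1, 0L), Record(0, 1L), Record(1, 1L))
    // A new consumer joins the group and partition 1 is revoked.
    coordinator.revoke(1)
    // Second poll: only partition 0, the group conditions are now met.
    val secondPoll = Seq(Record(0, 2L), Record(0, 3L))
    // The group still holds partition 1 records from the first poll.
    coordinator.commit(firstPoll ++ secondPoll)
  }
}
```

Calling `RevokedCommitSketch.run()` yields a `Left`, because the grouped batch still carries offsets for the revoked partition 1.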

But this error is not specific to the groupedWithin feature: we can reproduce it with a long processing operation (i.e. a dumb Thread.sleep()), because the next poll() will send a JoinGroup request and revoke partitions even if the processing isn't completed.

I'm pretty sure that Akka was depending on the behavior of Kafka's poll(long), which blocked until an assignment was found, whereas poll(Duration) does not (as described in KIP-266).

@ennru
Member

ennru commented Mar 20, 2019

@Maatary Thank you for reporting and @laymain for explaining how we could reproduce it. Your explanation makes sense to me.
There have been quite a few discussions in several issues around how to handle rebalances more reliably. Please join us to find the best solution!

@ennru
Member

ennru commented Mar 20, 2019

It looks like I have a test showing this now. Will stabilize and send a PR.

ennru added a commit to ennru/alpakka-kafka that referenced this issue Mar 20, 2019
@laymain

laymain commented Mar 20, 2019

Hi @ennru, do you have any idea how we could ensure that processing is completed (or timed out) before sending a new JoinGroup request after a rebalance event?
I think that may be the source of this issue: we should not instantly re-join the group in such a case.

@laymain

laymain commented Mar 20, 2019

Also, I may have misunderstood your test, but your comment seems strange to me:

> commit BEFORE the reassign finishes with an assignment

The case I have described does not require committing before the end of the rebalance. The issue occurs even if we commit after the rebalance, as long as the group duration condition is not met.

@ennru
Member

ennru commented Mar 20, 2019

Yes, the funny thing is that once the PartitionsAssigned messages arrive, committing works fine (first I waited for those and the test would show the error).
So it is the window between the revoke and the assign that makes the commits fail.

@laymain

laymain commented Mar 20, 2019

Sorry, but I don't get it: if my group contains records from a revoked partition and I commit it, it will fail, regardless of whether the PartitionsAssigned message has arrived.

@ennru
Member

ennru commented Mar 20, 2019

For me, it doesn't fail once the rebalance has finished. But I know, that doesn't really make sense.

If you are happy with getting messages redelivered, you may add a supervision strategy to resume on CommitFailedExceptions. (As shown in another test I just added.)

@laymain

laymain commented Mar 20, 2019

I have to admit that it doesn't make sense to me either. I can't see how a Kafka broker would in any case accept a commit for revoked partitions from a consumer. The exception message seems quite clear:
> CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

I'm pretty sure I can reproduce this using a simple Thread.sleep() with an acceptable duration because of the almost instant JoinGroup request sent after the rebalancing has begun.

Just to be sure, have you set the enable.auto.commit property to false in your test?


We can accept a few duplicated messages in rare cases, but in practice it will happen each time a rebalance occurs. Even worse, this issue has a snowball effect when we have several consumers: a rebalance (scale up only) will cause almost all consumers to raise a CommitFailedException and leave the group. Depending on the time a consumer takes to restart and re-join the group, it might trigger a new rebalance that will cause, again, almost all consumers to raise a CommitFailedException, and so on.

I think we should try to find a way to allow consumers to flush the pending data and commit, within an acceptable amount of time, before rejoining the group.
If we have a look at Kafka Connect's configuration, we can see that there is such a mechanism:

Name                  Description
rebalance.timeout.ms  The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.

@ennru
Member

ennru commented Mar 21, 2019

Thank you for your input on it, I'll dig into it.

You can mitigate the problem with a supervision strategy on the Committer:

    val resumeOnCommitFailed: Supervision.Decider = {
      case _: CommitFailedException ⇒ Supervision.Resume
      case _ ⇒ Supervision.Stop
    }
	
      ...
      .toMat(
        Committer
          .sink(committerSettings)
          .withAttributes(ActorAttributes.supervisionStrategy(resumeOnCommitFailed))
      )(Keep.both)
      .run()

This will ignore the failed commits and not take down the flow, so you avoid the ripple effect of consumers shutting down.

@Maatary
Author

Maatary commented Mar 21, 2019

Hi @ennru Thank you for your reply so far. Also @laymain Thanks for the details.

@ennru I was thinking about doing that. My only worry was: if I use Resume, do I run the risk of dropping good messages or not?

What I mean is: if we commit in batches, is it possible that I have started to fetch new messages from the new partition and end up with a batch to commit that includes messages from the partition that has been revoked and messages from the partition I have been newly assigned?

Can I be sure that all the good messages will pass through and the failure will only affect the bad messages? In other words, will one big batch commit operation partially succeed but still return a CommitFailedException? If that is the case, then your solution makes sense. But if Kafka rejects the whole batch commit because some of the messages are wrong, then resuming will drop the good messages.

Does that make sense? Have you thought about that scenario?

@laymain

laymain commented Mar 21, 2019

Thank you, this workaround will indeed stop the consumers from crashing and the risk of triggering new rebalances.

@laymain

laymain commented Mar 21, 2019

@Maatary There is a risk of producing duplicates because of not committing offsets, but since you are committing a batch, it means that you have produced all messages contained in that batch, so you won't drop good messages.

@Maatary
Author

Maatary commented Mar 21, 2019

@laymain Can you expand on this? I did not get that:

> it means that you have produced all messages contained in that batch, so you won't drop good messages.

@laymain

laymain commented Mar 21, 2019

Hmm... I'll give it a try:

  • you have a batch composed of messages from a partition 0 and a partition 1
  • rebalance occurs, partition 1 is revoked
  • since you are at the committing step, it means that you have processed all of the batch messages (as described in your flow)
  • commit fails because of the partition 1 messages

From here there are two scenarios, depending on how it is implemented (we should check):

  • assignment has been reset
    • next poll() will return the already processed messages from partition 0
    • new consumer of partition 1 will reprocess partition 1 messages contained in your batch
  • current offsets aren't reset
    • next poll() will return the next messages from partition 0
    • new consumer of partition 1 will reprocess partition 1 messages contained in your batch

In any case, you will redeliver some messages but won't drop any.

@Maatary
Author

Maatary commented Mar 21, 2019

The Scenario I have in mind is:

  • Consumer 1 has partition 1

1 - Consumer 1 processes some messages

2 - A rebalance occurs in the meantime

3 - Consumer 1 gets partition 2

4 - Consumer 1 starts consuming from partition 2 without having committed yet the messages from partition 1 that it already consumed (this could be because of the size of your batch)

5 - Consumer 1 reaches the commit batch size, with a batch that contains messages from partition 2 and partition 1

6 - Consumer 1 commits the batch and the commit fails

7 - Then, with the resume above, consumer 1 drops the whole batch (does the whole batch fail, or only the messages from partition 1?)

8 - Consumer 1 resumes and continues with the messages of the new partition, without committing those first messages from partition 2, but it then commits the following messages of partition 2.

That's potentially something that could happen. This does not mean that the dropped messages from partition 2 were not processed, but they were still not committed, and that is not good.

The issue here is that groupedWithin can group messages from the revoked partition with messages from the new partition, or the commit batch can combine messages from the old partition with messages from the new partition.

If, however, the semantics of the Kafka commit API are that everything is committed except the offsets that failed, and the commit exception covers only those, then it works. However, that would not be very nice.

@Maatary
Author

Maatary commented Mar 21, 2019

The dilemma, however, is that a restart might cause a cascade of restarts. I'm not completely sure of that, though. Somehow Akka Streams manages to handle some use cases well :) !! I just can't completely predict the behavior :)

@laymain

laymain commented Mar 22, 2019

I don't know if I'm misunderstanding what you're saying or if you're misunderstanding how offset committing works, but the following statement doesn't make sense to me:

> 7 - Then, with the resume above, consumer 1 drops the whole batch (does the whole batch fail, or only the messages from partition 1?)

The batch isn't dropped. You're not using transactions; only the commit fails.

> This does not mean that the dropped messages from partition 2 were not processed, but they were still not committed, and that is not good.

You should go further on this: ask yourself why it is not good. What are the risks?

To answer your worries: the behavior seems predictable to me depending on the scenario. I can't see a case where messages would be dropped, only more or fewer redelivered messages.

@Maatary
Author

Maatary commented Mar 22, 2019

Isn't dropping messages what Resume does?

If you do .via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize))) and you get a CommitFailedException and you follow the suggestion above, which is Resume, this means you drop the offsets you just tried to commit. Isn't that correct?

What I am saying in essence is: what happens if, in that batch, you have offsets of the revoked partition combined with offsets of the partition you do own?

@laymain

laymain commented Mar 22, 2019

Nothing in particular: the offsets of both partitions aren't committed. Supervision.Resume means that it won't interrupt the flow; it will continue processing the next messages.
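
The difference between resuming and stopping on a failed commit can be sketched without Akka at all. The following is a toy model under stated assumptions: `Directive`, `CommitFailed`, `resumeOnCommitFailed` and `runCommits` are hypothetical stand-ins defined here for illustration, not the Akka Streams supervision API.

```scala
// Toy model only: hypothetical stand-ins, not the Akka Streams supervision API.
object ResumeSketch {
  sealed trait Directive
  case object Resume extends Directive
  case object Stop extends Directive

  final case class CommitFailed(msg: String) extends RuntimeException(msg)

  // Mirrors the idea of the decider suggested earlier in the thread.
  val resumeOnCommitFailed: Throwable => Directive = {
    case _: CommitFailed => Resume
    case _               => Stop
  }

  // Each batch is (label, whether its commit succeeds).
  // Returns the labels of the batches whose commit went through.
  def runCommits(batches: Seq[(String, Boolean)], decider: Throwable => Directive): Seq[String] = {
    val committed = Seq.newBuilder[String]
    val it = batches.iterator
    var running = true
    while (running && it.hasNext) {
      val (label, ok) = it.next()
      if (ok) committed += label
      else decider(CommitFailed(s"commit of $label rejected")) match {
        case Resume => () // this commit is skipped, the stream keeps running
        case Stop   => running = false // the stream ends here
      }
    }
    committed.result()
  }
}
```

With batches `a` (ok), `b` (rejected) and `c` (ok), the resuming decider still commits `a` and `c`, while a decider that always stops only commits `a`. In both cases the messages of `b` were already processed; only its checkpoint is missing.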

@ennru
Member

ennru commented Mar 22, 2019

I've written down my insights around this issue in #755

@Maatary
Author

Maatary commented Mar 22, 2019

> Nothing in particular: the offsets of both partitions aren't committed. Supervision.Resume means that it won't interrupt the flow; it will continue processing the next messages.

That was my worry with Resume: you will commit in order but skip committing some valid offsets in the process. I don't know if that's a good thing in general, e.g. commit 1 2 3, skip 4 5 6, and commit 7 8 9. You will skip 4 5 6 because you are committing x y z, which belong to a revoked partition, at the same time.

Given that I do not know the consequences of doing that, I just raised the question :)

@laymain

laymain commented Mar 22, 2019

@Maatary That's not really how it works. When you have messages 1, 2, 3 in your batch, you will only commit offset 3; you don't need to commit offsets one by one.
Skipping the commit of a processed batch with messages 4, 5, 6 and committing the next one containing 7, 8, 9 is the same as committing a bigger batch containing 4, 5, 6, 7, 8, 9, since you will only commit offset 9.
The only risk is that, if there is a crash, you restart at offset 4 and redeliver messages.

Committing has no effect on messages, it is just saving your group progression in consuming the stream, it's checkpointing.
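
A small sketch of this checkpointing view, in plain Scala. The names `Record`, `positions` and `applyCommits` are hypothetical, and the sketch follows the convention used in this thread of treating the last processed offset as the committed position.

```scala
// Toy model only: hypothetical names, following the thread's offset convention.
object CheckpointSketch {
  final case class Record(partition: Int, offset: Long)

  // A batch commit reduces to one position per partition: the highest offset seen.
  def positions(batch: Seq[Record]): Map[Int, Long] =
    batch.groupBy(_.partition).map { case (p, rs) => p -> rs.map(_.offset).max }

  // Apply commits in order; a later commit overwrites the earlier
  // position of the same partition.
  def applyCommits(batches: Seq[Seq[Record]]): Map[Int, Long] =
    batches.foldLeft(Map.empty[Int, Long])((acc, b) => acc ++ positions(b))

  val b1 = (1L to 3L).map(Record(0, _))
  val b2 = (4L to 6L).map(Record(0, _))
  val b3 = (7L to 9L).map(Record(0, _))

  val allCommitted  = applyCommits(Seq(b1, b2, b3))
  val middleSkipped = applyCommits(Seq(b1, b3)) // the commit of 4, 5, 6 was skipped
}
```

Both `allCommitted` and `middleSkipped` end at offset 9 for partition 0. The skipped commit only matters if the consumer crashes between the first and the last commit, in which case it restarts from the earlier checkpoint.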

@ennru I've read your insights, I'm not sure that I'm understanding the implications, have you got in mind a solution that will avoid redelivering messages systematically on rebalance? Are you planning to disable polling until the pending messages are processed and committed (or timed out), not only until rebalance is finished?
It would be nice to have a draining mechanism when rebalancing occurs before re-joining the group and starting to poll again.

@Maatary
Author

Maatary commented Mar 23, 2019

@laymain I think I found a case where that could be problematic: when you reach the end of the stream.

Basically, if you fail to commit the last offsets of a partition because your batch commit failed (it included offsets of a partition you no longer own), then your consumer will act as if it is done, i.e. it stops consuming from the partition. But if you look into Kafka/Lenses, you will notice that there is still a lag and nobody is consuming those last offsets.

It will take new messages arriving in that partition for your consumer to request new offsets.

So in essence, yes, you will have processed them, but for monitoring purposes it is a problem. If you use things like Lenses, you will see that for a specific partition some offsets are not consumed yet, nobody is consuming them, and you won't understand why.

If you skip some commits and what you just skipped turns out to be the last offsets of a partition you own, then you cannot signal to Kafka that you have reached the end of the stream for that partition, and whatever tool you use to monitor your consumer will show you something confusing.
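
The monitoring effect described here can be illustrated with a few numbers. This is a minimal sketch with hypothetical offsets, assuming the monitoring tool reports lag as the log end offset minus the committed position of the group.

```scala
// Toy numbers only: the offsets below are hypothetical.
object LagSketch {
  // Lag as a monitoring tool would typically report it:
  // log end offset minus the committed position of the group.
  def lag(logEndOffset: Long, committed: Long): Long =
    math.max(0L, logEndOffset - committed)

  val logEndOffset  = 10L // next offset the broker would assign
  val processedUpTo = 10L // the consumer has processed offsets 0 to 9
  val committed     = 7L  // the commit covering offsets 7 to 9 was skipped

  val unprocessed = logEndOffset - processedUpTo // what is really left to do
  val reportedLag = lag(logEndOffset, committed) // what the dashboard shows
}
```

Here nothing is left to process, yet the reported lag stays at 3 until a later commit on that partition succeeds.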

@Maatary
Author

Maatary commented Mar 23, 2019

My hunch is that restarting the whole stream instead of resuming might be better. This whole problem would not happen at all.

However, I am afraid that it will trigger a chain of rebalances as a result. Not sure, I need to try it out.

ennru added a commit that referenced this issue Mar 24, 2019
## Purpose

Commit 1: Make use of the fact that the rebalance listener `WrappedAutoPausedListener` is called on the thread that called `poll`, which is the actor thread. So it is safe to touch actor state. The messages sent from it before are turned into method calls.

Commit 2: Isolate commit-refreshing into a class holding the data structures and deadline logic.

Commit 3: When commit-refreshing is switched off (the default) no data needs to be collected at all. Depending on the `commit-refresh-interval` setting a no-op implementation might be used.

## Background Context

I'm investigating #750 and rebalance handling for #539 and found this to be a good step on the way.
Since Kafka 2.1 commit-refreshing is not required anymore, as [KAFKA-4682](https://issues.apache.org/jira/browse/KAFKA-4682) is resolved.
ennru added a commit to ennru/alpakka-kafka that referenced this issue Mar 27, 2019
ennru added a commit to ennru/alpakka-kafka that referenced this issue Mar 27, 2019
Fail actor if the rebalance doesn't finish within a timeout.

Fixes akka#750
ennru added a commit to ennru/alpakka-kafka that referenced this issue Apr 4, 2019
ennru added a commit to ennru/alpakka-kafka that referenced this issue Apr 4, 2019
Fail actor if the rebalance doesn't finish within a timeout.

Fixes akka#750
ennru added a commit that referenced this issue Apr 26, 2019
* Test showing #750, commit during rebalance
* Add tests to prove Committer.sinks and supervision
* Improve commit failure tests
* Stash `Commit` messages while a rebalance is unfinished.
@ennru
Member

ennru commented Apr 26, 2019

Fixed with #751

@ennru ennru closed this as completed Apr 26, 2019
@ennru ennru added this to the 1.0.2 milestone Apr 26, 2019
charlibot pushed a commit to charlibot/alpakka-kafka that referenced this issue May 10, 2019
* Test showing akka#750, commit during rebalance
* Add tests to prove Committer.sinks and supervision
* Improve commit failure tests
* Stash `Commit` messages while a rebalance is unfinished.