Commits during rebalance fail with `CommitFailedException` #755

Closed
ennru opened this issue Mar 22, 2019 · 2 comments

@ennru
Member

commented Mar 22, 2019

This is a different description of #750.

Versions used

Alpakka Kafka version: 1.0.1

Expected Behavior

When Kafka rebalances partitions between consumers, committing to Kafka should work as usual.
A commit for a revoked partition may arrive from the partition's former consumer after its new consumer has already emitted the same messages. This can lead to duplicated messages, but that is expected.

Actual Behavior

When consumers are rebalanced, commits fail with a CommitFailedException, whose message states that Kafka "assigned the partitions to another member".

Analysis

The error message in CommitFailedException is a bit misleading. The exception is raised by ConsumerCoordinator when a rebalance is half-done:

https://github.com/apache/kafka/blob/2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L835-L840

Since Alpakka Kafka changed to the Kafka 2.x API (using poll(Duration)), the call to poll may finish before a rebalance has invoked partitionsAssigned. This leaves the consumer in the half-done state. The cause is a change in the Kafka client: poll(Duration) doesn't wait for the whole metadata exchange to finish if the timeout is reached.

As the KafkaConsumerActor goes on to process its next messages, which might include commits, the call to consumer.commitAsync may happen during the half-done rebalance and fail in the ConsumerCoordinator.

Once the new partitions are assigned after a later call to poll, committing works again and does not fail, even for offsets of partitions that have been rebalanced to a new consumer (AFAICS).
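
The sequence described above can be sketched against the plain Kafka consumer API. This is only an illustration of the order of calls, not the code in KafkaConsumerActor. The helper name `pollThenCommit`, the 50 ms timeout and the log messages are made up for the example, and whether the commit actually hits the half-done state depends on timing.

```scala
import java.time.Duration
import java.util.{Collection => JCollection, Collections, Map => JMap}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: one subscribe / poll / commit cycle on a configured consumer.
def pollThenCommit(consumer: Consumer[String, String], topic: String): Unit = {
  consumer.subscribe(
    Collections.singletonList(topic),
    new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
        println(s"revoked: $partitions")
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
        println(s"assigned: $partitions")
    }
  )

  // poll(Duration) returns when the timeout is reached, so it may come back
  // before onPartitionsAssigned has been called for an ongoing rebalance.
  consumer.poll(Duration.ofMillis(50))

  // A commit issued at this point may reach the coordinator mid-rebalance;
  // the failure is then reported to the callback.
  consumer.commitAsync(new OffsetCommitCallback {
    override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
      e match {
        case _: CommitFailedException => println("commit rejected during rebalance")
        case null                     => println(s"committed: $offsets")
        case other                    => println(s"commit failed: $other")
      }
  })
}
```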

Reproducible Test Case

See #751

Work-around

It is possible to ignore the CommitFailedExceptions by mounting a supervision strategy on the Committer.flow or sink.

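    import akka.kafka.scaladsl.Committer
    import akka.stream.{ActorAttributes, Supervision}
    import akka.stream.scaladsl.Keep
    import org.apache.kafka.clients.consumer.CommitFailedException

    // Resume on CommitFailedException; stop the stream on any other failure.
    val resumeOnCommitFailed: Supervision.Decider = {
      case _: CommitFailedException ⇒ Supervision.Resume
      case _ ⇒ Supervision.Stop
    }

      ...
      .toMat(
        Committer
          .sink(committerSettings)
          .withAttributes(ActorAttributes.supervisionStrategy(resumeOnCommitFailed))
      )(Keep.both)
      .run()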

Alternatively, you may recover from the failure of the commitScaladsl or commitJavadsl futures.
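
A minimal sketch of recovering the commitScaladsl future, assuming the future returned by the commit fails with the CommitFailedException itself. The helper name `ignoreCommitFailed` is hypothetical and not part of Alpakka Kafka.

```scala
import akka.Done
import org.apache.kafka.clients.consumer.CommitFailedException
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: treat a commit rejected during a rebalance as done,
// and leave every other failure in place.
def ignoreCommitFailed(commitResult: Future[Done])(implicit ec: ExecutionContext): Future[Done] =
  commitResult.recover {
    case _: CommitFailedException => Done
  }

// Possible usage inside a stream stage, with `msg` being a CommittableMessage:
//   .mapAsync(1)(msg => ignoreCommitFailed(msg.committableOffset.commitScaladsl()))
```

As with the supervision strategy, an offset whose commit was dropped this way is not retried; it is only covered once a later commit for the same partition succeeds.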

ncreep2 added a commit to SupersonicAds/kafka-snow-white that referenced this issue Apr 4, 2019


@ennru

Member Author

commented Apr 26, 2019

Fixed with #751

@ennru ennru closed this Apr 26, 2019

@ennru ennru added this to the 1.0.2 milestone Apr 26, 2019

@Maatary


commented May 9, 2019

Awesome !!! Thanks and congrats !!!
