diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala index 5efd8e81e..f657d94b7 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala @@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicInteger import akka.Done import akka.annotation.InternalApi -import akka.kafka.CommitWhen.{NextOffsetObserved, OffsetFirstObserved} import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch} import akka.kafka.ProducerMessage._ import akka.kafka.{CommitDelivery, CommitterSettings, ProducerSettings} @@ -52,7 +51,6 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, with StageIdLogging with DeferredProducer[K, V] { - import CommittingProducerSinkStage._ import CommitTrigger._ /** The promise behind the materialized future. */ diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 29f5ff393..ab7487754 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -566,9 +566,11 @@ import scala.util.control.NonFatal private def commitAggregatedOffsets(): Unit = if (commitMaps.nonEmpty && !rebalanceInProgress) { val aggregatedOffsets = aggregateOffsets(commitMaps) commitRefreshing.add(aggregatedOffsets) - commit(aggregatedOffsets, commitSenders) + val replyTo = commitSenders + // flush the data before calling `consumer.commitAsync` which might call the callback synchronously commitMaps = List.empty commitSenders = Vector.empty + commit(aggregatedOffsets, replyTo) } private def commit(commitMap: Map[TopicPartition, OffsetAndMetadata], replyTo: Vector[ActorRef]): Unit = { @@ -582,18 +584,33 @@ import scala.util.control.NonFatal exception: Exception): Unit = { // this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe val duration = System.nanoTime() - startTime - if (duration > settings.commitTimeWarning.toNanos) { - log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}", - duration / 1000000L, - commitsInProgress) - } commitsInProgress -= 1 - if (exception != null) { - val failure = Status.Failure(exception) - replyTo.foreach(_ ! failure) - } else { - commitRefreshing.committed(offsets) - replyTo.foreach(_ ! Done) + exception match { + case null => + if (duration > settings.commitTimeWarning.toNanos) { + log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}", + duration / 1000000L, + commitsInProgress) + } + commitRefreshing.committed(offsets) + replyTo.foreach(_ ! Done) + + case e: RetriableCommitFailedException => + log.warning("Kafka commit is to be retried, after={} ms, commitsInProgress={}, cause={}", + e.getCause, + duration / 1000000L, + commitsInProgress) + commitMaps = commitMap.toList ++ commitMaps + commitSenders = commitSenders ++ replyTo + requestDelayedPoll() + + case commitException => + log.error("Kafka commit failed after={} ms, commitsInProgress={}, exception={}", + duration / 1000000L, + commitsInProgress, + commitException) + val failure = Status.Failure(commitException) + replyTo.foreach(_ ! failure) } } } diff --git a/core/src/main/scala/akka/kafka/internal/ProducerStage.scala b/core/src/main/scala/akka/kafka/internal/ProducerStage.scala index 2c888db63..c06cc5605 100644 --- a/core/src/main/scala/akka/kafka/internal/ProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/ProducerStage.scala @@ -5,8 +5,6 @@ package akka.kafka.internal -import java.util.concurrent.atomic.AtomicInteger - import akka.annotation.InternalApi import akka.kafka.ProducerMessage._ import akka.kafka.ProducerSettings diff --git a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala index a3bfaa4b8..6ee507801 100644 --- a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala @@ -5,6 +5,8 @@ package akka.kafka.internal +import java.util.concurrent.atomic.AtomicInteger + import akka.Done import akka.actor.ActorSystem import akka.kafka.ConsumerMessage._ @@ -183,6 +185,43 @@ class CommittingWithMockSpec(_system: ActorSystem) Await.result(control.shutdown(), remainingOrDefault) } + it should "retry commit" in assertAllStagesStopped { + val retries = 4 + val callNo = new AtomicInteger() + val timeout = new CommitTimeoutException("injected15") + val onCompleteFailure: ConsumerMock.OnCompleteHandler = { offsets => + if (callNo.getAndIncrement() < retries) + (null, new RetriableCommitFailedException(s"injected ${callNo.get()}", timeout)) + else (offsets, null) + } + val commitLog = new ConsumerMock.LogHandler(onCompleteFailure) + val mock = new ConsumerMock[K, V](commitLog) + val (control, probe) = createCommittableSource(mock.mock) + .toMat(TestSink.probe)(Keep.both) + .run() + + val msg = createMessage(1) + mock.enqueue(List(toRecord(msg))) + + probe.request(100) + val done = probe.expectNext().committableOffset.commitInternal() + + awaitAssert { + commitLog.calls should have size (1) + } + + // allow poll to emulate commits + mock.releaseAndAwaitCommitCallbacks(this) + + // the first commit and the retries should be captured + awaitAssert { + commitLog.calls should have size (retries + 1L) + } + + done.futureValue shouldBe Done + control.shutdown().futureValue shouldBe Done + } + it should "collect commits to be sent to commitAsync" in assertAllStagesStopped { val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog)