Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Committing: retry commits marked as retriable #1111

Merged
merged 3 commits into from May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.annotation.InternalApi
import akka.kafka.CommitWhen.{NextOffsetObserved, OffsetFirstObserved}
import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
import akka.kafka.ProducerMessage._
import akka.kafka.{CommitDelivery, CommitterSettings, ProducerSettings}
Expand Down Expand Up @@ -52,7 +51,6 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
with StageIdLogging
with DeferredProducer[K, V] {

import CommittingProducerSinkStage._
import CommitTrigger._

/** The promise behind the materialized future. */
Expand Down
41 changes: 29 additions & 12 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Expand Up @@ -566,9 +566,11 @@ import scala.util.control.NonFatal
private def commitAggregatedOffsets(): Unit = if (commitMaps.nonEmpty && !rebalanceInProgress) {
val aggregatedOffsets = aggregateOffsets(commitMaps)
commitRefreshing.add(aggregatedOffsets)
commit(aggregatedOffsets, commitSenders)
val replyTo = commitSenders
// flush the data before calling `consumer.commitAsync` which might call the callback synchronously
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really? That's counter intuitive based on the method name. Good to know.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't expect it to. But in one of my manual tests it appeared as if it did and when I moved these it worked...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should really be the thread calling poll which executes the callback.

commitMaps = List.empty
commitSenders = Vector.empty
commit(aggregatedOffsets, replyTo)
}

private def commit(commitMap: Map[TopicPartition, OffsetAndMetadata], replyTo: Vector[ActorRef]): Unit = {
Expand All @@ -582,18 +584,33 @@ import scala.util.control.NonFatal
exception: Exception): Unit = {
// this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
val duration = System.nanoTime() - startTime
if (duration > settings.commitTimeWarning.toNanos) {
log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}",
duration / 1000000L,
commitsInProgress)
}
commitsInProgress -= 1
if (exception != null) {
val failure = Status.Failure(exception)
replyTo.foreach(_ ! failure)
} else {
commitRefreshing.committed(offsets)
replyTo.foreach(_ ! Done)
exception match {
case null =>
if (duration > settings.commitTimeWarning.toNanos) {
log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}",
duration / 1000000L,
commitsInProgress)
}
commitRefreshing.committed(offsets)
replyTo.foreach(_ ! Done)

case e: RetriableCommitFailedException =>
log.warning("Kafka commit is to be retried, after={} ms, commitsInProgress={}, cause={}",
e.getCause,
duration / 1000000L,
commitsInProgress)
commitMaps = commitMap.toList ++ commitMaps
commitSenders = commitSenders ++ replyTo
requestDelayedPoll()

case commitException =>
log.error("Kafka commit failed after={} ms, commitsInProgress={}, exception={}",
duration / 1000000L,
commitsInProgress,
commitException)
val failure = Status.Failure(commitException)
replyTo.foreach(_ ! failure)
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/akka/kafka/internal/ProducerStage.scala
Expand Up @@ -5,8 +5,6 @@

package akka.kafka.internal

import java.util.concurrent.atomic.AtomicInteger

import akka.annotation.InternalApi
import akka.kafka.ProducerMessage._
import akka.kafka.ProducerSettings
Expand Down
Expand Up @@ -5,6 +5,8 @@

package akka.kafka.internal

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage._
Expand Down Expand Up @@ -183,6 +185,43 @@ class CommittingWithMockSpec(_system: ActorSystem)
Await.result(control.shutdown(), remainingOrDefault)
}

it should "retry commit" in assertAllStagesStopped {
val retries = 4
val callNo = new AtomicInteger()
val timeout = new CommitTimeoutException("injected15")
val onCompleteFailure: ConsumerMock.OnCompleteHandler = { offsets =>
if (callNo.getAndIncrement() < retries)
(null, new RetriableCommitFailedException(s"injected ${callNo.get()}", timeout))
else (offsets, null)
}
val commitLog = new ConsumerMock.LogHandler(onCompleteFailure)
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
.toMat(TestSink.probe)(Keep.both)
.run()

val msg = createMessage(1)
mock.enqueue(List(toRecord(msg)))

probe.request(100)
val done = probe.expectNext().committableOffset.commitInternal()

awaitAssert {
commitLog.calls should have size (1)
}

// allow poll to emulate commits
mock.releaseAndAwaitCommitCallbacks(this)

// the first commit and the retries should be captured
awaitAssert {
commitLog.calls should have size (retries + 1L)
}

done.futureValue shouldBe Done
control.shutdown().futureValue shouldBe Done
}

it should "collect commits to be sent to commitAsync" in assertAllStagesStopped {
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
Expand Down