Transition to new polling behavior and get rid of wake up functionality
zaharidichev committed Oct 15, 2018
1 parent 8bf85e6 commit e0d422a
Showing 10 changed files with 56 additions and 156 deletions.
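The mechanics behind the change: KafkaConsumer.poll(long) applies its timeout only to fetching records and can block indefinitely while fetching metadata, which is why a wakeup-based watchdog was needed in the first place. Kafka 2.0's poll(java.time.Duration) overload (KIP-266) bounds the entire call, so the watchdog, its WakeupException handling, and the related settings can all go. A minimal sketch of the API shift, independent of this commit's code:

import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

def pollOnce[K, V](consumer: KafkaConsumer[K, V]): ConsumerRecords[K, V] =
  // Old API (deprecated in Kafka 2.0): consumer.poll(50L) could hang on
  // metadata fetches and had to be interrupted via consumer.wakeup().
  // New API: the Duration bounds the whole call, so no watchdog is needed.
  consumer.poll(Duration.ofMillis(50))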
3 changes: 3 additions & 0 deletions core/src/main/resources/reference.conf
@@ -61,10 +61,12 @@ akka.kafka.consumer {
# The KafkaConsumerActor will throw
# `org.apache.kafka.common.errors.WakeupException` which will be ignored
# until the `max-wakeups` limit is exceeded.
# DEPRECATED: No-op, as waking up the consumer is no longer used
wakeup-timeout = 3s

# After exceeding the maximum number of wakeups the consumer will stop and the stage will fail.
# Setting it to 0 will let it ignore the wakeups and keep trying to get the polling done indefinitely.
# DEPRECATED: No-op, as waking up the consumer is no longer used
max-wakeups = 10

# If set to a finite duration, the consumer will re-send the last committed offsets periodically
@@ -73,6 +75,7 @@ akka.kafka.consumer {

# If enabled, log stack traces before waking up the KafkaConsumer to give
# some indication why the KafkaConsumer is not honouring the `poll-timeout`
# DEPRECATED: No-op, as waking up the consumer is no longer used
wakeup-debug = true

# Fully qualified config path which holds the dispatcher configuration
35 changes: 35 additions & 0 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
@@ -194,6 +194,41 @@ class ConsumerSettings[K, V] @deprecated("use the factory methods `ConsumerSetti
val metadataRequestTimeout: FiniteDuration
) {

def this(properties: Map[String, String],
keyDeserializerOpt: Option[Deserializer[K]],
valueDeserializerOpt: Option[Deserializer[V]],
pollInterval: FiniteDuration,
pollTimeout: FiniteDuration,
stopTimeout: FiniteDuration,
closeTimeout: FiniteDuration,
commitTimeout: FiniteDuration,
commitRefreshInterval: Duration,
dispatcher: String,
commitTimeWarning: FiniteDuration,
waitClosePartition: FiniteDuration,
positionTimeout: FiniteDuration,
offsetForTimesTimeout: FiniteDuration,
metadataRequestTimeout: FiniteDuration) = this(
properties,
keyDeserializerOpt,
valueDeserializerOpt,
pollInterval,
pollTimeout,
stopTimeout,
closeTimeout,
commitTimeout,
wakeupTimeout = 3.seconds,
maxWakeups = 10,
commitRefreshInterval,
dispatcher,
commitTimeWarning,
wakeupDebug = true,
waitClosePartition,
positionTimeout,
offsetForTimesTimeout,
metadataRequestTimeout
)
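The overload above lets callers construct the settings without the retired wakeup parameters: it delegates to the primary constructor (which still carries them through the deprecation cycle) with their old defaults of 3 seconds, 10, and true, now ignored at runtime. The same compatibility pattern in miniature, with illustrative names rather than the library's real ones:

import scala.concurrent.duration._

class Settings(val pollTimeout: FiniteDuration,
               val wakeupTimeout: FiniteDuration, // retained only for compatibility
               val maxWakeups: Int) {             // no longer read anywhere
  // Shorter constructor: new callers stop mentioning the dead knobs, while
  // code compiled against the full signature keeps linking.
  def this(pollTimeout: FiniteDuration) =
    this(pollTimeout, wakeupTimeout = 3.seconds, maxWakeups = 10)
}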

@deprecated("use the factory methods `ConsumerSettings.apply` and `create` instead", "1.0-M1")
def this(properties: Map[String, String],
keyDeserializer: Option[Deserializer[K]],
116 changes: 8 additions & 108 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
@@ -20,11 +20,11 @@ import akka.actor.{
Terminated,
Timers
}
import akka.util.JavaDurationConverters._
import akka.event.LoggingReceive
import akka.kafka.KafkaConsumerActor.StoppingException
import akka.kafka.{ConsumerSettings, Metadata}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.collection.JavaConverters._
@@ -150,7 +150,6 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
var committedOffsets = Map.empty[TopicPartition, OffsetAndMetadata]
var commitRefreshDeadline: Option[Deadline] = None
var initialPoll = true
var wakeups = 0
var stopInProgress = false
var delayedPollInFlight = false

@@ -340,72 +339,15 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
}

def poll(): Unit = {
val wakeupTask = context.system.scheduler.scheduleOnce(settings.wakeupTimeout) {
log.warning(
"KafkaConsumer poll has exceeded wake up timeout ({}). Waking up consumer to avoid thread starvation.",
settings.wakeupTimeout.toCoarsest
)
if (settings.wakeupDebug && wakeups > settings.maxWakeups / 2) {
val stacks =
Thread.getAllStackTraces.asScala.map { case (k, v) => s"$k\n ${v.mkString("\n")}" }.mkString("\n\n")
log.warning(
"Wake up has been triggered {} times (See setting akka.kafka.consumer.wakeup-debug). Dumping stacks: {}",
wakeups,
stacks
)
}
consumer.wakeup()
}(context.system.dispatcher)

val currentAssignmentsJava = consumer.assignment()

def tryPoll(timeout: Long): ConsumerRecords[K, V] =
def tryPoll(timeout: FiniteDuration): ConsumerRecords[K, V] =
try {
val records = consumer.poll(timeout)
val records = consumer.poll(timeout.asJava)
initialPoll = false
wakeups = 0
records
} catch {
case w: WakeupException =>
wakeups = wakeups + 1
if (wakeups == settings.maxWakeups) {
log.error(
"WakeupException limit exceeded during poll({}), stopping (max-wakeups = {}, wakeup-timeout = {}).",
timeout,
settings.maxWakeups,
settings.wakeupTimeout.toCoarsest
)
context.stop(self)
} else {
if (log.isWarningEnabled && wakeups > settings.maxWakeups / 2) {
log.warning(
"Consumer poll({}) interrupted with WakeupException (#{} of max-wakeups = {}, wakeup-timeout = {}).",
timeout,
wakeups,
settings.maxWakeups,
settings.wakeupTimeout.toCoarsest
)
if (initialPoll) {
log.error(
"Initial consumer poll({}) with bootstrap servers {} did not succeed, still trying",
timeout,
settings.properties.getOrElse(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "not set")
)
}
}

// If the current consumer is using group assignment (i.e. subscriptions is non-empty) the wakeup might
// have prevented the re-balance callbacks from being called, leaving the Source in an inconsistent state w.r.t.
// assigned TopicPartitions. In order to reconcile the state we manually call the callbacks for all added/removed
// TopicPartition assignments, aligning the Source's state with the consumer's.
// We are safe to perform the operation here since the poll() thread has been aborted by the wakeup call
// and no other threads are using the consumer.
// Note: in case of manual partition assignment this is not needed since rebalance doesn't take place.
if (subscriptions.nonEmpty) {
val newAssignments = consumer.assignment().asScala
reconcileAssignments(currentAssignmentsJava.asScala.toSet, newAssignments.toSet)
}
}
throw new NoPollResult
case NonFatal(e) =>
log.error(e, "Exception when polling from consumer")
context.stop(self)
@@ -422,7 +364,7 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
throw new IllegalStateException(s"Got ${rawResult.count} unexpected messages")

consumer.pause(currentAssignmentsJava)
checkNoResult(tryPoll(0))
checkNoResult(tryPoll(Duration.Zero))

// For commits we try to avoid blocking poll because a commit normally succeeds after a few
// poll(0). Using poll(1) will always block for 1 ms, since there are no messages.
@@ -431,7 +373,7 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
var i = 10
while (i > 0 && commitsInProgress > 0) {
LockSupport.parkNanos(10 * 1000)
val pollTimeout = if (i == 1) 1L else 0L
val pollTimeout = if (i == 1) 1.millis else Duration.Zero
checkNoResult(tryPoll(pollTimeout))
i -= 1
}
@@ -441,60 +383,18 @@ class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) extends Actor w
val (resumeThese, pauseThese) = currentAssignmentsJava.asScala.partition(partitionsToFetch.contains)
consumer.pause(pauseThese.asJava)
consumer.resume(resumeThese.asJava)
processResult(partitionsToFetch, tryPoll(pollTimeout().toMillis))
processResult(partitionsToFetch, tryPoll(pollTimeout()))
}
} catch {
case _: NoPollResult => // already handled, just proceed
} finally wakeupTask.cancel()
}

if (stopInProgress && commitsInProgress == 0) {
log.debug("Stopping")
context.stop(self)
}
}
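tryPoll now takes a Scala FiniteDuration and bridges to the Java client via akka.util.JavaDurationConverters (imported at the top of the file), which adds an asJava extension. In isolation:

import scala.concurrent.duration._
import akka.util.JavaDurationConverters._

val timeout: FiniteDuration = 50.millis
val javaTimeout: java.time.Duration = timeout.asJava // what consumer.poll receives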

//TODO can't be re-created deterministically so should be pulled out and tested
private def reconcileAssignments(currentAssignments: Set[TopicPartition],
newAssignments: Set[TopicPartition]): Unit = {

val revokedAssignmentsByTopic = (currentAssignments -- newAssignments).groupBy(_.topic())
val addedAssignmentsByTopic = (newAssignments -- currentAssignments).groupBy(_.topic())

if (settings.wakeupDebug) {
log.info(
"Reconciliation has found revoked assignments: {} added assignments: {}. Current subscriptions: {}",
revokedAssignmentsByTopic,
addedAssignmentsByTopic,
subscriptions
)
}

subscriptions.foreach {
case Subscribe(topics, listener) =>
topics.foreach { topic =>
val removedTopicAssignments = revokedAssignmentsByTopic.getOrElse(topic, Set.empty)
if (removedTopicAssignments.nonEmpty) listener.onRevoke(removedTopicAssignments)

val addedTopicAssignments = addedAssignmentsByTopic.getOrElse(topic, Set.empty)
if (addedTopicAssignments.nonEmpty) listener.onAssign(addedTopicAssignments)
}

case SubscribePattern(pattern: String, listener) =>
val ptr = Pattern.compile(pattern)
def filterByPattern(tpm: Map[String, Set[TopicPartition]]): Set[TopicPartition] =
tpm.flatMap {
case (topic, tps) if ptr.matcher(topic).matches() => tps
case _ => Set.empty[TopicPartition]
}.toSet

val revokedAssignments = filterByPattern(revokedAssignmentsByTopic)
if (revokedAssignments.nonEmpty) listener.onRevoke(revokedAssignments)

val addedAssignments = filterByPattern(addedAssignmentsByTopic)
if (addedAssignments.nonEmpty) listener.onAssign(addedAssignments)
}
}

private def nextCommitRefreshDeadline(): Option[Deadline] = settings.commitRefreshInterval match {
case finite: FiniteDuration => Some(finite.fromNow)
case infinite => None
@@ -26,7 +26,7 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
final val actorNumber = KafkaConsumerActor.Internal.nextNumber()
final def consumerFuture: Future[ActorRef] = consumerPromise.future

private val partitionLogLevel = if (settings.wakeupDebug) Logging.InfoLevel else Logging.DebugLevel
private val partitionLogLevel = Logging.DebugLevel

final def configureSubscription(): Unit = {
val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assignedTps =>
6 changes: 3 additions & 3 deletions tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala
@@ -36,7 +36,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = ConsumerMock.notI
val mock = {
val result = Mockito.mock(classOf[KafkaConsumer[K, V]])
Mockito
.when(result.poll(ArgumentMatchers.any[Long]))
.when(result.poll(ArgumentMatchers.any[java.time.Duration]))
.thenAnswer(new Answer[ConsumerRecords[K, V]] {
override def answer(invocation: InvocationOnMock) = ConsumerMock.this.synchronized {
pendingSubscriptions.foreach {
@@ -122,7 +122,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = ConsumerMock.notI
verify(mock, mode).close(SettingsCreator.closeTimeout.asJava)

def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()) =
verify(mock, mode).poll(ArgumentMatchers.any[Long])
verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])

def assignPartitions(tps: Set[TopicPartition]) =
tps.groupBy(_.topic()).foreach {
@@ -141,7 +141,7 @@ class FailingConsumerMock[K, V](throwable: Throwable, failOnCallNumber: Int*) ex
var callNumber = 0

Mockito
.when(mock.poll(ArgumentMatchers.any[Long]))
.when(mock.poll(ArgumentMatchers.any[java.time.Duration]))
.thenAnswer(new Answer[ConsumerRecords[K, V]] {
override def answer(invocation: InvocationOnMock) = FailingConsumerMock.this.synchronized {
callNumber = callNumber + 1
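The matcher changes here are load-bearing: the 2.x KafkaConsumer has both the deprecated poll(long) and the new poll(java.time.Duration), and Mockito stubs each overload independently, so a stub keyed on any[Long] never fires once the production code calls the Duration overload and the mock would return null. A minimal sketch of stubbing the new overload:

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.mockito.{ArgumentMatchers, Mockito}

val consumer = Mockito.mock(classOf[KafkaConsumer[String, String]])
Mockito
  .when(consumer.poll(ArgumentMatchers.any[java.time.Duration]))
  .thenReturn(ConsumerRecords.empty[String, String]()) // empty result, never null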
39 changes: 0 additions & 39 deletions tests/src/test/scala/akka/kafka/internal/ConsumerSpec.scala
@@ -17,7 +17,6 @@ import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.errors.WakeupException
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

@@ -99,44 +98,6 @@ class ConsumerSpec(_system: ActorSystem)
.expectError()
}

it should "not fail stream when poll() fails twice with WakeupException" in assertAllStagesStopped {
val mock = new FailingConsumerMock[K, V](new WakeupException(), failOnCallNumber = 1, 2)

val probe = createCommittableSource(mock.mock)
.toMat(TestSink.probe)(Keep.right)
.run()

probe
.request(1)
.expectNoMessage(200.millis)
.cancel()
}

it should "not fail stream when poll() fails twice, then succeeds, then fails twice with WakeupException" in assertAllStagesStopped {
val mock = new FailingConsumerMock[K, V](new WakeupException(), failOnCallNumber = 1, 2, 4, 5)

val probe = createCommittableSource(mock.mock)
.toMat(TestSink.probe)(Keep.right)
.run()

probe
.request(1)
.expectNoMessage(200.millis)
.cancel()
}

it should "fail stream when poll() fail limit exceeded" in assertAllStagesStopped {
val mock = new FailingConsumerMock[K, V](new WakeupException(), failOnCallNumber = 1, 2, 3)

val probe = createCommittableSource(mock.mock)
.toMat(TestSink.probe)(Keep.right)
.run()

probe
.request(1)
.expectError()
}

it should "complete stage when stream control.stop called" in assertAllStagesStopped {
val mock = new ConsumerMock[K, V]()
val (control, probe) = createCommittableSource(mock.mock)
@@ -551,7 +551,8 @@ object PartitionedSourceSpec {
callbacks = callback
override def assign(partitions: java.util.Collection[TopicPartition]): Unit =
tps = partitions.asScala.map(_ -> Assigned).toMap
override def poll(timeout: Long): ConsumerRecords[K, V] = {

override def poll(timeout: java.time.Duration): ConsumerRecords[K, V] = {
val data = nextPollData.get()
val (data2, dataPaused) = data.partition {
case (tp, _) => tpsResumed.contains(tp)
@@ -133,7 +133,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = KafkaPorts.IntegrationSpec) w

"signal rebalance events to actor" in assertAllStagesStopped {
val partitions = 4
val totalMessages = 200L
val totalMessages = 50L

val topic = createTopic(1, partitions)
val allTps = (0 until partitions).map(p => new TopicPartition(topic, p))
@@ -189,7 +189,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = KafkaPorts.IntegrationSpec) w
}

control2 should not be null
sleep(4.seconds)
sleep(10.seconds)

rebalanceActor.expectMsg(TopicPartitionsRevoked(subscription1, Set(allTps: _*)))
rebalanceActor.expectMsg(TopicPartitionsAssigned(subscription1, Set(allTps(0), allTps(1))))
@@ -292,7 +292,7 @@ class PartitionedSourcesSpec

rebalanceActor.expectMsg(TopicPartitionsRevoked(subscription1, Set(allTps: _*)))
rebalanceActor.expectMsg(TopicPartitionsAssigned(subscription1, Set(allTps(0), allTps(1))))
sleep(1.seconds, "to have all messages consumed")
sleep(5.seconds, "to have all messages consumed")

val stream1messages = control.drainAndShutdown().futureValue
val stream2messages = control2.drainAndShutdown().futureValue
2 changes: 1 addition & 1 deletion tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
@@ -36,7 +36,7 @@ class ConsumerExample extends DocsSpecBase(KafkaPorts.ScalaConsumerExamples) {
def createKafkaConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)

override def sleepAfterProduce: FiniteDuration = 4.seconds
override def sleepAfterProduce: FiniteDuration = 5.seconds
private def waitBeforeValidation(): Unit = sleep(4.seconds)

"ExternalOffsetStorage" should "work" in {
