
New polling and removal of wake up functionality #614

Merged: 9 commits merged into akka:master from zaharidichev:zd/remove-wake-up-functionality on Dec 10, 2018

Conversation

4 participants
@zaharidichev (Contributor) commented Oct 14, 2018

As already discussed in #608, this PR adopts the new Kafka poll introduced in KIP-266. This enables us to get rid of the manual wake-up functionality that was present in the consumer actor.
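
For context, a minimal sketch of the API difference this PR builds on; it is not the consumer actor's actual code, and `consumer` and `pollTimeoutMillis` are assumed names.

```scala
import java.time.Duration
import org.apache.kafka.common.errors.WakeupException

// Old style: the long-based poll plus a scheduled consumer.wakeup() as an escape hatch.
def pollWithWakeup(): Unit =
  try consumer.poll(pollTimeoutMillis) // deprecated poll(long)
  catch { case _: WakeupException => /* poll was interrupted by a wakeup() call */ }

// New style (KIP-266): the Duration-based poll bounds the call itself, so no wakeup() machinery is needed.
def pollBounded(): Unit = {
  consumer.poll(Duration.ofMillis(pollTimeoutMillis))
  ()
}
```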

@zaharidichev zaharidichev force-pushed the zaharidichev:zd/remove-wake-up-functionality branch 3 times, most recently from 697ba84 to 20ff68d Oct 14, 2018

@zaharidichev (Contributor, Author) commented Oct 15, 2018

@ennru
It's interesting: when I do

testOnly **IntegrationSpec** -- -z "signal rebalance events to actor" the test passes without a problem,

but when I run the whole suite with:

testOnly **IntegrationSpec**

it fails. Have you noticed this before? Any ideas why it might happen?

@@ -189,7 +189,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = KafkaPorts.IntegrationSpec) w
}

control2 should not be null
- sleep(4.seconds)
+ sleep(10.seconds)

@2m (Member) commented Oct 15, 2018

This test-case is timing dependent. Try increasing this even more to see if the test-case starts to pass when running all of the test-cases in the spec.

Ideally we should refactor the test to not be timing dependent.

@zaharidichev (Contributor, Author) commented Oct 15, 2018

Is there a reason for the initial value of four, or was it just derived by trial and error?

@2m (Member) commented Oct 15, 2018

Yup, trial and error.

The ideal solution would be to not wait for a defined amount of time, but instead look at the arriving elements and complete the consumer stream when some condition is met. Take a look here for inspiration:

.takeWhile(_ < totalMessages, inclusive = true)

But we can leave this improvement for another PR.
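
As a concrete illustration of that suggestion, here is a minimal sketch (not this PR's code) of a stream that completes once a known number of records has arrived instead of sleeping; `consumerSettings`, `topic` and `totalMessages` are assumed to come from the surrounding test, and an implicit materializer is expected to be in scope.

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink

val lastCount = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .scan(0L)((count, _) => count + 1)              // running count of received records
  .takeWhile(_ < totalMessages, inclusive = true) // keep counting up to and including totalMessages
  .runWith(Sink.last)                             // completes as soon as enough records were seen
```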

@zaharidichev (Contributor, Author) commented Oct 15, 2018

We can indeed leave it for later, but still, these random failures are quite strange. I slashed the number of events by 4 and increased the sleep duration, and I am still getting failures. I cannot quite explain why my change has triggered this strange behaviour...

@2m (Member) commented Oct 15, 2018

Aha. If decreasing the number of messages and increasing sleep times does not solve this, then maybe we are indeed losing messages now. I'll dig deeper as well.

@zaharidichev (Contributor, Author) commented Oct 16, 2018

It's strange indeed. I took another look, and only changing the poll call back to the deprecated one makes all the tests pass. Looking at the semantics of the new poll call, the main difference is that it might block indefinitely while fetching metadata... Will have to think a bit more on that one.

@zaharidichev zaharidichev force-pushed the zaharidichev:zd/remove-wake-up-functionality branch 2 times, most recently from e0d422a to 19cdd3c Oct 15, 2018

@patriknw (Member) left a comment

Looking good in theory.

It would be good to ensure that no performance regression is introduced, e.g. by running KafkaConsumerBenchmarks.

Two resolved review threads (outdated) on core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
if (subscriptions.nonEmpty) {
  val newAssignments = consumer.assignment().asScala
  reconcileAssignments(currentAssignmentsJava.asScala.toSet, newAssignments.toSet)
}

@patriknw (Member) commented Oct 18, 2018

wonder how this is handled now, if poll returns without any records?

perhaps it's handled by the metadata calls that are part of the poll?

@zaharidichev (Contributor, Author) commented Oct 18, 2018

Looking at the code, that seems to be the case.
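
To make the snippet under discussion a bit more concrete, here is a rough, self-contained sketch of the kind of reconciliation it performs; the helper body and the println logging are illustrative, not the actor's actual implementation.

```scala
import org.apache.kafka.common.TopicPartition

// Compare the previously tracked assignment with what the consumer reports after a poll.
def reconcileAssignments(previous: Set[TopicPartition], current: Set[TopicPartition]): Unit = {
  val revoked = previous.diff(current) // partitions this consumer no longer owns
  val added   = current.diff(previous) // partitions newly handed to this consumer
  if (revoked.nonEmpty) println(s"Partitions revoked: ${revoked.mkString(", ")}")
  if (added.nonEmpty) println(s"Partitions assigned: ${added.mkString(", ")}")
}
```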

@zaharidichev zaharidichev force-pushed the zaharidichev:zd/remove-wake-up-functionality branch from 19cdd3c to d256a52 Oct 18, 2018

@zaharidichev (Contributor, Author) commented Oct 18, 2018

@patriknw Although it looks good in theory, it fails some of the tests quite frequently. So either we have flaky tests that this change has made more obvious, or the change itself is having some undesired effect. In any case, I will sit down and investigate this over the weekend and run the benchmarks you suggested as well.

@patriknw (Member) commented Oct 19, 2018

thanks for your work on this

@zaharidichev zaharidichev force-pushed the zaharidichev:zd/remove-wake-up-functionality branch 7 times, most recently from ecf6d94 to c9c413e Oct 21, 2018

@zaharidichev (Contributor, Author) commented Oct 21, 2018

After quite a bit of looking into this over the weekend, I have changed a few things, especially around the way we call poll when we do not have any commits. I have tweaked some of the tests to be a bit less time-dependent. However, some other tests still seem to fail from time to time, and it is hard for me to tell why. Looking at the build history, however, this does not seem like something new. If you have any further suggestions, they are welcome.

@zaharidichev (Contributor, Author) commented Oct 23, 2018

Is there a chance someone could take a look over this to see whether I am doing something particularly wrong which causes the tests to fail...?

@2m (Member) commented Oct 23, 2018

Sorry about being slow on this. Most of Lightbend is currently at the Reactive Summit. I'll give more attention to this change.

@zaharidichev (Contributor, Author) commented Oct 23, 2018

Ah, no worries, it's just bugging me what the problem is. I will take another look as well. But I have the feeling test failures occur every now and then on master as well, am I right?

@2m (Member) commented Oct 23, 2018

There were a couple of flaky tests, but we have been working on those recently and the situation has improved. On this PR, I see that the benchmark job eventually stalls. Taking a look at why that is happening.

@2m (Member) commented Oct 23, 2018

The failing benchmark job can be run from sbt with benchmarks/dockerComposeTest it:testOnly *.AlpakkaKafkaAtMostOnceConsumer. This benchmark job consumes messages really slowly in this PR (around 100 messages every few seconds). The full benchmark expects 50k messages, so it times out.

@zaharidichev (Contributor, Author) commented Oct 23, 2018

I will look into it in more detail. Thanks for the tip

@zaharidichev (Contributor, Author) commented Oct 24, 2018

@2m I am getting some exceptions while running the benchmarks... namely TimeoutException: Timed out waiting for a node assignment. Does that look familiar? Could it be something with my Docker setup?

[info] akka.kafka.benchmarks.ApacheKafkaAtMostOnceConsumer *** ABORTED *** (2 minutes)
[info]   java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[info]   at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
[info]   at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
[info]   at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
[info]   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
[info]   at akka.kafka.benchmarks.BenchmarksBase.$anonfun$setUp$1(BenchmarksBase.scala:16)
[info]   at akka.kafka.benchmarks.BenchmarksBase.$anonfun$setUp$1$adapted(BenchmarksBase.scala:16)
[info]   at akka.kafka.scaladsl.KafkaSpec.$anonfun$periodicalCheck$1(KafkaSpec.scala:152)
[info]   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
[info]   at scala.util.Try$.apply(Try.scala:209)
[info]   at akka.kafka.scaladsl.KafkaSpec.check$1(KafkaSpec.scala:152)
[info]   at akka.kafka.scaladsl.KafkaSpec.periodicalCheck(KafkaSpec.scala:167)
[info]   at akka.kafka.scaladsl.KafkaSpec.waitUntilCluster(KafkaSpec.scala:112)
[info]   at akka.kafka.benchmarks.BenchmarksBase.setUp(BenchmarksBase.scala:16)
[info]   at akka.kafka.internal.TestFrameworkInterface$Scalatest.beforeAll(TestFrameworkInterface.scala:21)
[info]   at akka.kafka.internal.TestFrameworkInterface$Scalatest.beforeAll$(TestFrameworkInterface.scala:20)
[info]   at akka.kafka.scaladsl.ScalatestKafkaSpec.beforeAll(ScalatestUtils.scala:11)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at akka.kafka.benchmarks.BenchmarksBase.org$scalatest$FlatSpecLike$$super$run(BenchmarksBase.scala:11)
[info]   at org.scalatest.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:1795)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FlatSpecLike.run(FlatSpecLike.scala:1795)
[info]   at org.scalatest.FlatSpecLike.run$(FlatSpecLike.scala:1793)
[info]   at akka.kafka.benchmarks.BenchmarksBase.run(BenchmarksBase.scala:11)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:113)
[info]   at sbt.TestRunner.run(TestFramework.scala:124)
@2m (Member) commented Oct 24, 2018

@zaharidichev (Contributor, Author) commented Oct 24, 2018

Hmmm, yes, I see the line, but that does not look very correct to me, does it?

kafka_1      | 	advertised.host.name = null
kafka_1      | 	advertised.listeners = INSIDE://:9092,OUTSIDE://linuxkit-025000000001:32768
kafka_1      | 	advertised.port = null
@2m (Member) commented Oct 26, 2018

@2m 2m force-pushed the zaharidichev:zd/remove-wake-up-functionality branch from c9c413e to f826dd3 Nov 30, 2018

- val pollTimeout = if (i == 1) 1L else 0L
- checkNoResult(tryPoll(pollTimeout))
+ val pollTimeout = if (i == 1) oneMilli else java.time.Duration.ZERO
+ checkNoResult(consumer.poll(pollTimeout))

@ennru (Member) commented Nov 30, 2018

We should take measurements and try to find out whether any consumer.poll(ZERO) does anything useful. Maybe parkNanos(100) followed by a consumer.poll(oneMilli) is just as good.
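
For illustration, a minimal sketch of the alternative being suggested, assuming `consumer` is a KafkaConsumer already in scope; this is not the PR's code.

```scala
import java.time.Duration
import java.util.concurrent.locks.LockSupport

val oneMilli = Duration.ofMillis(1)

LockSupport.parkNanos(100)            // back off briefly instead of issuing a zero-duration poll
val records = consumer.poll(oneMilli) // a single short poll with a real, if tiny, timeout
```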

@zaharidichev (Contributor, Author) commented Dec 3, 2018

Did anyone get the chance to take a look at why these tests are failing in such a mysterious manner?

@ennru (Member) commented Dec 3, 2018

Yes, @2m has made some progress - specifically that removing the loop with poll(0)s made committing significantly slower. (Don't ask me why :-) )

@zaharidichev (Contributor, Author) commented Dec 3, 2018

@2m Can I help somehow? Do you have any idea why the looping made committing slower? I had the feeling our new poll times out before the commit callback is invoked, and we need multiple calls to poll before the commit happens (with the new non-blocking poll), as opposed to always completing the commit callback with the old blocking call. Was that the case?

@2m (Member) commented Dec 3, 2018

From what I see, the change to a different poll changed the timings a bit, so some of the tests are now failing a bit more. So I am refactoring some tests to be less racy, e.g. replacing Thread.sleep with future composition or similar.

As to the commit optimization, the loop allows us to avoid polling with the full pollInterval when there are no requests in progress but there are commits to be done.

From empirical tests I noticed that, on average, commits succeed after 5 such polls in the benchmark code.
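
A rough sketch of that loop idea, with hypothetical names (`consumer`, `commitsInProgress`, `maxShortPolls`); it illustrates the reasoning rather than the KafkaConsumerActor's actual implementation.

```scala
import java.time.Duration

val maxShortPolls = 5 // empirically, commits tended to complete within about five polls

// While commits are in flight but no fetch requests are outstanding, drive the
// consumer with short polls so the commit callbacks get a chance to fire,
// instead of blocking for the full poll-interval.
var i = 0
while (commitsInProgress > 0 && i < maxShortPolls) {
  consumer.poll(Duration.ofMillis(1))
  i += 1
}
```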

@zaharidichev (Contributor, Author) commented Dec 3, 2018

I see, this makes sense. These benchmarks (referring to the ones showing that commits succeed after 5 polls on average) have been run using the new non-blocking poll?

@ennru (Member) commented Dec 4, 2018

There are issues left in the test cases around rebalance event notifications.

@ennru (Member) commented Dec 5, 2018

The PartitionedSourcesSpec test "must rebalance when a new consumer comes up" sometimes fails locally for me.
I captured this bit of logging which shows the two partitions that get revoked are believed to have been consumed until offset 49, but the stream counts only 10 elements.

:24,658 [a.k.internal.PlainSubSource$$anon$3 ]  #5 Assigning new partitions: topic-1-7-3, topic-1-7-2
:24,658 [a.kafka.internal.KafkaConsumerActor ]  received handled message PartitionAssigned(Map(topic-1-7-3 -> 49, topic-1-7-2 -> 49)) from Actor[akka://Spec/deadLetters]
:24,659 [a.k.scaladsl.PartitionedSourcesSpec ]  Inner Sub-source for topic-1-7-2
:24,660 [a.k.internal.SubSourceStage$$anon$1 ]  #5 Starting SubSource for partition topic-1-7-2
:24,660 [a.k.scaladsl.PartitionedSourcesSpec ]  Inner Sub-source for topic-1-7-3
:24,660 [a.kafka.internal.KafkaConsumerActor ]  received handled message RequestMessages(0,Set(topic-1-7-2)) from Actor[akka://Spec/system/StreamSupervisor-0/$$p#1395606315]
:24,661 [a.k.internal.SubSourceStage$$anon$1 ]  #5 Starting SubSource for partition topic-1-7-3
:24,672 [a.kafka.internal.KafkaConsumerActor ]  received handled message PartitionRevoked(Set(topic-1-7-3, topic-1-7-0, topic-1-7-1, topic-1-7-2)) from Actor[akka://Spec/deadLetters]
:24,672 [a.kafka.internal.KafkaConsumerActor ]  received handled message PartitionAssigned(Map(topic-1-7-0 -> 49, topic-1-7-1 -> 49)) from Actor[akka://Spec/deadLetters]
:24,733 [a.kafka.internal.KafkaConsumerActor ]  received handled message Poll(akka.kafka.internal.KafkaConsumerActor@3abc63c,true) from Actor[akka://Spec/deadLetters]
:24,763 [a.kafka.internal.KafkaConsumerActor ]  received handled message RequestMessages(0,Set(topic-1-7-3)) from Actor[akka://Spec/system/StreamSupervisor-0/$$q#-2040830947]
:24,764 [a.kafka.internal.KafkaConsumerActor ]  received handled message Poll(akka.kafka.internal.KafkaConsumerActor@64414b1,true) from Actor[akka://Spec/deadLetters]
:24,864 [a.kafka.internal.KafkaConsumerActor ]  received handled message Poll(akka.kafka.internal.KafkaConsumerActor@64414b1,false) from Actor[akka://Spec/system/kafka-consumer-5#1677781759]
:24,904 [a.kafka.internal.KafkaConsumerActor ]  received handled message Poll(akka.kafka.internal.KafkaConsumerActor@3abc63c,true) from Actor[akka://Spec/deadLetters]
:24,967 [a.kafka.internal.KafkaConsumerActor ]  received handled message Poll(akka.kafka.internal.KafkaConsumerActor@64414b1,true) from Actor[akka://Spec/deadLetters]
:25,004 [a.k.internal.PlainSubSource$$anon$3 ]  #4 Closing SubSources for revoked partitions: topic-1-7-3, topic-1-7-2
:25,005 [a.k.internal.SubSourceStage$$anon$1 ]  #4 Completing SubSource for partition topic-1-7-2
:25,005 [a.k.scaladsl.PartitionedSourcesSpec ]  Sub-source for topic-1-7-3 completed: Received [10] messages in total.
:25,005 [a.k.scaladsl.PartitionedSourcesSpec ]  Sub-source for topic-1-7-2 completed: Received [10] messages in total.
:25,005 [a.k.internal.SubSourceStage$$anon$1 ]  #4 Completing SubSource for partition topic-1-7-3

@ennru (Member) commented Dec 6, 2018

My last commit introduced requesting and logging of partition offsets from Kafka - that test works now...
And the other one fails on JDK 11 only. Kafka 2.1 brings JDK 11 support...

@ennru ennru added this to the 1.0-RC1 milestone Dec 7, 2018

@@ -61,10 +61,12 @@ akka.kafka.consumer {
# The KafkaConsumerActor will throw
# `org.apache.kafka.common.errors.WakeupException` which will be ignored
# until `max-wakeups` limit gets exceeded.
+ # DEPRECATED: No op as waking up consumer is not used anymore

@ennru (Member) commented Dec 7, 2018

Shouldn't these settings be removed altogether? Or left there commented out to let people understand that they may remove them from their code?
And add deprecated to the ConsumerSettings withXyz methods.
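
For illustration, a minimal sketch of what such a deprecation could look like, using a simplified stand-in class; the real setters live on akka.kafka.ConsumerSettings and the method name here is only an example.

```scala
import scala.concurrent.duration._

final case class MyConsumerSettings(pollInterval: FiniteDuration) {
  @deprecated("Wake-up handling was removed; this setting has no effect anymore", "1.0-RC1")
  def withWakeupTimeout(timeout: FiniteDuration): MyConsumerSettings = this // kept for source compatibility only
}
```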

Resolved review threads (outdated):
  • core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
  • core/src/main/scala/akka/kafka/javadsl/Consumer.scala
  • tests/src/test/scala/akka/kafka/internal/PartitionedSourceSpec.scala

@2m 2m force-pushed the zaharidichev:zd/remove-wake-up-functionality branch from 030cd43 to a1efdf2 Dec 10, 2018

zaharidichev and others added some commits Oct 27, 2018

@2m 2m force-pushed the zaharidichev:zd/remove-wake-up-functionality branch from 629d992 to 4c6e967 Dec 10, 2018

@ennru ennru assigned ennru and unassigned ennru Dec 10, 2018

@ennru (Member) approved these changes and left a comment Dec 10, 2018

Most parts are good, but

  • Remove the unused settings #684
  • Experiment with the hot poll loop #685

should still be done.

@2m (Member) commented Dec 10, 2018

The last CI failure was #683

@2m 2m merged commit c86c8c3 into akka:master Dec 10, 2018

1 of 2 checks passed

  • continuous-integration/travis-ci/pr: The Travis CI build failed
  • typesafe-cla-validator: All users have signed the CLA