Fix message-batch loss when rebalancing partitioned sources #1263
Conversation
Welcome back @jhooda. I like the direction. The old implementation has already fallen out of my head, but I do recall asking whether there was a way to simplify the test, perhaps so an integration test isn't required. I admit that will be a challenge, though, so I'll dig into your test this week.
core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Ok, I can't think of a way to optimize this test further without creating just as much complexity as you already have, so I'm inclined to keep it. I did a review with suggestions to make it more readable.
tests/src/test/scala/akka/kafka/scaladsl/RebalanceExtSpec.scala
Please do let me know if I need to rebase the commits into a single commit before the merge. Thanks.
Looking better. There's still some more cleanup to do for readability. I think some of the new types introduced can be condensed.
@seglo Please do let me know if anything else is needed to close this request. Thanks.
Almost there. Thank you for your patience. I'm just trying to distill this test down as much as possible to make it easier to maintain.
There are some build warnings failing the build:
https://travis-ci.com/github/akka/alpakka-kafka/jobs/452075335
.withCloseTimeout(5.seconds)
.withPollInterval(300.millis)
.withPollTimeout(200.millis)
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) // 500 is the default value
.withProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy)
I see why MAX_POLL_RECORDS_CONFIG is important to this test, but close timeout, poll interval, and poll timeout seem rather arbitrary. Do you think these are necessary to reproduce the problem?
@seglo I have removed the configuration settings that are not relevant. I have also removed a few assertions for brevity. As I said earlier, the failure rate is not 100% (it is more like 85% on one machine vs. 50% on another). One can vary the poll interval to increase the failure rate; for me it is about 400 ms. I'll take a look at the build failure; it doesn't seem to be related to this change, so I may need to re-merge the latest master.
@seglo All jobs except "Run tests with Scala 2.12 and AdoptOpenJDK 8" passed. Tests are also successful on my dev machine (for both Scala 2.12 and 2.13 with the 1.8 HotSpot JVM). Can you please provide some guidance on how I should proceed? I also doubt that the failures are related to my changes.
The tests look fine. That was a flaky test that failed.
Issue: When there are multiple [KafkaConsumerActor]s, they may get assigned orthogonal sets of topic-partitions, backed by independent [StageActor]s, during group rebalancing. All [StageActor]s receive a feed of message streams from [KafkaConsumerActor]. At the other end, [KafkaConsumerActor] receives a primary message stream from a shared message [Fetcher], as shown below:

[Kafka Broker]-->[Fetcher]--+>[KafkaConsumerActor]--+>[StageActor]

During normal message flow, [Fetcher] saves the next message offset internally and uses that as a reference offset to deliver the next batch of messages to [KafkaConsumerActor]. However, during rebalancing the in-progress message batch can be lost, as shown below:

[Fetcher]--+>[KafkaConsumerActor]--+>[*defunct* StageActor]

Since the entire message stream is asynchronous, [Fetcher] doesn't always know about the lost message batch, and instead delivers the next message batch to the post-rebalance [new StageActor]:

[Fetcher]-->[KafkaConsumerActor]-->[new StageActor]

This fix keeps an up-to-date mapping between each topic-partition and the newest StageActor, updated whenever a [new StageActor] is initialized. This map is consulted by KafkaConsumerActor to drop requests emitted by the *defunct* StageActor.

How to reproduce the issue?
1) Reset the changes to the core src files included in this commit
2) Run the following test:
sbt 'tests/testOnly akka.kafka.scaladsl.RebalanceExtSpec -- -z "no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream"'
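The core idea of the fix can be sketched in a few lines of Scala. This is a simplified illustration, not the actual alpakka-kafka code: the names `PartitionOwners`, `StageRef`, and the standalone `TopicPartition` case class are all hypothetical stand-ins for the real types. It shows the ownership map that is updated when a new StageActor initializes, and the check that drops requests from a defunct StageActor.

```scala
// Hypothetical, simplified model of the fix; names are illustrative only.
final case class TopicPartition(topic: String, partition: Int)
final case class StageRef(id: Long) // stand-in for a StageActor reference

final class PartitionOwners {
  // Which StageActor currently owns each topic-partition.
  private var owners = Map.empty[TopicPartition, StageRef]

  // Called when a new StageActor is initialized after a rebalance:
  // the newest owner replaces any previous one for that partition.
  def assign(tp: TopicPartition, stage: StageRef): Unit =
    owners = owners.updated(tp, stage)

  // A fetch request is honored only if it comes from the current owner;
  // requests from a defunct (pre-rebalance) StageActor are dropped.
  def accept(tp: TopicPartition, requester: StageRef): Boolean =
    owners.get(tp).contains(requester)
}

// Usage: after a rebalance, the old StageActor's requests are ignored.
val owners = new PartitionOwners
val tp = TopicPartition("topic-a", 0)
owners.assign(tp, StageRef(1))
owners.assign(tp, StageRef(2)) // rebalance: a new StageActor takes over
```

Because the map always points at the most recently initialized StageActor, any in-flight request from the old actor fails the `accept` check instead of silently consuming the next batch.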
IntelliJ did some import re-ordering and caused the failure.
Great work!
LGTM.
Thanks @jhooda!
This has been backported to the