
Rebalance: filter messages of revoked partitions #946

Merged
merged 2 commits into akka:master on Oct 30, 2019

Conversation

ennru
Member

@ennru ennru commented Oct 23, 2019

Purpose

This adds another PartitionAssignmentHandler which adds filtering in the stage so that messages of partitions that were revoked are no longer emitted.

References

Issue #872
Test in #865

Changes

  • Add filterRevokedPartitions in BaseSingleSourceLogic so that subclasses can filter the internal buffer
  • A new partition assignment handler triggers the filtering from onAssigned (see the illustrative sketch below)
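
For context, the user-facing extension point here is the PartitionAssignmentHandler attached to an AutoSubscription. The sketch below is not the handler added by this PR (that one drives the internal filterRevokedPartitions from inside the stage); it is only a minimal, illustrative handler, assuming the 2.0 callback set (onAssign, onRevoke, onLost, onStop) and a hypothetical class name RevocationTrackingHandler, that tracks which partitions are currently assigned so a downstream filter could drop records of revoked partitions:

  import java.util.concurrent.atomic.AtomicReference
  import org.apache.kafka.common.TopicPartition
  import akka.kafka.RestrictedConsumer
  import akka.kafka.scaladsl.PartitionAssignmentHandler

  // Illustrative only: keeps track of the partitions currently assigned to this
  // consumer so that a downstream filter can drop messages of revoked partitions.
  class RevocationTrackingHandler extends PartitionAssignmentHandler {
    private val assigned = new AtomicReference[Set[TopicPartition]](Set.empty)

    def currentlyAssigned: Set[TopicPartition] = assigned.get()

    override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
      assigned.updateAndGet(_ ++ assignedTps)

    override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
      assigned.updateAndGet(_ -- revokedTps)

    override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
      assigned.updateAndGet(_ -- lostTps)

    override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
      assigned.set(Set.empty)
  }

A stream could attach such a handler via Subscriptions.topics(...).withPartitionAssignmentHandler(handler) and filter on handler.currentlyAssigned; the change in this PR applies the equivalent filtering directly to the stage's internal buffer instead.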

Background Context

As the RebalanceSpec test in #865 shows, the consumer stage's buffer continues to emit messages of revoked partitions that will be re-emitted by a different consumer.

@ennru ennru added this to the 2.0.0 milestone Oct 23, 2019
@ennru ennru requested a review from seglo October 29, 2019 16:43
Member

@seglo seglo left a comment


LGTM

@seglo seglo merged commit b7ac0eb into akka:master Oct 30, 2019
@jhooda
Contributor

jhooda commented Nov 27, 2019

@ennru @seglo can we please also extend the stale-message filtering support to the SubSourceLogic class? I've run into exactly the same issue after rebalancing when using Consumer.committablePartitionedSource. Sometimes there are even three threads (two stale and one active) emitting messages in parallel, causing confusion and skipped offsets, leading to loss of messages. I've tested this on both head (master) and 1.1.0. Thanks.

@seglo
Member

seglo commented Nov 28, 2019

@jhooda Thanks for reporting. We can certainly take a look at implementing this for partitioned sources. You mentioned that you tested this use case; do you happen to have a test written that you can share? I'll try to reproduce it myself in the meantime.

@seglo
Member

seglo commented Nov 28, 2019

I recreated the issue: #992

ennru pushed a commit that referenced this pull request Dec 5, 2019
@jhooda
Contributor

jhooda commented Dec 6, 2019

@seglo @ennru Thank you for recreating the issue #992 and also committing a solution.

I have now also tested after the #992 merge, and it looks like the message loss is still there.

My test scenario is as follows:

I am running a Kafka cluster with four brokers on kafka_2.12-2.2.0.

There are 30 test topics with 10 partitions each; each topic has one replica.

I publish 100K messages per topic, with identifiers 1 to 100K, and then store
these messages in a DB via the Alpakka partitioned source, using the index numbers
to identify individual messages.

Each topic has its own group id (the same as the topic name) and its own consumer source.

Below is a snippet of the code showing how I'm creating the consumers:

//.....................
  topicSet.foreach({ myTopic =>
    val topicGroupId = myTopic
    val partitionCount = getPartitionCount(myTopic)
    val rebalanceListener = actorSystem.actorOf(Props(new RebalanceListenerActor(topicGroupId, logger)))
    val autoSubscriptions = Subscriptions.topics(myTopic).withPartitionAssignmentHandler(assignmentHandler).withRebalanceListener(rebalanceListener)
    createAndRunConsumer(autoSubscriptions, topicGroupId, partitionCount)
  })
//...
  def createAndRunConsumer(autoSubscriptions:AutoSubscription, topicGroupId:String, partitionCount:Integer): Unit = {
    val v1 = Consumer.committablePartitionedSource(createConsumerSettings(topicGroupId), autoSubscriptions)
    val v2 = v1.mapAsyncUnordered(50) {
      case (topicPartition, topicPartitionStream) => topicPartitionStream
        .via(businessFlow(topicGroupId, consumerInstance))
        .map(offsetOption => if (offsetOption.nonEmpty) {
          logger.debug("topic {} partition {} offset {}", topicPartition.topic, offsetOption.get.partitionOffset.key.partition.toString, offsetOption.get.partitionOffset.offset.toString)
          offsetOption.get
        } else {
          logger.debug("topic {} partition {}", topicPartition.topic, topicPartition.partition.toString)
          CommittableOffsetBatch.empty
        })
        .filterNot(_ == CommittableOffsetBatch.empty)
        .runWith(Committer.sink(committerDefaults))
    }
    .toMat(Sink.last)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
   // store v2 for control purposes
   // controlRefMap(topicGroupId) = v2
  }
//...
  def businessFlow(topicGroupId: String, consumerInstance: MultiConsumer): Flow[CommittableMessage[String, String], Option[CommittableOffset], NotUsed] = {
    Flow.fromFunction {
      msg => {
        handleMessage(msg, consumerInstance)
      }
    }
  }
//...
  def handleMessage(message: CommittableMessage[String, String], consumerInstance: MultiConsumer): Option[CommittableOffset] = {
    // store messages in DB and for DB write failures emit None
  }
//.....................

For rebalance testing I run four independent JVMs with an identical code base,
and then I randomly stop/start the four instances as follows:

echo "stop_instance1.sh"  | at now + 7 minutes
echo "stop_instance2.sh"  | at now + 10 minutes
echo "start_instance1.sh" | at now + 13 minutes
echo "start_instance2.sh" | at now + 14 minutes
echo "stop_instance1.sh"  | at now + 17 minutes
echo "start_instance1.sh" | at now + 19 minutes
echo "stop_instance2.sh"  | at now + 20 minutes
echo "start_instance2.sh" | at now + 22 minutes
sleep 60
echo "stop_instance3.sh"  | at now + 7 minutes
echo "stop_instance4.sh"  | at now + 10 minutes
echo "start_instance3.sh" | at now + 13 minutes
echo "start_instance4.sh" | at now + 14 minutes
echo "stop_instance3.sh"  | at now + 17 minutes
echo "start_instance4.sh" | at now + 19 minutes
echo "stop_instance3.sh"  | at now + 20 minutes
echo "start_instance4.sh" | at now + 22 minutes

At the end of the run I analyze the DB to tally the consumed-message counts.
All the topics lose about 1% of their messages, and I'm afraid that after #992 the
loss is substantial in the later half (indexes 50K to 100K) of the messages.

Please note that without stop/start there are mostly no issues and all messages are
consumed properly. However, I have seen different behavior in both the stop/start
and the no-stop/start cases as one changes the following setting:

...
  # Not relevant for Kafka after version 2.1.0.                                 
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite                                            
  # The following causes more message loss:
  # commit-refresh-interval = 60m
...

The above setting is supposed to be a workaround for KAFKA-4682, but when one closely examines
its implementation in

...
  private[KafkaConsumerActor] object CommitRefreshing {
    def apply(commitRefreshInterval: Duration): CommitRefreshing =
      commitRefreshInterval match {
        case finite: FiniteDuration => new Impl(finite)
        case _ => NoOp
      }
...

it looks like the Impl class also participates in rebalance events; it specifically
has functions that try to stop stale SubSources from committing offsets via
RebalanceListenerImpl. I feel the rebalance logic should be invoked independently
of whether commit-refresh-interval is infinite or not.

alpakka.conf.txt

@jhooda
Contributor

jhooda commented Dec 6, 2019

A sample run resulted in the following message counts per topic (29 topics; correction: I am using 29 topics, not 30). The expected right-hand count is 100000.

topic01 = 98800                                                                 
topic02 = 98800                                                                 
topic03 = 99200                                                                 
topic04 = 99400                                                                 
topic05 = 99600                                                                 
topic06 = 99179                                                                 
topic07 = 99200                                                                 
topic08 = 98820                                                                 
topic09 = 99700                                                                 
topic10 = 99501                                                                 
topic11 = 99214                                                                 
topic12 = 99234                                                                 
topic13 = 99356                                                                 
topic14 = 99078                                                                 
topic15 = 99700                                                                 
topic16 = 98915                                                                 
topic17 = 98863                                                                 
topic18 = 99500                                                                 
topic19 = 99500                                                                 
topic20 = 99509                                                                 
topic21 = 99469                                                                 
topic22 = 99042                                                                 
topic23 = 99313                                                                 
topic24 = 99400                                                                 
topic25 = 99071                                                                 
topic26 = 98909                                                                 
topic27 = 98900                                                                 
topic28 = 98904                                                                 
topic29 = 98900

Attached is a detailed look at topic26, showing how many messages were duplicated and how many were missed:
topic26_details.txt

@jhooda
Contributor

jhooda commented Dec 7, 2019

I also noticed substantial message loss when the following setting is used:
commit-refresh-interval = 10m
(in contrast, the default setting used earlier was commit-refresh-interval = infinite)

(to recall, the expected right-hand count was 100000)

topic01 = 34397
topic02 = 34644
topic03 = 34242
topic04 = 34456
topic05 = 34349
topic06 = 34328
topic07 = 34340
topic08 = 34261
topic09 = 34075
topic10 = 34309
topic11 = 34347
topic12 = 34454
topic13 = 34330
topic14 = 34395
topic15 = 34338
topic16 = 34631
topic17 = 34478
topic18 = 34151
topic19 = 34351
topic20 = 34238
topic21 = 34363
topic22 = 34039
topic23 = 34537
topic24 = 34252
topic25 = 34244
topic26 = 34399
topic27 = 34252
topic28 = 34645
topic29 = 34295

@seglo
Member

seglo commented Dec 11, 2019

@jhooda Thanks for doing this testing. I'll put the commit-refreshing issue aside for now to reduce the number of variables. It's strange that you're seeing message loss for your use case: the point of the fixes for #872 and #992 was to reduce the number of use cases where there would be message duplication. You shouldn't see any message loss as long as your app is at-least-once, which at face value it appears to be. Some questions regarding your use case:

  • In the test code you included do you write messages in batches to your database?
  • Does the code block while the database write takes place, or is it asynchronous? If it's asynchronous, is it encapsulated in a mapAsync to maintain back-pressure? (See the sketch after this list.)
  • Do your database writes require any consistency setting (i.e. to write to a minimum quorum in Cassandra)? If so, what consistency guarantees are being used?
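
As a minimal sketch of the mapAsync point above (the writeToDb function and flow name are hypothetical, not part of the reported code), the offset is emitted, and therefore committed, only after the database write's Future completes, which keeps back-pressure and at-least-once behaviour intact:

  import scala.concurrent.{ExecutionContext, Future}
  import akka.NotUsed
  import akka.stream.scaladsl.Flow
  import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}

  // Hypothetical asynchronous database write: the Future completes once the
  // record has been durably stored (or fails, which would fail the stream).
  def writeToDb(msg: CommittableMessage[String, String]): Future[Unit] = ???

  // mapAsync limits the number of in-flight writes and only emits the offset
  // after the corresponding write has completed, so offsets cannot be committed
  // ahead of the database state.
  def dbWriteFlow(parallelism: Int)(implicit ec: ExecutionContext)
      : Flow[CommittableMessage[String, String], CommittableOffset, NotUsed] =
    Flow[CommittableMessage[String, String]]
      .mapAsync(parallelism)(msg => writeToDb(msg).map(_ => msg.committableOffset))

The per-partition stream would then run into Committer.sink, as in the snippet above.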

One way to rule out whether this is an issue with your business logic is to instead write the messages to a Kafka topic and then write a small app that asserts the consistency of those messages. We do something like this in our transactional tests to check whether there are missing or duplicate messages. See this highlight in TransactionsOps for the logic we use.

https://github.com/akka/alpakka-kafka/blob/84ab62c5e7b53d4671d878b4d41aa35876681628/tests/src/test/scala/akka/kafka/TransactionsOps.scala#L101..L142
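
As a rough illustration of such a consistency check (this is not the TransactionsOps code itself; it assumes each record value carries its 1..N index, as in the test scenario described above), a small helper could report missing and duplicated indexes for one topic:

  // Illustrative only: given all consumed record values for one topic
  // (expected to be the strings "1" .. expectedCount), report how many
  // indexes are missing and how many were consumed more than once.
  def checkConsistency(consumedValues: Seq[String], expectedCount: Int): Unit = {
    val counts: Map[Int, Int] =
      consumedValues.map(_.toInt).groupBy(identity).map { case (idx, vs) => idx -> vs.size }
    val missing    = (1 to expectedCount).filterNot(counts.contains)
    val duplicated = counts.filter { case (_, n) => n > 1 }
    println(s"missing=${missing.size} duplicated=${duplicated.size}")
  }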

Something else that would be helpful to us is if you could reproduce this issue in our own test suite. I understand that this may be difficult given that a 3rd-party database is involved. We have a few example tests that introduce transient failures and assert consistency, which you could perhaps use for inspiration. There are a few such tests in PartitionedSourcesSpec.

@jhooda
Contributor

jhooda commented Dec 14, 2019

@seglo thank you for the pointers. Based on these, I am writing a test that matches my use case; I will post the results once I have it squared away.

@jhooda
Contributor

jhooda commented Jan 10, 2020

@seglo sorry, I took a small break from this issue. I just submitted a pull request with a test case that can reproduce the offset-skipping issue, although the issue is reproducible only about 10% of the time; for example, I ran the test 100 times and saw about 10 failures. Reference: #1016. I would appreciate it if you could review the pull request.
