From 3101720dc129fc1eced60d59090391ade578b215 Mon Sep 17 00:00:00 2001 From: Saheb Date: Fri, 20 Oct 2017 12:19:52 +0100 Subject: [PATCH 1/4] POC --- .../kafka/akka/TrackPartitions.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala index 4f1e9f8..efcc095 100644 --- a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala +++ b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala @@ -13,6 +13,9 @@ sealed trait TrackPartitions extends ConsumerRebalanceListener { def isRevoked: Boolean def reset(): Unit + + def offsetsToTopicPartitions(offsets: Map[TopicPartition, Long]): List[TopicPartition] = + offsets.map { case (tp, _) => tp }.toList } /** @@ -25,8 +28,10 @@ sealed trait TrackPartitions extends ConsumerRebalanceListener { * @param consumer The client driver * @param consumerActor Tha KafkaConsumerActor to notify of partition change events */ -private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], consumerActor: ActorRef) - extends TrackPartitions { +private final class TrackPartitionsCommitMode( + consumer: KafkaConsumer[_, _], consumerActor: ActorRef, + assignedListener: List[TopicPartition] => Unit = items => (), + revokedListener: List[TopicPartition] => Unit = items => ()) extends TrackPartitions { private val log = LoggerFactory.getLogger(getClass) @@ -59,6 +64,7 @@ private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], con partition <- partitions.asScala offset <- _offsets.get(partition) } { + assignedListener(partitions) log.info(s"Seeking partition: [{}] to offset [{}]", partition, offset) consumer.seek(partition, offset) } @@ -66,6 +72,9 @@ private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], con } else { consumerActor ! KafkaConsumerActor.RevokeReset + + // Invoke client callback to notify revocation of all existing partitions. + revokedListener(offsetsToTopicPartitions(_offsets)) } } @@ -113,9 +122,6 @@ private final class TrackPartitionsManualOffset( log.debug("onPartitionsAssigned: " + partitions.toString) - def offsetsToTopicPartitions(offsets: Map[TopicPartition, Long]): List[TopicPartition] = - offsets.map { case (tp, _) => tp }.toList - def assign(partitions: List[TopicPartition]) = { val offsets = assignedListener(partitions) for { From f6a21a105ef0e400210dd58a7e0f907c25cdb2e3 Mon Sep 17 00:00:00 2001 From: Simon Souter Date: Fri, 27 Oct 2017 15:48:12 +0100 Subject: [PATCH 2/4] Add optional callback for assigned/ revoked partitions in auto partition mode --- .../kafka/akka/KafkaConsumerActor.scala | 16 +++++++++++++--- .../kafka/akka/TrackPartitions.scala | 8 +++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala b/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala index 713dda2..a130675 100644 --- a/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala +++ b/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala @@ -97,8 +97,18 @@ object KafkaConsumerActor { * The client should ensure that received records are confirmed with 'commit = true' to ensure kafka tracks the commit point. * * @param topics the topics to subscribe to start consuming from + * @param assignedListener Optionally provide a callback when partitions are assigned. Can be used if any initialisation is + * required prior to receiving messages for the partition, such as to populate a cache. Default implementation + * is to do nothing. + * @param revokedListener Optionally provide a callback when partitions are revoked. Can be used if any cleanup is + * required after a partition assignment is revoked. Default implementation + * is to do nothing. */ - final case class AutoPartition(topics: Iterable[String]) extends Subscribe + final case class AutoPartition( + topics: Iterable[String] = List(), + assignedListener: List[TopicPartition] => Unit = _ => (), + revokedListener: List[TopicPartition] => Unit = _ => () + ) extends Subscribe /** * Subscribe to topics in auto assigned partition mode with client managed offset commit positions for each partition. @@ -658,9 +668,9 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag]( } private def subscribe(s: Subscribe): Unit = s match { - case Subscribe.AutoPartition(topics) => + case Subscribe.AutoPartition(topics, assignedListener, revokedListener) => log.info(s"Subscribing in auto partition assignment mode to topics [{}].", topics.mkString(",")) - trackPartitions = new TrackPartitionsCommitMode(consumer, context.self) + trackPartitions = new TrackPartitionsCommitMode(consumer, context.self, assignedListener, revokedListener) consumer.subscribe(topics.toList.asJava, trackPartitions) case Subscribe.AutoPartitionWithManualOffset(topics, assignedListener, revokedListener) => diff --git a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala index efcc095..10c79dc 100644 --- a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala +++ b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala @@ -30,8 +30,8 @@ sealed trait TrackPartitions extends ConsumerRebalanceListener { */ private final class TrackPartitionsCommitMode( consumer: KafkaConsumer[_, _], consumerActor: ActorRef, - assignedListener: List[TopicPartition] => Unit = items => (), - revokedListener: List[TopicPartition] => Unit = items => ()) extends TrackPartitions { + assignedListener: List[TopicPartition] => Unit, + revokedListener: List[TopicPartition] => Unit) extends TrackPartitions { private val log = LoggerFactory.getLogger(getClass) @@ -43,6 +43,8 @@ private final class TrackPartitionsCommitMode( _revoked = true + revokedListener(partitions.asScala.toList) + // If partitions have been revoked, keep a record of our current position within them. if (!partitions.isEmpty) { _offsets = partitions.asScala.map(partition => partition -> consumer.position(partition)).toMap @@ -64,7 +66,7 @@ private final class TrackPartitionsCommitMode( partition <- partitions.asScala offset <- _offsets.get(partition) } { - assignedListener(partitions) + assignedListener(partitions.asScala.toList) log.info(s"Seeking partition: [{}] to offset [{}]", partition, offset) consumer.seek(partition, offset) } From b2256a8c075356484934f6a4e70158a7ee6e8e88 Mon Sep 17 00:00:00 2001 From: Simon Souter Date: Fri, 27 Oct 2017 15:49:16 +0100 Subject: [PATCH 3/4] Fix test server expected records comparison --- .../main/scala/cakesolutions.kafka/testkit/KafkaServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala b/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala index f194255..0749587 100644 --- a/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala +++ b/testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala @@ -158,7 +158,7 @@ final class KafkaServer( val collected = ArrayBuffer.empty[(Option[Key], Value)] val start = System.currentTimeMillis() - while (total <= expectedNumOfRecords && System.currentTimeMillis() < start + timeout) { + while (total < expectedNumOfRecords && System.currentTimeMillis() < start + timeout) { val records = consumer.poll(100) val kvs = records.asScala.map(r => (Option(r.key()), r.value())) collected ++= kvs From 940a40dd637017a7405825bdf734704e3166afd7 Mon Sep 17 00:00:00 2001 From: Saheb Date: Wed, 1 Nov 2017 17:44:18 +0000 Subject: [PATCH 4/4] Fixing the first time partition assigned case --- .../main/scala/cakesolutions/kafka/akka/TrackPartitions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala index 10c79dc..3d308a1 100644 --- a/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala +++ b/akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala @@ -62,11 +62,11 @@ private final class TrackPartitionsCommitMode( val allExisting = _offsets.forall { case (partition, _) => partitions.contains(partition) } if (allExisting) { + assignedListener(partitions.asScala.toList) for { partition <- partitions.asScala offset <- _offsets.get(partition) } { - assignedListener(partitions.asScala.toList) log.info(s"Seeking partition: [{}] to offset [{}]", partition, offset) consumer.seek(partition, offset) }