Skip to content

Commit

Permalink
Add ListenerCallbacks.apply to remove code duplication (#746)
Browse files Browse the repository at this point in the history
Small refactoring before adding a synchronous listener for rebalance event
  • Loading branch information
szymonm authored and ennru committed Mar 20, 2019
1 parent 0f583e8 commit a8a8268
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
26 changes: 26 additions & 0 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Expand Up @@ -26,6 +26,7 @@ import akka.util.JavaDurationConverters._
import akka.event.LoggingReceive
import akka.kafka.KafkaConsumerActor.StoppingException
import akka.kafka._
import akka.stream.stage.AsyncCallback
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

Expand Down Expand Up @@ -99,6 +100,31 @@ import scala.util.control.NonFatal
final case class ListenerCallbacks(onAssign: Set[TopicPartition] => Unit, onRevoke: Set[TopicPartition] => Unit)
extends NoSerializationVerificationNeeded

object ListenerCallbacks {
def apply(subscription: AutoSubscription,
sourceActor: ActorRef,
partitionAssignedCB: AsyncCallback[Set[TopicPartition]],
partitionRevokedCB: AsyncCallback[Set[TopicPartition]]): ListenerCallbacks =
KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
}
}
)
}

/**
* Copied from the implemented interface:
*
Expand Down
19 changes: 1 addition & 18 deletions core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
Expand Up @@ -45,24 +45,7 @@ import scala.concurrent.{Future, Promise}
log.debug("Revoked partitions: {}. All partitions: {}", revokedTps, tps)
}

KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
autoSubscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(autoSubscription, assignedTps), sourceActor.ref)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
autoSubscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(autoSubscription, revokedTps), sourceActor.ref)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
}
}
)
KafkaConsumerActor.ListenerCallbacks(autoSubscription, sourceActor.ref, partitionAssignedCB, partitionRevokedCB)
}

subscription match {
Expand Down
19 changes: 1 addition & 18 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Expand Up @@ -81,24 +81,7 @@ private abstract class SubSourceLogic[K, V, Msg](
sourceActor.watch(consumerActor)

def rebalanceListener =
KafkaConsumerActor.ListenerCallbacks(
assignedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsAssigned(subscription, assignedTps), sourceActor.ref)
}
if (assignedTps.nonEmpty) {
partitionAssignedCB.invoke(assignedTps)
}
},
revokedTps => {
subscription.rebalanceListener.foreach {
_.tell(TopicPartitionsRevoked(subscription, revokedTps), sourceActor.ref)
}
if (revokedTps.nonEmpty) {
partitionRevokedCB.invoke(revokedTps)
}
}
)
KafkaConsumerActor.ListenerCallbacks(subscription, sourceActor.ref, partitionAssignedCB, partitionRevokedCB)

subscription match {
case TopicSubscription(topics, _) =>
Expand Down

0 comments on commit a8a8268

Please sign in to comment.