Skip to content

Commit

Permalink
wip: add internal wrapper RebalanceConsumerJ with only allowed method…
Browse files Browse the repository at this point in the history
…s to be used from within rebalance listener, add docs for RebalanceConsumerJ
  • Loading branch information
nikitapecasa committed Apr 13, 2021
1 parent c7d2f03 commit 8a21b03
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object ConsumerConverters {
}
.toTry
.flatMap {
_.foldMapM { partitions => RebalanceCallback.run(call(partitions), consumer) }
_.foldMapM { partitions => RebalanceCallback.run(call(partitions), RebalanceConsumerJ(consumer)) }
}
result.fold(throw _, _ => ())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.evolutiongaming.skafka._
import com.evolutiongaming.skafka.consumer.ConsumerConverters._
import com.evolutiongaming.skafka.consumer.RebalanceCallback.Helpers._
import com.evolutiongaming.skafka.consumer.RebalanceCallback.RebalanceCallbackOps
import org.apache.kafka.clients.consumer.{Consumer => ConsumerJ, OffsetAndMetadata => OffsetAndMetadataJ}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata => OffsetAndMetadataJ}
import org.apache.kafka.common.{TopicPartition => TopicPartitionJ}

import scala.annotation.tailrec
Expand Down Expand Up @@ -216,7 +216,7 @@ object RebalanceCallback extends RebalanceCallbackInstances {

private[consumer] def run[F[_]: ToTry, A](
rebalanceCallback: RebalanceCallback[F, A],
consumer: ConsumerJ[_, _]
consumer: RebalanceConsumerJ
): Try[A] = {
type S = Any => RebalanceCallback[F, Any]

Expand Down Expand Up @@ -266,7 +266,7 @@ object RebalanceCallback extends RebalanceCallbackInstances {

private final case class Lift[F[_], A](fa: F[A]) extends RebalanceCallback[F, A]

private final case class WithConsumer[+A](f: ConsumerJ[_, _] => A) extends RebalanceCallback[Nothing, A]
private final case class WithConsumer[+A](f: RebalanceConsumerJ => A) extends RebalanceCallback[Nothing, A]

private final case class Error(throwable: Throwable) extends RebalanceCallback[Nothing, Nothing]

Expand Down Expand Up @@ -296,7 +296,7 @@ object RebalanceCallback extends RebalanceCallbackInstances {
private[consumer] object Helpers {

def committed1(
f: ConsumerJ[_, _] => MapJ[TopicPartitionJ, OffsetAndMetadataJ]
f: RebalanceConsumerJ => MapJ[TopicPartitionJ, OffsetAndMetadataJ]
): RebalanceCallback[Nothing, Map[TopicPartition, OffsetAndMetadata]] = {
for {
result <- WithConsumer(f)
Expand All @@ -305,7 +305,7 @@ object RebalanceCallback extends RebalanceCallbackInstances {
}

def offsets1(
f: ConsumerJ[_, _] => MapJ[TopicPartitionJ, LongJ]
f: RebalanceConsumerJ => MapJ[TopicPartitionJ, LongJ]
): RebalanceCallback[Nothing, Map[TopicPartition, Offset]] = {
for {
result <- WithConsumer(f)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package com.evolutiongaming.skafka.consumer

import java.lang.{Long => LongJ}
import java.time.Duration
import java.util

import org.apache.kafka.clients.consumer.{
Consumer => ConsumerJ,
ConsumerGroupMetadata => ConsumerGroupMetadataJ,
OffsetAndMetadata => OffsetAndMetadataJ,
OffsetAndTimestamp => OffsetAndTimestampJ
}
import org.apache.kafka.common.{PartitionInfo => PartitionInfoJ, TopicPartition => TopicPartitionJ}

/**
* Internal wrapper for [[org.apache.kafka.clients.consumer.Consumer]]
* with a smaller scope of methods making sense during consumer group rebalance.
* Introduced in https://github.com/evolution-gaming/skafka/pull/122
* At the moment of writing we had KafkaConsumer v2.5.0
* and made following choice about methods
* - allowed prefixed with `+ ` in the list below
* - not allowed methods prefixed with `- ` in the list below
*
* The choice is based on following factors
* - it's ok to use any read-only methods like `assignment`, `position`
* - it doesn't make sense to call `consumer.poll` in the middle of current `consumer.poll`
* - it's ok to use `commitSync` as we have to do it in a blocking-way inside corresponding method of [[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]]
* - it doesn't make sense to use `commitAsync` - as we would need to wait for the confirmation before exiting `ConsumerRebalanceListener` method
* - we don't want to allow changing current subscription as we're in the middle of `consumer.poll` method
* - we don't need to `close` or `wakeup` the consumer, instead we want to gracefully finish the work or start one, and close the consumer after exiting the `poll` method
* - we didn't see any use cases for `pause`/`resume` methods, hence they are unsupported
* - seek methods are allowed as they are an official way to manipulate consumer position and used as an example in documentation for `ConsumerRebalanceListener`
* {{{
* - assign
* + assignment
* + beginningOffsets
* - close
* - commitAsync
* + commitSync
* + committed
* + endOffsets
* + groupMetadata
* + listTopics
* - metrics
* + offsetsForTimes
* + partitionsFor
* - pause
* + paused
* - poll
* + position
* - resume
* + seek
* + seekToBeginning
* + seekToEnd
* - subscribe
* + subscription
* - unsubscribe
* - wakeup
* }}}
*
* If you want to support more methods, please double check kafka documentation and implementation about
* consumer group rebalance protocol.
*/
private[consumer] trait RebalanceConsumerJ {

def assignment(): util.Set[TopicPartitionJ]

def subscription(): util.Set[String]

def commitSync(): Unit

def commitSync(timeout: Duration): Unit

def commitSync(offsets: util.Map[TopicPartitionJ, OffsetAndMetadataJ]): Unit

def commitSync(offsets: util.Map[TopicPartitionJ, OffsetAndMetadataJ], timeout: Duration): Unit

def seek(partition: TopicPartitionJ, offset: LongJ): Unit

def seek(partition: TopicPartitionJ, offsetAndMetadata: OffsetAndMetadataJ): Unit

def seekToBeginning(partitions: util.Collection[TopicPartitionJ]): Unit

def seekToEnd(partitions: util.Collection[TopicPartitionJ]): Unit

def position(partition: TopicPartitionJ): LongJ

def position(partition: TopicPartitionJ, timeout: Duration): LongJ

def committed(partitions: util.Set[TopicPartitionJ]): util.Map[TopicPartitionJ, OffsetAndMetadataJ]

def committed(partitions: util.Set[TopicPartitionJ], timeout: Duration): util.Map[TopicPartitionJ, OffsetAndMetadataJ]

def partitionsFor(topic: String): util.List[PartitionInfoJ]

def partitionsFor(topic: String, timeout: Duration): util.List[PartitionInfoJ]

def listTopics(): util.Map[String, util.List[PartitionInfoJ]]

def listTopics(timeout: Duration): util.Map[String, util.List[PartitionInfoJ]]

def paused(): util.Set[TopicPartitionJ]

def offsetsForTimes(
timestampsToSearch: util.Map[TopicPartitionJ, LongJ]
): util.Map[TopicPartitionJ, OffsetAndTimestampJ]

def offsetsForTimes(
timestampsToSearch: util.Map[TopicPartitionJ, LongJ],
timeout: Duration
): util.Map[TopicPartitionJ, OffsetAndTimestampJ]

def beginningOffsets(partitions: util.Collection[TopicPartitionJ]): util.Map[TopicPartitionJ, LongJ]

def beginningOffsets(
partitions: util.Collection[TopicPartitionJ],
timeout: Duration
): util.Map[TopicPartitionJ, LongJ]

def endOffsets(partitions: util.Collection[TopicPartitionJ]): util.Map[TopicPartitionJ, LongJ]

def endOffsets(partitions: util.Collection[TopicPartitionJ], timeout: Duration): util.Map[TopicPartitionJ, LongJ]

def groupMetadata(): ConsumerGroupMetadataJ

}

object RebalanceConsumerJ {
def apply(c: ConsumerJ[_, _]): RebalanceConsumerJ = new RebalanceConsumerJ {

def assignment(): util.Set[TopicPartitionJ] =
c.assignment()

def subscription(): util.Set[String] =
c.subscription()

def commitSync(): Unit =
c.commitSync()

def commitSync(timeout: Duration): Unit =
c.commitSync(timeout)

def commitSync(offsets: util.Map[TopicPartitionJ, OffsetAndMetadataJ]): Unit =
c.commitSync(offsets)

def commitSync(offsets: util.Map[TopicPartitionJ, OffsetAndMetadataJ], timeout: Duration): Unit =
c.commitSync(offsets, timeout)

def seek(partition: TopicPartitionJ, offset: LongJ): Unit =
c.seek(partition, offset)

def seek(partition: TopicPartitionJ, offsetAndMetadata: OffsetAndMetadataJ): Unit =
c.seek(partition, offsetAndMetadata)

def seekToBeginning(partitions: util.Collection[TopicPartitionJ]): Unit =
c.seekToBeginning(partitions)

def seekToEnd(partitions: util.Collection[TopicPartitionJ]): Unit =
c.seekToEnd(partitions)

def position(partition: TopicPartitionJ): LongJ =
c.position(partition)

def position(partition: TopicPartitionJ, timeout: Duration): LongJ =
c.position(partition, timeout)

def committed(partitions: util.Set[TopicPartitionJ]): util.Map[TopicPartitionJ, OffsetAndMetadataJ] =
c.committed(partitions)

def committed(
partitions: util.Set[TopicPartitionJ],
timeout: Duration
): util.Map[TopicPartitionJ, OffsetAndMetadataJ] =
c.committed(partitions, timeout)

def partitionsFor(topic: String): util.List[PartitionInfoJ] =
c.partitionsFor(topic)

def partitionsFor(topic: String, timeout: Duration): util.List[PartitionInfoJ] =
c.partitionsFor(topic, timeout)

def listTopics(): util.Map[String, util.List[PartitionInfoJ]] =
c.listTopics()

def listTopics(timeout: Duration): util.Map[String, util.List[PartitionInfoJ]] =
c.listTopics(timeout)

def paused(): util.Set[TopicPartitionJ] =
c.paused()

def offsetsForTimes(
timestampsToSearch: util.Map[TopicPartitionJ, LongJ]
): util.Map[TopicPartitionJ, OffsetAndTimestampJ] =
c.offsetsForTimes(timestampsToSearch)

def offsetsForTimes(
timestampsToSearch: util.Map[TopicPartitionJ, LongJ],
timeout: Duration
): util.Map[TopicPartitionJ, OffsetAndTimestampJ] =
c.offsetsForTimes(timestampsToSearch, timeout)

def beginningOffsets(partitions: util.Collection[TopicPartitionJ]): util.Map[TopicPartitionJ, LongJ] =
c.beginningOffsets(partitions)

def beginningOffsets(
partitions: util.Collection[TopicPartitionJ],
timeout: Duration
): util.Map[TopicPartitionJ, LongJ] =
c.beginningOffsets(partitions, timeout)

def endOffsets(partitions: util.Collection[TopicPartitionJ]): util.Map[TopicPartitionJ, LongJ] =
c.endOffsets(partitions)

def endOffsets(partitions: util.Collection[TopicPartitionJ], timeout: Duration): util.Map[TopicPartitionJ, LongJ] =
c.endOffsets(partitions, timeout)

def groupMetadata(): ConsumerGroupMetadataJ =
c.groupMetadata()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.{
ConsumerRebalanceListener,
OffsetAndMetadata,
OffsetCommitCallback,
Consumer => ConsumerJ
OffsetCommitCallback
}
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition => TopicPartitionJ}

Expand All @@ -24,7 +23,7 @@ import scala.util.control.NoStackTrace
* It is used to verify the only expected interaction in corresponding tests
* by implementing the only expected method to be called in test
*/
class ExplodingConsumer extends ConsumerJ[String, String] {
class ExplodingConsumer extends RebalanceConsumerJ {
def assignment(): SetJ[TopicPartitionJ] = notImplemented

def subscription(): SetJ[String] = notImplemented
Expand All @@ -41,7 +40,7 @@ class ExplodingConsumer extends ConsumerJ[String, String] {

def unsubscribe(): Unit = notImplemented

def poll(timeout: Long): consumer.ConsumerRecords[String, String] = notImplemented
def poll(timeout: LongJ): consumer.ConsumerRecords[String, String] = notImplemented

def poll(timeout: DurationJ): consumer.ConsumerRecords[String, String] = notImplemented

Expand All @@ -59,17 +58,17 @@ class ExplodingConsumer extends ConsumerJ[String, String] {

def commitAsync(offsets: MapJ[TopicPartitionJ, OffsetAndMetadata], callback: OffsetCommitCallback): Unit = notImplemented

def seek(partition: TopicPartitionJ, offset: Long): Unit = notImplemented
def seek(partition: TopicPartitionJ, offset: LongJ): Unit = notImplemented

def seek(partition: TopicPartitionJ, offsetAndMetadata: OffsetAndMetadata): Unit = notImplemented

def seekToBeginning(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented

def seekToEnd(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented

def position(partition: TopicPartitionJ): Long = notImplemented
def position(partition: TopicPartitionJ): LongJ = notImplemented

def position(partition: TopicPartitionJ, timeout: DurationJ): Long = notImplemented
def position(partition: TopicPartitionJ, timeout: DurationJ): LongJ = notImplemented

def committed(partition: TopicPartitionJ): OffsetAndMetadata = notImplemented

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import com.evolutiongaming.skafka.consumer.DataPoints._
import com.evolutiongaming.skafka.consumer.RebalanceCallback._
import com.evolutiongaming.skafka.consumer.RebalanceCallbackSpec._
import com.evolutiongaming.skafka.{Offset, Partition, TopicPartition}
import org.apache.kafka.clients.consumer.{Consumer => ConsumerJ}
import org.apache.kafka.common.{TopicPartition => TopicPartitionJ}
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.must.Matchers
Expand All @@ -22,7 +21,7 @@ class RebalanceCallbackSpec extends AnyFreeSpec with Matchers {
"RebalanceCallback" - {

"consumer unrelated methods do nothing with consumer" - {
val consumer: ConsumerJ[_, _] =
val consumer: RebalanceConsumerJ =
null // null to verify zero interactions with consumer, otherwise there would be an NPE

"empty just returns Unit" in {
Expand Down Expand Up @@ -182,7 +181,7 @@ class RebalanceCallbackSpec extends AnyFreeSpec with Matchers {
"cats traverse is working" in {
@volatile var seekResult: List[String] = List.empty
val consumer = new ExplodingConsumer {
override def seek(partition: TopicPartitionJ, offset: Long): Unit = {
override def seek(partition: TopicPartitionJ, offset: LongJ): Unit = {
seekResult = seekResult :+ partition.topic()
}
}
Expand All @@ -204,7 +203,7 @@ class RebalanceCallbackSpec extends AnyFreeSpec with Matchers {

object RebalanceCallbackSpec {

def tryRun[A](rc: RebalanceCallback[Try, A], consumer: ConsumerJ[_, _]): Try[Any] = {
def tryRun[A](rc: RebalanceCallback[Try, A], consumer: RebalanceConsumerJ): Try[Any] = {
RebalanceCallback.run[Try, A](rc, consumer)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.evolutiongaming.skafka.consumer

import java.lang.{Long => LongJ}

import cats.Applicative
import cats.data.{NonEmptySet => Nes}
import cats.effect.IO
Expand All @@ -21,11 +23,11 @@ class RebalanceListener1Spec extends AnyFreeSpec with Matchers {
val listener1 = new SaveOffsetsOnRebalance[IO]

val consumer = new ExplodingConsumer {
override def seek(partition: TopicPartitionJ, offset: Long): Unit = {
override def seek(partition: TopicPartitionJ, offset: LongJ): Unit = {
seekResult = seekResult :+ partition.toString
}

override def position(partition: TopicPartitionJ): Long = {
override def position(partition: TopicPartitionJ): LongJ = {
offsetsMap.j.get(partition)
}
}
Expand Down Expand Up @@ -109,8 +111,8 @@ object RebalanceListener1Spec {
.pure[F]
}

def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]) = {
if (offsets == offsetsMap.s.toSortedMap) offsets.as(()).pure[F]
def saveOffsetsInExternalStore[F[_]: Applicative](offsets: Map[TopicPartition, Offset]): F[Unit] = {
if (offsets == offsetsMap.s.toSortedMap) ().pure[F]
else sys.error("saving wrong offsets")
}

Expand Down

0 comments on commit 8a21b03

Please sign in to comment.