Commit
Merge 757623f into b76b257
nikitapecasa committed Apr 9, 2021
2 parents b76b257 + 757623f commit 8dc1c60
Showing 9 changed files with 1,124 additions and 24 deletions.
31 changes: 28 additions & 3 deletions skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala
@@ -1,13 +1,14 @@
package com.evolutiongaming.skafka

import java.lang.{Long => LongJ}
import java.time.{Duration => DurationJ}
import java.util.{Optional, Collection => CollectionJ, Map => MapJ, Set => SetJ}
import java.util.{Optional, Collection => CollectionJ, Map => MapJ, Set => SetJ, List => ListJ}

import cats.Monad
import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes}
import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes, NonEmptyMap => Nem}
import cats.implicits._
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, ToTry}
import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, MonadThrowable, ToTry}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata => OffsetAndMetadataJ}
import org.apache.kafka.common.header.{Header => HeaderJ}
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
@@ -215,4 +216,28 @@ object Converters {

def toOptional: Optional[A] = self.fold(Optional.empty[A]()) { a => Optional.of(a) }
}

  def committedOffsetsF[F[_] : MonadThrowable](mapJ: MapJ[TopicPartitionJ, OffsetAndMetadataJ]): F[Map[TopicPartition, OffsetAndMetadata]] = {
    Option(mapJ).fold {
      Map.empty[TopicPartition, OffsetAndMetadata].pure[F]
    } {
      _.asScalaMap(_.asScala[F], _.asScala[F])
    }
  }

  def offsetsMapF[F[_] : MonadThrowable](mapJ: MapJ[TopicPartitionJ, LongJ]): F[Map[TopicPartition, Offset]] = {
    mapJ.asScalaMap(_.asScala[F], a => Offset.of[F](a))
  }

  def asOffsetsJ(offsets: Nem[TopicPartition, OffsetAndMetadata]): MapJ[TopicPartitionJ, OffsetAndMetadataJ] = {
    offsets.toSortedMap.asJavaMap(_.asJava, _.asJava)
  }

  def partitionsInfoListF[F[_]: ApplicativeThrowable](listJ: ListJ[PartitionInfoJ]): F[List[PartitionInfo]] = {
    listJ.asScala.toList.traverse { _.asScala[F] }
  }

  def partitionsInfoMapF[F[_]: MonadThrowable](mapJ: MapJ[Topic, ListJ[PartitionInfoJ]]): F[Map[Topic, List[PartitionInfo]]] = {
    mapJ.asScalaMap(_.pure[F], partitionsInfoListF[F])
  }
}
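
A minimal usage sketch of the new helpers above (not part of this commit), assuming cats-effect IO provides the required MonadThrowable instance; the topic name, partition and offset below are made up for illustration:

import cats.effect.IO
import com.evolutiongaming.skafka.{Converters, OffsetAndMetadata, TopicPartition}
import java.util.{HashMap => HashMapJ}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata => OffsetAndMetadataJ}
import org.apache.kafka.common.{TopicPartition => TopicPartitionJ}

// a Java map as returned by KafkaConsumer.committed; the real API may return null
val committedJ = new HashMapJ[TopicPartitionJ, OffsetAndMetadataJ]()
committedJ.put(new TopicPartitionJ("topic", 0), new OffsetAndMetadataJ(1L))

// converts to skafka types, failing inside IO if a partition or offset is invalid
val committed: IO[Map[TopicPartition, OffsetAndMetadata]] =
  Converters.committedOffsetsF[IO](committedJ)

// a null input collapses to an empty map instead of throwing
val nullSafe: IO[Map[TopicPartition, OffsetAndMetadata]] =
  Converters.committedOffsetsF[IO](null)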
@@ -18,7 +18,7 @@ import com.evolutiongaming.skafka.Converters._
import com.evolutiongaming.skafka.consumer.ConsumerConverters._
import com.evolutiongaming.smetrics.MeasureDuration
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, OffsetCommitCallback, Consumer => ConsumerJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetAndTimestamp => OffsetAndTimestampJ}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener => ConsumerRebalanceListenerJ, OffsetCommitCallback, Consumer => ConsumerJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetAndTimestamp => OffsetAndTimestampJ}
import org.apache.kafka.common.{PartitionInfo => PartitionInfoJ, TopicPartition => TopicPartitionJ}

import scala.concurrent.ExecutionContext
@@ -34,6 +34,9 @@ trait Consumer[F[_], K, V] {

def assignment: F[Set[TopicPartition]]

// TODO default `listener` to `RebalanceListener1.noOp` ?
// or add subscribe methods without any listeners and hide implementation details
def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]): F[Unit]

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]): F[Unit]

@@ -130,6 +133,8 @@ object Consumer {

val assignment = Set.empty[TopicPartition].pure[F]

def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]): F[Unit] = empty

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]) = empty

def subscribe(pattern: Pattern, listener: Option[RebalanceListener[F]]) = empty
@@ -287,7 +292,7 @@ object Consumer {
}

def listenerOf(listener: Option[RebalanceListener[F]]) = {
listener.fold[ConsumerRebalanceListener] {
listener.fold[ConsumerRebalanceListenerJ] {
new NoOpConsumerRebalanceListener
} { listener =>
listener.asJava(serialListeners)
@@ -309,25 +314,21 @@
val partitionsJ = partitions.asJava
for {
result <- serialBlocking { f(partitionsJ) }
result <- Option(result).fold {
Map.empty[TopicPartition, OffsetAndMetadata].pure[F]
} {
_.asScalaMap(_.asScala[F], _.asScala[F])
}
result <- committedOffsetsF[F](result)
} yield result
}

def partitions1(f: => ListJ[PartitionInfoJ]) = {
for {
result <- serialBlocking { f }
result <- result.asScala.toList.traverse { _.asScala[F] }
result <- partitionsInfoListF[F](result)
} yield result
}

def topics1(f: => MapJ[Topic, ListJ[PartitionInfoJ]]) = {
for {
result <- serialBlocking { f }
result <- result.asScalaMap(_.pure[F], _.asScala.toList.traverse { _.asScala[F] })
result <- partitionsInfoMapF[F](result)
} yield result
}

@@ -338,7 +339,7 @@
val timestampsJ = timestamps.asJavaMap(_.asJava, a => LongJ.valueOf(a.value))
for {
result <- serialBlocking { f(timestampsJ) }
result <- result.asScalaMap(_.asScala[F], v => Option(v).traverse { _.asScala[F] })
result <- offsetsAndTimestampsMapF[F](result)
} yield result
}

@@ -349,7 +350,7 @@
val partitionsJ = partitions.asJava
for {
result <- serialBlocking { f(partitionsJ) }
result <- result.asScalaMap(_.asScala[F], a => Offset.of[F](a))
result <- offsetsMapF[F](result)
} yield result
}

@@ -369,6 +370,13 @@
}
}

def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]) = {
// TODO derive ToTry timeout based on ConsumerConfig.maxPollInterval: FiniteDuration (default of 5.minutes)
val topicsJ = topics.toSortedSet.asJava
val listenerJ = listener.asJava(consumer)
serialNonBlocking { consumer.subscribe(topicsJ, listenerJ) }
}

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]) = {
val topicsJ = topics.toSortedSet.toSet.asJava
val listenerJ = listenerOf(listener)
@@ -407,16 +415,12 @@
}

def commit(offsets: Nem[TopicPartition, OffsetAndMetadata]) = {
val offsetsJ = offsets
.toSortedMap
.asJavaMap(_.asJava, _.asJava)
val offsetsJ = asOffsetsJ(offsets)
serialBlocking { consumer.commitSync(offsetsJ) }
}

def commit(offsets: Nem[TopicPartition, OffsetAndMetadata], timeout: FiniteDuration) = {
val offsetsJ = offsets
.toSortedMap
.asJavaMap(_.asJava, _.asJava)
val offsetsJ = asOffsetsJ(offsets)
val timeoutJ = timeout.asJava
serialBlocking { consumer.commitSync(offsetsJ, timeoutJ) }
}
@@ -636,6 +640,9 @@ object Consumer {

val assignment = self.assignment

// TODO add metrics
def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]) = self.subscribe(topics, listener)

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]) = {
val listener1 = listener.map(rebalanceListener)
for {
@@ -881,6 +888,11 @@ object Consumer {

def assignment = fg(self.assignment)

def subscribe(topics: Nes[Topic], listener: RebalanceListener1[G]) = {
val listener1 = listener.mapK(gf)
fg(self.subscribe(topics, listener1))
}

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[G]]) = {
val listener1 = listener.map(_.mapK(gf))
fg(self.subscribe(topics, listener1))
@@ -11,12 +11,13 @@ import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.catshelper._
import com.evolutiongaming.skafka.Converters._
import com.evolutiongaming.skafka._
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata => ConsumerGroupMetadataJ, ConsumerRebalanceListener => RebalanceListenerJ, ConsumerRecord => ConsumerRecordJ, ConsumerRecords => ConsumerRecordsJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetAndTimestamp => OffsetAndTimestampJ}
import org.apache.kafka.clients.consumer.{Consumer => ConsumerJ, ConsumerGroupMetadata => ConsumerGroupMetadataJ, ConsumerRebalanceListener => RebalanceListenerJ, ConsumerRecord => ConsumerRecordJ, ConsumerRecords => ConsumerRecordsJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetAndTimestamp => OffsetAndTimestampJ}
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.record.{TimestampType => TimestampTypeJ}
import org.apache.kafka.common.{TopicPartition => TopicPartitionJ}

import scala.jdk.CollectionConverters._
import scala.util.Try

object ConsumerConverters {

@@ -89,6 +90,52 @@ object ConsumerConverters {
}
}

  implicit class RebalanceListener1Ops[F[_]](val self: RebalanceListener1[F]) extends AnyVal {

    def asJava(consumer: ConsumerJ[_, _])(implicit
      F: Concurrent[F],
      toTry: ToTry[F],
    ): RebalanceListenerJ = {

      def onPartitions(
        partitions: CollectionJ[TopicPartitionJ],
        call: Nes[TopicPartition] => RebalanceCallback[F, Unit]
      ) = {
        val result = partitions
          .asScala
          .toList
          .traverse {_.asScala[F]}
          .map { partitions =>
            partitions
              .toSortedSet
              .toNes
          }
          .toTry
          .flatMap { nesOpt =>
            nesOpt.map {
              nes => RebalanceCallback.run(call(nes), consumer)
            }.getOrElse(Try(()))
          }
        result.fold(throw _, identity)
      }

      new RebalanceListenerJ {

        def onPartitionsAssigned(partitions: CollectionJ[TopicPartitionJ]) = {
          onPartitions(partitions, self.onPartitionsAssigned)
        }

        def onPartitionsRevoked(partitions: CollectionJ[TopicPartitionJ]) = {
          onPartitions(partitions, self.onPartitionsRevoked)
        }

        override def onPartitionsLost(partitions: CollectionJ[TopicPartitionJ]) = {
          onPartitions(partitions, self.onPartitionsLost)
        }
      }
    }
  }


implicit class ConsumerRecordJOps[K, V](val self: ConsumerRecordJ[K, V]) extends AnyVal {

@@ -208,4 +255,8 @@ object ConsumerConverters {
self.groupInstanceId.toOptional)
}
}

def offsetsAndTimestampsMapF[F[_] : MonadThrowable](mapJ: MapJ[TopicPartitionJ, OffsetAndTimestampJ]): F[Map[TopicPartition, Option[OffsetAndTimestamp]]] = {
mapJ.asScalaMap(_.asScala[F], v => Option(v).traverse { _.asScala[F] })
}
}
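
For orientation (not in the diff itself), a sketch of defining a RebalanceListener1 and passing it to the new subscribe overload added in Consumer above. The trait shape and package are inferred from the converter in this file; buildCallback is a hypothetical placeholder, since constructing RebalanceCallback values is defined elsewhere in this change:

import cats.data.{NonEmptySet => Nes}
import cats.effect.IO
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.consumer.{Consumer, RebalanceCallback, RebalanceListener1}

// hypothetical factory for illustration only
def buildCallback(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = ???

val listener: RebalanceListener1[IO] = new RebalanceListener1[IO] {
  def onPartitionsAssigned(partitions: Nes[TopicPartition]) = buildCallback(partitions)
  def onPartitionsRevoked(partitions: Nes[TopicPartition]) = buildCallback(partitions)
  def onPartitionsLost(partitions: Nes[TopicPartition]) = buildCallback(partitions)
}

// the callbacks run via RebalanceCallback.run on the consumer thread, as shown in asJava above
def subscribeWith(consumer: Consumer[IO, String, String], topics: Nes[Topic]): IO[Unit] =
  consumer.subscribe(topics, listener)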
@@ -38,6 +38,9 @@ object ConsumerLogging {
} yield a
}

// TODO: implement logging for subscribe with RebalanceListener1
def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]) = consumer.subscribe(topics, listener)

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]) = {

val listenerLogging = (listener getOrElse RebalanceListener.empty[F]).withLogging(log)
