Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports multi-topic in CommittableOffsetBatch #1041

Open
wants to merge 7 commits into
base: series/2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object CommittableOffset {
Map(_topicPartition -> _offsetAndMetadata)

override def batch: CommittableOffsetBatch[F] =
CommittableOffsetBatch(offsets, consumerGroupId.toSet, consumerGroupId.isEmpty, _commit)
CommittableOffsetBatch.one(this)

override def commit: F[Unit] =
_commit(offsets)
Expand Down
161 changes: 118 additions & 43 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

package fs2.kafka

import cats.ApplicativeError
import cats.{Applicative, ApplicativeError, ApplicativeThrow, Foldable, Show}
import cats.instances.list._
import cats.syntax.foldable._
import cats.syntax.show._
import cats.{Applicative, Foldable, Show}
import fs2.kafka.instances._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand Down Expand Up @@ -93,56 +92,120 @@ sealed abstract class CommittableOffsetBatch[F[_]] {
* is nothing to commit.
*/
def commit: F[Unit]

private[kafka] def committableOffsetsMap
: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]]
}

object CommittableOffsetBatch {
private[kafka] def apply[F[_]](

private[kafka] def ofMultiTopic[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
commitOffsetsMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] = {
val _offsets = offsets
val _consumerGroupIds = consumerGroupIds
val _consumerGroupIdsMissing = consumerGroupIdsMissing
val _commit = commit
val _commitOffsetsMap = commitOffsetsMap

new CommittableOffsetBatch[F] {
override def updated(that: CommittableOffset[F]): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
_offsets.updated(that.topicPartition, that.offsetAndMetadata),
that.consumerGroupId.fold(_consumerGroupIds)(_consumerGroupIds + _),
_consumerGroupIdsMissing || that.consumerGroupId.isEmpty,
_commit
_commitOffsetsMap.updated(that.topicPartition.topic(), that.commitOffsets)
)

override def updated(that: CommittableOffsetBatch[F]): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
_offsets ++ that.offsets,
_consumerGroupIds ++ that.consumerGroupIds,
_consumerGroupIdsMissing || that.consumerGroupIdsMissing,
_commit
(committableOffsetsMap.toList ++ that.committableOffsetsMap.toList).toMap
)

override val offsets: Map[TopicPartition, OffsetAndMetadata] =
_offsets

override val committableOffsetsMap
: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] =
_commitOffsetsMap

override val consumerGroupIds: Set[String] =
_consumerGroupIds

override val consumerGroupIdsMissing: Boolean =
_consumerGroupIdsMissing

override def commit: F[Unit] =
if (_consumerGroupIdsMissing || _consumerGroupIds.size != 1)
F.raiseError(ConsumerGroupException(consumerGroupIds))
else _commit(offsets)
if (_consumerGroupIdsMissing)
ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds))
else {
offsets
.groupBy(_._1.topic())
.map {
case (topicName, info) =>
committableOffsetsMap
.getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]](
topicName,
_ =>
ApplicativeThrow[F].raiseError(
new RuntimeException(s"Cannot perform commit for topic: $topicName")
)
)
.apply(info)
}
.toList
.sequence_
}

override def toString: String =
Show[CommittableOffsetBatch[F]].show(this)
}
}

@deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1")
private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] =
ofMultiTopic[F](
offsets,
consumerGroupIds,
consumerGroupIdsMissing,
offsets.headOption
.map(_._1.topic())
.map(topicName => Map(topicName -> commit))
.getOrElse(Map.empty)
)

/**
* A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @tparam F effect type to use to perform the commit effect
* @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @see [[CommittableOffsetBatch#fromFoldable]]
* @see [[CommittableOffsetBatch#fromFoldableOption]]
*/
def one[F[_]: ApplicativeThrow](
committableOffset: CommittableOffset[F]
): CommittableOffsetBatch[F] =
CommittableOffsetBatch.ofMultiTopic[F](
Map(committableOffset.topicPartition -> committableOffset.offsetAndMetadata),
committableOffset.consumerGroupId.toSet,
committableOffset.consumerGroupId.isEmpty,
Map(
committableOffset.topicPartition
.topic() -> Map(committableOffset.topicPartition -> committableOffset.commit)
)
)

/**
* Creates a [[CommittableOffsetBatch]] from some [[CommittableOffset]]s,
* where the containing type has a `Foldable` instance. Guaranteed to be
Expand Down Expand Up @@ -183,32 +246,35 @@ object CommittableOffsetBatch {
def fromFoldableMap[F[_], G[_], A](ga: G[A])(f: A => CommittableOffset[F])(
implicit F: ApplicativeError[F, Throwable],
G: Foldable[G]
): CommittableOffsetBatch[F] = {
var commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = null
var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
var consumerGroupIds: Set[String] = Set.empty
var consumerGroupIdsMissing: Boolean = false
var empty: Boolean = true

ga.foldLeft(()) { (_, a) =>
val offset = f(a)

if (empty) {
commit = offset.commitOffsets
empty = false
): CommittableOffsetBatch[F] =
if (ga.isEmpty)
CommittableOffsetBatch.empty[F]
else {
var commitMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] = Map.empty
var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
var consumerGroupIds: Set[String] = Set.empty
var consumerGroupIdsMissing: Boolean = false

ga.foldLeft(()) { (_, a) =>
val offset: CommittableOffset[F] = f(a)
val topicPartition = offset.topicPartition

commitMap = commitMap.updatedIfAbsent(topicPartition.topic(), offset.commitOffsets)
offsetsMap = offsetsMap.updated(topicPartition, offset.offsetAndMetadata)
offset.consumerGroupId match {
case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
case None => consumerGroupIdsMissing = true
}
}

offsetsMap = offsetsMap.updated(offset.topicPartition, offset.offsetAndMetadata)
offset.consumerGroupId match {
case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
case None => consumerGroupIdsMissing = true
}
CommittableOffsetBatch.ofMultiTopic(
offsetsMap,
consumerGroupIds,
consumerGroupIdsMissing,
commitMap
)
}

if (empty) CommittableOffsetBatch.empty[F]
else CommittableOffsetBatch(offsetsMap, consumerGroupIds, consumerGroupIdsMissing, commit)
}

/**
* Creates a [[CommittableOffsetBatch]] from some [[CommittableOffset]]s wrapped
* in `Option`, where the containing type has a `Foldable` instance. Guaranteed
Expand All @@ -231,29 +297,34 @@ object CommittableOffsetBatch {
implicit F: ApplicativeError[F, Throwable],
G: Foldable[G]
): CommittableOffsetBatch[F] = {
var commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = null

var commitMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] = Map.empty
var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
var consumerGroupIds: Set[String] = Set.empty
var consumerGroupIdsMissing: Boolean = false
var empty: Boolean = true

offsets.foldLeft(()) {
case (_, Some(offset)) =>
if (empty) {
commit = offset.commitOffsets
empty = false
}
val topicPartition = offset.topicPartition

offsetsMap = offsetsMap.updated(offset.topicPartition, offset.offsetAndMetadata)
commitMap = commitMap.updatedIfAbsent(topicPartition.topic(), offset.commitOffsets)
offsetsMap = offsetsMap.updated(topicPartition, offset.offsetAndMetadata)
offset.consumerGroupId match {
case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
case None => consumerGroupIdsMissing = true
}
case (_, None) => ()
}

if (empty) CommittableOffsetBatch.empty[F]
else CommittableOffsetBatch(offsetsMap, consumerGroupIds, consumerGroupIdsMissing, commit)
if (offsets.isEmpty || offsets.exists(_.isEmpty))
CommittableOffsetBatch.empty[F]
else
CommittableOffsetBatch.ofMultiTopic(
offsetsMap,
consumerGroupIds,
consumerGroupIdsMissing,
commitMap
)
}

/**
Expand Down Expand Up @@ -286,6 +357,10 @@ object CommittableOffsetBatch {

override def toString: String =
Show[CommittableOffsetBatch[F]].show(this)

override private[kafka] def committableOffsetsMap
: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] =
Map.empty
}

implicit def committableOffsetBatchShow[F[_]]: Show[CommittableOffsetBatch[F]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ import org.apache.kafka.common.KafkaException
* while attempting to commit offsets.<br>
* <br>
* - There were [[CommittableOffset]]s without a consumer group ID.<br>
* - There were [[CommittableOffset]]s for multiple consumer group IDs.
*/
sealed abstract class ConsumerGroupException(groupIds: Set[String])
extends KafkaException({
val groupIdsString = groupIds.toList.sorted.mkString(", ")
s"multiple or missing consumer group ids [$groupIdsString]"
s"missing consumer group ids [$groupIdsString]"
})

private[kafka] object ConsumerGroupException {
Expand Down
12 changes: 11 additions & 1 deletion modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,17 @@ trait BaseGenerators {
implicit F: ApplicativeError[F, Throwable]
): Gen[CommittableOffsetBatch[F]] =
arbitrary[Map[TopicPartition, OffsetAndMetadata]]
.map(CommittableOffsetBatch[F](_, Set.empty, false, _ => F.unit))
.map(_.toList)
.map(
CommittableOffsetBatch.fromFoldableMap(_)(a => {
CommittableOffset(
topicPartition = a._1,
offsetAndMetadata = a._2,
consumerGroupId = None,
commit = _ => F.unit
)
})
)

implicit def arbCommittableOffsetBatch[F[_]](
implicit F: ApplicativeError[F, Throwable]
Expand Down