Skip to content

Commit

Permalink
Add singleTopic in CommittableOffsetBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
geirolz committed Jul 30, 2022
1 parent 7b8f807 commit ea31b9b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,7 @@ object CommittableOffset {
Map(_topicPartition -> _offsetAndMetadata)

override def batch: CommittableOffsetBatch[F] =
CommittableOffsetBatch.one(
_topicPartition,
_offsetAndMetadata,
consumerGroupId,
_commit
)
CommittableOffsetBatch.one(this)

override def commit: F[Unit] =
_commit(offsets)
Expand Down
59 changes: 43 additions & 16 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ sealed abstract class CommittableOffsetBatch[F[_]] {
}

object CommittableOffsetBatch {
private[kafka] def apply[F[_]](

private[kafka] def ofMultiTopic[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
Expand All @@ -114,15 +115,15 @@ object CommittableOffsetBatch {

new CommittableOffsetBatch[F] {
override def updated(that: CommittableOffset[F]): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
_offsets.updated(that.topicPartition, that.offsetAndMetadata),
that.consumerGroupId.fold(_consumerGroupIds)(_consumerGroupIds + _),
_consumerGroupIdsMissing || that.consumerGroupId.isEmpty,
_commitMap
)

override def updated(that: CommittableOffsetBatch[F]): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
_offsets ++ that.offsets,
_consumerGroupIds ++ that.consumerGroupIds,
_consumerGroupIdsMissing || that.consumerGroupIdsMissing,
Expand All @@ -140,7 +141,7 @@ object CommittableOffsetBatch {

override def commit: F[Unit] =
if (_consumerGroupIdsMissing)
F.raiseError(ConsumerGroupException(consumerGroupIds))
ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds))
else {
offsetsByTopic
.map {
Expand All @@ -149,7 +150,7 @@ object CommittableOffsetBatch {
.getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]](
topicName,
_ =>
F.raiseError(
ApplicativeThrow[F].raiseError(
new RuntimeException(s"Cannot perform commit for topic: $topicName")
)
)
Expand All @@ -164,17 +165,43 @@ object CommittableOffsetBatch {
}
}

def one[F[_]: ApplicativeThrow](
topicPartition: TopicPartition,
offsetAndMetadata: OffsetAndMetadata,
consumerGroupId: Option[String],
@deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1")
private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] =
ofMultiTopic[F](
offsets,
consumerGroupIds,
consumerGroupIdsMissing,
offsets.headOption
.map(_._1.topic())
.map(topicName => Map(topicName -> commit))
.getOrElse(Map.empty)
)

/**
* A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @tparam F effect type to use to perform the commit effect
* @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @see [[CommittableOffsetBatch#fromFoldable]]
* @see [[CommittableOffsetBatch#fromFoldableOption]]
*/
def one[F[_]: ApplicativeThrow](
committableOffset: CommittableOffset[F]
): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
Map(topicPartition -> offsetAndMetadata),
consumerGroupId.toSet,
consumerGroupId.isEmpty,
Map(topicPartition.topic() -> commit)
CommittableOffsetBatch.ofMultiTopic[F](
Map(committableOffset.topicPartition -> committableOffset.offsetAndMetadata),
committableOffset.consumerGroupId.toSet,
committableOffset.consumerGroupId.isEmpty,
Map(
committableOffset.topicPartition
.topic() -> Map(committableOffset.topicPartition.topic() -> committableOffset.commit)
)
)

/**
Expand Down Expand Up @@ -238,7 +265,7 @@ object CommittableOffsetBatch {
}
}

CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
offsetsMap,
consumerGroupIds,
consumerGroupIdsMissing,
Expand Down Expand Up @@ -290,7 +317,7 @@ object CommittableOffsetBatch {
if (offsets.isEmpty || offsets.exists(_.isEmpty))
CommittableOffsetBatch.empty[F]
else
CommittableOffsetBatch(
CommittableOffsetBatch.ofMultiTopic(
offsetsMap,
consumerGroupIds,
consumerGroupIdsMissing,
Expand Down

0 comments on commit ea31b9b

Please sign in to comment.