Skip to content

Commit

Permalink
Add singleTopic in CommittableOffsetBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
geirolz committed Jul 30, 2022
1 parent 7b8f807 commit 67cde6c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,7 @@ object CommittableOffset {
Map(_topicPartition -> _offsetAndMetadata)

override def batch: CommittableOffsetBatch[F] =
CommittableOffsetBatch.one(
_topicPartition,
_offsetAndMetadata,
consumerGroupId,
_commit
)
CommittableOffsetBatch.one(this)

override def commit: F[Unit] =
_commit(offsets)
Expand Down
46 changes: 35 additions & 11 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ sealed abstract class CommittableOffsetBatch[F[_]] {
}

object CommittableOffsetBatch {

private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
Expand Down Expand Up @@ -140,7 +141,7 @@ object CommittableOffsetBatch {

override def commit: F[Unit] =
if (_consumerGroupIdsMissing)
F.raiseError(ConsumerGroupException(consumerGroupIds))
ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds))
else {
offsetsByTopic
.map {
Expand All @@ -149,7 +150,7 @@ object CommittableOffsetBatch {
.getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]](
topicName,
_ =>
F.raiseError(
ApplicativeThrow[F].raiseError(
new RuntimeException(s"Cannot perform commit for topic: $topicName")
)
)
Expand All @@ -164,17 +165,40 @@ object CommittableOffsetBatch {
}
}

def one[F[_]: ApplicativeThrow](
topicPartition: TopicPartition,
offsetAndMetadata: OffsetAndMetadata,
consumerGroupId: Option[String],
@deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1")
private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] =
apply[F](
offsets,
consumerGroupIds,
consumerGroupIdsMissing,
offsets.headOption
.map(_._1.topic())
.map(topicName => Map(topicName -> commit))
.getOrElse(Map.empty)
)

/**
* A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @tparam F effect type to use to perform the commit effect
* @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @see [[CommittableOffsetBatch#fromFoldable]]
* @see [[CommittableOffsetBatch#fromFoldableOption]]
*/
def one[F[_]: ApplicativeThrow](
committableOffset: CommittableOffset[F]
): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
Map(topicPartition -> offsetAndMetadata),
consumerGroupId.toSet,
consumerGroupId.isEmpty,
Map(topicPartition.topic() -> commit)
CommittableOffsetBatch[F](
Map(committableOffset.topicPartition -> committableOffset.offsetAndMetadata),
committableOffset.consumerGroupId.toSet,
committableOffset.consumerGroupId.isEmpty,
Map(committableOffset.topicPartition.topic() -> committableOffset.commit)
)

/**
Expand Down

0 comments on commit 67cde6c

Please sign in to comment.