diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala index c950b54de..f86e9d9b0 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala @@ -110,12 +110,7 @@ object CommittableOffset { Map(_topicPartition -> _offsetAndMetadata) override def batch: CommittableOffsetBatch[F] = - CommittableOffsetBatch.one( - _topicPartition, - _offsetAndMetadata, - consumerGroupId, - _commit - ) + CommittableOffsetBatch.one(this) override def commit: F[Unit] = _commit(offsets) diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala index 536149eb0..880d9c7b2 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala @@ -101,7 +101,8 @@ sealed abstract class CommittableOffsetBatch[F[_]] { } object CommittableOffsetBatch { - private[kafka] def apply[F[_]]( + + private[kafka] def ofMultiTopic[F[_]]( offsets: Map[TopicPartition, OffsetAndMetadata], consumerGroupIds: Set[String], consumerGroupIdsMissing: Boolean, @@ -114,7 +115,7 @@ object CommittableOffsetBatch { new CommittableOffsetBatch[F] { override def updated(that: CommittableOffset[F]): CommittableOffsetBatch[F] = - CommittableOffsetBatch( + CommittableOffsetBatch.ofMultiTopic( _offsets.updated(that.topicPartition, that.offsetAndMetadata), that.consumerGroupId.fold(_consumerGroupIds)(_consumerGroupIds + _), _consumerGroupIdsMissing || that.consumerGroupId.isEmpty, @@ -122,7 +123,7 @@ object CommittableOffsetBatch { ) override def updated(that: CommittableOffsetBatch[F]): CommittableOffsetBatch[F] = - CommittableOffsetBatch( + CommittableOffsetBatch.ofMultiTopic( _offsets ++ that.offsets, _consumerGroupIds ++ that.consumerGroupIds, _consumerGroupIdsMissing || that.consumerGroupIdsMissing, @@ -140,7 +141,7 @@ object CommittableOffsetBatch { override def commit: F[Unit] = if (_consumerGroupIdsMissing) - F.raiseError(ConsumerGroupException(consumerGroupIds)) + ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds)) else { offsetsByTopic .map { @@ -149,7 +150,7 @@ object CommittableOffsetBatch { .getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]]( topicName, _ => - F.raiseError( + ApplicativeThrow[F].raiseError( new RuntimeException(s"Cannot perform commit for topic: $topicName") ) ) @@ -164,17 +165,43 @@ object CommittableOffsetBatch { } } - def one[F[_]: ApplicativeThrow]( - topicPartition: TopicPartition, - offsetAndMetadata: OffsetAndMetadata, - consumerGroupId: Option[String], + @deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1") + private[kafka] def apply[F[_]]( + offsets: Map[TopicPartition, OffsetAndMetadata], + consumerGroupIds: Set[String], + consumerGroupIdsMissing: Boolean, commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] + )(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] = + ofMultiTopic[F]( + offsets, + consumerGroupIds, + consumerGroupIdsMissing, + offsets.headOption + .map(_._1.topic()) + .map(topicName => Map(topicName -> commit)) + .getOrElse(Map.empty) + ) + + /** + * A [[CommittableOffsetBatch]] which does include only one offset for a single topic. + * + * @tparam F effect type to use to perform the commit effect + * @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic. + * + * @see [[CommittableOffsetBatch#fromFoldable]] + * @see [[CommittableOffsetBatch#fromFoldableOption]] + */ + def one[F[_]: ApplicativeThrow]( + committableOffset: CommittableOffset[F] ): CommittableOffsetBatch[F] = - CommittableOffsetBatch( - Map(topicPartition -> offsetAndMetadata), - consumerGroupId.toSet, - consumerGroupId.isEmpty, - Map(topicPartition.topic() -> commit) + CommittableOffsetBatch.ofMultiTopic[F]( + Map(committableOffset.topicPartition -> committableOffset.offsetAndMetadata), + committableOffset.consumerGroupId.toSet, + committableOffset.consumerGroupId.isEmpty, + Map( + committableOffset.topicPartition + .topic() -> Map(committableOffset.topicPartition.topic() -> committableOffset.commit) + ) ) /** @@ -238,7 +265,7 @@ object CommittableOffsetBatch { } } - CommittableOffsetBatch( + CommittableOffsetBatch.ofMultiTopic( offsetsMap, consumerGroupIds, consumerGroupIdsMissing, @@ -290,7 +317,7 @@ object CommittableOffsetBatch { if (offsets.isEmpty || offsets.exists(_.isEmpty)) CommittableOffsetBatch.empty[F] else - CommittableOffsetBatch( + CommittableOffsetBatch.ofMultiTopic( offsetsMap, consumerGroupIds, consumerGroupIdsMissing,