diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
index 5dd8bb6c5..6347784cc 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
@@ -54,7 +54,12 @@ private object TransactionalProducerStage {
   final class NonemptyTransactionBatch(head: PartitionOffset,
                                        tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
       extends TransactionBatch {
-    private val offsets = tail + (head.key -> head.offset)
+    // There is no guarantee that the offset-adding callbacks will be called in any particular order.
+    // Decreasing an offset stored for a given GroupTopicPartition would mean possible data duplication.
+    // Since the `awaitingConfirmation` counter guarantees that all writes have finished, we can safely assume
+    // that all data up to the maximal offsets has been written to Kafka.
+    private val previousHighest = tail.getOrElse(head.key, -1L)
+    private val offsets = tail + (head.key -> head.offset.max(previousHighest))
 
     def group: String = head.key.groupId
     def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
index bf59ab92a..5bbff9920 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
@@ -5,18 +5,20 @@
 package akka.kafka.scaladsl
 
+import akka.Done
 import akka.kafka.ConsumerMessage.PartitionOffset
 import akka.kafka.Subscriptions.TopicSubscription
-import akka.kafka._
+import akka.kafka.{ProducerMessage, _}
 import akka.kafka.scaladsl.Consumer.Control
-import akka.stream.scaladsl.{Keep, RestartSource, Sink}
+import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
+import akka.stream.testkit.TestSubscriber
 import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import akka.stream.testkit.scaladsl.TestSink
 
 import net.manub.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
 class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec) {
@@ -43,26 +45,13 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
 
       val consumerSettings = consumerDefaults.withGroupId(group)
 
-      val control = Transactional
-        .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
-        .filterNot(_.record.value() == InitialMsg)
-        .map { msg =>
-          ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
-        }
-        .via(Transactional.flow(producerDefaults, group))
+      val control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, group)
         .toMat(Sink.ignore)(Keep.left)
         .run()
 
       val probeConsumerGroup = createGroupId(2)
 
-      val probeConsumerSettings = consumerDefaults
-        .withGroupId(probeConsumerGroup)
-        .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
-      val probeConsumer = Consumer
-        .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-        .filterNot(_.value == InitialMsg)
-        .map(_.value())
-        .runWith(TestSink.probe)
+      val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
 
       probeConsumer
         .request(100)
@@ -100,15 +89,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
         .run()
 
       val probeConsumerGroup = createGroupId(2)
 
-      val probeConsumerSettings = consumerDefaults
-        .withGroupId(probeConsumerGroup)
-        .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
-      val probeConsumer = Consumer
-        .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-        .filterNot(_.value == InitialMsg)
-        .map(_.value())
-        .runWith(TestSink.probe)
+      val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
 
       probeConsumer
         .request(100)
@@ -164,15 +146,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       restartSource.runWith(Sink.ignore)
 
       val probeGroup = createGroupId(2)
 
-      val probeConsumerSettings = consumerDefaults
-        .withGroupId(probeGroup)
-        .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
-      val probeConsumer = Consumer
-        .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-        .filterNot(_.value == InitialMsg)
-        .map(_.value())
-        .runWith(TestSink.probe)
+      val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)
 
       probeConsumer
         .request(1000)
@@ -233,15 +208,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       restartSource.runWith(Sink.ignore)
 
       val probeGroup = createGroupId(2)
 
-      val probeConsumerSettings = consumerDefaults
-        .withGroupId(probeGroup)
-        .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
-      val probeConsumer = Consumer
-        .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-        .filterNot(_.value == InitialMsg)
-        .map(_.value())
-        .runWith(TestSink.probe)
+      val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)
 
       probeConsumer
         .request(100)
@@ -251,5 +219,70 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
 
       Await.result(innerControl.shutdown(), remainingOrDefault)
     }
+
+    "provide consistency when using multiple transactional streams" in {
+      val sourceTopic = createTopicName(1)
+      val sinkTopic = createTopic(2, partitions = 4)
+      val group = createGroupId(1)
+
+      givenInitializedTopic(sourceTopic)
+      givenInitializedTopic(sinkTopic)
+
+      val elements = 50
+      val batchSize = 10
+      Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault)
+
+      val consumerSettings = consumerDefaults.withGroupId(group)
+
+      def runStream(id: String): Consumer.Control = {
+        val control: Control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, s"$group-$id")
+          .toMat(Sink.ignore)(Keep.left)
+          .run()
+        control
+      }
+
+      val controls: Seq[Control] = (0 until elements / batchSize)
+        .map(_.toString)
+        .map(runStream)
+
+      val probeConsumerGroup = createGroupId(2)
+
+      val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
+
+      probeConsumer
+        .request(elements)
+        .expectNextUnorderedN((1 to elements).map(_.toString))
+
+      probeConsumer.cancel()
+
+      val futures: Seq[Future[Done]] = controls.map(_.shutdown())
+      Await.result(Future.sequence(futures), remainingOrDefault)
+    }
   }
+
+  private def transactionalCopyStream(consumerSettings: ConsumerSettings[String, String], sourceTopic: String,
+                                      sinkTopic: String, transactionalId: String):
+  Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = {
+    Transactional
+      .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
+      .filterNot(_.record.value() == InitialMsg)
+      .map { msg =>
+        ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
+      }
+      .via(Transactional.flow(producerDefaults, transactionalId))
+  }
+
+  private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = {
+    consumerDefaults
+      .withGroupId(groupId)
+      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+  }
+
+  private def valuesProbeConsumer(settings: ConsumerSettings[String, String], topic: String):
+  TestSubscriber.Probe[String] = {
+    Consumer
+      .plainSource(settings, TopicSubscription(Set(topic), None))
+      .filterNot(_.value == InitialMsg)
+      .map(_.value())
+      .runWith(TestSink.probe)
+  }
 }
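
To make the producer-stage change easier to review, here is a standalone sketch (not part of the patch; `mergeOffset` and the tuple key are illustrative stand-ins for the batch update in `NonemptyTransactionBatch` and for `GroupTopicPartition`) showing that merging with `max` makes the resulting offset map independent of the order in which confirmation callbacks arrive:

object OffsetMergeSketch extends App {
  // Illustrative stand-in for GroupTopicPartition: (groupId, topic, partition).
  type Key = (String, String, Int)

  // The merge rule from NonemptyTransactionBatch: never decrease a stored offset.
  def mergeOffset(batch: Map[Key, Long], key: Key, offset: Long): Map[Key, Long] = {
    val previousHighest = batch.getOrElse(key, -1L)
    batch + (key -> offset.max(previousHighest))
  }

  val key = ("group-1", "topic-1", 0)

  // Confirmation callbacks for offsets 5 and 7 may fire in either order...
  val inOrder    = mergeOffset(mergeOffset(Map.empty, key, 5L), key, 7L)
  val outOfOrder = mergeOffset(mergeOffset(Map.empty, key, 7L), key, 5L)

  // ...but the batch ends up holding offset 7 either way.
  assert(inOrder == outOfOrder && inOrder(key) == 7L)
  println(inOrder) // Map((group-1,topic-1,0) -> 7)
}

Without the `max`, a late callback carrying a lower offset would overwrite a higher one, and after a restart the records between the two offsets would be consumed and produced again: exactly the duplication the new comment warns about.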
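
The new test exercises the fix by running several transactional copy streams against the same source topic. The same pattern outside the test harness might look like the sketch below (the `runCopyStreams` helper and its parameters are hypothetical; it assumes the `Transactional.sink` and `ProducerMessage.single` scaladsl used elsewhere in this patch):

import akka.kafka.scaladsl.{Consumer, Transactional}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

object MultiStreamSketch {
  // Run `parallelism` exactly-once copy streams over one source topic. They share
  // a consumer group, so Kafka spreads the partitions across them, but each
  // stream needs its own transactional.id (as in s"$group-$id" in the test):
  // two producers with the same transactional.id would fence each other out.
  def runCopyStreams(parallelism: Int,
                     consumerSettings: ConsumerSettings[String, String],
                     producerSettings: ProducerSettings[String, String],
                     sourceTopic: String,
                     sinkTopic: String,
                     group: String)(implicit mat: Materializer): Seq[Consumer.Control] =
    (0 until parallelism).map { id =>
      Transactional
        .source(consumerSettings, Subscriptions.topics(sourceTopic))
        .map { msg =>
          ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
        }
        .toMat(Transactional.sink(producerSettings, s"$group-$id"))(Keep.left)
        .run()
    }
}

Shutting the copies down mirrors the test: collect the returned `Control`s and await `Future.sequence(controls.map(_.shutdown()))`.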