From a51250af72ce3a3b95ddc69c410c530866f9b9c8 Mon Sep 17 00:00:00 2001
From: Szymon Matejczyk
Date: Mon, 4 Mar 2019 17:45:13 +0100
Subject: [PATCH 1/3] Add tests for transactions with multiple streams

---
 .../kafka/scaladsl/TransactionsSpec.scala | 55 ++++++++++++++++++-
 1 file changed, 54 insertions(+), 1 deletion(-)

diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
index bf59ab92a..f4ad74e7c 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
@@ -5,6 +5,7 @@ package akka.kafka.scaladsl
 
+import akka.Done
 import akka.kafka.ConsumerMessage.PartitionOffset
 import akka.kafka.Subscriptions.TopicSubscription
 import akka.kafka._
@@ -16,7 +17,7 @@
 import net.manub.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
 class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec) {
@@ -251,5 +252,57 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       Await.result(innerControl.shutdown(), remainingOrDefault)
     }
 
+    "provide consistency when using multiple transactional streams" in {
+      val sourceTopic = createTopicName(1)
+      val sinkTopic = createTopic(2, partitions = 4)
+      val group = createGroupId(1)
+
+      givenInitializedTopic(sourceTopic)
+      givenInitializedTopic(sinkTopic)
+
+      val elements = 50
+      val batchSize = 10
+      Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault)
+
+      val consumerSettings = consumerDefaults.withGroupId(group)
+
+      def runStream(id: String): Consumer.Control = {
+        val control: Control = Transactional
+          .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
+          .filterNot(_.record.value() == InitialMsg)
+          .take(batchSize)
+          .map { msg =>
+            ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
+          }
+          .via(Transactional.flow(producerDefaults, s"$group-$id"))
+          .toMat(Sink.ignore)(Keep.left)
+          .run()
+        control
+      }
+
+      val controls: Seq[Control] = (0 until elements / batchSize)
+        .map(_.toString)
+        .map(runStream)
+
+      val probeConsumerGroup = createGroupId(2)
+      val probeConsumerSettings = consumerDefaults
+        .withGroupId(probeConsumerGroup)
+        .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+
+      val probeConsumer = Consumer
+        .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
+        .filterNot(_.value == InitialMsg)
+        .map(_.value())
+        .runWith(TestSink.probe)
+
+      probeConsumer
+        .request(elements)
+        .expectNextUnorderedN((1 to elements).map(_.toString))
+
+      probeConsumer.cancel()
+
+      val futures: Seq[Future[Done]] = controls.map(_.shutdown())
+      Await.result(Future.sequence(futures), remainingOrDefault)
+    }
   }
 }

From 5d6de51ea547bb5663def5e82144a3792b1466fb Mon Sep 17 00:00:00 2001
From: Szymon Matejczyk
Date: Fri, 8 Mar 2019 11:05:41 +0100
Subject: [PATCH 2/3] Use maximal offset when committing transaction in
 TransactionalProducerStage

Callbacks from `producer.send` are guaranteed to execute in order only
for a single output partition. That means we can hit a race condition
where the callback for an input record with offset 1 executes before
the callback for the input record with offset 0. The
`NonemptyTransactionBatch` would then contain offset 0 when the
transaction is committed, which leads to data duplication.

This fix ensures that the offsets stored in `TransactionBatch` only
ever increase. Since `awaitingConfirmation == 0` guarantees that all
consecutive offsets have been written to Kafka, it is enough to keep
only the maximal offset in the `TransactionBatch`.
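
The invariant in isolation, as a minimal self-contained sketch (the
`Batch` class and string keys below are illustrative stand-ins for the
real `TransactionBatch` and `GroupTopicPartition`, not the stage code):

    // Keep, per partition key, the highest offset seen so far; a late,
    // out-of-order callback must never lower a stored offset.
    final case class Batch(offsets: Map[String, Long] = Map.empty) {
      def updated(key: String, offset: Long): Batch = {
        val previousHighest = offsets.getOrElse(key, -1L)
        Batch(offsets + (key -> math.max(offset, previousHighest)))
      }
    }

    // Callbacks for offsets 1 and 0 arriving in the wrong order:
    val batch = Batch().updated("tp-0", 1L).updated("tp-0", 0L)
    assert(batch.offsets("tp-0") == 1L) // offset 0 does not win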
---
 .../akka/kafka/internal/TransactionalProducerStage.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
index 5dd8bb6c5..6347784cc 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
@@ -54,7 +54,12 @@ private object TransactionalProducerStage {
   final class NonemptyTransactionBatch(head: PartitionOffset,
                                        tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
       extends TransactionBatch {
-    private val offsets = tail + (head.key -> head.offset)
+    // There is no guarantee that the offset-adding callbacks will be called in any particular order.
+    // Decreasing an offset stored for the KTP would mean possible data duplication.
+    // Since the `awaitingConfirmation` counter guarantees that all writes have finished, we can
+    // safely assume that all data up to the maximal offsets has been written to Kafka.
+    private val previousHighest = tail.getOrElse(head.key, -1L)
+    private val offsets = tail + (head.key -> head.offset.max(previousHighest))
 
     def group: String = head.key.groupId
     def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {

From dd6226e3609a578aa494eb7ff7adc569e085bfb3 Mon Sep 17 00:00:00 2001
From: Szymon Matejczyk
Date: Mon, 11 Mar 2019 21:28:32 +0100
Subject: [PATCH 3/3] Refactor common parts of TransactionsSpec tests
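
With the helpers extracted at the bottom of the file, the per-test
setup reduces to roughly this shape (a sketch assembled from the diff
below, not a verbatim excerpt):

    val control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, group)
      .toMat(Sink.ignore)(Keep.left)
      .run()

    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)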
---
 .../kafka/scaladsl/TransactionsSpec.scala | 94 ++++++++-----------
 1 file changed, 37 insertions(+), 57 deletions(-)

diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
index f4ad74e7c..5bbff9920 100644
--- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
+++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
@@ -8,9 +8,10 @@ package akka.kafka.scaladsl
 import akka.Done
 import akka.kafka.ConsumerMessage.PartitionOffset
 import akka.kafka.Subscriptions.TopicSubscription
-import akka.kafka._
+import akka.kafka.{ProducerMessage, _}
 import akka.kafka.scaladsl.Consumer.Control
-import akka.stream.scaladsl.{Keep, RestartSource, Sink}
+import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
+import akka.stream.testkit.TestSubscriber
 import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import akka.stream.testkit.scaladsl.TestSink
 import net.manub.embeddedkafka.EmbeddedKafkaConfig
@@ -44,26 +45,13 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
 
     val consumerSettings = consumerDefaults.withGroupId(group)
 
-    val control = Transactional
-      .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
-      .filterNot(_.record.value() == InitialMsg)
-      .map { msg =>
-        ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
-      }
-      .via(Transactional.flow(producerDefaults, group))
+    val control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, group)
       .toMat(Sink.ignore)(Keep.left)
       .run()
 
     val probeConsumerGroup = createGroupId(2)
-    val probeConsumerSettings = consumerDefaults
-      .withGroupId(probeConsumerGroup)
-      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
 
-    val probeConsumer = Consumer
-      .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-      .filterNot(_.value == InitialMsg)
-      .map(_.value())
-      .runWith(TestSink.probe)
+    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
 
     probeConsumer
       .request(100)
@@ -101,15 +89,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       .run()
 
     val probeConsumerGroup = createGroupId(2)
-    val probeConsumerSettings = consumerDefaults
-      .withGroupId(probeConsumerGroup)
-      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
 
-    val probeConsumer = Consumer
-      .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-      .filterNot(_.value == InitialMsg)
-      .map(_.value())
-      .runWith(TestSink.probe)
+    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
 
     probeConsumer
       .request(100)
@@ -165,15 +146,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
     restartSource.runWith(Sink.ignore)
 
     val probeGroup = createGroupId(2)
-    val probeConsumerSettings = consumerDefaults
-      .withGroupId(probeGroup)
-      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
 
-    val probeConsumer = Consumer
-      .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-      .filterNot(_.value == InitialMsg)
-      .map(_.value())
-      .runWith(TestSink.probe)
+    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)
 
     probeConsumer
       .request(1000)
@@ -234,15 +208,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
     restartSource.runWith(Sink.ignore)
 
     val probeGroup = createGroupId(2)
-    val probeConsumerSettings = consumerDefaults
-      .withGroupId(probeGroup)
-      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
 
-    val probeConsumer = Consumer
-      .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-      .filterNot(_.value == InitialMsg)
-      .map(_.value())
-      .runWith(TestSink.probe)
+    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)
 
     probeConsumer
       .request(100)
@@ -267,14 +234,7 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
     val consumerSettings = consumerDefaults.withGroupId(group)
 
     def runStream(id: String): Consumer.Control = {
-      val control: Control = Transactional
-        .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
-        .filterNot(_.record.value() == InitialMsg)
-        .take(batchSize)
-        .map { msg =>
-          ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
-        }
-        .via(Transactional.flow(producerDefaults, s"$group-$id"))
+      val control: Control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, s"$group-$id")
         .toMat(Sink.ignore)(Keep.left)
         .run()
       control
     }
@@ -285,15 +245,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       .map(runStream)
 
     val probeConsumerGroup = createGroupId(2)
-    val probeConsumerSettings = consumerDefaults
-      .withGroupId(probeConsumerGroup)
-      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
 
-    val probeConsumer = Consumer
-      .plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
-      .filterNot(_.value == InitialMsg)
-      .map(_.value())
-      .runWith(TestSink.probe)
+    val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)
 
     probeConsumer
       .request(elements)
@@ -305,4 +258,31 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
       Await.result(Future.sequence(futures), remainingOrDefault)
     }
   }
+
+  private def transactionalCopyStream(
+      consumerSettings: ConsumerSettings[String, String],
+      sourceTopic: String,
+      sinkTopic: String,
+      transactionalId: String
+  ): Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = {
+    Transactional
+      .source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
+      .filterNot(_.record.value() == InitialMsg)
+      .map { msg =>
+        ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
+      }
+      .via(Transactional.flow(producerDefaults, transactionalId))
+  }
+
+  private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = {
+    consumerDefaults
+      .withGroupId(groupId)
+      .withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+  }
+
+  private def valuesProbeConsumer(settings: ConsumerSettings[String, String],
+                                  topic: String): TestSubscriber.Probe[String] = {
+    Consumer
+      .plainSource(settings, TopicSubscription(Set(topic), None))
+      .filterNot(_.value == InitialMsg)
+      .map(_.value())
+      .runWith(TestSink.probe)
+  }
 }