Fix transaction offsets for transactional producer #742

Merged 3 commits on Mar 13, 2019
@@ -54,7 +54,12 @@ private object TransactionalProducerStage {
final class NonemptyTransactionBatch(head: PartitionOffset,
tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
extends TransactionBatch {
private val offsets = tail + (head.key -> head.offset)
// There is no guarantee that the callbacks adding offsets are invoked in any particular order.
// Decreasing the offset stored for a `GroupTopicPartition` could lead to data duplication.
// Since the `awaitingConfirmation` counter guarantees that all writes have finished, we can
// safely assume that all data up to the maximal offsets has been written to Kafka.
private val previousHighest = tail.getOrElse(head.key, -1L)
private val offsets = tail + (head.key -> head.offset.max(previousHighest))

def group: String = head.key.groupId
def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
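For illustration (not part of the PR): a minimal, self-contained sketch of the monotonic merge the fix above performs. The `GroupTopicPartition` stand-in and the `updated` helper are hypothetical names; the point is that confirmation callbacks arriving out of order can never move a stored offset backwards.

object OffsetMergeSketch extends App {
  // Hypothetical stand-in for the GroupTopicPartition key used above.
  final case class GroupTopicPartition(groupId: String, topic: String, partition: Int)

  // Merge one confirmed offset into the batch, keeping the highest value seen
  // per key: the same `getOrElse(-1L)` / `max` pattern as in the fix above.
  def updated(offsets: Map[GroupTopicPartition, Long],
              key: GroupTopicPartition,
              offset: Long): Map[GroupTopicPartition, Long] = {
    val previousHighest = offsets.getOrElse(key, -1L)
    offsets + (key -> offset.max(previousHighest))
  }

  val ktp = GroupTopicPartition("group", "topic", 0)
  // Callbacks for offsets 5, 3 and 4 arrive out of order; the batch still ends
  // at 5, so committing it cannot cause earlier records to be re-emitted.
  val merged = List(5L, 3L, 4L).foldLeft(Map.empty[GroupTopicPartition, Long]) { (acc, o) =>
    updated(acc, ktp, o)
  }
  assert(merged(ktp) == 5L)
}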
117 changes: 75 additions & 42 deletions tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
@@ -5,18 +5,20 @@

package akka.kafka.scaladsl

import akka.Done
import akka.kafka.ConsumerMessage.PartitionOffset
import akka.kafka.Subscriptions.TopicSubscription
import akka.kafka._
import akka.kafka.{ProducerMessage, _}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.Await
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec) {
@@ -43,26 +45,13 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)

val consumerSettings = consumerDefaults.withGroupId(group)

val control = Transactional
.source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
.filterNot(_.record.value() == InitialMsg)
.map { msg =>
ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
}
.via(Transactional.flow(producerDefaults, group))
val control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, group)
.toMat(Sink.ignore)(Keep.left)
.run()

val probeConsumerGroup = createGroupId(2)
val probeConsumerSettings = consumerDefaults
.withGroupId(probeConsumerGroup)
.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")

val probeConsumer = Consumer
.plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
.filterNot(_.value == InitialMsg)
.map(_.value())
.runWith(TestSink.probe)
val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)

probeConsumer
.request(100)
@@ -100,15 +89,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
.run()

val probeConsumerGroup = createGroupId(2)
val probeConsumerSettings = consumerDefaults
.withGroupId(probeConsumerGroup)
.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")

val probeConsumer = Consumer
.plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
.filterNot(_.value == InitialMsg)
.map(_.value())
.runWith(TestSink.probe)
val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)

probeConsumer
.request(100)
@@ -164,15 +146,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
restartSource.runWith(Sink.ignore)

val probeGroup = createGroupId(2)
val probeConsumerSettings = consumerDefaults
.withGroupId(probeGroup)
.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")

val probeConsumer = Consumer
.plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
.filterNot(_.value == InitialMsg)
.map(_.value())
.runWith(TestSink.probe)
val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)

probeConsumer
.request(1000)
@@ -233,15 +208,8 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
restartSource.runWith(Sink.ignore)

val probeGroup = createGroupId(2)
val probeConsumerSettings = consumerDefaults
.withGroupId(probeGroup)
.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")

val probeConsumer = Consumer
.plainSource(probeConsumerSettings, TopicSubscription(Set(sinkTopic), None))
.filterNot(_.value == InitialMsg)
.map(_.value())
.runWith(TestSink.probe)
val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeGroup), sinkTopic)

probeConsumer
.request(100)
@@ -251,5 +219,70 @@ class TransactionsSpec extends SpecBase(kafkaPort = KafkaPorts.TransactionsSpec)
Await.result(innerControl.shutdown(), remainingOrDefault)
}

"provide consistency when using multiple transactional streams" in {
val sourceTopic = createTopicName(1)
val sinkTopic = createTopic(2, partitions = 4)
val group = createGroupId(1)

givenInitializedTopic(sourceTopic)
givenInitializedTopic(sinkTopic)

val elements = 50
val batchSize = 10
Await.result(produce(sourceTopic, 1 to elements), remainingOrDefault)

val consumerSettings = consumerDefaults.withGroupId(group)

def runStream(id: String): Consumer.Control = {
val control: Control = transactionalCopyStream(consumerSettings, sourceTopic, sinkTopic, s"$group-$id")
.toMat(Sink.ignore)(Keep.left)
.run()
control
}

val controls: Seq[Control] = (0 until elements / batchSize)
.map(_.toString)
.map(runStream)

val probeConsumerGroup = createGroupId(2)

val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)

probeConsumer
.request(elements)
.expectNextUnorderedN((1 to elements).map(_.toString))

probeConsumer.cancel()

val futures: Seq[Future[Done]] = controls.map(_.shutdown())
Await.result(Future.sequence(futures), remainingOrDefault)
}
}

// Consumes `sourceTopic` and copies each record to `sinkTopic` within a transaction.
private def transactionalCopyStream(consumerSettings: ConsumerSettings[String, String], sourceTopic: String,
                                    sinkTopic: String, transactionalId: String)
    : Source[ProducerMessage.Results[String, String, PartitionOffset], Control] = {
Transactional
.source(consumerSettings, TopicSubscription(Set(sourceTopic), None))
.filterNot(_.record.value() == InitialMsg)
.map { msg =>
ProducerMessage.single(new ProducerRecord[String, String](sinkTopic, msg.record.value), msg.partitionOffset)
}
.via(Transactional.flow(producerDefaults, transactionalId))
}

// Probe consumers read with `read_committed` isolation, so records from aborted transactions stay invisible.
private def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] = {
consumerDefaults
.withGroupId(groupId)
.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
}

// Materializes a test probe that emits the string values of records read from `topic`.
private def valuesProbeConsumer(settings: ConsumerSettings[String, String], topic: String)
    : TestSubscriber.Probe[String] = {
Consumer
.plainSource(settings, TopicSubscription(Set(topic), None))
.filterNot(_.value == InitialMsg)
.map(_.value())
.runWith(TestSink.probe)
}
}
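As background for the probe helpers above, a hedged sketch (not from this PR) of the plain kafka-clients setting they rely on: with `isolation.level` set to `read_committed`, a consumer's `poll()` returns only records from committed transactions, which is what lets these tests assert that every element arrives exactly once. The bootstrap address and group id below are placeholders.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object ReadCommittedSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "probe-group")             // placeholder group id
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // Skip records written by transactions that were aborted or are still open.
  props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

  val consumer = new KafkaConsumer[String, String](props)
  // consumer.subscribe(...) and poll() as usual; only committed data is visible.
  consumer.close()
}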