Kafka Transactions #420

Merged — merged 8 commits into akka:master from the transactions branch on May 18, 2018

Conversation

@seglo seglo (Member) commented Mar 26, 2018

This PR is my first draft of Kafka transactional support to allow for "exactly once" message processing semantics. I originally proposed this feature in #369 . I've been tinkering for some time now and I'm eager for feedback from the maintainers and users of this project. This implementation is inspired by the exactly_once setting for the processing.guarantee configuration found in Kafka Streams (KIP-129, design document). I've included documentation which describes the implementation and usage in detail. Some excerpts are copied into the PR description below for convenience.

Transactions

Kafka Transactions provide guarantees that messages processed in a consume->transform->produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink.

...

Consume Transform Produce Workflow

Kafka transactions are handled transparently to the user. The Consumer.transactionalSource will enforce that a consumer group id is specified and the Producer.transactionalFlow or Producer.transactionalSink will enforce that a transactional id is specified. All other Kafka consumer and producer properties required to enable transactions are overridden.

Transactions are committed on an interval which can be controlled with the producer config akka.kafka.producer.eos-commit-interval-ms, similar to how exactly-once works in Kafka Streams. The default value is 100ms. The larger the commit interval, the more records will need to be reprocessed in the event of a failure that causes the transaction to be aborted.

When the stream is materialized the producer will initialize the transaction for the provided transactionalId and a transaction will begin. Every commit interval (eos-commit-interval-ms) we check if there are any offsets available to commit. If offsets exist then we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any) and then commit the transaction. After the commit succeeds a new transaction is begun and we re-initialize demand for upstream messages.
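For orientation only, the plain Kafka producer calls that this cycle drives under the hood look roughly like the sketch below. This is not the PR's actual stage code; the helper name and parameters are illustrative, and `initTransactions()` is assumed to have been called once at start-up.

    import java.util.{Map => JMap}
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.common.TopicPartition

    // Rough sketch of one commit cycle using the plain Kafka producer transaction API.
    def commitCycle(producer: KafkaProducer[Array[Byte], String],
                    offsets: JMap[TopicPartition, OffsetAndMetadata],
                    groupId: String): Unit = {
      producer.beginTransaction()
      // ... records are sent with producer.send(...) while the transaction is open ...
      producer.sendOffsetsToTransaction(offsets, groupId) // ties the consumed offsets to the transaction
      producer.commitTransaction()                        // or producer.abortTransaction() on failure
    }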

To gracefully shutdown the stream and commit the current transaction you must call shutdown() on the Control materialized value to await all produced message acknowledgements and commit the final transaction.

Below is an oversimplified example that uses RestartSource to recover from transient errors encountered in a stream. For a more detailed example see the "successful consume transform produce transaction with transient failure causing an abort with restartable source" test in IntegrationSpec.

    var innerControl = null.asInstanceOf[Control]

    val stream = RestartSource.onFailuresWithBackoff(
      minBackoff = 1.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      Consumer.transactionalSource(consumerSettings, Subscriptions.topics("source-topic"))
        .via(business)
        .map { msg =>
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("sink-topic", msg.record.value), msg.partitionOffset)
        }
        // side effect out the `Control` materialized value because it can't be propagated through the `RestartSource`
        .mapMaterializedValue(innerControl = _)
        .via(Producer.transactionalFlow(producerSettings, "transactional-id"))
    }

    stream.runWith(Sink.ignore)

    // Add shutdown hook to respond to SIGTERM and gracefully shutdown stream
    Runtime.getRuntime.addShutdownHook(new Thread { Await.result(innerControl.shutdown(), 10.seconds) })

@maasg maasg left a comment

Impressive yet complex code. It shows why transactions are hard.
As I don't have the background, I cannot comment on the semantics of the implementation.
Instead, I've focused on the code and left comments where I saw an opportunity for improvement.

case Some(Failure(ex)) => failStage(ex)
case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
}
override protected def logSource: Class[_] = classOf[ProducerStage[K, V, P]]

the type information here will get erased. Probably classOf[ProducerStage[_, _, _]] would be more appropriate.

Member Author

+1

def produce(msg: Message[K, V, P]): Unit = {
val r = Promise[Result[K, V, P]]
producer.send(msg.record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception) = {

((minor|style)) lifting the Exception here to a Try could make the code much more concise
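A rough sketch of the lifting maasg suggests, assuming a Promise is completed from the producer Callback (the helper name is illustrative, not the PR's code):

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
    import scala.concurrent.Promise
    import scala.util.{Failure, Success, Try}

    // Lift the (metadata, exception) callback pair into a Try and complete a Promise with it.
    def callbackFor(done: Promise[RecordMetadata]): Callback = new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        val result: Try[RecordMetadata] =
          if (exception == null) Success(metadata) else Failure(exception)
        done.complete(result)
      }
    }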

Member Author

Agreed, but I'm not sure there's much value in doing it in this one use case where we handle a producer Callback. The produce method's code was moved, but otherwise unchanged from its original implementation.

case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
private def onCommitInterval(): Unit = {
if (!commitInProgress) {

I don't understand the guard in this case; it is equivalent to just commitInProgress = true.

@seglo seglo (Member Author) Mar 27, 2018

onCommitInterval is called both on the normal commit interval to start a commit and while draining producer acks when a commit is already in progress. The guard just prevents re-assigning commitInProgress = true in the latter case, but it doesn't really matter since it's thread-safe anyway. I could have created a separate schedule, but since the callback is the same (except for assigning commitInProgress) I just re-used it.

if (!commitInProgress) {
commitInProgress = true
}
val awaitingConfirmationCount = awaitingConfirmation.get

((style)) as awaitingConfirmationCount is only used in the if, consider compacting the two statements together.

Member Author

Yes, this was a leftover from earlier troubleshooting.

+1

}

private[kafka] trait MessageCallback[K, V, P] {
def awaitingConfirmation: AtomicInteger

((style)) is it correct that the previous trait has default implementations and this does not?

Member Author

+1

@@ -418,5 +420,109 @@ class IntegrationSpec extends TestKit(ActorSystem("IntegrationSpec"))

}
}

"successful consume transform produce transaction" in {

consume-transform-produce ?

Member Author

Yes. That's how it's referred to in KIP-98. I'll update all references +1

Consumer.transactionalSource(consumerSettings, TopicSubscription(Set(sourceTopic)))
.filterNot(_.record.value() == InitialMsg)
.map { msg =>
if (msg.record.value().equals("500") && restartCount < 2) {

the producer above looks like it produces integers (1 to 1000) but this check is against a String value. (maybe there's some .toString transformation in between, but it's not obvious from the test)

Member Author

Yes, the existing integration test infrastructure uses key/value serdes of Array[Byte] and String.


They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions.

A transactional ID must be defined and unique for each instance of the application.

'transactional ID' - I'd certainly look for another name. I find it confusing with the ubiquitous transactionId

Member Author

This is in reference to the transactional.id kafka producer config.


OK if it's known in the "local dialect" :)

class TransactionsFailureRetryExample extends ConsumerExample {
def main(args: Array[String]): Unit = {
// #transactionalFailureRetry
var innerControl = null.asInstanceOf[Control]

consider

var innerControl : Control = _

to avoid the noisy asInstanceOf

Member Author

That's actually not possible (it results in a compiler error) because innerControl is a local variable. See this SO answer for a summary as to why: https://stackoverflow.com/a/36955530/895309
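A minimal illustration of the difference (not from the PR): the underscore default-value syntax compiles for fields but not for local variables.

    class Example {
      var field: AnyRef = _ // OK: a field may use `_` to get its default value (null here)

      def run(): Unit = {
        // var local: AnyRef = _  // does not compile: local variables must be initialized explicitly
        var local: AnyRef = null  // the explicit-null form discussed below works for locals
        local = new Object
        println(local)
      }
    }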


Interesting. I didn't know about the scope difference. Thanks for the link.
How about:

var innerControl = null.asInstanceOf[Control]
// vs
var innerControl:Control = null

?

Member Author

Yes, that works.

stream.runWith(Sink.ignore)

// Add shutdown hook to respond to SIGTERM and gracefully shutdown stream
Runtime.getRuntime.addShutdownHook(new Thread { Await.result(innerControl.shutdown(), 10.seconds) })

consider:

sys.ShutdownHookThread {
  Await.result(innerControl.shutdown(), 10.seconds)
}

Member Author

Nice +1

@seglo seglo (Member Author) commented Mar 27, 2018

Thanks for the thorough review @maasg! I've addressed much of it in a commit I just pushed. I'm going to tackle some of the refactorings you suggested offline tomorrow (FTR: unit tests, encapsulate transaction state with an ADT).

private[kafka] class EmptyTransactionOffsetBatch extends TransactionOffsetBatch {
override def nonEmpty: Boolean = false
override def updated(partitionOffset: PartitionOffset): TransactionOffsetBatch = new NonemptyTransactionOffsetBatch(partitionOffset)
override def group: String = throw new IllegalStateException("Empty batch has no group defined")

I wonder whether group should rather be an Option[String] and this should return a None instead.
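A sketch of the alternative maasg raises, assuming the trait members implied by the quoted snippet (the NonemptyTransactionOffsetBatch is the one from the PR; whether this is preferable depends on how callers use group):

    private[kafka] trait TransactionOffsetBatch {
      def nonEmpty: Boolean
      def updated(partitionOffset: PartitionOffset): TransactionOffsetBatch
      def group: Option[String] // None for the empty batch instead of throwing
    }

    private[kafka] class EmptyTransactionOffsetBatch extends TransactionOffsetBatch {
      override def nonEmpty: Boolean = false
      override def updated(partitionOffset: PartitionOffset): TransactionOffsetBatch =
        new NonemptyTransactionOffsetBatch(partitionOffset)
      override def group: Option[String] = None // empty batch has no group defined
    }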

@seglo seglo force-pushed the transactions branch 2 times, most recently from 59a33c4 to afb5028 on April 4, 2018 at 22:00
@seglo seglo (Member Author) commented Apr 4, 2018

@patriknw @ktoso Hey. I've been tinkering with this PR and I think it's ready for more eyes. I've squashed and rebased. I realise this is a bit of a monster so I'm not looking for any kind of quick turnaround, but I just wanted your advice on who might be an ideal reviewer so I can poke the right people.

@fchaillou fchaillou left a comment

Great job!
I would love to have this merged; it would allow me to replace my manually crafted actor version for this use case and go back to using this great library!

final case class TransactionalMessage[K, V](
record: ConsumerRecord[K, V],
partitionOffset: PartitionOffset
)


Did you forget to make this class implement PartitionOffsetMessage ?

@seglo seglo (Member Author) Apr 6, 2018

This is an artifact of an earlier implementation attempt where I tried to generalize consumer group committable messages and transactional messages by extracting PartitionOffsetMessage into a trait. I didn't pursue this option because I thought it would be more appropriate for the ProducerStage to control the transaction commits instead of the user, like they're done in Kafka Streams. For now I'll remove the PartitionOffsetMessage since it's not being used.

extends TransactionBatch {
private val offsets = tail + (head.key -> head.offset)

def group: String = offsets.keys.head.groupId


why not only head.key.groupId ?

Member Author

👍


Thinking of it, you could probably simplify the NonemptyTransactionBatch by directly storing the offsets and groupId:

class EmptyTransactionBatch extends TransactionBatch {
  override def updated(partitionOffset: PartitionOffset): TransactionBatch =
    new NonemptyTransactionBatch(partitionOffset.key.groupId, Map(partitionOffset.key -> partitionOffset.offset))
}

class NonemptyTransactionBatch(val group: String, offsets: Map[GroupTopicPartition, Long]) extends TransactionBatch {
  def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
    case (gtp, offset) => new TopicPartition(gtp.topic, gtp.partition) -> new OffsetAndMetadata(offset + 1)
  }

  override def updated(partitionOffset: PartitionOffset): TransactionBatch = {
    require(
      group == partitionOffset.key.groupId,
      s"Transaction batch must contain messages from exactly 1 consumer group. $group != ${partitionOffset.key.groupId}")

    val updatedOffsets = offsets + (partitionOffset.key -> partitionOffset.offset)
    new NonemptyTransactionBatch(group, updatedOffsets)
  }
}

Member Author

Passing in the group name may be a good idea. I want to give the NonemptyTransactionBatch the property of never being empty, so I was inspired by the NonEmptyList implementation in cats to provide the head element in the default constructor.
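A rough sketch of the shape seglo describes, with the head element required by the constructor so the batch can never be empty (types assumed from the quoted snippets above; not the PR's final code):

    final class NonemptyTransactionBatch(head: PartitionOffset, tail: Map[GroupTopicPartition, Long] = Map.empty)
        extends TransactionBatch {
      private val offsets = tail + (head.key -> head.offset)
      val group: String = head.key.groupId

      def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
        case (gtp, offset) => new TopicPartition(gtp.topic, gtp.partition) -> new OffsetAndMetadata(offset + 1)
      }

      override def updated(partitionOffset: PartitionOffset): TransactionBatch = {
        require(
          group == partitionOffset.key.groupId,
          s"Transaction batch must contain messages from exactly 1 consumer group. $group != ${partitionOffset.key.groupId}")
        new NonemptyTransactionBatch(partitionOffset, offsets)
      }
    }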

@@ -146,7 +148,8 @@ final class ProducerSettings[K, V](
val valueSerializerOpt: Option[Serializer[V]],
val closeTimeout: FiniteDuration,
val parallelism: Int,
val dispatcher: String
val dispatcher: String,
val eosCommitIntervalMs: Long


I don't know if that makes sense, but in some use cases the user might want to be able to decide to commit a transaction after a specific number of input messages have been fully processed.
I think it should be doable by tracking an awaitingTransactionCommit the same way the current awaitingConfirmation works.
This way you could support the use case where the user wants to make sure each input message generates a transaction (or more than one if wanted).

Member Author

That's a fair point. There are several reasons I didn't implement this approach, but they may not be too compelling. I welcome a challenge here.

  1. Neither KIP-129 nor the current Kafka Streams implementation (AFAICT) exposes a way to batch a transaction in this manner.
  2. In a Consume-Transform-Produce workflow we have strong guarantees that the source message is already persisted and possibly replicated, so it's not critical we must process the message successfully the first time. If the batch fails for some transient reason then it can be processed the next time. I acknowledge this reason is more of an opinion than fact since there are other things that can go wrong, such as the retention period for that source message elapsing, etc.
  3. It would be a little more implementation work and I wanted to see what people thought of this approach (which mirrors what's done in Kafka Streams) before I extended the implementation.


Your points are completely valid.
Let's get this implementation merged, and afterwards I can take a look at adding this feature if I still feel it's important!

@SemanticBeeng SemanticBeeng Oct 24, 2022

I also think we need extra support on top of the current Kafka transaction semantics to implement things like sagas, especially in the context of "data pipelines" and not just "microservices".

https://microservices.io/patterns/data/saga.html

Kafka transaction semantics that work in sync with user-controlled batches would at least allow tracking the incremental data flow between two components at a time from the overall ("data pipeline") orchestration.

Sagas would need to be implemented on top: since Kafka transactions are likely not suitable for implementing data flow (again, in the context of "data pipelines") across all components of the overall orchestration, we still need sagas on top of them.

But I can still see how it would be useful to use Kafka transactions to ensure that all the batches between two ("at a time") of the components in the overall orchestration are exchanged with exactly-once semantics.

Does this make sense?
cc @seglo @fchaillou

@seglo seglo (Member Author) commented Apr 6, 2018

Thanks a lot for the review @fchaillou ! I provided feedback and added a commit incorporating some of your suggestions.


All of the scenarios covered in the @ref[At-Least-Once Delivery documentation](atleastonce.md) (Multiple Effects per Commit, Non-Sequential Processing, and Conditional Message Processing) are applicable to transactional scenarios as well.

Only one application instance per `transactional.id` is allowed. If two application instances with the same `transactional.id` are run at the same time then the instance that registers with Kafka's transaction coordinator second will throw a `ProducerFencedException` so it doesn't interfere with transactions in progress by the first instance. To distribute multiple transactional workflows for the same subscription the user must manually subdivide the subscription across multiple instances of the application. This may be handled internally in future versions.
@rafalmag rafalmag (Contributor) Apr 10, 2018

In Kafka Streams they add a suffix with "group, topic, partition" to transactional.id. It was the most challenging thing I tried to explain in my comment, as in my opinion in most use cases there would be multiple "processors" handling the same work (same input topics). We cannot generate a new transactional.id for each new processing instance, as then zombie fencing would not work either.


I kind of agree with you that using auto-assignment of topic-partitions with a transactional.id is hard, but not unfeasible (something like a transactional.id based on "topic-hostname" or equivalent should work).
In my personal use case, where I'm using transactions without reactive-kafka for now, I'm doing manual subscription on the topic with one pair of Kafka producer/consumer per partition. In that case it is easy to use the topic-partition as the transactional.id.

The current PR would allow me to implement the same use case, and I think the proposed solution is flexible as it lets the user apply it in several ways for different use cases.
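A rough sketch of that manual subdivision using the API from this PR: each application instance is assigned an explicit set of partitions and uses its own transactional.id (topic names, the partition split, and the IDs below are purely illustrative).

    import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
    import akka.kafka.scaladsl.{Consumer, Producer}
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.TopicPartition

    // Instance 1 of the application: owns partitions 0 and 1 and its own transactional.id.
    def instanceOne(consumerSettings: ConsumerSettings[Array[Byte], String],
                    producerSettings: ProducerSettings[Array[Byte], String]) =
      Consumer.transactionalSource(
          consumerSettings,
          Subscriptions.assignment(Set(new TopicPartition("source-topic", 0), new TopicPartition("source-topic", 1))))
        .map { msg =>
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("sink-topic", msg.record.value), msg.partitionOffset)
        }
        .via(Producer.transactionalFlow(producerSettings, "my-app-instance-1"))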

@rafalmag rafalmag (Contributor) Apr 11, 2018

This might still break exactly once processing. Simple example:

  • single group name
  • single topic T with 3 partitions: p1, p2, p3. Each partition has a lot of messages, p1 has p1m1,p1m2, ... p2 has p2m1,p2m2, etc. - offset 0
  • processing on 2 hosts/nodes/processes: A,B

After a system start (deployment/restart, etc.):

  • node A registers, kafka assigns p1,p2,p3 partitions and the p1m1, p2m1, p3m1 messages are pulled.
  • node B registers, kafka would reassign the partitions, for example A to p1,p2 and B to p3.
    Process B could potentially pull the message from p3 - so it would process p3m1.

And now the race between A and B happens - the outcome should be that the processing of p3m1 on one of the nodes should finish successfully and the other should fail (InvalidProducerEpoch or ProducerFenced).
In order to make this mechanism work both processes should use the same transactional.id - so during the partition attach and initTransaction the transactional.id epoch is bumped up.

Because of that Transactional.id cannot be host related.
It cannot be common for both processes because then instead of distributed processing one would fence out the other, so in the end only one would be "active".

It would work perfectly if we had just a single processor - but then the question is why somebody would choose Kafka if not for horizontal scaling...

See also https://www.confluent.io/blog/transactions-apache-kafka/ - but that does not give any solution... I cannot find a better solution than just copying the one from Kafka Streams...

This is extremely important, as when this bit is broken we only have at-least-once processing instead of exactly-once.

Contributor

Please see also my comment here.

@seglo seglo (Member Author) Apr 11, 2018

Thanks a lot for joining this discussion @rafalmag , I was hoping you would provide your feedback! Distributed transactional workflows are a hard problem to solve. The implementation in KafkaStreams is complex. I intentionally avoided this use case because I wanted to get the basic workflow for a single application instance working correctly. I'm on the fence about implementing distributed transactional support in reactive-kafka because I think it can be handled by the client configuring appropriate consumer group subscriptions and the transaction.id themselves. This path loses out on the advantages of consumer group rebalancing, but does satisfy distributed transaction workflows. Do you agree?

That said, I understand that it would be wonderful to have some way to support the distributed use case, so let's study our options.

In KafkaStreams the task's Producer's transactional.id is defined in StreamThread. It's a concatenation of the KafkaStreams config application.id and the TaskId. If you review KIP-129 we see the design specifies one KafkaProducer per task. So what is a task?

In Kafka Streams a task has an assigned set of topic partitions from the consumer group. A task can contain multiple StreamThread's to distribute work internally on the host machine. A PartitionGrouper is used to assign tasks to partitions and the DefaultPartitionGrouper assigns each TaskId to a subset of topic partitions.

How the TaskIds are created is not clear to me, but it seems to be related to how many Sources are available for the stream. I think this may translate to the number of active consumers there are for a consumer group. If that's the case then somehow Kafka Streams is aware of other consumers and assigns them IDs, so the assignment can happen, probably through some metadata Kafka topic for the Kafka Streams app.

Obviously more research needs to be done here, but I agree that the simplest approach is what you already suggested by assigning a producer transaction.id per consumed topic partition. I think this is our go forward solution.

I'll also do a little more digging in the Kafka Streams codebase to see how TaskIds are generated, but if it involves out-of-band communication to other app instances then I think the solution may be too high-level and complex for the reactive-kafka project.

@rafalmag rafalmag (Contributor) Apr 11, 2018

@seglo I really like the idea of TransactionalMessage in

  def transactionalSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[TransactionalMessage[K, V], Control]

and

def transactionalSink[K, V](settings: ProducerSettings[K, V], transactionalId: String): 
Sink[Message[K, V, ConsumerMessage.PartitionOffset], Future[Done]] 

Rebalancing can be detected by the consumer in the source, and the transactional.id should be set for the producer in the sink, but this approach really decouples source and sink. That is why I suggested in my comment implementing something that receives the logic as a flow and does the creation of the consumer, the producer, and the necessary feedback loop in its internals, e.g.:

def withTransactions(consumerSettings: ConsumerSettings, producerSettings: ProducerSettings, flow: Flow[ConsumerRecord, Seq[ProducerRecord], Mat]): RunnableGraph[Mat]

Previously I thought that method could be implemented using:

  def plainPartitionedSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control]

so here we get a new substream for each topic-partition pair, so we can attach the flow and then some lazy producer (that would be created taking as an argument the TopicPartition).

BTW: Maybe a corresponding method based on TransactionalMessage could be useful for that:

  def transactionalSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription): Source[(TopicPartition, Source[TransactionalMessage[K, V], NotUsed]), Control]

In the source code of plainPartitionedSource you will find a mechanism for emitting a new substream after rebalancing and closing the old stream (in case of detach).

Regarding:

I'll also do a little more digging in the Kafka Streams codebase to see how TaskIds are generated, but if it involves out-of-band communication to other app instances then I think the solution may be too high-level and complex for the reactive-kafka project.

TaskId in Kafka Streams is just the topicGroupId and partition in the form (t_p).

For setting the TaskId no extra communication is required. org.apache.kafka.clients.consumer.Consumer has a nice method:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

and a ConsumerRebalanceListener can be used to listen to rebalancing events to know which partitions are revoked and which new ones are assigned.
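A small sketch of that callback with the plain Kafka consumer API (logging only; what to do with the assigned partitions, e.g. deriving a per-partition transactional.id, is the open question discussed here):

    import java.util.{Collection => JCollection}
    import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    // Subscribe and observe rebalance events for the given topic.
    def subscribeWithRebalanceLogging(consumer: KafkaConsumer[Array[Byte], String], topic: String): Unit =
      consumer.subscribe(java.util.Collections.singletonList(topic), new ConsumerRebalanceListener {
        override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
          partitions.asScala.foreach(tp => println(s"revoked:  ${tp.topic}-${tp.partition}"))
        override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
          partitions.asScala.foreach(tp => println(s"assigned: ${tp.topic}-${tp.partition}"))
      })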

def transactionalSink[K, V](
settings: ProducerSettings[K, V],
transactionalId: String
): Sink[Message[K, V, ConsumerMessage.PartitionOffset], CompletionStage[Done]] =
@rafalmag rafalmag (Contributor) Apr 10, 2018

There are two other common use cases for a single input message:

  • someone would like to do message "filtering", so there would be no output Kafka message, but the input message should still be marked as processed - so the offset update and commit should still happen.
  • someone would like to emit multiple messages (possibly to different output topics)

This could be modeled as Sink[Seq[Message[K, V, ConsumerMessage.PartitionOffset]], CompletionStage[Done]]

Member Author

Interesting. I didn't consider this use case. I'll think about how to support it.

@seglo seglo (Member Author) Apr 13, 2018

@rafalmag Couldn't this use case be satisfied with non-Transactional consumer group offset commits after the message is processed? What benefit does the user get when they actually use a transaction?

@ennru ennru (Member) left a comment

Good work. Mostly a nitpick on the config parameter type.

@@ -16,6 +16,9 @@ akka.kafka.producer {
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"

# The time interval to commit a transaction when using the `transactionalSink` or `transactionalFlow`
eos-commit-interval-ms = 100
Member

Please use Config durations instead: eos-commit-interval = 100ms and carry it through as FiniteDuration.
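A small sketch of reading such a Config duration into a FiniteDuration (the config path is taken from the suggestion; the reading style is just one option):

    import java.util.concurrent.TimeUnit
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    val config = ConfigFactory.parseString("eos-commit-interval = 100ms")
    // Config.getDuration understands the `100ms` style; carry it on as a FiniteDuration.
    val eosCommitInterval: FiniteDuration =
      config.getDuration("eos-commit-interval", TimeUnit.MILLISECONDS).millis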

Member Author

👍

@@ -0,0 +1,72 @@
# Transactions

Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as "committed" only once the transformed messages are successfully produced to the sink.
Member

It might be useful to add links into the Kafka documentation on this.

Member Author

I'll add a sentence in the introduction paragraph referencing the KIP for full details. This link was already included in the Further Reading section, but there's value in referencing it early on too.

.map(msg ->
new ProducerMessage.Message<byte[], String, ConsumerMessage.PartitionOffset>(
new ProducerRecord<>("sink-topic", msg.record().value()), msg.partitionOffset()))
.runWith(Sink.ignore(), materializer);
Member

Does it make sense to show a transactionalSource without a transactionalSink? Is this snippet even referenced?

Member Author

You're right, this wasn't used and wouldn't make much sense anyway. It was a remnant from an earlier documentation attempt (documenting it across existing consumer and producer doc pages) that I later abandoned.

@seglo seglo (Member Author) commented Apr 30, 2018

Thanks for the review @ennru. I've incorporated your feedback.

@ennru ennru (Member) left a comment

Great, @2m will review as well and we're close to merging it!

def transactionalSource[K, V](consumerSettings: ConsumerSettings[K, V], subscription: Subscription): Source[TransactionalMessage[K, V], Control] =
scaladsl.Consumer.transactionalSource(consumerSettings, subscription)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava
Member

I expect a type error here as we upgraded to Akka 2.5.12.
Could you rebase, please.

@seglo seglo (Member Author) commented May 17, 2018

Thanks @ennru . I rebased and successfully ran the testsuite. I did not see the type error in javadsl.Consumer that you anticipated. Let me know if I missed something here.

On an unrelated note, I noticed that the resume consumer from committed offset after retention period test in IntegrationSpec takes over 2 minutes to run because of the Thread.sleeps, which significantly bloats the time it takes to run the testsuite, especially when targeting other JVM/Scala versions in Travis. I assume this is an intentional test to show how to recover from an offset after consumer group offset information is lost, but maybe we can tag it differently so the IntegrationSpec can be run more quickly during development? Another option may be to decrease the retention period for the internal consumer offsets topic so we don't need to wait as long.

@ennru ennru (Member) commented May 17, 2018

Great, thank you. If the compiler is happy, I'm happy.

Yes, that test is a bit in the way. The retention period in Kafka can't be less than one minute, that's why it takes so long. But you're right, we should move it to a separate class.

private var batchOffsets = TransactionBatch.empty

override def preStart(): Unit = {
initTransactions()
Member

This calls KafkaProducer.initTransactions(), which in turn does a blocking operation (latch.await()). The same goes for the implementation of abortTransaction().

These operations are only called once during start-up and once on completion, so I do not think there is a big risk of all of the threads in a pool being blocked and other parts of the stream not being able to proceed.

That might happen if more than one stage with TransactionProducerStageLogic is started up. Therefore it might be worth setting an IO dispatcher attribute on the stages that run TransactionProducerStageLogic.
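A sketch of what such an attribute could look like on the transactional flow (the dispatcher name is made up; this is not part of the PR):

    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.ActorAttributes

    // Run the (potentially blocking) transactional producer stage on a dedicated dispatcher.
    def transactionalFlowOnIoDispatcher(producerSettings: ProducerSettings[Array[Byte], String]) =
      Producer.transactionalFlow(producerSettings, "transactional-id")
        .withAttributes(ActorAttributes.dispatcher("akka.kafka.transactions-io-dispatcher"))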

@2m 2m (Member) left a comment

LGTM. Exciting stuff! Great work @seglo!

@ennru ennru merged commit a41ef17 into akka:master May 18, 2018
@ennru ennru (Member) commented May 18, 2018

Thank you for this great new functionality!

@ennru ennru added this to the 0.21 milestone May 18, 2018
@seglo seglo (Member Author) commented May 18, 2018

Great! Thank you and all the reviewers :)
