Use maximal offset when committing transaction in TransactionalProducerStage

Callbacks from `producer.send` are guaranteed to execute in order
only for a single output partition. That means we can have a race condition
where we execute a callback for input record with offset 1 before
executing a callback for input record with offset 0. That causes
`NonemptyTransactionBatch` to contain offset 0 when committing the
transaction, which leads to data duplication.

This fix ensures that the offsets stored in `TransactionBatch` only ever
increase. Since `awaitingConfirmation == 0` guarantees that all consecutive
offsets have been written to Kafka, it is sufficient to keep only the
maximal offset per partition in the `TransactionBatch`.
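The monotonic-update rule described above can be sketched in isolation. This is a minimal illustration, not the stage's actual code: `MaxOffsetBatch` and its string keys are hypothetical stand-ins for `TransactionBatch` and `GroupTopicPartition`.

```scala
// Sketch of the fix: callbacks may deliver offsets out of order, so a
// later-arriving callback can carry a *smaller* offset than one already
// recorded. The stored offset must therefore never decrease.
object MaxOffsetBatch {
  // Offsets recorded so far, keyed by a (group, topic, partition) identifier.
  type Offsets = Map[String, Long]

  // Record a new offset for `key`, keeping the maximum of the new and
  // previously stored values (-1L when nothing was stored yet).
  def updated(offsets: Offsets, key: String, offset: Long): Offsets = {
    val previousHighest = offsets.getOrElse(key, -1L)
    offsets + (key -> offset.max(previousHighest))
  }
}
```

With this rule, an out-of-order callback for offset 0 arriving after offset 1 leaves the stored offset at 1, so the subsequent commit does not re-deliver already-written records.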
Szymon Matejczyk committed Mar 8, 2019
1 parent 6718b52 commit 6411558
Showing 1 changed file with 6 additions and 1 deletion.
@@ -54,7 +54,12 @@ private object TransactionalProducerStage {
   final class NonemptyTransactionBatch(head: PartitionOffset,
                                        tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
       extends TransactionBatch {
-    private val offsets = tail + (head.key -> head.offset)
+    // There is no guarantee that offset-adding callbacks will be called in any particular order.
+    // Decreasing an offset stored for the KTP would mean possible data duplication.
+    // Since the `awaitingConfirmation` counter guarantees that all writes have finished, we can
+    // safely assume that all data up to the maximal offset has been written to Kafka.
+    private val previousHighest = tail.getOrElse(head.key, -1L)
+    private val offsets = tail + (head.key -> head.offset.max(previousHighest))
 
     def group: String = head.key.groupId
     def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
