Commit

Merge pull request #2 from tonyblundell/commit-offsets-only-for-assigned-partitions

Commit offsets only for assigned partitions
aaronp committed Sep 1, 2020
2 parents b32340a + c3909c9 commit c898980
Showing 2 changed files with 17 additions and 5 deletions.
7 changes: 6 additions & 1 deletion src/main/scala/kafka4m/consumer/RichKafkaConsumer.scala
@@ -139,7 +139,12 @@ final class RichKafkaConsumer[K, V] private (val consumer: KafkaConsumer[K, V],
}
}
logger.debug(s"commitAsync($state)")
consumer.commitAsync(state.asTopicPartitionMapJava, callback)

// Only commit offsets for partitions that we're currently assigned to.
val assignedPartitions = assignments()
val offsetsToCommit = state.asTopicPartitionMap.view.filterKeys(assignedPartitions.contains).toMap

consumer.commitAsync(offsetsToCommit.asJava, callback)
} else {
logger.trace(s"NOT committing empty state")
promise.trySuccess(Map.empty)
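A minimal standalone sketch of the same idea against the plain Kafka consumer API (not part of this diff; the helper name commitAssignedOnly and its parameters are illustrative, not kafka4m's API): filter the pending offsets down to the partitions the consumer currently owns before calling commitAsync, so that offsets for partitions revoked during a rebalance are never committed.

    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.TopicPartition

    // Illustrative helper: commit only offsets for partitions this consumer
    // is currently assigned, silently dropping the rest.
    def commitAssignedOnly[K, V](consumer: KafkaConsumer[K, V],
                                 pending: Map[TopicPartition, OffsetAndMetadata],
                                 callback: OffsetCommitCallback): Unit = {
      val assigned = consumer.assignment().asScala // partitions currently owned by this consumer
      val toCommit  = pending.filter { case (tp, _) => assigned.contains(tp) }
      if (toCommit.nonEmpty) {
        consumer.commitAsync(toCommit.asJava, callback)
      }
    }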
15 changes: 11 additions & 4 deletions src/test/scala/kafka4m/consumer/AckableRecordTest.scala
@@ -39,11 +39,19 @@ class AckableRecordTest extends BaseKafka4mDockerSpec {
beforeStatus.flatMap(_.offsetsByPartition.values).foreach(_ shouldBe 0L)

When("we consume some messages then ask an ackable record to ack")
var ackPos: Future[Map[TopicPartition, OffsetAndMetadata]] = null
var ackOffset: PartitionOffsetState = null
val ackStream = ackableRecords.zipWithIndex.map {
case (record, i) if i == numRecords / 2 =>
ackPos = record.commitPosition()
And("The current state contains offsets for a partition we aren't currently assigned to")
val state = record.offset.incOffsets().update(topic, 999, 999)
whenReady(record.commit(state)) { committedOffsets =>
Then("We should kermit offsets only for the currently assigned partition")
committedOffsets.keys.size shouldBe 1
val (topicPartition, offsetAndMetadata) = committedOffsets.head
topicPartition.topic shouldBe topic
topicPartition.partition shouldBe 0
offsetAndMetadata.offset shouldBe 51
}
ackOffset = record.offset.incOffsets()
record
case (record: AckBytes, i) if i == numRecords - 1 =>
@@ -58,8 +66,7 @@ class AckableRecordTest extends BaseKafka4mDockerSpec {

val lastValue = shared.take(numRecords).delayOnNext(5.millis).lastL.runToFuture.futureValue

Then("we should see the offsets/partitions kermitted in Kafka")
ackPos should not be null
And("we should see the offsets/partitions kermitted in Kafka")
val readFromKafka: Seq[ConsumerGroupStats] = testAdmin.consumerGroupsStats.futureValue
readFromKafka.flatMap(_.forTopic(topic).offsetsByPartition).toMap shouldBe Map(0 -> 51)
lastValue.offset.offsetByPartitionByTopic(topic) shouldBe Map(0 -> (numRecords - 1))
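To make the expected outcome concrete, a small sketch with hypothetical values mirroring the assertions above ("some-topic" stands in for the test topic): the consumer owns partition 0, while the offset state also carries partition 999, which it does not own, so only partition 0's offset survives the filter and is committed.

    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    // Hypothetical data mirroring the test: partition 0 is assigned, partition 999 is not.
    val assigned = Set(new TopicPartition("some-topic", 0))
    val pending = Map(
      new TopicPartition("some-topic", 0)   -> new OffsetAndMetadata(51L),
      new TopicPartition("some-topic", 999) -> new OffsetAndMetadata(999L)
    )

    // Filtering by assignment keeps a single entry: partition 0 at offset 51,
    // matching the committedOffsets checks in the test above.
    val committed = pending.filter { case (tp, _) => assigned.contains(tp) }
    assert(committed == Map(new TopicPartition("some-topic", 0) -> new OffsetAndMetadata(51L)))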
