Support committing of metadata for partitioned sources (#579)
johnclara authored and ennru committed Oct 1, 2018
1 parent adb2894 commit aa7c2f5
Showing 4 changed files with 33 additions and 3 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/akka/kafka/internal/ConsumerStage.scala
@@ -48,14 +48,16 @@ private[kafka] object ConsumerStage {
   }
 
   private[kafka] final class CommittableSubSource[K, V](settings: ConsumerSettings[K, V],
-                                                        subscription: AutoSubscription)
+                                                        subscription: AutoSubscription,
+                                                        _metadataFromRecord: ConsumerRecord[K, V] => String =
+                                                          (_: ConsumerRecord[K, V]) => OffsetFetchResponse.NO_METADATA)
       extends KafkaSourceStage[K, V, (TopicPartition, Source[CommittableMessage[K, V], NotUsed])](
         s"CommittableSubSource ${subscription.renderStageAttribute}"
       ) {
     override protected def logic(shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]) =
       new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription)
       with CommittableMessageBuilder[K, V] with MetricsControl {
-        override def metadataFromRecord(record: ConsumerRecord[K, V]): String = OffsetFetchResponse.NO_METADATA
+        override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
         override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
         lazy val committer: Committer = {
           val ec = materializer.executionContext
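With this change `CommittableSubSource` takes a `metadataFromRecord` function that maps each `ConsumerRecord` to the metadata string committed alongside the offset, defaulting to `OffsetFetchResponse.NO_METADATA` as before. A minimal sketch of such a function (the timestamp/offset format below is illustrative only, not part of this commit):

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Illustrative only: store the record's timestamp and offset as commit metadata,
    // so it can later be read back through Kafka's OffsetFetch API.
    val metadataFromRecord: ConsumerRecord[String, Array[Byte]] => String =
      record => s"ts=${record.timestamp()},offset=${record.offset()}"
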
18 changes: 18 additions & 0 deletions core/src/main/scala/akka/kafka/javadsl/Consumer.scala
@@ -263,6 +263,24 @@ object Consumer {
       .mapMaterializedValue(new WrappedConsumerControl(_))
       .asJava
 
+  /**
+   * The same as [[#plainPartitionedSource]] but with offset commit with metadata support
+   */
+  def commitWithMetadataPartitionedSource[K, V](
+      settings: ConsumerSettings[K, V],
+      subscription: AutoSubscription,
+      metadataFromRecord: java.util.function.Function[ConsumerRecord[K, V], String]
+  ): Source[Pair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]], Control] =
+    scaladsl.Consumer
+      .commitWithMetadataPartitionedSource(settings,
+                                           subscription,
+                                           (record: ConsumerRecord[K, V]) => metadataFromRecord(record))
+      .map {
+        case (tp, source) => Pair(tp, source.asJava)
+      }
+      .mapMaterializedValue(new WrappedConsumerControl(_))
+      .asJava
+
   /**
    * Special source that can use external `KafkaAsyncConsumer`. This is useful in case when
    * you have lot of manually assigned topic-partitions and want to keep only one kafka consumer
10 changes: 10 additions & 0 deletions core/src/main/scala/akka/kafka/scaladsl/Consumer.scala
@@ -219,6 +219,16 @@ object Consumer {
   ): Source[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), Control] =
     Source.fromGraph(new ConsumerStage.CommittableSubSource[K, V](settings, subscription))
 
+  /**
+   * The same as [[#plainPartitionedSource]] but with offset commit with metadata support
+   */
+  def commitWithMetadataPartitionedSource[K, V](
+      settings: ConsumerSettings[K, V],
+      subscription: AutoSubscription,
+      metadataFromRecord: ConsumerRecord[K, V] => String
+  ): Source[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), Control] =
+    Source.fromGraph(new ConsumerStage.CommittableSubSource[K, V](settings, subscription, metadataFromRecord))
+
   /**
    * Special source that can use an external `KafkaAsyncConsumer`. This is useful when you have
    * a lot of manually assigned topic-partitions and want to keep only one kafka consumer.
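A hedged usage sketch of the new scaladsl method, not part of the commit: `consumerSettings`, `maxPartitions`, and `business` are assumed to exist in the surrounding application, with an implicit Materializer and ExecutionContext in scope.

    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Merge all assigned partitions into one stream and commit each offset
    // together with an application-defined metadata string.
    val control =
      Consumer
        .commitWithMetadataPartitionedSource(
          consumerSettings,
          Subscriptions.topics("topic1"),
          (record: ConsumerRecord[String, String]) => s"ts=${record.timestamp()}"
        )
        .flatMapMerge(maxPartitions, { case (_, source) => source })
        .mapAsync(1)(msg => business(msg.record).map(_ => msg.committableOffset))
        .mapAsync(1)(_.commitScaladsl())
        .to(Sink.ignore)
        .run()
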
2 changes: 1 addition & 1 deletion docs/src/main/paradox/consumer.md
@@ -162,7 +162,7 @@ To get delivery guarantees, please read about @ref[transactions](transactions.md
 
 `Consumer.plainPartitionedSource`
 (@scala[@scaladoc[Consumer API](akka.kafka.scaladsl.Consumer$)]@java[@scaladoc[Consumer API](akka.kafka.javadsl.Consumer$)])
-and `Consumer.committablePartitionedSource` support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, this source will emit a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.
+, `Consumer.committablePartitionedSource`, and `Consumer.commitWithMetadataPartitionedSource` support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, this source will emit a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.
 
 Backpressure per partition with batch commit:
 
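A sketch of the per-partition batch-commit pattern the paragraph above refers to, here using the new metadata-aware source; `consumerSettings`, `maxPartitions`, and `business` are illustrative assumptions, not part of this commit, and an implicit Materializer is assumed in scope.

    import akka.kafka.{ConsumerMessage, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // One stream per assigned partition: process, batch up to 100 offsets,
    // and commit each batch with the metadata produced by metadataFromRecord.
    val done =
      Consumer
        .commitWithMetadataPartitionedSource(
          consumerSettings,
          Subscriptions.topics("topic1"),
          (record: ConsumerRecord[String, String]) => record.timestamp().toString
        )
        .map {
          case (topicPartition, source) =>
            source
              .via(business) // assumed Flow that keeps the CommittableMessage
              .map(_.committableOffset)
              .batch(max = 100, first => ConsumerMessage.emptyCommittableOffsetBatch.updated(first))(_.updated(_))
              .mapAsync(1)(_.commitScaladsl())
              .runWith(Sink.ignore)
        }
        .mapAsyncUnordered(maxPartitions)(identity)
        .runWith(Sink.ignore)
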
