GH-1948: KafkaTemplate Receive Multiple Records
garyrussell committed Sep 27, 2021
1 parent 39eefc8 commit 827b7c9
Showing 4 changed files with 102 additions and 19 deletions.
13 changes: 10 additions & 3 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -2378,18 +2378,25 @@ The `Collection` beans will be removed in a future release.
 
 This section covers how to use `KafkaTemplate` to receive messages.
 
-Starting with version 2.8, the template has two `receive()` methods:
+Starting with version 2.8, the template has four `receive()` methods:
 
 ====
-[source, jvava]
+[source, java]
 ----
 ConsumerRecord<K, V> receive(String topic, int partition, long offset);
 ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
+ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
+ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
 ----
 ====
 
-As you can see, you need to know the partition and offset of the record you need to retrieve; a new `Consumer` is created (and closed) for each operation.
+As you can see, you need to know the partition and offset of the record(s) you need to retrieve; a new `Consumer` is created (and closed) for each operation.
+
+With the last two methods, each record is retrieved individually and the results are assembled into a `ConsumerRecords` object.
+When creating `TopicPartitionOffset` instances for the request, only positive, absolute offsets are supported.
 
 [[container-props]]
 ==== Listener Container Properties
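For orientation, here is a short usage sketch of the expanded API (an editor's illustration, not part of the commit). It assumes a `KafkaTemplate<Integer, String>` named `template` that has been given a `ConsumerFactory` via `setConsumerFactory()`, as the test changes below do, and a topic named `my-topic` with at least two partitions:

====
[source, java]
----
// Fetch a single record by topic/partition/offset ...
ConsumerRecord<Integer, String> single = template.receive("my-topic", 0, 0L);

// ... or several records, possibly across partitions, in one call.
ConsumerRecords<Integer, String> many = template.receive(List.of(
        new TopicPartitionOffset("my-topic", 0, 0L),
        new TopicPartitionOffset("my-topic", 1, 0L)));
many.forEach(rec -> System.out.println(rec.partition() + "@" + rec.offset() + ": " + rec.value()));
----
====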
spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java
@@ -17,11 +17,13 @@
 package org.springframework.kafka.core;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,6 +33,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.util.concurrent.ListenableFuture;
@@ -298,6 +301,25 @@ default ProducerFactory<K, V> getProducerFactory() {
     @Nullable
     ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
 
+    /**
+     * Receive multiple records with the default poll timeout (5 seconds). Only
+     * absolute, positive offsets are supported.
+     * @param requested a collection of record requests (topic/partition/offset).
+     * @return the records.
+     * @since 2.8
+     * @see #DEFAULT_POLL_TIMEOUT
+     */
+    ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
+
+    /**
+     * Receive multiple records. Only absolute, positive offsets are supported.
+     * @param requested a collection of record requests (topic/partition/offset).
+     * @param pollTimeout the timeout.
+     * @return the records.
+     * @since 2.8
+     */
+    ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
+
     /**
      * A callback for executing arbitrary operations on the {@link Producer}.
      * @param <K> the key type.
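Both new methods return the Kafka client's `ConsumerRecords` container, so the results can be read back grouped by partition. A hedged sketch against this interface (editor's illustration; `ops` stands for any implementation wired with a consumer factory, and the topic name is a placeholder):

====
[source, java]
----
// Iterate a multi-record result per partition.
ConsumerRecords<Integer, String> records = ops.receive(List.of(
        new TopicPartitionOffset("example-topic", 0, 0L),
        new TopicPartitionOffset("example-topic", 0, 1L)));
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<Integer, String> rec : records.records(tp)) {
        System.out.println(tp + " @" + rec.offset() + " -> " + rec.value());
    }
}
----
====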
spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
@@ -17,8 +17,11 @@
 package org.springframework.kafka.core;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -54,6 +57,7 @@
 import org.springframework.kafka.support.LoggingProducerListener;
 import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.TransactionSupport;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -558,7 +562,6 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         producerForOffsets().sendOffsetsToTransaction(offsets, groupMetadata);
     }
 
-
     @Override
     @Nullable
     public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
@@ -568,19 +571,51 @@ public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
     @Override
     @Nullable
     public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
+        Properties props = oneOnly();
+        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            return receiveOne(topicPartition, offset, pollTimeout, consumer);
+        }
+    }
+
+    @Override
+    @Nullable
+    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested) {
+        return receive(requested, DEFAULT_POLL_TIMEOUT);
+    }
+
+    @Override
+    @Nullable
+    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
+        Properties props = oneOnly();
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
+        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
+            requested.forEach(tpo -> {
+                ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
+                records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
+            });
+            return new ConsumerRecords<>(records);
+        }
+    }
+
+    private Properties oneOnly() {
         Assert.notNull(this.consumerFactory, "A consumerFactory is required");
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
-        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
-            TopicPartition topicPartition = new TopicPartition(topic, partition);
-            consumer.assign(Collections.singletonList(topicPartition));
-            consumer.seek(topicPartition, offset);
-            ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
-            if (records.count() == 1) {
-                return records.iterator().next();
-            }
-            return null;
-        }
+        return props;
     }
+
+    @Nullable
+    private ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long offset, Duration pollTimeout,
+            Consumer<K, V> consumer) {
+
+        consumer.assign(Collections.singletonList(topicPartition));
+        consumer.seek(topicPartition, offset);
+        ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
+        if (records.count() == 1) {
+            return records.iterator().next();
+        }
+        return null;
+    }
 
     private Producer<K, V> producerForOffsets() {
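The heart of the implementation is `receiveOne()`: a consumer created with `max.poll.records=1` is assigned the partition, seeked to the requested absolute offset, and polled once. The same technique, as a standalone sketch against the plain Kafka consumer (editor's illustration; the broker address, topic name, and deserializers are assumptions, and the template does this through its `ConsumerFactory` instead):

====
[source, java]
----
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekAndPollOnce {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // assign-only, no consumer group
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // at most one record per poll
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0); // assumed topic
            consumer.assign(Collections.singletonList(tp)); // assign, not subscribe
            consumer.seek(tp, 0L); // absolute offset, as the new methods require
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(5));
            if (records.count() == 1) {
                System.out.println(records.iterator().next().value());
            }
        }
    }

}
----
====

Note that the `Collection` variant reuses one such consumer for all requested offsets and collects the results into a `LinkedHashMap`, so the partitions in the returned `ConsumerRecords` retain request order; the test changes below rely on exactly that with their `containsExactly` assertions.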
spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java
@@ -35,6 +35,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,7 @@
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -72,6 +74,7 @@
 import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
@@ -144,9 +147,9 @@ void testTemplate() {
         received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
         assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
 
-        template.send(INT_KEY_TOPIC, 0, null, "qux");
+        template.send(INT_KEY_TOPIC, 1, null, "qux");
         received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
-        assertThat(received).has(allOf(keyValue(null, "qux"), partition(0)));
+        assertThat(received).has(allOf(keyValue(null, "qux"), partition(1)));
 
         template.send(MessageBuilder.withPayload("fiz")
                 .setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC)
@@ -157,11 +160,11 @@ void testTemplate() {
         assertThat(received).has(allOf(keyValue(2, "fiz"), partition(0)));
 
         template.send(MessageBuilder.withPayload("buz")
-                .setHeader(KafkaHeaders.PARTITION_ID, 0)
+                .setHeader(KafkaHeaders.PARTITION_ID, 1)
                 .setHeader(KafkaHeaders.MESSAGE_KEY, 2)
                 .build());
         received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
-        assertThat(received).has(allOf(keyValue(2, "buz"), partition(0)));
+        assertThat(received).has(allOf(keyValue(2, "buz"), partition(1)));
 
         Map<MetricName, ? extends Metric> metrics = template.execute(Producer::metrics);
         assertThat(metrics).isNotNull();
@@ -173,10 +176,26 @@ void testTemplate() {
         assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
         template.setConsumerFactory(
                 new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
-        ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 0, received.offset());
-        assertThat(receive).has(allOf(keyValue(2, "buz"), partition(0)))
+        ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 1, received.offset());
+        assertThat(receive).has(allOf(keyValue(2, "buz"), partition(1)))
                 .extracting(rec -> rec.offset())
                 .isEqualTo(received.offset());
+        ConsumerRecords<Integer, String> records = template.receive(List.of(
+                new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L),
+                new TopicPartitionOffset(INT_KEY_TOPIC, 0, 1L),
+                new TopicPartitionOffset(INT_KEY_TOPIC, 0, 0L),
+                new TopicPartitionOffset(INT_KEY_TOPIC, 1, 0L)));
+        assertThat(records.count()).isEqualTo(4);
+        Set<TopicPartition> partitions2 = records.partitions();
+        assertThat(partitions2).containsExactly(
+                new TopicPartition(INT_KEY_TOPIC, 1),
+                new TopicPartition(INT_KEY_TOPIC, 0));
+        assertThat(records.records(new TopicPartition(INT_KEY_TOPIC, 1)))
+                .extracting(rec -> rec.offset())
+                .containsExactly(1L, 0L);
+        assertThat(records.records(new TopicPartition(INT_KEY_TOPIC, 0)))
+                .extracting(rec -> rec.offset())
+                .containsExactly(1L, 0L);
         pf.destroy();
     }
 
