implements StreamableMessageSour
/**
* Instantiate a {@link StreamableKafkaMessageSource} based on the fields contained in the {@link Builder}.
*
- * Will assert that the {@link ConsumerFactory} and {@link Fetcher} are not {@code null}. An {@link
- * AxonConfigurationException} is thrown if any of them is not the case.
+ * Will assert that the {@link ConsumerFactory} and {@link Fetcher} are not {@code null}. An
+ * {@link AxonConfigurationException} is thrown if this is not the case.
*
* @param builder the {@link Builder} used to instantiate a {@link StreamableKafkaMessageSource} instance
*/
@@ -95,9 +97,9 @@ protected StreamableKafkaMessageSource(Builder<K, V> builder) {
*
* The {@code topics} list is defaulted to single entry of {@code "Axon.Events"}, {@code groupIdPrefix} defaults to
* {@code "Axon.Streamable.Consumer-"} and it's {@code groupIdSuffixFactory} to a {@link UUID#randomUUID()}
- * operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link
- * XStreamSerializer} and the {@code bufferFactory} the {@link SortedKafkaMessageBuffer} constructor. The {@link
- * ConsumerFactory} and {@link Fetcher} are hard requirements and as such should be provided.
+ * operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the
+ * {@link XStreamSerializer} and the {@code bufferFactory} to the {@link SortedKafkaMessageBuffer} constructor. The
+ * {@link ConsumerFactory} and {@link Fetcher} are hard requirements and as such should be provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
@@ -110,8 +112,8 @@ public static <K, V> Builder<K, V> builder() {
/**
* {@inheritDoc}
*
- * The stream is filled by polling {@link ConsumerRecords} from the specified {@code topic} with the {@link
- * Fetcher}. The provided {@code trackingToken} is required to be of type {@link KafkaTrackingToken}.
+ * The stream is filled by polling {@link ConsumerRecords} from the specified {@code topic} with the
+ * {@link Fetcher}. The provided {@code trackingToken} is required to be of type {@link KafkaTrackingToken}.
*/
@Override
public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
@@ -127,14 +129,29 @@ public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingT
return new KafkaMessageStream(buffer, closeHandler);
}
+ @Override
+ public TrackingToken createHeadToken() {
+ return KafkaTrackingToken.newInstance(ConsumerPositionsUtil.getHeadPositions(
+ consumerFactory.createConsumer(null),
+ topics));
+ }
+
+ @Override
+ public TrackingToken createTokenAt(Instant dateTime) {
+ return KafkaTrackingToken.newInstance(ConsumerPositionsUtil.getPositionsBasedOnTime(
+ consumerFactory.createConsumer(null),
+ topics,
+ dateTime));
+ }
+
/**
* Builder class to instantiate a {@link StreamableKafkaMessageSource}.
*
* The {@code topics} list is defaulted to single entry of {@code "Axon.Events"}, {@code groupIdPrefix} defaults to
* {@code "Axon.Streamable.Consumer-"} and it's {@code groupIdSuffixFactory} to a {@link UUID#randomUUID()}
- * operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link
- * XStreamSerializer} and the {@code bufferFactory} the {@link SortedKafkaMessageBuffer} constructor. The {@link
- * ConsumerFactory} and {@link Fetcher} are hard requirements and as such should be provided.
+ * operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the
+ * {@link XStreamSerializer} and the {@code bufferFactory} to the {@link SortedKafkaMessageBuffer} constructor. The
+ * {@link ConsumerFactory} and {@link Fetcher} are hard requirements and as such should be provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
@@ -149,8 +166,8 @@ public static class Builder<K, V> {
private Supplier<Serializer> serializer;
/**
- * Sets the {@link Serializer} used to serialize and deserialize messages. Defaults to a {@link
- * XStreamSerializer}.
+ * Sets the {@link Serializer} used to serialize and deserialize messages. Defaults to a
+ * {@link XStreamSerializer}.
*
* @param serializer a {@link Serializer} used to serialize and deserialize messages
* @return the current Builder instance, for fluent interfacing
@@ -191,8 +208,8 @@ public Builder<K, V> addTopic(String topic) {
* Sets the prefix of the Consumer {@code groupId} from which a {@link Consumer} should retrieve records from.
* Defaults to {@code "Axon.Streamable.Consumer-"}.
*
- * @param groupIdPrefix a {@link String} defining the prefix of the Consumer Group id to which a {@link
- * Consumer} should retrieve records from
+ * @param groupIdPrefix a {@link String} defining the prefix of the Consumer Group id from which a
+ * {@link Consumer} should retrieve records
* @return the current Builder instance, for fluent interfacing
* @deprecated value is not used anymore, as a {@code groupId} is no longer used. Instead of the group id the
* topic partitions are manually assigned, using less resources.
@@ -211,8 +228,8 @@ public Builder<K, V> groupIdPrefix(String groupIdPrefix) {
* Sets the factory that will provide the suffix of the Consumer {@code groupId} from which a {@link Consumer}
* should retrieve records from
*
- * @param groupIdSuffixFactory a {@link Supplier} of {@link String} providing the suffix of the Consumer {@code
- * groupId} from which a {@link Consumer} should retrieve records from
+ * @param groupIdSuffixFactory a {@link Supplier} of {@link String} providing the suffix of the Consumer
+ * {@code groupId} from which a {@link Consumer} should retrieve records
* @return the current Builder instance, for fluent interfacing
* @deprecated value is not used anymore, as a {@code groupId} is no longer used. Instead of the group id the
* topic partitions are manually assigned, using less resources
@@ -227,8 +244,8 @@ public Builder<K, V> groupIdSuffixFactory(Supplier<String> groupIdSuffixFactory)
}
/**
- * Sets the {@link ConsumerFactory} to be used by this {@link StreamableKafkaMessageSource} to create {@link
- * Consumer} instances with.
+ * Sets the {@link ConsumerFactory} to be used by this {@link StreamableKafkaMessageSource} to create
+ * {@link Consumer} instances with.
*
* @param consumerFactory a {@link ConsumerFactory} to be used by this {@link StreamableKafkaMessageSource} to
* create {@link Consumer} instances with.
@@ -245,8 +262,8 @@ public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
* {@link StreamableKafkaMessageSource} to create {@link Consumer} instances with.
*
* @param consumerConfiguration a {@link DefaultConsumerFactory} with the given {@code consumerConfiguration},
- * to be used by this {@link StreamableKafkaMessageSource} to create {@link
- * Consumer} instances with
+ * to be used by this {@link StreamableKafkaMessageSource} to create
+ * {@link Consumer} instances with
* @return the current Builder instance, for fluent interfacing
*/
@SuppressWarnings("unused")
@@ -268,15 +285,15 @@ public Builder<K, V> fetcher(Fetcher<K, V, KafkaEventMessage> fetcher) {
}
/**
- * Sets the {@link KafkaMessageConverter} used to convert Kafka messages into {@link
- * org.axonframework.eventhandling.EventMessage}s. Defaults to a {@link DefaultKafkaMessageConverter} using the
- * {@link XStreamSerializer}.
+ * Sets the {@link KafkaMessageConverter} used to convert Kafka messages into
+ * {@link org.axonframework.eventhandling.EventMessage}s. Defaults to a {@link DefaultKafkaMessageConverter}
+ * using the {@link XStreamSerializer}.
*
- * Note that configuring a MessageConverter on the builder is mandatory if the value type is not {@code
- * byte[]}.
+ * Note that configuring a MessageConverter on the builder is mandatory if the value type is not
+ * {@code byte[]}.
*
- * @param messageConverter a {@link KafkaMessageConverter} used to convert Kafka messages into {@link
- * org.axonframework.eventhandling.EventMessage}s
+ * @param messageConverter a {@link KafkaMessageConverter} used to convert Kafka messages into
+ * {@link org.axonframework.eventhandling.EventMessage}s
* @return the current Builder instance, for fluent interfacing
*/
public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConverter) {
@@ -286,9 +303,9 @@ public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConvert
}
/**
- * Sets the {@code bufferFactory} of type {@link Supplier} with a generic type {@link Buffer} with {@link
- * KafkaEventMessage}s. Used to create a buffer which will consume the converted Kafka {@link ConsumerRecords}.
- * Defaults to a {@link SortedKafkaMessageBuffer}.
+ * Sets the {@code bufferFactory} of type {@link Supplier} with a generic type {@link Buffer} with
+ * {@link KafkaEventMessage}s. Used to create a buffer which will consume the converted Kafka
+ * {@link ConsumerRecords}. Defaults to a {@link SortedKafkaMessageBuffer}.
*
* @param bufferFactory a {@link Supplier} to create a buffer for the Kafka records fetcher
* @return the current Builder instance, for fluent interfacing
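
The main functional change above is that StreamableKafkaMessageSource now implements createHeadToken() and createTokenAt(Instant), delegating to ConsumerPositionsUtil to resolve per-partition positions. The following is a minimal usage sketch of the new methods, assuming a consumer factory and fetcher configured elsewhere (the builder's two hard requirements) and the default topic, converter and buffer settings; it is an illustration, not code from this change:

import java.time.Duration;
import java.time.Instant;

import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;

class StreamableKafkaMessageSourceUsageSketch {

    // The consumerFactory and fetcher parameters are assumed to be configured elsewhere;
    // all other builder properties fall back to the defaults described in the javadoc above.
    static BlockingStream<TrackedEventMessage<?>> openFromHead(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], KafkaEventMessage> fetcher) {
        StreamableKafkaMessageSource<String, byte[]> source =
                StreamableKafkaMessageSource.<String, byte[]>builder()
                                            .consumerFactory(consumerFactory)
                                            .fetcher(fetcher)
                                            .build();

        // Only consume events appended to the topic(s) from this moment on...
        TrackingToken headToken = source.createHeadToken();
        // ...or replay everything appended during the last day.
        TrackingToken lastDayToken = source.createTokenAt(Instant.now().minus(Duration.ofDays(1)));

        return source.openStream(headToken);
    }
}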
diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/ConsumerSeekUtilIntegrationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/ConsumerSeekUtilIntegrationTest.java
index a48dada3..721777fc 100644
--- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/ConsumerSeekUtilIntegrationTest.java
+++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/ConsumerSeekUtilIntegrationTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2022. Axon Framework
+ * Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -111,7 +111,7 @@ void tearDown() {
}
@Test
- void testSeekUsingEmptyTokenConsumerStartsAtPositionZero() {
+ void seekUsingEmptyTokenConsumerStartsAtPositionZero() {
String topic = "testSeekUsing_EmptyToken_ConsumerStartsAtPositionZero";
int recordsPerPartitions = 1;
Producer<String, String> producer = producerFactory.createProducer();
@@ -136,7 +136,7 @@ void testSeekUsingEmptyTokenConsumerStartsAtPositionZero() {
@SuppressWarnings("unchecked")
@Test
- void testSeekUsingExistingTokenConsumerStartsAtSpecificPosition() {
+ void seekUsingExistingTokenConsumerStartsAtSpecificPosition() {
String topic = "testSeekUsing_ExistingToken_ConsumerStartsAtSpecificPosition";
int recordsPerPartitions = 10;
Producer<String, String> producer = producerFactory.createProducer();
@@ -170,7 +170,7 @@ void testSeekUsingExistingTokenConsumerStartsAtSpecificPosition() {
}
@Test
- void testSeekUsingExistingTokenConsumerStartsAtSpecificPositionAndCanContinueReadingNewRecords() {
+ void seekUsingExistingTokenConsumerStartsAtSpecificPositionAndCanContinueReadingNewRecords() {
String topic = "testSeekUsing_ExistingToken_ConsumerStartsAtSpecificPosition_AndCanContinueReadingNewRecords";
int recordsPerPartitions = 10;
Producer<String, String> testProducer = producerFactory.createProducer();
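
The renamed tests above pin down how a consumer is positioned from a KafkaTrackingToken: an empty token starts every partition at offset zero, while an existing token resumes right after the stored position, including for records published later. A rough sketch of that seeking behaviour in terms of the plain Kafka consumer API (an illustration of the verified behaviour, not the actual ConsumerSeekUtil code):

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class TokenSeekSketch {

    // For every assigned topic-partition, continue right after the position stored in the
    // token, or at offset zero when the token holds no position for that partition.
    static void seekToToken(Consumer<?, ?> consumer,
                            List<TopicPartition> assignments,
                            Map<TopicPartition, Long> tokenPositions) {
        consumer.assign(assignments);
        for (TopicPartition partition : assignments) {
            Long storedPosition = tokenPositions.get(partition);
            long nextOffset = storedPosition == null ? 0L : storedPosition + 1;
            consumer.seek(partition, nextOffset);
        }
    }
}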
diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/ConsumerPositionsUtilIntegrationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/ConsumerPositionsUtilIntegrationTest.java
new file mode 100644
index 00000000..1f06c459
--- /dev/null
+++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/ConsumerPositionsUtilIntegrationTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2010-2023. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
+import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
+import org.axonframework.extensions.kafka.eventhandling.util.KafkaAdminUtils;
+import org.axonframework.extensions.kafka.eventhandling.util.KafkaContainerTest;
+import org.junit.jupiter.api.*;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.axonframework.extensions.kafka.eventhandling.util.ConsumerConfigUtil.consumerFactory;
+import static org.axonframework.extensions.kafka.eventhandling.util.ProducerConfigUtil.producerFactory;
+import static org.junit.jupiter.api.Assertions.*;
+
+/***
+ * Integration tests spinning up a Kafka Broker to verify whether the {@link ConsumerPositionsUtil}
+ * gets the correct positions.
+ *
+ * @author Gerard Klijs
+ */
+ */
+class ConsumerPositionsUtilIntegrationTest extends KafkaContainerTest {
+
+ private static final String RECORD_BODY = "foo";
+
+ private static final String[] TOPICS = {"testPositionsUtil"};
+ private static final Integer NR_PARTITIONS = 5;
+
+ private ProducerFactory<String, String> producerFactory;
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @BeforeAll
+ static void before() {
+ KafkaAdminUtils.createTopics(getBootstrapServers(), TOPICS);
+ KafkaAdminUtils.createPartitions(getBootstrapServers(), NR_PARTITIONS, TOPICS);
+ }
+
+ @AfterAll
+ public static void after() {
+ KafkaAdminUtils.deleteTopics(getBootstrapServers(), TOPICS);
+ }
+
+ private static void publishRecordsOnPartitions(Producer<String, String> producer,
+ String topic,
+ int recordsPerPartitions,
+ int partitionsPerTopic) {
+ for (int i = 0; i < recordsPerPartitions; i++) {
+ for (int p = 0; p < partitionsPerTopic; p++) {
+ producer.send(buildRecord(topic, p));
+ }
+ }
+ producer.flush();
+ }
+
+ private static ProducerRecord<String, String> buildRecord(String topic, int partition) {
+ return new ProducerRecord<>(topic, partition, null, null, RECORD_BODY);
+ }
+
+ @BeforeEach
+ void setUp() {
+ producerFactory = producerFactory(getBootstrapServers());
+ consumerFactory = consumerFactory(getBootstrapServers());
+ }
+
+ @AfterEach
+ void tearDown() {
+ producerFactory.shutDown();
+ }
+
+ @Test
+ void positionsTest() {
+ String topic = "testPositionsUtil";
+
+ Consumer<?, ?> testConsumer = consumerFactory.createConsumer(null);
+ List<String> topics = Collections.singletonList(topic);
+ assertTrue(ConsumerPositionsUtil.getHeadPositions(testConsumer, topics).isEmpty());
+ assertTrue(ConsumerPositionsUtil.getPositionsBasedOnTime(testConsumer, topics, Instant.now()).isEmpty());
+
+ int recordsPerPartitions = 5;
+ Producer<String, String> producer = producerFactory.createProducer();
+ publishRecordsOnPartitions(producer, topic, recordsPerPartitions, 5);
+
+ Instant now = Instant.now();
+ publishRecordsOnPartitions(producer, topic, recordsPerPartitions, 5);
+
+ Map<TopicPartition, Long> headPositions = ConsumerPositionsUtil.getHeadPositions(testConsumer, topics);
+ assertFalse(headPositions.isEmpty());
+ assertEquals(5, headPositions.keySet().size());
+ headPositions.values().forEach(p -> assertEquals(9, p));
+
+ Map<TopicPartition, Long> positionsBasedOnTime =
+ ConsumerPositionsUtil.getPositionsBasedOnTime(testConsumer, topics, now);
+ assertFalse(positionsBasedOnTime.isEmpty());
+ assertEquals(5, positionsBasedOnTime.keySet().size());
+ positionsBasedOnTime.values().forEach(p -> assertEquals(4, p));
+ }
+}
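
The assertions above fix the semantics of the two ConsumerPositionsUtil methods: with ten records per partition the head position is 9, the offset of the last published record, and for a timestamp taken after the first five records the time-based position is 4, the offset of the last record published before that instant. A sketch of how such positions can be derived with the plain Kafka consumer API; it illustrates the expected behaviour and is not necessarily the actual ConsumerPositionsUtil implementation:

import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class PositionsSketch {

    // Head position per partition: the offset of the last published record (end offset minus
    // one). Partitions without any records are omitted, matching the empty-map assertions.
    static Map<TopicPartition, Long> headPositions(Consumer<?, ?> consumer, List<String> topics) {
        Map<TopicPartition, Long> positions = new HashMap<>();
        consumer.endOffsets(partitionsOf(consumer, topics)).forEach((partition, endOffset) -> {
            if (endOffset > 0) {
                positions.put(partition, endOffset - 1);
            }
        });
        return positions;
    }

    // Time-based position per partition: one before the first offset whose record timestamp is
    // at or after the given instant, i.e. the last record published before that point in time.
    static Map<TopicPartition, Long> positionsAt(Consumer<?, ?> consumer, List<String> topics, Instant time) {
        Map<TopicPartition, Long> query = new HashMap<>();
        partitionsOf(consumer, topics).forEach(partition -> query.put(partition, time.toEpochMilli()));
        Map<TopicPartition, Long> positions = new HashMap<>();
        consumer.offsetsForTimes(query).forEach((partition, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null && offsetAndTimestamp.offset() > 0) {
                positions.put(partition, offsetAndTimestamp.offset() - 1);
            }
        });
        return positions;
    }

    private static List<TopicPartition> partitionsOf(Consumer<?, ?> consumer, List<String> topics) {
        return topics.stream()
                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
                     .map(info -> new TopicPartition(info.topic(), info.partition()))
                     .collect(Collectors.toList());
    }
}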