Skip to content

Commit

Permalink
Remove the use of a consumer group for StreamableKafkaMessageSource.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed May 10, 2022
1 parent aa62cf4 commit 2238ee3
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 112 deletions.
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -25,16 +25,20 @@
* @param <V> the value type of a build {@link Consumer} instance
* @author Nakul Mishra
* @author Steven van Beelen
* @author Gerard Klijs
* @since 4.0
*/
@FunctionalInterface
public interface ConsumerFactory<K, V> {

/**
* Create a {@link Consumer} that should be part of the Consumer Group with the given {@code groupId}.
* Create a {@link Consumer} that should be part of the Consumer Group with the given {@code groupId}, or without a
* consumer group if called with {@code null}.
*
* @param groupId a {@link String} defining the group the constructed {@link Consumer} will be a part of
* @return a {@link Consumer} which is part of Consumer Group with the given {@code groupId}
* @param groupId a {@link String} defining the group the constructed {@link Consumer} will be a part of, this can
* be {@code null} to not add it to a group.
* @return a {@link Consumer} which is part of Consumer Group with the given {@code groupId}, or without a groupId
* when called with {@code null}.
*/
Consumer<K, V> createConsumer(String groupId);
}
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.kafka.eventhandling.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Contains static util functions related to the Kafka consumer related to seeking to certain offsets.
*
* @author Gerard Klijs
* @since 4.5.4
*/
public class ConsumerSeekUtil {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private ConsumerSeekUtil() {
//prevent instantiation
}

/**
* Assigns the correct {@link TopicPartition partitions} to the consumer, and seeks to the correct offset, using the
* {@link KafkaTrackingToken}, defaulting to the head of the partition. So for each {@link TopicPartition partition}
* that belongs to the {@code topics}, either it will start reading from the next record of the partition, if
* included in the token, or else from the start.
*
* @param consumer a Kafka consumer instance
* @param tokenSupplier a function that returns the current {@link KafkaTrackingToken}
* @param topics a list of topics that will be assigned to the consumer
*/
public static void seekToCurrentPositions(Consumer<?, ?> consumer, Supplier<KafkaTrackingToken> tokenSupplier,
List<String> topics) {
List<TopicPartition> all = consumer.listTopics().entrySet()
.stream()
.filter(e -> topics.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList());
consumer.assign(all);
KafkaTrackingToken currentToken = tokenSupplier.get();
all.forEach(assignedPartition -> {
Map<TopicPartition, Long> tokenPartitionPositions = currentToken.getPositions();

long offset = 0L;
if (tokenPartitionPositions.containsKey(assignedPartition)) {
offset = tokenPartitionPositions.get(assignedPartition) + 1;
}

logger.info("Seeking topic-partition [{}] with offset [{}]", assignedPartition, offset);
consumer.seek(assignedPartition, offset);
});
}
}
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -58,13 +58,14 @@ public DefaultConsumerFactory(Map<String, Object> consumerConfiguration) {

@Override
public Consumer<K, V> createConsumer(String groupId) {
if (this.consumerConfiguration.remove(GROUP_ID_CONFIG) != null) {
Map<String, Object> configuration = new HashMap<>(this.consumerConfiguration);
if (configuration.remove(GROUP_ID_CONFIG) != null) {
logger.warn("Found a global {} whilst it is required to be provided consciously", GROUP_ID_CONFIG);
}

Map<String, Object> consumerConfiguration = new HashMap<>(this.consumerConfiguration);
consumerConfiguration.put(GROUP_ID_CONFIG, groupId);
return new KafkaConsumer<>(consumerConfiguration);
if (groupId != null) {
configuration.put(GROUP_ID_CONFIG, groupId);
}
return new KafkaConsumer<>(configuration);
}

/**
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerSeekUtil;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.messaging.StreamableMessageSource;
Expand Down Expand Up @@ -63,8 +64,6 @@ public class StreamableKafkaMessageSource<K, V> implements StreamableMessageSour
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final List<String> topics;
private final String groupIdPrefix;
private final Supplier<String> groupIdSuffixFactory;
private final ConsumerFactory<K, V> consumerFactory;
private final Fetcher<K, V, KafkaEventMessage> fetcher;
private final KafkaMessageConverter<K, V> messageConverter;
Expand Down Expand Up @@ -98,8 +97,6 @@ public static <K, V> Builder<K, V> builder() {
protected StreamableKafkaMessageSource(Builder<K, V> builder) {
builder.validate();
this.topics = Collections.unmodifiableList(builder.topics);
this.groupIdPrefix = builder.groupIdPrefix;
this.groupIdSuffixFactory = builder.groupIdSuffixFactory;
this.consumerFactory = builder.consumerFactory;
this.fetcher = builder.fetcher;
this.messageConverter = builder.messageConverter;
Expand All @@ -117,23 +114,15 @@ public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingT
KafkaTrackingToken token = KafkaTrackingToken.from(trackingToken);
TrackingRecordConverter<K, V> recordConverter = new TrackingRecordConverter<>(messageConverter, token);

String groupId = buildConsumerGroupId();
logger.debug("Consumer Group Id [{}] will start consuming from topics [{}]", groupId, topics);
Consumer<K, V> consumer = consumerFactory.createConsumer(groupId);
consumer.subscribe(
topics,
new TrackingTokenConsumerRebalanceListener<>(consumer, recordConverter::currentToken)
);
logger.debug("Will start consuming from topics [{}]", topics);
Consumer<K, V> consumer = consumerFactory.createConsumer(null);
ConsumerSeekUtil.seekToCurrentPositions(consumer, recordConverter::currentToken, topics);

Buffer<KafkaEventMessage> buffer = bufferFactory.get();
Registration closeHandler = fetcher.poll(consumer, recordConverter, buffer::putAll);
return new KafkaMessageStream(buffer, closeHandler);
}

private String buildConsumerGroupId() {
return groupIdPrefix + groupIdSuffixFactory.get();
}

/**
* Builder class to instantiate a {@link StreamableKafkaMessageSource}.
* <p>
Expand All @@ -149,8 +138,6 @@ private String buildConsumerGroupId() {
public static class Builder<K, V> {

private List<String> topics = Collections.singletonList("Axon.Events");
private String groupIdPrefix = "Axon.Streamable.Consumer-";
private Supplier<String> groupIdSuffixFactory = () -> UUID.randomUUID().toString();
private ConsumerFactory<K, V> consumerFactory;
private Fetcher<K, V, KafkaEventMessage> fetcher;
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -193,11 +180,16 @@ public Builder<K, V> addTopic(String topic) {
* @param groupIdPrefix a {@link String} defining the prefix of the Consumer Group id to which a {@link
* Consumer} should retrieve records from
* @return the current Builder instance, for fluent interfacing
* @deprecated value is not used anymore, as a {@code groupId} is no longer used. Instead of the group id the
* topic partitions are manually assigned, using less resources.
*/
@Deprecated
@SuppressWarnings("squid:S1133") //Removal will break the API, so can only be done in a new major version.
public Builder<K, V> groupIdPrefix(String groupIdPrefix) {
logger.warn(
"Using groupIdPrefix in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
assertThat(groupIdPrefix, name -> Objects.nonNull(name) && !"".equals(name),
"The groupIdPrefix may not be null or empty");
this.groupIdPrefix = groupIdPrefix;
return this;
}

Expand All @@ -208,11 +200,15 @@ public Builder<K, V> groupIdPrefix(String groupIdPrefix) {
* @param groupIdSuffixFactory a {@link Supplier} of {@link String} providing the suffix of the Consumer {@code
* groupId} from which a {@link Consumer} should retrieve records from
* @return the current Builder instance, for fluent interfacing
* @deprecated value is not used anymore, as a {@code groupId} is no longer used. Instead of the group id the
* topic partitions are manually assigned, using less resources
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
@SuppressWarnings("squid:S1133") //Removal will break the API, so can only be done in a new major version.
public Builder<K, V> groupIdSuffixFactory(Supplier<String> groupIdSuffixFactory) {
logger.warn(
"Using groupIdSuffixFactory in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
assertNonNull(groupIdSuffixFactory, "GroupIdSuffixFactory may not be null");
this.groupIdSuffixFactory = groupIdSuffixFactory;
return this;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,13 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerSeekUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

Expand All @@ -40,7 +42,11 @@
* @param <V> the value type of the records the {@link Consumer} polls
* @author Steven van Beelen
* @since 4.0
* @deprecated functionality moved to {@link ConsumerSeekUtil#seekToCurrentPositions(Consumer,
* Supplier, List)} when group id was removed from the consumer.
*/
@Deprecated
@SuppressWarnings("squid:S1133") //removing would be a breaking change and can only be done in a major release
public class TrackingTokenConsumerRebalanceListener<K, V> implements ConsumerRebalanceListener {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,6 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.TrackingRecordConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.TrackingTokenConsumerRebalanceListener;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaAdminUtils;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaContainerTest;
Expand All @@ -44,9 +43,9 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.axonframework.extensions.kafka.eventhandling.util.ConsumerConfigUtil.DEFAULT_GROUP_ID;
import static org.axonframework.extensions.kafka.eventhandling.util.ConsumerConfigUtil.consumerFactory;
import static org.axonframework.extensions.kafka.eventhandling.util.ProducerConfigUtil.producerFactory;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -186,12 +185,23 @@ void testStartFetcherWithExistingTokenShouldStartAtSpecificPositions() throws In
testPositions.put(new TopicPartition(topic, 4), 0L);
KafkaTrackingToken testStartToken = KafkaTrackingToken.newInstance(testPositions);

Consumer<String, String> testConsumer = consumerFactory(getBootstrapServers()).createConsumer(
DEFAULT_GROUP_ID);
testConsumer.subscribe(
Collections.singletonList(topic),
new TrackingTokenConsumerRebalanceListener<>(testConsumer, () -> testStartToken)
);
Consumer<String, String> testConsumer = consumerFactory(getBootstrapServers()).createConsumer(null);
List<TopicPartition> all = testConsumer.listTopics().entrySet()
.stream()
.filter(e -> e.getKey().equals(topic))
.flatMap(e -> e.getValue().stream())
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
.collect(Collectors.toList());
testConsumer.assign(all);
all.forEach(assignedPartition -> {
Map<TopicPartition, Long> tokenPartitionPositions = testStartToken.getPositions();
long offset = 0L;
if (tokenPartitionPositions.containsKey(assignedPartition)) {
offset = tokenPartitionPositions.get(assignedPartition) + 1;
}
testConsumer.seek(assignedPartition, offset);
});

testSubject.poll(
testConsumer,
Expand Down

0 comments on commit 2238ee3

Please sign in to comment.