Skip to content

Commit

Permalink
Fixed multi-consumer issue in the Kafka AsyncFetcher
Browse files Browse the repository at this point in the history
This change ensures that multiple consumers may concurrently read from
the same topic, and that the consumers are shutdown and (re)started
properly.

Resolves issue #688
  • Loading branch information
abuijze authored and smcvb committed Aug 1, 2018
1 parent 3987291 commit a3ff4b4
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,88 +26,102 @@
import org.axonframework.serialization.xml.XStreamSerializer;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

/**
* Async implementation of the {@link Fetcher} that uses an in-memory bufferFactory.
*
* @author Nakul Mishra
* @param <K> The key of the Kafka entries
* @param <V> The value type of Kafka entries
*/

/**
* [] [] []
* @param <K>
* @param <V>
*/
public class AsyncFetcher<K, V> implements Fetcher<K, V> {
public class AsyncFetcher<K, V> implements Fetcher {

private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;
private final ExecutorService pool;
private final KafkaMessageConverter<K, V> converter;
private final Consumer<K, V> consumer;
private final ConsumerFactory<K, V> consumerFactory;
private final String topic;
private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
private final long pollTimeout;
private final boolean requirePoolShutdown;
private final Set<FetchEventsTask> activeFetchers = ConcurrentHashMap.newKeySet();

private AsyncFetcher(Builder<K, V> builder) {
this.bufferFactory = builder.bufferFactory;
this.consumer = builder.consumerFactory.createConsumer();
this.consumerFactory = builder.consumerFactory;
this.converter = builder.converter;
this.topic = builder.topic;
this.requirePoolShutdown = builder.requirePoolShutdown;
this.pool = builder.pool;
this.callback = builder.callback;
this.pollTimeout = builder.pollTimeout;
}

/**
* Initialize a builder for configuring an AsyncFetcher, using given Kafka {@code consumerConfig}.
* <p>
* Note that configuring a MessageConverter on the builder is mandatory if the value type is not {@code byte[]}.
*
* @param <K> key type.
* @param <V> value type.
* @param consumerConfig The configuration of KafkaConsumers to use when creating consumers
* @return the builder.
*/
public static <K, V> Builder<K, V> builder(Map<String, Object> consumerConfig) {
return builder(new DefaultConsumerFactory<>(consumerConfig));
}

/**
* Initialize a builder for configuring an AsyncFetcher, using given {@code consumerFactory} for creating Kafka
* Consumers.
* <p>
* Note that configuring a MessageConverter on the builder is mandatory if the value type is not {@code byte[]}.
*
* @param <K> key type.
* @param <V> value type.
* @param consumerFactory The factory providing Kafka Consumer instances
* @return the builder.
*/
public static <K, V> Builder<K, V> builder(ConsumerFactory<K, V> consumerFactory) {
return new Builder<>(consumerFactory);
}

@Override
public MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken token) {
Consumer<K, V> consumer = consumerFactory.createConsumer();
ConsumerUtil.seek(topic, consumer, token);
if (KafkaTrackingToken.isEmpty(token)) {
token = KafkaTrackingToken.emptyToken();
}
Buffer<KafkaEventMessage> buffer = bufferFactory.get();
Future<?> currentTask = pool.submit(FetchEventsTask.builder(this.consumer,
token,
buffer,
this.converter,
this.callback,
this.pollTimeout).build());

return new KafkaMessageStream(buffer, () -> currentTask.cancel(true));
FetchEventsTask<K, V> fetcherTask = new FetchEventsTask<>(consumer, token, buffer, this.converter,
this.callback, this.pollTimeout,
activeFetchers::remove);
activeFetchers.add(fetcherTask);
pool.execute(fetcherTask);

return new KafkaMessageStream(buffer, fetcherTask::close);
}

@Override
public void shutdown() {
pool.shutdown();

}

/**
* @param <K> key type.
* @param <V> value type.
* @return the builder.
*/
public static <K, V> Builder<K, V> builder(Map<String, Object> consumerConfig) {
return builder(new DefaultConsumerFactory<>(consumerConfig));
}
/**
* @param <K> key type.
* @param <V> value type.
* @return the builder.
*/
public static <K, V> Builder<K, V> builder(ConsumerFactory<K,V> consumerFactory) {
return new Builder<>(consumerFactory);
activeFetchers.forEach(FetchEventsTask::close);
if (requirePoolShutdown) {
pool.shutdown();
}
}

/**
* Builder for the AsyncFetcher.
*
* @param <K> key type.
* @param <V> value type.
*/
Expand All @@ -117,25 +131,30 @@ public static final class Builder<K, V> {
private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;
private KafkaMessageConverter<K, V> converter =
(KafkaMessageConverter<K, V>) new DefaultKafkaMessageConverter(new XStreamSerializer());
private String topic = "events";
private String topic = "Axon.Events";
private long pollTimeout = 5_000;

private ExecutorService pool = newCachedThreadPool(new AxonThreadFactory("AsyncFetcher-pool-thread"));
private BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback = (r, t) -> null;
private boolean requirePoolShutdown = true;

private Builder(ConsumerFactory<K, V> consumerFactory) {
Assert.notNull(consumerFactory, () -> "ConsumerFactory may not be null");
this.consumerFactory = consumerFactory;
}

/**
* Configure {@link ExecutorService} that uses {@link Consumer} for fetching Kafka records.
* Configure {@link ExecutorService} that uses {@link Consumer} for fetching Kafka records. Note that the pool
* should contain sufficient threads to run the necessary fetcher processes concurrently.
* <p>
* Note that the provided pool will <em>not</em> be shut down when the fetcher is terminated.
*
* @param sevice ExecutorService.
* @return the builder.
*/
public Builder<K, V> withPool(ExecutorService sevice) {
Assert.notNull(sevice, () -> "Pool may not be null");
this.requirePoolShutdown = false;
this.pool = sevice;
return this;
}
Expand All @@ -158,7 +177,7 @@ public Builder<K, V> onRecordPublished(
* Configure {@link ExecutorService} that uses {@link Consumer} for fetching Kafka records.
*
* @param timeout the timeout when reading message from the topic.
* @param unit the unit in which the timeout is expressed.
* @param unit the unit in which the timeout is expressed.
* @return this builder for method chaining.
*/
public Builder<K, V> withPollTimeout(long timeout, TimeUnit unit) {
Expand All @@ -168,6 +187,7 @@ public Builder<K, V> withPollTimeout(long timeout, TimeUnit unit) {

/**
* Configure the converter which converts Kafka messages to Axon messages.
*
* @param converter the converter.
* @return this builder for method chaining.
*/
Expand All @@ -179,6 +199,7 @@ public Builder<K, V> withMessageConverter(KafkaMessageConverter<K, V> converter)

/**
* Configure Kafka topic to read events from.
*
* @param topic the topic.
* @return this builder for method chaining.
*/
Expand All @@ -190,6 +211,7 @@ public Builder<K, V> withTopic(String topic) {

/**
* Configure the factory for creating buffer that is used for each connection.
*
* @param bufferFactory the bufferFactory.
* @return this builder for method chaining.
*/
Expand All @@ -201,10 +223,11 @@ public Builder<K, V> withBufferFactory(Supplier<Buffer<KafkaEventMessage>> buffe

/**
* Builds the fetcher
*
* @return the Fetcher
*/
public Fetcher<K, V> build() {
public Fetcher build() {
return new AsyncFetcher<>(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

import static org.axonframework.common.Assert.nonNull;
import static org.axonframework.common.ObjectUtils.getOrDefault;

/**
* Polls {@link Consumer} and inserts records on {@link Buffer}.
*
Expand All @@ -41,22 +45,32 @@ class FetchEventsTask<K, V> implements Runnable {
private final KafkaMessageConverter<K, V> converter;
private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
private final long timeout;
private final AtomicBoolean running = new AtomicBoolean(true);
private final java.util.function.Consumer<FetchEventsTask> closeHandler;

private KafkaTrackingToken currentToken;

private FetchEventsTask(Builder<K, V> config) {
this.consumer = config.consumer;
this.currentToken = config.currentToken;
this.buffer = config.buffer;
this.converter = config.converter;
this.callback = config.callback;
this.timeout = config.timeout;
public FetchEventsTask(Consumer<K, V> consumer, KafkaTrackingToken token,
Buffer<KafkaEventMessage> buffer,
KafkaMessageConverter<K, V> converter,
BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback,
long timeout,
java.util.function.Consumer<FetchEventsTask> closeHandler) {
Assert.isFalse(timeout < 0, () -> "Timeout may not be < 0");

this.consumer = nonNull(consumer, () -> "Consumer may not be null");
this.currentToken = nonNull(token, () -> "Token may not be null");
this.buffer = nonNull(buffer, () -> "Buffer may not be null");
this.converter = nonNull(converter, () -> "Converter may not be null");
this.callback = nonNull(callback, () -> "Callback may not be null");
this.timeout = timeout;
this.closeHandler = getOrDefault(closeHandler, x -> {});
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
while (running.get()) {
ConsumerRecords<K, V> records = consumer.poll(timeout);
if (logger.isDebugEnabled()) {
logger.debug("Fetched {} records", records.count());
Expand All @@ -80,6 +94,7 @@ public void run() {
this.callback.apply(c.record, c.token);
}
} catch (InterruptedException e) {
running.set(false);
if (logger.isDebugEnabled()) {
logger.debug("Event producer thread was interrupted. Shutting down.", e);
}
Expand All @@ -89,10 +104,16 @@ public void run() {
} catch (Exception e) {
logger.error("Cannot proceed with Fetching, encountered {} ", e);
} finally {
running.set(false);
closeHandler.accept(this);
consumer.close();
}
}

public void close() {
this.running.set(false);
}

private static class CallbackEntry<K, V> {

private final KafkaTrackingToken token;
Expand All @@ -104,54 +125,4 @@ public CallbackEntry(KafkaTrackingToken token, ConsumerRecord<K, V> record) {
}
}

public static <K, V> Builder<K, V> builder(Consumer<K, V> consumer, KafkaTrackingToken token,
Buffer<KafkaEventMessage> buffer,
KafkaMessageConverter<K, V> converter,
BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback,
long timeout) {
return new Builder<>(consumer, token, buffer, converter, callback, timeout);
}

public static class Builder<K, V> {

private final long timeout;
private final Consumer<K, V> consumer;
private final Buffer<KafkaEventMessage> buffer;
private final KafkaMessageConverter<K, V> converter;
private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
private final KafkaTrackingToken currentToken;

public Builder(Consumer<K, V> consumer, KafkaTrackingToken token,
Buffer<KafkaEventMessage> buffer,
KafkaMessageConverter<K, V> converter,
BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback, long timeout) {
Assert.notNull(consumer, () -> "Consumer may not be null");
Assert.notNull(buffer, () -> "Buffer may not be null");
Assert.notNull(converter, () -> "Converter may not be null");
Assert.notNull(token, () -> "Token may not be null");
Assert.notNull(callback, () -> "Callback may not be null");
Assert.isFalse(timeout < 0, () -> "Timeout may not be < 0");
this.consumer = consumer;
this.currentToken = token;
this.buffer = buffer;
this.converter = converter;
this.callback = callback;
this.timeout = timeout;
}

@Override
public String toString() {
return "Config{" +
"consumer=" + consumer +
", buffer=" + buffer +
", converter=" + converter +
", callback=" + callback +
", currentToken=" + currentToken +
'}';
}

public FetchEventsTask<K, V> build() {
return new FetchEventsTask<>(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,20 @@
/**
* Interface describing the component responsible for reading messages from Kafka.
*
* @param <K> the key type.
* @param <V> the value type.
* @author Nakul Mishra
*/
public interface Fetcher<K, V> {
public interface Fetcher {

/**
* Open a stream of messages, starting at the position indicated by the given {@code token}.
*
* @param token
* @return
* @param token the token representing positions of the partition to start from
* @return a stream providing messages from Kafka
*/
MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken token);

/**
* Close fetcher.
* Shuts the fetcher down, closing any resources used by this fetcher.
*/
void shutdown();
}
Loading

0 comments on commit a3ff4b4

Please sign in to comment.