diff --git a/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/DefaultFetcher.java b/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/DefaultFetcher.java new file mode 100644 index 0000000000..4495a20895 --- /dev/null +++ b/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/DefaultFetcher.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2010-2018. Axon Framework + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.kafka.eventhandling.consumer; + +import org.apache.kafka.clients.consumer.Consumer; +import org.axonframework.eventhandling.TrackedEventMessage; +import org.axonframework.kafka.eventhandling.KafkaMessageConverter; +import org.axonframework.messaging.MessageStream; + +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * @author Nakul Mishra + */ +public class DefaultFetcher implements Fetcher { + + private final MessageBuffer buffer; + private final ExecutorService pool; + private final KafkaMessageConverter converter; + private final Consumer consumer; + private final String topic; + private Future currentTask; + + public DefaultFetcher(int bufferSize, + ConsumerFactory consumerFactory, + KafkaMessageConverter converter, String topic) { + + this.buffer = new MessageBuffer<>(); + this.converter = converter; + pool = Executors.newSingleThreadExecutor(); + this.topic = topic; + this.consumer = consumerFactory.createConsumer(); + } + + @Override + public MessageStream> start(KafkaTrackingToken token) { + ConsumerUtil.seek(topic, consumer, token); + if (KafkaTrackingToken.isEmpty(token)) { + token = KafkaTrackingToken.newInstance(new HashMap<>()); + } + currentTask = pool.submit(new FetchEventsTask<>(consumer, token, buffer, converter)); + return new KafkaMessageStream(buffer, this); + } + + @Override + public void shutdown() { + currentTask.cancel(true); + pool.shutdown(); + consumer.close(); + } +} \ No newline at end of file diff --git a/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/Fetcher.java b/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/Fetcher.java index 61ffd3d797..9b5aca182b 100644 --- a/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/Fetcher.java +++ b/kafka/src/main/java/org/axonframework/kafka/eventhandling/consumer/Fetcher.java @@ -1,52 +1,18 @@ package org.axonframework.kafka.eventhandling.consumer; -import org.apache.kafka.clients.consumer.Consumer; import org.axonframework.eventhandling.TrackedEventMessage; -import org.axonframework.kafka.eventhandling.KafkaMessageConverter; import org.axonframework.messaging.MessageStream; -import java.util.HashMap; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; - /** + * Fetch messages from Kafka. + * + * @param the key type. + * @param the value type. * @author Nakul Mishra */ -public class Fetcher { - private final BlockingQueue buffer; - private final ExecutorService pool; - private final KafkaMessageConverter converter; - private final Consumer consumer; - private final String topic; - private Future currentTask; - - public Fetcher(int bufferSize, - ConsumerFactory consumerFactory, - KafkaMessageConverter converter, String topic) { - - this.buffer = new PriorityBlockingQueue<>(bufferSize); - this.converter = converter; - pool = Executors.newSingleThreadExecutor(); - this.topic = topic; - this.consumer = consumerFactory.createConsumer(); - } - - public MessageStream> start(KafkaTrackingToken token) { - ConsumerUtil.seek(topic, consumer, token); - if (KafkaTrackingToken.isEmpty(token)) { - token = KafkaTrackingToken.newInstance(new HashMap<>()); - } - currentTask = pool.submit(new FetchEventsTask<>(consumer, token, buffer, converter)); - return new KafkaMessageStream<>(buffer, this); - } +public interface Fetcher { - public void shutdown() { - currentTask.cancel(true); - pool.shutdown(); - consumer.close(); - } + MessageStream> start(KafkaTrackingToken token); -} \ No newline at end of file + void shutdown(); +}