Skip to content

Commit

Permalink
Introduce default fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
nklmish committed Mar 27, 2018
1 parent e8439d5 commit a49743f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 42 deletions.
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.kafka.eventhandling.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MessageStream;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @author Nakul Mishra
*/
public class DefaultFetcher implements Fetcher<String, byte[]> {

private final MessageBuffer<MessageAndMetadata> buffer;
private final ExecutorService pool;
private final KafkaMessageConverter<String, byte[]> converter;
private final Consumer<String, byte[]> consumer;
private final String topic;
private Future<?> currentTask;

public DefaultFetcher(int bufferSize,
ConsumerFactory<String, byte[]> consumerFactory,
KafkaMessageConverter<String, byte[]> converter, String topic) {

this.buffer = new MessageBuffer<>();
this.converter = converter;
pool = Executors.newSingleThreadExecutor();
this.topic = topic;
this.consumer = consumerFactory.createConsumer();
}

@Override
public MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken token) {
ConsumerUtil.seek(topic, consumer, token);
if (KafkaTrackingToken.isEmpty(token)) {
token = KafkaTrackingToken.newInstance(new HashMap<>());
}
currentTask = pool.submit(new FetchEventsTask<>(consumer, token, buffer, converter));
return new KafkaMessageStream(buffer, this);
}

@Override
public void shutdown() {
currentTask.cancel(true);
pool.shutdown();
consumer.close();
}
}
@@ -1,52 +1,18 @@
package org.axonframework.kafka.eventhandling.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MessageStream;

import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;

/**
* Fetch messages from Kafka.
*
* @param <K> the key type.
* @param <V> the value type.
* @author Nakul Mishra
*/
public class Fetcher<K, V> {
private final BlockingQueue<MessageAndTimestamp> buffer;
private final ExecutorService pool;
private final KafkaMessageConverter<K, V> converter;
private final Consumer<K, V> consumer;
private final String topic;
private Future<?> currentTask;

public Fetcher(int bufferSize,
ConsumerFactory<K, V> consumerFactory,
KafkaMessageConverter<K, V> converter, String topic) {

this.buffer = new PriorityBlockingQueue<>(bufferSize);
this.converter = converter;
pool = Executors.newSingleThreadExecutor();
this.topic = topic;
this.consumer = consumerFactory.createConsumer();
}

public MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken token) {
ConsumerUtil.seek(topic, consumer, token);
if (KafkaTrackingToken.isEmpty(token)) {
token = KafkaTrackingToken.newInstance(new HashMap<>());
}
currentTask = pool.submit(new FetchEventsTask<>(consumer, token, buffer, converter));
return new KafkaMessageStream<>(buffer, this);
}
public interface Fetcher<K, V> {

public void shutdown() {
currentTask.cancel(true);
pool.shutdown();
consumer.close();
}
MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken token);

}
void shutdown();
}

0 comments on commit a49743f

Please sign in to comment.