Skip to content

Commit

Permalink
Make producer and consumer async
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent da66b2d commit 3f7d304
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
Expand Up @@ -50,6 +50,6 @@ public void publish(String topicName, byte[] data) throws Exception{
if (!producerMap.containsKey(topicName)) {
createProducer(topicName);
}
producerMap.get(topicName).send(data);
producerMap.get(topicName).sendAsync(data);
}
}
Expand Up @@ -30,7 +30,8 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.sleep;

public class TopicSubscription {
private static final Logger log = LoggerFactory.getLogger(TopicSubscription.class);
Expand All @@ -39,8 +40,8 @@ public class TopicSubscription {
ConcurrentMap<String, FunctionContainer> subscriberMap;
private Thread thread;
private PulsarClient client;
private ConsumerConfiguration consumerConfiguration;
private Consumer consumer;
private ResultsProcessor resultsProcessor;
private volatile boolean running;

TopicSubscription(String topicName, String subscriptionId, PulsarClient client,
Expand All @@ -49,31 +50,31 @@ public class TopicSubscription {
this.subscriptionId = subscriptionId;
subscriberMap = new ConcurrentHashMap<>();
this.client = client;
this.resultsProcessor = resultsProcessor;
this.consumerConfiguration = new ConsumerConfiguration().setSubscriptionType(SubscriptionType.Shared);
consumerConfiguration.setMessageListener((consumer, msg) -> {
try {
String messageId = convertMessageIdToString(msg.getMessageId());
for (FunctionContainer subscriber : subscriberMap.values()) {
subscriber.sendMessage(topicName, messageId, msg.getData())
.thenApply(result -> resultsProcessor.handleResult(subscriber, result));
}
// Acknowledge the message so that it can be deleted by broker
consumer.acknowledge(msg);
} catch (Exception ex) {
log.error("Got exception while dealing with topic " + topicName, ex);
}
});
running = false;
}

public void start() throws Exception {
String subscriptionName = "fn-" + subscriptionId + "-" + topicName + "-subscriber";
consumer = client.subscribe(topicName, subscriptionName,
new ConsumerConfiguration().setSubscriptionType(SubscriptionType.Shared));
consumer = client.subscribe(topicName, subscriptionName, consumerConfiguration);
running = true;
thread = new Thread(() -> {
try {
while (running) {
// Wait for a message
Message msg = consumer.receive(1000, TimeUnit.MILLISECONDS);
if (null == msg) {
continue;
}
String messageId = convertMessageIdToString(msg.getMessageId());
for (FunctionContainer subscriber : subscriberMap.values()) {
subscriber.sendMessage(topicName, messageId, msg.getData())
.thenApply(result -> resultsProcessor.handleResult(subscriber, result));
}

// Acknowledge the message so that it can be deleted by broker
consumer.acknowledge(msg);
sleep(1000);
}
} catch (Exception ex) {
log.error("Got exception while dealing with topic " + topicName, ex);
Expand Down

0 comments on commit 3f7d304

Please sign in to comment.