From e28060857971b81b976981e230d366c5561a9dfd Mon Sep 17 00:00:00 2001 From: pivovarit Date: Sun, 1 Apr 2018 11:36:49 +0200 Subject: [PATCH] NatsClient refactor --- .../java/com/baeldung/jnats/NatsClient.java | 80 ++++++------------- 1 file changed, 26 insertions(+), 54 deletions(-) diff --git a/libraries/src/main/java/com/baeldung/jnats/NatsClient.java b/libraries/src/main/java/com/baeldung/jnats/NatsClient.java index cdd1e764efc6..c592727cd1f1 100644 --- a/libraries/src/main/java/com/baeldung/jnats/NatsClient.java +++ b/libraries/src/main/java/com/baeldung/jnats/NatsClient.java @@ -1,31 +1,35 @@ package com.baeldung.jnats; -import io.nats.client.*; - -import java.io.IOException; -import java.util.*; - +import io.nats.client.AsyncSubscription; +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.nats.client.Subscription; +import io.nats.client.SyncSubscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NatsClient { +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public final class NatsClient { - private String serverURI; + private final String serverURI; - private Connection natsConnection; + private final Connection natsConnection; - private Map subscriptions = new HashMap<>(); + private final Map subscriptions = new HashMap<>(); private final static Logger log = LoggerFactory.getLogger(NatsClient.class); - public NatsClient() { + NatsClient() { this.serverURI = "jnats://localhost:4222"; natsConnection = initConnection(serverURI); } - public NatsClient(String serverURI) { - if ((serverURI != null) && (!serverURI.isEmpty())) { this.serverURI = serverURI; } else { @@ -40,59 +44,33 @@ public void closeConnection() { natsConnection.close(); } - private Connection initConnection(String uri) { try { - Options options = new Options.Builder() - .errorCb(new ExceptionHandler() { - @Override - public void onException(NATSException ex) { - log.error("Connection Exception: ", ex); - } - }) - .disconnectedCb(new DisconnectedCallback() { - @Override - public void onDisconnect(ConnectionEvent event) { - log.error("Channel disconnected: {}", event.getConnection()); - } - }) - .reconnectedCb(new ReconnectedCallback() { - @Override - public void onReconnect(ConnectionEvent event) { - log.error("Reconnected to server: {}", event.getConnection()); - } - }) + .errorCb(ex -> log.error("Connection Exception: ", ex)) + .disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection())) + .reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection())) .build(); return Nats.connect(uri, options); - } catch (IOException ioe) { log.error("Error connecting to NATs! ", ioe); return null; } } - - public void publishMessage(String topic, String replyTo, String message) { + void publishMessage(String topic, String replyTo, String message) { try { - // Simple Publisher natsConnection.publish(topic, replyTo, message.getBytes()); } catch (IOException ioe) { log.error("Error publishing message: {} to {} ", message, topic, ioe); } } - public void subscribeAsync(String topic) { - // Simple Async Subscriber - AsyncSubscription subscription = natsConnection.subscribe(topic, new MessageHandler() { - @Override - public void onMessage(Message msg) { - log.info("Received message on {}", msg.getSubject()); - } - }); + AsyncSubscription subscription = natsConnection.subscribe( + topic, msg -> log.info("Received message on {}", msg.getSubject())); if (subscription == null) { log.error("Error subscribing to {}", topic); @@ -101,13 +79,11 @@ public void onMessage(Message msg) { } } - public SyncSubscription subscribeSync(String topic) { - // Simple Sync Subscriber + SyncSubscription subscribeSync(String topic) { return natsConnection.subscribe(topic); } public void unsubscribe(String topic) { - try { Subscription subscription = subscriptions.get(topic); @@ -116,15 +92,12 @@ public void unsubscribe(String topic) { } else { log.error("{} not found. Unable to unsubscribe.", topic); } - } catch (IOException ioe) { log.error("Error unsubscribing from {} ", topic, ioe); } } - - public Message makeRequest(String topic, String request) { - + Message makeRequest(String topic, String request) { try { return natsConnection.request(topic, request.getBytes(), 100); } catch (IOException | InterruptedException ioe) { @@ -133,7 +106,7 @@ public Message makeRequest(String topic, String request) { } } - public void installReply(String topic, String reply) { + void installReply(String topic, String reply) { natsConnection.subscribe(topic, message -> { try { natsConnection.publish(message.getReplyTo(), reply.getBytes()); @@ -143,8 +116,7 @@ public void installReply(String topic, String reply) { }); } - public SyncSubscription joinQueueGroup(String topic, String queue) { + SyncSubscription joinQueueGroup(String topic, String queue) { return natsConnection.subscribe(topic, queue); } - }