Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NatsClient refactor #3914

Merged
merged 1 commit into from
Apr 11, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 26 additions & 54 deletions libraries/src/main/java/com/baeldung/jnats/NatsClient.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
package com.baeldung.jnats;

import io.nats.client.*;

import java.io.IOException;
import java.util.*;

import io.nats.client.AsyncSubscription;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.SyncSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NatsClient {
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public final class NatsClient {

private String serverURI;
private final String serverURI;

private Connection natsConnection;
private final Connection natsConnection;

private Map<String, Subscription> subscriptions = new HashMap<>();
private final Map<String, Subscription> subscriptions = new HashMap<>();

private final static Logger log = LoggerFactory.getLogger(NatsClient.class);

public NatsClient() {
NatsClient() {
this.serverURI = "jnats://localhost:4222";
natsConnection = initConnection(serverURI);
}


public NatsClient(String serverURI) {

if ((serverURI != null) && (!serverURI.isEmpty())) {
this.serverURI = serverURI;
} else {
Expand All @@ -40,59 +44,33 @@ public void closeConnection() {
natsConnection.close();
}


private Connection initConnection(String uri) {
try {

Options options = new Options.Builder()
.errorCb(new ExceptionHandler() {
@Override
public void onException(NATSException ex) {
log.error("Connection Exception: ", ex);
}
})
.disconnectedCb(new DisconnectedCallback() {
@Override
public void onDisconnect(ConnectionEvent event) {
log.error("Channel disconnected: {}", event.getConnection());
}
})
.reconnectedCb(new ReconnectedCallback() {
@Override
public void onReconnect(ConnectionEvent event) {
log.error("Reconnected to server: {}", event.getConnection());
}
})
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();

return Nats.connect(uri, options);

} catch (IOException ioe) {
log.error("Error connecting to NATs! ", ioe);
return null;
}
}


public void publishMessage(String topic, String replyTo, String message) {
void publishMessage(String topic, String replyTo, String message) {
try {
// Simple Publisher
natsConnection.publish(topic, replyTo, message.getBytes());
} catch (IOException ioe) {
log.error("Error publishing message: {} to {} ", message, topic, ioe);
}
}


public void subscribeAsync(String topic) {

// Simple Async Subscriber
AsyncSubscription subscription = natsConnection.subscribe(topic, new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
AsyncSubscription subscription = natsConnection.subscribe(
topic, msg -> log.info("Received message on {}", msg.getSubject()));

if (subscription == null) {
log.error("Error subscribing to {}", topic);
Expand All @@ -101,13 +79,11 @@ public void onMessage(Message msg) {
}
}

public SyncSubscription subscribeSync(String topic) {
// Simple Sync Subscriber
SyncSubscription subscribeSync(String topic) {
return natsConnection.subscribe(topic);
}

public void unsubscribe(String topic) {

try {
Subscription subscription = subscriptions.get(topic);

Expand All @@ -116,15 +92,12 @@ public void unsubscribe(String topic) {
} else {
log.error("{} not found. Unable to unsubscribe.", topic);
}

} catch (IOException ioe) {
log.error("Error unsubscribing from {} ", topic, ioe);
}
}


public Message makeRequest(String topic, String request) {

Message makeRequest(String topic, String request) {
try {
return natsConnection.request(topic, request.getBytes(), 100);
} catch (IOException | InterruptedException ioe) {
Expand All @@ -133,7 +106,7 @@ public Message makeRequest(String topic, String request) {
}
}

public void installReply(String topic, String reply) {
void installReply(String topic, String reply) {
natsConnection.subscribe(topic, message -> {
try {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
Expand All @@ -143,8 +116,7 @@ public void installReply(String topic, String reply) {
});
}

public SyncSubscription joinQueueGroup(String topic, String queue) {
SyncSubscription joinQueueGroup(String topic, String queue) {
return natsConnection.subscribe(topic, queue);
}

}