@Component
public class NatsComponent {
@Value("${nats.server.url}")
private String natsServerUrl;
private NatsConnector natsConnector;
@PostConstruct
public void init() {
natsConnector=new NatsConnector()
.addHost(natsServerUrl) ;
}
@Bean(name = "nc")
public Nats nats() {
return natsConnector.connect();
}
/**
* @param subject
* @param request
* @param replyProtoClass
* @param <T>
* @throws IllegalArgumentException When T is not GeneratedMessageV3 or String
* @return
*/
public <T> CompletableFuture<T> request(String subject, GeneratedMessageV3 request, Class<T> replyProtoClass) {
CompletableFuture completableFuture = new CompletableFuture<T>();
if ( ( GeneratedMessageV3.class.isAssignableFrom(replyProtoClass) ) ||
( String.class.getName().equals(replyProtoClass.getName()) ) ) {}
else {
throw new IllegalArgumentException("only GeneratedMessageV3 or String class allowed as Class<T> " +
"replyProtoClass");
}
try( Nats nats = nats()) {
nats.request(subject, protoSerialize(request), 1, TimeUnit.MINUTES, 1, (msg) -> {
T type = null;
try {
if (String.class.getName().equals(replyProtoClass.getName())) {
type = (T) msg.getBody();
} else {
type = (T) protoDeserialize(msg.getBody(), (Class<GeneratedMessageV3>) replyProtoClass);
}
completableFuture.complete(type);
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
});
completableFuture.get(60,TimeUnit.SECONDS);
}catch(CustomException e){
throw e;
}catch (Exception e){
throw new CustomException("ERROR","INTERNAL_SERVER_ERROR");
}
return completableFuture;
}
/**
* Method used to return Async Completable Proto responses
*
* @param subject
* @param request
* @param protoResponseClass
* @param <T>
* @param <R>
* @return
*/
public <T extends GeneratedMessageV3, R> CompletableFuture<T> protoToFutureResponse(String subject,
GeneratedMessageV3 request, Class<T> protoResponseClass) {
return request(subject, request, protoResponseClass);
}
public <T extends GeneratedMessageV3, R> CompletableFuture<T> stringToFutureResponse(String subject,
String request,
Class<T> replyProtoClass) {
CompletableFuture completableFuture = new CompletableFuture<T>();
try( Nats nats = nats()) {
nats.request(subject, request, 1, TimeUnit.MINUTES, 1, msg -> {
T type = null;
try {
type = (T) protoDeserialize(msg.getBody(), (Class<GeneratedMessageV3>) replyProtoClass);
completableFuture.complete(type);
} catch (Exception e) {
System.out.println(e.toString());
completableFuture.completeExceptionally(e);
}
});
completableFuture.get(60,TimeUnit.SECONDS);
}catch(CustomException e){
throw e;
}catch (Exception e){
throw new CustomException("ERROR","INTERNAL_SERVER_ERROR");
}
return completableFuture;
}
}
I am using gnatsd in my Spring Boot project. When we use Request/Reply, the message is delivered, but the call is synchronous, so other threads/users are left waiting until the reply arrives.
How can I make it asynchronous?
[ nats-server version 1.0.2] Included
gnatsd -DVoutputCode