From 1c0873d2ea13e42271661fe67a7621611e004a74 Mon Sep 17 00:00:00 2001 From: Marc Savy Date: Mon, 26 Jun 2017 11:22:22 +0100 Subject: [PATCH] fix(eb-registry) Fix some startup issues and comparison bugs. --- .../vertxebinmemory/EBInMemoryRegistry.java | 32 ++++++++------ .../vertxebinmemory/apis/EBRegistryProxy.java | 6 +-- .../apis/EBRegistryProxyHandler.java | 43 +++++++++++++------ 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/EBInMemoryRegistry.java b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/EBInMemoryRegistry.java index 197b8d09a2..1e396ce8d9 100644 --- a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/EBInMemoryRegistry.java +++ b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/EBInMemoryRegistry.java @@ -17,6 +17,7 @@ import io.apiman.gateway.engine.IEngineConfig; import io.apiman.gateway.engine.IRegistry; +import io.apiman.gateway.engine.async.AsyncInitialize; import io.apiman.gateway.engine.async.IAsyncResult; import io.apiman.gateway.engine.async.IAsyncResultHandler; import io.apiman.gateway.engine.beans.Api; @@ -26,6 +27,8 @@ import io.apiman.gateway.engine.vertxebinmemory.apis.EBRegistryProxy; import io.apiman.gateway.engine.vertxebinmemory.apis.EBRegistryProxyHandler; import io.vertx.core.Vertx; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; import java.util.Map; import java.util.UUID; @@ -41,20 +44,24 @@ * @author Marc Savy {@literal } */ @SuppressWarnings("nls") -public class EBInMemoryRegistry extends InMemoryRegistry implements EBRegistryProxyHandler { +public class EBInMemoryRegistry extends InMemoryRegistry + implements EBRegistryProxyHandler, AsyncInitialize { private Vertx vertx; private EBRegistryProxy proxy; private final static String ADDRESS = "io.vertx.core.Vertx.registry.EBInMemoryRegistry.event"; //$NON-NLS-1$ private String registryUuid = UUID.randomUUID().toString(); + private Logger log = LoggerFactory.getLogger(EBInMemoryRegistry.class); public EBInMemoryRegistry(Vertx vertx, IEngineConfig vxConfig, Map options) { super(); - - System.out.println("Starting an EBInMemoryRegistry on UUID " + registryUuid); - this.vertx = vertx; - listenProxyHandler(); + } + + @Override + public void initialize(IAsyncResultHandler startupHandler) { + log.info("Starting an EBInMemoryRegistry on UUID {0}", registryUuid); this.proxy = new EBRegistryProxy(vertx, address(), registryUuid); + listenProxyHandler(startupHandler); } @Override @@ -72,7 +79,7 @@ public void getContract(String apiOrganizationId, String apiId, String apiVersio public void publishApi(Api api, IAsyncResultHandler handler) { super.publishApi(api, handler); proxy.publishApi(api); - System.out.println("Published a api"); + log.info("Published an API {0}", api); } @Override @@ -115,31 +122,31 @@ public Vertx vertx() { } // These are called back by the listener - @Override public void publishApi(Api api) { - System.out.println("Publish api"); super.publishApi(api, emptyHandler); } @Override public void retireApi(Api api) { - System.out.println("Retire api"); super.retireApi(api, emptyHandler); } @Override public void registerClient(Client client) { - System.out.println("Register client"); super.registerClient(client, emptyHandler); } @Override public void unregisterClient(Client client) { - System.out.println("Unregister client"); super.unregisterClient(client, emptyHandler); } + @Override + public Logger log() { + return log; + } + private EmptyHandler emptyHandler = new EmptyHandler(); private class EmptyHandler implements IAsyncResultHandler { @@ -147,10 +154,9 @@ private class EmptyHandler implements IAsyncResultHandler { @Override public void handle(IAsyncResult result) { if (result.isError()) { - System.err.println("Error " + result.getError()); + log.error("Error {0}", result.getError()); throw new RuntimeException(result.getError()); } } } - } diff --git a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxy.java b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxy.java index e5c70ba3ed..d5dfa22e7a 100644 --- a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxy.java +++ b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxy.java @@ -18,7 +18,6 @@ import io.apiman.gateway.engine.beans.Api; import io.apiman.gateway.engine.beans.Client; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.DeliveryOptions; /** * Publishes mutating events onto the event bus for listeners to consume. A UUID is sent to avoid circular @@ -27,9 +26,8 @@ * @author Marc Savy {@literal } */ public class EBRegistryProxy { - Vertx vertx; - DeliveryOptions options; - String address; + private final Vertx vertx; + private final String address; private String uuid; public static final String REGISTER = "register"; //$NON-NLS-1$ diff --git a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxyHandler.java b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxyHandler.java index 112cfcecf8..94fd92ecdc 100644 --- a/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxyHandler.java +++ b/gateway/engine/vertx-eb-inmemory/src/main/java/io/apiman/gateway/engine/vertxebinmemory/apis/EBRegistryProxyHandler.java @@ -15,12 +15,16 @@ */ package io.apiman.gateway.engine.vertxebinmemory.apis; +import io.apiman.gateway.engine.async.AsyncResultImpl; +import io.apiman.gateway.engine.async.IAsyncResultHandler; import io.apiman.gateway.engine.beans.Api; import io.apiman.gateway.engine.beans.Client; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; /** * Listens for registry events on the event bus. Ignores self-generated events. These arrive as a simple JSON @@ -34,15 +38,14 @@ public interface EBRegistryProxyHandler { @SuppressWarnings("nls") - default void listenProxyHandler() { - System.out.println("Setting up a listener on " + address()); + default void listenProxyHandler(IAsyncResultHandler startupHandler) { + log().info("Setting up a listener on: {0}", address()); - vertx().eventBus().consumer(address(), (Message message) -> { - String uuid = message.body().getString("uuid"); + MessageConsumer consumer = vertx().eventBus().consumer(address(), (Message message) -> { + String inboundUuid = message.body().getString("uuid"); + log().debug("[{0}] Handling command from inbound UUID: {1} {2}", uuid(), inboundUuid, message); - System.out.println("UUID == " + uuid + " vs " + uuid()); - - if (shouldIgnore(uuid)) + if (shouldIgnore(inboundUuid)) return; String type = message.body().getString("type"); @@ -53,9 +56,9 @@ default void listenProxyHandler() { case "client": Client app = Json.decodeValue(body, Client.class); - if (action == "register") { + if (action.equals("register")) { registerClient(app); - } else if (action == "unregister") { + } else if (action.equals("unregister")) { unregisterClient(app); } @@ -63,9 +66,9 @@ default void listenProxyHandler() { case "api": Api api = Json.decodeValue(body, Api.class); - if (action == "publish") { //$NON-NLS-1$ + if (action.equals("publish")) { publishApi(api); - } else if (action == "retire") { + } else if (action.equals("retire")) { retireApi(api); } @@ -75,6 +78,19 @@ default void listenProxyHandler() { } }); + + consumer.completionHandler(complete -> { + if (complete.succeeded()) { + startupHandler.handle(AsyncResultImpl.create((Void) null)); + } else { + startupHandler.handle(AsyncResultImpl.create(complete.cause())); + } + }); + + consumer.exceptionHandler(ex -> { + log().error("[{0}] An exception occurred: {1}", uuid(), ex); + ex.printStackTrace(); + }); } // Address to subscribe on @@ -87,9 +103,10 @@ default void listenProxyHandler() { void retireApi(Api api); void registerClient(Client app); void unregisterClient(Client app); + Logger log(); // If *we* sent the message, we shouldn't also digest it, else we'll end in a cycle. default boolean shouldIgnore(String uuid) { - return uuid() == uuid; + return uuid().equals(uuid); } -} \ No newline at end of file +}