Skip to content

Commit

Permalink
fix(eb-registry) Fix some startup issues and comparison bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
msavy committed Jun 26, 2017
1 parent f9698fd commit 1c0873d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 30 deletions.
Expand Up @@ -17,6 +17,7 @@

import io.apiman.gateway.engine.IEngineConfig;
import io.apiman.gateway.engine.IRegistry;
import io.apiman.gateway.engine.async.AsyncInitialize;
import io.apiman.gateway.engine.async.IAsyncResult;
import io.apiman.gateway.engine.async.IAsyncResultHandler;
import io.apiman.gateway.engine.beans.Api;
Expand All @@ -26,6 +27,8 @@
import io.apiman.gateway.engine.vertxebinmemory.apis.EBRegistryProxy;
import io.apiman.gateway.engine.vertxebinmemory.apis.EBRegistryProxyHandler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.util.Map;
import java.util.UUID;
Expand All @@ -41,20 +44,24 @@
* @author Marc Savy {@literal <msavy@redhat.com>}
*/
@SuppressWarnings("nls")
public class EBInMemoryRegistry extends InMemoryRegistry implements EBRegistryProxyHandler {
public class EBInMemoryRegistry extends InMemoryRegistry
implements EBRegistryProxyHandler, AsyncInitialize {
private Vertx vertx;
private EBRegistryProxy proxy;
private final static String ADDRESS = "io.vertx.core.Vertx.registry.EBInMemoryRegistry.event"; //$NON-NLS-1$
private String registryUuid = UUID.randomUUID().toString();
private Logger log = LoggerFactory.getLogger(EBInMemoryRegistry.class);

public EBInMemoryRegistry(Vertx vertx, IEngineConfig vxConfig, Map<String, String> options) {
super();

System.out.println("Starting an EBInMemoryRegistry on UUID " + registryUuid);

this.vertx = vertx;
listenProxyHandler();
}

@Override
public void initialize(IAsyncResultHandler<Void> startupHandler) {
log.info("Starting an EBInMemoryRegistry on UUID {0}", registryUuid);
this.proxy = new EBRegistryProxy(vertx, address(), registryUuid);
listenProxyHandler(startupHandler);
}

@Override
Expand All @@ -72,7 +79,7 @@ public void getContract(String apiOrganizationId, String apiId, String apiVersio
public void publishApi(Api api, IAsyncResultHandler<Void> handler) {
super.publishApi(api, handler);
proxy.publishApi(api);
System.out.println("Published a api");
log.info("Published an API {0}", api);
}

@Override
Expand Down Expand Up @@ -115,42 +122,41 @@ public Vertx vertx() {
}

// These are called back by the listener

@Override
public void publishApi(Api api) {
System.out.println("Publish api");
super.publishApi(api, emptyHandler);
}

@Override
public void retireApi(Api api) {
System.out.println("Retire api");
super.retireApi(api, emptyHandler);
}

@Override
public void registerClient(Client client) {
System.out.println("Register client");
super.registerClient(client, emptyHandler);
}

@Override
public void unregisterClient(Client client) {
System.out.println("Unregister client");
super.unregisterClient(client, emptyHandler);
}

@Override
public Logger log() {
return log;
}

private EmptyHandler emptyHandler = new EmptyHandler();

private class EmptyHandler implements IAsyncResultHandler<Void> {

@Override
public void handle(IAsyncResult<Void> result) {
if (result.isError()) {
System.err.println("Error " + result.getError());
log.error("Error {0}", result.getError());
throw new RuntimeException(result.getError());
}
}
}

}
Expand Up @@ -18,7 +18,6 @@
import io.apiman.gateway.engine.beans.Api;
import io.apiman.gateway.engine.beans.Client;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;

/**
* Publishes mutating events onto the event bus for listeners to consume. A UUID is sent to avoid circular
Expand All @@ -27,9 +26,8 @@
* @author Marc Savy {@literal <msavy@redhat.com>}
*/
public class EBRegistryProxy {
Vertx vertx;
DeliveryOptions options;
String address;
private final Vertx vertx;
private final String address;
private String uuid;

public static final String REGISTER = "register"; //$NON-NLS-1$
Expand Down
Expand Up @@ -15,12 +15,16 @@
*/
package io.apiman.gateway.engine.vertxebinmemory.apis;

import io.apiman.gateway.engine.async.AsyncResultImpl;
import io.apiman.gateway.engine.async.IAsyncResultHandler;
import io.apiman.gateway.engine.beans.Api;
import io.apiman.gateway.engine.beans.Client;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;

/**
* Listens for registry events on the event bus. Ignores self-generated events. These arrive as a simple JSON
Expand All @@ -34,15 +38,14 @@
public interface EBRegistryProxyHandler {

@SuppressWarnings("nls")
default void listenProxyHandler() {
System.out.println("Setting up a listener on " + address());
default void listenProxyHandler(IAsyncResultHandler<Void> startupHandler) {
log().info("Setting up a listener on: {0}", address());

vertx().eventBus().consumer(address(), (Message<JsonObject> message) -> {
String uuid = message.body().getString("uuid");
MessageConsumer<JsonObject> consumer = vertx().eventBus().consumer(address(), (Message<JsonObject> message) -> {
String inboundUuid = message.body().getString("uuid");
log().debug("[{0}] Handling command from inbound UUID: {1} {2}", uuid(), inboundUuid, message);

System.out.println("UUID == " + uuid + " vs " + uuid());

if (shouldIgnore(uuid))
if (shouldIgnore(inboundUuid))
return;

String type = message.body().getString("type");
Expand All @@ -53,19 +56,19 @@ default void listenProxyHandler() {
case "client":
Client app = Json.decodeValue(body, Client.class);

if (action == "register") {
if (action.equals("register")) {
registerClient(app);
} else if (action == "unregister") {
} else if (action.equals("unregister")) {
unregisterClient(app);
}

break;
case "api":
Api api = Json.decodeValue(body, Api.class);

if (action == "publish") { //$NON-NLS-1$
if (action.equals("publish")) {
publishApi(api);
} else if (action == "retire") {
} else if (action.equals("retire")) {
retireApi(api);
}

Expand All @@ -75,6 +78,19 @@ default void listenProxyHandler() {
}

});

consumer.completionHandler(complete -> {
if (complete.succeeded()) {
startupHandler.handle(AsyncResultImpl.create((Void) null));
} else {
startupHandler.handle(AsyncResultImpl.create(complete.cause()));
}
});

consumer.exceptionHandler(ex -> {
log().error("[{0}] An exception occurred: {1}", uuid(), ex);
ex.printStackTrace();
});
}

// Address to subscribe on
Expand All @@ -87,9 +103,10 @@ default void listenProxyHandler() {
void retireApi(Api api);
void registerClient(Client app);
void unregisterClient(Client app);
Logger log();

// If *we* sent the message, we shouldn't also digest it, else we'll end in a cycle.
default boolean shouldIgnore(String uuid) {
return uuid() == uuid;
return uuid().equals(uuid);
}
}
}

0 comments on commit 1c0873d

Please sign in to comment.