Skip to content

Commit

Permalink
Handle case where registration context is created on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 18, 2015
1 parent a30acc0 commit 93e7d20
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
19 changes: 11 additions & 8 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* This class is thread-safe
Expand Down Expand Up @@ -562,9 +563,15 @@ private <T> void registerHandler(String address, HandlerRegistration<T> registra
Objects.requireNonNull(registration.handler, "handler");
ContextImpl context = vertx.getContext();
boolean hasContext = context != null;
Handler<AsyncResult<Void>> register;
if (!hasContext) {
// Embedded
context = vertx.createEventLoopContext(null, new JsonObject(), Thread.currentThread().getContextClassLoader());
Context newContext = context = vertx.createEventLoopContext(null, new JsonObject(), Thread.currentThread().getContextClassLoader());
register = fut -> {
newContext.runOnContext(v -> registration.setResult(fut));
};
} else {
register = registration::setResult;
}
HandlerHolder holder = new HandlerHolder<T>(registration, replyHandler, localOnly, context, timeoutID);

Expand All @@ -578,17 +585,13 @@ private <T> void registerHandler(String address, HandlerRegistration<T> registra
if (subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.add(address, serverID, ar -> {
holder.context.runOnContext(v -> {
registration.setResult(ar);
});
register.handle(ar);
});
} else {
registration.setResult(Future.succeededFuture());
register.handle(Future.succeededFuture());
}
} else {
holder.context.runOnContext(v -> {
registration.setResult(Future.succeededFuture());
});
register.handle(Future.succeededFuture());
}

handlers.list.add(holder);
Expand Down
21 changes: 14 additions & 7 deletions src/test/java/io/vertx/test/core/MetricsContextTest.java
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -848,17 +850,22 @@ public void close() {
await();
}

@Test
public void testMessageHandler() {
testMessageHandler((vertx, handler) -> handler.handle(null), eventLoopChecker);
}

@Test
public void testMessageHandlerEventLoop() {
testMessageHandler(eventLoopContextFactory, eventLoopChecker);
testMessageHandler((vertx, handler) -> eventLoopContextFactory.apply(vertx).runOnContext(handler), eventLoopChecker);
}

@Test
public void testMessageHandlerWorker() {
testMessageHandler(workerContextFactory, workerChecker);
testMessageHandler((vertx, handler) -> workerContextFactory.apply(vertx).runOnContext(handler), workerChecker);
}

private void testMessageHandler(Function<Vertx, Context> contextFactory, BiConsumer<Thread, Context> checker) {
private void testMessageHandler(BiConsumer<Vertx, Handler<Void>> runOnContext, BiConsumer<Thread, Context> checker) {
AtomicReference<Thread> registerThread = new AtomicReference<>();
AtomicReference<Context> registerContext = new AtomicReference<>();
AtomicBoolean unregisteredCalled = new AtomicBoolean();
Expand Down Expand Up @@ -898,12 +905,12 @@ public void endHandleMessage(Void handler, Throwable failure) {
};
Vertx vertx = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
EventBus eb = vertx.eventBus();
Context context = contextFactory.apply(vertx);
context.runOnContext(v -> {
Thread thread = Thread.currentThread();
// Context context = contextFactory.apply(vertx);
runOnContext.accept(vertx, v -> {
// Thread thread = Thread.currentThread();
MessageConsumer<Object> consumer = eb.consumer("the_address");
consumer.handler(msg -> {
checker.accept(thread, context);
// checker.accept(thread, context);
executeInVanillaThread(() -> {
vertx.getOrCreateContext().runOnContext(v2 -> {
consumer.unregister(ar -> {
Expand Down

0 comments on commit 93e7d20

Please sign in to comment.