diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index 9372b0af018..470ea63c9c1 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -131,15 +131,14 @@ public void unregister(Handler> completionHandler) { doUnregister(completionHandler); } - private void doUnregister(Handler> completionHandler) { + private void doUnregister(Handler> doneHandler) { Deque> discarded; Handler> discardHandler; synchronized (this) { - handler = null; if (endHandler != null) { Handler theEndHandler = endHandler; - Handler> handler = completionHandler; - completionHandler = ar -> { + Handler> handler = doneHandler; + doneHandler = ar -> { theEndHandler.handle(null); if (handler != null) { handler.handle(ar); @@ -155,10 +154,15 @@ private void doUnregister(Handler> completionHandler) { } discardHandler = this.discardHandler; if (holder != null) { + handler = null; registered = null; - eventBus.removeRegistration(holder, completionHandler); + eventBus.removeRegistration(holder, doneHandler); } else { - callCompletionHandlerAsync(completionHandler); + callHandlerAsync(Future.succeededFuture(), doneHandler); + } + if (result == null) { + result = Future.failedFuture("Consumer unregistered before registration completed"); + callHandlerAsync(result, completionHandler); } } if (discardHandler != null && discarded != null) { @@ -169,9 +173,9 @@ private void doUnregister(Handler> completionHandler) { } } - private void callCompletionHandlerAsync(Handler> completionHandler) { + private void callHandlerAsync(AsyncResult result, Handler> completionHandler) { if (completionHandler != null) { - vertx.runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + vertx.runOnContext(v -> completionHandler.handle(result)); } } @@ -180,17 +184,17 @@ synchronized void setHandlerContext(Context context) { } public synchronized void setResult(AsyncResult result) { + if (this.result != null) { + return; + } this.result = result; - if (completionHandler != null) { - if (metrics != null && result.succeeded()) { + if (result.failed()) { + log.error("Failed to propagate registration for handler " + handler + " and address " + address); + } else { + if (metrics != null) { metric = metrics.handlerRegistered(address, repliedAddress); } - Handler> callback = completionHandler; - vertx.runOnContext(v -> callback.handle(result)); - } else if (result.failed()) { - log.error("Failed to propagate registration for handler " + handler + " and address " + address); - } else if (metrics != null) { - metric = metrics.handlerRegistered(address, repliedAddress); + callHandlerAsync(result, completionHandler); } } diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java index cc7fbc1772c..27ce5071d9b 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java @@ -332,4 +332,22 @@ public void testLocalOnlyDoesNotApplyToReplies() { })); await(); } + + @Test + public void testImmediateUnregistration() { + startNodes(1); + MessageConsumer consumer = vertices[0].eventBus().consumer(ADDRESS1); + AtomicInteger completionCount = new AtomicInteger(); + consumer.completionHandler(ar -> { + int val = completionCount.getAndIncrement(); + assertEquals(0, val); + assertTrue(ar.failed()); + vertx.setTimer(10, id -> { + testComplete(); + }); + }); + consumer.handler(msg -> {}); + consumer.unregister(); + await(); + } } diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 6c8b0b2506a..50d927dcc28 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1399,5 +1399,22 @@ public void testUnregisterConsumerDiscardPendingMessages() { eb.send(ADDRESS1, "val0"); await(); } + + @Test + public void testImmediateUnregistration() { + MessageConsumer consumer = vertx.eventBus().consumer(ADDRESS1); + AtomicInteger completionCount = new AtomicInteger(); + consumer.completionHandler(ar -> { + int val = completionCount.getAndIncrement(); + assertEquals(0, val); + assertTrue(ar.succeeded()); + vertx.setTimer(10, id -> { + testComplete(); + }); + }); + consumer.handler(msg -> {}); + consumer.unregister(); + await(); + } }