diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java index c282c6c1bb49a..888acb58d3d07 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java @@ -58,6 +58,12 @@ public void testFailure() throws InterruptedException { @Test public void testFailureNoReplyHandler() throws InterruptedException { + verifyFailureNoReply("foo", "Foo is dead", IllegalStateException.class); + verifyFailureNoReply("foo-blocking", "Red is dead", IllegalStateException.class); + } + + void verifyFailureNoReply(String address, String expectedMessage, Class expectedException) + throws InterruptedException { Handler oldHandler = vertx.exceptionHandler(); try { BlockingQueue synchronizer = new LinkedBlockingQueue<>(); @@ -71,10 +77,10 @@ public void handle(Throwable event) { } } }); - eventBus.send("foo", "bar"); + eventBus.send(address, "hello"); Object ret = synchronizer.poll(2, TimeUnit.SECONDS); - assertTrue(ret instanceof IllegalStateException); - assertEquals("Foo is dead", ((IllegalStateException) ret).getMessage()); + assertTrue(expectedException.isAssignableFrom(ret.getClass())); + assertEquals(expectedMessage, ((Throwable) ret).getMessage()); } finally { vertx.exceptionHandler(oldHandler); } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 71adb62608645..6c8e6633064a4 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -37,6 +37,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; import io.vertx.core.Context; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; @@ -147,7 +148,7 @@ public void run() { } }); } else { - dup.executeBlocking(new Callable() { + Future future = dup.executeBlocking(new Callable() { @Override public Void call() { try { @@ -163,6 +164,7 @@ public Void call() { return null; } }, invoker.isOrdered()); + future.onFailure(context::reportException); } } else { // Will run on the context used for the consumer registration.