From e65dd7b7c6ec830a831978d1bd941e7cbe9a6c1c Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 22 Nov 2023 17:20:42 +0100 Subject: [PATCH] Vert.x: report exception for blocking message consumer methods - fixes #37222 --- .../vertx/deployment/MessageConsumerFailureTest.java | 12 +++++++++--- .../vertx/runtime/VertxEventBusConsumerRecorder.java | 4 +++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java index c282c6c1bb49a..888acb58d3d07 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java @@ -58,6 +58,12 @@ public void testFailure() throws InterruptedException { @Test public void testFailureNoReplyHandler() throws InterruptedException { + verifyFailureNoReply("foo", "Foo is dead", IllegalStateException.class); + verifyFailureNoReply("foo-blocking", "Red is dead", IllegalStateException.class); + } + + void verifyFailureNoReply(String address, String expectedMessage, Class expectedException) + throws InterruptedException { Handler oldHandler = vertx.exceptionHandler(); try { BlockingQueue synchronizer = new LinkedBlockingQueue<>(); @@ -71,10 +77,10 @@ public void handle(Throwable event) { } } }); - eventBus.send("foo", "bar"); + eventBus.send(address, "hello"); Object ret = synchronizer.poll(2, TimeUnit.SECONDS); - assertTrue(ret instanceof IllegalStateException); - assertEquals("Foo is dead", ((IllegalStateException) ret).getMessage()); + assertTrue(expectedException.isAssignableFrom(ret.getClass())); + assertEquals(expectedMessage, ((Throwable) ret).getMessage()); } finally { vertx.exceptionHandler(oldHandler); } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 71adb62608645..6c8e6633064a4 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -37,6 +37,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; import io.vertx.core.Context; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; @@ -147,7 +148,7 @@ public void run() { } }); } else { - dup.executeBlocking(new Callable() { + Future future = dup.executeBlocking(new Callable() { @Override public Void call() { try { @@ -163,6 +164,7 @@ public Void call() { return null; } }, invoker.isOrdered()); + future.onFailure(context::reportException); } } else { // Will run on the context used for the consumer registration.