Skip to content

Commit

Permalink
Vert.x: report exception for blocking message consumer methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba committed Nov 22, 2023
1 parent d353f20 commit 9917e6b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public void testFailure() throws InterruptedException {

@Test
public void testFailureNoReplyHandler() throws InterruptedException {
verifyFailureNoReply("foo", "Foo is dead", IllegalStateException.class);
verifyFailureNoReply("foo-blocking", "Red is dead", IllegalStateException.class);
}

void verifyFailureNoReply(String address, String expectedMessage, Class<? extends Exception> expectedException)
throws InterruptedException {
Handler<Throwable> oldHandler = vertx.exceptionHandler();
try {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
Expand All @@ -71,10 +77,10 @@ public void handle(Throwable event) {
}
}
});
eventBus.send("foo", "bar");
eventBus.send(address, "hello");
Object ret = synchronizer.poll(2, TimeUnit.SECONDS);
assertTrue(ret instanceof IllegalStateException);
assertEquals("Foo is dead", ((IllegalStateException) ret).getMessage());
assertTrue(expectedException.isAssignableFrom(ret.getClass()));
assertEquals(expectedMessage, ((Throwable) ret).getMessage());
} finally {
vertx.exceptionHandler(oldHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void run() {
}
});
} else {
dup.executeBlocking(new Callable<Void>() {
Future<Void> future = dup.executeBlocking(new Callable<Void>() {
@Override
public Void call() {
try {
Expand All @@ -163,6 +164,7 @@ public Void call() {
return null;
}
}, invoker.isOrdered());
future.onFailure(context::reportException);
}
} else {
// Will run on the context used for the consumer registration.
Expand Down

0 comments on commit 9917e6b

Please sign in to comment.