Skip to content

Commit

Permalink
motivation: currently the MessageConsumer end handler is called only …
Browse files Browse the repository at this point in the history
…when consumer is unregistered during an event bus close. When the consumer is explicitly unregistered it is not called. This handler is mainly used by RxJava and Kotlin coroutines and it seems it would be best to call the end handler in all case to give an opportunity to end the treatment. In RxJava it would call the complete handler and in Kotlin it would close the underlying channel (so a for-each iteration on the channel would terminate and execute the code after).

change: call the MessageConsumer end handler when the MessageConsumer is unregistered explictly.
  • Loading branch information
vietj committed Aug 28, 2017
1 parent d05dfe2 commit 4124380
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 16 deletions.
Expand Up @@ -516,7 +516,7 @@ private void unregisterAll() {
// Unregister all handlers explicitly - don't rely on context hooks
for (Handlers handlers: handlerMap.values()) {
for (HandlerHolder holder: handlers.list) {
holder.getHandler().unregister(true);
holder.getHandler().unregister();
}
}
}
Expand Down
Expand Up @@ -102,29 +102,25 @@ public synchronized void completionHandler(Handler<AsyncResult<Void>> completion

@Override
public synchronized void unregister() {
unregister(false);
doUnregister(null);
}

@Override
public synchronized void unregister(Handler<AsyncResult<Void>> completionHandler) {
Objects.requireNonNull(completionHandler);
doUnregister(completionHandler, false);
}

public void unregister(boolean callEndHandler) {
doUnregister(null, callEndHandler);
doUnregister(completionHandler);
}

public void sendAsyncResultFailure(ReplyFailure failure, String msg) {
unregister();
asyncResultHandler.handle(Future.failedFuture(new ReplyException(failure, msg)));
}

private void doUnregister(Handler<AsyncResult<Void>> completionHandler, boolean callEndHandler) {
private void doUnregister(Handler<AsyncResult<Void>> completionHandler) {
if (timeoutID != -1) {
vertx.cancelTimer(timeoutID);
}
if (endHandler != null && callEndHandler) {
if (endHandler != null) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = completionHandler;
completionHandler = ar -> {
Expand Down
9 changes: 2 additions & 7 deletions src/test/java/io/vertx/test/core/LocalEventBusTest.java
Expand Up @@ -744,7 +744,7 @@ public void testContextsSend() throws Exception {
awaitLatch(latch);
assertEquals(2, contexts.size());
}

@Test
public void testContextsPublish() throws Exception {
Set<ContextImpl> contexts = new ConcurrentHashSet<>();
Expand Down Expand Up @@ -1121,13 +1121,8 @@ public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithBodyStream

private void testUnregisterationOfRegisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
consumer.handler(msg -> {});
consumer.endHandler(v -> {
fail();
});
consumer.endHandler(v -> testComplete());
consumer.unregister();
vertx.runOnContext(d -> {
testComplete();
});
await();
}

Expand Down

0 comments on commit 4124380

Please sign in to comment.