Skip to content

Commit

Permalink
Unregistering a MessageConsumer should not call the endHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 11, 2015
1 parent 110fba7 commit f2a06ed
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 136 deletions.
16 changes: 9 additions & 7 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Expand Up @@ -281,7 +281,7 @@ private void unregisterAllHandlers() {
// Unregister all handlers explicitly - don't rely on context hooks
for (Handlers handlers: handlerMap.values()) {
for (HandlerHolder holder: handlers.list) {
holder.handler.unregister();
holder.handler.unregister(true);
}
}
}
Expand Down Expand Up @@ -986,7 +986,6 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
private AsyncResult<Void> result;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Message<T>> discardHandler;
private int maxBufferedMessages;
private final Queue<Message<T>> pending = new ArrayDeque<>(8);
Expand Down Expand Up @@ -1033,17 +1032,21 @@ public synchronized void completionHandler(Handler<AsyncResult<Void>> completion

@Override
public synchronized void unregister() {
doUnregister(null);
unregister(false);
}

@Override
public synchronized void unregister(Handler<AsyncResult<Void>> completionHandler) {
Objects.requireNonNull(completionHandler);
doUnregister(completionHandler);
doUnregister(completionHandler, false);
}

private void doUnregister(Handler<AsyncResult<Void>> completionHandler) {
if (endHandler != null) {
void unregister(boolean callEndHandler) {
doUnregister(null, callEndHandler);
}

private void doUnregister(Handler<AsyncResult<Void>> completionHandler, boolean callEndHandler) {
if (endHandler != null && callEndHandler) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = completionHandler;
completionHandler = ar -> {
Expand Down Expand Up @@ -1166,7 +1169,6 @@ public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {

@Override
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
this.exceptionHandler = handler;
return this;
}

Expand Down
137 changes: 8 additions & 129 deletions src/test/java/io/vertx/test/core/LocalEventBusTest.java
Expand Up @@ -1087,7 +1087,7 @@ private void testExceptionWhenDeliveringBufferedMessage(BiFunction<MessageConsum
}

@Test
public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithMessaqgeStream() {
public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithMessageStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testUnregisterationOfRegisteredConsumerCallsEndHandler(consumer, consumer);
}
Expand All @@ -1100,23 +1100,16 @@ public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithBodyStream

private void testUnregisterationOfRegisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
consumer.handler(msg -> {});
consumer.endHandler(v -> testComplete());
consumer.endHandler(v -> {
fail();
});
consumer.unregister();
vertx.runOnContext(d -> {
testComplete();
});
await();
}

@Test
public void testUnregistrationOfUnregisteredConsumerCallsEndHandlerWithMessageStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testUnregistrationOfUnregisteredConsumerCallsEndHandler(consumer, consumer);
}

@Test
public void testUnregistrationOfUnregisteredConsumerCallsEndHandlerWithBodyStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testUnregistrationOfUnregisteredConsumerCallsEndHandler(consumer, consumer.bodyStream());
}

@Test
public void testUnregisterThenUnsetEndHandler() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
Expand All @@ -1128,69 +1121,6 @@ public void testUnregisterThenUnsetEndHandler() {
await();
}

private void testUnregistrationOfUnregisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
consumer.endHandler(v -> testComplete());
consumer.unregister();
await();
}

@Test
public void testCompletingUnregistrationOfRegisteredConsumerCallsEndHandlerWithMessageStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testCompletingUnregistrationOfRegisteredConsumerCallsEndHandler(consumer, consumer);
}

@Test
public void testCompletingUnregistrationOfRegisteredConsumerCallsEndHandlerWithBodyStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testCompletingUnregistrationOfRegisteredConsumerCallsEndHandler(consumer, consumer.bodyStream());
}

private void testCompletingUnregistrationOfRegisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
AtomicInteger count = new AtomicInteger(0);
consumer.handler(msg -> {});
consumer.endHandler(v -> {
if (count.incrementAndGet() == 2) {
testComplete();
}
});
consumer.unregister(ar -> {
assertTrue(ar.succeeded());
if (count.incrementAndGet() == 2) {
testComplete();
}
});
await();
}

@Test
public void testCompletingUnregistrationUnregisteredConsumerCallsEndHandlerWithMessageStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testCompletingUnregistrationUnregisteredConsumerCallsEndHandler(consumer, consumer);
}

@Test
public void testCompletingUnregistrationUnregisteredConsumerCallsEndHandlerWithBodyStream() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
testCompletingUnregistrationUnregisteredConsumerCallsEndHandler(consumer, consumer.bodyStream());
}

private void testCompletingUnregistrationUnregisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
AtomicInteger count = new AtomicInteger(0);
readStream.endHandler(v -> {
if (count.incrementAndGet() == 2) {
testComplete();
}
});
consumer.unregister(ar -> {
assertTrue(ar.succeeded());
if (count.incrementAndGet() == 2) {
testComplete();
}
});
await();
}

@Test
public void testUnregistrationWhenSettingNullHandlerWithConsumer() {
MessageConsumer<String> consumer = eb.consumer(ADDRESS1);
Expand All @@ -1205,13 +1135,9 @@ public void testUnregistrationWhenSettingNullHandlerWithBodyStream() {

private void testUnregistrationWhenSettingNullHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
readStream.handler(msg -> {});
readStream.endHandler(v -> {
assertFalse(consumer.isRegistered());
testComplete();
});
assertTrue(consumer.isRegistered());
readStream.handler(null);
await();
assertFalse(consumer.isRegistered());
}

@Test
Expand Down Expand Up @@ -1320,53 +1246,6 @@ public void testConsumerHandlesCompletionAsynchronously2() {
await();
}

@Test
public void testConsumerHandlesEndAsynchronously1() {
MessageConsumer<Object> consumer = eb.consumer(ADDRESS1);
consumer.handler(msg -> {
});
ThreadLocal<Object> stack = new ThreadLocal<>();
stack.set(true);
consumer.endHandler(v -> {
assertNull(stack.get());
assertTrue(Vertx.currentContext().isEventLoopContext());
testComplete();
});
consumer.unregister();
await();
}

@Test
public void testConsumerHandlesEndAsynchronously2() {
eb.consumer(ADDRESS1).handler(v -> {});
MessageConsumer<Object> consumer = eb.consumer(ADDRESS1);
consumer.handler(msg -> {
});
ThreadLocal<Object> stack = new ThreadLocal<>();
stack.set(true);
consumer.endHandler(v -> {
assertNull(stack.get());
assertTrue(Vertx.currentContext().isEventLoopContext());
testComplete();
});
consumer.unregister();
await();
}

@Test
public void testConsumerHandlesEndAsynchronously3() {
MessageConsumer<Object> consumer = eb.consumer(ADDRESS1);
ThreadLocal<Object> stack = new ThreadLocal<>();
stack.set(true);
consumer.endHandler(v -> {
assertNull(stack.get());
assertTrue(Vertx.currentContext().isEventLoopContext());
testComplete();
});
consumer.unregister();
await();
}

@Test
public void testUpdateDeliveryOptionsOnProducer() {
MessageProducer<String> producer = eb.sender(ADDRESS1);
Expand Down

0 comments on commit f2a06ed

Please sign in to comment.