Skip to content

Commit

Permalink
Event bus consumer racy cluster registration - fixes #2899
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 8, 2019
1 parent ed9db2e commit 6d33b93
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 16 deletions.
36 changes: 20 additions & 16 deletions src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
Expand Up @@ -131,15 +131,14 @@ public void unregister(Handler<AsyncResult<Void>> completionHandler) {
doUnregister(completionHandler); doUnregister(completionHandler);
} }


private void doUnregister(Handler<AsyncResult<Void>> completionHandler) { private void doUnregister(Handler<AsyncResult<Void>> doneHandler) {
Deque<Message<T>> discarded; Deque<Message<T>> discarded;
Handler<Message<T>> discardHandler; Handler<Message<T>> discardHandler;
synchronized (this) { synchronized (this) {
handler = null;
if (endHandler != null) { if (endHandler != null) {
Handler<Void> theEndHandler = endHandler; Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = completionHandler; Handler<AsyncResult<Void>> handler = doneHandler;
completionHandler = ar -> { doneHandler = ar -> {
theEndHandler.handle(null); theEndHandler.handle(null);
if (handler != null) { if (handler != null) {
handler.handle(ar); handler.handle(ar);
Expand All @@ -155,10 +154,15 @@ private void doUnregister(Handler<AsyncResult<Void>> completionHandler) {
} }
discardHandler = this.discardHandler; discardHandler = this.discardHandler;
if (holder != null) { if (holder != null) {
handler = null;
registered = null; registered = null;
eventBus.removeRegistration(holder, completionHandler); eventBus.removeRegistration(holder, doneHandler);
} else { } else {
callCompletionHandlerAsync(completionHandler); callHandlerAsync(Future.succeededFuture(), doneHandler);
}
if (result == null) {
result = Future.failedFuture("Consumer unregistered before registration completed");
callHandlerAsync(result, completionHandler);
} }
} }
if (discardHandler != null && discarded != null) { if (discardHandler != null && discarded != null) {
Expand All @@ -169,9 +173,9 @@ private void doUnregister(Handler<AsyncResult<Void>> completionHandler) {
} }
} }


private void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) { private void callHandlerAsync(AsyncResult<Void> result, Handler<AsyncResult<Void>> completionHandler) {
if (completionHandler != null) { if (completionHandler != null) {
vertx.runOnContext(v -> completionHandler.handle(Future.succeededFuture())); vertx.runOnContext(v -> completionHandler.handle(result));
} }
} }


Expand All @@ -180,17 +184,17 @@ synchronized void setHandlerContext(Context context) {
} }


public synchronized void setResult(AsyncResult<Void> result) { public synchronized void setResult(AsyncResult<Void> result) {
if (this.result != null) {
return;
}
this.result = result; this.result = result;
if (completionHandler != null) { if (result.failed()) {
if (metrics != null && result.succeeded()) { log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else {
if (metrics != null) {
metric = metrics.handlerRegistered(address, repliedAddress); metric = metrics.handlerRegistered(address, repliedAddress);
} }
Handler<AsyncResult<Void>> callback = completionHandler; callHandlerAsync(result, completionHandler);
vertx.runOnContext(v -> callback.handle(result));
} else if (result.failed()) {
log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else if (metrics != null) {
metric = metrics.handlerRegistered(address, repliedAddress);
} }
} }


Expand Down
18 changes: 18 additions & 0 deletions src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java
Expand Up @@ -332,4 +332,22 @@ public void testLocalOnlyDoesNotApplyToReplies() {
})); }));
await(); await();
} }

@Test
public void testImmediateUnregistration() {
startNodes(1);
MessageConsumer<Object> consumer = vertices[0].eventBus().consumer(ADDRESS1);
AtomicInteger completionCount = new AtomicInteger();
consumer.completionHandler(ar -> {
int val = completionCount.getAndIncrement();
assertEquals(0, val);
assertTrue(ar.failed());
vertx.setTimer(10, id -> {
testComplete();
});
});
consumer.handler(msg -> {});
consumer.unregister();
await();
}
} }
17 changes: 17 additions & 0 deletions src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java
Expand Up @@ -1399,5 +1399,22 @@ public void testUnregisterConsumerDiscardPendingMessages() {
eb.send(ADDRESS1, "val0"); eb.send(ADDRESS1, "val0");
await(); await();
} }

@Test
public void testImmediateUnregistration() {
MessageConsumer<Object> consumer = vertx.eventBus().consumer(ADDRESS1);
AtomicInteger completionCount = new AtomicInteger();
consumer.completionHandler(ar -> {
int val = completionCount.getAndIncrement();
assertEquals(0, val);
assertTrue(ar.succeeded());
vertx.setTimer(10, id -> {
testComplete();
});
});
consumer.handler(msg -> {});
consumer.unregister();
await();
}
} }


0 comments on commit 6d33b93

Please sign in to comment.