Skip to content

Commit

Permalink
Move the checkNextTick call at the bottom of the deliver method, it u…
Browse files Browse the repository at this point in the history
…sed to be previously at the top of the method because the deliver method was rethrowing the exception to signal the context of an unhandled exception. Instead now we report the exception to ContextInternal#reportException and we call heckNextTick after. The consumer might have been paused, so that will avoid to run an event loop task in this situation
  • Loading branch information
vietj committed Nov 19, 2018
1 parent 6ec09c5 commit 7b59ea2
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
Expand Up @@ -11,13 +11,15 @@

package io.vertx.core.eventbus.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
Expand Down Expand Up @@ -50,7 +52,7 @@ public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Messa
private long timeoutID = -1;
private HandlerHolder<T> registered;
private Handler<Message<T>> handler;
private Context handlerContext;
private ContextInternal handlerContext;
private AsyncResult<Void> result;
private Handler<AsyncResult<Void>> completionHandler;
private Handler<Void> endHandler;
Expand Down Expand Up @@ -157,7 +159,7 @@ private void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHan
}

synchronized void setHandlerContext(Context context) {
handlerContext = context;
handlerContext = (ContextInternal) context;
}

public synchronized void setResult(AsyncResult<Void> result) {
Expand All @@ -178,6 +180,7 @@ public synchronized void setResult(AsyncResult<Void> result) {
@Override
public void handle(Message<T> message) {
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (this) {
if (demand == 0L) {
if (pending.size() < maxBufferedMessages) {
Expand All @@ -197,14 +200,14 @@ public void handle(Message<T> message) {
}
theHandler = handler;
}
ctx = handlerContext;
}
deliver(theHandler, message);
deliver(theHandler, message, ctx);
}

private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
private void deliver(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
checkNextTick();
boolean local = true;
if (message instanceof ClusteredMessage) {
// A bit hacky
Expand All @@ -230,8 +233,9 @@ private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
if (metrics != null) {
metrics.endHandleMessage(metric, e);
}
throw e;
context.reportException(e);
}
checkNextTick();
}

private synchronized void checkNextTick() {
Expand All @@ -240,6 +244,7 @@ private synchronized void checkNextTick() {
handlerContext.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
ContextInternal ctx;
synchronized (HandlerRegistration.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
Expand All @@ -248,8 +253,9 @@ private synchronized void checkNextTick() {
demand--;
}
theHandler = handler;
ctx = handlerContext;
}
deliver(theHandler, message);
deliver(theHandler, message, ctx);
});
}
}
Expand Down

0 comments on commit 7b59ea2

Please sign in to comment.