>> replyHandler);
-
/**
* Like {@link #send(String, Object)} but specifying {@code options} that can be used to configure the delivery.
*
@@ -236,7 +235,7 @@ public interface EventBus extends Measured {
* Unregister a default message codec.
*
* @param clazz the class for which the codec was registered
- * @return @return a reference to this, so the API can be used fluently
+ * @return a reference to this, so the API can be used fluently
*/
@GenIgnore
EventBus unregisterDefaultCodec(Class clazz);
@@ -244,7 +243,7 @@ public interface EventBus extends Measured {
/**
* Start the event bus. This would not normally be called in user code
*
- * @param completionHandler
+ * @param completionHandler handler will be called when event bus is started
*/
@GenIgnore
void start(Handler> completionHandler);
@@ -257,6 +256,21 @@ public interface EventBus extends Measured {
@GenIgnore
void close(Handler> completionHandler);
+ /**
+ * Add an interceptor that will be called whenever a message is sent from Vert.x
+ *
+ * @param interceptor the interceptor
+ * @return a reference to this, so the API can be used fluently
+ */
+ EventBus addInterceptor(Handler interceptor);
+
+ /**
+ * Remove an interceptor
+ *
+ * @param interceptor the interceptor
+ * @return a reference to this, so the API can be used fluently
+ */
+ EventBus removeInterceptor(Handler interceptor);
}
diff --git a/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java b/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java
new file mode 100644
index 00000000000..8a9b7ac3737
--- /dev/null
+++ b/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java
@@ -0,0 +1,29 @@
+package io.vertx.core.eventbus;
+
+import io.vertx.core.Handler;
+
+/**
+ * @author Tim Fox
+ */
+public abstract class FilteringInterceptor implements Handler {
+
+ private final String startsWith;
+
+ public FilteringInterceptor(String startsWith) {
+ this.startsWith = startsWith;
+ }
+
+ // TODO regex
+
+ @Override
+ public void handle(SendContext sendContext) {
+ if (sendContext.message().address().startsWith(startsWith)) {
+ handleContext(sendContext);
+ } else {
+ sendContext.next();
+ }
+ }
+
+ protected abstract void handleContext(SendContext sendContext);
+
+}
diff --git a/src/main/java/io/vertx/core/eventbus/MessageProducer.java b/src/main/java/io/vertx/core/eventbus/MessageProducer.java
index 8a5081f4de9..bf0d1b464dd 100644
--- a/src/main/java/io/vertx/core/eventbus/MessageProducer.java
+++ b/src/main/java/io/vertx/core/eventbus/MessageProducer.java
@@ -30,6 +30,16 @@
@VertxGen
public interface MessageProducer extends WriteStream {
+ int DEFAULT_WRITE_QUEUE_MAX_SIZE = 1000;
+
+ /**
+ * Synonym for {@link #write(Object)}.
+ *
+ * @param message the message to send
+ * @return reference to this for fluency
+ */
+ MessageProducer send(T message);
+
@Override
MessageProducer exceptionHandler(Handler handler);
@@ -56,4 +66,5 @@ public interface MessageProducer extends WriteStream {
*/
String address();
+ void close();
}
diff --git a/src/main/java/io/vertx/core/eventbus/SendContext.java b/src/main/java/io/vertx/core/eventbus/SendContext.java
new file mode 100644
index 00000000000..7032c94c089
--- /dev/null
+++ b/src/main/java/io/vertx/core/eventbus/SendContext.java
@@ -0,0 +1,29 @@
+package io.vertx.core.eventbus;
+
+import io.vertx.codegen.annotations.VertxGen;
+
+/**
+ *
+ * Encapsulates a message being sent from Vert.x. Used with event bus interceptors
+ *
+ * @author Tim Fox
+ */
+@VertxGen
+public interface SendContext {
+
+ /**
+ * @return The message being sent
+ */
+ Message message();
+
+ /**
+ * Call the next interceptor
+ */
+ void next();
+
+ /**
+ *
+ * @return true if the message is being sent (point to point) or False if the message is being published
+ */
+ boolean send();
+}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java
deleted file mode 100644
index 50f96bd39cb..00000000000
--- a/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package io.vertx.core.eventbus.impl;
-
-import io.vertx.core.VertxOptions;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.spi.EventBusFactory;
-import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
-import io.vertx.core.eventbus.impl.local.LocalEventBus;
-import io.vertx.core.impl.HAManager;
-import io.vertx.core.impl.VertxInternal;
-import io.vertx.core.spi.cluster.ClusterManager;
-
-/**
- * @author Tim Fox
- */
-public class EventBusFactoryImpl implements EventBusFactory {
-
- @Override
- public EventBus createEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager) {
- EventBus eb;
- if (options.isClustered()) {
- eb = new ClusteredEventBus(vertx, options, clusterManager, haManager);
- } else {
- eb = new LocalEventBus(vertx);
- }
- return eb;
- }
-}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
similarity index 64%
rename from src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java
rename to src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
index 06c7f65e430..ab98bcf2255 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
@@ -14,30 +14,34 @@
* You may elect to redistribute this code under either of these licenses.
*/
-package io.vertx.core.eventbus.impl.local;
+package io.vertx.core.eventbus.impl;
import io.vertx.core.*;
import io.vertx.core.eventbus.*;
-import io.vertx.core.eventbus.impl.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
+import java.util.Iterator;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
+ * A local event bus implementation
*
* @author Tim Fox T
*/
-public class LocalEventBus implements EventBus, MetricsProvider {
+public class EventBusImpl implements EventBus, MetricsProvider {
- private static final Logger log = LoggerFactory.getLogger(LocalEventBus.class);
+ private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
+ private final List> interceptors = new CopyOnWriteArrayList<>();
private final AtomicLong replySequence = new AtomicLong(0);
protected final VertxInternal vertx;
protected final EventBusMetrics metrics;
@@ -45,11 +49,23 @@ public class LocalEventBus implements EventBus, MetricsProvider {
protected final CodecManager codecManager = new CodecManager();
protected volatile boolean started;
- public LocalEventBus(VertxInternal vertx) {
+ public EventBusImpl(VertxInternal vertx) {
this.vertx = vertx;
this.metrics = vertx.metricsSPI().createMetrics(this);
}
+ @Override
+ public EventBus addInterceptor(Handler interceptor) {
+ interceptors.add(interceptor);
+ return this;
+ }
+
+ @Override
+ public EventBus removeInterceptor(Handler interceptor) {
+ interceptors.remove(interceptor);
+ return this;
+ }
+
public synchronized void start(Handler> completionHandler) {
if (started) {
throw new IllegalStateException("Already started");
@@ -75,34 +91,34 @@ public EventBus send(String address, Object message, DeliveryOptions options) {
@Override
public EventBus send(String address, Object message, DeliveryOptions options, Handler>> replyHandler) {
- sendOrPub(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
+ sendOrPubInternal(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
return this;
}
@Override
public MessageProducer sender(String address) {
Objects.requireNonNull(address, "address");
- return new MessageProducerImpl<>(this, address, true, new DeliveryOptions());
+ return new MessageProducerImpl<>(vertx, address, true, new DeliveryOptions());
}
@Override
public MessageProducer sender(String address, DeliveryOptions options) {
Objects.requireNonNull(address, "address");
Objects.requireNonNull(options, "options");
- return new MessageProducerImpl<>(this, address, true, options);
+ return new MessageProducerImpl<>(vertx, address, true, options);
}
@Override
public MessageProducer publisher(String address) {
Objects.requireNonNull(address, "address");
- return new MessageProducerImpl<>(this, address, false, new DeliveryOptions());
+ return new MessageProducerImpl<>(vertx, address, false, new DeliveryOptions());
}
@Override
public MessageProducer publisher(String address, DeliveryOptions options) {
Objects.requireNonNull(address, "address");
Objects.requireNonNull(options, "options");
- return new MessageProducerImpl<>(this, address, false, options);
+ return new MessageProducerImpl<>(vertx, address, false, options);
}
@Override
@@ -112,7 +128,7 @@ public EventBus publish(String address, Object message) {
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
- sendOrPub(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
+ sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
return this;
}
@@ -192,40 +208,29 @@ public EventBusMetrics> getMetrics() {
return metrics;
}
- public LocalMessage createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
+ protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
- LocalMessage msg = new LocalMessage(address, null, headers, body, codec, send);
+ MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send);
return msg;
}
- public void addRegistration(String address, HandlerRegistration registration,
- boolean replyHandler, boolean localOnly) {
+ protected void addRegistration(String address, HandlerRegistration registration,
+ boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
- doAddRegistration(address, registration, replyHandler, localOnly);
- registration.setResult(Future.succeededFuture());
+ boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
+ addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}
- protected HandlerRegistration createReplyHandlerRegistration(LocalMessage message,
- DeliveryOptions options,
- Handler>> replyHandler) {
- if (replyHandler != null) {
- long timeout = options.getSendTimeout();
- String replyAddress = generateReplyAddress();
- message.setReplyAddress(replyAddress);
- Handler> simpleReplyHandler = convertHandler(replyHandler);
- HandlerRegistration registration =
- new HandlerRegistration<>(vertx, metrics, this, replyAddress, true, true, replyHandler, timeout);
- registration.handler(simpleReplyHandler);
- return registration;
- } else {
- return null;
- }
+ protected void addRegistration(boolean newAddress, String address,
+ boolean replyHandler, boolean localOnly,
+ Handler> completionHandler) {
+ completionHandler.handle(Future.succeededFuture());
}
- protected boolean doAddRegistration(String address, HandlerRegistration registration,
- boolean replyHandler, boolean localOnly) {
+ protected boolean addLocalRegistration(String address, HandlerRegistration registration,
+ boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
Context context = Vertx.currentContext();
@@ -258,12 +263,17 @@ protected boolean doAddRegistration(String address, HandlerRegistration r
return newAddress;
}
- public void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) {
- removeRegistration(address, handler);
+ protected void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) {
+ HandlerHolder holder = removeLocalRegistration(address, handler);
+ removeRegistration(holder, address, completionHandler);
+ }
+
+ protected void removeRegistration(HandlerHolder handlerHolder, String address,
+ Handler> completionHandler) {
callCompletionHandlerAsync(completionHandler);
}
- protected HandlerHolder removeRegistration(String address, HandlerRegistration handler) {
+ protected HandlerHolder removeLocalRegistration(String address, HandlerRegistration handler) {
Handlers handlers = handlerMap.get(address);
HandlerHolder lastHolder = null;
if (handlers != null) {
@@ -289,21 +299,24 @@ protected HandlerHolder removeRegistration(String address, HandlerRegistrati
return lastHolder;
}
- protected void sendReply(LocalMessage message, DeliveryOptions options,
+ protected void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
Handler>> replyHandler) {
- if (message.address() == null) {
+ if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
- sendOrPub(message, options, replyHandler);
+ HandlerRegistration replyHandlerRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler);
+ new ReplySendContextImpl<>(replyMessage, options, replyHandlerRegistration, replierMessage).next();
}
}
- protected void sendOrPub(LocalMessage message, DeliveryOptions options,
- Handler>> replyHandler) {
- checkStarted();
- HandlerRegistration registration = createReplyHandlerRegistration(message, options, replyHandler);
+ protected void sendReply(SendContextImpl sendContext, MessageImpl replierMessage) {
+ sendOrPub(sendContext);
+ }
+
+ protected void sendOrPub(SendContextImpl sendContext) {
+ MessageImpl message = sendContext.message;
metrics.messageSent(message.address(), !message.send(), true, false);
- serveMessage(message, registration, true);
+ deliverMessageLocally(sendContext);
}
protected Handler> convertHandler(Handler>> handler) {
@@ -327,16 +340,21 @@ protected void callCompletionHandlerAsync(Handler> completionH
}
}
- protected void serveMessage(LocalMessage msg, HandlerRegistration handlerRegistration, boolean local) {
- if (!serveMessage(msg, local)) {
+ protected void deliverMessageLocally(SendContextImpl sendContext) {
+ if (!deliverMessageLocally(sendContext.message)) {
// no handlers
- if (handlerRegistration != null) {
- handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
+ if (sendContext.handlerRegistration != null) {
+ sendContext.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address "
+ + sendContext.message.address);
}
}
}
- protected boolean serveMessage(LocalMessage msg, boolean local) {
+ protected boolean isMessageLocal(MessageImpl msg) {
+ return true;
+ }
+
+ protected boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this);
Handlers handlers = handlerMap.get(msg.address());
if (handlers != null) {
@@ -344,19 +362,19 @@ protected boolean serveMessage(LocalMessage msg, boolean local) {
//Choose one
HandlerHolder holder = handlers.choose();
if (holder != null) {
- metrics.messageReceived(msg.address(), !msg.send(), local, 1);
- doServe(msg, holder);
+ metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 1);
+ deliverToHandler(msg, holder);
}
} else {
// Publish
- metrics.messageReceived(msg.address(), !msg.send(), local, handlers.list.size());
+ metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), handlers.list.size());
for (HandlerHolder holder: handlers.list) {
- doServe(msg, holder);
+ deliverToHandler(msg, holder);
}
}
return true;
} else {
- metrics.messageReceived(msg.address(), !msg.send(), local, 0);
+ metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 0);
return false;
}
}
@@ -371,6 +389,92 @@ protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}
+ private HandlerRegistration createReplyHandlerRegistration(MessageImpl message,
+ DeliveryOptions options,
+ Handler>> replyHandler) {
+ if (replyHandler != null) {
+ long timeout = options.getSendTimeout();
+ String replyAddress = generateReplyAddress();
+ message.setReplyAddress(replyAddress);
+ Handler> simpleReplyHandler = convertHandler(replyHandler);
+ HandlerRegistration registration =
+ new HandlerRegistration<>(vertx, metrics, this, replyAddress, true, true, replyHandler, timeout);
+ registration.handler(simpleReplyHandler);
+ return registration;
+ } else {
+ return null;
+ }
+ }
+
+ private void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
+ Handler>> replyHandler) {
+ checkStarted();
+ HandlerRegistration replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler);
+ SendContextImpl sendContext = new SendContextImpl<>(message, options, replyHandlerRegistration);
+ sendContext.next();
+ }
+
+ protected class SendContextImpl implements SendContext {
+
+ public final MessageImpl message;
+ public final DeliveryOptions options;
+ public final HandlerRegistration handlerRegistration;
+ public final Iterator> iter;
+
+ public SendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration handlerRegistration) {
+ this.message = message;
+ this.options = options;
+ this.handlerRegistration = handlerRegistration;
+ this.iter = interceptors.iterator();
+ }
+
+ @Override
+ public Message message() {
+ return message;
+ }
+
+ @Override
+ public void next() {
+ if (iter.hasNext()) {
+ Handler handler = iter.next();
+ try {
+ handler.handle(this);
+ } catch (Throwable t) {
+ log.error("Failure in interceptor", t);
+ }
+ } else {
+ sendOrPub(this);
+ }
+ }
+
+ @Override
+ public boolean send() {
+ return message.send();
+ }
+ }
+
+ protected class ReplySendContextImpl extends SendContextImpl {
+
+ private final MessageImpl replierMessage;
+
+ public ReplySendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration handlerRegistration,
+ MessageImpl replierMessage) {
+ super(message, options, handlerRegistration);
+ this.replierMessage = replierMessage;
+ }
+
+ @Override
+ public void next() {
+ if (iter.hasNext()) {
+ Handler handler = iter.next();
+ handler.handle(this);
+ } else {
+ sendReply(this, replierMessage);
+ }
+ }
+ }
+
+
private void unregisterAll() {
// Unregister all handlers explicitly - don't rely on context hooks
for (Handlers handlers: handlerMap.values()) {
@@ -380,7 +484,7 @@ private void unregisterAll() {
}
}
- private void doServe(LocalMessage msg, HandlerHolder holder) {
+ private void deliverToHandler(MessageImpl msg, HandlerHolder holder) {
// Each handler gets a fresh copy
@SuppressWarnings("unchecked")
Message copied = msg.copyBeforeReceive();
diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
index 0b3b8bb341e..54c2387518e 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
@@ -6,7 +6,6 @@
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
-import io.vertx.core.eventbus.impl.local.LocalEventBus;
import io.vertx.core.impl.Arguments;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
@@ -28,9 +27,11 @@ public class HandlerRegistration implements MessageConsumer, Handler implements MessageConsumer, Handler> completionHandler;
private Handler endHandler;
private Handler> discardHandler;
- private int maxBufferedMessages;
+ private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
private final Queue> pending = new ArrayDeque<>(8);
private boolean paused;
private Object metric;
- public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, LocalEventBus eventBus, String address,
+ public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
boolean replyHandler, boolean localOnly,
Handler>> asyncResultHandler, long timeout) {
this.vertx = vertx;
@@ -170,6 +171,8 @@ public void handle(Message message) {
} else {
if (discardHandler != null) {
discardHandler.handle(message);
+ } else {
+ log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer");
}
}
} else {
@@ -191,6 +194,10 @@ public void handle(Message message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
if (theHandler != null) {
+ String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
+ if (creditsAddress != null) {
+ eventBus.send(creditsAddress, 1);
+ }
handleMessage(theHandler, message);
}
}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
similarity index 80%
rename from src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java
rename to src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
index 10b2e08aa41..fc1f7b5f6dd 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
@@ -14,7 +14,7 @@
* You may elect to redistribute this code under either of these licenses.
*/
-package io.vertx.core.eventbus.impl.local;
+package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
@@ -30,12 +30,12 @@
/**
* @author Tim Fox
*/
-public class LocalMessage implements Message {
+public class MessageImpl implements Message {
- private static final Logger log = LoggerFactory.getLogger(LocalMessage.class);
+ private static final Logger log = LoggerFactory.getLogger(MessageImpl.class);
protected MessageCodec messageCodec;
- protected LocalEventBus bus;
+ protected EventBusImpl bus;
protected String address;
protected String replyAddress;
protected MultiMap headers;
@@ -43,12 +43,12 @@ public class LocalMessage implements Message {
protected V receivedBody;
protected boolean send;
- public LocalMessage() {
+ public MessageImpl() {
}
- public LocalMessage(String address, String replyAddress, MultiMap headers, U sentBody,
- MessageCodec messageCodec,
- boolean send) {
+ public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
+ MessageCodec messageCodec,
+ boolean send) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
@@ -57,7 +57,7 @@ public LocalMessage(String address, String replyAddress, MultiMap headers, U sen
this.send = send;
}
- protected LocalMessage(LocalMessage other) {
+ protected MessageImpl(MessageImpl other) {
this.bus = other.bus;
this.address = other.address;
this.replyAddress = other.replyAddress;
@@ -76,8 +76,8 @@ protected LocalMessage(LocalMessage other) {
this.send = other.send;
}
- public LocalMessage copyBeforeReceive() {
- return new LocalMessage<>(this);
+ public MessageImpl copyBeforeReceive() {
+ return new MessageImpl<>(this);
}
@Override
@@ -96,6 +96,9 @@ public MultiMap headers() {
@Override
public V body() {
+ if (receivedBody == null && sentBody != null) {
+ receivedBody = messageCodec.transform(sentBody);
+ }
return receivedBody;
}
@@ -146,15 +149,14 @@ public MessageCodec codec() {
return messageCodec;
}
- public void setBus(LocalEventBus bus) {
+ public void setBus(EventBusImpl bus) {
this.bus = bus;
}
- protected void sendReply(LocalMessage msg, DeliveryOptions options, Handler>> replyHandler) {
+ protected void sendReply(MessageImpl msg, DeliveryOptions options, Handler>> replyHandler) {
if (bus != null) {
- bus.sendReply(msg, options, replyHandler);
+ bus.sendReply(msg, this, options, replyHandler);
}
}
-
}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java
index deb87b64ae0..ab7e0ba496e 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java
@@ -17,26 +17,48 @@
package io.vertx.core.eventbus.impl;
import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
-import io.vertx.core.streams.WriteStream;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
/**
* @author Julien Viet
*/
public class MessageProducerImpl implements MessageProducer {
+ public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit";
+
+ private final Vertx vertx;
private final EventBus bus;
private final boolean send;
private final String address;
+ private final Queue pending = new ArrayDeque<>();
+ private final MessageConsumer creditConsumer;
private DeliveryOptions options;
+ private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE;
+ private Handler drainHandler;
- public MessageProducerImpl(EventBus bus, String address, boolean send, DeliveryOptions options) {
- this.bus = bus;
+ public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
+ this.vertx = vertx;
+ this.bus = vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
+ if (send) {
+ String creditAddress = UUID.randomUUID().toString() + "-credit";
+ creditConsumer = bus.consumer(creditAddress, msg -> {
+ doReceiveCredit(msg.body());
+ });
+ options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
+ } else {
+ creditConsumer = null;
+ }
}
@Override
@@ -45,20 +67,27 @@ public synchronized MessageProducer deliveryOptions(DeliveryOptions options)
return this;
}
+ @Override
+ public MessageProducer send(T message) {
+ write(message);
+ return this;
+ }
+
@Override
public MessageProducer exceptionHandler(Handler handler) {
return this;
}
@Override
- public MessageProducer setWriteQueueMaxSize(int maxSize) {
+ public synchronized MessageProducer setWriteQueueMaxSize(int maxSize) {
+ this.credits = maxSize;
return this;
}
@Override
public synchronized MessageProducer write(T data) {
if (send) {
- bus.send(address, data, options);
+ doSend(data);
} else {
bus.publish(address, data, options);
}
@@ -67,11 +96,12 @@ public synchronized MessageProducer write(T data) {
@Override
public boolean writeQueueFull() {
- return false;
+ return pending.size() >= 0;
}
@Override
- public MessageProducer drainHandler(Handler handler) {
+ public synchronized MessageProducer drainHandler(Handler handler) {
+ this.drainHandler = handler;
return this;
}
@@ -80,4 +110,45 @@ public String address() {
return address;
}
+ @Override
+ public void close() {
+ if (creditConsumer != null) {
+ creditConsumer.unregister();
+ }
+ }
+
+ // Just in case user forget to call close()
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+
+ private synchronized void doSend(T data) {
+ if (credits > 0) {
+ credits--;
+ bus.send(address, data, options);
+ } else {
+ pending.add(data);
+ }
+ }
+
+ private synchronized void doReceiveCredit(int credit) {
+ credits += credit;
+ while (credits > 0) {
+ T data = pending.poll();
+ if (data == null) {
+ break;
+ } else {
+ credits--;
+ bus.send(address, data, options);
+ }
+ }
+ final Handler theDrainHandler = drainHandler;
+ if (theDrainHandler != null && pending.isEmpty()) {
+ this.drainHandler = null;
+ vertx.runOnContext(v -> theDrainHandler.handle(null));
+ }
+ }
+
}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java
index 6f61dcbe3f7..78e12ffef87 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java
@@ -18,14 +18,8 @@
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.DeliveryOptions;
-import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
-import io.vertx.core.eventbus.impl.CodecManager;
-import io.vertx.core.eventbus.impl.HandlerHolder;
-import io.vertx.core.eventbus.impl.HandlerRegistration;
-import io.vertx.core.eventbus.impl.local.LocalEventBus;
-import io.vertx.core.eventbus.impl.local.LocalMessage;
+import io.vertx.core.eventbus.impl.*;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
@@ -46,11 +40,11 @@
import java.util.concurrent.ConcurrentMap;
/**
- * This class is thread-safe
+ * An event bus implemmentation that clusters with other Vert.x nodes
*
* @author Tim Fox 7 T
*/
-public class ClusteredEventBus extends LocalEventBus {
+public class ClusteredEventBus extends EventBusImpl {
private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class);
@@ -143,7 +137,7 @@ public void close(Handler> completionHandler) {
}
@Override
- public LocalMessage createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
+ protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
@@ -152,20 +146,20 @@ public LocalMessage createMessage(boolean send, String address, MultiMap headers
}
@Override
- public void addRegistration(String address, HandlerRegistration registration,
- boolean replyHandler, boolean localOnly) {
- boolean newAddress = doAddRegistration(address, registration, replyHandler, localOnly);
+ protected void addRegistration(boolean newAddress, String address,
+ boolean replyHandler, boolean localOnly,
+ Handler> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
// Propagate the information
- subs.add(address, serverID, registration::setResult);
+ subs.add(address, serverID, completionHandler);
} else {
- registration.setResult(Future.succeededFuture());
+ completionHandler.handle(Future.succeededFuture());
}
}
@Override
- public void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) {
- HandlerHolder lastHolder = removeRegistration(address, handler);;
+ protected void removeRegistration(HandlerHolder lastHolder, String address,
+ Handler> completionHandler) {
if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {
removeSub(address, serverID, completionHandler);
} else {
@@ -173,27 +167,22 @@ public void removeRegistration(String address, HandlerRegistration handle
}
}
- protected void sendReply(ServerID replyDest, LocalMessage message, DeliveryOptions options, Handler>> replyHandler) {
- if (message.address() == null) {
- throw new IllegalStateException("address not specified");
- } else {
- sendReply(replyDest, (ClusteredMessage)message, options, replyHandler);
- }
+ @Override
+ protected void sendReply(SendContextImpl sendContext, MessageImpl replierMessage) {
+ clusteredSendReply(((ClusteredMessage) replierMessage).getSender(), sendContext);
}
@Override
- protected void sendOrPub(LocalMessage message, DeliveryOptions options,
- Handler>> replyHandler) {
- checkStarted();
- final HandlerRegistration registration = createReplyHandlerRegistration(message, options, replyHandler);
+ protected void sendOrPub(SendContextImpl sendContext) {
+ String address = sendContext.message.address();
Handler>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
ChoosableIterable serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
- sendToSubs(serverIDs, message, registration);
+ sendToSubs(serverIDs, sendContext);
} else {
- metrics.messageSent(message.address(), !message.send(), true, false);
- serveMessage(message, registration, true);
+ metrics.messageSent(address, !sendContext.message.send(), true, false);
+ deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
@@ -202,10 +191,10 @@ protected void sendOrPub(LocalMessage message, DeliveryOptions options,
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
- subs.get(message.address(), resultHandler);
+ subs.get(address, resultHandler);
});
} else {
- subs.get(message.address(), resultHandler);
+ subs.get(address, resultHandler);
}
}
@@ -215,6 +204,12 @@ protected String generateReplyAddress() {
return UUID.randomUUID().toString();
}
+ @Override
+ protected boolean isMessageLocal(MessageImpl msg) {
+ ClusteredMessage clusteredMessage = (ClusteredMessage)msg;
+ return !clusteredMessage.isFromWire();
+ }
+
private void setNodeCrashedHandler(HAManager haManager) {
haManager.setNodeCrashedHandler((failedNodeID, haInfo, failed) -> {
JsonObject jsid = haInfo.getJsonObject(SERVER_ID_HA_KEY);
@@ -266,7 +261,7 @@ public void handle(Buffer buff) {
// Just send back pong directly on connection
socket.write(PONG);
} else {
- serveMessage(received, false);
+ deliverMessageLocally(received);
}
}
}
@@ -276,18 +271,17 @@ public void handle(Buffer buff) {
};
}
- private void sendToSubs(ChoosableIterable subs, LocalMessage message,
- HandlerRegistration handlerRegistration) {
- String address = message.address();
- if (message.send()) {
+ private void sendToSubs(ChoosableIterable subs, SendContextImpl sendContext) {
+ String address = sendContext.message.address();
+ if (sendContext.message.send()) {
// Choose one
ServerID sid = subs.choose();
if (!sid.equals(serverID)) { //We don't send to this node
metrics.messageSent(address, false, false, true);
- sendRemote(sid, message);
+ sendRemote(sid, sendContext.message);
} else {
metrics.messageSent(address, false, true, false);
- serveMessage(message, handlerRegistration, true);
+ deliverMessageLocally(sendContext);
}
} else {
// Publish
@@ -296,35 +290,31 @@ private void sendToSubs(ChoosableIterable subs, LocalMessage messa
for (ServerID sid : subs) {
if (!sid.equals(serverID)) { //We don't send to this node
remote = true;
- sendRemote(sid, message);
+ sendRemote(sid, sendContext.message);
} else {
local = true;
}
}
metrics.messageSent(address, true, local, remote);
if (local) {
- serveMessage(message, handlerRegistration, true);
+ deliverMessageLocally(sendContext);
}
}
}
- private void sendReply(ServerID replyDest, ClusteredMessage message, DeliveryOptions options,
- Handler>> replyHandler) {
- HandlerRegistration registration = null;
- if (replyHandler != null) {
- registration = createReplyHandlerRegistration(message, options, replyHandler);
- }
+ private void clusteredSendReply(ServerID replyDest, SendContextImpl sendContext) {
+ MessageImpl message = sendContext.message;
String address = message.address();
if (!replyDest.equals(serverID)) {
metrics.messageSent(address, false, false, true);
sendRemote(replyDest, message);
} else {
metrics.messageSent(address, false, true, false);
- serveMessage(message, registration, true);
+ deliverMessageLocally(sendContext);
}
}
- private void sendRemote(ServerID theServerID, LocalMessage message) {
+ private void sendRemote(ServerID theServerID, MessageImpl message) {
// We need to deal with the fact that connecting can take some time and is async, and we cannot
// block to wait for it. So we add any sends to a pending list if not connected yet.
// Once we connect we send them.
@@ -377,6 +367,5 @@ VertxOptions options() {
return options;
}
-
}
diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java
index 6036bfda160..549122e08e6 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java
@@ -17,15 +17,11 @@
package io.vertx.core.eventbus.impl.clustered;
import io.netty.util.CharsetUtil;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.eventbus.DeliveryOptions;
-import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
-import io.vertx.core.eventbus.impl.local.LocalMessage;
+import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
@@ -37,7 +33,7 @@
/**
* @author Tim Fox
*/
-public class ClusteredMessage extends LocalMessage {
+public class ClusteredMessage extends MessageImpl {
private static final Logger log = LoggerFactory.getLogger(ClusteredMessage.class);
@@ -241,11 +237,8 @@ private void writeString(Buffer buff, String str) {
buff.appendBytes(strBytes);
}
- @Override
- protected void sendReply(LocalMessage msg, DeliveryOptions options, Handler>> replyHandler) {
- if (bus != null & replyAddress() != null) {
- ((ClusteredEventBus)bus).sendReply(sender, msg, options, replyHandler);
- }
+ ServerID getSender() {
+ return sender;
}
public boolean isFromWire() {
diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java
index 907a1cb9a27..baa6bf94c05 100644
--- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java
+++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java
@@ -106,7 +106,6 @@ private void schedulePing() {
});
}
-
private synchronized void connected(NetSocket socket) {
this.socket = socket;
connected = true;
diff --git a/src/main/java/io/vertx/core/eventbus/package-info.java b/src/main/java/io/vertx/core/eventbus/package-info.java
index 0157003b48a..9bc3a49393a 100644
--- a/src/main/java/io/vertx/core/eventbus/package-info.java
+++ b/src/main/java/io/vertx/core/eventbus/package-info.java
@@ -191,14 +191,14 @@
*
* The headers of the message are available with {@link io.vertx.core.eventbus.Message#headers}.
*
- * ==== Replying to messages
+ * ==== Acknowledging messages / sending replies
*
- * Sometimes after you send a message you want to receive a reply from the recipient.
- * This is known as the *request-response pattern*.
+ * When using {@link io.vertx.core.eventbus.EventBus#send} the event bus attempts to deliver the message to a
+ * {@link io.vertx.core.eventbus.MessageConsumer} registered with the event bus.
*
- * To do this you can specify a reply handler when sending the message.
+ * In some cases it's useful for the sender to know when the consumer has received the message and "processed" it.
*
- * When the receiver receives the message they can reply to it by calling {@link io.vertx.core.eventbus.Message#reply}.
+ * To acknowledge that the message has been processed the consumer can reply to the message by calling {@link io.vertx.core.eventbus.Message#reply}.
*
* When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply.
*
@@ -218,8 +218,19 @@
* {@link examples.EventBusExamples#example9}
* ----
*
- * The replies themselves can also be replied to so you can create a dialog between two different parties
- * consisting of multiple rounds.
+ * The reply can contain a message body which can contain useful information.
+ *
+ * What the "processing" actually means is application defined and depends entirely on what the message consumer does
+ * and is not something that the Vert.x event bus itself knows or cares about.
+ *
+ * Some examples:
+ *
+ * * A simple message consumer which implements a service which returns the time of the day would acknowledge with a message
+ * containing the time of day in the reply body
+ * * A message consumer which implements a persistent queue, might acknowledge with `true` if the message was successfully
+ * persisted in storage, or `false` if not.
+ * * A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed
+ * so it can be deleted from the database
*
* ==== Sending with timeouts
*
diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java
index b3cb16e992d..0126ec01d24 100644
--- a/src/main/java/io/vertx/core/impl/VertxImpl.java
+++ b/src/main/java/io/vertx/core/impl/VertxImpl.java
@@ -29,6 +29,8 @@
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.impl.DnsClientImpl;
import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.impl.EventBusImpl;
+import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
@@ -51,7 +53,6 @@
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.shareddata.impl.SharedDataImpl;
-import io.vertx.core.spi.EventBusFactory;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.cluster.ClusterManager;
@@ -75,8 +76,6 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
- private static final EventBusFactory eventBusFactory = ServiceHelper.loadFactory(EventBusFactory.class);
-
static {
// Netty resource leak detection has a performance overhead and we do not need it in Vert.x
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
@@ -163,7 +162,11 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
}
private void createAndStartEventBus(VertxOptions options, Handler> resultHandler) {
- eventBus = eventBusFactory.createEventBus(this, options, clusterManager, haManager);
+ if (options.isClustered()) {
+ eventBus = new ClusteredEventBus(this, options, clusterManager, haManager);
+ } else {
+ eventBus = new EventBusImpl(this);
+ }
eventBus.start(ar2 -> {
if (ar2.succeeded()) {
if (resultHandler != null) {
diff --git a/src/main/java/io/vertx/core/spi/EventBusFactory.java b/src/main/java/io/vertx/core/spi/EventBusFactory.java
deleted file mode 100644
index 863ea337ed6..00000000000
--- a/src/main/java/io/vertx/core/spi/EventBusFactory.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package io.vertx.core.spi;
-
-import io.vertx.core.VertxOptions;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.impl.HAManager;
-import io.vertx.core.impl.VertxInternal;
-import io.vertx.core.spi.cluster.ClusterManager;
-
-/**
- * @author Tim Fox
- */
-public interface EventBusFactory {
-
- EventBus createEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager);
-
-}
diff --git a/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory b/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory
deleted file mode 100644
index 4cd5d48c79f..00000000000
--- a/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory
+++ /dev/null
@@ -1 +0,0 @@
-io.vertx.core.eventbus.impl.EventBusFactoryImpl
\ No newline at end of file
diff --git a/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java b/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java
new file mode 100644
index 00000000000..094288a04dc
--- /dev/null
+++ b/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java
@@ -0,0 +1,112 @@
+package io.vertx.test.core;
+
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.eventbus.MessageProducer;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Tim Fox
+ */
+public class EventBusFlowControlTest extends VertxTestBase {
+
+ protected EventBus eb;
+
+ @Test
+ public void testFlowControl() {
+
+ MessageProducer prod = eb.sender("some-address");
+ int numBatches = 1000;
+ int wqms = 2000;
+ prod.setWriteQueueMaxSize(wqms);
+
+ MessageConsumer consumer = eb.consumer("some-address");
+ AtomicInteger cnt = new AtomicInteger();
+ consumer.handler(msg -> {
+ int c = cnt.incrementAndGet();
+ if (c == numBatches * wqms) {
+ testComplete();
+ }
+ });
+
+ sendBatch(prod, wqms, numBatches, 0);
+ await();
+ }
+
+ private void sendBatch(MessageProducer prod, int batchSize, int numBatches, int batchNumber) {
+ boolean drainHandlerSet = false;
+ for (int i = 0; i < batchSize; i++) {
+ prod.send("message-" + i);
+ if (prod.writeQueueFull() && !drainHandlerSet) {
+ prod.drainHandler(v -> {
+ if (batchNumber < numBatches - 1) {
+ sendBatch(prod, batchSize, numBatches, batchNumber + 1);
+ }
+ });
+ drainHandlerSet = true;
+ }
+ }
+ }
+
+ @Test
+ public void testFlowControlPauseConsumer() {
+
+ MessageProducer prod = eb.sender("some-address");
+ int numBatches = 10;
+ int wqms = 100;
+ prod.setWriteQueueMaxSize(wqms);
+
+ MessageConsumer consumer = eb.consumer("some-address");
+ AtomicInteger cnt = new AtomicInteger();
+ AtomicBoolean paused = new AtomicBoolean();
+ consumer.handler(msg -> {
+ assertFalse(paused.get());
+ int c = cnt.incrementAndGet();
+ if (c == numBatches * wqms) {
+ testComplete();
+ }
+ if (c % 100 == 0) {
+ consumer.pause();
+ paused.set(true);
+ vertx.setTimer(100, tid -> {
+ paused.set(false);
+ consumer.resume();
+ });
+ }
+ });
+
+ sendBatch(prod, wqms, numBatches, 0);
+ await();
+ }
+
+ @Test
+ public void testFlowControlNoConsumer() {
+
+ MessageProducer prod = eb.sender("some-address");
+ int wqms = 2000;
+ prod.setWriteQueueMaxSize(wqms);
+
+ boolean drainHandlerSet = false;
+ for (int i = 0; i < wqms * 2; i++) {
+ prod.send("message-" + i);
+ if (prod.writeQueueFull() && !drainHandlerSet) {
+ prod.drainHandler(v -> {
+ fail("Should not be called");
+ });
+ drainHandlerSet = true;
+ }
+ }
+ assertTrue(drainHandlerSet);
+ vertx.setTimer(500, tid -> testComplete());
+ await();
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ eb = vertx.eventBus();
+ }
+}
diff --git a/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java b/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java
new file mode 100644
index 00000000000..d03a8810318
--- /dev/null
+++ b/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java
@@ -0,0 +1,177 @@
+package io.vertx.test.core;
+
+import io.vertx.core.Handler;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.SendContext;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Tim Fox
+ */
+public class EventBusInterceptorTest extends VertxTestBase {
+
+ protected EventBus eb;
+
+ @Test
+ public void testInterceptorSend() {
+ eb.addInterceptor(sc -> {
+ assertEquals("armadillo", sc.message().body());
+ assertTrue(sc.send());
+ sc.next();
+ });
+ eb.consumer("some-address", msg -> {
+ assertEquals("armadillo", msg.body());
+ testComplete();
+ });
+ eb.send("some-address", "armadillo");
+ await();
+ }
+
+ @Test
+ public void testInterceptorPublish() {
+ eb.addInterceptor(sc -> {
+ assertEquals("armadillo", sc.message().body());
+ assertFalse(sc.send());
+ sc.next();
+ });
+ eb.consumer("some-address", msg -> {
+ assertEquals("armadillo", msg.body());
+ testComplete();
+ });
+ eb.publish("some-address", "armadillo");
+ await();
+ }
+
+ @Test
+ public void testInterceptorNoNext() {
+ eb.addInterceptor(sc -> {
+ assertEquals("armadillo", sc.message().body());
+ });
+ eb.consumer("some-address", msg -> {
+ fail("Should not receive message");
+ });
+ eb.send("some-address", "armadillo");
+ vertx.setTimer(200, tid -> testComplete());
+ await();
+ }
+
+ @Test
+ public void testMultipleInterceptors() {
+ AtomicInteger cnt = new AtomicInteger();
+ int interceptorNum = 10;
+ for (int i = 0; i < interceptorNum; i++) {
+ final int expectedCount = i;
+ eb.addInterceptor(sc -> {
+ assertEquals("armadillo", sc.message().body());
+ int count = cnt.getAndIncrement();
+ assertEquals(expectedCount, count);
+ sc.next();
+ });
+ }
+ eb.consumer("some-address", msg -> {
+ assertEquals("armadillo", msg.body());
+ assertEquals(interceptorNum, cnt.get());
+ testComplete();
+ });
+ eb.send("some-address", "armadillo");
+ await();
+ }
+
+ @Test
+ public void testRemoveInterceptor() {
+
+ AtomicInteger cnt1 = new AtomicInteger();
+ AtomicInteger cnt2 = new AtomicInteger();
+
+ Handler eb1 = sc -> {
+ cnt1.incrementAndGet();
+ sc.next();
+ };
+
+ Handler eb2 = sc -> {
+ cnt2.incrementAndGet();
+ sc.next();
+ };
+
+ eb.addInterceptor(eb1).addInterceptor(eb2);
+
+ eb.consumer("some-address", msg -> {
+ if (msg.body().equals("armadillo")) {
+ assertEquals(1, cnt1.get());
+ assertEquals(1, cnt2.get());
+ eb.removeInterceptor(eb2);
+ eb.send("some-address", "aardvark");
+ } else if (msg.body().equals("aardvark")) {
+ assertEquals(2, cnt1.get());
+ assertEquals(1, cnt2.get());
+ testComplete();
+ } else {
+ fail("wrong body");
+ }
+ });
+ eb.send("some-address", "armadillo");
+ await();
+ }
+
+ @Test
+ public void testInterceptorOnReply() {
+ AtomicInteger cnt = new AtomicInteger();
+ eb.addInterceptor(sc -> {
+ if (sc.message().body().equals("armadillo")) {
+ assertEquals(0, cnt.get());
+ } else if (sc.message().body().equals("echidna")) {
+ assertEquals(1, cnt.get());
+ } else {
+ fail("wrong body");
+ }
+ cnt.incrementAndGet();
+ sc.next();
+ });
+ eb.consumer("some-address", msg -> {
+ assertEquals("armadillo", msg.body());
+ assertEquals(1, cnt.get());
+ msg.reply("echidna");
+ });
+ eb.send("some-address", "armadillo", reply -> {
+ assertEquals("echidna", reply.result().body());
+ assertEquals(2, cnt.get());
+ testComplete();
+ });
+ await();
+ }
+
+ @Test
+ public void testExceptionInInterceptor() {
+ AtomicInteger cnt = new AtomicInteger();
+
+ Handler eb1 = sc -> {
+ cnt.incrementAndGet();
+ vertx.runOnContext(v -> sc.next());
+ throw new RuntimeException("foo");
+ };
+
+ Handler eb2 = sc -> {
+ cnt.incrementAndGet();
+ sc.next();
+ };
+
+ eb.addInterceptor(eb1).addInterceptor(eb2);
+
+ eb.consumer("some-address", msg -> {
+ assertEquals("armadillo", msg.body());
+ assertEquals(2, cnt.get());
+ testComplete();
+ });
+ eb.send("some-address", "armadillo");
+ await();
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ eb = vertx.eventBus();
+ }
+}
diff --git a/src/test/java/io/vertx/test/core/LocalEventBusTest.java b/src/test/java/io/vertx/test/core/LocalEventBusTest.java
index 6dee557b783..57ffcd729e0 100644
--- a/src/test/java/io/vertx/test/core/LocalEventBusTest.java
+++ b/src/test/java/io/vertx/test/core/LocalEventBusTest.java
@@ -1238,7 +1238,9 @@ public void testUpdateDeliveryOptionsOnProducer() {
producer.write("no-header");
});
consumer.handler(msg -> {
- switch (msg.body()) {
+ String body = msg.body();
+ assertNotNull(body);
+ switch (body) {
case "no-header":
assertNull(msg.headers().get("header-name"));
producer.deliveryOptions(new DeliveryOptions().addHeader("header-name", "header-value"));
diff --git a/src/test/java/io/vertx/test/core/NetTest.java b/src/test/java/io/vertx/test/core/NetTest.java
index 6800db30982..e13d1893eb4 100644
--- a/src/test/java/io/vertx/test/core/NetTest.java
+++ b/src/test/java/io/vertx/test/core/NetTest.java
@@ -557,7 +557,6 @@ public void testServerOptionsJson() {
Random rand = new Random();
boolean reuseAddress = rand.nextBoolean();
int trafficClass = TestUtils.randomByte() + 128;
- System.out.println("trafficClass = " + trafficClass);
boolean tcpNoDelay = rand.nextBoolean();
boolean tcpKeepAlive = rand.nextBoolean();
int soLinger = TestUtils.randomPositiveInt();