From c68ab7ccca9b9da6a2356214685f6753df5bcd4e Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Thu, 15 Oct 2015 12:58:26 +0100 Subject: [PATCH] Event bus flow control / interceptors --- src/main/asciidoc/java/eventbus.adoc | 25 +- src/main/asciidoc/java/http.adoc | 10 +- src/main/asciidoc/java/net.adoc | 4 +- src/main/asciidoc/java/streams.adoc | 8 +- .../core/eventbus/BridgeInterceptor.java | 16 ++ .../java/io/vertx/core/eventbus/EventBus.java | 20 +- .../core/eventbus/FilteringInterceptor.java | 29 +++ .../vertx/core/eventbus/MessageProducer.java | 11 + .../io/vertx/core/eventbus/SendContext.java | 29 +++ .../eventbus/impl/EventBusFactoryImpl.java | 27 --- .../LocalEventBus.java => EventBusImpl.java} | 216 +++++++++++++----- .../eventbus/impl/HandlerRegistration.java | 15 +- .../LocalMessage.java => MessageImpl.java} | 32 +-- .../eventbus/impl/MessageProducerImpl.java | 85 ++++++- .../impl/clustered/ClusteredEventBus.java | 89 ++++---- .../impl/clustered/ClusteredMessage.java | 15 +- .../impl/clustered/ConnectionHolder.java | 1 - .../io/vertx/core/eventbus/package-info.java | 25 +- .../java/io/vertx/core/impl/VertxImpl.java | 11 +- .../io/vertx/core/spi/EventBusFactory.java | 16 -- .../io.vertx.core.spi.EventBusFactory | 1 - .../test/core/EventBusFlowControlTest.java | 112 +++++++++ .../test/core/EventBusInterceptorTest.java | 177 ++++++++++++++ .../io/vertx/test/core/LocalEventBusTest.java | 4 +- src/test/java/io/vertx/test/core/NetTest.java | 1 - 25 files changed, 757 insertions(+), 222 deletions(-) create mode 100644 src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java create mode 100644 src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java create mode 100644 src/main/java/io/vertx/core/eventbus/SendContext.java delete mode 100644 src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java rename src/main/java/io/vertx/core/eventbus/impl/{local/LocalEventBus.java => EventBusImpl.java} (64%) rename src/main/java/io/vertx/core/eventbus/impl/{local/LocalMessage.java => MessageImpl.java} (80%) delete mode 100644 src/main/java/io/vertx/core/spi/EventBusFactory.java delete mode 100644 src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory create mode 100644 src/test/java/io/vertx/test/core/EventBusFlowControlTest.java create mode 100644 src/test/java/io/vertx/test/core/EventBusInterceptorTest.java diff --git a/src/main/asciidoc/java/eventbus.adoc b/src/main/asciidoc/java/eventbus.adoc index 121b360d320..a4d5f671579 100644 --- a/src/main/asciidoc/java/eventbus.adoc +++ b/src/main/asciidoc/java/eventbus.adoc @@ -211,14 +211,14 @@ The `link:../../apidocs/io/vertx/core/eventbus/Message.html#body--[body]` of the The headers of the message are available with `link:../../apidocs/io/vertx/core/eventbus/Message.html#headers--[headers]`. -==== Replying to messages +==== Acknowledging messages / sending replies -Sometimes after you send a message you want to receive a reply from the recipient. -This is known as the *request-response pattern*. +When using `link:../../apidocs/io/vertx/core/eventbus/EventBus.html#send-java.lang.String-java.lang.Object-[send]` the event bus attempts to deliver the message to a +`link:../../apidocs/io/vertx/core/eventbus/MessageConsumer.html[MessageConsumer]` registered with the event bus. -To do this you can specify a reply handler when sending the message. +In some cases it's useful for the sender to know when the consumer has received the message and "processed" it. -When the receiver receives the message they can reply to it by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`. +To acknowledge that the message has been processed the consumer can reply to the message by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`. When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply. @@ -246,8 +246,19 @@ eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of gra }); ---- -The replies themselves can also be replied to so you can create a dialog between two different parties -consisting of multiple rounds. +The reply can contain a message body which can contain useful information. + +What the "processing" actually means is application defined and depends entirely on what the message consumer does +and is not something that the Vert.x event bus itself knows or cares about. + +Some examples: + +* A simple message consumer which implements a service which returns the time of the day would acknowledge with a message +containing the time of day in the reply body +* A message consumer which implements a persistent queue, might acknowledge with `true` if the message was successfully +persisted in storage, or `false` if not. +* A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed +so it can be deleted from the database ==== Sending with timeouts diff --git a/src/main/asciidoc/java/http.adoc b/src/main/asciidoc/java/http.adoc index 583e3025b6c..8b84b7c1f3d 100644 --- a/src/main/asciidoc/java/http.adoc +++ b/src/main/asciidoc/java/http.adoc @@ -66,7 +66,7 @@ server.listen(8080, "myhost.com", res -> { === Getting notified of incoming requests -To be notified when a request arrives you need to set a `link:../../apidocs/io/vertx/core/http/HttpServer.html#requestHandler-io.vertx.core.Handler-[requestHandler]`: +To be notified when a request arrives you need to set a `link:../../apidocs/io/vertx/core/http/HttpServer.html#requestHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[requestHandler]`: [source,java] ---- @@ -237,7 +237,7 @@ request.endHandler(v -> { }); ---- -This is such a common case, that Vert.x provides a `link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#bodyHandler-io.vertx.core.Handler-[bodyHandler]` to do this +This is such a common case, that Vert.x provides a `link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#bodyHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[bodyHandler]` to do this for you. The body handler is called once when all the body has been received: [source,java] @@ -286,7 +286,7 @@ server.requestHandler(request -> { Vert.x can also handle file uploads which are encoded in a multi-part request body. To receive file uploads you tell Vert.x to expect a multi-part form and set an -`link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#uploadHandler-io.vertx.core.Handler-[uploadHandler]` on the request. +`link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#uploadHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[uploadHandler]` on the request. This handler will be called once for every upload that arrives on the server. @@ -1061,7 +1061,7 @@ The idea here is it allows the server to authorise and accept/reject the request Sending large amounts of data if the request might not be accepted is a waste of bandwidth and ties up the server in reading data that it will just discard. -Vert.x allows you to set a `link:../../apidocs/io/vertx/core/http/HttpClientRequest.html#continueHandler-io.vertx.core.Handler-[continueHandler]` on the +Vert.x allows you to set a `link:../../apidocs/io/vertx/core/http/HttpClientRequest.html#continueHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[continueHandler]` on the client request object This will be called if the server sends back a `Status: 100 (Continue)` response to signify that it is ok to send @@ -1230,7 +1230,7 @@ There are two ways of handling WebSockets on the server side. ===== WebSocket handler -The first way involves providing a `link:../../apidocs/io/vertx/core/http/HttpServer.html#websocketHandler-io.vertx.core.Handler-[websocketHandler]` +The first way involves providing a `link:../../apidocs/io/vertx/core/http/HttpServer.html#websocketHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[websocketHandler]` on the server instance. When a WebSocket connection is made to the server, the handler will be called, passing in an instance of diff --git a/src/main/asciidoc/java/net.adoc b/src/main/asciidoc/java/net.adoc index 5859564c39d..6cdc2d70866 100644 --- a/src/main/asciidoc/java/net.adoc +++ b/src/main/asciidoc/java/net.adoc @@ -84,7 +84,7 @@ server.listen(0, "localhost", res -> { === Getting notified of incoming connections -To be notified when a connection is made you need to set a `link:../../apidocs/io/vertx/core/net/NetServer.html#connectHandler-io.vertx.core.Handler-[connectHandler]`: +To be notified when a connection is made you need to set a `link:../../apidocs/io/vertx/core/net/NetServer.html#connectHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[connectHandler]`: [source,java] ---- @@ -137,7 +137,7 @@ Write operations are asynchronous and may not occur until some time after the ca === Closed handler -If you want to be notified when a socket is closed, you can set a `link:../../apidocs/io/vertx/core/net/NetSocket.html#closeHandler-io.vertx.core.Handler-[closeHandler]` +If you want to be notified when a socket is closed, you can set a `link:../../apidocs/io/vertx/core/net/NetSocket.html#closeHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[closeHandler]` on it: [source,java] diff --git a/src/main/asciidoc/java/streams.adoc b/src/main/asciidoc/java/streams.adoc index d6f09b62907..159892f9155 100644 --- a/src/main/asciidoc/java/streams.adoc +++ b/src/main/asciidoc/java/streams.adoc @@ -101,7 +101,7 @@ server.connectHandler(sock -> { }).listen(); ---- -And there we have it. The `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-io.vertx.core.Handler-[drainHandler]` event handler will +And there we have it. The `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[drainHandler]` event handler will get called when the write queue is ready to accept more data, this resumes the `NetSocket` which allows it to read more data. @@ -134,7 +134,7 @@ Let's look at the methods on `ReadStream` and `WriteStream` in more detail: Functions: -- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#handler-io.vertx.core.Handler-[handler]`: +- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#handler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[handler]`: set a handler which will receive items from the ReadStream. - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#pause--[pause]`: pause the handler. When paused no items will be received in the handler. @@ -142,7 +142,7 @@ pause the handler. When paused no items will be received in the handler. resume the handler. The handler will be called if any item arrives. - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`: Will be called if an exception occurs on the ReadStream. -- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#endHandler-io.vertx.core.Handler-[endHandler]`: +- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#endHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[endHandler]`: Will be called when end of stream is reached. This might be when EOF is reached if the ReadStream represents a file, or when end of request is reached if it's an HTTP request, or when the connection is closed if it's a TCP socket. @@ -166,7 +166,7 @@ represents the actual number of bytes written and not the number of buffers. returns `true` if the write queue is considered full. - `link:../../apidocs/io/vertx/core/streams/WriteStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`: Will be called if an exception occurs on the `WriteStream`. -- `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-io.vertx.core.Handler-[drainHandler]`: +- `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[drainHandler]`: The handler will be called if the `WriteStream` is considered no longer full. === Pump diff --git a/src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java b/src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java new file mode 100644 index 00000000000..11fdc8b46d5 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java @@ -0,0 +1,16 @@ +package io.vertx.core.eventbus; + +/** + * @author Tim Fox + */ +public class BridgeInterceptor extends FilteringInterceptor { + + public BridgeInterceptor(String startsWith) { + super(startsWith); + } + + @Override + protected void handleContext(SendContext sendContext) { + + } +} diff --git a/src/main/java/io/vertx/core/eventbus/EventBus.java b/src/main/java/io/vertx/core/eventbus/EventBus.java index 548b2d651a4..c7a424e47c8 100644 --- a/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -62,7 +62,6 @@ public interface EventBus extends Measured { @Fluent EventBus send(String address, Object message, Handler>> replyHandler); - /** * Like {@link #send(String, Object)} but specifying {@code options} that can be used to configure the delivery. * @@ -236,7 +235,7 @@ public interface EventBus extends Measured { * Unregister a default message codec. *

* @param clazz the class for which the codec was registered - * @return @return a reference to this, so the API can be used fluently + * @return a reference to this, so the API can be used fluently */ @GenIgnore EventBus unregisterDefaultCodec(Class clazz); @@ -244,7 +243,7 @@ public interface EventBus extends Measured { /** * Start the event bus. This would not normally be called in user code * - * @param completionHandler + * @param completionHandler handler will be called when event bus is started */ @GenIgnore void start(Handler> completionHandler); @@ -257,6 +256,21 @@ public interface EventBus extends Measured { @GenIgnore void close(Handler> completionHandler); + /** + * Add an interceptor that will be called whenever a message is sent from Vert.x + * + * @param interceptor the interceptor + * @return a reference to this, so the API can be used fluently + */ + EventBus addInterceptor(Handler interceptor); + + /** + * Remove an interceptor + * + * @param interceptor the interceptor + * @return a reference to this, so the API can be used fluently + */ + EventBus removeInterceptor(Handler interceptor); } diff --git a/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java b/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java new file mode 100644 index 00000000000..8a9b7ac3737 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java @@ -0,0 +1,29 @@ +package io.vertx.core.eventbus; + +import io.vertx.core.Handler; + +/** + * @author Tim Fox + */ +public abstract class FilteringInterceptor implements Handler { + + private final String startsWith; + + public FilteringInterceptor(String startsWith) { + this.startsWith = startsWith; + } + + // TODO regex + + @Override + public void handle(SendContext sendContext) { + if (sendContext.message().address().startsWith(startsWith)) { + handleContext(sendContext); + } else { + sendContext.next(); + } + } + + protected abstract void handleContext(SendContext sendContext); + +} diff --git a/src/main/java/io/vertx/core/eventbus/MessageProducer.java b/src/main/java/io/vertx/core/eventbus/MessageProducer.java index 8a5081f4de9..bf0d1b464dd 100644 --- a/src/main/java/io/vertx/core/eventbus/MessageProducer.java +++ b/src/main/java/io/vertx/core/eventbus/MessageProducer.java @@ -30,6 +30,16 @@ @VertxGen public interface MessageProducer extends WriteStream { + int DEFAULT_WRITE_QUEUE_MAX_SIZE = 1000; + + /** + * Synonym for {@link #write(Object)}. + * + * @param message the message to send + * @return reference to this for fluency + */ + MessageProducer send(T message); + @Override MessageProducer exceptionHandler(Handler handler); @@ -56,4 +66,5 @@ public interface MessageProducer extends WriteStream { */ String address(); + void close(); } diff --git a/src/main/java/io/vertx/core/eventbus/SendContext.java b/src/main/java/io/vertx/core/eventbus/SendContext.java new file mode 100644 index 00000000000..7032c94c089 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/SendContext.java @@ -0,0 +1,29 @@ +package io.vertx.core.eventbus; + +import io.vertx.codegen.annotations.VertxGen; + +/** + * + * Encapsulates a message being sent from Vert.x. Used with event bus interceptors + * + * @author Tim Fox + */ +@VertxGen +public interface SendContext { + + /** + * @return The message being sent + */ + Message message(); + + /** + * Call the next interceptor + */ + void next(); + + /** + * + * @return true if the message is being sent (point to point) or False if the message is being published + */ + boolean send(); +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java deleted file mode 100644 index 50f96bd39cb..00000000000 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java +++ /dev/null @@ -1,27 +0,0 @@ -package io.vertx.core.eventbus.impl; - -import io.vertx.core.VertxOptions; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.spi.EventBusFactory; -import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; -import io.vertx.core.eventbus.impl.local.LocalEventBus; -import io.vertx.core.impl.HAManager; -import io.vertx.core.impl.VertxInternal; -import io.vertx.core.spi.cluster.ClusterManager; - -/** - * @author Tim Fox - */ -public class EventBusFactoryImpl implements EventBusFactory { - - @Override - public EventBus createEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager) { - EventBus eb; - if (options.isClustered()) { - eb = new ClusteredEventBus(vertx, options, clusterManager, haManager); - } else { - eb = new LocalEventBus(vertx); - } - return eb; - } -} diff --git a/src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java similarity index 64% rename from src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java rename to src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 06c7f65e430..ab98bcf2255 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/local/LocalEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -14,30 +14,34 @@ * You may elect to redistribute this code under either of these licenses. */ -package io.vertx.core.eventbus.impl.local; +package io.vertx.core.eventbus.impl; import io.vertx.core.*; import io.vertx.core.eventbus.*; -import io.vertx.core.eventbus.impl.*; import io.vertx.core.impl.VertxInternal; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.spi.metrics.EventBusMetrics; import io.vertx.core.spi.metrics.MetricsProvider; +import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; /** + * A local event bus implementation * * @author Tim Fox T */ -public class LocalEventBus implements EventBus, MetricsProvider { +public class EventBusImpl implements EventBus, MetricsProvider { - private static final Logger log = LoggerFactory.getLogger(LocalEventBus.class); + private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class); + private final List> interceptors = new CopyOnWriteArrayList<>(); private final AtomicLong replySequence = new AtomicLong(0); protected final VertxInternal vertx; protected final EventBusMetrics metrics; @@ -45,11 +49,23 @@ public class LocalEventBus implements EventBus, MetricsProvider { protected final CodecManager codecManager = new CodecManager(); protected volatile boolean started; - public LocalEventBus(VertxInternal vertx) { + public EventBusImpl(VertxInternal vertx) { this.vertx = vertx; this.metrics = vertx.metricsSPI().createMetrics(this); } + @Override + public EventBus addInterceptor(Handler interceptor) { + interceptors.add(interceptor); + return this; + } + + @Override + public EventBus removeInterceptor(Handler interceptor) { + interceptors.remove(interceptor); + return this; + } + public synchronized void start(Handler> completionHandler) { if (started) { throw new IllegalStateException("Already started"); @@ -75,34 +91,34 @@ public EventBus send(String address, Object message, DeliveryOptions options) { @Override public EventBus send(String address, Object message, DeliveryOptions options, Handler>> replyHandler) { - sendOrPub(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler); + sendOrPubInternal(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler); return this; } @Override public MessageProducer sender(String address) { Objects.requireNonNull(address, "address"); - return new MessageProducerImpl<>(this, address, true, new DeliveryOptions()); + return new MessageProducerImpl<>(vertx, address, true, new DeliveryOptions()); } @Override public MessageProducer sender(String address, DeliveryOptions options) { Objects.requireNonNull(address, "address"); Objects.requireNonNull(options, "options"); - return new MessageProducerImpl<>(this, address, true, options); + return new MessageProducerImpl<>(vertx, address, true, options); } @Override public MessageProducer publisher(String address) { Objects.requireNonNull(address, "address"); - return new MessageProducerImpl<>(this, address, false, new DeliveryOptions()); + return new MessageProducerImpl<>(vertx, address, false, new DeliveryOptions()); } @Override public MessageProducer publisher(String address, DeliveryOptions options) { Objects.requireNonNull(address, "address"); Objects.requireNonNull(options, "options"); - return new MessageProducerImpl<>(this, address, false, options); + return new MessageProducerImpl<>(vertx, address, false, options); } @Override @@ -112,7 +128,7 @@ public EventBus publish(String address, Object message) { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPub(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null); + sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null); return this; } @@ -192,40 +208,29 @@ public EventBusMetrics getMetrics() { return metrics; } - public LocalMessage createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { + protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") - LocalMessage msg = new LocalMessage(address, null, headers, body, codec, send); + MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send); return msg; } - public void addRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { + protected void addRegistration(String address, HandlerRegistration registration, + boolean replyHandler, boolean localOnly) { Objects.requireNonNull(registration.getHandler(), "handler"); - doAddRegistration(address, registration, replyHandler, localOnly); - registration.setResult(Future.succeededFuture()); + boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly); + addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult); } - protected HandlerRegistration createReplyHandlerRegistration(LocalMessage message, - DeliveryOptions options, - Handler>> replyHandler) { - if (replyHandler != null) { - long timeout = options.getSendTimeout(); - String replyAddress = generateReplyAddress(); - message.setReplyAddress(replyAddress); - Handler> simpleReplyHandler = convertHandler(replyHandler); - HandlerRegistration registration = - new HandlerRegistration<>(vertx, metrics, this, replyAddress, true, true, replyHandler, timeout); - registration.handler(simpleReplyHandler); - return registration; - } else { - return null; - } + protected void addRegistration(boolean newAddress, String address, + boolean replyHandler, boolean localOnly, + Handler> completionHandler) { + completionHandler.handle(Future.succeededFuture()); } - protected boolean doAddRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { + protected boolean addLocalRegistration(String address, HandlerRegistration registration, + boolean replyHandler, boolean localOnly) { Objects.requireNonNull(address, "address"); Context context = Vertx.currentContext(); @@ -258,12 +263,17 @@ protected boolean doAddRegistration(String address, HandlerRegistration r return newAddress; } - public void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) { - removeRegistration(address, handler); + protected void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) { + HandlerHolder holder = removeLocalRegistration(address, handler); + removeRegistration(holder, address, completionHandler); + } + + protected void removeRegistration(HandlerHolder handlerHolder, String address, + Handler> completionHandler) { callCompletionHandlerAsync(completionHandler); } - protected HandlerHolder removeRegistration(String address, HandlerRegistration handler) { + protected HandlerHolder removeLocalRegistration(String address, HandlerRegistration handler) { Handlers handlers = handlerMap.get(address); HandlerHolder lastHolder = null; if (handlers != null) { @@ -289,21 +299,24 @@ protected HandlerHolder removeRegistration(String address, HandlerRegistrati return lastHolder; } - protected void sendReply(LocalMessage message, DeliveryOptions options, + protected void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options, Handler>> replyHandler) { - if (message.address() == null) { + if (replyMessage.address() == null) { throw new IllegalStateException("address not specified"); } else { - sendOrPub(message, options, replyHandler); + HandlerRegistration replyHandlerRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler); + new ReplySendContextImpl<>(replyMessage, options, replyHandlerRegistration, replierMessage).next(); } } - protected void sendOrPub(LocalMessage message, DeliveryOptions options, - Handler>> replyHandler) { - checkStarted(); - HandlerRegistration registration = createReplyHandlerRegistration(message, options, replyHandler); + protected void sendReply(SendContextImpl sendContext, MessageImpl replierMessage) { + sendOrPub(sendContext); + } + + protected void sendOrPub(SendContextImpl sendContext) { + MessageImpl message = sendContext.message; metrics.messageSent(message.address(), !message.send(), true, false); - serveMessage(message, registration, true); + deliverMessageLocally(sendContext); } protected Handler> convertHandler(Handler>> handler) { @@ -327,16 +340,21 @@ protected void callCompletionHandlerAsync(Handler> completionH } } - protected void serveMessage(LocalMessage msg, HandlerRegistration handlerRegistration, boolean local) { - if (!serveMessage(msg, local)) { + protected void deliverMessageLocally(SendContextImpl sendContext) { + if (!deliverMessageLocally(sendContext.message)) { // no handlers - if (handlerRegistration != null) { - handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); + if (sendContext.handlerRegistration != null) { + sendContext.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address " + + sendContext.message.address); } } } - protected boolean serveMessage(LocalMessage msg, boolean local) { + protected boolean isMessageLocal(MessageImpl msg) { + return true; + } + + protected boolean deliverMessageLocally(MessageImpl msg) { msg.setBus(this); Handlers handlers = handlerMap.get(msg.address()); if (handlers != null) { @@ -344,19 +362,19 @@ protected boolean serveMessage(LocalMessage msg, boolean local) { //Choose one HandlerHolder holder = handlers.choose(); if (holder != null) { - metrics.messageReceived(msg.address(), !msg.send(), local, 1); - doServe(msg, holder); + metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 1); + deliverToHandler(msg, holder); } } else { // Publish - metrics.messageReceived(msg.address(), !msg.send(), local, handlers.list.size()); + metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), handlers.list.size()); for (HandlerHolder holder: handlers.list) { - doServe(msg, holder); + deliverToHandler(msg, holder); } } return true; } else { - metrics.messageReceived(msg.address(), !msg.send(), local, 0); + metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 0); return false; } } @@ -371,6 +389,92 @@ protected String generateReplyAddress() { return Long.toString(replySequence.incrementAndGet()); } + private HandlerRegistration createReplyHandlerRegistration(MessageImpl message, + DeliveryOptions options, + Handler>> replyHandler) { + if (replyHandler != null) { + long timeout = options.getSendTimeout(); + String replyAddress = generateReplyAddress(); + message.setReplyAddress(replyAddress); + Handler> simpleReplyHandler = convertHandler(replyHandler); + HandlerRegistration registration = + new HandlerRegistration<>(vertx, metrics, this, replyAddress, true, true, replyHandler, timeout); + registration.handler(simpleReplyHandler); + return registration; + } else { + return null; + } + } + + private void sendOrPubInternal(MessageImpl message, DeliveryOptions options, + Handler>> replyHandler) { + checkStarted(); + HandlerRegistration replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler); + SendContextImpl sendContext = new SendContextImpl<>(message, options, replyHandlerRegistration); + sendContext.next(); + } + + protected class SendContextImpl implements SendContext { + + public final MessageImpl message; + public final DeliveryOptions options; + public final HandlerRegistration handlerRegistration; + public final Iterator> iter; + + public SendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration handlerRegistration) { + this.message = message; + this.options = options; + this.handlerRegistration = handlerRegistration; + this.iter = interceptors.iterator(); + } + + @Override + public Message message() { + return message; + } + + @Override + public void next() { + if (iter.hasNext()) { + Handler handler = iter.next(); + try { + handler.handle(this); + } catch (Throwable t) { + log.error("Failure in interceptor", t); + } + } else { + sendOrPub(this); + } + } + + @Override + public boolean send() { + return message.send(); + } + } + + protected class ReplySendContextImpl extends SendContextImpl { + + private final MessageImpl replierMessage; + + public ReplySendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration handlerRegistration, + MessageImpl replierMessage) { + super(message, options, handlerRegistration); + this.replierMessage = replierMessage; + } + + @Override + public void next() { + if (iter.hasNext()) { + Handler handler = iter.next(); + handler.handle(this); + } else { + sendReply(this, replierMessage); + } + } + } + + private void unregisterAll() { // Unregister all handlers explicitly - don't rely on context hooks for (Handlers handlers: handlerMap.values()) { @@ -380,7 +484,7 @@ private void unregisterAll() { } } - private void doServe(LocalMessage msg, HandlerHolder holder) { + private void deliverToHandler(MessageImpl msg, HandlerHolder holder) { // Each handler gets a fresh copy @SuppressWarnings("unchecked") Message copied = msg.copyBeforeReceive(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index 0b3b8bb341e..54c2387518e 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -6,7 +6,6 @@ import io.vertx.core.eventbus.ReplyException; import io.vertx.core.eventbus.ReplyFailure; import io.vertx.core.eventbus.impl.clustered.ClusteredMessage; -import io.vertx.core.eventbus.impl.local.LocalEventBus; import io.vertx.core.impl.Arguments; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; @@ -28,9 +27,11 @@ public class HandlerRegistration implements MessageConsumer, Handler implements MessageConsumer, Handler> completionHandler; private Handler endHandler; private Handler> discardHandler; - private int maxBufferedMessages; + private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES; private final Queue> pending = new ArrayDeque<>(8); private boolean paused; private Object metric; - public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, LocalEventBus eventBus, String address, + public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, boolean replyHandler, boolean localOnly, Handler>> asyncResultHandler, long timeout) { this.vertx = vertx; @@ -170,6 +171,8 @@ public void handle(Message message) { } else { if (discardHandler != null) { discardHandler.handle(message); + } else { + log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer"); } } } else { @@ -191,6 +194,10 @@ public void handle(Message message) { // Handle the message outside the sync block // https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714 if (theHandler != null) { + String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME); + if (creditsAddress != null) { + eventBus.send(creditsAddress, 1); + } handleMessage(theHandler, message); } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java similarity index 80% rename from src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java rename to src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index 10b2e08aa41..fc1f7b5f6dd 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/local/LocalMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -14,7 +14,7 @@ * You may elect to redistribute this code under either of these licenses. */ -package io.vertx.core.eventbus.impl.local; +package io.vertx.core.eventbus.impl; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -30,12 +30,12 @@ /** * @author Tim Fox */ -public class LocalMessage implements Message { +public class MessageImpl implements Message { - private static final Logger log = LoggerFactory.getLogger(LocalMessage.class); + private static final Logger log = LoggerFactory.getLogger(MessageImpl.class); protected MessageCodec messageCodec; - protected LocalEventBus bus; + protected EventBusImpl bus; protected String address; protected String replyAddress; protected MultiMap headers; @@ -43,12 +43,12 @@ public class LocalMessage implements Message { protected V receivedBody; protected boolean send; - public LocalMessage() { + public MessageImpl() { } - public LocalMessage(String address, String replyAddress, MultiMap headers, U sentBody, - MessageCodec messageCodec, - boolean send) { + public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody, + MessageCodec messageCodec, + boolean send) { this.messageCodec = messageCodec; this.address = address; this.replyAddress = replyAddress; @@ -57,7 +57,7 @@ public LocalMessage(String address, String replyAddress, MultiMap headers, U sen this.send = send; } - protected LocalMessage(LocalMessage other) { + protected MessageImpl(MessageImpl other) { this.bus = other.bus; this.address = other.address; this.replyAddress = other.replyAddress; @@ -76,8 +76,8 @@ protected LocalMessage(LocalMessage other) { this.send = other.send; } - public LocalMessage copyBeforeReceive() { - return new LocalMessage<>(this); + public MessageImpl copyBeforeReceive() { + return new MessageImpl<>(this); } @Override @@ -96,6 +96,9 @@ public MultiMap headers() { @Override public V body() { + if (receivedBody == null && sentBody != null) { + receivedBody = messageCodec.transform(sentBody); + } return receivedBody; } @@ -146,15 +149,14 @@ public MessageCodec codec() { return messageCodec; } - public void setBus(LocalEventBus bus) { + public void setBus(EventBusImpl bus) { this.bus = bus; } - protected void sendReply(LocalMessage msg, DeliveryOptions options, Handler>> replyHandler) { + protected void sendReply(MessageImpl msg, DeliveryOptions options, Handler>> replyHandler) { if (bus != null) { - bus.sendReply(msg, options, replyHandler); + bus.sendReply(msg, this, options, replyHandler); } } - } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index deb87b64ae0..ab7e0ba496e 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -17,26 +17,48 @@ package io.vertx.core.eventbus.impl; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.eventbus.MessageProducer; -import io.vertx.core.streams.WriteStream; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.UUID; /** * @author Julien Viet */ public class MessageProducerImpl implements MessageProducer { + public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit"; + + private final Vertx vertx; private final EventBus bus; private final boolean send; private final String address; + private final Queue pending = new ArrayDeque<>(); + private final MessageConsumer creditConsumer; private DeliveryOptions options; + private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE; + private Handler drainHandler; - public MessageProducerImpl(EventBus bus, String address, boolean send, DeliveryOptions options) { - this.bus = bus; + public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) { + this.vertx = vertx; + this.bus = vertx.eventBus(); this.address = address; this.send = send; this.options = options; + if (send) { + String creditAddress = UUID.randomUUID().toString() + "-credit"; + creditConsumer = bus.consumer(creditAddress, msg -> { + doReceiveCredit(msg.body()); + }); + options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress); + } else { + creditConsumer = null; + } } @Override @@ -45,20 +67,27 @@ public synchronized MessageProducer deliveryOptions(DeliveryOptions options) return this; } + @Override + public MessageProducer send(T message) { + write(message); + return this; + } + @Override public MessageProducer exceptionHandler(Handler handler) { return this; } @Override - public MessageProducer setWriteQueueMaxSize(int maxSize) { + public synchronized MessageProducer setWriteQueueMaxSize(int maxSize) { + this.credits = maxSize; return this; } @Override public synchronized MessageProducer write(T data) { if (send) { - bus.send(address, data, options); + doSend(data); } else { bus.publish(address, data, options); } @@ -67,11 +96,12 @@ public synchronized MessageProducer write(T data) { @Override public boolean writeQueueFull() { - return false; + return pending.size() >= 0; } @Override - public MessageProducer drainHandler(Handler handler) { + public synchronized MessageProducer drainHandler(Handler handler) { + this.drainHandler = handler; return this; } @@ -80,4 +110,45 @@ public String address() { return address; } + @Override + public void close() { + if (creditConsumer != null) { + creditConsumer.unregister(); + } + } + + // Just in case user forget to call close() + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + private synchronized void doSend(T data) { + if (credits > 0) { + credits--; + bus.send(address, data, options); + } else { + pending.add(data); + } + } + + private synchronized void doReceiveCredit(int credit) { + credits += credit; + while (credits > 0) { + T data = pending.poll(); + if (data == null) { + break; + } else { + credits--; + bus.send(address, data, options); + } + } + final Handler theDrainHandler = drainHandler; + if (theDrainHandler != null && pending.isEmpty()) { + this.drainHandler = null; + vertx.runOnContext(v -> theDrainHandler.handle(null)); + } + } + } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 6f61dcbe3f7..78e12ffef87 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -18,14 +18,8 @@ import io.vertx.core.*; import io.vertx.core.buffer.Buffer; -import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.impl.CodecManager; -import io.vertx.core.eventbus.impl.HandlerHolder; -import io.vertx.core.eventbus.impl.HandlerRegistration; -import io.vertx.core.eventbus.impl.local.LocalEventBus; -import io.vertx.core.eventbus.impl.local.LocalMessage; +import io.vertx.core.eventbus.impl.*; import io.vertx.core.impl.HAManager; import io.vertx.core.impl.VertxInternal; import io.vertx.core.json.JsonObject; @@ -46,11 +40,11 @@ import java.util.concurrent.ConcurrentMap; /** - * This class is thread-safe + * An event bus implemmentation that clusters with other Vert.x nodes * * @author Tim Fox 7 T */ -public class ClusteredEventBus extends LocalEventBus { +public class ClusteredEventBus extends EventBusImpl { private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class); @@ -143,7 +137,7 @@ public void close(Handler> completionHandler) { } @Override - public LocalMessage createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { + protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") @@ -152,20 +146,20 @@ public LocalMessage createMessage(boolean send, String address, MultiMap headers } @Override - public void addRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { - boolean newAddress = doAddRegistration(address, registration, replyHandler, localOnly); + protected void addRegistration(boolean newAddress, String address, + boolean replyHandler, boolean localOnly, + Handler> completionHandler) { if (newAddress && subs != null && !replyHandler && !localOnly) { // Propagate the information - subs.add(address, serverID, registration::setResult); + subs.add(address, serverID, completionHandler); } else { - registration.setResult(Future.succeededFuture()); + completionHandler.handle(Future.succeededFuture()); } } @Override - public void removeRegistration(String address, HandlerRegistration handler, Handler> completionHandler) { - HandlerHolder lastHolder = removeRegistration(address, handler);; + protected void removeRegistration(HandlerHolder lastHolder, String address, + Handler> completionHandler) { if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) { removeSub(address, serverID, completionHandler); } else { @@ -173,27 +167,22 @@ public void removeRegistration(String address, HandlerRegistration handle } } - protected void sendReply(ServerID replyDest, LocalMessage message, DeliveryOptions options, Handler>> replyHandler) { - if (message.address() == null) { - throw new IllegalStateException("address not specified"); - } else { - sendReply(replyDest, (ClusteredMessage)message, options, replyHandler); - } + @Override + protected void sendReply(SendContextImpl sendContext, MessageImpl replierMessage) { + clusteredSendReply(((ClusteredMessage) replierMessage).getSender(), sendContext); } @Override - protected void sendOrPub(LocalMessage message, DeliveryOptions options, - Handler>> replyHandler) { - checkStarted(); - final HandlerRegistration registration = createReplyHandlerRegistration(message, options, replyHandler); + protected void sendOrPub(SendContextImpl sendContext) { + String address = sendContext.message.address(); Handler>> resultHandler = asyncResult -> { if (asyncResult.succeeded()) { ChoosableIterable serverIDs = asyncResult.result(); if (serverIDs != null && !serverIDs.isEmpty()) { - sendToSubs(serverIDs, message, registration); + sendToSubs(serverIDs, sendContext); } else { - metrics.messageSent(message.address(), !message.send(), true, false); - serveMessage(message, registration, true); + metrics.messageSent(address, !sendContext.message.send(), true, false); + deliverMessageLocally(sendContext); } } else { log.error("Failed to send message", asyncResult.cause()); @@ -202,10 +191,10 @@ protected void sendOrPub(LocalMessage message, DeliveryOptions options, if (Vertx.currentContext() == null) { // Guarantees the order when there is no current context sendNoContext.runOnContext(v -> { - subs.get(message.address(), resultHandler); + subs.get(address, resultHandler); }); } else { - subs.get(message.address(), resultHandler); + subs.get(address, resultHandler); } } @@ -215,6 +204,12 @@ protected String generateReplyAddress() { return UUID.randomUUID().toString(); } + @Override + protected boolean isMessageLocal(MessageImpl msg) { + ClusteredMessage clusteredMessage = (ClusteredMessage)msg; + return !clusteredMessage.isFromWire(); + } + private void setNodeCrashedHandler(HAManager haManager) { haManager.setNodeCrashedHandler((failedNodeID, haInfo, failed) -> { JsonObject jsid = haInfo.getJsonObject(SERVER_ID_HA_KEY); @@ -266,7 +261,7 @@ public void handle(Buffer buff) { // Just send back pong directly on connection socket.write(PONG); } else { - serveMessage(received, false); + deliverMessageLocally(received); } } } @@ -276,18 +271,17 @@ public void handle(Buffer buff) { }; } - private void sendToSubs(ChoosableIterable subs, LocalMessage message, - HandlerRegistration handlerRegistration) { - String address = message.address(); - if (message.send()) { + private void sendToSubs(ChoosableIterable subs, SendContextImpl sendContext) { + String address = sendContext.message.address(); + if (sendContext.message.send()) { // Choose one ServerID sid = subs.choose(); if (!sid.equals(serverID)) { //We don't send to this node metrics.messageSent(address, false, false, true); - sendRemote(sid, message); + sendRemote(sid, sendContext.message); } else { metrics.messageSent(address, false, true, false); - serveMessage(message, handlerRegistration, true); + deliverMessageLocally(sendContext); } } else { // Publish @@ -296,35 +290,31 @@ private void sendToSubs(ChoosableIterable subs, LocalMessage messa for (ServerID sid : subs) { if (!sid.equals(serverID)) { //We don't send to this node remote = true; - sendRemote(sid, message); + sendRemote(sid, sendContext.message); } else { local = true; } } metrics.messageSent(address, true, local, remote); if (local) { - serveMessage(message, handlerRegistration, true); + deliverMessageLocally(sendContext); } } } - private void sendReply(ServerID replyDest, ClusteredMessage message, DeliveryOptions options, - Handler>> replyHandler) { - HandlerRegistration registration = null; - if (replyHandler != null) { - registration = createReplyHandlerRegistration(message, options, replyHandler); - } + private void clusteredSendReply(ServerID replyDest, SendContextImpl sendContext) { + MessageImpl message = sendContext.message; String address = message.address(); if (!replyDest.equals(serverID)) { metrics.messageSent(address, false, false, true); sendRemote(replyDest, message); } else { metrics.messageSent(address, false, true, false); - serveMessage(message, registration, true); + deliverMessageLocally(sendContext); } } - private void sendRemote(ServerID theServerID, LocalMessage message) { + private void sendRemote(ServerID theServerID, MessageImpl message) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. @@ -377,6 +367,5 @@ VertxOptions options() { return options; } - } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java index 6036bfda160..549122e08e6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredMessage.java @@ -17,15 +17,11 @@ package io.vertx.core.eventbus.impl.clustered; import io.netty.util.CharsetUtil; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; -import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.impl.CodecManager; -import io.vertx.core.eventbus.impl.local.LocalMessage; +import io.vertx.core.eventbus.impl.MessageImpl; import io.vertx.core.http.CaseInsensitiveHeaders; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; @@ -37,7 +33,7 @@ /** * @author Tim Fox */ -public class ClusteredMessage extends LocalMessage { +public class ClusteredMessage extends MessageImpl { private static final Logger log = LoggerFactory.getLogger(ClusteredMessage.class); @@ -241,11 +237,8 @@ private void writeString(Buffer buff, String str) { buff.appendBytes(strBytes); } - @Override - protected void sendReply(LocalMessage msg, DeliveryOptions options, Handler>> replyHandler) { - if (bus != null & replyAddress() != null) { - ((ClusteredEventBus)bus).sendReply(sender, msg, options, replyHandler); - } + ServerID getSender() { + return sender; } public boolean isFromWire() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 907a1cb9a27..baa6bf94c05 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -106,7 +106,6 @@ private void schedulePing() { }); } - private synchronized void connected(NetSocket socket) { this.socket = socket; connected = true; diff --git a/src/main/java/io/vertx/core/eventbus/package-info.java b/src/main/java/io/vertx/core/eventbus/package-info.java index 0157003b48a..9bc3a49393a 100644 --- a/src/main/java/io/vertx/core/eventbus/package-info.java +++ b/src/main/java/io/vertx/core/eventbus/package-info.java @@ -191,14 +191,14 @@ * * The headers of the message are available with {@link io.vertx.core.eventbus.Message#headers}. * - * ==== Replying to messages + * ==== Acknowledging messages / sending replies * - * Sometimes after you send a message you want to receive a reply from the recipient. - * This is known as the *request-response pattern*. + * When using {@link io.vertx.core.eventbus.EventBus#send} the event bus attempts to deliver the message to a + * {@link io.vertx.core.eventbus.MessageConsumer} registered with the event bus. * - * To do this you can specify a reply handler when sending the message. + * In some cases it's useful for the sender to know when the consumer has received the message and "processed" it. * - * When the receiver receives the message they can reply to it by calling {@link io.vertx.core.eventbus.Message#reply}. + * To acknowledge that the message has been processed the consumer can reply to the message by calling {@link io.vertx.core.eventbus.Message#reply}. * * When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply. * @@ -218,8 +218,19 @@ * {@link examples.EventBusExamples#example9} * ---- * - * The replies themselves can also be replied to so you can create a dialog between two different parties - * consisting of multiple rounds. + * The reply can contain a message body which can contain useful information. + * + * What the "processing" actually means is application defined and depends entirely on what the message consumer does + * and is not something that the Vert.x event bus itself knows or cares about. + * + * Some examples: + * + * * A simple message consumer which implements a service which returns the time of the day would acknowledge with a message + * containing the time of day in the reply body + * * A message consumer which implements a persistent queue, might acknowledge with `true` if the message was successfully + * persisted in storage, or `false` if not. + * * A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed + * so it can be deleted from the database * * ==== Sending with timeouts * diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index b3cb16e992d..0126ec01d24 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -29,6 +29,8 @@ import io.vertx.core.dns.DnsClient; import io.vertx.core.dns.impl.DnsClientImpl; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; import io.vertx.core.file.FileSystem; import io.vertx.core.file.impl.FileSystemImpl; import io.vertx.core.file.impl.WindowsFileSystem; @@ -51,7 +53,6 @@ import io.vertx.core.net.impl.ServerID; import io.vertx.core.shareddata.SharedData; import io.vertx.core.shareddata.impl.SharedDataImpl; -import io.vertx.core.spi.EventBusFactory; import io.vertx.core.spi.VerticleFactory; import io.vertx.core.spi.VertxMetricsFactory; import io.vertx.core.spi.cluster.ClusterManager; @@ -75,8 +76,6 @@ public class VertxImpl implements VertxInternal, MetricsProvider { private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); - private static final EventBusFactory eventBusFactory = ServiceHelper.loadFactory(EventBusFactory.class); - static { // Netty resource leak detection has a performance overhead and we do not need it in Vert.x ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); @@ -163,7 +162,11 @@ public class VertxImpl implements VertxInternal, MetricsProvider { } private void createAndStartEventBus(VertxOptions options, Handler> resultHandler) { - eventBus = eventBusFactory.createEventBus(this, options, clusterManager, haManager); + if (options.isClustered()) { + eventBus = new ClusteredEventBus(this, options, clusterManager, haManager); + } else { + eventBus = new EventBusImpl(this); + } eventBus.start(ar2 -> { if (ar2.succeeded()) { if (resultHandler != null) { diff --git a/src/main/java/io/vertx/core/spi/EventBusFactory.java b/src/main/java/io/vertx/core/spi/EventBusFactory.java deleted file mode 100644 index 863ea337ed6..00000000000 --- a/src/main/java/io/vertx/core/spi/EventBusFactory.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.vertx.core.spi; - -import io.vertx.core.VertxOptions; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.impl.HAManager; -import io.vertx.core.impl.VertxInternal; -import io.vertx.core.spi.cluster.ClusterManager; - -/** - * @author Tim Fox - */ -public interface EventBusFactory { - - EventBus createEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager); - -} diff --git a/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory b/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory deleted file mode 100644 index 4cd5d48c79f..00000000000 --- a/src/main/resources/META-INF/services/io.vertx.core.spi.EventBusFactory +++ /dev/null @@ -1 +0,0 @@ -io.vertx.core.eventbus.impl.EventBusFactoryImpl \ No newline at end of file diff --git a/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java b/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java new file mode 100644 index 00000000000..094288a04dc --- /dev/null +++ b/src/test/java/io/vertx/test/core/EventBusFlowControlTest.java @@ -0,0 +1,112 @@ +package io.vertx.test.core; + +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.eventbus.MessageProducer; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Tim Fox + */ +public class EventBusFlowControlTest extends VertxTestBase { + + protected EventBus eb; + + @Test + public void testFlowControl() { + + MessageProducer prod = eb.sender("some-address"); + int numBatches = 1000; + int wqms = 2000; + prod.setWriteQueueMaxSize(wqms); + + MessageConsumer consumer = eb.consumer("some-address"); + AtomicInteger cnt = new AtomicInteger(); + consumer.handler(msg -> { + int c = cnt.incrementAndGet(); + if (c == numBatches * wqms) { + testComplete(); + } + }); + + sendBatch(prod, wqms, numBatches, 0); + await(); + } + + private void sendBatch(MessageProducer prod, int batchSize, int numBatches, int batchNumber) { + boolean drainHandlerSet = false; + for (int i = 0; i < batchSize; i++) { + prod.send("message-" + i); + if (prod.writeQueueFull() && !drainHandlerSet) { + prod.drainHandler(v -> { + if (batchNumber < numBatches - 1) { + sendBatch(prod, batchSize, numBatches, batchNumber + 1); + } + }); + drainHandlerSet = true; + } + } + } + + @Test + public void testFlowControlPauseConsumer() { + + MessageProducer prod = eb.sender("some-address"); + int numBatches = 10; + int wqms = 100; + prod.setWriteQueueMaxSize(wqms); + + MessageConsumer consumer = eb.consumer("some-address"); + AtomicInteger cnt = new AtomicInteger(); + AtomicBoolean paused = new AtomicBoolean(); + consumer.handler(msg -> { + assertFalse(paused.get()); + int c = cnt.incrementAndGet(); + if (c == numBatches * wqms) { + testComplete(); + } + if (c % 100 == 0) { + consumer.pause(); + paused.set(true); + vertx.setTimer(100, tid -> { + paused.set(false); + consumer.resume(); + }); + } + }); + + sendBatch(prod, wqms, numBatches, 0); + await(); + } + + @Test + public void testFlowControlNoConsumer() { + + MessageProducer prod = eb.sender("some-address"); + int wqms = 2000; + prod.setWriteQueueMaxSize(wqms); + + boolean drainHandlerSet = false; + for (int i = 0; i < wqms * 2; i++) { + prod.send("message-" + i); + if (prod.writeQueueFull() && !drainHandlerSet) { + prod.drainHandler(v -> { + fail("Should not be called"); + }); + drainHandlerSet = true; + } + } + assertTrue(drainHandlerSet); + vertx.setTimer(500, tid -> testComplete()); + await(); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + eb = vertx.eventBus(); + } +} diff --git a/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java b/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java new file mode 100644 index 00000000000..d03a8810318 --- /dev/null +++ b/src/test/java/io/vertx/test/core/EventBusInterceptorTest.java @@ -0,0 +1,177 @@ +package io.vertx.test.core; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.SendContext; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Tim Fox + */ +public class EventBusInterceptorTest extends VertxTestBase { + + protected EventBus eb; + + @Test + public void testInterceptorSend() { + eb.addInterceptor(sc -> { + assertEquals("armadillo", sc.message().body()); + assertTrue(sc.send()); + sc.next(); + }); + eb.consumer("some-address", msg -> { + assertEquals("armadillo", msg.body()); + testComplete(); + }); + eb.send("some-address", "armadillo"); + await(); + } + + @Test + public void testInterceptorPublish() { + eb.addInterceptor(sc -> { + assertEquals("armadillo", sc.message().body()); + assertFalse(sc.send()); + sc.next(); + }); + eb.consumer("some-address", msg -> { + assertEquals("armadillo", msg.body()); + testComplete(); + }); + eb.publish("some-address", "armadillo"); + await(); + } + + @Test + public void testInterceptorNoNext() { + eb.addInterceptor(sc -> { + assertEquals("armadillo", sc.message().body()); + }); + eb.consumer("some-address", msg -> { + fail("Should not receive message"); + }); + eb.send("some-address", "armadillo"); + vertx.setTimer(200, tid -> testComplete()); + await(); + } + + @Test + public void testMultipleInterceptors() { + AtomicInteger cnt = new AtomicInteger(); + int interceptorNum = 10; + for (int i = 0; i < interceptorNum; i++) { + final int expectedCount = i; + eb.addInterceptor(sc -> { + assertEquals("armadillo", sc.message().body()); + int count = cnt.getAndIncrement(); + assertEquals(expectedCount, count); + sc.next(); + }); + } + eb.consumer("some-address", msg -> { + assertEquals("armadillo", msg.body()); + assertEquals(interceptorNum, cnt.get()); + testComplete(); + }); + eb.send("some-address", "armadillo"); + await(); + } + + @Test + public void testRemoveInterceptor() { + + AtomicInteger cnt1 = new AtomicInteger(); + AtomicInteger cnt2 = new AtomicInteger(); + + Handler eb1 = sc -> { + cnt1.incrementAndGet(); + sc.next(); + }; + + Handler eb2 = sc -> { + cnt2.incrementAndGet(); + sc.next(); + }; + + eb.addInterceptor(eb1).addInterceptor(eb2); + + eb.consumer("some-address", msg -> { + if (msg.body().equals("armadillo")) { + assertEquals(1, cnt1.get()); + assertEquals(1, cnt2.get()); + eb.removeInterceptor(eb2); + eb.send("some-address", "aardvark"); + } else if (msg.body().equals("aardvark")) { + assertEquals(2, cnt1.get()); + assertEquals(1, cnt2.get()); + testComplete(); + } else { + fail("wrong body"); + } + }); + eb.send("some-address", "armadillo"); + await(); + } + + @Test + public void testInterceptorOnReply() { + AtomicInteger cnt = new AtomicInteger(); + eb.addInterceptor(sc -> { + if (sc.message().body().equals("armadillo")) { + assertEquals(0, cnt.get()); + } else if (sc.message().body().equals("echidna")) { + assertEquals(1, cnt.get()); + } else { + fail("wrong body"); + } + cnt.incrementAndGet(); + sc.next(); + }); + eb.consumer("some-address", msg -> { + assertEquals("armadillo", msg.body()); + assertEquals(1, cnt.get()); + msg.reply("echidna"); + }); + eb.send("some-address", "armadillo", reply -> { + assertEquals("echidna", reply.result().body()); + assertEquals(2, cnt.get()); + testComplete(); + }); + await(); + } + + @Test + public void testExceptionInInterceptor() { + AtomicInteger cnt = new AtomicInteger(); + + Handler eb1 = sc -> { + cnt.incrementAndGet(); + vertx.runOnContext(v -> sc.next()); + throw new RuntimeException("foo"); + }; + + Handler eb2 = sc -> { + cnt.incrementAndGet(); + sc.next(); + }; + + eb.addInterceptor(eb1).addInterceptor(eb2); + + eb.consumer("some-address", msg -> { + assertEquals("armadillo", msg.body()); + assertEquals(2, cnt.get()); + testComplete(); + }); + eb.send("some-address", "armadillo"); + await(); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + + eb = vertx.eventBus(); + } +} diff --git a/src/test/java/io/vertx/test/core/LocalEventBusTest.java b/src/test/java/io/vertx/test/core/LocalEventBusTest.java index 6dee557b783..57ffcd729e0 100644 --- a/src/test/java/io/vertx/test/core/LocalEventBusTest.java +++ b/src/test/java/io/vertx/test/core/LocalEventBusTest.java @@ -1238,7 +1238,9 @@ public void testUpdateDeliveryOptionsOnProducer() { producer.write("no-header"); }); consumer.handler(msg -> { - switch (msg.body()) { + String body = msg.body(); + assertNotNull(body); + switch (body) { case "no-header": assertNull(msg.headers().get("header-name")); producer.deliveryOptions(new DeliveryOptions().addHeader("header-name", "header-value")); diff --git a/src/test/java/io/vertx/test/core/NetTest.java b/src/test/java/io/vertx/test/core/NetTest.java index 6800db30982..e13d1893eb4 100644 --- a/src/test/java/io/vertx/test/core/NetTest.java +++ b/src/test/java/io/vertx/test/core/NetTest.java @@ -557,7 +557,6 @@ public void testServerOptionsJson() { Random rand = new Random(); boolean reuseAddress = rand.nextBoolean(); int trafficClass = TestUtils.randomByte() + 128; - System.out.println("trafficClass = " + trafficClass); boolean tcpNoDelay = rand.nextBoolean(); boolean tcpKeepAlive = rand.nextBoolean(); int soLinger = TestUtils.randomPositiveInt();