Skip to content

Commit

Permalink
Event bus flow control / interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Nov 6, 2015
1 parent f43a672 commit c68ab7c
Show file tree
Hide file tree
Showing 25 changed files with 757 additions and 222 deletions.
25 changes: 18 additions & 7 deletions src/main/asciidoc/java/eventbus.adoc
Expand Up @@ -211,14 +211,14 @@ The `link:../../apidocs/io/vertx/core/eventbus/Message.html#body--[body]` of the


The headers of the message are available with `link:../../apidocs/io/vertx/core/eventbus/Message.html#headers--[headers]`. The headers of the message are available with `link:../../apidocs/io/vertx/core/eventbus/Message.html#headers--[headers]`.


==== Replying to messages ==== Acknowledging messages / sending replies


Sometimes after you send a message you want to receive a reply from the recipient. When using `link:../../apidocs/io/vertx/core/eventbus/EventBus.html#send-java.lang.String-java.lang.Object-[send]` the event bus attempts to deliver the message to a
This is known as the *request-response pattern*. `link:../../apidocs/io/vertx/core/eventbus/MessageConsumer.html[MessageConsumer]` registered with the event bus.


To do this you can specify a reply handler when sending the message. In some cases it's useful for the sender to know when the consumer has received the message and "processed" it.


When the receiver receives the message they can reply to it by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`. To acknowledge that the message has been processed the consumer can reply to the message by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`.


When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply. When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply.


Expand Down Expand Up @@ -246,8 +246,19 @@ eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of gra
}); });
---- ----


The replies themselves can also be replied to so you can create a dialog between two different parties The reply can contain a message body which can contain useful information.
consisting of multiple rounds.
What the "processing" actually means is application defined and depends entirely on what the message consumer does
and is not something that the Vert.x event bus itself knows or cares about.

Some examples:

* A simple message consumer which implements a service which returns the time of the day would acknowledge with a message
containing the time of day in the reply body
* A message consumer which implements a persistent queue, might acknowledge with `true` if the message was successfully
persisted in storage, or `false` if not.
* A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed
so it can be deleted from the database


==== Sending with timeouts ==== Sending with timeouts


Expand Down
10 changes: 5 additions & 5 deletions src/main/asciidoc/java/http.adoc
Expand Up @@ -66,7 +66,7 @@ server.listen(8080, "myhost.com", res -> {


=== Getting notified of incoming requests === Getting notified of incoming requests


To be notified when a request arrives you need to set a `link:../../apidocs/io/vertx/core/http/HttpServer.html#requestHandler-io.vertx.core.Handler-[requestHandler]`: To be notified when a request arrives you need to set a `link:../../apidocs/io/vertx/core/http/HttpServer.html#requestHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[requestHandler]`:


[source,java] [source,java]
---- ----
Expand Down Expand Up @@ -237,7 +237,7 @@ request.endHandler(v -> {
}); });
---- ----


This is such a common case, that Vert.x provides a `link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#bodyHandler-io.vertx.core.Handler-[bodyHandler]` to do this This is such a common case, that Vert.x provides a `link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#bodyHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[bodyHandler]` to do this
for you. The body handler is called once when all the body has been received: for you. The body handler is called once when all the body has been received:


[source,java] [source,java]
Expand Down Expand Up @@ -286,7 +286,7 @@ server.requestHandler(request -> {
Vert.x can also handle file uploads which are encoded in a multi-part request body. Vert.x can also handle file uploads which are encoded in a multi-part request body.


To receive file uploads you tell Vert.x to expect a multi-part form and set an To receive file uploads you tell Vert.x to expect a multi-part form and set an
`link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#uploadHandler-io.vertx.core.Handler-[uploadHandler]` on the request. `link:../../apidocs/io/vertx/core/http/HttpServerRequest.html#uploadHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[uploadHandler]` on the request.


This handler will be called once for every This handler will be called once for every
upload that arrives on the server. upload that arrives on the server.
Expand Down Expand Up @@ -1061,7 +1061,7 @@ The idea here is it allows the server to authorise and accept/reject the request
Sending large amounts of data if the request might not be accepted is a waste of bandwidth and ties up the server Sending large amounts of data if the request might not be accepted is a waste of bandwidth and ties up the server
in reading data that it will just discard. in reading data that it will just discard.


Vert.x allows you to set a `link:../../apidocs/io/vertx/core/http/HttpClientRequest.html#continueHandler-io.vertx.core.Handler-[continueHandler]` on the Vert.x allows you to set a `link:../../apidocs/io/vertx/core/http/HttpClientRequest.html#continueHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[continueHandler]` on the
client request object client request object


This will be called if the server sends back a `Status: 100 (Continue)` response to signify that it is ok to send This will be called if the server sends back a `Status: 100 (Continue)` response to signify that it is ok to send
Expand Down Expand Up @@ -1230,7 +1230,7 @@ There are two ways of handling WebSockets on the server side.


===== WebSocket handler ===== WebSocket handler


The first way involves providing a `link:../../apidocs/io/vertx/core/http/HttpServer.html#websocketHandler-io.vertx.core.Handler-[websocketHandler]` The first way involves providing a `link:../../apidocs/io/vertx/core/http/HttpServer.html#websocketHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[websocketHandler]`
on the server instance. on the server instance.


When a WebSocket connection is made to the server, the handler will be called, passing in an instance of When a WebSocket connection is made to the server, the handler will be called, passing in an instance of
Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/java/net.adoc
Expand Up @@ -84,7 +84,7 @@ server.listen(0, "localhost", res -> {


=== Getting notified of incoming connections === Getting notified of incoming connections


To be notified when a connection is made you need to set a `link:../../apidocs/io/vertx/core/net/NetServer.html#connectHandler-io.vertx.core.Handler-[connectHandler]`: To be notified when a connection is made you need to set a `link:../../apidocs/io/vertx/core/net/NetServer.html#connectHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[connectHandler]`:


[source,java] [source,java]
---- ----
Expand Down Expand Up @@ -137,7 +137,7 @@ Write operations are asynchronous and may not occur until some time after the ca


=== Closed handler === Closed handler


If you want to be notified when a socket is closed, you can set a `link:../../apidocs/io/vertx/core/net/NetSocket.html#closeHandler-io.vertx.core.Handler-[closeHandler]` If you want to be notified when a socket is closed, you can set a `link:../../apidocs/io/vertx/core/net/NetSocket.html#closeHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[closeHandler]`
on it: on it:


[source,java] [source,java]
Expand Down
8 changes: 4 additions & 4 deletions src/main/asciidoc/java/streams.adoc
Expand Up @@ -101,7 +101,7 @@ server.connectHandler(sock -> {
}).listen(); }).listen();
---- ----


And there we have it. The `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-io.vertx.core.Handler-[drainHandler]` event handler will And there we have it. The `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[drainHandler]` event handler will
get called when the write queue is ready to accept more data, this resumes the `NetSocket` which get called when the write queue is ready to accept more data, this resumes the `NetSocket` which
allows it to read more data. allows it to read more data.


Expand Down Expand Up @@ -134,15 +134,15 @@ Let's look at the methods on `ReadStream` and `WriteStream` in more detail:


Functions: Functions:


- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#handler-io.vertx.core.Handler-[handler]`: - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#handler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[handler]`:
set a handler which will receive items from the ReadStream. set a handler which will receive items from the ReadStream.
- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#pause--[pause]`: - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#pause--[pause]`:
pause the handler. When paused no items will be received in the handler. pause the handler. When paused no items will be received in the handler.
- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#resume--[resume]`: - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#resume--[resume]`:
resume the handler. The handler will be called if any item arrives. resume the handler. The handler will be called if any item arrives.
- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`: - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`:
Will be called if an exception occurs on the ReadStream. Will be called if an exception occurs on the ReadStream.
- `link:../../apidocs/io/vertx/core/streams/ReadStream.html#endHandler-io.vertx.core.Handler-[endHandler]`: - `link:../../apidocs/io/vertx/core/streams/ReadStream.html#endHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[endHandler]`:
Will be called when end of stream is reached. This might be when EOF is reached if the ReadStream represents a file, Will be called when end of stream is reached. This might be when EOF is reached if the ReadStream represents a file,
or when end of request is reached if it's an HTTP request, or when the connection is closed if it's a TCP socket. or when end of request is reached if it's an HTTP request, or when the connection is closed if it's a TCP socket.


Expand All @@ -166,7 +166,7 @@ represents the actual number of bytes written and not the number of buffers.
returns `true` if the write queue is considered full. returns `true` if the write queue is considered full.
- `link:../../apidocs/io/vertx/core/streams/WriteStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`: - `link:../../apidocs/io/vertx/core/streams/WriteStream.html#exceptionHandler-io.vertx.core.Handler-[exceptionHandler]`:
Will be called if an exception occurs on the `WriteStream`. Will be called if an exception occurs on the `WriteStream`.
- `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-io.vertx.core.Handler-[drainHandler]`: - `link:../../apidocs/io/vertx/core/streams/WriteStream.html#drainHandler-(@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler)-[drainHandler]`:
The handler will be called if the `WriteStream` is considered no longer full. The handler will be called if the `WriteStream` is considered no longer full.


=== Pump === Pump
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java
@@ -0,0 +1,16 @@
package io.vertx.core.eventbus;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class BridgeInterceptor extends FilteringInterceptor {

public BridgeInterceptor(String startsWith) {
super(startsWith);
}

@Override
protected void handleContext(SendContext sendContext) {

}
}
20 changes: 17 additions & 3 deletions src/main/java/io/vertx/core/eventbus/EventBus.java
Expand Up @@ -62,7 +62,6 @@ public interface EventBus extends Measured {
@Fluent @Fluent
<T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler); <T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);



/** /**
* Like {@link #send(String, Object)} but specifying {@code options} that can be used to configure the delivery. * Like {@link #send(String, Object)} but specifying {@code options} that can be used to configure the delivery.
* *
Expand Down Expand Up @@ -236,15 +235,15 @@ public interface EventBus extends Measured {
* Unregister a default message codec. * Unregister a default message codec.
* <p> * <p>
* @param clazz the class for which the codec was registered * @param clazz the class for which the codec was registered
* @return @return a reference to this, so the API can be used fluently * @return a reference to this, so the API can be used fluently
*/ */
@GenIgnore @GenIgnore
EventBus unregisterDefaultCodec(Class clazz); EventBus unregisterDefaultCodec(Class clazz);


/** /**
* Start the event bus. This would not normally be called in user code * Start the event bus. This would not normally be called in user code
* *
* @param completionHandler * @param completionHandler handler will be called when event bus is started
*/ */
@GenIgnore @GenIgnore
void start(Handler<AsyncResult<Void>> completionHandler); void start(Handler<AsyncResult<Void>> completionHandler);
Expand All @@ -257,6 +256,21 @@ public interface EventBus extends Measured {
@GenIgnore @GenIgnore
void close(Handler<AsyncResult<Void>> completionHandler); void close(Handler<AsyncResult<Void>> completionHandler);


/**
* Add an interceptor that will be called whenever a message is sent from Vert.x
*
* @param interceptor the interceptor
* @return a reference to this, so the API can be used fluently
*/
EventBus addInterceptor(Handler<SendContext> interceptor);

/**
* Remove an interceptor
*
* @param interceptor the interceptor
* @return a reference to this, so the API can be used fluently
*/
EventBus removeInterceptor(Handler<SendContext> interceptor);


} }


29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java
@@ -0,0 +1,29 @@
package io.vertx.core.eventbus;

import io.vertx.core.Handler;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class FilteringInterceptor implements Handler<SendContext> {

private final String startsWith;

public FilteringInterceptor(String startsWith) {
this.startsWith = startsWith;
}

// TODO regex

@Override
public void handle(SendContext sendContext) {
if (sendContext.message().address().startsWith(startsWith)) {
handleContext(sendContext);
} else {
sendContext.next();
}
}

protected abstract void handleContext(SendContext sendContext);

}
11 changes: 11 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageProducer.java
Expand Up @@ -30,6 +30,16 @@
@VertxGen @VertxGen
public interface MessageProducer<T> extends WriteStream<T> { public interface MessageProducer<T> extends WriteStream<T> {


int DEFAULT_WRITE_QUEUE_MAX_SIZE = 1000;

/**
* Synonym for {@link #write(Object)}.
*
* @param message the message to send
* @return reference to this for fluency
*/
MessageProducer<T> send(T message);

@Override @Override
MessageProducer<T> exceptionHandler(Handler<Throwable> handler); MessageProducer<T> exceptionHandler(Handler<Throwable> handler);


Expand All @@ -56,4 +66,5 @@ public interface MessageProducer<T> extends WriteStream<T> {
*/ */
String address(); String address();


void close();
} }
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/eventbus/SendContext.java
@@ -0,0 +1,29 @@
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.VertxGen;

/**
*
* Encapsulates a message being sent from Vert.x. Used with event bus interceptors
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@VertxGen
public interface SendContext<T> {

/**
* @return The message being sent
*/
Message<T> message();

/**
* Call the next interceptor
*/
void next();

/**
*
* @return true if the message is being sent (point to point) or False if the message is being published
*/
boolean send();
}
27 changes: 0 additions & 27 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java

This file was deleted.

0 comments on commit c68ab7c

Please sign in to comment.