Skip to content

Commit

Permalink
Add WriteStream asynchronous write and end methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 14, 2019
1 parent 091de1c commit 645f6d4
Show file tree
Hide file tree
Showing 46 changed files with 1,728 additions and 367 deletions.
Expand Up @@ -31,6 +31,7 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.ChannelFutureListenerAdapter;
import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.net.impl.transport.Transport;
Expand Down Expand Up @@ -212,7 +213,7 @@ private DatagramSocket listen(SocketAddress local, Handler<AsyncResult<DatagramS
@SuppressWarnings("unchecked")
final void addListener(ChannelFuture future, Handler<AsyncResult<DatagramSocket>> handler) {
if (handler != null) {
future.addListener(new DatagramChannelFutureListener<>(this, handler, context));
future.addListener(new ChannelFutureListenerAdapter<>(context, this, handler));
}
}

Expand Down Expand Up @@ -314,7 +315,7 @@ public synchronized void close(final Handler<AsyncResult<Void>> handler) {
channel.flush();
ChannelFuture future = channel.close();
if (handler != null) {
future.addListener(new DatagramChannelFutureListener<>(null, handler, context));
future.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
}
}

Expand Down
Expand Up @@ -54,6 +54,17 @@ public PacketWriteStreamImpl write(Buffer data) {
return this;
}

@Override
public WriteStream<Buffer> write(Buffer data, Handler<AsyncResult<Void>> handler) {
datagramSocket.send(data, port, host, ar -> {
PacketWriteStreamImpl.this.handle(ar);
if (handler != null) {
handler.handle(ar.mapEmpty());
}
});
return this;
}

@Override
public PacketWriteStreamImpl setWriteQueueMaxSize(int maxSize) {
return this;
Expand All @@ -71,5 +82,11 @@ public PacketWriteStreamImpl drainHandler(Handler<Void> handler) {

@Override
public void end() {
datagramSocket.close();
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
datagramSocket.close(handler);
}
}
14 changes: 14 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageProducer.java
Expand Up @@ -53,6 +53,9 @@ public interface MessageProducer<T> extends WriteStream<T> {
@Override
MessageProducer<T> write(T data);

@Fluent
MessageProducer<T> write(T data, Handler<AsyncResult<Void>> handler);

@Override
MessageProducer<T> setWriteQueueMaxSize(int maxSize);

Expand All @@ -79,8 +82,19 @@ public interface MessageProducer<T> extends WriteStream<T> {
@Override
void end();

/**
* Closes the producer, calls {@link #close(Handler)}
*/
@Override
void end(Handler<AsyncResult<Void>> handler);

/**
* Closes the producer, this method should be called when the message producer is not used anymore.
*/
void close();

/**
* Same as {@link #close()} but with an {@code handler} called when the operation completes
*/
void close(Handler<AsyncResult<Void>> handler);
}
36 changes: 26 additions & 10 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Expand Up @@ -116,7 +116,7 @@ public EventBus send(String address, Object message, DeliveryOptions options) {

@Override
public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
sendOrPubInternal(createMessage(true, true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
sendOrPubInternal(createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null), options, replyHandler);
return this;
}

Expand Down Expand Up @@ -153,7 +153,7 @@ public EventBus publish(String address, Object message) {

@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName()), options, null);
sendOrPubInternal(createMessage(false, true, address, options.getHeaders(), message, options.getCodecName(), null), options, null);
return this;
}

Expand Down Expand Up @@ -233,11 +233,11 @@ public EventBusMetrics<?> getMetrics() {
return metrics;
}

protected MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName) {
public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this);
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, src, this, writeHandler);
return msg;
}

Expand Down Expand Up @@ -366,9 +366,9 @@ protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionH

private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
Object trace = messageSent(sendContext, true, false);
if (!deliverMessageLocally(sendContext.message)) {
ReplyException failure = deliverMessageLocally(sendContext.message);
if (failure != null) {
// no handlers
ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + sendContext.message.address);
VertxTracer tracer = sendContext.ctx.tracer();
if (sendContext.replyHandler != null) {
sendContext.replyHandler.trace = trace;
Expand All @@ -379,6 +379,7 @@ private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
}
}
} else {
failure = null;
VertxTracer tracer = sendContext.ctx.tracer();
if (tracer != null && sendContext.message.src) {
if (sendContext.replyHandler == null) {
Expand All @@ -394,7 +395,7 @@ protected boolean isMessageLocal(MessageImpl msg) {
return true;
}

protected <T> boolean deliverMessageLocally(MessageImpl msg) {
protected ReplyException deliverMessageLocally(MessageImpl msg) {
ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
if (handlers != null) {
if (msg.isSend()) {
Expand All @@ -405,6 +406,12 @@ protected <T> boolean deliverMessageLocally(MessageImpl msg) {
}
if (holder != null) {
deliverToHandler(msg, holder);
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.succeededFuture());
}
} else {
// RACY issue !!!!!
}
} else {
// Publish
Expand All @@ -414,13 +421,22 @@ protected <T> boolean deliverMessageLocally(MessageImpl msg) {
for (HandlerHolder holder: handlers) {
deliverToHandler(msg, holder);
}
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.succeededFuture());
}
}
return true;
return null;
} else {
if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0);
}
return false;
ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
Handler<AsyncResult<Void>> handler = msg.writeHandler;
if (handler != null) {
handler.handle(Future.failedFuture(failure));
}
return failure;
}
}

Expand Down Expand Up @@ -497,7 +513,7 @@ public void handle(Message<T> reply) {
}
}

private <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
checkStarted();
ReplyHandler<T> handler = createReplyHandler(message, true, options, replyHandler);
Expand Down
Expand Up @@ -127,7 +127,6 @@ public void unregister() {

@Override
public void unregister(Handler<AsyncResult<Void>> completionHandler) {
Objects.requireNonNull(completionHandler);
doUnregister(completionHandler);
}

Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
Expand Up @@ -38,6 +38,7 @@ public class MessageImpl<U, V> implements Message<V> {
protected U sentBody;
protected V receivedBody;
protected boolean send;
protected Handler<AsyncResult<Void>> writeHandler;

public MessageImpl(boolean src, EventBusImpl bus) {
this.bus = bus;
Expand All @@ -46,7 +47,8 @@ public MessageImpl(boolean src, EventBusImpl bus) {

public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, boolean src, EventBusImpl bus) {
boolean send, boolean src, EventBusImpl bus,
Handler<AsyncResult<Void>> writeHandler) {
this.messageCodec = messageCodec;
this.address = address;
this.replyAddress = replyAddress;
Expand All @@ -55,6 +57,7 @@ public MessageImpl(String address, String replyAddress, MultiMap headers, U sent
this.send = send;
this.bus = bus;
this.src = src;
this.writeHandler = writeHandler;
}

protected MessageImpl(MessageImpl<U, V> other, boolean src) {
Expand All @@ -75,6 +78,7 @@ protected MessageImpl(MessageImpl<U, V> other, boolean src) {
this.receivedBody = messageCodec.transform(other.sentBody);
}
this.send = other.send;
this.writeHandler = other.writeHandler;
}

public MessageImpl<U, V> copyBeforeReceive(boolean src) {
Expand Down Expand Up @@ -131,7 +135,7 @@ public void reply(Object message, DeliveryOptions options) {
@Override
public <R> void reply(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (replyAddress != null) {
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName());
MessageImpl reply = bus.createMessage(true, src, replyAddress, options.getHeaders(), message, options.getCodecName(), null);
bus.sendReply(reply, this, options, replyHandler);
}
}
Expand All @@ -145,6 +149,10 @@ public void setReplyAddress(String replyAddress) {
this.replyAddress = replyAddress;
}

public Handler<AsyncResult<Void>> writeHandler() {
return writeHandler;
}

public MessageCodec<U, V> codec() {
return messageCodec;
}
Expand Down
53 changes: 35 additions & 18 deletions src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java
Expand Up @@ -12,6 +12,7 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
Expand All @@ -28,10 +29,10 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {
public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit";

private final Vertx vertx;
private final EventBus bus;
private final EventBusImpl bus;
private final boolean send;
private final String address;
private final Queue<T> pending = new ArrayDeque<>();
private final Queue<MessageImpl<T, ?>> pending = new ArrayDeque<>();
private final MessageConsumer<Integer> creditConsumer;
private DeliveryOptions options;
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
Expand All @@ -40,7 +41,7 @@ public class MessageProducerImpl<T> implements MessageProducer<T> {

public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = vertx.eventBus();
this.bus = (EventBusImpl) vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
Expand All @@ -67,13 +68,13 @@ public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options)

@Override
public MessageProducer<T> send(T message) {
doSend(message, null);
doSend(message, null, null);
return this;
}

@Override
public <R> MessageProducer<T> send(T message, Handler<AsyncResult<Message<R>>> replyHandler) {
doSend(message, replyHandler);
doSend(message, replyHandler, null);
return this;
}

Expand All @@ -92,10 +93,17 @@ public synchronized MessageProducer<T> setWriteQueueMaxSize(int s) {

@Override
public synchronized MessageProducer<T> write(T data) {
return write(data, null);
}

@Override
public MessageProducer<T> write(T data, Handler<AsyncResult<Void>> handler) {
if (send) {
doSend(data, null);
doSend(data, null, handler);
} else {
bus.publish(address, data, options);
MessageImpl msg = bus.createMessage(false, true, address, options.getHeaders(), data, options.getCodecName(), handler);
msg.writeHandler = handler;
bus.sendOrPubInternal(msg, options, null);
}
return this;
}
Expand Down Expand Up @@ -132,10 +140,22 @@ public void end() {
close();
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
close(null);
}

@Override
public void close() {
close(null);
}

@Override
public void close(Handler<AsyncResult<Void>> handler) {
if (creditConsumer != null) {
creditConsumer.unregister();
creditConsumer.unregister(handler);
} else {
vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
}
}

Expand All @@ -146,28 +166,25 @@ protected void finalize() throws Throwable {
super.finalize();
}

private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler) {
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler, Handler<AsyncResult<Void>> handler) {
MessageImpl msg = bus.createMessage(true, true, address, options.getHeaders(), data, options.getCodecName(), handler);
if (credits > 0) {
credits--;
if (replyHandler == null) {
bus.send(address, data, options);
} else {
bus.send(address, data, options, replyHandler);
}
bus.sendOrPubInternal(msg, options, replyHandler);
} else {
pending.add(data);
pending.add(msg);
}
}

private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
T data = pending.poll();
if (data == null) {
MessageImpl<T, ?> msg = pending.poll();
if (msg == null) {
break;
} else {
credits--;
bus.send(address, data, options);
bus.sendOrPubInternal(msg, options, null);
}
}
checkDrained();
Expand Down

0 comments on commit 645f6d4

Please sign in to comment.