diff --git a/src/main/java/io/vertx/core/eventbus/EventBus.java b/src/main/java/io/vertx/core/eventbus/EventBus.java index 140808e5c11..3d718093149 100644 --- a/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -15,7 +15,6 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.eventbus.impl.DefaultSerializableChecker; @@ -199,6 +198,10 @@ default Future> request(String address, @Nullable Object message) */ MessageProducer publisher(String address, DeliveryOptions options); + Future bindStream(String address, Handler handler); + + Future connectStream(String address); + /** * Register a message codec. *

diff --git a/src/main/java/io/vertx/core/eventbus/MessageStream.java b/src/main/java/io/vertx/core/eventbus/MessageStream.java new file mode 100644 index 00000000000..c1d2952fd9c --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/MessageStream.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; + +@VertxGen +public interface MessageStream { + + void handler(Handler> handler); + + void endHandler(Handler handler); + + void write(String msg); + + void end(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java new file mode 100644 index 00000000000..872a9089800 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Promise; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class ClientStream extends StreamBase { + + private final Promise promise2; + + public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise promise2) { + super(sourceAddress, ctx, eventBus, sourceAddress, true); + this.promise2 = promise2; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + remoteAddress = syn.src; + promise2.complete(this); + return true; + } else { + return super.doReceive(frame); + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 9b95ab0b4c2..2f4155570b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -116,7 +117,7 @@ public EventBus send(String address, Object message) { @Override public EventBus send(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); - sendOrPubInternal(msg, options, null, null); + sendOrPubInternal(msg, options, null); return this; } @@ -124,7 +125,7 @@ public EventBus send(String address, Object message, DeliveryOptions options) { public Future> request(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); - sendOrPubInternal(msg, options, handler, null); + sendOrPubInternal(msg, options, handler); return handler.result(); } @@ -154,6 +155,31 @@ public MessageProducer publisher(String address, DeliveryOptions options) return new MessageProducerImpl<>(vertx, address, false, options); } + @Override + public Future bindStream(String address, Handler handler) { + ContextInternal ctx = vertx.getOrCreateContext(); + HandlerRegistration reg = new StreamServer(this, ctx, address, handler); + Promise promise = ctx.promise(); + reg.register(true, false, promise); + return promise.future(); + } + + @Override + public Future connectStream(String address) { + ContextInternal ctx = vertx.getOrCreateContext(); + String sourceAddress = generateReplyAddress(); + Promise promise2 = ctx.promise(); + StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2); + Promise promise = ctx.promise(); + reg.register(false, false, promise); + promise.future().onComplete(ar -> { + if (ar.succeeded()) { + sendLocally(new SynFrame(sourceAddress, address), ctx.promise()); + } + }); + return promise2.future(); + } + @Override public EventBus publish(String address, Object message) { return publish(address, message, new DeliveryOptions()); @@ -161,7 +187,7 @@ public EventBus publish(String address, Object message) { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null); + sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null); return this; } @@ -257,10 +283,18 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, return msg; } - protected HandlerHolder addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { - HandlerHolder holder = addLocalRegistration(address, registration, replyHandler, localOnly); - onLocalRegistration(holder, promise); - return holder; + protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean broadcast, boolean localOnly, Promise promise) { + HandlerHolder holder = addLocalRegistration(address, registration, localOnly); + if (broadcast) { + onLocalRegistration(holder, promise); + } else { + if (promise != null) { + promise.complete(); + } + } + return p -> { + removeRegistration(holder, broadcast, p); + }; } protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { @@ -270,12 +304,12 @@ protected void onLocalRegistration(HandlerHolder handlerHolder, Promise HandlerHolder addLocalRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { + boolean localOnly) { Objects.requireNonNull(address, "address"); ContextInternal context = registration.context; - HandlerHolder holder = createHandlerHolder(registration, replyHandler, localOnly, context); + HandlerHolder holder = createHandlerHolder(registration, localOnly, context); ConcurrentCyclicSequence handlers = new ConcurrentCyclicSequence().add(holder); ConcurrentCyclicSequence actualHandlers = handlerMap.merge( @@ -290,13 +324,17 @@ private HandlerHolder addLocalRegistration(String address, HandlerRegistr return holder; } - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new HandlerHolder<>(registration, replyHandler, localOnly, context); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new HandlerHolder<>(registration, localOnly, context); } - protected void removeRegistration(HandlerHolder handlerHolder, Promise promise) { + protected void removeRegistration(HandlerHolder handlerHolder, boolean broadcast, Promise promise) { removeLocalRegistration(handlerHolder); - onLocalUnregistration(handlerHolder, promise); + if (broadcast) { + onLocalUnregistration(handlerHolder, promise); + } else { + promise.complete(); + } } protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise promise) { @@ -321,59 +359,74 @@ protected void sendReply(MessageImpl replyMessage, DeliveryOptions options, if (replyMessage.address() == null) { throw new IllegalStateException("address not specified"); } else { - sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null)); + sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler)); } } + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + sendLocally(message, writePromise); + } + protected void sendOrPub(OutboundDeliveryContext sendContext) { - sendLocally(sendContext); + sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext); } - private void sendLocally(OutboundDeliveryContext sendContext) { - ReplyException failure = deliverMessageLocally(sendContext.message); + protected void sendLocally(Frame message, Promise writePromise) { + ReplyException failure = deliverMessageLocally(message); if (failure != null) { - sendContext.written(failure); + writePromise.tryFail(failure); } else { - sendContext.written(null); + writePromise.tryComplete(); } } - protected boolean isMessageLocal(MessageImpl msg) { + protected boolean isMessageLocal(Frame msg) { return true; } - protected ReplyException deliverMessageLocally(MessageImpl msg) { - ConcurrentCyclicSequence handlers = handlerMap.get(msg.address()); - boolean messageLocal = isMessageLocal(msg); - if (handlers != null) { - if (msg.isSend()) { - //Choose one - HandlerHolder holder = nextHandler(handlers, messageLocal); - if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0); - } - if (holder != null) { - holder.handler.receive(msg.copyBeforeReceive()); + protected ReplyException deliverMessageLocally(Frame frame) { + ConcurrentCyclicSequence handlers = handlerMap.get(frame.address()); + boolean messageLocal = isMessageLocal(frame); + if (frame instanceof MessageImpl) { + MessageImpl msg = (MessageImpl) frame; + if (handlers != null) { + if (msg.isSend()) { + //Choose one + HandlerHolder holder = nextHandler(handlers, messageLocal); + if (metrics != null) { + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0); + } + if (holder != null) { + holder.handler.receive(msg.copyBeforeReceive()); + } else { + // RACY issue !!!!! + } } else { - // RACY issue !!!!! + // Publish + if (metrics != null) { + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size()); + } + for (HandlerHolder holder: handlers) { + if (messageLocal || !holder.isLocalOnly()) { + holder.handler.receive(msg.copyBeforeReceive()); + } + } } + return null; } else { - // Publish if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size()); - } - for (HandlerHolder holder: handlers) { - if (messageLocal || !holder.isLocalOnly()) { - holder.handler.receive(msg.copyBeforeReceive()); - } + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0); } + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } - return null; } else { - if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0); + if (handlers != null) { + HandlerHolder holder = nextHandler(handlers, messageLocal); + holder.handler.receive(frame); + return null; + } else { + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + frame.address()); } - return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } } @@ -403,8 +456,8 @@ ReplyHandler createReplyHandler(MessageImpl message, } public OutboundDeliveryContext newSendContext(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { - return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise); + ReplyHandler handler) { + return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler); } public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { @@ -414,10 +467,12 @@ public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { senderCtx.next(); } - public void sendOrPubInternal(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { + public Future sendOrPubInternal(MessageImpl message, DeliveryOptions options, + ReplyHandler handler) { checkStarted(); - sendOrPubInternal(newSendContext(message, options, handler, writePromise)); + OutboundDeliveryContext ctx = newSendContext(message, options, handler); + sendOrPubInternal(ctx); + return ctx.writePromise.future(); } private Future unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java new file mode 100644 index 00000000000..70309c74ff7 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +class FinFrame implements Frame { + + final String addr; + + public FinFrame(String addr) { + this.addr = addr; + } + + @Override + public String address() { + return addr; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/Frame.java b/src/main/java/io/vertx/core/eventbus/impl/Frame.java new file mode 100644 index 00000000000..0f0dadf94cb --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/Frame.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +public interface Frame { + + String address(); + + Buffer encodeToWire(); + + boolean isFromWire(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java index d1135b2b2f0..5a0e32c1497 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java @@ -22,14 +22,12 @@ public class HandlerHolder { public final ContextInternal context; public final HandlerRegistration handler; - public final boolean replyHandler; public final boolean localOnly; private boolean removed; - public HandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context) { + public HandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context) { this.context = context; this.handler = handler; - this.replyHandler = replyHandler; this.localOnly = localOnly; } @@ -76,10 +74,6 @@ public HandlerRegistration getHandler() { return handler; } - public boolean isReplyHandler() { - return replyHandler; - } - public boolean isLocalOnly() { return localOnly; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index a1c15ea6c6d..840cf7266f3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -20,13 +20,15 @@ import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; +import java.util.function.Consumer; + public abstract class HandlerRegistration implements Closeable { public final ContextInternal context; public final EventBusImpl bus; public final String address; public final boolean src; - private HandlerHolder registered; + private Consumer> registered; private Object metric; HandlerRegistration(ContextInternal context, @@ -39,30 +41,32 @@ public abstract class HandlerRegistration implements Closeable { this.address = address; } - void receive(MessageImpl msg) { + void receive(Frame msg) { if (bus.metrics != null) { - bus.metrics.scheduleMessage(metric, msg.isLocal()); + bus.metrics.scheduleMessage(metric, ((MessageImpl)msg).isLocal()); // Will CCE } context.executor().execute(() -> { // Need to check handler is still there - the handler might have been removed after the message were sent but // before it was received if (!doReceive(msg)) { - discard(msg); + if (msg instanceof Message) { + discard((Message) msg); + } } }); } - protected abstract boolean doReceive(Message msg); + protected abstract boolean doReceive(Frame msg); protected abstract void dispatch(Message msg, ContextInternal context, Handler> handler); - synchronized void register(String repliedAddress, boolean localOnly, Promise promise) { + synchronized void register(boolean broadcast, boolean localOnly, Promise promise) { if (registered != null) { throw new IllegalStateException(); } - registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise); + registered = bus.addRegistration(address, this, broadcast, localOnly, promise); if (bus.metrics != null) { - metric = bus.metrics.handlerRegistered(address, repliedAddress); + metric = bus.metrics.handlerRegistered(address, null /* regression */); } } @@ -74,7 +78,7 @@ public Future unregister() { Promise promise = context.promise(); synchronized (this) { if (registered != null) { - bus.removeRegistration(registered, promise); + registered.accept(promise); registered = null; if (bus.metrics != null) { bus.metrics.handlerUnregistered(metric); diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 776cbe0800d..7304bd955c6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -131,6 +131,15 @@ public synchronized Future unregister() { return fut; } + @Override + protected boolean doReceive(Frame msg) { + if (msg instanceof Message) { + return doReceive((Message) msg); + } else { + return false; + } + } + protected boolean doReceive(Message message) { Handler> theHandler; synchronized (this) { @@ -216,7 +225,7 @@ public synchronized MessageConsumer handler(Handler> h) { registered = true; Promise p = result; Promise registration = context.promise(); - register(null, localOnly, registration); + register(true, localOnly, registration); registration.future().onComplete(ar -> { if (ar.succeeded()) { p.tryComplete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index c2919156d20..67395486fb8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -13,6 +13,7 @@ import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.*; import java.util.List; @@ -21,7 +22,7 @@ /** * @author Tim Fox */ -public class MessageImpl implements Message { +public class MessageImpl implements Message, Frame { protected MessageCodec messageCodec; protected final EventBusImpl bus; @@ -140,4 +141,14 @@ public MessageCodec codec() { protected boolean isLocal() { return true; } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + throw new UnsupportedOperationException(); + } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index 41d4bb2be81..18f02b646f6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -45,14 +45,8 @@ public synchronized MessageProducer deliveryOptions(DeliveryOptions options) @Override public Future write(T body) { - Promise promise = ((VertxInternal)vertx).getOrCreateContext().promise(); - write(body, promise); - return promise.future(); - } - - private void write(T data, Promise handler) { - MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName()); - bus.sendOrPubInternal(msg, options, null, handler); + MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), body, options.getCodecName()); + return bus.sendOrPubInternal(msg, options, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index 657ae791543..29c26f40803 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -11,6 +11,7 @@ package io.vertx.core.eventbus.impl; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; @@ -25,31 +26,43 @@ import java.util.function.BiConsumer; -public class OutboundDeliveryContext extends DeliveryContextBase implements Handler> { +public class OutboundDeliveryContext extends DeliveryContextBase implements Promise { public final ContextInternal ctx; public final DeliveryOptions options; public final ReplyHandler replyHandler; - private final Promise writePromise; + public final Promise writePromise; private boolean src; EventBusImpl bus; EventBusMetrics metrics; - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler, Promise writePromise) { + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler) { super(message, message.bus.outboundInterceptors(), ctx); this.ctx = ctx; this.options = options; this.replyHandler = replyHandler; - this.writePromise = writePromise; + this.writePromise = ctx.promise(); } @Override - public void handle(AsyncResult event) { - written(event.cause()); + public boolean tryComplete(Void result) { + written(null); + return true; } - public void written(Throwable failure) { + @Override + public boolean tryFail(Throwable cause) { + written(cause); + return false; + } + + @Override + public Future future() { + throw new UnsupportedOperationException(); + } + + private void written(Throwable failure) { // Metrics if (metrics != null) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 649668090e1..2eae66ca26f 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -77,13 +77,17 @@ public void handle(Long id) { } @Override - protected boolean doReceive(Message reply) { - dispatch(null, reply, context); - return true; + protected boolean doReceive(Frame msg) { + if (msg instanceof Message) { + dispatch(null, (Message) msg, context); + return true; + } else { + return false; + } } void register() { - register(repliedAddress, true, null); + register(false, false, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java new file mode 100644 index 00000000000..3b663b1566f --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java @@ -0,0 +1,75 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class StreamBase extends HandlerRegistration implements MessageStream { + + private Handler> handler; + private Handler endHandler; + final String localAddress; + String remoteAddress; + private boolean halfClosed; + + StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) { + super(context, bus, address, src); + this.localAddress = localAddress; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof MessageImpl) { + MessageImpl msg = (MessageImpl) frame; + Handler> h = handler; + if (h != null) { + h.handle(msg); + } + } else if (frame instanceof FinFrame) { + Handler h = endHandler; + if (h != null) { + h.handle(null); + } + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + + } + + @Override + public void handler(Handler> handler) { + this.handler = handler; + } + + @Override + public void endHandler(Handler handler) { + this.endHandler = handler; + } + + @Override + public void write(String body) { + MessageImpl msg = new MessageImpl(remoteAddress, MultiMap.caseInsensitiveMultiMap(), body, CodecManager.STRING_MESSAGE_CODEC, true, bus); + bus.sendLocally(msg, context.promise()); + } + + @Override + public void end() { + FinFrame fin = new FinFrame(remoteAddress); + bus.sendLocally(fin, context.promise()); + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java new file mode 100644 index 00000000000..f60a5d87109 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java @@ -0,0 +1,42 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; + +class StreamServer extends HandlerRegistration { + private final EventBusImpl eventBus; + private final Handler handler; + + public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler handler) { + super(ctx, eventBus, address, false); + this.eventBus = eventBus; + this.handler = handler; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + String localAddress = eventBus.generateReplyAddress(); + StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false); + ss.remoteAddress = syn.src; + PromiseInternal p = context.promise(); + ss.register(false, true, p); + p.onComplete(ar -> { + if (ar.succeeded()) { + SynFrame reply = new SynFrame(localAddress, syn.src); + eventBus.sendLocally(reply, context.promise()); + handler.handle(ss); + } + }); + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java new file mode 100644 index 00000000000..d55d6250a7e --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java @@ -0,0 +1,29 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +public class SynFrame implements Frame { + + final String src; + final String dst; + + public SynFrame(String src, String dst) { + this.src = src; + this.dst = dst; + } + + @Override + public String address() { + return dst; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 3fe279ecc00..6b78ef0fc59 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -11,20 +11,13 @@ package io.vertx.core.eventbus.impl.clustered; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.Promise; -import io.vertx.core.VertxOptions; +import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.AddressHelper; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBusOptions; import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.impl.CodecManager; -import io.vertx.core.eventbus.impl.EventBusImpl; -import io.vertx.core.eventbus.impl.HandlerHolder; -import io.vertx.core.eventbus.impl.HandlerRegistration; -import io.vertx.core.eventbus.impl.MessageImpl; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; +import io.vertx.core.eventbus.impl.*; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; @@ -159,76 +152,68 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, @Override protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); - } else if (promise != null) { - promise.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); } @Override - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new ClusteredHandlerHolder<>(registration, replyHandler, localOnly, context, handlerSequence.getAndIncrement()); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new ClusteredHandlerHolder<>(registration, localOnly, context, handlerSequence.getAndIncrement()); } @Override protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise completionHandler) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - Promise promise = Promise.promise(); - clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); - promise.future().onComplete(completionHandler); - } else { - completionHandler.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + Promise promise = Promise.promise(); + clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); + promise.future().onComplete(completionHandler); } @Override - protected void sendOrPub(OutboundDeliveryContext sendContext) { - if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) { - clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext); - } else if (sendContext.options.isLocalOnly()) { - super.sendOrPub(sendContext); + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + if (((ClusteredMessage) message).getRepliedTo() != null) { + clusteredSendReply(message, writePromise, ((ClusteredMessage) message).getRepliedTo()); + } else if (options.isLocalOnly()) { + sendLocally(message, writePromise); } else { - Serializer serializer = Serializer.get(sendContext.ctx); - if (sendContext.message.isSend()) { - Promise promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForSend, promise); + Serializer serializer = Serializer.get(ctx); + if (message.isSend()) { + Promise promise = Promise.promise(); + serializer.queue(message, nodeSelector::selectForSend, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNode(sendContext, ar.result()); + sendToNode(ar.result(), message, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } else { - Promise> promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForPublish, promise); + Promise> promise = Promise.promise(); + serializer.queue(message, nodeSelector::selectForPublish, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNodes(sendContext, ar.result()); + sendToNodes(ar.result(), message, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } } } - private void sendOrPublishFailed(OutboundDeliveryContext sendContext, Throwable cause) { + private void sendOrPublishFailed(Promise promise, Throwable cause) { if (log.isDebugEnabled()) { log.error("Failed to send message", cause); } - sendContext.written(cause); + promise.tryFail(cause); } @Override @@ -238,7 +223,7 @@ protected String generateReplyAddress() { } @Override - protected boolean isMessageLocal(MessageImpl msg) { + protected boolean isMessageLocal(Frame msg) { ClusteredMessage clusteredMessage = (ClusteredMessage) msg; return !clusteredMessage.isFromWire(); } @@ -252,7 +237,7 @@ protected HandlerHolder nextHandler(ConcurrentCyclicSequence hand Iterator iterator = handlers.iterator(false); while (iterator.hasNext()) { HandlerHolder next = iterator.next(); - if (next.isReplyHandler() || !next.isLocalOnly()) { + if (!next.isLocalOnly()) { handlerHolder = next; break; } @@ -329,39 +314,39 @@ public void handle(Buffer buff) { }; } - private void sendToNode(OutboundDeliveryContext sendContext, String nodeId) { + private void sendToNode(String nodeId, Frame message, Promise writePromise) { if (nodeId != null && !nodeId.equals(this.nodeId)) { - sendRemote(sendContext, nodeId, sendContext.message); + sendRemote(nodeId, message, writePromise); } else { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void sendToNodes(OutboundDeliveryContext sendContext, Iterable nodeIds) { + private void sendToNodes(Iterable nodeIds, Frame message, Promise writePromise) { boolean sentRemote = false; if (nodeIds != null) { for (String nid : nodeIds) { if (!sentRemote) { sentRemote = true; } - sendToNode(sendContext, nid); + // Write promise might be completed several times!!!! + sendToNode(nid, message, writePromise); } } if (!sentRemote) { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void clusteredSendReply(String replyDest, OutboundDeliveryContext sendContext) { - MessageImpl message = sendContext.message; + private void clusteredSendReply(MessageImpl message, Promise writePromise, String replyDest) { if (!replyDest.equals(nodeId)) { - sendRemote(sendContext, replyDest, message); + sendRemote(replyDest, message, writePromise); } else { - super.sendOrPub(sendContext); + sendLocally(message, writePromise); } } - private void sendRemote(OutboundDeliveryContext sendContext, String remoteNodeId, MessageImpl message) { + private void sendRemote(String remoteNodeId, Frame message, Promise writePromise) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. @@ -380,7 +365,7 @@ private void sendRemote(OutboundDeliveryContext sendContext, String remoteNod holder.connect(); } } - holder.writeMessage(sendContext); + holder.writeMessage(message, writePromise); } ConcurrentMap connections() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java index 8940eff5184..c5703d86117 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java @@ -19,8 +19,8 @@ public class ClusteredHandlerHolder extends HandlerHolder { private final long seq; - public ClusteredHandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context, long seq) { - super(handler, replyHandler, localOnly, context); + public ClusteredHandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context, long seq) { + super(handler, localOnly, context); this.seq = seq; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 07348e82440..11684358915 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -14,7 +14,8 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBusOptions; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; +import io.vertx.core.eventbus.impl.Frame; +import io.vertx.core.eventbus.impl.MessageImpl; import io.vertx.core.eventbus.impl.codecs.PingMessageCodec; import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.logging.Logger; @@ -41,12 +42,21 @@ class ConnectionHolder { private final VertxInternal vertx; private final EventBusMetrics metrics; - private Queue> pending; + private Queue pending; private NetSocket socket; private boolean connected; private long timeoutID = -1; private long pingTimeoutID = -1; + private static class SomeTask { + final Frame message; + final Promise writePromise; + SomeTask(Frame message, Promise writePromise) { + this.message = message; + this.writePromise = writePromise; + } + } + ConnectionHolder(ClusteredEventBus eventBus, String remoteNodeId) { this.eventBus = eventBus; this.remoteNodeId = remoteNodeId; @@ -70,13 +80,13 @@ void connect() { } // TODO optimise this (contention on monitor) - synchronized void writeMessage(OutboundDeliveryContext ctx) { + synchronized void writeMessage(Frame message, Promise writePromise) { if (connected) { - Buffer data = ((ClusteredMessage) ctx.message).encodeToWire(); + Buffer data = message.encodeToWire(); if (metrics != null) { - metrics.messageWritten(ctx.message.address(), data.length()); + metrics.messageWritten(message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(writePromise); } else { if (pending == null) { if (log.isDebugEnabled()) { @@ -84,7 +94,7 @@ synchronized void writeMessage(OutboundDeliveryContext ctx) { } pending = new ArrayDeque<>(); } - pending.add(ctx); + pending.add(new SomeTask(message, writePromise)); } } @@ -100,10 +110,10 @@ private void close(Throwable cause) { vertx.cancelTimer(pingTimeoutID); } synchronized (this) { - OutboundDeliveryContext msg; + SomeTask msg; if (pending != null) { while ((msg = pending.poll()) != null) { - msg.written(cause); + msg.writePromise.tryFail(cause); } } } @@ -150,12 +160,12 @@ private synchronized void connected(NetSocket socket) { if (log.isDebugEnabled()) { log.debug("Draining the queue for server " + remoteNodeId); } - for (OutboundDeliveryContext ctx : pending) { + for (SomeTask ctx : pending) { Buffer data = ((ClusteredMessage)ctx.message).encodeToWire(); if (metrics != null) { metrics.messageWritten(ctx.message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(ctx.writePromise); } } pending = null; diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java index 145011725b9..08d0e403ae8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java @@ -60,7 +60,7 @@ public static Serializer get(ContextInternal context) { return serializer; } - public void queue(Message message, BiConsumer, Promise> selectHandler, Promise promise) { + public void queue(Message message, BiConsumer> selectHandler, Promise promise) { ctx.emit(v -> { String address = message.address(); SerializerQueue queue = queues.computeIfAbsent(address, SerializerQueue::new); @@ -110,7 +110,7 @@ void checkPending() { } } - void add(Message msg, BiConsumer, Promise> selectHandler, Promise promise) { + void add(Message msg, BiConsumer> selectHandler, Promise promise) { SerializedTask serializedTask = new SerializedTask<>(ctx, msg, selectHandler); Future fut = serializedTask.internalPromise.future(); fut.onComplete(promise); @@ -136,20 +136,20 @@ void close() { private class SerializedTask implements Handler> { final Message msg; - final BiConsumer, Promise> selectHandler; + final BiConsumer> selectHandler; final Promise internalPromise; SerializedTask( ContextInternal context, Message msg, - BiConsumer, Promise> selectHandler) { + BiConsumer> selectHandler) { this.msg = msg; this.selectHandler = selectHandler; this.internalPromise = context.promise(); } void process() { - selectHandler.accept(msg, internalPromise); + selectHandler.accept(msg.address(), internalPromise); } @Override diff --git a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java index 062322c24cf..f767a029a6e 100644 --- a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java @@ -13,7 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; import io.vertx.core.impl.VertxBuilder; import io.vertx.core.spi.VertxServiceProvider; @@ -47,20 +46,16 @@ default void init(VertxBuilder builder) { * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code false} */ - void selectForSend(Message message, Promise promise); + void selectForSend(String address, Promise promise); /** * Select a node for publishing the given {@code message}. * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code true} */ - void selectForPublish(Message message, Promise> promise); + void selectForPublish(String address, Promise> promise); /** * Invoked by the {@link ClusterManager} when messaging handler registrations are added or removed. diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java index 6198477d340..72007135316 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java @@ -13,8 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; -import io.vertx.core.impl.Arguments; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeSelector; import io.vertx.core.spi.cluster.RegistrationUpdateEvent; @@ -37,17 +35,15 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - Arguments.require(message.isSend(), "selectForSend used for publishing"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForSend(String address, Promise promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForSend()); }); } @Override - public void selectForPublish(Message message, Promise> promise) { - Arguments.require(!message.isSend(), "selectForPublish used for sending"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForPublish(String address, Promise> promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForPublish()); }); } diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java index 2244e85cc6f..0e5423e0153 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java @@ -35,8 +35,7 @@ public Selectors(ClusterManager clusterManager) { this.clusterManager = clusterManager; } - public void withSelector(Message message, Promise promise, BiConsumer, RoundRobinSelector> task) { - String address = message.address(); + public void withSelector(String address, Promise promise, BiConsumer, RoundRobinSelector> task) { SelectorEntry entry = map.compute(address, (addr, curr) -> { return curr == null ? new SelectorEntry() : (curr.isNotReady() ? curr.increment() : curr); }); diff --git a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java index 9b1645d09d2..f2762d0db70 100644 --- a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java +++ b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java @@ -105,12 +105,12 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail("Not implemented"); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { List nodes = clusterManager.getNodes(); CompositeFuture future = nodes.stream() .map(nodeId -> { diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 43107da8963..f8784d2e369 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1531,5 +1531,34 @@ public void testEarlyTimeoutOnHandlerUnregistration() { }); await(); } + + @Test + public void testStream() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + vertx.eventBus().bindStream(ADDRESS1, stream -> { + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.write(msg.body()); + }); + stream.endHandler(v -> { + stream.end(); + }); + }).onComplete(onSuccess(v -> { + latch.countDown(); + })); + awaitLatch(latch); + vertx.eventBus().connectStream(ADDRESS1).onComplete(onSuccess(stream -> { + stream.write("ping"); + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.end(); + }); + stream.endHandler(v -> { + testComplete(); + }); + })); + await(); + } + } diff --git a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java index 28ff053e0e6..d08744e087d 100644 --- a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java +++ b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java @@ -89,7 +89,7 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { try { NANOSECONDS.sleep(150); } catch (InterruptedException e) { @@ -99,7 +99,7 @@ public void selectForSend(Message message, Promise promise) { } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { throw new UnsupportedOperationException(); } diff --git a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java index be2b2b02835..92dd56431d3 100644 --- a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java +++ b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java @@ -38,12 +38,12 @@ public void test() { .setPort(0); NodeSelector nodeSelector = new DefaultNodeSelector() { @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail(cause); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { promise.fail("Not implemented"); } }; diff --git a/src/test/java/io/vertx/core/eventbus/eventbus.md b/src/test/java/io/vertx/core/eventbus/eventbus.md new file mode 100644 index 00000000000..a041cb1d1b6 --- /dev/null +++ b/src/test/java/io/vertx/core/eventbus/eventbus.md @@ -0,0 +1,60 @@ +```plantuml +title Send +hide footbox +participant Producer as P +[-> P: send(message) +participant Consumer as C +P -> C: MSG(request) +``` + +```plantuml +title Request and response +hide footbox +participant Producer as P +[-> P: request(request_message) +create Source as S +P --> S: bind ephemeral address +participant Consumer as C +P -> C: SYN(Source)/MSG(request_message) +S <- C: FIN/MSG(reply_message) +Destroy S +[<- S: response(reply_message) +``` + +```plantuml +title General case +hide footbox +participant Producer as P +[-> P: stream = connect("consumer") +create Source as S +P --> S: src = bind ephemeral address +participant Consumer as C +P -> C: SYN(src) +create Destination as D +C --> D: dst = bind ephemeral address +S <- C: ACK(dst) +[-> S: stream.write(m1) +S -> D: MSG(m1) +D -> S: MSG(m2) +[<- S: handler.handle(m2) +[-> S: stream.write(m3) +S -> D: MSG(m3) +[-> S: stream.end() +S -> D: FIN +Destroy D +[<- S: handler.handle(m4) +D -> S: MSG(m4) +[<- S: handler.handle(m5) +D -> S: MSG(m5) +D -> S: FIN +Destroy S +[<- S: endHandler.handle(null) +``` + +## Todo + +- Fragmentation +- Gauge interest + - Pascal K. + - Stephane Martin +- service-proxy usage diff --git a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java index 76e2597863f..479d6a98a8e 100644 --- a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java +++ b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java @@ -34,13 +34,13 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - delegate.selectForSend(message, promise); + public void selectForSend(String address, Promise promise) { + delegate.selectForSend(address, promise); } @Override - public void selectForPublish(Message message, Promise> promise) { - delegate.selectForPublish(message, promise); + public void selectForPublish(String address, Promise> promise) { + delegate.selectForPublish(address, promise); } @Override