Skip to content

Commit

Permalink
Basic stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 24, 2023
1 parent b5cc841 commit ddaebb0
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.impl.DefaultSerializableChecker;
Expand Down Expand Up @@ -199,6 +198,10 @@ default <T> Future<Message<T>> request(String address, @Nullable Object message)
*/
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);

Future<Void> bindStream(String address, Handler<MessageStream> handler);

Future<MessageStream> connectStream(String address);

/**
* Register a message codec.
* <p>
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;

@VertxGen
public interface MessageStream {

void handler(Handler<Message<String>> handler);

void endHandler(Handler<Void> handler);

void write(String msg);

void end();

}
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/ClientStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;

class ClientStream extends StreamBase {

private final Promise<MessageStream> promise2;

public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise<MessageStream> promise2) {
super(sourceAddress, ctx, eventBus, sourceAddress, true);
this.promise2 = promise2;
}

@Override
protected boolean doReceive(Frame frame) {
if (frame instanceof SynFrame) {
SynFrame syn = (SynFrame) frame;
remoteAddress = syn.src;
promise2.complete(this);
return true;
} else {
return super.doReceive(frame);
}
}
}
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,31 @@ public <T> MessageProducer<T> publisher(String address, DeliveryOptions options)
return new MessageProducerImpl<>(vertx, address, false, options);
}

@Override
public Future<Void> bindStream(String address, Handler<MessageStream> handler) {
ContextInternal ctx = vertx.getOrCreateContext();
HandlerRegistration reg = new StreamServer(this, ctx, address, handler);
Promise<Void> promise = ctx.promise();
reg.register(true, false, promise);
return promise.future();
}

@Override
public Future<MessageStream> connectStream(String address) {
ContextInternal ctx = vertx.getOrCreateContext();
String sourceAddress = generateReplyAddress();
Promise<MessageStream> promise2 = ctx.promise();
StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2);
Promise<Void> promise = ctx.promise();
reg.register(false, false, promise);
promise.future().onComplete(ar -> {
if (ar.succeeded()) {
sendLocally(new SynFrame(sourceAddress, address), ctx.promise());
}
});
return promise2.future();
}

@Override
public EventBus publish(String address, Object message) {
return publish(address, message, new DeliveryOptions());
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/FinFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.buffer.Buffer;

class FinFrame implements Frame {

final String addr;

public FinFrame(String addr) {
this.addr = addr;
}

@Override
public String address() {
return addr;
}

@Override
public Buffer encodeToWire() {
throw new UnsupportedOperationException();
}

@Override
public boolean isFromWire() {
return false;
}
}
75 changes: 75 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/StreamBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;

class StreamBase extends HandlerRegistration implements MessageStream {

private Handler<Message<String>> handler;
private Handler<Void> endHandler;
final String localAddress;
String remoteAddress;
private boolean halfClosed;

StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) {
super(context, bus, address, src);
this.localAddress = localAddress;
}

@Override
protected boolean doReceive(Frame frame) {
if (frame instanceof MessageImpl) {
MessageImpl msg = (MessageImpl) frame;
Handler<Message<String>> h = handler;
if (h != null) {
h.handle(msg);
}
} else if (frame instanceof FinFrame) {
Handler<Void> h = endHandler;
if (h != null) {
h.handle(null);
}
if (halfClosed) {
unregister();
} else {
halfClosed = true;
}
}
return true;
}

@Override
protected void dispatch(Message msg, ContextInternal context, Handler handler) {

}

@Override
public void handler(Handler<Message<String>> handler) {
this.handler = handler;
}

@Override
public void endHandler(Handler<Void> handler) {
this.endHandler = handler;
}

@Override
public void write(String body) {
MessageImpl msg = new MessageImpl(remoteAddress, MultiMap.caseInsensitiveMultiMap(), body, CodecManager.STRING_MESSAGE_CODEC, true, bus);
bus.sendLocally(msg, context.promise());
}

@Override
public void end() {
FinFrame fin = new FinFrame(remoteAddress);
bus.sendLocally(fin, context.promise());
if (halfClosed) {
unregister();
} else {
halfClosed = true;
}
}
}
42 changes: 42 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/StreamServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageStream;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;

class StreamServer extends HandlerRegistration {
private final EventBusImpl eventBus;
private final Handler<MessageStream> handler;

public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler<MessageStream> handler) {
super(ctx, eventBus, address, false);
this.eventBus = eventBus;
this.handler = handler;
}

@Override
protected boolean doReceive(Frame frame) {
if (frame instanceof SynFrame) {
SynFrame syn = (SynFrame) frame;
String localAddress = eventBus.generateReplyAddress();
StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false);
ss.remoteAddress = syn.src;
PromiseInternal<Void> p = context.promise();
ss.register(false, true, p);
p.onComplete(ar -> {
if (ar.succeeded()) {
SynFrame reply = new SynFrame(localAddress, syn.src);
eventBus.sendLocally(reply, context.promise());
handler.handle(ss);
}
});
}
return true;
}

@Override
protected void dispatch(Message msg, ContextInternal context, Handler handler) {
}
}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/SynFrame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.buffer.Buffer;

public class SynFrame implements Frame {

final String src;
final String dst;

public SynFrame(String src, String dst) {
this.src = src;
this.dst = dst;
}

@Override
public String address() {
return dst;
}

@Override
public Buffer encodeToWire() {
throw new UnsupportedOperationException();
}

@Override
public boolean isFromWire() {
return false;
}
}
29 changes: 29 additions & 0 deletions src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1531,5 +1531,34 @@ public void testEarlyTimeoutOnHandlerUnregistration() {
});
await();
}

@Test
public void testStream() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
vertx.eventBus().bindStream(ADDRESS1, stream -> {
stream.handler(msg -> {
assertEquals("ping", msg.body());
stream.write(msg.body());
});
stream.endHandler(v -> {
stream.end();
});
}).onComplete(onSuccess(v -> {
latch.countDown();
}));
awaitLatch(latch);
vertx.eventBus().connectStream(ADDRESS1).onComplete(onSuccess(stream -> {
stream.write("ping");
stream.handler(msg -> {
assertEquals("ping", msg.body());
stream.end();
});
stream.endHandler(v -> {
testComplete();
});
}));
await();
}

}

0 comments on commit ddaebb0

Please sign in to comment.