From 2c0c4a1af9705d49cf4657b80f7863b1984a3463 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Fri, 5 Apr 2019 19:29:15 +0100 Subject: [PATCH] Add a scuttlebutt-mux module for multiplexing RPC requests into futures and streams (#202) * Start adding code to multiplex RPC requests. * Fix typos. * Add helper classes. * Move mux module to the RPC module. * Add missing docs and handle an underlying connection being closed. * Fix mistakenly refactored gradle files. * Take concurrency into account. * Formatting * Remove redundant casting. * Fix formatting. * Handle stream end properly. * Marshall scuttlebutt error body. * Disable patchwork integration test by default. * Pass an ObjectMapper instance to functions which serialize and deserialize to allow for object mapper configuration. --- .../handshake/vertx/ClientHandlerFactory.java | 4 +- .../vertx/SecureScuttlebuttVertxClient.java | 20 +- .../vertx/SecureScuttlebuttVertxServer.java | 2 +- .../handshake/vertx/VertxIntegrationTest.java | 3 +- scuttlebutt-rpc/build.gradle | 2 + .../cava/scuttlebutt/rpc/RPCAsyncRequest.java | 58 ++++ .../cava/scuttlebutt/rpc/RPCCodec.java | 30 +++ .../cava/scuttlebutt/rpc/RPCErrorBody.java | 64 +++++ .../cava/scuttlebutt/rpc/RPCFunction.java | 52 ++++ .../cava/scuttlebutt/rpc/RPCMessage.java | 44 ++- .../cava/scuttlebutt/rpc/RPCRequestBody.java | 71 +++++ .../cava/scuttlebutt/rpc/RPCRequestType.java | 35 +++ .../scuttlebutt/rpc/RPCStreamRequest.java | 57 ++++ .../cava/scuttlebutt/rpc/mux/Multiplexer.java | 59 +++++ .../cava/scuttlebutt/rpc/mux/RPCHandler.java | 250 ++++++++++++++++++ .../rpc/mux/ScuttlebuttStreamHandler.java | 41 +++ .../exceptions/ConnectionClosedException.java | 21 ++ .../rpc/PatchworkIntegrationTest.java | 1 + .../cava/scuttlebutt/rpc/RPCEncodingTest.java | 3 +- .../rpc/mux/PatchworkIntegrationTest.java | 249 +++++++++++++++++ 20 files changed, 1045 insertions(+), 21 deletions(-) create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCAsyncRequest.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCErrorBody.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCFunction.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestBody.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestType.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCStreamRequest.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java create mode 100644 scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java diff --git a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/ClientHandlerFactory.java b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/ClientHandlerFactory.java index f23e5ca9..59143b02 100644 --- a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/ClientHandlerFactory.java +++ b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/ClientHandlerFactory.java @@ -19,7 +19,7 @@ /** * Factory creating stream handlers, managing client-side connections. */ -public interface ClientHandlerFactory { +public interface ClientHandlerFactory { /** * Creates a new handler associated with a valid streaming connection. @@ -27,5 +27,5 @@ public interface ClientHandlerFactory { * @param sender the function to send bytes to the server * @param terminationFunction a function to terminate the stream properly */ - ClientHandler createHandler(Consumer sender, Runnable terminationFunction); + T createHandler(Consumer sender, Runnable terminationFunction); } diff --git a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java index 71df3803..a25c87fa 100644 --- a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java +++ b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java @@ -39,16 +39,16 @@ */ public final class SecureScuttlebuttVertxClient { - private class NetSocketClientHandler { + private class NetSocketClientHandler { private final Logger logger; private final NetSocket socket; private final SecureScuttlebuttHandshakeClient handshakeClient; - private final ClientHandlerFactory handlerFactory; - private final CompletableAsyncResult completionHandle; + private final ClientHandlerFactory handlerFactory; + private final CompletableAsyncResult completionHandle; private int handshakeCounter; private SecureScuttlebuttStreamClient client; - private ClientHandler handler; + private T handler; private Bytes messageBuffer = Bytes.EMPTY; @@ -56,8 +56,8 @@ private class NetSocketClientHandler { Logger logger, NetSocket socket, Signature.PublicKey remotePublicKey, - ClientHandlerFactory handlerFactory, - CompletableAsyncResult completionHandle) { + ClientHandlerFactory handlerFactory, + CompletableAsyncResult completionHandle) { this.logger = logger; this.socket = socket; this.handshakeClient = SecureScuttlebuttHandshakeClient.create(keyPair, networkIdentifier, remotePublicKey); @@ -182,19 +182,19 @@ public SecureScuttlebuttVertxClient( * @param handlerFactory the factory of handlers for connections * @return a handle to a new stream handler with the remote host */ - public AsyncResult connectTo( + public AsyncResult connectTo( int port, String host, Signature.PublicKey remotePublicKey, - ClientHandlerFactory handlerFactory) { + ClientHandlerFactory handlerFactory) { client = vertx.createNetClient(new NetClientOptions().setTcpKeepAlive(true)); - CompletableAsyncResult completion = AsyncResult.incomplete(); + CompletableAsyncResult completion = AsyncResult.incomplete(); client.connect(port, host, res -> { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { NetSocket socket = res.result(); - new NetSocketClientHandler( + new NetSocketClientHandler( loggerProvider.getLogger(host + ":" + port), socket, remotePublicKey, diff --git a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java index 95249db8..f1cf1609 100644 --- a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java +++ b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java @@ -113,8 +113,8 @@ private void handleMessage(Buffer buffer) { } } } catch (HandshakeException | StreamException e) { - e.printStackTrace(); netSocket.close(); + throw e; } } } diff --git a/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java b/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java index b6eb21b1..72b0b3e7 100644 --- a/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java +++ b/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java @@ -111,8 +111,7 @@ void connectToServer(@VertxInstance Vertx vertx) throws Exception { SecureScuttlebuttVertxClient client = new SecureScuttlebuttVertxClient(provider, vertx, Signature.KeyPair.random(), networkIdentifier); - MyClientHandler handler = - (MyClientHandler) client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get(); + MyClientHandler handler = client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get(); Thread.sleep(1000); assertNotNull(handler); diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle index b6f13391..cf71a725 100644 --- a/scuttlebutt-rpc/build.gradle +++ b/scuttlebutt-rpc/build.gradle @@ -5,6 +5,8 @@ dependencies { compile project(':concurrent') compile project(':crypto') compile project(':scuttlebutt') + compile project(':scuttlebutt-handshake') + compile 'org.logl:logl-api' compile 'com.fasterxml.jackson.core:jackson-databind' diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCAsyncRequest.java new file mode 100644 index 00000000..f4281d75 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCAsyncRequest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import net.consensys.cava.bytes.Bytes; + +import java.util.List; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class RPCAsyncRequest { + + + private final RPCFunction function; + private final List arguments; + + /** + * + * @param function the function to be in invoked. If the function is in a namespace, the first n-1 items in the array + * are the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']). + * @param arguments The arguments passed to the function being invoked. Each item can be any arbitrary object which is + * JSON serializable (e.g. String, Int, list, object.) + * + */ + public RPCAsyncRequest(RPCFunction function, List arguments) { + this.function = function; + this.arguments = arguments; + } + + /** + * Encode the RPC request as bytes. + * + * @param objectMapper the object mapper to serialize the request with + * @return an RPC request serialized into bytes + * @throws JsonProcessingException thrown if there is an error while serializing the request to bytes + */ + public Bytes toEncodedRpcMessage(ObjectMapper objectMapper) throws JsonProcessingException { + return RPCCodec.encodeRequest( + new RPCRequestBody(function.asList(), RPCRequestType.ASYNC, arguments).asBytes(objectMapper), + getRPCFlags()); + } + + public RPCFlag[] getRPCFlags() { + return new RPCFlag[] {RPCFlag.BodyType.JSON}; + } + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCCodec.java index 4164deb9..bed5d14f 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCCodec.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCCodec.java @@ -18,6 +18,9 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * Encoder responsible for encoding requests. *

@@ -27,6 +30,8 @@ public final class RPCCodec { static final AtomicInteger counter = new AtomicInteger(1); + private static ObjectMapper mapper = new ObjectMapper(); + private static int nextRequestNumber() { int requestNumber = counter.getAndIncrement(); if (requestNumber < 1) { @@ -81,6 +86,19 @@ public static Bytes encodeRequest(Bytes body, int requestNumber, RPCFlag... flag body); } + /** + * Encode a message as an RPC request. + * + * @param body the body to encode as an RPC request + * @param requestNumber the request number + * @param flags the flags of the RPC request (already encoded.) + * @return the message encoded as an RPC request + */ + public static Bytes encodeRequest(Bytes body, int requestNumber, byte flags) { + return Bytes + .concatenate(Bytes.of(flags), Bytes.ofUnsignedInt(body.size()), Bytes.ofUnsignedInt(requestNumber), body); + } + /** * Encode a message as a response to a RPC request. * @@ -116,6 +134,18 @@ public static Bytes encodeResponse(Bytes body, int requestNumber, byte flagByte, return encodeResponse(body, requestNumber, flagByte); } + /** + * Encodes a message with the body and headers set in the appropriate way to end a stream. + * + * @return the response encoded as an RPC request + * @throws JsonProcessingException + */ + public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException { + Boolean bool = Boolean.TRUE; + byte[] bytes = mapper.writeValueAsBytes(bool); + return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END); + } + /** * Encode a message as a response to a RPC request. * diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCErrorBody.java new file mode 100644 index 00000000..87fa5b6c --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCErrorBody.java @@ -0,0 +1,64 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +/** + * An RPC message response body which contains an error + */ +public class RPCErrorBody { + + private String name; + private String message; + private String stack; + + public RPCErrorBody() { + + } + + /** + * A description of an error that occurred while performing an RPC request. + * + * @param name the name of the error type + * @param message the message describing the error + * @param stack the stack trace from the error + */ + public RPCErrorBody(String name, String message, String stack) { + this.name = name; + this.message = message; + this.stack = stack; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getStack() { + return stack; + } + + public void setStack(String stack) { + this.stack = stack; + } +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCFunction.java new file mode 100644 index 00000000..511a4911 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCFunction.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import java.util.ArrayList; +import java.util.List; + +/** + * A scuttlebutt RPC function namespace and name representation. + */ +public class RPCFunction { + + private final List namespace; + private final String functionName; + + /** + * + * @param namespace the namespace of the function (e.g. ['blobs']. May be empty if there is no namespace for the + * function. + * @param functionName the function (e.g. 'add'.) + */ + public RPCFunction(List namespace, String functionName) { + this.namespace = namespace; + this.functionName = functionName; + } + + public RPCFunction(String functionName) { + this.namespace = new ArrayList<>(); + this.functionName = functionName; + } + + /** + * @return The list representation of the namespace and function call. + */ + public List asList() { + List list = new ArrayList<>(); + list.addAll(namespace); + list.add(functionName); + return list; + } + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java index 2a4c426e..60942ff7 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java @@ -19,14 +19,13 @@ import java.io.IOException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; /** * Decoded RPC message, making elements of the message available directly. */ public final class RPCMessage { - private static final ObjectMapper mapper = new ObjectMapper(); - private final byte rpcFlags; private final boolean stream; private final boolean lastMessageOrError; @@ -81,6 +80,40 @@ public boolean lastMessageOrError() { return lastMessageOrError; } + /** + * + * @return true if this is a last message in a stream, and it is not an error + */ + public boolean isSuccessfulLastMessage() { + return lastMessageOrError() && asString().equals("true"); + } + + /** + * + * @return true if this is an error message response + */ + public boolean isErrorMessage() { + return lastMessageOrError && !isSuccessfulLastMessage(); + } + + /** + * @param objectMapper the object mapper to deserialize with + * @return the RPC error response body, if this is an error response - nothing otherwise + */ + public Optional getErrorBody(ObjectMapper objectMapper) { + + if (!isErrorMessage()) { + // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition + return Optional.absent(); + } else { + try { + return Optional.of(asJSON(objectMapper, RPCErrorBody.class)); + } catch (IOException e) { + return Optional.absent(); + } + } + } + /** * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message. * @@ -128,13 +161,14 @@ public String asString() { /** * Provides the body of the message, marshalled as a JSON object. - * + * + * @param objectMapper the object mapper to deserialize with * @param clazz the JSON object class * @param the matching JSON object class * @return a new instance of the JSON object class * @throws IOException if an error occurs during marshalling */ - public T asJSON(Class clazz) throws IOException { - return mapper.readerFor(clazz).readValue(body().toArrayUnsafe()); + public T asJSON(ObjectMapper objectMapper, Class clazz) throws IOException { + return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe()); } } diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestBody.java new file mode 100644 index 00000000..70ad4e37 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestBody.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import net.consensys.cava.bytes.Bytes; + +import java.util.List; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * The request payload of an RPC request to another node. The fields are as specified in the scuttlebutt protocol docs + */ +public class RPCRequestBody { + + private final List name; + private final RPCRequestType type; + private final List args; + + /** + * + * @param name the function to be in invoked. If the function is in a namespace, the first n-1 items in the array are + * the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']). If the function is + * not in a namespace, it is an array with one item (e.g. ['createFeedStream']. + * @param type the type of the request (e.g. stream or async.) + * @param args The args passed to the function being invoked. Each item can be any arbitrary object which is JSON + * serializable (e.g. String, Int, list, object.) + */ + public RPCRequestBody(List name, RPCRequestType type, List args) { + this.name = name; + this.type = type; + this.args = args; + } + + public List getName() { + return name; + } + + public RPCRequestType getType() { + return type; + } + + public List getArgs() { + return args; + } + + /** + * + * @param objectMapper the object mapper to serialize to bytes with + * @return the bytes representation of this RPC request body. The request is first encoded into JSON, then from JSON + * to a byte array + * @throws JsonProcessingException thrown if there is a problem transforming the object to JSON. + */ + public Bytes asBytes(ObjectMapper objectMapper) throws JsonProcessingException { + byte[] bytes = objectMapper.writeValueAsBytes(this); + return Bytes.wrap(bytes); + } + + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestType.java new file mode 100644 index 00000000..9703da4f --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCRequestType.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The available type of Scuttlebutt RPC requests + */ +public enum RPCRequestType { + + /** + * An 'async' request, which returns one result some time in the future. + */ + @JsonProperty("async") + ASYNC, + + /** + * A 'source' type request, which begins a stream of results + */ + @JsonProperty("source") + SOURCE + + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCStreamRequest.java new file mode 100644 index 00000000..ff198daa --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCStreamRequest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import net.consensys.cava.bytes.Bytes; + +import java.util.List; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A request which returns a 'source' type result (e.g. opens up a stream that is followed by the request ID.) + */ +public class RPCStreamRequest { + + private final RPCFunction function; + private final List arguments; + + /** + * The details for the function (the name of the function and its arguments.) + * + * @param function the function to be invoked + * @param arguments the arguments for the function (can be any arbitrary class which can be marshalled into JSON.) + */ + public RPCStreamRequest(RPCFunction function, List arguments) { + this.function = function; + this.arguments = arguments; + } + + /** + * @return The byte representation for the request after it is marshalled into a JSON string. + * @throws JsonProcessingException if an error was thrown while marshalling to JSON + */ + public Bytes toEncodedRpcMessage(ObjectMapper mapper) throws JsonProcessingException { + RPCRequestBody body = new RPCRequestBody(function.asList(), RPCRequestType.SOURCE, arguments); + return RPCCodec.encodeRequest(body.asBytes(mapper), getRPCFlags()); + } + + /** + * @return The correct RPC flags for a stream request + */ + public RPCFlag[] getRPCFlags() { + return new RPCFlag[] {RPCFlag.Stream.STREAM, RPCFlag.BodyType.JSON}; + } + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java new file mode 100644 index 00000000..722dcbfa --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java @@ -0,0 +1,59 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux; + +import net.consensys.cava.concurrent.AsyncResult; +import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; +import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; + +import java.util.function.Function; + +import com.fasterxml.jackson.core.JsonProcessingException; + +/** + * Multiplexes asynchronous requests and streams across a connection to a node. Handles multiple active requests and + * streams across one connection. + */ +public interface Multiplexer { + + /** + * Issue an 'async' type request to a node, which will eventually return a result from the node. + * + * @param request the request details + * + * @return an async result which will be completed with the result or an error if the request fails. + */ + AsyncResult makeAsyncRequest(RPCAsyncRequest request); + + /** + * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.) + * + * @param request the request details + * @param streamFactory a function which takes a 'Runnable' which closes the stream when ran, and returns a stream + * handler to pass messages to + * + * @throws JsonProcessingException + */ + void openStream(RPCStreamRequest request, Function streamFactory) + throws JsonProcessingException, + ConnectionClosedException; + + + /** + * Close the underlying connection + */ + void close(); + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java new file mode 100644 index 00000000..0ac45833 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java @@ -0,0 +1,250 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux; + +import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.concurrent.AsyncResult; +import net.consensys.cava.concurrent.CompletableAsyncResult; +import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler; +import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; +import net.consensys.cava.scuttlebutt.rpc.RPCCodec; +import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody; +import net.consensys.cava.scuttlebutt.rpc.RPCFlag; +import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import org.logl.Logger; +import org.logl.LoggerProvider; + +/** + * Handles RPC requests and responses from an active connection to a scuttlebutt node + * + * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been + * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without + * new incoming requests being added to the maps by threads. + * + * In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of + * concurrency. + * + */ +public class RPCHandler implements Multiplexer, ClientHandler { + + private final Consumer messageSender; + private final Logger logger; + private final Runnable connectionCloser; + private final ObjectMapper objectMapper; + + private Map> awaitingAsyncResponse = new HashMap<>(); + private Map streams = new HashMap<>(); + + private boolean closed; + + /** + * Makes RPC requests over a connection + * + * @param messageSender sends the request to the node + * @param terminationFn closes the connection + * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies + * @param logger + */ + public RPCHandler( + Consumer messageSender, + Runnable terminationFn, + ObjectMapper objectMapper, + LoggerProvider logger) { + this.messageSender = messageSender; + this.connectionCloser = terminationFn; + this.closed = false; + this.objectMapper = objectMapper; + + this.logger = logger.getLogger("rpc handler"); + } + + @Override + public synchronized AsyncResult makeAsyncRequest(RPCAsyncRequest request) { + + CompletableAsyncResult result = AsyncResult.incomplete(); + + if (closed) { + result.completeExceptionally(new ConnectionClosedException()); + } + + try { + RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); + int requestNumber = message.requestNumber(); + awaitingAsyncResponse.put(requestNumber, result); + Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); + messageSender.accept(bytes); + + } catch (JsonProcessingException e) { + result.completeExceptionally(e); + } + + return result; + } + + @Override + public synchronized void openStream( + RPCStreamRequest request, + Function responseSink) throws JsonProcessingException, + ConnectionClosedException { + + if (closed) { + throw new ConnectionClosedException(); + } + + try { + RPCFlag[] rpcFlags = request.getRPCFlags(); + RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); + int requestNumber = message.requestNumber(); + + Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags); + messageSender.accept(bytes); + + Runnable closeStreamHandler = new Runnable() { + @Override + public void run() { + + try { + Bytes bytes = RPCCodec.encodeStreamEndRequest(requestNumber); + messageSender.accept(bytes); + } catch (JsonProcessingException e) { + logger.warn("Unexpectedly could not encode stream end message to JSON."); + } + + } + }; + + ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler); + + streams.put(requestNumber, scuttlebuttStreamHandler); + } catch (JsonProcessingException ex) { + throw ex; + } + } + + @Override + public synchronized void close() { + connectionCloser.run(); + } + + @Override + public synchronized void receivedMessage(Bytes message) { + + RPCMessage rpcMessage = new RPCMessage(message); + + // A negative request number indicates that this is a response, rather than a request that this node + // should service + if (rpcMessage.requestNumber() < 0) { + handleResponse(rpcMessage); + } else { + handleRequest(rpcMessage); + } + + } + + private void handleRequest(RPCMessage rpcMessage) { + // Not yet implemented + logger.warn("Received incoming request, but we do not yet handle any requests: " + rpcMessage.asString()); + + } + + private void handleResponse(RPCMessage response) { + int requestNumber = response.requestNumber() * -1; + + if (logger.isDebugEnabled()) { + logger.debug("Incoming response: " + response.asString()); + } + + byte rpcFlags = response.rpcFlags(); + + boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags); + + if (isStream) { + ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber); + + if (scuttlebuttStreamHandler != null) { + + if (response.isSuccessfulLastMessage()) { + streams.remove(requestNumber); + scuttlebuttStreamHandler.onStreamEnd(); + } else if (response.isErrorMessage()) { + + Optional errorBody = response.getErrorBody(objectMapper); + + if (errorBody.isPresent()) { + scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage())); + } else { + // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message + // if we fail to marshall it for whatever reason + scuttlebuttStreamHandler.onStreamError(new Exception(response.asString())); + } + + } else { + scuttlebuttStreamHandler.onMessage(response); + } + } else { + logger.warn( + "Couldn't find stream handler for RPC response with request number " + + requestNumber + + " " + + response.asString()); + } + + } else { + + CompletableAsyncResult rpcMessageFuture = awaitingAsyncResponse.get(requestNumber); + + if (rpcMessageFuture != null) { + rpcMessageFuture.complete(response); + awaitingAsyncResponse.remove(requestNumber); + } else { + logger.warn( + "Couldn't find async handler for RPC response with request number " + + requestNumber + + " " + + response.asString()); + } + } + + } + + @Override + public void streamClosed() { + this.closed = true; + + streams.forEach((key, streamHandler) -> { + streamHandler.onStreamError(new ConnectionClosedException()); + }); + + streams.clear(); + + awaitingAsyncResponse.forEach((key, value) -> { + if (!value.isDone()) { + value.completeExceptionally(new ConnectionClosedException()); + } + + }); + + + } +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java new file mode 100644 index 00000000..d108663a --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux; + +import net.consensys.cava.scuttlebutt.rpc.RPCMessage; + +/** + * Handles incoming items from a result stream + */ +public interface ScuttlebuttStreamHandler { + + /** + * Handles a new message from the result stream. + * + * @param message + */ + void onMessage(RPCMessage message); + + /** + * Invoked when the stream has been closed. + */ + void onStreamEnd(); + + /** + * Invoked when there is an error in the stream. + * + * @param ex the underlying error + */ + void onStreamError(Exception ex); + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java new file mode 100644 index 00000000..160946c9 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java @@ -0,0 +1,21 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux.exceptions; + +public class ConnectionClosedException extends Exception { + + public ConnectionClosedException() { + super("Connection is closed."); + } + +} diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java index 60d61942..34280959 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java @@ -180,4 +180,5 @@ void runWithPatchWork(@VertxInstance Vertx vertx) throws Exception { secureScuttlebuttVertxClient.stop().join(); } + } diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/RPCEncodingTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/RPCEncodingTest.java index 2596b389..84f2d23e 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/RPCEncodingTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/RPCEncodingTest.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; class RPCEncodingTest { @@ -44,7 +45,7 @@ void rpcRoundtripJSON() throws Exception { RPCFlag.Stream.STREAM); RPCMessage decoded = new RPCMessage(message); assertTrue(decoded.stream()); - assertEquals("some JSON string", decoded.asJSON(String.class)); + assertEquals("some JSON string", decoded.asJSON(new ObjectMapper(), String.class)); assertEquals(RPCFlag.BodyType.JSON, decoded.bodyType()); assertEquals(RPCCodec.counter.get() - 1, decoded.requestNumber()); } diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java new file mode 100644 index 00000000..293ac92b --- /dev/null +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux; + +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.bytes.Bytes32; +import net.consensys.cava.concurrent.AsyncResult; +import net.consensys.cava.concurrent.CompletableAsyncResult; +import net.consensys.cava.crypto.sodium.Signature; +import net.consensys.cava.io.Base64; +import net.consensys.cava.junit.VertxExtension; +import net.consensys.cava.junit.VertxInstance; +import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient; +import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; +import net.consensys.cava.scuttlebutt.rpc.RPCFunction; +import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import io.vertx.core.Vertx; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.logl.Level; +import org.logl.LoggerProvider; +import org.logl.logl.SimpleLogger; +import org.logl.vertx.LoglLogDelegateFactory; + +@ExtendWith(VertxExtension.class) +public class PatchworkIntegrationTest { + + LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter( + new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8)))); + + @Test + @Disabled + public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception { + + RPCHandler rpcHandler = makeRPCHandler(vertx); + + List> results = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + RPCFunction function = new RPCFunction("whoami"); + RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>()); + + AsyncResult res = rpcHandler.makeAsyncRequest(asyncRequest); + + results.add(res); + } + + AsyncResult> allResults = AsyncResult.combine(results); + List rpcMessages = allResults.get(); + + assertEquals(10, rpcMessages.size()); + + rpcMessages.forEach(msg -> { + assertFalse(msg.lastMessageOrError()); + + }); + + } + + + // TODO: Move this to a utility class that all the scuttlebutt modules' tests can use. + private Signature.KeyPair getLocalKeys() throws Exception { + Optional ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir")); + Optional homePath = + Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb"); + + Optional path = ssbDir.or(homePath); + + if (!path.isPresent()) { + throw new Exception("Cannot find ssb directory config value"); + } + + String secretPath = path.get() + "/secret"; + File file = new File(secretPath); + + if (!file.exists()) { + throw new Exception("Secret file does not exist"); + } + + Scanner s = new Scanner(file, UTF_8.name()); + s.useDelimiter("\n"); + + ArrayList list = new ArrayList(); + while (s.hasNext()) { + String next = s.next(); + + // Filter out the comment lines + if (!next.startsWith("#")) { + list.add(next); + } + } + + String secretJSON = String.join("", list); + + ObjectMapper mapper = new ObjectMapper(); + + HashMap values = mapper.readValue(secretJSON, new TypeReference>() {}); + String pubKey = values.get("public").replace(".ed25519", ""); + String privateKey = values.get("private").replace(".ed25519", ""); + + Bytes pubKeyBytes = Base64.decode(pubKey); + Bytes privKeyBytes = Base64.decode(privateKey); + + Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes); + Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes); + + return new Signature.KeyPair(pub, secretKey); + } + + @Test + @Disabled + public void postMessageTest(@VertxInstance Vertx vertx) throws Exception { + + RPCHandler rpcHandler = makeRPCHandler(vertx); + + + List> results = new ArrayList<>(); + + for (int i = 0; i < 20; i++) { + // Note: in a real use case, this would more likely be a Java class with these fields + HashMap params = new HashMap<>(); + params.put("type", "post"); + params.put("text", "test test " + i); + + RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params)); + + AsyncResult rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); + + results.add(rpcMessageAsyncResult); + + } + + List rpcMessages = AsyncResult.combine(results).get(); + + rpcMessages.forEach(msg -> System.out.println(msg.asString())); + } + + private RPCHandler makeRPCHandler(Vertx vertx) throws Exception { + Signature.KeyPair keyPair = getLocalKeys(); + String networkKeyBase64 = "1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s="; + Bytes32 networkKeyBytes32 = Bytes32.wrap(Base64.decode(networkKeyBase64)); + + String host = "localhost"; + int port = 8008; + LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter( + new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8)))); + LoglLogDelegateFactory.setProvider(loggerProvider); + + SecureScuttlebuttVertxClient secureScuttlebuttVertxClient = + new SecureScuttlebuttVertxClient(loggerProvider, vertx, keyPair, networkKeyBytes32); + + AsyncResult onConnect = + secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> { + + return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider); + }); + + return onConnect.get(); + } + + + @Test + @Disabled + public void streamTest(@VertxInstance Vertx vertx) throws Exception { + + RPCHandler handler = makeRPCHandler(vertx); + Signature.PublicKey publicKey = getLocalKeys().publicKey(); + + String pubKey = "@" + Base64.encode(publicKey.bytes()) + ".ed25519"; + + Map params = new HashMap<>(); + params.put("id", pubKey); + + CompletableAsyncResult streamEnded = AsyncResult.incomplete(); + + RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params)); + + try { + handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { + @Override + public void onMessage(RPCMessage message) { + System.out.print(message.asString()); + } + + @Override + public void onStreamEnd() { + streamEnded.complete(null); + } + + @Override + public void onStreamError(Exception ex) { + + } + }); + } catch (ConnectionClosedException e) { + throw e; + } + + // Wait until the stream is complete + streamEnded.get(); + + } + + +}