Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Add a scuttlebutt-mux module for multiplexing RPC requests into futur…
Browse files Browse the repository at this point in the history
…es and streams (#202)

* Start adding code to multiplex RPC requests.

* Fix typos.

* Add helper classes.

* Move mux module to the RPC module.

* Add missing docs and handle an underlying connection being closed.

* Fix mistakenly refactored gradle files.

* Take concurrency into account.

* Formatting

* Remove redundant casting.

* Fix formatting.

* Handle stream end properly.

* Marshall scuttlebutt error body.

* Disable patchwork integration test by default.

* Pass an ObjectMapper instance to functions which serialize and deserialize to allow for object mapper configuration.
  • Loading branch information
Happy0 authored and atoulme committed Apr 5, 2019
1 parent 1d00b93 commit 2c0c4a1
Show file tree
Hide file tree
Showing 20 changed files with 1,045 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
/**
* Factory creating stream handlers, managing client-side connections.
*/
public interface ClientHandlerFactory {
public interface ClientHandlerFactory<T extends ClientHandler> {

/**
* Creates a new handler associated with a valid streaming connection.
*
* @param sender the function to send bytes to the server
* @param terminationFunction a function to terminate the stream properly
*/
ClientHandler createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
T createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,25 @@
*/
public final class SecureScuttlebuttVertxClient {

private class NetSocketClientHandler {
private class NetSocketClientHandler<T extends ClientHandler> {

private final Logger logger;
private final NetSocket socket;
private final SecureScuttlebuttHandshakeClient handshakeClient;
private final ClientHandlerFactory handlerFactory;
private final CompletableAsyncResult<ClientHandler> completionHandle;
private final ClientHandlerFactory<T> handlerFactory;
private final CompletableAsyncResult<T> completionHandle;
private int handshakeCounter;
private SecureScuttlebuttStreamClient client;
private ClientHandler handler;
private T handler;

private Bytes messageBuffer = Bytes.EMPTY;

NetSocketClientHandler(
Logger logger,
NetSocket socket,
Signature.PublicKey remotePublicKey,
ClientHandlerFactory handlerFactory,
CompletableAsyncResult<ClientHandler> completionHandle) {
ClientHandlerFactory<T> handlerFactory,
CompletableAsyncResult<T> completionHandle) {
this.logger = logger;
this.socket = socket;
this.handshakeClient = SecureScuttlebuttHandshakeClient.create(keyPair, networkIdentifier, remotePublicKey);
Expand Down Expand Up @@ -182,19 +182,19 @@ public SecureScuttlebuttVertxClient(
* @param handlerFactory the factory of handlers for connections
* @return a handle to a new stream handler with the remote host
*/
public AsyncResult<ClientHandler> connectTo(
public <T extends ClientHandler> AsyncResult<T> connectTo(
int port,
String host,
Signature.PublicKey remotePublicKey,
ClientHandlerFactory handlerFactory) {
ClientHandlerFactory<T> handlerFactory) {
client = vertx.createNetClient(new NetClientOptions().setTcpKeepAlive(true));
CompletableAsyncResult<ClientHandler> completion = AsyncResult.incomplete();
CompletableAsyncResult<T> completion = AsyncResult.incomplete();
client.connect(port, host, res -> {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
NetSocket socket = res.result();
new NetSocketClientHandler(
new NetSocketClientHandler<T>(
loggerProvider.getLogger(host + ":" + port),
socket,
remotePublicKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ private void handleMessage(Buffer buffer) {
}
}
} catch (HandshakeException | StreamException e) {
e.printStackTrace();
netSocket.close();
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ void connectToServer(@VertxInstance Vertx vertx) throws Exception {

SecureScuttlebuttVertxClient client =
new SecureScuttlebuttVertxClient(provider, vertx, Signature.KeyPair.random(), networkIdentifier);
MyClientHandler handler =
(MyClientHandler) client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();
MyClientHandler handler = client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();

Thread.sleep(1000);
assertNotNull(handler);
Expand Down
2 changes: 2 additions & 0 deletions scuttlebutt-rpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ dependencies {
compile project(':concurrent')
compile project(':crypto')
compile project(':scuttlebutt')
compile project(':scuttlebutt-handshake')
compile 'org.logl:logl-api'

compile 'com.fasterxml.jackson.core:jackson-databind'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.scuttlebutt.rpc;

import net.consensys.cava.bytes.Bytes;

import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RPCAsyncRequest {


private final RPCFunction function;
private final List<Object> arguments;

/**
*
* @param function the function to be in invoked. If the function is in a namespace, the first n-1 items in the array
* are the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']).
* @param arguments The arguments passed to the function being invoked. Each item can be any arbitrary object which is
* JSON serializable (e.g. String, Int, list, object.)
*
*/
public RPCAsyncRequest(RPCFunction function, List<Object> arguments) {
this.function = function;
this.arguments = arguments;
}

/**
* Encode the RPC request as bytes.
*
* @param objectMapper the object mapper to serialize the request with
* @return an RPC request serialized into bytes
* @throws JsonProcessingException thrown if there is an error while serializing the request to bytes
*/
public Bytes toEncodedRpcMessage(ObjectMapper objectMapper) throws JsonProcessingException {
return RPCCodec.encodeRequest(
new RPCRequestBody(function.asList(), RPCRequestType.ASYNC, arguments).asBytes(objectMapper),
getRPCFlags());
}

public RPCFlag[] getRPCFlags() {
return new RPCFlag[] {RPCFlag.BodyType.JSON};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Encoder responsible for encoding requests.
* <p>
Expand All @@ -27,6 +30,8 @@ public final class RPCCodec {

static final AtomicInteger counter = new AtomicInteger(1);

private static ObjectMapper mapper = new ObjectMapper();

private static int nextRequestNumber() {
int requestNumber = counter.getAndIncrement();
if (requestNumber < 1) {
Expand Down Expand Up @@ -81,6 +86,19 @@ public static Bytes encodeRequest(Bytes body, int requestNumber, RPCFlag... flag
body);
}

/**
* Encode a message as an RPC request.
*
* @param body the body to encode as an RPC request
* @param requestNumber the request number
* @param flags the flags of the RPC request (already encoded.)
* @return the message encoded as an RPC request
*/
public static Bytes encodeRequest(Bytes body, int requestNumber, byte flags) {
return Bytes
.concatenate(Bytes.of(flags), Bytes.ofUnsignedInt(body.size()), Bytes.ofUnsignedInt(requestNumber), body);
}

/**
* Encode a message as a response to a RPC request.
*
Expand Down Expand Up @@ -116,6 +134,18 @@ public static Bytes encodeResponse(Bytes body, int requestNumber, byte flagByte,
return encodeResponse(body, requestNumber, flagByte);
}

/**
* Encodes a message with the body and headers set in the appropriate way to end a stream.
*
* @return the response encoded as an RPC request
* @throws JsonProcessingException
*/
public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
Boolean bool = Boolean.TRUE;
byte[] bytes = mapper.writeValueAsBytes(bool);
return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
}

/**
* Encode a message as a response to a RPC request.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.scuttlebutt.rpc;

/**
* An RPC message response body which contains an error
*/
public class RPCErrorBody {

private String name;
private String message;
private String stack;

public RPCErrorBody() {

}

/**
* A description of an error that occurred while performing an RPC request.
*
* @param name the name of the error type
* @param message the message describing the error
* @param stack the stack trace from the error
*/
public RPCErrorBody(String name, String message, String stack) {
this.name = name;
this.message = message;
this.stack = stack;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public String getStack() {
return stack;
}

public void setStack(String stack) {
this.stack = stack;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.scuttlebutt.rpc;

import java.util.ArrayList;
import java.util.List;

/**
* A scuttlebutt RPC function namespace and name representation.
*/
public class RPCFunction {

private final List<String> namespace;
private final String functionName;

/**
*
* @param namespace the namespace of the function (e.g. ['blobs']. May be empty if there is no namespace for the
* function.
* @param functionName the function (e.g. 'add'.)
*/
public RPCFunction(List<String> namespace, String functionName) {
this.namespace = namespace;
this.functionName = functionName;
}

public RPCFunction(String functionName) {
this.namespace = new ArrayList<>();
this.functionName = functionName;
}

/**
* @return The list representation of the namespace and function call.
*/
public List<String> asList() {
List<String> list = new ArrayList<>();
list.addAll(namespace);
list.add(functionName);
return list;
}

}

0 comments on commit 2c0c4a1

Please sign in to comment.