Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Add higher level 'RPCResponse' class to represent successful RPC resp…
Browse files Browse the repository at this point in the history
…onse bodies as implementers may think they have to check and handle errors in RPCMessage, rather than handling the future completing exceptionally.
  • Loading branch information
Happy0 committed Apr 14, 2019
1 parent 22ac65c commit d2c036b
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 44 deletions.
Expand Up @@ -15,11 +15,12 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import net.consensys.cava.bytes.Bytes;
import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;

import java.io.IOException;
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;

/**
* Decoded RPC message, making elements of the message available directly.
Expand Down Expand Up @@ -104,16 +105,40 @@ public Optional<RPCErrorBody> getErrorBody(ObjectMapper objectMapper) {

if (!isErrorMessage()) {
// If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
return Optional.absent();
return Optional.empty();
} else {
try {
return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
} catch (IOException e) {
return Optional.absent();
return Optional.empty();
}
}
}

/**
*
* @param objectMapper the objectmatter to deserialize the error with.
*
* @return an exception if this represents an error RPC response, otherwise nothing
*/
public Optional<RPCRequestFailedException> getException(ObjectMapper objectMapper) {
if (isErrorMessage()) {
Optional<RPCRequestFailedException> exception =
getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage()));

if (!exception.isPresent()) {
// If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation
// which prevented it returning the correct type, so we just print whatever it returned
return Optional.of(new RPCRequestFailedException(this.asString()));
} else {
return exception;
}

} else {
return Optional.empty();
}
}

/**
* Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
*
Expand Down
@@ -0,0 +1,74 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.scuttlebutt.rpc;

import static java.nio.charset.StandardCharsets.UTF_8;

import net.consensys.cava.bytes.Bytes;
import net.consensys.cava.scuttlebutt.rpc.RPCFlag.BodyType;

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* A successful RPC response.
*/
public class RPCResponse {

private final Bytes body;
private final BodyType bodyType;

/**
* A successful RPC response.
*
* @param body the body of the response in bytes
* @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.)
*/
public RPCResponse(Bytes body, BodyType bodyType) {

this.body = body;
this.bodyType = bodyType;
}

public Bytes getBody() {
return body;
}

public BodyType getBodyType() {
return bodyType;
}

/**
* Provides the body of the message as a UTF-8 string.
*
* @return the body of the message as a UTF-8 string
*/
public String asString() {
return new String(getBody().toArrayUnsafe(), UTF_8);
}

/**
* Provides the body of the message, marshalled as a JSON object.
*
* @param objectMapper the object mapper to deserialize with
* @param clazz the JSON object class
* @param <T> the matching JSON object class
* @return a new instance of the JSON object class
* @throws IOException if an error occurs during marshalling
*/
public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
return objectMapper.readerFor(clazz).readValue(getBody().toArrayUnsafe());
}

}
Expand Up @@ -14,7 +14,7 @@

import net.consensys.cava.concurrent.AsyncResult;
import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
import net.consensys.cava.scuttlebutt.rpc.RPCResponse;
import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;

Expand All @@ -35,7 +35,7 @@ public interface Multiplexer {
*
* @return an async result which will be completed with the result or an error if the request fails.
*/
AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException;
AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException;

/**
* Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
Expand Down
Expand Up @@ -18,22 +18,23 @@
import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler;
import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody;
import net.consensys.cava.scuttlebutt.rpc.RPCFlag;
import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
import net.consensys.cava.scuttlebutt.rpc.RPCResponse;
import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import org.logl.Logger;
import org.logl.LoggerProvider;

Expand All @@ -47,7 +48,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
private final Runnable connectionCloser;
private final ObjectMapper objectMapper;

private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>();
private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap<>();
private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();

private boolean closed;
Expand Down Expand Up @@ -80,18 +81,19 @@ public RPCHandler(
}

@Override
public AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {
public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {

Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);

CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
CompletableAsyncResult<RPCResponse> result = AsyncResult.incomplete();

Runnable synchronizedAddRequest = () -> {
if (closed) {
result.completeExceptionally(new ConnectionClosedException());
} else {
RPCMessage message = new RPCMessage(bodyBytes);
int requestNumber = message.requestNumber();

awaitingAsyncResponse.put(requestNumber, result);
Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
sendBytes(bytes);
Expand Down Expand Up @@ -206,6 +208,8 @@ private void handleResponse(RPCMessage response) {

boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);

Optional<RPCRequestFailedException> exception = response.getException(objectMapper);

if (isStream) {
ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber);

Expand All @@ -214,20 +218,11 @@ private void handleResponse(RPCMessage response) {
if (response.isSuccessfulLastMessage()) {
streams.remove(requestNumber);
scuttlebuttStreamHandler.onStreamEnd();
} else if (response.isErrorMessage()) {

Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper);

if (errorBody.isPresent()) {
scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage()));
} else {
// This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message
// if we fail to marshall it for whatever reason
scuttlebuttStreamHandler.onStreamError(new Exception(response.asString()));
}

} else if (exception.isPresent()) {
scuttlebuttStreamHandler.onStreamError(exception.get());
} else {
scuttlebuttStreamHandler.onMessage(response);
RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
scuttlebuttStreamHandler.onMessage(successfulResponse);
}
} else {
logger.warn(
Expand All @@ -239,11 +234,18 @@ private void handleResponse(RPCMessage response) {

} else {

CompletableAsyncResult<RPCMessage> rpcMessageFuture = awaitingAsyncResponse.get(requestNumber);
CompletableAsyncResult<RPCResponse> rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber);

if (rpcMessageFuture != null) {
rpcMessageFuture.complete(response);
awaitingAsyncResponse.remove(requestNumber);

if (exception.isPresent()) {
rpcMessageFuture.completeExceptionally(exception.get());
} else {
RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());

rpcMessageFuture.complete(successfulResponse);
}

} else {
logger.warn(
"Couldn't find async handler for RPC response with request number "
Expand Down
Expand Up @@ -12,7 +12,7 @@
*/
package net.consensys.cava.scuttlebutt.rpc.mux;

import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
import net.consensys.cava.scuttlebutt.rpc.RPCResponse;

/**
* Handles incoming items from a result stream
Expand All @@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler {
*
* @param message
*/
void onMessage(RPCMessage message);
void onMessage(RPCResponse message);

/**
* Invoked when the stream has been closed.
Expand Down
@@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.scuttlebutt.rpc.mux.exceptions;

public class RPCRequestFailedException extends Exception {

public RPCRequestFailedException(String errorMessage) {
super(errorMessage);
}
}
Expand Up @@ -26,7 +26,6 @@
*/
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import net.consensys.cava.bytes.Bytes;
import net.consensys.cava.bytes.Bytes32;
Expand All @@ -39,7 +38,7 @@
import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
import net.consensys.cava.scuttlebutt.rpc.RPCResponse;
import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -77,27 +76,21 @@ public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception {

RPCHandler rpcHandler = makeRPCHandler(vertx);

List<AsyncResult<RPCMessage>> results = new ArrayList<>();
List<AsyncResult<RPCResponse>> results = new ArrayList<>();

for (int i = 0; i < 10; i++) {
RPCFunction function = new RPCFunction("whoami");
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>());

AsyncResult<RPCMessage> res = rpcHandler.makeAsyncRequest(asyncRequest);
AsyncResult<RPCResponse> res = rpcHandler.makeAsyncRequest(asyncRequest);

results.add(res);
}

AsyncResult<List<RPCMessage>> allResults = AsyncResult.combine(results);
List<RPCMessage> rpcMessages = allResults.get();
AsyncResult<List<RPCResponse>> allResults = AsyncResult.combine(results);
List<RPCResponse> rpcMessages = allResults.get();

assertEquals(10, rpcMessages.size());

rpcMessages.forEach(msg -> {
assertFalse(msg.lastMessageOrError());

});

}


Expand Down Expand Up @@ -157,7 +150,7 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception {
RPCHandler rpcHandler = makeRPCHandler(vertx);


List<AsyncResult<RPCMessage>> results = new ArrayList<>();
List<AsyncResult<RPCResponse>> results = new ArrayList<>();

for (int i = 0; i < 20; i++) {
// Note: in a real use case, this would more likely be a Java class with these fields
Expand All @@ -167,13 +160,13 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception {

RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));

AsyncResult<RPCMessage> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);

results.add(rpcMessageAsyncResult);

}

List<RPCMessage> rpcMessages = AsyncResult.combine(results).get();
List<RPCResponse> rpcMessages = AsyncResult.combine(results).get();

rpcMessages.forEach(msg -> System.out.println(msg.asString()));
}
Expand Down Expand Up @@ -220,7 +213,7 @@ public void streamTest(@VertxInstance Vertx vertx) throws Exception {

handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
@Override
public void onMessage(RPCMessage message) {
public void onMessage(RPCResponse message) {
System.out.print(message.asString());
}

Expand All @@ -232,6 +225,7 @@ public void onStreamEnd() {
@Override
public void onStreamError(Exception ex) {

streamEnded.completeExceptionally(ex);
}
});

Expand Down

0 comments on commit d2c036b

Please sign in to comment.