diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java index 60942ff7..59c2a80f 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java @@ -15,11 +15,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.io.IOException; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; /** * Decoded RPC message, making elements of the message available directly. @@ -104,16 +105,40 @@ public Optional getErrorBody(ObjectMapper objectMapper) { if (!isErrorMessage()) { // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition - return Optional.absent(); + return Optional.empty(); } else { try { return Optional.of(asJSON(objectMapper, RPCErrorBody.class)); } catch (IOException e) { - return Optional.absent(); + return Optional.empty(); } } } + /** + * + * @param objectMapper the objectmatter to deserialize the error with. + * + * @return an exception if this represents an error RPC response, otherwise nothing + */ + public Optional getException(ObjectMapper objectMapper) { + if (isErrorMessage()) { + Optional exception = + getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage())); + + if (!exception.isPresent()) { + // If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation + // which prevented it returning the correct type, so we just print whatever it returned + return Optional.of(new RPCRequestFailedException(this.asString())); + } else { + return exception; + } + + } else { + return Optional.empty(); + } + } + /** * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message. * diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java new file mode 100644 index 00000000..7049c3c4 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.scuttlebutt.rpc.RPCFlag.BodyType; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A successful RPC response. + */ +public class RPCResponse { + + private final Bytes body; + private final BodyType bodyType; + + /** + * A successful RPC response. + * + * @param body the body of the response in bytes + * @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.) + */ + public RPCResponse(Bytes body, BodyType bodyType) { + + this.body = body; + this.bodyType = bodyType; + } + + public Bytes getBody() { + return body; + } + + public BodyType getBodyType() { + return bodyType; + } + + /** + * Provides the body of the message as a UTF-8 string. + * + * @return the body of the message as a UTF-8 string + */ + public String asString() { + return new String(getBody().toArrayUnsafe(), UTF_8); + } + + /** + * Provides the body of the message, marshalled as a JSON object. + * + * @param objectMapper the object mapper to deserialize with + * @param clazz the JSON object class + * @param the matching JSON object class + * @return a new instance of the JSON object class + * @throws IOException if an error occurs during marshalling + */ + public T asJSON(ObjectMapper objectMapper, Class clazz) throws IOException { + return objectMapper.readerFor(clazz).readValue(getBody().toArrayUnsafe()); + } + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java index 1047bd9e..b49ba733 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java @@ -14,7 +14,7 @@ import net.consensys.cava.concurrent.AsyncResult; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; @@ -35,7 +35,7 @@ public interface Multiplexer { * * @return an async result which will be completed with the result or an error if the request fails. */ - AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; + AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; /** * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.) diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java index b6eecfbc..d4bfc512 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java @@ -18,14 +18,16 @@ import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; import net.consensys.cava.scuttlebutt.rpc.RPCCodec; -import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody; import net.consensys.cava.scuttlebutt.rpc.RPCFlag; import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -33,7 +35,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; import org.logl.Logger; import org.logl.LoggerProvider; @@ -47,7 +48,7 @@ public class RPCHandler implements Multiplexer, ClientHandler { private final Runnable connectionCloser; private final ObjectMapper objectMapper; - private Map> awaitingAsyncResponse = new HashMap<>(); + private Map> awaitingAsyncResponse = new HashMap<>(); private Map streams = new HashMap<>(); private boolean closed; @@ -80,11 +81,11 @@ public RPCHandler( } @Override - public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { + public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); - CompletableAsyncResult result = AsyncResult.incomplete(); + CompletableAsyncResult result = AsyncResult.incomplete(); Runnable synchronizedAddRequest = () -> { if (closed) { @@ -92,6 +93,7 @@ public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws } else { RPCMessage message = new RPCMessage(bodyBytes); int requestNumber = message.requestNumber(); + awaitingAsyncResponse.put(requestNumber, result); Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); sendBytes(bytes); @@ -206,6 +208,8 @@ private void handleResponse(RPCMessage response) { boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags); + Optional exception = response.getException(objectMapper); + if (isStream) { ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber); @@ -214,20 +218,11 @@ private void handleResponse(RPCMessage response) { if (response.isSuccessfulLastMessage()) { streams.remove(requestNumber); scuttlebuttStreamHandler.onStreamEnd(); - } else if (response.isErrorMessage()) { - - Optional errorBody = response.getErrorBody(objectMapper); - - if (errorBody.isPresent()) { - scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage())); - } else { - // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message - // if we fail to marshall it for whatever reason - scuttlebuttStreamHandler.onStreamError(new Exception(response.asString())); - } - + } else if (exception.isPresent()) { + scuttlebuttStreamHandler.onStreamError(exception.get()); } else { - scuttlebuttStreamHandler.onMessage(response); + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + scuttlebuttStreamHandler.onMessage(successfulResponse); } } else { logger.warn( @@ -239,11 +234,18 @@ private void handleResponse(RPCMessage response) { } else { - CompletableAsyncResult rpcMessageFuture = awaitingAsyncResponse.get(requestNumber); + CompletableAsyncResult rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber); if (rpcMessageFuture != null) { - rpcMessageFuture.complete(response); - awaitingAsyncResponse.remove(requestNumber); + + if (exception.isPresent()) { + rpcMessageFuture.completeExceptionally(exception.get()); + } else { + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + + rpcMessageFuture.complete(successfulResponse); + } + } else { logger.warn( "Couldn't find async handler for RPC response with request number " diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java index d108663a..b75435d5 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java @@ -12,7 +12,7 @@ */ package net.consensys.cava.scuttlebutt.rpc.mux; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; /** * Handles incoming items from a result stream @@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler { * * @param message */ - void onMessage(RPCMessage message); + void onMessage(RPCResponse message); /** * Invoked when the stream has been closed. diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java new file mode 100644 index 00000000..ad444bd8 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux.exceptions; + +public class RPCRequestFailedException extends Exception { + + public RPCRequestFailedException(String errorMessage) { + super(errorMessage); + } +} diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java index 79ed8c72..c3b12c7d 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -26,7 +26,6 @@ */ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import net.consensys.cava.bytes.Bytes; import net.consensys.cava.bytes.Bytes32; @@ -39,7 +38,7 @@ import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; import net.consensys.cava.scuttlebutt.rpc.RPCFunction; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import java.io.BufferedWriter; @@ -77,27 +76,21 @@ public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception { RPCHandler rpcHandler = makeRPCHandler(vertx); - List> results = new ArrayList<>(); + List> results = new ArrayList<>(); for (int i = 0; i < 10; i++) { RPCFunction function = new RPCFunction("whoami"); RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>()); - AsyncResult res = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult res = rpcHandler.makeAsyncRequest(asyncRequest); results.add(res); } - AsyncResult> allResults = AsyncResult.combine(results); - List rpcMessages = allResults.get(); + AsyncResult> allResults = AsyncResult.combine(results); + List rpcMessages = allResults.get(); assertEquals(10, rpcMessages.size()); - - rpcMessages.forEach(msg -> { - assertFalse(msg.lastMessageOrError()); - - }); - } @@ -157,7 +150,7 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception { RPCHandler rpcHandler = makeRPCHandler(vertx); - List> results = new ArrayList<>(); + List> results = new ArrayList<>(); for (int i = 0; i < 20; i++) { // Note: in a real use case, this would more likely be a Java class with these fields @@ -167,13 +160,13 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception { RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params)); - AsyncResult rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); results.add(rpcMessageAsyncResult); } - List rpcMessages = AsyncResult.combine(results).get(); + List rpcMessages = AsyncResult.combine(results).get(); rpcMessages.forEach(msg -> System.out.println(msg.asString())); } @@ -220,7 +213,7 @@ public void streamTest(@VertxInstance Vertx vertx) throws Exception { handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { @Override - public void onMessage(RPCMessage message) { + public void onMessage(RPCResponse message) { System.out.print(message.asString()); } @@ -232,6 +225,7 @@ public void onStreamEnd() { @Override public void onStreamError(Exception ex) { + streamEnded.completeExceptionally(ex); } });