From 22ac65cb387d90f43fe42c48021366f743af9c59 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Fri, 12 Apr 2019 20:23:54 +0100 Subject: [PATCH 1/3] Fix thread deadlock issue in RPC handler. --- .../cava/scuttlebutt/rpc/mux/Multiplexer.java | 2 +- .../cava/scuttlebutt/rpc/mux/RPCHandler.java | 168 ++++++++++-------- .../rpc/mux/PatchworkIntegrationTest.java | 37 ++-- 3 files changed, 107 insertions(+), 100 deletions(-) diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java index 722dcbfa..1047bd9e 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java @@ -35,7 +35,7 @@ public interface Multiplexer { * * @return an async result which will be completed with the result or an error if the request fails. */ - AsyncResult makeAsyncRequest(RPCAsyncRequest request); + AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; /** * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.) diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java index 0ac45833..b6eecfbc 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; @@ -36,15 +38,7 @@ import org.logl.LoggerProvider; /** - * Handles RPC requests and responses from an active connection to a scuttlebutt node - * - * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been - * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without - * new incoming requests being added to the maps by threads. - * - * In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of - * concurrency. - * + * Handles RPC requests and responses from an active connection to a scuttlebutt node. */ public class RPCHandler implements Multiplexer, ClientHandler { @@ -58,6 +52,12 @@ public class RPCHandler implements Multiplexer, ClientHandler { private boolean closed; + /** + * We run each each update on this executor to update the request state synchronously, and to handle the underlying + * connection closing by failing the in progress requests and not accepting future requests + */ + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + /** * Makes RPC requests over a connection * @@ -80,86 +80,113 @@ public RPCHandler( } @Override - public synchronized AsyncResult makeAsyncRequest(RPCAsyncRequest request) { + public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { - CompletableAsyncResult result = AsyncResult.incomplete(); + Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); - if (closed) { - result.completeExceptionally(new ConnectionClosedException()); - } - - try { - RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); - int requestNumber = message.requestNumber(); - awaitingAsyncResponse.put(requestNumber, result); - Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); - messageSender.accept(bytes); + CompletableAsyncResult result = AsyncResult.incomplete(); - } catch (JsonProcessingException e) { - result.completeExceptionally(e); - } + Runnable synchronizedAddRequest = () -> { + if (closed) { + result.completeExceptionally(new ConnectionClosedException()); + } else { + RPCMessage message = new RPCMessage(bodyBytes); + int requestNumber = message.requestNumber(); + awaitingAsyncResponse.put(requestNumber, result); + Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); + sendBytes(bytes); + } + }; + executor.submit(synchronizedAddRequest); return result; } @Override - public synchronized void openStream( - RPCStreamRequest request, - Function responseSink) throws JsonProcessingException, - ConnectionClosedException { + public void openStream(RPCStreamRequest request, Function responseSink) + throws JsonProcessingException { - if (closed) { - throw new ConnectionClosedException(); - } + Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); + + Runnable synchronizedRequest = () -> { - try { RPCFlag[] rpcFlags = request.getRPCFlags(); - RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper)); + RPCMessage message = new RPCMessage(bodyBytes); int requestNumber = message.requestNumber(); - Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags); - messageSender.accept(bytes); + Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags); - Runnable closeStreamHandler = new Runnable() { - @Override - public void run() { - - try { - Bytes bytes = RPCCodec.encodeStreamEndRequest(requestNumber); - messageSender.accept(bytes); - } catch (JsonProcessingException e) { - logger.warn("Unexpectedly could not encode stream end message to JSON."); - } + Runnable closeStreamHandler = () -> { + try { + Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber); + sendBytes(streamEnd); + } catch (JsonProcessingException e) { + logger.warn("Unexpectedly could not encode stream end message to JSON."); } + }; ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler); - streams.put(requestNumber, scuttlebuttStreamHandler); - } catch (JsonProcessingException ex) { - throw ex; - } + if (closed) { + scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException()); + } else { + streams.put(requestNumber, scuttlebuttStreamHandler); + sendBytes(requestBytes); + } + + + }; + + executor.submit(synchronizedRequest); } @Override - public synchronized void close() { - connectionCloser.run(); + public void close() { + executor.submit(connectionCloser); } @Override - public synchronized void receivedMessage(Bytes message) { + public void receivedMessage(Bytes message) { - RPCMessage rpcMessage = new RPCMessage(message); + Runnable synchronizedHandleMessage = () -> { + RPCMessage rpcMessage = new RPCMessage(message); - // A negative request number indicates that this is a response, rather than a request that this node - // should service - if (rpcMessage.requestNumber() < 0) { - handleResponse(rpcMessage); - } else { - handleRequest(rpcMessage); - } + // A negative request number indicates that this is a response, rather than a request that this node + // should service + if (rpcMessage.requestNumber() < 0) { + handleResponse(rpcMessage); + } else { + handleRequest(rpcMessage); + } + }; + executor.submit(synchronizedHandleMessage); + } + + @Override + public void streamClosed() { + + Runnable synchronizedCloseStream = () -> { + closed = true; + + streams.forEach((key, streamHandler) -> { + streamHandler.onStreamError(new ConnectionClosedException()); + }); + + streams.clear(); + + awaitingAsyncResponse.forEach((key, value) -> { + if (!value.isDone()) { + value.completeExceptionally(new ConnectionClosedException()); + } + }); + + awaitingAsyncResponse.clear(); + }; + + executor.submit(synchronizedCloseStream); } private void handleRequest(RPCMessage rpcMessage) { @@ -228,23 +255,8 @@ private void handleResponse(RPCMessage response) { } - @Override - public void streamClosed() { - this.closed = true; - - streams.forEach((key, streamHandler) -> { - streamHandler.onStreamError(new ConnectionClosedException()); - }); - - streams.clear(); - - awaitingAsyncResponse.forEach((key, value) -> { - if (!value.isDone()) { - value.completeExceptionally(new ConnectionClosedException()); - } - - }); - - + private void sendBytes(Bytes bytes) { + messageSender.accept(bytes); } + } diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java index 293ac92b..79ed8c72 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -41,7 +41,6 @@ import net.consensys.cava.scuttlebutt.rpc.RPCFunction; import net.consensys.cava.scuttlebutt.rpc.RPCMessage; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; -import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; import java.io.BufferedWriter; import java.io.File; @@ -219,26 +218,22 @@ public void streamTest(@VertxInstance Vertx vertx) throws Exception { RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params)); - try { - handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { - @Override - public void onMessage(RPCMessage message) { - System.out.print(message.asString()); - } - - @Override - public void onStreamEnd() { - streamEnded.complete(null); - } - - @Override - public void onStreamError(Exception ex) { - - } - }); - } catch (ConnectionClosedException e) { - throw e; - } + handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { + @Override + public void onMessage(RPCMessage message) { + System.out.print(message.asString()); + } + + @Override + public void onStreamEnd() { + streamEnded.complete(null); + } + + @Override + public void onStreamError(Exception ex) { + + } + }); // Wait until the stream is complete streamEnded.get(); From d2c036b0a5ad26621264fc40ac4cad351f722cbf Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Sun, 14 Apr 2019 16:17:13 +0100 Subject: [PATCH 2/3] Add higher level 'RPCResponse' class to represent successful RPC response bodies as implementers may think they have to check and handle errors in RPCMessage, rather than handling the future completing exceptionally. --- .../cava/scuttlebutt/rpc/RPCMessage.java | 31 +++++++- .../cava/scuttlebutt/rpc/RPCResponse.java | 74 +++++++++++++++++++ .../cava/scuttlebutt/rpc/mux/Multiplexer.java | 4 +- .../cava/scuttlebutt/rpc/mux/RPCHandler.java | 44 +++++------ .../rpc/mux/ScuttlebuttStreamHandler.java | 4 +- .../exceptions/RPCRequestFailedException.java | 20 +++++ .../rpc/mux/PatchworkIntegrationTest.java | 26 +++---- 7 files changed, 159 insertions(+), 44 deletions(-) create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java create mode 100644 scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java index 60942ff7..59c2a80f 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java @@ -15,11 +15,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.io.IOException; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; /** * Decoded RPC message, making elements of the message available directly. @@ -104,16 +105,40 @@ public Optional getErrorBody(ObjectMapper objectMapper) { if (!isErrorMessage()) { // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition - return Optional.absent(); + return Optional.empty(); } else { try { return Optional.of(asJSON(objectMapper, RPCErrorBody.class)); } catch (IOException e) { - return Optional.absent(); + return Optional.empty(); } } } + /** + * + * @param objectMapper the objectmatter to deserialize the error with. + * + * @return an exception if this represents an error RPC response, otherwise nothing + */ + public Optional getException(ObjectMapper objectMapper) { + if (isErrorMessage()) { + Optional exception = + getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage())); + + if (!exception.isPresent()) { + // If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation + // which prevented it returning the correct type, so we just print whatever it returned + return Optional.of(new RPCRequestFailedException(this.asString())); + } else { + return exception; + } + + } else { + return Optional.empty(); + } + } + /** * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message. * diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java new file mode 100644 index 00000000..7049c3c4 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import net.consensys.cava.bytes.Bytes; +import net.consensys.cava.scuttlebutt.rpc.RPCFlag.BodyType; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A successful RPC response. + */ +public class RPCResponse { + + private final Bytes body; + private final BodyType bodyType; + + /** + * A successful RPC response. + * + * @param body the body of the response in bytes + * @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.) + */ + public RPCResponse(Bytes body, BodyType bodyType) { + + this.body = body; + this.bodyType = bodyType; + } + + public Bytes getBody() { + return body; + } + + public BodyType getBodyType() { + return bodyType; + } + + /** + * Provides the body of the message as a UTF-8 string. + * + * @return the body of the message as a UTF-8 string + */ + public String asString() { + return new String(getBody().toArrayUnsafe(), UTF_8); + } + + /** + * Provides the body of the message, marshalled as a JSON object. + * + * @param objectMapper the object mapper to deserialize with + * @param clazz the JSON object class + * @param the matching JSON object class + * @return a new instance of the JSON object class + * @throws IOException if an error occurs during marshalling + */ + public T asJSON(ObjectMapper objectMapper, Class clazz) throws IOException { + return objectMapper.readerFor(clazz).readValue(getBody().toArrayUnsafe()); + } + +} diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java index 1047bd9e..b49ba733 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/Multiplexer.java @@ -14,7 +14,7 @@ import net.consensys.cava.concurrent.AsyncResult; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; @@ -35,7 +35,7 @@ public interface Multiplexer { * * @return an async result which will be completed with the result or an error if the request fails. */ - AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; + AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException; /** * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.) diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java index b6eecfbc..d4bfc512 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java @@ -18,14 +18,16 @@ import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; import net.consensys.cava.scuttlebutt.rpc.RPCCodec; -import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody; import net.consensys.cava.scuttlebutt.rpc.RPCFlag; import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException; +import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -33,7 +35,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; import org.logl.Logger; import org.logl.LoggerProvider; @@ -47,7 +48,7 @@ public class RPCHandler implements Multiplexer, ClientHandler { private final Runnable connectionCloser; private final ObjectMapper objectMapper; - private Map> awaitingAsyncResponse = new HashMap<>(); + private Map> awaitingAsyncResponse = new HashMap<>(); private Map streams = new HashMap<>(); private boolean closed; @@ -80,11 +81,11 @@ public RPCHandler( } @Override - public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { + public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException { Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper); - CompletableAsyncResult result = AsyncResult.incomplete(); + CompletableAsyncResult result = AsyncResult.incomplete(); Runnable synchronizedAddRequest = () -> { if (closed) { @@ -92,6 +93,7 @@ public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws } else { RPCMessage message = new RPCMessage(bodyBytes); int requestNumber = message.requestNumber(); + awaitingAsyncResponse.put(requestNumber, result); Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags()); sendBytes(bytes); @@ -206,6 +208,8 @@ private void handleResponse(RPCMessage response) { boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags); + Optional exception = response.getException(objectMapper); + if (isStream) { ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber); @@ -214,20 +218,11 @@ private void handleResponse(RPCMessage response) { if (response.isSuccessfulLastMessage()) { streams.remove(requestNumber); scuttlebuttStreamHandler.onStreamEnd(); - } else if (response.isErrorMessage()) { - - Optional errorBody = response.getErrorBody(objectMapper); - - if (errorBody.isPresent()) { - scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage())); - } else { - // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message - // if we fail to marshall it for whatever reason - scuttlebuttStreamHandler.onStreamError(new Exception(response.asString())); - } - + } else if (exception.isPresent()) { + scuttlebuttStreamHandler.onStreamError(exception.get()); } else { - scuttlebuttStreamHandler.onMessage(response); + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + scuttlebuttStreamHandler.onMessage(successfulResponse); } } else { logger.warn( @@ -239,11 +234,18 @@ private void handleResponse(RPCMessage response) { } else { - CompletableAsyncResult rpcMessageFuture = awaitingAsyncResponse.get(requestNumber); + CompletableAsyncResult rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber); if (rpcMessageFuture != null) { - rpcMessageFuture.complete(response); - awaitingAsyncResponse.remove(requestNumber); + + if (exception.isPresent()) { + rpcMessageFuture.completeExceptionally(exception.get()); + } else { + RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType()); + + rpcMessageFuture.complete(successfulResponse); + } + } else { logger.warn( "Couldn't find async handler for RPC response with request number " diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java index d108663a..b75435d5 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java @@ -12,7 +12,7 @@ */ package net.consensys.cava.scuttlebutt.rpc.mux; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; /** * Handles incoming items from a result stream @@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler { * * @param message */ - void onMessage(RPCMessage message); + void onMessage(RPCResponse message); /** * Invoked when the stream has been closed. diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java new file mode 100644 index 00000000..ad444bd8 --- /dev/null +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.scuttlebutt.rpc.mux.exceptions; + +public class RPCRequestFailedException extends Exception { + + public RPCRequestFailedException(String errorMessage) { + super(errorMessage); + } +} diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java index 79ed8c72..c3b12c7d 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -26,7 +26,6 @@ */ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import net.consensys.cava.bytes.Bytes; import net.consensys.cava.bytes.Bytes32; @@ -39,7 +38,7 @@ import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient; import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest; import net.consensys.cava.scuttlebutt.rpc.RPCFunction; -import net.consensys.cava.scuttlebutt.rpc.RPCMessage; +import net.consensys.cava.scuttlebutt.rpc.RPCResponse; import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest; import java.io.BufferedWriter; @@ -77,27 +76,21 @@ public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception { RPCHandler rpcHandler = makeRPCHandler(vertx); - List> results = new ArrayList<>(); + List> results = new ArrayList<>(); for (int i = 0; i < 10; i++) { RPCFunction function = new RPCFunction("whoami"); RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>()); - AsyncResult res = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult res = rpcHandler.makeAsyncRequest(asyncRequest); results.add(res); } - AsyncResult> allResults = AsyncResult.combine(results); - List rpcMessages = allResults.get(); + AsyncResult> allResults = AsyncResult.combine(results); + List rpcMessages = allResults.get(); assertEquals(10, rpcMessages.size()); - - rpcMessages.forEach(msg -> { - assertFalse(msg.lastMessageOrError()); - - }); - } @@ -157,7 +150,7 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception { RPCHandler rpcHandler = makeRPCHandler(vertx); - List> results = new ArrayList<>(); + List> results = new ArrayList<>(); for (int i = 0; i < 20; i++) { // Note: in a real use case, this would more likely be a Java class with these fields @@ -167,13 +160,13 @@ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception { RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params)); - AsyncResult rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); + AsyncResult rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest); results.add(rpcMessageAsyncResult); } - List rpcMessages = AsyncResult.combine(results).get(); + List rpcMessages = AsyncResult.combine(results).get(); rpcMessages.forEach(msg -> System.out.println(msg.asString())); } @@ -220,7 +213,7 @@ public void streamTest(@VertxInstance Vertx vertx) throws Exception { handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() { @Override - public void onMessage(RPCMessage message) { + public void onMessage(RPCResponse message) { System.out.print(message.asString()); } @@ -232,6 +225,7 @@ public void onStreamEnd() { @Override public void onStreamError(Exception ex) { + streamEnded.completeExceptionally(ex); } }); From 2fede73ffcd5605859d6c07a56267787f95e097c Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Sun, 14 Apr 2019 22:41:02 +0100 Subject: [PATCH 3/3] Enqueue rpc request handlers on vertx event loop rather than having a dedicator executor. Fix some styling. --- scuttlebutt-rpc/build.gradle | 1 + .../cava/scuttlebutt/rpc/RPCMessage.java | 2 +- .../cava/scuttlebutt/rpc/RPCResponse.java | 14 +++++-- .../cava/scuttlebutt/rpc/mux/RPCHandler.java | 39 +++++++++++-------- .../exceptions/RPCRequestFailedException.java | 2 +- .../rpc/mux/PatchworkIntegrationTest.java | 2 +- 6 files changed, 36 insertions(+), 24 deletions(-) diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle index cf71a725..9b343ed2 100644 --- a/scuttlebutt-rpc/build.gradle +++ b/scuttlebutt-rpc/build.gradle @@ -4,6 +4,7 @@ dependencies { compile project(':bytes') compile project(':concurrent') compile project(':crypto') + compileOnly 'io.vertx:vertx-core' compile project(':scuttlebutt') compile project(':scuttlebutt-handshake') compile 'org.logl:logl-api' diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java index 59c2a80f..0597e99a 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCMessage.java @@ -118,7 +118,7 @@ public Optional getErrorBody(ObjectMapper objectMapper) { /** * * @param objectMapper the objectmatter to deserialize the error with. - * + * * @return an exception if this represents an error RPC response, otherwise nothing */ public Optional getException(ObjectMapper objectMapper) { diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java index 7049c3c4..41ebde09 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/RPCResponse.java @@ -41,11 +41,17 @@ public RPCResponse(Bytes body, BodyType bodyType) { this.bodyType = bodyType; } - public Bytes getBody() { + /** + * @return the RPC response body + */ + public Bytes body() { return body; } - public BodyType getBodyType() { + /** + * @return The type of the data contained in the body. + */ + public BodyType bodyType() { return bodyType; } @@ -55,7 +61,7 @@ public BodyType getBodyType() { * @return the body of the message as a UTF-8 string */ public String asString() { - return new String(getBody().toArrayUnsafe(), UTF_8); + return new String(body().toArrayUnsafe(), UTF_8); } /** @@ -68,7 +74,7 @@ public String asString() { * @throws IOException if an error occurs during marshalling */ public T asJSON(ObjectMapper objectMapper, Class clazz) throws IOException { - return objectMapper.readerFor(clazz).readValue(getBody().toArrayUnsafe()); + return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe()); } } diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java index d4bfc512..7ee70d8a 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/RPCHandler.java @@ -28,13 +28,13 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import org.logl.Logger; import org.logl.LoggerProvider; @@ -48,30 +48,33 @@ public class RPCHandler implements Multiplexer, ClientHandler { private final Runnable connectionCloser; private final ObjectMapper objectMapper; + /** + * We run each each update on the vertx event loop to update the request state synchronously, and to handle the + * underlying connection closing by failing the in progress requests and not accepting future requests + */ + private final Vertx vertx; + private Map> awaitingAsyncResponse = new HashMap<>(); private Map streams = new HashMap<>(); private boolean closed; - /** - * We run each each update on this executor to update the request state synchronously, and to handle the underlying - * connection closing by failing the in progress requests and not accepting future requests - */ - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /** * Makes RPC requests over a connection * + * @param vertx The vertx instance to queue requests with * @param messageSender sends the request to the node * @param terminationFn closes the connection * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies * @param logger */ public RPCHandler( + Vertx vertx, Consumer messageSender, Runnable terminationFn, ObjectMapper objectMapper, LoggerProvider logger) { + this.vertx = vertx; this.messageSender = messageSender; this.connectionCloser = terminationFn; this.closed = false; @@ -87,7 +90,7 @@ public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws CompletableAsyncResult result = AsyncResult.incomplete(); - Runnable synchronizedAddRequest = () -> { + Handler synchronizedAddRequest = (event) -> { if (closed) { result.completeExceptionally(new ConnectionClosedException()); } else { @@ -100,7 +103,7 @@ public AsyncResult makeAsyncRequest(RPCAsyncRequest request) throws } }; - executor.submit(synchronizedAddRequest); + vertx.runOnContext(synchronizedAddRequest); return result; } @@ -110,7 +113,7 @@ public void openStream(RPCStreamRequest request, Function { + Handler synchronizedRequest = (event) -> { RPCFlag[] rpcFlags = request.getRPCFlags(); RPCMessage message = new RPCMessage(bodyBytes); @@ -141,18 +144,20 @@ public void openStream(RPCStreamRequest request, Function { + connectionCloser.run(); + }); } @Override public void receivedMessage(Bytes message) { - Runnable synchronizedHandleMessage = () -> { + Handler synchronizedHandleMessage = (event) -> { RPCMessage rpcMessage = new RPCMessage(message); // A negative request number indicates that this is a response, rather than a request that this node @@ -164,13 +169,13 @@ public void receivedMessage(Bytes message) { } }; - executor.submit(synchronizedHandleMessage); + vertx.runOnContext(synchronizedHandleMessage); } @Override public void streamClosed() { - Runnable synchronizedCloseStream = () -> { + Handler synchronizedCloseStream = (event) -> { closed = true; streams.forEach((key, streamHandler) -> { @@ -188,7 +193,7 @@ public void streamClosed() { awaitingAsyncResponse.clear(); }; - executor.submit(synchronizedCloseStream); + vertx.runOnContext(synchronizedCloseStream); } private void handleRequest(RPCMessage rpcMessage) { diff --git a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java index ad444bd8..8b98eac3 100644 --- a/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java +++ b/scuttlebutt-rpc/src/main/java/net/consensys/cava/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java @@ -12,7 +12,7 @@ */ package net.consensys.cava.scuttlebutt.rpc.mux.exceptions; -public class RPCRequestFailedException extends Exception { +public final class RPCRequestFailedException extends RuntimeException { public RPCRequestFailedException(String errorMessage) { super(errorMessage); diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java index c3b12c7d..516bc919 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java @@ -188,7 +188,7 @@ private RPCHandler makeRPCHandler(Vertx vertx) throws Exception { AsyncResult onConnect = secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> { - return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider); + return new RPCHandler(vertx, sender, terminationFn, new ObjectMapper(), loggerProvider); }); return onConnect.get();