Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Fix thread deadlock issue in RPC handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
Happy0 committed Apr 14, 2019
1 parent b67ae4f commit d762d20
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 100 deletions.
Expand Up @@ -35,7 +35,7 @@ public interface Multiplexer {
*
* @return an async result which will be completed with the result or an error if the request fails.
*/
AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request);
AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException;

/**
* Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
Expand Down
Expand Up @@ -26,6 +26,8 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -36,15 +38,7 @@
import org.logl.LoggerProvider;

/**
* Handles RPC requests and responses from an active connection to a scuttlebutt node
*
* Note: the public methods on this class are synchronized so that a request is rejected if the connection has been
* closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without
* new incoming requests being added to the maps by threads.
*
* In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of
* concurrency.
*
* Handles RPC requests and responses from an active connection to a scuttlebutt node.
*/
public class RPCHandler implements Multiplexer, ClientHandler {

Expand All @@ -58,6 +52,12 @@ public class RPCHandler implements Multiplexer, ClientHandler {

private boolean closed;

/**
* We run each each update on this executor to update the request state synchronously, and to handle the underlying
* connection closing by failing the in progress requests and not accepting future requests
*/
private final ExecutorService executor = Executors.newSingleThreadExecutor();

/**
* Makes RPC requests over a connection
*
Expand All @@ -80,86 +80,113 @@ public RPCHandler(
}

@Override
public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) {
public AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {

CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);

if (closed) {
result.completeExceptionally(new ConnectionClosedException());
}

try {
RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
int requestNumber = message.requestNumber();
awaitingAsyncResponse.put(requestNumber, result);
Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
messageSender.accept(bytes);
CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();

} catch (JsonProcessingException e) {
result.completeExceptionally(e);
}
Runnable synchronizedAddRequest = () -> {
if (closed) {
result.completeExceptionally(new ConnectionClosedException());
} else {
RPCMessage message = new RPCMessage(bodyBytes);
int requestNumber = message.requestNumber();
awaitingAsyncResponse.put(requestNumber, result);
Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
sendBytes(bytes);
}
};

executor.submit(synchronizedAddRequest);
return result;
}

@Override
public synchronized void openStream(
RPCStreamRequest request,
Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException,
ConnectionClosedException {
public void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> responseSink)
throws JsonProcessingException {

if (closed) {
throw new ConnectionClosedException();
}
Bytes bytes = request.toEncodedRpcMessage(objectMapper);

Runnable synchronizedRequest = () -> {

try {
RPCFlag[] rpcFlags = request.getRPCFlags();
RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
RPCMessage message = new RPCMessage(bytes);
int requestNumber = message.requestNumber();

Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
messageSender.accept(bytes);
Bytes bytes1 = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);

Runnable closeStreamHandler = new Runnable() {
@Override
public void run() {

try {
Bytes bytes = RPCCodec.encodeStreamEndRequest(requestNumber);
messageSender.accept(bytes);
} catch (JsonProcessingException e) {
logger.warn("Unexpectedly could not encode stream end message to JSON.");
}
Runnable closeStreamHandler = () -> {

try {
Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
sendBytes(streamEnd);
} catch (JsonProcessingException e) {
logger.warn("Unexpectedly could not encode stream end message to JSON.");
}

};

ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);

streams.put(requestNumber, scuttlebuttStreamHandler);
} catch (JsonProcessingException ex) {
throw ex;
}
if (closed) {
scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
} else {
streams.put(requestNumber, scuttlebuttStreamHandler);
sendBytes(bytes1);
}


};

executor.submit(synchronizedRequest);
}

@Override
public synchronized void close() {
connectionCloser.run();
public void close() {
executor.submit(connectionCloser);
}

@Override
public synchronized void receivedMessage(Bytes message) {
public void receivedMessage(Bytes message) {

RPCMessage rpcMessage = new RPCMessage(message);
Runnable synchronizedHandleMessage = () -> {
RPCMessage rpcMessage = new RPCMessage(message);

// A negative request number indicates that this is a response, rather than a request that this node
// should service
if (rpcMessage.requestNumber() < 0) {
handleResponse(rpcMessage);
} else {
handleRequest(rpcMessage);
}
// A negative request number indicates that this is a response, rather than a request that this node
// should service
if (rpcMessage.requestNumber() < 0) {
handleResponse(rpcMessage);
} else {
handleRequest(rpcMessage);
}
};

executor.submit(synchronizedHandleMessage);
}

@Override
public void streamClosed() {

Runnable synchronizedCloseStream = () -> {
closed = true;

streams.forEach((key, streamHandler) -> {
streamHandler.onStreamError(new ConnectionClosedException());
});

streams.clear();

awaitingAsyncResponse.forEach((key, value) -> {
if (!value.isDone()) {
value.completeExceptionally(new ConnectionClosedException());
}
});

awaitingAsyncResponse.clear();
};

executor.submit(synchronizedCloseStream);
}

private void handleRequest(RPCMessage rpcMessage) {
Expand Down Expand Up @@ -228,23 +255,8 @@ private void handleResponse(RPCMessage response) {

}

@Override
public void streamClosed() {
this.closed = true;

streams.forEach((key, streamHandler) -> {
streamHandler.onStreamError(new ConnectionClosedException());
});

streams.clear();

awaitingAsyncResponse.forEach((key, value) -> {
if (!value.isDone()) {
value.completeExceptionally(new ConnectionClosedException());
}

});


private void sendBytes(Bytes bytes) {
messageSender.accept(bytes);
}

}
Expand Up @@ -41,7 +41,6 @@
import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;

import java.io.BufferedWriter;
import java.io.File;
Expand Down Expand Up @@ -219,26 +218,22 @@ public void streamTest(@VertxInstance Vertx vertx) throws Exception {

RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));

try {
handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
@Override
public void onMessage(RPCMessage message) {
System.out.print(message.asString());
}

@Override
public void onStreamEnd() {
streamEnded.complete(null);
}

@Override
public void onStreamError(Exception ex) {

}
});
} catch (ConnectionClosedException e) {
throw e;
}
handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
@Override
public void onMessage(RPCMessage message) {
System.out.print(message.asString());
}

@Override
public void onStreamEnd() {
streamEnded.complete(null);
}

@Override
public void onStreamError(Exception ex) {

}
});

// Wait until the stream is complete
streamEnded.get();
Expand Down

0 comments on commit d762d20

Please sign in to comment.