diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index ca73ac8d9b5..265a801dd4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; -import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -35,9 +34,10 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; +import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; import org.apache.drill.exec.rpc.ConnectionThrottle; -import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.rpc.RpcBus; +import org.apache.drill.exec.rpc.RpcConnectionHandler; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; @@ -69,9 +69,13 @@ public class QueryResultHandler { private final ConcurrentMap queryIdToResultsListenersMap = Maps.newConcurrentMap(); - public RpcOutcomeListener getWrappedListener(RemoteConnection connection, - UserResultsListener resultsListener) { - return new SubmissionListener(connection, resultsListener); + public RpcOutcomeListener getWrappedListener(UserResultsListener resultsListener) { + return new SubmissionListener(resultsListener); + } + + public RpcConnectionHandler getWrappedConnectionHandler( + final RpcConnectionHandler handler) { + return new ChannelClosedHandler(handler); } /** @@ -84,7 +88,10 @@ public void resultArrived( ByteBuf pBody ) throws RpcException { final QueryId queryId = queryResult.getQueryId(); final QueryState queryState = queryResult.getQueryState(); - logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId ); + if (logger.isDebugEnabled()) { + logger.debug("resultArrived: queryState: {}, queryId = {}", queryState, + QueryIdHelper.getQueryId(queryId)); + } assert queryResult.hasQueryState() : "received query result without QueryState"; @@ -92,9 +99,6 @@ public void resultArrived( ByteBuf pBody ) throws RpcException { // CANCELED queries are handled the same way as COMPLETED final boolean isTerminalResult; switch ( queryState ) { - case STARTING: - isTerminalResult = false; - break; case FAILED: case CANCELED: case COMPLETED: @@ -154,7 +158,9 @@ public void batchArrived( ConnectionThrottle throttle, final QueryId queryId = queryData.getQueryId(); - logger.debug( "batchArrived: queryId = {}", queryId ); + if (logger.isDebugEnabled()) { + logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId)); + } logger.trace( "batchArrived: batch = {}", batch ); final UserResultsListener resultsListener = newUserResultsListener(queryId); @@ -189,20 +195,10 @@ private UserResultsListener newUserResultsListener(QueryId queryId) { if ( null == resultsListener ) { resultsListener = bl; } - // TODO: Is there a more direct way to detect a Query ID in whatever state this string comparison detects? - if ( queryId.toString().isEmpty() ) { - failAll(); - } } return resultsListener; } - private void failAll() { - for (UserResultsListener l : queryIdToResultsListenersMap.values()) { - l.submissionFailed(UserException.systemError(new RpcException("Received result without QueryId")).build(logger)); - } - } - private static class BufferingResultsListener implements UserResultsListener { private ConcurrentLinkedQueue results = Queues.newConcurrentLinkedQueue(); @@ -272,55 +268,40 @@ public void submissionFailed(UserException ex) { @Override public void queryIdArrived(QueryId queryId) { } - } - private class SubmissionListener extends BaseRpcOutcomeListener { + private final UserResultsListener resultsListener; - private final RemoteConnection connection; - private final ChannelFuture closeFuture; - private final ChannelClosedListener closeListener; private final AtomicBoolean isTerminal = new AtomicBoolean(false); - public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) { - super(); + public SubmissionListener(UserResultsListener resultsListener) { this.resultsListener = resultsListener; - this.connection = connection; - this.closeFuture = connection.getChannel().closeFuture(); - this.closeListener = new ChannelClosedListener(); - closeFuture.addListener(closeListener); - } - - private class ChannelClosedListener implements GenericFutureListener> { - - @Override - public void operationComplete(Future future) throws Exception { - resultsListener.submissionFailed(UserException.connectionError() - .message("Connection %s closed unexpectedly.", connection.getName()) - .build(logger)); - } - } @Override public void failed(RpcException ex) { if (!isTerminal.compareAndSet(false, true)) { + logger.warn("Received multiple responses to run query request."); return; } - closeFuture.removeListener(closeListener); - resultsListener.submissionFailed(UserException.systemError(ex).build(logger)); - + // Although query submission failed, results might have arrived for this query. + // However, the results could not be transferred to this resultListener because + // there is no query id mapped to this resultListener. Look out for the warning + // message from ChannelClosedHandler in the client logs. + resultsListener.submissionFailed(UserException.systemError(ex) + .addContext("Query submission to Drillbit failed.") + .build(logger)); } @Override public void success(QueryId queryId, ByteBuf buf) { if (!isTerminal.compareAndSet(false, true)) { + logger.warn("Received multiple responses to run query request."); return; } - closeFuture.removeListener(closeListener); resultsListener.queryIdArrived(queryId); if (logger.isDebugEnabled()) { logger.debug("Received QueryId {} successfully. Adding results listener {}.", @@ -354,17 +335,57 @@ public void success(QueryId queryId, ByteBuf buf) { @Override public void interrupted(final InterruptedException ex) { - logger.warn("Interrupted while waiting for query results from Drillbit", ex); - if (!isTerminal.compareAndSet(false, true)) { + logger.warn("Received multiple responses to run query request."); return; } - closeFuture.removeListener(closeListener); - // Throw an interrupted UserException? - resultsListener.submissionFailed(UserException.systemError(ex).build(logger)); + resultsListener.submissionFailed(UserException.systemError(ex) + .addContext("The client had been asked to wait as the Drillbit is potentially being over-utilized." + + " But the client was interrupted while waiting.") + .build(logger)); } } + /** + * When a {@link ServerConnection connection} to a server is successfully created, this handler adds a + * listener to that connection that listens to connection closure. If the connection is closed, all active + * {@link UserResultsListener result listeners} are failed. + */ + private class ChannelClosedHandler implements RpcConnectionHandler { + + private final RpcConnectionHandler parentHandler; + + public ChannelClosedHandler(final RpcConnectionHandler parentHandler) { + this.parentHandler = parentHandler; + } + + @Override + public void connectionSucceeded(final ServerConnection connection) { + connection.getChannel().closeFuture().addListener( + new GenericFutureListener>() { + @Override + public void operationComplete(Future future) + throws Exception { + for (final UserResultsListener listener : queryIdToResultsListenersMap.values()) { + listener.submissionFailed(UserException.connectionError() + .message("Connection %s closed unexpectedly. Drillbit down?", + connection.getName()) + .build(logger)); + if (listener instanceof BufferingResultsListener) { + // the appropriate listener will be failed by SubmissionListener#failed + logger.warn("Buffering listener failed before results were transferred to the actual listener."); + } + } + } + }); + parentHandler.connectionSucceeded(connection); + } + + @Override + public void connectionFailed(FailureType type, Throwable t) { + parentHandler.connectionFailed(type, t); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 824e6eb63f9..5ff6a6dc50f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -67,11 +67,11 @@ public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocat } public void submitQuery(UserResultsListener resultsListener, RunQuery query) { - send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class); + send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class); } public void connect(RpcConnectionHandler handler, DrillbitEndpoint endpoint, - UserProperties props, UserBitShared.UserCredentials credentials) { + UserProperties props, UserBitShared.UserCredentials credentials) { UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) .setSupportListening(true) @@ -83,7 +83,8 @@ public void connect(RpcConnectionHandler handler, DrillbitEndp hsBuilder.setProperties(props); } - this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); + this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler), + hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); } @Override