Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

Expand All @@ -35,9 +34,10 @@
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;

Expand Down Expand Up @@ -69,9 +69,13 @@ public class QueryResultHandler {
private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
Maps.newConcurrentMap();

public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
UserResultsListener resultsListener) {
return new SubmissionListener(connection, resultsListener);
public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
return new SubmissionListener(resultsListener);
}

public RpcConnectionHandler<ServerConnection> getWrappedConnectionHandler(
final RpcConnectionHandler<ServerConnection> handler) {
return new ChannelClosedHandler(handler);
}

/**
Expand All @@ -84,17 +88,17 @@ public void resultArrived( ByteBuf pBody ) throws RpcException {
final QueryId queryId = queryResult.getQueryId();
final QueryState queryState = queryResult.getQueryState();

logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId );
if (logger.isDebugEnabled()) {
logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
QueryIdHelper.getQueryId(queryId));
}

assert queryResult.hasQueryState() : "received query result without QueryState";

final boolean isFailureResult = QueryState.FAILED == queryState;
// CANCELED queries are handled the same way as COMPLETED
final boolean isTerminalResult;
switch ( queryState ) {
case STARTING:
isTerminalResult = false;
break;
case FAILED:
case CANCELED:
case COMPLETED:
Expand Down Expand Up @@ -154,7 +158,9 @@ public void batchArrived( ConnectionThrottle throttle,

final QueryId queryId = queryData.getQueryId();

logger.debug( "batchArrived: queryId = {}", queryId );
if (logger.isDebugEnabled()) {
logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
}
logger.trace( "batchArrived: batch = {}", batch );

final UserResultsListener resultsListener = newUserResultsListener(queryId);
Expand Down Expand Up @@ -189,20 +195,10 @@ private UserResultsListener newUserResultsListener(QueryId queryId) {
if ( null == resultsListener ) {
resultsListener = bl;
}
// TODO: Is there a more direct way to detect a Query ID in whatever state this string comparison detects?
if ( queryId.toString().isEmpty() ) {
failAll();
}
}
return resultsListener;
}

private void failAll() {
for (UserResultsListener l : queryIdToResultsListenersMap.values()) {
l.submissionFailed(UserException.systemError(new RpcException("Received result without QueryId")).build(logger));
}
}

private static class BufferingResultsListener implements UserResultsListener {

private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
Expand Down Expand Up @@ -272,55 +268,40 @@ public void submissionFailed(UserException ex) {
@Override
public void queryIdArrived(QueryId queryId) {
}

}


private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {

private final UserResultsListener resultsListener;
private final RemoteConnection connection;
private final ChannelFuture closeFuture;
private final ChannelClosedListener closeListener;
private final AtomicBoolean isTerminal = new AtomicBoolean(false);

public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
super();
public SubmissionListener(UserResultsListener resultsListener) {
this.resultsListener = resultsListener;
this.connection = connection;
this.closeFuture = connection.getChannel().closeFuture();
this.closeListener = new ChannelClosedListener();
closeFuture.addListener(closeListener);
}

private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {

@Override
public void operationComplete(Future<Void> future) throws Exception {
resultsListener.submissionFailed(UserException.connectionError()
.message("Connection %s closed unexpectedly.", connection.getName())
.build(logger));
}

}

@Override
public void failed(RpcException ex) {
if (!isTerminal.compareAndSet(false, true)) {
logger.warn("Received multiple responses to run query request.");
return;
}

closeFuture.removeListener(closeListener);
resultsListener.submissionFailed(UserException.systemError(ex).build(logger));

// Although query submission failed, results might have arrived for this query.
// However, the results could not be transferred to this resultListener because
// there is no query id mapped to this resultListener. Look out for the warning
// message from ChannelClosedHandler in the client logs.
resultsListener.submissionFailed(UserException.systemError(ex)
.addContext("Query submission to Drillbit failed.")
.build(logger));
}

@Override
public void success(QueryId queryId, ByteBuf buf) {
if (!isTerminal.compareAndSet(false, true)) {
logger.warn("Received multiple responses to run query request.");
return;
}

closeFuture.removeListener(closeListener);
resultsListener.queryIdArrived(queryId);
if (logger.isDebugEnabled()) {
logger.debug("Received QueryId {} successfully. Adding results listener {}.",
Expand Down Expand Up @@ -354,17 +335,57 @@ public void success(QueryId queryId, ByteBuf buf) {

@Override
public void interrupted(final InterruptedException ex) {
logger.warn("Interrupted while waiting for query results from Drillbit", ex);

if (!isTerminal.compareAndSet(false, true)) {
logger.warn("Received multiple responses to run query request.");
return;
}

closeFuture.removeListener(closeListener);

// Throw an interrupted UserException?
resultsListener.submissionFailed(UserException.systemError(ex).build(logger));
resultsListener.submissionFailed(UserException.systemError(ex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't use a "system error" when we want to display a proper error message. System errors are meant for unexpected errors that don't have a "nice" error message yet.
This may require adding a new "client" user exception type, to make it easier to differentiate errors that happen on the client side.
This is may be too much of a change for this pull request, so it's fine if you just create a JIRA to address this change later, as I think there are other places in the client code that could also throw a client error instead of a system error.

.addContext("The client had been asked to wait as the Drillbit is potentially being over-utilized." +
" But the client was interrupted while waiting.")
.build(logger));
}
}

/**
* When a {@link ServerConnection connection} to a server is successfully created, this handler adds a
* listener to that connection that listens to connection closure. If the connection is closed, all active
* {@link UserResultsListener result listeners} are failed.
*/
private class ChannelClosedHandler implements RpcConnectionHandler<ServerConnection> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submissions are asynchronous; no thread is waiting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I did not consider back pressure. The warning message is misleading. I'll update the warning and user exception message.


private final RpcConnectionHandler<ServerConnection> parentHandler;

public ChannelClosedHandler(final RpcConnectionHandler<ServerConnection> parentHandler) {
this.parentHandler = parentHandler;
}

@Override
public void connectionSucceeded(final ServerConnection connection) {
connection.getChannel().closeFuture().addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
for (final UserResultsListener listener : queryIdToResultsListenersMap.values()) {
listener.submissionFailed(UserException.connectionError()
.message("Connection %s closed unexpectedly. Drillbit down?",
connection.getName())
.build(logger));
if (listener instanceof BufferingResultsListener) {
// the appropriate listener will be failed by SubmissionListener#failed
logger.warn("Buffering listener failed before results were transferred to the actual listener.");
}
}
}
});
parentHandler.connectionSucceeded(connection);
}

@Override
public void connectionFailed(FailureType type, Throwable t) {
parentHandler.connectionFailed(type, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocat
}

public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}

public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint,
UserProperties props, UserBitShared.UserCredentials credentials) {
UserProperties props, UserBitShared.UserCredentials credentials) {
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
Expand All @@ -83,7 +83,8 @@ public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndp
hsBuilder.setProperties(props);
}

this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler),
hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
}

@Override
Expand Down