Skip to content

Commit

Permalink
DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable.
Browse files Browse the repository at this point in the history
Added configuration/option "drill.jdbc.batch_queue_throttling_threshold".
Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.
  • Loading branch information
dsbos authored and parthchandra committed Jun 2, 2015
1 parent 0c69631 commit acf5566
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
Expand Up @@ -87,6 +87,9 @@ public interface ExecConstants {
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
/** Size of JDBC batch queue (in batches) above which throttling begins. */
public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";

/**
* Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
Expand Down
4 changes: 4 additions & 0 deletions exec/java-exec/src/main/resources/drill-module.conf
Expand Up @@ -163,3 +163,7 @@ drill.exec: {
return_error_for_failure_in_cancelled_fragments: false
}
}

drill.jdbc: {
batch_queue_throttling_threshold: 100
}
Expand Up @@ -31,6 +31,7 @@
import net.hydromatic.avatica.AvaticaStatement;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public final ResultsListener resultsListener = new ResultsListener();
public final ResultsListener resultsListener;
private final DrillClient client;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
// TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl,
Expand All @@ -77,6 +78,10 @@ public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult pre
ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
this.statement = statement;
final int batchQueueThrottlingThreshold =
this.getStatement().getConnection().getClient().getConfig().getInt(
ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
resultsListener = new ResultsListener( batchQueueThrottlingThreshold );
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
currentBatch = new RecordBatchLoader(client.getAllocator());
Expand Down Expand Up @@ -188,12 +193,13 @@ public String getQueryId() {
public static class ResultsListener implements UserResultsListener {
private static final Logger logger = getLogger( ResultsListener.class );

private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
private static volatile int nextInstanceId = 1;

/** (Just for logging.) */
private final int instanceId;

private final int batchQueueThrottlingThreshold;

/** (Just for logging.) */
private volatile QueryId queryId;

Expand Down Expand Up @@ -225,8 +231,14 @@ public static class ResultsListener implements UserResultsListener {
Queues.newLinkedBlockingDeque();


ResultsListener() {
/**
* ...
* @param batchQueueThrottlingThreshold
* queue size threshold for throttling server
*/
ResultsListener( int batchQueueThrottlingThreshold ) {
instanceId = nextInstanceId++;
this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
logger.debug( "[#{}] Query listener created.", instanceId );
}

Expand All @@ -245,7 +257,7 @@ private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
}

/**
* Stops throttling if currently active.
* Stops throttling if currently throttling.
* @return true if actually stopped (was throttling)
*/
private boolean stopThrottlingIfSo() {
Expand Down Expand Up @@ -300,7 +312,9 @@ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {

// We're active; let's add to the queue.
batchQueue.add(result);
if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {

// Throttle server if queue size has exceed threshold.
if (batchQueue.size() > batchQueueThrottlingThreshold ) {
if ( startThrottlingIfNot( throttle ) ) {
logger.debug( "[#{}] Throttling started at queue size {}.",
instanceId, batchQueue.size() );
Expand Down

0 comments on commit acf5566

Please sign in to comment.