Skip to content

Commit

Permalink
DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold c…
Browse files Browse the repository at this point in the history
…onfigurable.

Cleaned, enhanced DrillResultSet:
- Enhanced ResultsListener logging:
  - Added instance ID; added batch numbers.
  - Added logging at close (pairing with logging at construction).
  - Fixed 2-integer query ID to UUID form.
- Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change).
- Added "final" on ResultsListener's logger.

Reduced Avatica-vs.-Drill casting:
- DrillStatementImpl's (Drill)Connection(Impl).
- DrillResultSetImpl's (Drill)Statement(Impl).

Converted a comment in ExecConstants.
  • Loading branch information
dsbos authored and parthchandra committed Jun 2, 2015
1 parent 71199ed commit 0c69631
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 32 deletions.
Expand Up @@ -60,7 +60,8 @@ public interface ExecConstants {
public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories"; public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem"; public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl"; public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches) /** incoming buffer size (number of batches) */
public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size";
public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete"; public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
Expand Down
Expand Up @@ -82,9 +82,14 @@ public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connectio
} }


@Override @Override
public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) { public DrillResultSetImpl newResultSet( AvaticaStatement statement,
final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList()); AvaticaPrepareResult prepareResult,
return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone); TimeZone timeZone ) {
final ResultSetMetaData metaData =
newResultSetMetaData(statement, prepareResult.getColumnList());
return new DrillResultSetImpl( (DrillStatementImpl) statement,
(DrillPrepareResult) prepareResult,
metaData, timeZone);
} }


@Override @Override
Expand Down
Expand Up @@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);


private final DrillStatementImpl statement;

// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public SchemaChangeListener changeListener; public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
Expand All @@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
public final DrillCursor cursor; public final DrillCursor cursor;
public boolean hasPendingCancelationNotification; public boolean hasPendingCancelationNotification;


public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult, public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult,
ResultSetMetaData resultSetMetaData, TimeZone timeZone) { ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone); super(statement, prepareResult, resultSetMetaData, timeZone);
this.statement = statement;
DrillConnection c = (DrillConnection) statement.getConnection(); DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient(); DrillClient client = c.getClient();
// DrillClient client, DrillStatement statement) {
currentBatch = new RecordBatchLoader(client.getAllocator()); currentBatch = new RecordBatchLoader(client.getAllocator());
this.client = client; this.client = client;
cursor = new DrillCursor(this); cursor = new DrillCursor(this);
} }


public DrillStatementImpl getStatement() {
return statement;
}

/** /**
* Throws AlreadyClosedSqlException or QueryCanceledSqlException if this * Throws AlreadyClosedSqlException or QueryCanceledSqlException if this
* ResultSet is closed. * ResultSet is closed.
Expand Down Expand Up @@ -171,21 +177,31 @@ protected DrillResultSetImpl execute() throws SQLException{
} }


public String getQueryId() { public String getQueryId() {
if (resultsListener.queryId != null) { if (resultsListener.getQueryId() != null) {
return QueryIdHelper.getQueryId(resultsListener.queryId); return QueryIdHelper.getQueryId(resultsListener.getQueryId());
} else { } else {
return null; return null;
} }
} }


// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public static class ResultsListener implements UserResultsListener { public static class ResultsListener implements UserResultsListener {
private static Logger logger = getLogger( ResultsListener.class ); private static final Logger logger = getLogger( ResultsListener.class );


private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100; private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
private static volatile int nextInstanceId = 1;


/** (Just for logging.) */
private final int instanceId;

/** (Just for logging.) */
private volatile QueryId queryId; private volatile QueryId queryId;


/** (Just for logging.) */
private int lastReceivedBatchNumber;
/** (Just for logging.) */
private int lastDequeuedBatchNumber;

private volatile UserException executionFailureException; private volatile UserException executionFailureException;


// TODO: Revisit "completed". Determine and document exactly what it // TODO: Revisit "completed". Determine and document exactly what it
Expand All @@ -210,7 +226,8 @@ public static class ResultsListener implements UserResultsListener {




ResultsListener() { ResultsListener() {
logger.debug( "Query listener created." ); instanceId = nextInstanceId++;
logger.debug( "[#{}] Query listener created.", instanceId );
} }


/** /**
Expand Down Expand Up @@ -252,22 +269,25 @@ private boolean releaseIfFirst() {


@Override @Override
public void queryIdArrived(QueryId queryId) { public void queryIdArrived(QueryId queryId) {
logger.debug( "Received query ID: {}.", queryId ); logger.debug( "[#{}] Received query ID: {}.",
instanceId, QueryIdHelper.getQueryId( queryId ) );
this.queryId = queryId; this.queryId = queryId;
} }


@Override @Override
public void submissionFailed(UserException ex) { public void submissionFailed(UserException ex) {
logger.debug( "Received query failure:", ex ); logger.debug( "Received query failure:", instanceId, ex );
this.executionFailureException = ex; this.executionFailureException = ex;
completed = true; completed = true;
close(); close();
logger.info( "Query failed: ", ex ); logger.info( "[#{}] Query failed: ", instanceId, ex );
} }


@Override @Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
logger.debug( "Received query data batch: {}.", result ); lastReceivedBatchNumber++;
logger.debug( "[#{}] Received query data batch #{}: {}.",
instanceId, lastReceivedBatchNumber, result );


// If we're in a closed state, just release the message. // If we're in a closed state, just release the message.
if (closed) { if (closed) {
Expand All @@ -282,7 +302,8 @@ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
batchQueue.add(result); batchQueue.add(result);
if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) { if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
if ( startThrottlingIfNot( throttle ) ) { if ( startThrottlingIfNot( throttle ) ) {
logger.debug( "Throttling started at queue size {}.", batchQueue.size() ); logger.debug( "[#{}] Throttling started at queue size {}.",
instanceId, batchQueue.size() );
} }
} }


Expand All @@ -291,7 +312,7 @@ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {


@Override @Override
public void queryCompleted(QueryState state) { public void queryCompleted(QueryState state) {
logger.debug( "Received query completion: {}.", state ); logger.debug( "[#{}] Received query completion: {}.", instanceId, state );
releaseIfFirst(); releaseIfFirst();
completed = true; completed = true;
} }
Expand All @@ -313,41 +334,50 @@ public QueryDataBatch getNext() throws UserException,
InterruptedException { InterruptedException {
while (true) { while (true) {
if (executionFailureException != null) { if (executionFailureException != null) {
logger.debug( "Dequeued query failure exception: {}.", executionFailureException ); logger.debug( "[#{}] Dequeued query failure exception: {}.",
instanceId, executionFailureException );
throw executionFailureException; throw executionFailureException;
} }
if (completed && batchQueue.isEmpty()) { if (completed && batchQueue.isEmpty()) {
return null; return null;
} else { } else {
QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS); QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
if (q != null) { if (qdb != null) {
assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2; lastDequeuedBatchNumber++;
if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) { logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
instanceId, lastDequeuedBatchNumber, qdb );

// Unthrottle server if queue size has dropped enough below threshold:
if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
|| batchQueue.size() == 0 // (in case threshold < 2)
) {
if ( stopThrottlingIfSo() ) { if ( stopThrottlingIfSo() ) {
logger.debug( "Throttling stopped at queue size {}.", logger.debug( "[#{}] Throttling stopped at queue size {}.",
batchQueue.size() ); instanceId, batchQueue.size() );
} }
} }
logger.debug( "Dequeued query data batch: {}.", q ); return qdb;
return q;
} }
} }
} }
} }


void close() { void close() {
logger.debug( "[#{}] Query listener closing.", instanceId );
closed = true; closed = true;
if ( stopThrottlingIfSo() ) { if ( stopThrottlingIfSo() ) {
logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() ); logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).",
instanceId, batchQueue.size() );
} }
while (!batchQueue.isEmpty()) { while (!batchQueue.isEmpty()) {
QueryDataBatch qrb = batchQueue.poll(); QueryDataBatch qdb = batchQueue.poll();
if (qrb != null && qrb.getData() != null) { if (qdb != null && qdb.getData() != null) {
qrb.getData().release(); qdb.getData().release();
} }
} }
// close may be called before the first result is received and the main thread is blocked waiting // Close may be called before the first result is received and therefore
// for the result. In that case we want to unblock the main thread. // when the main thread is blocked waiting for the result. In that case
// we want to unblock the main thread.
latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere? latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
completed = true; completed = true;
} }
Expand Down
Expand Up @@ -33,9 +33,12 @@
public abstract class DrillStatementImpl extends AvaticaStatement public abstract class DrillStatementImpl extends AvaticaStatement
implements DrillStatement, DrillRemoteStatement { implements DrillStatement, DrillRemoteStatement {


private final DrillConnectionImpl connection;

// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).)
public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) { public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
super(connection, resultSetType, resultSetConcurrency, resultSetHoldability); super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
this.connection = connection;
connection.openStatementsRegistry.addStatement(this); connection.openStatementsRegistry.addStatement(this);
} }


Expand All @@ -52,7 +55,7 @@ private void checkNotClosed() throws AlreadyClosedSqlException {


@Override @Override
public DrillConnectionImpl getConnection() { public DrillConnectionImpl getConnection() {
return (DrillConnectionImpl) connection; return connection;
} }


// WORKAROUND: Work around AvaticaStatement's code that wraps _any_ exception, // WORKAROUND: Work around AvaticaStatement's code that wraps _any_ exception,
Expand Down

0 comments on commit 0c69631

Please sign in to comment.