DRILL-2755: Use and handle InterruptedException during query processing.
- Interrupt FragmentExecutor thread as part of FragmentExecutor.cancel()
- Handle InterruptedException in ExternalSortBatch.newSV2(). If the fragment status says
  should not continue, then throw the InterruptedException to caller which returns IterOutcome.STOP
- Add comments explaining why InterruptedException is not handled in SendingAccountor.waitForSendComplete()
- Handle InterruptedException in OrderedPartitionRecordBatch.getPartitionVectors().
  If interrupted during the Thread.sleep calls and the fragment status says the fragment should
  not run, return IterOutcome.STOP downstream.
- Interrupt partitioner threads if PartitionerRecordBatch is interrupted while waiting for
  partitioner threads to complete.
- Preserve interrupt status if not handled (see the sketch after this list)
- Handle null RecordBatches returned by RawBatchBuffer.getNext() in MergingRecordBatch.buildSchema()
- Change the timeout in Foreman to be proportional to the number of intermediate fragments sent,
  instead of a hard-coded limit of 90s.
- Change TimedRunnable to enforce a timeout of 15s per runnable.
  Total timeout is (5s * numOfRunnableTasks) / parallelism.
- Add unit tests
  * Testing that cancelling a query interrupts query fragments that are currently blocked
  * Testing that interrupting the partitioner sender in turn interrupts its helper threads
  * Testing that TimedRunnable enforces the timeout for the whole task list.
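
The "preserve interrupt status" bullet refers to the standard Java idiom used throughout this commit: when a blocking call is interrupted and the code chooses not to act on the interrupt itself, it re-asserts the thread's interrupt flag so callers higher up the stack can still observe the cancellation. A minimal sketch of the idiom (class and method names here are illustrative, not from the commit):

import java.util.concurrent.CountDownLatch;

public class InterruptStatusExample {
  // Wait for the latch; if interrupted, restore the interrupt flag instead of
  // swallowing it, so code higher up the call stack can detect the cancel.
  public static void awaitQuietly(final CountDownLatch latch) {
    try {
      latch.await();
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}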
vkorukanti committed May 10, 2015
1 parent f8e5e61 commit 3a294ab
Showing 29 changed files with 546 additions and 98 deletions.
ExtendedLatch.java

@@ -34,16 +34,6 @@ public ExtendedLatch(final int count) {
     super(count);
   }
 
-  /**
-   * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
-   * state variable or similar.
-   *
-   * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
-   */
-  protected boolean ignoreInterruptions() {
-    return true;
-  }
-
   /**
    * Await without interruption for a given time.
    * @param waitMillis

@@ -68,21 +58,16 @@ public boolean awaitUninterruptibly(long waitMillis) {
   }
 
   /**
-   * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
-   * output of ignoreInterruptions();
+   * Await without interruption. In the case of interruption, log a warning and continue to wait.
    */
   public void awaitUninterruptibly() {
     while (true) {
       try {
         await();
         return;
       } catch (final InterruptedException e) {
-        if (ignoreInterruptions()) {
-          // if we're still not ready, the while loop will cause us to wait again
-          logger.warn("Interrupted while waiting for event latch.", e);
-        } else {
-          return;
-        }
+        // if we're still not ready, the while loop will cause us to wait again
+        logger.warn("Interrupted while waiting for event latch.", e);
       }
     }
   }
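
With ignoreInterruptions() gone, awaitUninterruptibly() now unconditionally swallows interrupts while waiting. One trade-off worth noting: this version discards the interrupt flag entirely, whereas the common alternative (the approach taken by Guava's Uninterruptibles.awaitUninterruptibly) also refuses to return early but re-asserts the flag on exit. A hedged sketch of that alternative, for contrast:

import java.util.concurrent.CountDownLatch;

public class PreservingLatchWait {
  // Ignore interrupts while waiting, but hand the interrupt flag back to the
  // caller once the latch opens.
  public static void awaitAndPreserve(final CountDownLatch latch) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          latch.await();
          return;
        } catch (final InterruptedException e) {
          interrupted = true; // remember the interrupt, keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}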
SendingAccountor.java

@@ -42,13 +42,26 @@ void decrement() {
 
   public synchronized void waitForSendComplete() {
     int waitForBatches = batchesSent.get();
+    boolean isInterrupted = false;
     while(waitForBatches != 0) {
       try {
         wait.acquire(waitForBatches);
         waitForBatches = batchesSent.addAndGet(-1 * waitForBatches);
       } catch (InterruptedException e) {
+        // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+        // to send. This should be safe because: network connections should get disconnected and fail a send if a
+        // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+        // (blocking is cooperative and will get triggered before this)
         logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
+
+        isInterrupted = true;
       }
     }
+
+    if (isInterrupted) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    }
   }
 }
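
Because waitForSendComplete() now re-asserts the interrupt flag instead of discarding it, a caller can still learn that a cancel arrived while sends were draining. A hypothetical caller (all names illustrative, not from this commit):

// Hypothetical cleanup path built on the restored interrupt flag.
void closeSender(final SendingAccountor accountor) {
  accountor.waitForSendComplete(); // always drains fully, even if interrupted
  if (Thread.currentThread().isInterrupted()) {
    // A cancel arrived while we were waiting; skip optional follow-up work.
    return;
  }
  // ... normal post-send bookkeeping ...
}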
MergingRecordBatch.java

@@ -70,6 +70,7 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;

@@ -88,6 +89,7 @@
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(MergingRecordBatch.class);
 
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;

@@ -141,13 +143,20 @@ private RawFragmentBatch getNext(final int providerIndex) throws IOException{
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
       final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
         stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
         inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
       }
       return b;
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
     } finally {
       stats.stopWait();
     }

@@ -359,6 +368,7 @@ public int compare(final Node node1, final Node node2) {
     while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
       nextBatch = getNext(node.batchId);
     }
+
     assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
       : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
     if (nextBatch == null && !context.shouldContinue()) {

@@ -461,6 +471,15 @@ public void buildSchema() {
       return;
     }
     final RawFragmentBatch batch = getNext(i);
+    if (batch == null) {
+      if (!context.shouldContinue()) {
+        state = BatchState.STOP;
+      } else {
+        state = BatchState.DONE;
+      }
+
+      break;
+    }
     if (batch.getHeader().getDef().getFieldCount() == 0) {
       i++;
       continue;
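
The two changes above form one pattern: getNext() converts an InterruptedException into a null batch (after restoring the interrupt flag), and buildSchema() maps that null to STOP when the fragment was cancelled or DONE when the stream simply ended. A self-contained restatement of the pattern, with hypothetical names standing in for Drill's types:

import java.util.concurrent.BlockingQueue;
import java.util.function.BooleanSupplier;

final class InterruptAwareReceiver {
  enum Outcome { OK, STOP, DONE }

  static Outcome nextStep(final BlockingQueue<Object> incoming, final BooleanSupplier shouldContinue) {
    Object batch;
    try {
      batch = incoming.take();              // blocking receive, may be interrupted
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve evidence of the interrupt
      batch = null;
    }
    if (batch == null) {
      // Interrupted: STOP if the fragment was cancelled, DONE otherwise.
      return shouldContinue.getAsBoolean() ? Outcome.DONE : Outcome.STOP;
    }
    return Outcome.OK;
  }
}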
OrderedPartitionRecordBatch.java

@@ -245,6 +245,24 @@ private boolean saveSamples() throws SchemaChangeException, ClassTransformationE
 
   }
 
+  /**
+   * Wait until the at least the given timeout is expired or interrupted and the fragment status is not runnable.
+   * @param timeout Timeout in milliseconds.
+   * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+   */
+  private boolean waitUntilTimeOut(final long timeout) {
+    while(true) {
+      try {
+        Thread.sleep(timeout);
+        return true;
+      } catch (final InterruptedException e) {
+        if (!context.shouldContinue()) {
+          return false;
+        }
+      }
+    }
+  }
+
   /**
    * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
    * that point, the records in the batches are sorted and sampled, and the sampled records are stored in the

@@ -255,10 +273,7 @@ private boolean saveSamples() throws SchemaChangeException, ClassTransformationE
    * @return True is successful. False if failed.
    */
   private boolean getPartitionVectors() {
-
-
     try {
-
       if (!saveSamples()) {
         return false;
       }

@@ -279,14 +294,18 @@ private boolean getPartitionVectors() {
         // TODO: this should be polling.
 
         if (val < fragmentsBeforeProceed) {
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         for (int i = 0; i < 100 && finalTable == null; i++) {
           finalTable = tableMap.get(finalTableKey);
           if (finalTable != null) {
             break;
           }
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         if (finalTable == null) {
           buildTable();

@@ -302,7 +321,7 @@ private boolean getPartitionVectors() {
         partitionVectors.add(w.getValueVector());
       }
 
-    } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
+    } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
      kill(false);
      context.fail(ex);
      return false;
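
The contract of the new waitUntilTimeOut(): sleep for the timeout, treat an interrupt as a cancellation signal only when the fragment should no longer run, and otherwise retry. Note that the retry restarts the full timeout rather than the remaining portion, which is harmless for a 10 ms polling interval. The same logic in isolation, with context.shouldContinue() replaced by an assumed flag:

import java.util.concurrent.atomic.AtomicBoolean;

public class TimedWaitSketch {
  // Returns true if the timeout fully elapsed; false if interrupted while the
  // (assumed) shouldContinue flag says the fragment has been cancelled.
  static boolean waitUntilTimeOut(final long timeoutMs, final AtomicBoolean shouldContinue) {
    while (true) {
      try {
        Thread.sleep(timeoutMs);
        return true;
      } catch (final InterruptedException e) {
        if (!shouldContinue.get()) {
          return false; // caller propagates IterOutcome.STOP downstream
        }
        // stray interrupt while still runnable: sleep the full timeout again
      }
    }
  }
}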
PartitionerDecorator.java

@@ -21,13 +21,16 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.testing.CountDownLatchInjection;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 /**
  * Decorator class to hide multiple Partitioner existence from the caller

@@ -38,19 +41,22 @@
  * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
  */
 public class PartitionerDecorator {
-
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+  private static final ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(PartitionerDecorator.class);
 
   private List<Partitioner> partitioners;
   private final OperatorStats stats;
   private final String tName;
   private final String childThreadPrefix;
   private final ExecutorService executor;
+  private final FragmentContext context;
 
-
   public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
     this.partitioners = partitioners;
     this.stats = stats;
+    this.context = context;
     this.executor = context.getDrillbitContext().getExecutor();
     this.tName = Thread.currentThread().getName();
     this.childThreadPrefix = "Partitioner-" + tName + "-";

@@ -145,17 +151,42 @@ protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOExce
     stats.startWait();
     final CountDownLatch latch = new CountDownLatch(partitioners.size());
     final List<CustomRunnable> runnables = Lists.newArrayList();
+    final List<Future> taskFutures = Lists.newArrayList();
+    CountDownLatchInjection testCountDownLatch = null;
     try {
-      int i = 0;
-      for (final Partitioner part : partitioners ) {
-        runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
-        executor.submit(runnables.get(i++));
+      // To simulate interruption of main fragment thread and interrupting the partition threads, create a
+      // CountDownInject patch. Partitioner threads await on the latch and main fragment thread counts down or
+      // interrupts waiting threads. This makes sures that we are actually interrupting the blocked partitioner threads.
+      testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
+      testCountDownLatch.initialize(1);
+      for (final Partitioner part : partitioners) {
+        final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
+        runnables.add(runnable);
+        taskFutures.add(executor.submit(runnable));
       }
-      try {
-        latch.await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+
+      while (true) {
+        try {
+          // Wait for main fragment interruption.
+          injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+
+          // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
+          injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
+
+          latch.await();
+          break;
+        } catch (final InterruptedException e) {
+          // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
+          if (!context.shouldContinue()) {
+            for(Future f : taskFutures) {
+              f.cancel(true);
+            }
+
+            break;
+          }
+        }
       }
 
       IOException excep = null;
       for (final CustomRunnable runnable : runnables ) {
         IOException myException = runnable.getException();

@@ -180,8 +211,12 @@ protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOExce
       // scale down main stats wait time based on calculated processing time
       // since we did not wait for whole duration of above execution
       stats.adjustWaitNanos(-maxProcessTime);
-  }
+
+      // Done with the latch, close it.
+      if (testCountDownLatch != null) {
+        testCountDownLatch.close();
+      }
+    }
   }
 
   /**

@@ -190,7 +225,7 @@ protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOExce
    * protected is for testing purposes
    */
   protected interface GeneralExecuteIface {
-    public void execute(Partitioner partitioner) throws IOException;
+    void execute(Partitioner partitioner) throws IOException;
   }
 
   /**

@@ -242,17 +277,28 @@ private static class CustomRunnable implements Runnable {
     private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
     private final Partitioner part;
+    private CountDownLatchInjection testCountDownLatch;
 
     private volatile IOException exp;
 
-    public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
+        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
       this.parentThreadName = parentThreadName;
       this.latch = latch;
       this.iface = iface;
       this.part = part;
+      this.testCountDownLatch = testCountDownLatch;
     }
 
     @Override
     public void run() {
+      // Test only - Pause until interrupted by fragment thread
+      try {
+        testCountDownLatch.await();
+      } catch (final InterruptedException e) {
+        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+      }
+
       final Thread currThread = Thread.currentThread();
       final String currThreadName = currThread.getName();
       final OperatorStats localStats = part.getStats();
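
executeMethodLogic now keeps the Future for each submitted CustomRunnable precisely so it can call cancel(true): unlike latch.await(), which only unblocks the waiting fragment thread, cancel(true) delivers an interrupt to the worker thread running the task. A runnable illustration of that mechanism (all names assumed, not Drill code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelDemo {
  public static void main(String[] args) throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    final List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      futures.add(executor.submit(() -> {
        try {
          Thread.sleep(Long.MAX_VALUE);      // stand-in for a blocked partitioner
        } catch (final InterruptedException e) {
          System.out.println("worker interrupted, exiting");
        }
      }));
    }
    for (final Future<?> f : futures) {
      f.cancel(true);                         // interrupts the running task
    }
    executor.shutdown();
  }
}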
@@ -285,7 +285,8 @@ public void flush(boolean schemaChanged) throws IOException {
     // to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
     // sender has acknowledged the terminate request. After sending the last batch, all further batches are
     // dropped.
-    final boolean isLastBatch = isLast || terminated;
+    // 3. Partitioner thread is interrupted due to cancellation of fragment.
+    final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
 
     // if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
     if (!isLastBatch && recordCount == 0) {
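
The added third condition treats a pending interrupt on the partitioner thread as "send the final batch now". Using Thread.currentThread().isInterrupted() rather than Thread.interrupted() matters here: isInterrupted() only reads the flag, so any cleanup code that runs later on the same thread still sees the interrupt. A minimal illustration of the difference:

// isInterrupted() is a read-only probe; interrupted() reads AND clears.
Thread.currentThread().interrupt();
boolean a = Thread.currentThread().isInterrupted(); // true, flag still set
boolean b = Thread.interrupted();                   // true, flag now cleared
boolean c = Thread.currentThread().isInterrupted(); // false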
UnorderedReceiverBatch.java

@@ -49,9 +49,12 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class UnorderedReceiverBatch implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private final static ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(UnorderedReceiverBatch.class);
 
   private final RecordBatchLoader batchLoader;
   private final RawFragmentBatchProvider fragProvider;

@@ -133,18 +136,31 @@ public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int...
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
+  private RawFragmentBatch getNextBatch() throws IOException {
+    try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
+      return fragProvider.getNext();
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
+    }
+  }
+
   @Override
   public IterOutcome next() {
     stats.startProcessing();
     try{
       RawFragmentBatch batch;
       try {
         stats.startWait();
-        batch = fragProvider.getNext();
+        batch = getNextBatch();
 
         // skip over empty batches. we do this since these are basically control messages.
         while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
-          batch = fragProvider.getNext();
+          batch = getNextBatch();
         }
       } finally {
         stats.stopWait();
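
The commit message's unit-test bullets describe tests of exactly these paths; Drill's own tests drive the ExecutionControlsInjector pause sites added above. A framework-free sketch of the core assertion — that a blocked receiver, once interrupted, exits promptly and preserves the interrupt flag — might look like this (JUnit 4, all names assumed, not the actual Drill test):

import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import org.junit.Test;

public class InterruptPreservedTest {
  @Test
  public void interruptIsPreserved() throws Exception {
    final SynchronousQueue<Object> queue = new SynchronousQueue<>();
    final CountDownLatch done = new CountDownLatch(1);
    final boolean[] preserved = new boolean[1];

    final Thread receiver = new Thread(() -> {
      try {
        queue.take();                          // blocks forever; no producer
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();    // the pattern under test
      }
      preserved[0] = Thread.currentThread().isInterrupted();
      done.countDown();
    });

    receiver.start();
    Thread.sleep(100);      // crude: give the receiver time to reach take()
    receiver.interrupt();   // simulate query cancellation
    done.await();
    assertTrue(preserved[0]);
  }
}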
