DRILL-6498: Support for EMIT outcome in ExternalSortBatch
* DRILL-6498: Support for EMIT outcome in ExternalSortBatch
* Updated TestTopNEmitOutcome to use RowSetComparison for comparing expected and actual output batches produced

closes #1323
sohami committed Jun 28, 2018
1 parent c346859 commit 3b34f0dfddade01220cee5cd299a62f012aea70a
Showing 9 changed files with 1,143 additions and 180 deletions.
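
For context: the EMIT outcome lets an operator return the results for one record boundary and then continue with the next boundary within the same minor fragment. Below is a minimal, hypothetical sketch of the consumer side of that contract; the EmitAwareConsumer class, its helper methods, and the handling choices are illustrative assumptions, not code from this commit.

// Illustrative sketch only: a simplified consumer loop over RecordBatch.IterOutcome values,
// showing how EMIT differs from OK and NONE when reading from an upstream operator such as sort.
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;

public class EmitAwareConsumer {

  public void consume(RecordBatch upstream) {
    while (true) {
      IterOutcome outcome = upstream.next();
      switch (outcome) {
        case OK_NEW_SCHEMA:
          // First batch of a new schema; a real operator would rebuild its transfers here.
          // Fall through to process the data in this batch.
        case OK:
          // A regular batch within the current record boundary.
          processBatch(upstream);
          break;
        case EMIT:
          // Last batch of the current record boundary; more boundaries may follow.
          processBatch(upstream);
          finishBoundary();
          break;
        case NONE:
          // No more data at all.
          return;
        default:
          // STOP or other terminal outcomes: stop processing (error handling omitted).
          return;
      }
    }
  }

  // Hypothetical helpers standing in for real operator logic.
  private void processBatch(RecordBatch batch) { int rows = batch.getRecordCount(); }
  private void finishBoundary() { }
}

In the diff below, ExternalSortBatch implements the producer side of this contract: it accumulates and sorts everything received up to an EMIT, returns the sorted output for that boundary (EMIT, or OK batches followed by a final EMIT when the output spans several batches), and then resets its state via resetSortState() for the next boundary.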
@@ -29,6 +29,7 @@
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,6 +39,12 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;

import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;

/**
* External sort batch: a sort batch which can spill to disk in
* order to operate within a defined memory footprint.
@@ -186,8 +193,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private enum SortState { START, LOAD, DELIVER, DONE }
private SortState sortState = SortState.START;

private SortConfig sortConfig;

private SortImpl sortImpl;

private IterOutcome lastKnownOutcome;

private boolean firstBatchOfSchema;

private VectorContainer outputWrapperContainer;

private SelectionVector4 outputSV4;

// WARNING: The enum here is used within this class. But, the members of
// this enum MUST match those in the (unmanaged) ExternalSortBatch since
// that is the enum used in the UI to display metrics for the query profile.
@@ -212,19 +229,17 @@ public int metricId() {
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context, true);
this.incoming = incoming;

- SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions());
- SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+ outputWrapperContainer = new VectorContainer(context.getAllocator());
+ outputSV4 = new SelectionVector4(context.getAllocator(), 0);
+ sortConfig = new SortConfig(context.getConfig(), context.getOptions());
oContext.setInjector(injector);
- PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
- SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
- sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
+ sortImpl = createNewSortImpl();

// The upstream operator checks on record count before we have
// results. Create an empty result set temporarily to handle
// these calls.

- resultsIterator = new SortImpl.EmptyResults(container);
+ resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
}

@Override
@@ -234,7 +249,10 @@ public int getRecordCount() {

@Override
public SelectionVector4 getSelectionVector4() {
- return resultsIterator.getSv4();
+ // Return outputSV4 instead of the resultsIterator SV4. For a resultsIterator whose SV4 is null, outputSV4 will be
+ // empty. However, Sort with EMIT outcome will fail in those cases while preparing the output container, since they
+ // are not supported currently (for example, spilling scenarios).
+ return outputSV4;
}

@Override
@@ -293,10 +311,12 @@ public void buildSchema() {
public IterOutcome innerNext() {
switch (sortState) {
case DONE:
- return IterOutcome.NONE;
+ return NONE;
case START:
- case LOAD:
return load();
+ case LOAD:
+ resetSortState();
+ return (sortState == SortState.DONE) ? NONE : load();
case DELIVER:
return nextOutputBatch();
default:
@@ -305,31 +325,17 @@ public IterOutcome innerNext() {
}

private IterOutcome nextOutputBatch() {
+ // Call next on outputSV4 so that its state progresses in parallel with the resultsIterator state
+ outputSV4.next();

+ // If the results iterator's next() returns true, it has more results to pass
if (resultsIterator.next()) {
container.setRecordCount(getRecordCount());
injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
- return IterOutcome.OK;
- } else {
- logger.trace("Deliver phase complete: Returned {} batches, {} records",
- resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
- sortState = SortState.DONE;
-
- // Close the iterator here to release any remaining resources such
- // as spill files. This is important when a query has a join: the
- // first branch sort may complete before the second branch starts;
- // it may be quite a while after returning the last batch before the
- // fragment executor calls this operator's close method.
- //
- // Note however, that the StreamingAgg operator REQUIRES that the sort
- // retain the batches behind an SV4 when doing an in-memory sort because
- // the StreamingAgg retains a reference to that data that it will use
- // after receiving a NONE result code. See DRILL-5656.
-
- if (! this.retainInMemoryBatchesOnNone) {
- resultsIterator.close();
- resultsIterator = null;
- }
- return IterOutcome.NONE;
}
+ // getFinalOutcome takes care of returning the correct IterOutcome when there is no data to pass, and for
+ // EMIT/NONE scenarios.
+ return getFinalOutcome();
}

/**
@@ -343,44 +349,45 @@ private IterOutcome nextOutputBatch() {
private IterOutcome load() {
logger.trace("Start of load phase");

- // Clear the temporary container created by
- // buildSchema().
-
- container.clear();
+ // Don't clear the temporary container created by buildSchema() after each load: the ValueVector references must
+ // be maintained for downstream operators across EMIT boundaries.

// Loop over all input batches

+ IterOutcome result = OK;
for (;;) {
- IterOutcome result = loadBatch();

- // None means all batches have been read.
+ result = loadBatch();

- if (result == IterOutcome.NONE) {
+ // NONE/EMIT means all batches have been read at this record boundary
+ if (result == NONE || result == EMIT) {
break; }

- // Any outcome other than OK means something went wrong.
+ // If the result is STOP, something went wrong.

- if (result != IterOutcome.OK) {
+ if (result == STOP) {
return result; }
}

// Anything to actually sort?

resultsIterator = sortImpl.startMerge();
if (! resultsIterator.next()) {
- sortState = SortState.DONE;
- return IterOutcome.NONE;
+ // If there are no records to sort and we got NONE, just return NONE
+ if (result == NONE) {
+ sortState = SortState.DONE;
+ return NONE;
+ }
}

// sort may have prematurely exited due to shouldContinue() returning false.

if (!context.getExecutorState().shouldContinue()) {
sortState = SortState.DONE;
- return IterOutcome.STOP;
+ return STOP;
}

- sortState = SortState.DELIVER;
- return IterOutcome.OK_NEW_SCHEMA;
+ // If we are here, there is some data to be returned downstream, so prepare the output container
+ prepareOutputContainer(resultsIterator);
+ return getFinalOutcome();
}

/**
@@ -395,22 +402,23 @@ private IterOutcome loadBatch() {
// If this is the very first batch, then AbstractRecordBatch
// already loaded it for us in buildSchema().

- IterOutcome upstream;
if (sortState == SortState.START) {
sortState = SortState.LOAD;
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ lastKnownOutcome = OK_NEW_SCHEMA;
} else {
- upstream = next(incoming);
+ lastKnownOutcome = next(incoming);
}
- switch (upstream) {
+ switch (lastKnownOutcome) {
case NONE:
case STOP:
- return upstream;
+ return lastKnownOutcome;
case OK_NEW_SCHEMA:
+ firstBatchOfSchema = true;
setupSchema();
// Fall through

case OK:
+ case EMIT:

// Add the batch to the in-memory generation, spilling if
// needed.
@@ -431,9 +439,9 @@ private IterOutcome loadBatch() {
}
break;
default:
- throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+ throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
}
- return IterOutcome.OK;
+ return lastKnownOutcome;
}

/**
@@ -503,11 +511,10 @@ public void close() {
resultsIterator = null;
}
} catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
+ ex = e;
}

// Then close the "guts" of the sort operation.

try {
if (sortImpl != null) {
sortImpl.close();
@@ -522,6 +529,8 @@ public void close() {
// (when closing the operator context) after the super call.

try {
outputWrapperContainer.clear();
outputSV4.clear();
super.close();
} catch (RuntimeException e) {
ex = (ex == null) ? e : ex;
@@ -569,10 +578,122 @@ public static void releaseBatches(RecordBatch incoming) {
}
if (incoming instanceof ExternalSortBatch) {
ExternalSortBatch esb = (ExternalSortBatch) incoming;
- if (esb.resultsIterator != null) {
- esb.resultsIterator.close();
- esb.resultsIterator = null;
+ esb.releaseResources();
}
}

private void releaseResources() {
// Release the resources if a NONE outcome has been received and the retain flag is false, OR if an EMIT has
// been seen (in which case the sort state was reset to LOAD)
if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
(sortState == SortState.LOAD)) {

// Close the iterator here to release any remaining resources such
// as spill files. This is important when a query has a join: the
// first branch sort may complete before the second branch starts;
// it may be quite a while after returning the last batch before the
// fragment executor calls this operator's close method.
//
// Note however, that the StreamingAgg operator REQUIRES that the sort
// retain the batches behind an SV4 when doing an in-memory sort because
// the StreamingAgg retains a reference to that data that it will use
// after receiving a NONE result code. See DRILL-5656.
//zeroResources();
if (resultsIterator != null) {
resultsIterator.close();
}
// We only zero vectors for actual output container
outputWrapperContainer.clear();
outputSV4.clear();
container.zeroVectors();
}

// Close sortImpl for this boundary
if (sortImpl != null) {
sortImpl.close();
}
}

/**
* Resets the sort state after each EMIT outcome so that the next batch of incoming records, which belongs to a
* different record boundary, can be processed.
*/
private void resetSortState() {
sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
releaseResources();

if (lastKnownOutcome == EMIT) {
sortImpl = createNewSortImpl();
// Set the schema again since the reset creates a new SortImpl instance
sortImpl.setSchema(schema);
resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
}
}

/**
* Depending on whether this is the first batch of the current schema, either clears the output container or just
* zeroes its vectors, then calls {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
* to populate the sort's output container with the result data. It is done this way to support the EMIT outcome,
* where SORT returns results multiple times within the same minor fragment, so the ValueVector references must be
* preserved across output batches.
* Currently only SortResults of type EmptyResults and MergeSortWrapper are supported. Spilling is not expected in
* the EMIT outcome scenario, hence it is not supported for now.
* @param sortResults the final sorted results, containing the container with the data
*/
private void prepareOutputContainer(SortResults sortResults) {
if (firstBatchOfSchema) {
container.clear();
} else {
container.zeroVectors();
}
sortResults.updateOutputContainer(container, outputSV4, lastKnownOutcome, schema);
}

/**
* Provides the final IterOutcome which Sort should return downstream with the current output batch. It considers
* the following cases:
* 1) If this is the first output batch of the current known schema, then return OK_NEW_SCHEMA downstream and reset
* the firstBatchOfSchema flag.
* 2) If the current output row count is zero, then return EMIT or NONE based on the outcome received from upstream
* and also reset the SortState.
* 3) If EMIT was received from upstream and all output rows fit in the current output batch, then send it downstream
* with EMIT outcome and set SortState to LOAD for the next EMIT boundary. Otherwise, if the output rows do not all
* fit in the current output batch, send the current batch with OK outcome and set SortState to DELIVER.
* 4) In all other cases, send the current output batch with OK outcome and set SortState to DELIVER. This covers the
* case when all incoming batches are received with OK outcome and EMIT is never seen.
*
* @return - IterOutcome - outcome to send downstream
*/
private IterOutcome getFinalOutcome() {
IterOutcome outcomeToReturn;

// If this is the first output batch for current known schema then return OK_NEW_SCHEMA to downstream
if (firstBatchOfSchema) {
outcomeToReturn = OK_NEW_SCHEMA;
firstBatchOfSchema = false;
sortState = SortState.DELIVER;
} else if (getRecordCount() == 0) { // There is no record to send downstream
outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
resetSortState();
} else if (lastKnownOutcome == EMIT) {
final boolean hasMoreRecords = outputSV4.hasNext();
sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;
outcomeToReturn = hasMoreRecords ? OK : EMIT;
} else {
outcomeToReturn = OK;
sortState = SortState.DELIVER;
}
return outcomeToReturn;
}

/**
* Method to create new instances of SortImpl
* @return SortImpl
*/
private SortImpl createNewSortImpl() {
SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
}
}
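
The commit message also notes that TestTopNEmitOutcome was updated to use RowSetComparison for comparing expected and actual output batches. The fragment below sketches that verification pattern from Drill's RowSet test utilities; the schema, column names, row values, and the surrounding allocator/actual variables are illustrative assumptions, not lines from the updated test.

// Illustrative fragment only: 'allocator' comes from the test fixture and 'actual' wraps the
// output container produced by the operator under test.
TupleMetadata schema = new SchemaBuilder()
    .add("id", TypeProtos.MinorType.INT)
    .add("name", TypeProtos.MinorType.VARCHAR)
    .buildSchema();

RowSet expected = new RowSetBuilder(allocator, schema)
    .addRow(1, "a")
    .addRow(2, "b")
    .build();

// Verifies the two row sets match row by row and releases both afterwards.
new RowSetComparison(expected).verifyAndClearAll(actual);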
