DRILL-6479: Support EMIT for the Hash Aggr
closes #1311
Ben-Zvi committed Jun 26, 2018
1 parent b92f599 commit 7b0c9034753a8c5035fd1c0f1f84a37b376e6748
Showing 8 changed files with 634 additions and 42 deletions.
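For context: EMIT is the IterOutcome that lets an operator return the results for one sub-stream of the data (for example, one iteration of a lateral join) and then continue with the next sub-stream, instead of waiting for NONE at the true end of all data. Below is a minimal, self-contained sketch of the contract a consumer of EMIT is assumed to follow; `Outcome`, `accumulate()`, and `flushAndReset()` are hypothetical stand-ins, not Drill classes.

```java
import java.util.Iterator;

// Standalone illustration of the assumed EMIT contract (simplified names,
// not Drill's actual classes).
enum Outcome { OK, EMIT, NONE }

class EmitAwareConsumer {
  void run(Iterator<Outcome> upstream) {
    while (upstream.hasNext()) {
      switch (upstream.next()) {
        case OK:
          accumulate();      // more batches of this sub-stream will follow
          break;
        case EMIT:
          accumulate();      // last batch of the current sub-stream
          flushAndReset();   // finish this sub-stream, then expect more input
          break;
        case NONE:
          flushAndReset();   // true end of all data
          return;
      }
    }
  }

  private void accumulate() { /* aggregate the incoming batch */ }
  private void flushAndReset() { /* output results, clear per-group state */ }
}
```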
@@ -21,7 +21,6 @@
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -86,7 +85,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
"aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE);


public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) {
super(popConfig, context);
this.incoming = incoming;
wasKilled = false;
@@ -149,14 +148,23 @@ public IterOutcome innerNext() {
// if aggregation is complete and not all records have been output yet
if (aggregator.buildComplete() ||
// or: the 1st phase needs to return (not fully grouped) partial output due to memory pressure
aggregator.earlyOutput()) {
aggregator.earlyOutput() ||
// or: while handling an EMIT - return the output for that section
aggregator.handlingEmit() ) {
// then output the next batch downstream
HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
// if Batch returned, or end of data - then return the appropriate iter outcome
if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; }
if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
// if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
// if Batch returned, or end of data, or Emit - then return the appropriate iter outcome
switch ( aggOut ) {
case AGG_NONE:
return IterOutcome.NONE;
case AGG_OK:
return IterOutcome.OK;
case AGG_EMIT:
return IterOutcome.EMIT;
default: // i.e. RESTART
// if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
}
}

if (wasKilled) { // if kill() was called before, then finish up
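The switch added in this hunk replaces the two if-statements: it maps the aggregator's internal AggIterOutcome onto the operator protocol's IterOutcome, with AGG_RESTART deliberately falling out of the switch so the method continues into doWork(). A compact standalone sketch of that mapping (simplified types, not the committed code):

```java
// Simplified mirror of the dispatch in HashAggBatch.innerNext().
final class OutcomeMapping {
  enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART, AGG_EMIT }
  enum IterOutcome { OK, NONE, EMIT }

  /** Maps the terminal aggregator outcomes. AGG_RESTART never maps to a
   *  downstream outcome: the operator keeps working (reads a spilled
   *  partition) instead of returning. */
  static IterOutcome toIterOutcome(AggIterOutcome aggOut) {
    switch (aggOut) {
      case AGG_NONE: return IterOutcome.NONE;
      case AGG_OK:   return IterOutcome.OK;
      case AGG_EMIT: return IterOutcome.EMIT;
      default: throw new IllegalStateException("AGG_RESTART is handled by the caller");
    }
  }
}
```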
@@ -227,6 +235,8 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException,
ClassGenerator<HashAggregator> cg = top.getRoot();
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
top.plainJavaCapable(true);
// Uncomment the following line to allow debugging of the template code
// top.saveCodeForDebugging(true);
container.clear();

int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
@@ -168,6 +168,7 @@ private static class SpilledPartition { public int spilledBatches; public String
private boolean allFlushed = false;
private boolean buildComplete = false;
private boolean handlingSpills = false; // True once starting to process spill files
private boolean handleEmit = false; // true from receiving an EMIT until it has been fully handled

private OperatorStats stats = null;
private HashTableStats htStats = new HashTableStats();
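The new handleEmit flag has a three-step lifecycle: doWork() sets it when the upstream returns EMIT, innerNext() polls it through the new handlingEmit() accessor, and outputCurrentBatch() clears it once the last batch of the sub-stream has been returned. A hedged condensation of that lifecycle, assuming these are the only touch points (method names here are illustrative, not Drill's):

```java
// Hypothetical condensation of the handleEmit flag's lifecycle.
class EmitFlag {
  private boolean handleEmit = false;

  void onUpstreamEmit()      { handleEmit = true; }  // set in doWork() on EMIT
  boolean handlingEmit()     { return handleEmit; }  // polled by innerNext()
  void onLastBatchReturned() { handleEmit = false; } // cleared in outputCurrentBatch()
}
```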
@@ -604,32 +605,36 @@ public AggOutcome doWork() {
for (VectorWrapper<?> v : incoming) {
v.getValueVector().clear();
}
//
// Get the NEXT input batch, initially from the upstream, later (if there was a spill)
// from one of the spill files (The spill case is handled differently here to avoid
// collecting stats on the spilled records)
//
long memAllocBeforeNext = allocator.getAllocatedMemory();
if ( handlingSpills ) {
outcome = incoming.next(); // get it from the SpilledRecordBatch
} else {
// Get the next RecordBatch from the incoming (i.e. upstream operator)
outcome = outgoing.next(0, incoming);
}
long memAllocAfterNext = allocator.getAllocatedMemory();
long incomingBatchSize = memAllocAfterNext - memAllocBeforeNext;

// If incoming batch is bigger than our estimate - adjust the estimate to match
if ( estMaxBatchSize < incomingBatchSize) {
logger.debug("Found a bigger next {} batch: {} , prior estimate was: {}, mem allocated {}",handlingSpills ? "spill" : "incoming",
incomingBatchSize, estMaxBatchSize, memAllocAfterNext);
estMaxBatchSize = incomingBatchSize;
}

if (EXTRA_DEBUG_1) {
logger.debug("Received IterOutcome of {}", outcome);
if ( handleEmit ) {
outcome = IterOutcome.NONE; // the EMIT batch was already processed (like OK); now simulate a NONE
}
else {
//
// Get the NEXT input batch, initially from the upstream, later (if there was a spill)
// from one of the spill files (The spill case is handled differently here to avoid
// collecting stats on the spilled records)
//
long memAllocBeforeNext = allocator.getAllocatedMemory();
if (handlingSpills) {
outcome = incoming.next(); // get it from the SpilledRecordBatch
} else {
// Get the next RecordBatch from the incoming (i.e. upstream operator)
outcome = outgoing.next(0, incoming);
}
long memAllocAfterNext = allocator.getAllocatedMemory();
long incomingBatchSize = memAllocAfterNext - memAllocBeforeNext;

// If incoming batch is bigger than our estimate - adjust the estimate to match
if (estMaxBatchSize < incomingBatchSize) {
logger.debug("Found a bigger next {} batch: {} , prior estimate was: {}, mem allocated {}", handlingSpills ? "spill" : "incoming", incomingBatchSize, estMaxBatchSize, memAllocAfterNext);
estMaxBatchSize = incomingBatchSize;
}

if (EXTRA_DEBUG_1) {
logger.debug("Received IterOutcome of {}", outcome);
}
}
// Handle various results from getting the next batch
switch (outcome) {
case OUT_OF_MEMORY:
@@ -644,6 +649,10 @@ public AggOutcome doWork() {
// TODO: new schema case needs to be handled appropriately
return AggOutcome.UPDATE_AGGREGATOR;

case EMIT:
handleEmit = true;
// remember the EMIT, then fall through and process this batch like OK

case OK:
currentBatchRecordCount = incoming.getRecordCount(); // size of next batch

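Two details in the hunk above are easy to miss. First, when handleEmit is set, doWork() synthesizes a NONE instead of pulling another batch, which routes control into the same flush path as real end-of-data. Second, the EMIT case deliberately falls through into OK, because the EMIT batch's rows still need to be aggregated. A standalone sketch of the fall-through pattern (simplified, not the committed code):

```java
// Deliberate switch fall-through: an EMIT batch is aggregated exactly like
// an OK batch, but the flag records that the sub-stream must be flushed
// once this batch has been consumed.
class FallThroughSketch {
  enum Outcome { OK, EMIT, NONE }
  private boolean handleEmit = false;

  void handle(Outcome outcome) {
    switch (outcome) {
      case EMIT:
        handleEmit = true;  // remember the EMIT ...
        // intentional fall-through: still aggregate this batch like OK
      case OK:
        aggregateBatch();
        break;
      case NONE:
        flushRemaining();   // real (or, with handleEmit, simulated) end of data
        break;
    }
  }

  private void aggregateBatch() { /* consume the incoming rows */ }
  private void flushRemaining() { /* output what has been aggregated */ }
}
```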
@@ -657,21 +666,36 @@ public AggOutcome doWork() {
case NONE:
resetIndex(); // initialize index (in case spill files need to be processed)

buildComplete = true;
// Any of the flags buildComplete, handleEmit, or earlyOutput causes
// innerNext() in HashAggBatch to return the outgoing batch downstream.
buildComplete = true; // now go and return the outgoing batch

if ( handleEmit ) {
buildComplete = false; // This was not a real NONE - more incoming is expected
// don't aggregate this incoming batch again (in the loop above; when doWork() is called again)
currentBatchRecordCount = 0;
}
updateStats(htables);

// output the first batch; remaining batches will be output
// in response to each next() call by a downstream operator
AggIterOutcome aggOutcome = outputCurrentBatch();

if ( aggOutcome == AggIterOutcome.AGG_RESTART ) {
// Output of first batch returned a RESTART (all new partitions were spilled)
return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
switch ( aggOutcome ) {
case AGG_RESTART:
// Output of first batch returned a RESTART (all new partitions were spilled)
return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
case AGG_EMIT:
// Following an incoming EMIT, when the output fits in a single batch,
// outputCurrentBatch() has already set outcome to IterOutcome.EMIT
break;
case AGG_NONE: // no output
break;
default:
// Regular output (including after EMIT, when more output batches are planned)
outcome = IterOutcome.OK;
}

if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; }

return AggOutcome.RETURN_OUTCOME;

case STOP:
@@ -750,6 +774,7 @@ private void allocateOutgoing(int records) {
logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...",memAdded,estOutgoingAllocSize);
estOutgoingAllocSize = memAdded;
}
outContainer.setRecordCount(records);
// try to restore the reserve
restoreReservedMemory();
}
@@ -1011,6 +1036,23 @@ protected BatchHolder newBatchHolder() {
@Override
public AggIterOutcome outputCurrentBatch() {

// Handle the case of an EMIT with an empty batch
if ( handleEmit && ( batchHolders == null || batchHolders[0].size() == 0 ) ) {
lastBatchOutputCount = 0; // empty
allocateOutgoing(0);
for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(0);
}
outgoing.getContainer().setRecordCount(0);
// When returning the last outgoing batch (following an incoming EMIT), replace OK with EMIT
this.outcome = IterOutcome.EMIT;
handleEmit = false; // finish handling EMIT
if ( outBatchIndex != null ) {
outBatchIndex[0] = 0; // reset, for the next EMIT
}
return AggIterOutcome.AGG_EMIT;
}

// when incoming was an empty batch, just finish up
if ( schema == null ) {
logger.trace("Incoming was empty; output is an empty batch.");
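Even with no rows to return, the empty-EMIT path above builds a well-formed outgoing batch: each value vector's count and the container's record count are explicitly set to zero before EMIT is returned, so downstream sees a schema-complete batch. The calls below mirror the ones in the hunk; packaging them as a static helper is this sketch's own (hypothetical) choice:

```java
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;

final class EmptyBatchSketch {
  // Zero out every vector and the container so downstream receives a
  // valid batch that simply contains no rows.
  static void markEmpty(RecordBatch outgoing) {
    for (VectorWrapper<?> v : outgoing) {
      v.getValueVector().getMutator().setValueCount(0);
    }
    outgoing.getContainer().setRecordCount(0);
  }
}
```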
@@ -1164,6 +1206,13 @@ public AggIterOutcome outputCurrentBatch() {
outBatchIndex[earlyPartition] = 0; // reset, for next time
earlyOutput = false ; // done with early output
}
else if ( handleEmit ) {
// When returning the last outgoing batch (following an incoming EMIT), replace OK with EMIT
this.outcome = IterOutcome.EMIT;
handleEmit = false; // finished handling EMIT
outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
return AggIterOutcome.AGG_EMIT;
}
else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?

allFlushed = true; // next next() call will return NONE
@@ -1187,6 +1236,12 @@ public boolean allFlushed() {
public boolean buildComplete() {
return buildComplete;
}

@Override
public boolean handlingEmit() {
return handleEmit;
}

@Override
public boolean earlyOutput() { return earlyOutput; }

@@ -1464,7 +1519,8 @@ private void spillIfNeeded(int currentPartition, boolean forceSpill) {
* @param htables
*/
private void updateStats(HashTable[] htables) {
if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
if ( cycleNum > 0 || // these stats are only collected before processing spilled files
handleEmit ) { return; } // and no stats are collected while handling an EMIT
long numSpilled = 0;
HashTableStats newStats = new HashTableStats();
// sum the stats from all the partitions
@@ -43,8 +43,8 @@ enum AggOutcome {
}

// For returning results from outputCurrentBatch
// OK - batch returned, NONE - end of data, RESTART - call again
enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
// OK - batch returned, NONE - end of data, RESTART - call again,
// EMIT - batch returned and ends a sub-stream (downstream gets IterOutcome.EMIT)
enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART, AGG_EMIT }

void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
@@ -61,6 +61,8 @@ void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentConte

boolean buildComplete();

boolean handlingEmit();

AggIterOutcome outputCurrentBatch();

boolean earlyOutput();
@@ -259,7 +259,7 @@ enum IterOutcome {
*
* @return The internal vector container
*/
public VectorContainer getContainer();
VectorContainer getContainer();

/**
* Gets the value vector type and ID for the given schema path. The
