DRILL-6327: Update unary operators to handle IterOutcome.EMIT
Note: Handles the EMIT IterOutcome for non-blocking unary operators (like Filter/Project/etc.)

closes #1240
Sorabh Hamirwasia authored and vdiravka committed Apr 29, 2018
1 parent f563f38 commit 2f275d1
Showing 17 changed files with 1,157 additions and 101 deletions.
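
A recurring pattern in these diffs is replacing an unconditional `return IterOutcome.OK;` with `return getFinalOutcome(...);`, which chooses between OK and EMIT based on the outcome last seen from upstream. The base-class implementation is not shown in the hunks below, so the following is only a minimal standalone sketch of that decision logic; all names here (the trimmed IterOutcome enum, the lastKnownOutcome field) are assumptions for illustration, not Drill's actual code.

    // Standalone sketch (not Drill's base class) of the outcome selection
    // that getFinalOutcome(...) encapsulates in the diffs below.
    enum IterOutcome { OK, EMIT, NONE }

    class UnaryOperatorSketch {
      // Outcome observed on the most recent upstream batch; assumed to be
      // tracked by the operator while processing next().
      private IterOutcome lastKnownOutcome = IterOutcome.OK;

      IterOutcome getFinalOutcome(boolean hasRemainder) {
        // While part of the incoming batch is still unconsumed, keep returning OK
        // so the downstream operator drains us before any boundary is signalled.
        if (hasRemainder) {
          return IterOutcome.OK;
        }
        // Batch fully consumed: forward EMIT if upstream marked a record boundary.
        return lastKnownOutcome == IterOutcome.EMIT ? IterOutcome.EMIT : IterOutcome.OK;
      }
    }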
@@ -81,7 +81,7 @@ protected IterOutcome doWork() {
       throw new UnsupportedOperationException(e);
     }

-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }

   @Override
@@ -168,6 +168,9 @@ protected Filterer generateSV2Filterer() throws SchemaChangeException {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());
+    // Uncomment below lines to enable saving generated code file for debugging
+    // cg.getCodeGenerator().plainJavaCapable(true);
+    // cg.getCodeGenerator().saveCodeForDebugging(true);

     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
       context.getFunctionRegistry(), false, unionTypeEnabled);
@@ -61,6 +61,7 @@ private void doTransfers(){
   @Override
   public void filterBatch(int recordCount) throws SchemaChangeException{
     if (recordCount == 0) {
+      outgoingSelectionVector.setRecordCount(0);
       return;
     }
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
@@ -191,7 +191,8 @@ protected void killIncoming(boolean sendUpstream) {
   public IterOutcome innerNext() {
     if (hasRemainder) {
       handleRemainder();
-      return IterOutcome.OK;
+      // Check if we are supposed to return the EMIT outcome and have consumed the entire batch
+      return getFinalOutcome(hasRemainder);
     }
     return super.innerNext();
   }
@@ -261,7 +262,10 @@ protected IterOutcome doWork() {
     }

     flattenMemoryManager.updateOutgoingStats(outputRecords);
-    return IterOutcome.OK;
+    // Get the final outcome based on hasRemainder, since that determines whether all the incoming records were
+    // consumed in the current output batch or not
+    return getFinalOutcome(hasRemainder);
   }

   private void handleRemainder() {
@@ -718,7 +718,7 @@ private void setupNewSchema() throws SchemaChangeException {
   /**
    * Simple method to allocate space for all the vectors in the container.
    */
-  private void allocateVectors() {;
+  private void allocateVectors() {
     for (VectorWrapper w : container) {
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
@@ -19,6 +19,7 @@

 import java.util.List;

+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -32,39 +33,84 @@

 import com.google.common.collect.Lists;

+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);

   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
-  private int recordsToSkip;
-  private int recordsLeft;
-  private final boolean noEndLimit;
-  private boolean skipBatch;
+  // Start offset of the records
+  private int recordStartOffset;
+  private int numberOfRecords;
   private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();

   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
       throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
-    recordsToSkip = popConfig.getFirst();
-    noEndLimit = popConfig.getLast() == null;
-    if(!noEndLimit) {
-      recordsLeft = popConfig.getLast() - recordsToSkip;
-    }
-    skipBatch = false;
+    refreshLimitState();
   }

+  @Override
+  public IterOutcome innerNext() {
+    if (!first && !needMoreRecords(numberOfRecords)) {
+      outgoingSv.setRecordCount(0);
+      incoming.kill(true);
+
+      IterOutcome upStream = next(incoming);
+      if (upStream == IterOutcome.OUT_OF_MEMORY) {
+        return upStream;
+      }
+
+      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
+        // Clear the memory for the incoming batch
+        for (VectorWrapper<?> wrapper : incoming) {
+          wrapper.getValueVector().clear();
+        }
+        upStream = next(incoming);
+        if (upStream == IterOutcome.OUT_OF_MEMORY) {
+          return upStream;
+        }
+      }
+      // EMIT means the leaf operator is UNNEST; in this case, refresh the limit state and return EMIT.
+      if (upStream == EMIT) {
+        refreshLimitState();
+        return upStream;
+      }
+      // Other leaf operators behave as before.
+      return NONE;
+    }
+    return super.innerNext();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return outgoingSv;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outgoingSv.getCount();
+  }
+
+  @Override
+  public void close() {
+    outgoingSv.clear();
+    super.close();
+  }

   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     container.zeroVectors();
     transfers.clear();

     for(final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
         container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }

@@ -88,94 +134,115 @@ protected boolean setupNewSchema() throws SchemaChangeException {
     return false;
   }

-  @Override
-  public IterOutcome innerNext() {
-    if(!first && !noEndLimit && recordsLeft <= 0) {
-      incoming.kill(true);
-
-      IterOutcome upStream = next(incoming);
-      if (upStream == IterOutcome.OUT_OF_MEMORY) {
-        return upStream;
-      }
-
-      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-        // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
-        upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
-      }
-
-      return IterOutcome.NONE;
-    }
-
-    return super.innerNext();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    return outgoingSv;
-  }
+  /**
+   * Gets the outcome to return from the super implementation and then, in case of an EMIT outcome, refreshes the
+   * state of the operator. The refresh is done so that the limit is applied again to all future incoming batches,
+   * which will be part of the next record boundary.
+   * @param hasRemainder
+   * @return IterOutcome to send downstream
+   */
+  @Override
+  protected IterOutcome getFinalOutcome(boolean hasRemainder) {
+    final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
+    // An EMIT outcome means the leaf operator is UNNEST, hence refresh the state no matter whether the limit is
+    // reached or not.
+    if (outcomeToReturn == EMIT) {
+      refreshLimitState();
+    }
+    return outcomeToReturn;
+  }

   @Override
   protected IterOutcome doWork() {
     if (first) {
       first = false;
     }
-    skipBatch = false;
-    final int recordCount = incoming.getRecordCount();
-    if (recordCount == 0) {
-      skipBatch = true;
-      return IterOutcome.OK;
+    final int inputRecordCount = incoming.getRecordCount();
+    if (inputRecordCount == 0) {
+      setOutgoingRecordCount(0);
+      return getFinalOutcome(false);
     }

     for(final TransferPair tp : transfers) {
       tp.transfer();
     }
-    if (recordCount <= recordsToSkip) {
-      recordsToSkip -= recordCount;
-      skipBatch = true;
+    // Check if the current input record count is less than the start offset. If so, adjust the start offset, since
+    // we have to ignore all these records and return an empty batch.
+    if (inputRecordCount <= recordStartOffset) {
+      recordStartOffset -= inputRecordCount;
+      setOutgoingRecordCount(0);
     } else {
-      outgoingSv.allocateNew(recordCount);
-      limit(recordCount);
+      // Allocate SV2 vectors for the record count size, since we transfer all the vector buffers from the input
+      // record batch to the output record batch and later an SV2Remover copies the needed records.
+      outgoingSv.allocateNew(inputRecordCount);
+      limit(inputRecordCount);
     }

-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }

-  private void limit(int recordCount) {
-    final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
-    recordsToSkip -= offset;
-    int fetch;
-
-    if(noEndLimit) {
-      fetch = recordCount;
+  /**
+   * Applies the limit when the incoming batch has more records than the start offset, so that some output records
+   * can be produced. After the first call of this method recordStartOffset should be 0, since the required number
+   * of records has already been skipped as part of the first incoming record batch.
+   * @param inputRecordCount - number of records in the incoming batch
+   */
+  private void limit(int inputRecordCount) {
+    int endRecordIndex;
+
+    if (numberOfRecords == Integer.MIN_VALUE) {
+      endRecordIndex = inputRecordCount;
     } else {
-      fetch = Math.min(recordCount, offset + recordsLeft);
-      recordsLeft -= Math.max(0, fetch - offset);
+      endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
+      numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
     }

     int svIndex = 0;
-    for(int i = offset; i < fetch; svIndex++, i++) {
+    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
       if (incomingSv != null) {
         outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
       } else {
         outgoingSv.setIndex(svIndex, (char) i);
       }
     }
     outgoingSv.setRecordCount(svIndex);
+    // Update the start offset
+    recordStartOffset = 0;
   }

-  @Override
-  public int getRecordCount() {
-    return skipBatch ? 0 : outgoingSv.getCount();
-  }
+  private void setOutgoingRecordCount(int outputCount) {
+    outgoingSv.setRecordCount(outputCount);
+  }

-  @Override
-  public void close() {
-    outgoingSv.clear();
-    super.close();
-  }
+  /**
+   * Returns whether more output records are needed from the LIMIT operator. When numberOfRecords is set to
+   * {@link Integer#MIN_VALUE} it means there is no end bound on LIMIT, so all the records past the start offset
+   * are returned.
+   * @return true if more output records are expected,
+   *         false if the limit bound is reached and no more records are expected
+   */
+  private boolean needMoreRecords(int recordsToRead) {
+    boolean readMore = true;
+
+    Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || recordsToRead >= 0,
+      String.format("Invalid value of numberOfRecords %d inside LimitRecordBatch", recordsToRead));
+
+    // The above check makes sure that either numberOfRecords has no bound or, if it is bounded, that we have either
+    // read all the records or still have some left to read.
+    // The below check just verifies whether numberOfRecords is bounded and we have read all of it.
+    if (recordsToRead == 0) {
+      readMore = false;
+    }
+    return readMore;
+  }
+
+  /**
+   * Resets recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
+   * This method is called for the EMIT outcome no matter whether the limit is reached or not.
+   */
+  private void refreshLimitState() {
+    // Make sure startOffset is non-negative
+    recordStartOffset = Math.max(0, popConfig.getFirst());
+    numberOfRecords = (popConfig.getLast() == null) ?
+      Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
+    first = true;
+  }
 }
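
As a concrete check of the offset/limit arithmetic above, here is a small self-contained walk-through (not Drill code; the class name, query shape, and batch sizes are invented for illustration) of how recordStartOffset and numberOfRecords evolve for first = 2 and last = 7 (skip 2 rows, emit 5) across two 4-record batches:

    // Walk-through of the arithmetic in refreshLimitState() and limit() above,
    // for popConfig.getFirst() = 2 and popConfig.getLast() = 7 (skip 2, emit 5).
    public class LimitStateSketch {
      static int recordStartOffset = 2;    // Math.max(0, popConfig.getFirst())
      static int numberOfRecords = 7 - 2;  // popConfig.getLast() - recordStartOffset

      static void limit(int inputRecordCount) {
        int endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
        numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
        System.out.println("emit rows [" + recordStartOffset + ", " + endRecordIndex
            + "), remaining limit = " + numberOfRecords);
        recordStartOffset = 0;             // offset only applies to the first batch
      }

      public static void main(String[] args) {
        limit(4);  // batch 1 -> emit rows [2, 4), remaining limit = 3
        limit(4);  // batch 2 -> emit rows [0, 3), remaining limit = 0 (limit reached)
      }
    }

Once the remaining limit hits 0, needMoreRecords(0) returns false and innerNext() drains the upstream, refreshing this state only when an EMIT boundary arrives.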
