Skip to content
Permalink
Browse files
DRILL-6592: Unnest record batch size is called too frequently
closes #1376
  • Loading branch information
parthchandra authored and sohami committed Jul 13, 2018
1 parent c396ae7 commit cad9aad12ff18a9315b8cce971e27c1b32c48079
Showing 1 changed file with 14 additions and 6 deletions.
@@ -49,6 +49,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);

private Unnest unnest;
private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
// sent. The next iteration, we need to make sure the record batch sizer
// is updated before we process the actual data.
private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
// to keep processing it. Kill may be called by a limit in a subquery that
// requires us to stop processing thecurrent row, but not stop processing
@@ -180,6 +183,12 @@ public IterOutcome innerNext() {
return nextState;
}

if (hasNewSchema) {
memoryManager.update();
hasNewSchema = false;
return doWork();
}

if (hasRemainder) {
return doWork();
}
@@ -194,7 +203,7 @@ public IterOutcome innerNext() {
state = BatchState.NOT_FIRST;
try {
stats.startSetup();
hasRemainder = true; // next call to next will handle the actual data.
hasNewSchema = true; // next call to next will handle the actual data.
logger.debug("First batch received");
schemaChanged(); // checks if schema has changed (redundant in this case becaause it has) AND saves the
// current field metadata for check in subsequent iterations
@@ -213,10 +222,9 @@ public IterOutcome innerNext() {
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
boolean isNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
if (isNewSchema) {
hasRemainder = true; // next call to next will handle the actual data.
hasNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
if (hasNewSchema) {
try {
setupNewSchema();
} catch (SchemaChangeException ex) {
@@ -229,6 +237,7 @@ public IterOutcome innerNext() {
}
// else
unnest.resetGroupIndex();
memoryManager.update();
}
return doWork();
}
@@ -265,7 +274,6 @@ private void setUnnestVector() {

protected IterOutcome doWork() {
Preconditions.checkNotNull(lateral);
memoryManager.update();
unnest.setOutputCount(memoryManager.getOutputRowCount());
final int incomingRecordCount = incoming.getRecordCount();
final int currentRecord = lateral.getRecordIndex();

0 comments on commit cad9aad

Please sign in to comment.