Skip to content
Permalink
Browse files
DRILL-6576: Unnest reports incoming record counts incorrectly
This closes #1362
  • Loading branch information
parthchandra committed Jul 3, 2018
1 parent 0ae7035 commit 62aadda83c68cb3ec06af38336df969958620aa0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
@@ -103,7 +103,7 @@ public final int unnestRecords(final int recordCount) {
innerValueIndex += count;
return count;

}
}

@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
@@ -199,6 +199,7 @@ public IterOutcome innerNext() {
schemaChanged(); // checks if schema has changed (redundant in this case becaause it has) AND saves the
// current field metadata for check in subsequent iterations
setupNewSchema();
stats.batchReceived(0, incoming.getRecordCount(), true);
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
@@ -207,32 +208,28 @@ public IterOutcome innerNext() {
} finally {
stats.stopSetup();
}
// since we never called next on an upstream operator, incoming stats are
// not updated. update input stats explicitly.
stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
container.zeroVectors();

// Check if schema has changed
if (lateral.getRecordIndex() == 0 && schemaChanged()) {
hasRemainder = true; // next call to next will handle the actual data.
try {
setupNewSchema();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
stats.batchReceived(0, incoming.getRecordCount(), true);
return OK_NEW_SCHEMA;
}
if (lateral.getRecordIndex() == 0) {
boolean isNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
if (isNewSchema) {
hasRemainder = true; // next call to next will handle the actual data.
try {
setupNewSchema();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
return OK_NEW_SCHEMA;
}
// else
unnest.resetGroupIndex();
}
stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}

@@ -243,7 +240,8 @@ public VectorContainer getOutgoingContainer() {
return this.container;
}

@SuppressWarnings("resource") private void setUnnestVector() {
@SuppressWarnings("resource")
private void setUnnestVector() {
final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
@@ -347,7 +345,8 @@ protected IterOutcome doWork() {
return tp;
}

@Override protected boolean setupNewSchema() throws SchemaChangeException {
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;

0 comments on commit 62aadda

Please sign in to comment.