Skip to content

Commit

Permalink
DRILL-6694: NPE in UnnestRecordBatch when query uses a column name no…
Browse files Browse the repository at this point in the history
…t present in data

closes #1434
  • Loading branch information
sohami committed Aug 16, 2018
1 parent b8376cc commit 0ed5683
Showing 1 changed file with 39 additions and 38 deletions.
Expand Up @@ -62,6 +62,8 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
private int remainderIndex = 0;
private int recordCount;
private MaterializedField unnestFieldMetadata;
// Reference of TypedFieldId for Unnest column. It's always set in schemaChanged method and later used by others
private TypedFieldId unnestTypedFieldId;
private final UnnestMemoryManager memoryManager;

public enum Metric implements MetricDef {
Expand Down Expand Up @@ -95,12 +97,8 @@ public void update() {
// Get sizing information for the batch.
setRecordBatchSizer(new RecordBatchSizer(incoming));

final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);

// Get column size of unnest column.

RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName());
RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(unnestFieldMetadata.getName());

final int rowIdColumnSize = TypeHelper.getSize(rowIdVector.getField().getType());

Expand Down Expand Up @@ -213,22 +211,15 @@ public IterOutcome innerNext() {
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
boolean hasNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
if (hasNewSchema) {
try {
try {
boolean hasNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
if (hasNewSchema) {
setupNewSchema();
hasRemainder = true;
memoryManager.update();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
return OK_NEW_SCHEMA;
} else { // Unnest field schema didn't changed but new left empty/nonempty batch might come with OK_NEW_SCHEMA
try {
return OK_NEW_SCHEMA;
} else { // Unnest field schema didn't changed but new left empty/nonempty batch might come with OK_NEW_SCHEMA
// This means even though there is no schema change for unnest field the reference of unnest field
// ValueVector must have changed hence we should just refresh the transfer pairs and keep output vector
// same as before. In case when new left batch is received with SchemaChange but was empty Lateral will
Expand All @@ -237,19 +228,18 @@ public IterOutcome innerNext() {
// pair. It should do for each new left incoming batch.
resetUnnestTransferPair();
container.zeroVectors();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
} // else
unnest.resetGroupIndex();
memoryManager.update();
} // else
unnest.resetGroupIndex();
memoryManager.update();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
}
return doWork();
}

}

@Override
Expand All @@ -259,11 +249,10 @@ public VectorContainer getOutgoingContainer() {

@SuppressWarnings("resource")
private void setUnnestVector() {
final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
final MaterializedField field = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
final ValueVector inVV =
incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector();
incoming.getValueAccessorById(field.getValueClass(), unnestTypedFieldId.getFieldIds()).getValueVector();

if (!(inVV instanceof RepeatedValueVector)) {
if (incoming.getRecordCount() != 0) {
Expand Down Expand Up @@ -333,10 +322,11 @@ protected IterOutcome doWork() {
* the end of one of the other vectors while we are copying the data of the other vectors alongside each new unnested
* value coming out of the repeated field.)
*/
@SuppressWarnings("resource") private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
final ValueVector unnestField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
@SuppressWarnings("resource")
private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
final int[] typeFieldIds = unnestTypedFieldId.getFieldIds();
final Class<?> vectorClass = incoming.getSchema().getColumn(typeFieldIds[0]).getValueClass();
final ValueVector unnestField = incoming.getValueAccessorById(vectorClass, typeFieldIds).getValueVector();

TransferPair tp = null;
if (unnestField instanceof RepeatedMapVector) {
Expand Down Expand Up @@ -398,9 +388,9 @@ protected boolean setupNewSchema() throws SchemaChangeException {
*
* @return true if the schema has changed, false otherwise
*/
private boolean schemaChanged() {
final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
private boolean schemaChanged() throws SchemaChangeException {
unnestTypedFieldId = checkAndGetUnnestFieldId();
final MaterializedField thisField = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
final MaterializedField prevField = unnestFieldMetadata;
Preconditions.checkNotNull(thisField);

Expand Down Expand Up @@ -440,6 +430,17 @@ private void updateStats() {

}

private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
if (fieldId == null) {
throw new SchemaChangeException(String.format("Unnest column %s not found inside the incoming record batch. " +
"This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
popConfig.getColumn()));
}

return fieldId;
}

@Override
public void close() {
updateStats();
Expand Down

0 comments on commit 0ed5683

Please sign in to comment.