From 082c836157b6adb4e9c9a4a46751c9e5f54a370a Mon Sep 17 00:00:00 2001
From: Aman Sinha
Date: Tue, 17 May 2016 14:35:06 -0700
Subject: [PATCH] DRILL-4679: When convert() functions are present, ensure that
 ProjectRecordBatch produces a schema even for empty result set.

Add unit tests

Modify doAlloc() to accept record count parameter (addresses review comment)
---
 .../expr/fn/DrillComplexWriterFuncHolder.java |  4 ++
 .../impl/project/ProjectRecordBatch.java      | 40 +++++++++--
 .../physical/impl/TestConvertFunctions.java   | 69 +++++++++++++++++++
 3 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
index 747a08b26ac..a0bf134b987 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
@@ -44,6 +44,10 @@ public void setReference(FieldReference ref) {
     this.ref = ref;
   }
 
+  public FieldReference getReference() {
+    return ref;
+  }
+
   @Override
   protected HoldingContainer generateEvalBody(ClassGenerator<?> g, HoldingContainer[] inputVariables, String body, JVar[] workspaceJVars) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 5ba7b5ab4bc..4ad5b8b2c2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -63,6 +63,7 @@
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntHashSet;
@@ -76,12 +77,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private Projector projector;
   private List<ValueVector> allocationVectors;
   private List<ComplexWriter> complexWriters;
+  private List<DrillComplexWriterFuncHolder> complexExprList;
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
 
   private static final String EMPTY_STRING = "";
   private boolean first = true;
+  private boolean wasNone = false; // whether a NONE iter outcome was already seen
 
   private class ClassifierResult {
     public boolean isStar = false;
@@ -121,6 +124,9 @@ protected void killIncoming(final boolean sendUpstream) {
 
   @Override
   public IterOutcome innerNext() {
+    if (wasNone) {
+      return IterOutcome.NONE;
+    }
     recordCount = 0;
     if (hasRemainder) {
       handleRemainder();
@@ -136,6 +142,10 @@ public VectorContainer getOutgoingContainer() {
 
   @Override
   protected IterOutcome doWork() {
+    if (wasNone) {
+      return IterOutcome.NONE;
+    }
+
     int incomingRecordCount = incoming.getRecordCount();
 
     if (first && incomingRecordCount == 0) {
@@ -146,6 +156,23 @@ protected IterOutcome doWork() {
         if (next == IterOutcome.OUT_OF_MEMORY) {
           outOfMemory = true;
           return next;
+        } else if (next == IterOutcome.NONE) {
+          // since this is first batch and we already got a NONE, need to set up the schema
+          if (!doAlloc(0)) {
+            outOfMemory = true;
+            return IterOutcome.OUT_OF_MEMORY;
+          }
+          setValueCount(0);
+
+          // Only need to add the schema for the complex exprs because others should already have
+          // been setup during setupNewSchema
+          for (DrillComplexWriterFuncHolder f : complexExprList) {
+            container.addOrGet(f.getReference().getRootSegment().getPath(),
+                Types.required(MinorType.MAP), MapVector.class);
+          }
+          container.buildSchema(SelectionVectorMode.NONE);
+          wasNone = true;
+          return IterOutcome.OK_NEW_SCHEMA;
         } else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) {
           return next;
         }
@@ -164,7 +191,7 @@ protected IterOutcome doWork() {
 
     container.zeroVectors();
 
-    if (!doAlloc()) {
+    if (!doAlloc(incomingRecordCount)) {
       outOfMemory = true;
       return IterOutcome.OUT_OF_MEMORY;
     }
@@ -193,7 +220,7 @@ protected IterOutcome doWork() {
 
   private void handleRemainder() {
     final int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    if (!doAlloc()) {
+    if (!doAlloc(remainingRecordCount)) {
       outOfMemory = true;
       return;
     }
@@ -222,10 +249,10 @@ public void addComplexWriter(final ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
-  private boolean doAlloc() {
+  private boolean doAlloc(int recordCount) {
     //Allocate vv in the allocationVectors.
     for (final ValueVector v : this.allocationVectors) {
-      AllocationHelper.allocateNew(v, incoming.getRecordCount());
+      AllocationHelper.allocateNew(v, recordCount);
     }
 
     //Allocate vv for complexWriters.
@@ -417,6 +444,11 @@ protected boolean setupNewSchema() throws SchemaChangeException {
           // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
           ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
           cg.addExpr(expr, false);
+          if (complexExprList == null) {
+            complexExprList = Lists.newArrayList();
+          }
+          // save the expr for later for getting schema when input is empty
+          complexExprList.add((DrillComplexWriterFuncHolder)((DrillFuncHolderExpr)expr).getHolder());
         } else {
           // need to do evaluation.
          final ValueVector vector = container.addOrGet(outputField, callBack);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
index aab087df4da..8bf65d71880 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
@@ -148,6 +148,75 @@ public void test_JSON_convertTo_empty_list_drill_1416() throws Exception {
         .go();
   }
 
+  @Test // DRILL-4679
+  public void testConvertFromJson_drill4679() throws Exception {
+    Object mapVal1 = mapOf("y", "kevin", "z", "paul");
+    Object mapVal2 = mapOf("y", "bill", "z", "peter");
+
+    // right side of union-all produces 0 rows due to FALSE filter, column t.x is a map
+    String query1 = String.format("select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " where t.`integer` = 2010 " +
+        " union all " +
+        " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t" +
+        " where 1 = 0");
+
+    testBuilder()
+        .sqlQuery(query1)
+        .unOrdered()
+        .baselineColumns("col1", "col2", "col3")
+        .baselineValues("abc", mapVal1, "xyz")
+        .go();
+
+    // left side of union-all produces 0 rows due to FALSE filter, column t.x is a map
+    String query2 = String.format("select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " where 1 = 0 " +
+        " union all " +
+        " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " where t.`integer` = 2010");
+
+    testBuilder()
+        .sqlQuery(query2)
+        .unOrdered()
+        .baselineColumns("col1", "col2", "col3")
+        .baselineValues("abc", mapVal1, "xyz")
+        .go();
+
+    // sanity test where neither side produces 0 rows
+    String query3 = String.format("select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " where t.`integer` = 2010 " +
+        " union all " +
+        " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " where t.`integer` = 2001");
+
+    testBuilder()
+        .sqlQuery(query3)
+        .unOrdered()
+        .baselineColumns("col1", "col2", "col3")
+        .baselineValues("abc", mapVal1, "xyz")
+        .baselineValues("abc", mapVal2, "xyz")
+        .go();
+
+    // convert_from() on a list, column t.rl is a repeated list
+    Object listVal1 = listOf(listOf(2l, 1l), listOf(4l, 6l));
+    Object listVal2 = listOf(); // empty
+
+    String query4 = String.format("select 'abc' as col1, convert_from(convert_to(t.rl, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " +
+        " union all " +
+        " select 'abc' as col1, convert_from(convert_to(t.rl, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t" +
+        " where 1 = 0");
+
+    testBuilder()
+        .sqlQuery(query4)
+        .unOrdered()
+        .baselineColumns("col1", "col2", "col3")
+        .baselineValues("abc", listVal1, "xyz")
+        .baselineValues("abc", listVal2, "xyz")
+        .baselineValues("abc", listVal1, "xyz")
+        .baselineValues("abc", listVal1, "xyz")
+        .go();
+
+  }
+
   @Test
   public void testConvertToComplexJSON() throws Exception {
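
Note (illustrative, not part of the patch): the core of this change is easier to see in
isolation. Below is a minimal, self-contained Java sketch of the state handling the patch
adds to ProjectRecordBatch; every class and method name in it is a hypothetical stand-in,
not a Drill API. The idea: when the input finishes without ever producing a row, the
operator still allocates for zero records and announces a schema with OK_NEW_SCHEMA exactly
once, then uses a wasNone flag to answer NONE on every later call.

// Illustrative sketch only (plain Java, no Drill dependencies). All names are
// hypothetical stand-ins modeling the DRILL-4679 fix, not the real operator.
import java.util.Collections;
import java.util.Iterator;

public class EmptyInputSchemaSketch {

  enum IterOutcome { OK_NEW_SCHEMA, OK, NONE }

  static class ProjectLikeOperator {
    private final Iterator<Integer> upstreamBatchSizes; // stand-in for the incoming batch stream
    private boolean first = true;
    private boolean wasNone = false; // set once upstream has returned NONE

    ProjectLikeOperator(Iterator<Integer> upstreamBatchSizes) {
      this.upstreamBatchSizes = upstreamBatchSizes;
    }

    IterOutcome next() {
      if (wasNone) {
        return IterOutcome.NONE; // keep answering NONE after the first one
      }
      if (!upstreamBatchSizes.hasNext()) {
        wasNone = true;
        if (first) {
          // Input ended before any rows arrived: still allocate for zero records
          // and expose an empty schema (the doAlloc(0) + container.buildSchema(...)
          // path in the patch) so a downstream operator such as the empty side of
          // a union-all can still see a schema.
          allocate(0);
          buildEmptySchema();
          return IterOutcome.OK_NEW_SCHEMA;
        }
        return IterOutcome.NONE;
      }
      int rows = upstreamBatchSizes.next();
      IterOutcome out = first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
      first = false;
      allocate(rows); // mirrors doAlloc(recordCount) taking an explicit count
      return out;
    }

    private void buildEmptySchema() { /* container.buildSchema(...) in the real code */ }
    private void allocate(int recordCount) { /* AllocationHelper.allocateNew(...) in the real code */ }
  }

  public static void main(String[] args) {
    // Empty input: one OK_NEW_SCHEMA (empty batch with schema), then NONE forever after.
    ProjectLikeOperator op = new ProjectLikeOperator(Collections.<Integer>emptyList().iterator());
    System.out.println(op.next()); // OK_NEW_SCHEMA
    System.out.println(op.next()); // NONE
  }
}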