From 7d1e81a5ac0d8ca0b6519dd2c90bf3804c3221b1 Mon Sep 17 00:00:00 2001
From: Hanumath Maduri
Date: Mon, 9 Oct 2017 13:08:13 -0700
Subject: [PATCH] DRILL-5851: Empty table during a join operation with a
 non-empty table produces cast exception.

close apache/drill#1059
---
 .../physical/impl/join/HashJoinBatch.java     | 21 +++++-
 .../impl/join/HashJoinProbeTemplate.java      |  6 +-
 .../physical/impl/join/MergeJoinBatch.java    |  3 +-
 .../impl/union/UnionAllRecordBatch.java       |  1 +
 .../IteratorValidatorBatchIterator.java       |  1 +
 .../record/AbstractBinaryRecordBatch.java     | 11 +++-
 .../exec/physical/impl/join/JoinTestBase.java | 63 ++++++++++++++++++
 .../impl/join/TestHashJoinAdvanced.java       | 23 ++++++-
 .../impl/join/TestMergeJoinAdvanced.java      | 25 ++++++--
 .../impl/join/TestNestedLoopJoin.java         | 64 +++++++++++++------
 10 files changed, 185 insertions(+), 33 deletions(-)
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7e2859e5e1a..7b679c0d6f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -181,7 +181,9 @@ protected void buildSchema() throws SchemaChangeException {
       hyperContainer = new ExpandableHyperContainer(vectors);
       hjHelper.addNewBatch(0);
       buildBatchIndex++;
-      setupHashTable();
+      if (isFurtherProcessingRequired(rightUpstream) && this.right.getRecordCount() > 0) {
+        setupHashTable();
+      }
       hashJoinProbe = setupHashJoinProbe();
       // Build the container schema and set the counts
       for (final VectorWrapper<?> w : container) {
@@ -212,7 +214,7 @@ public IterOutcome innerNext() {
         }
 
         // Store the number of records projected
-        if (!hashTable.isEmpty() || joinType != JoinRelType.INNER) {
+        if ((hashTable != null && !hashTable.isEmpty()) || joinType != JoinRelType.INNER) {
 
           // Allocate the memory for the vectors in the output container
           allocateVectors();
@@ -305,11 +307,15 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio
     //Setup the underlying hash table
 
     // skip first batch if count is zero, as it may be an empty schema batch
-    if (right.getRecordCount() == 0) {
+    if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
       for (final VectorWrapper<?> w : right) {
         w.clear();
       }
       rightUpstream = next(right);
+      if (isFurtherProcessingRequired(rightUpstream) &&
+          right.getRecordCount() > 0 && hashTable == null) {
+        setupHashTable();
+      }
     }
 
     boolean moreData = true;
@@ -535,4 +541,13 @@ public void close() {
     }
     super.close();
   }
+
+  /**
+   * Checks whether the join should continue processing.
+   * @param upStream the upstream operator status.
+   * @return true if the upstream status is OK or OK_NEW_SCHEMA, false otherwise.
+ */ + private boolean isFurtherProcessingRequired(IterOutcome upStream) { + return upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 3cdce2fd24e..5c6371a3e20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -136,7 +136,9 @@ public void executeProbePhase() throws SchemaChangeException { case OK_NEW_SCHEMA: if (probeBatch.getSchema().equals(probeSchema)) { doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, outgoingJoinBatch); - hashTable.updateBatches(); + if (hashTable != null) { + hashTable.updateBatches(); + } } else { throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", probeSchema, @@ -155,7 +157,7 @@ public void executeProbePhase() throws SchemaChangeException { // Check if we need to drain the next row in the probe side if (getNextRecord) { - if (hashTable != null) { + if (hashTable != null && !hashTable.isEmpty()) { probeIndex = hashTable.containsKey(recordsProcessed, true); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 6d804c5e6db..8ad3f84db60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -45,7 +45,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.common.Comparator; -import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -54,6 +53,7 @@ import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -500,5 +500,4 @@ private LogicalExpression materializeExpression(LogicalExpression expression, It } return materializedExpr; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 761e2724c52..1d1ecb00c53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -78,6 +78,7 @@ protected void killIncoming(boolean sendUpstream) { protected void buildSchema() throws SchemaChangeException { if (! 
prefetchFirstBatchFromBothSides()) {
+      state = BatchState.DONE;
       return;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index a8ee0dee3da..ac6a462e24a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -137,6 +137,7 @@ private void validateReadState(String operation) {
       switch (batchState) {
         case OK:
         case OK_NEW_SCHEMA:
+        case NONE:
           return;
         default:
           throw new IllegalStateException(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 1137922ae15..1ce5fdeb553 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -65,11 +65,20 @@ protected boolean prefetchFirstBatchFromBothSides() {
       return false;
     }
 
-    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
+    if (checkForEarlyFinish()) {
       state = BatchState.DONE;
       return false;
     }
 
     return true;
   }
+
+  /**
+   * Checks the operator-specific early-termination condition.
+   * @return true if further processing can stop,
+   *         false if further processing is needed.
+   */
+  protected boolean checkForEarlyFinish() {
+    return (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
new file mode 100644
index 00000000000..6d55a3ba0a6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/JoinTestBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.PlanTestBase;
+import org.junit.experimental.categories.Category;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+
+@Category(OperatorTest.class)
+public class JoinTestBase extends PlanTestBase {
+
+  private static final String testEmptyJoin = "select count(*) as cnt from cp.`employee.json` emp %s join dfs.`dept.json` " +
+      "as dept on dept.manager = emp.`last_name`";
+
+  /**
+   * Runs a join query in which one of the input tables is generated as an
+   * empty JSON file.
+   * @param testDir directory in which the empty JSON file is generated.
+   * @param joinType the type of join to execute.
+   * @param joinPattern the pattern to look for in the plan of a successful run.
+   * @param result the expected number of output rows.
+   */
+  public void testJoinWithEmptyFile(File testDir, String joinType,
+      String joinPattern, long result) throws Exception {
+    buildFile("dept.json", new String[0], testDir);
+    String query = String.format(testEmptyJoin, joinType);
+    testPlanMatchingPatterns(query, new String[]{joinPattern}, new String[]{});
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(result)
+        .build().run();
+  }
+
+  private void buildFile(String fileName, String[] data, File testDir) throws IOException {
+    try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
+      for (String line : data) {
+        out.println(line);
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index 49aefe64149..81104768e6f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -19,20 +19,22 @@
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
+
 
 @Category(OperatorTest.class)
-public class TestHashJoinAdvanced extends BaseTestQuery {
+public class TestHashJoinAdvanced extends JoinTestBase {
+
+  private static final String HJ_PATTERN = "HashJoin";
+  // Merge join has to be disabled when this test case is meant to exercise the hash join.
@BeforeClass @@ -160,4 +162,19 @@ public void testJoinWithMapAndDotField() throws Exception { .baselineValues("1", "2", "1", null, "a") .go(); } + + @Test + public void testHashLeftJoinWithEmptyTable() throws Exception { + testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", HJ_PATTERN, 1155L); + } + + @Test + public void testHashInnerJoinWithEmptyTable() throws Exception { + testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", HJ_PATTERN, 0L); + } + + @Test + public void testHashRightJoinWithEmptyTable() throws Exception { + testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", HJ_PATTERN, 0L); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java index 310b3312922..488e60a8ec3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java @@ -17,16 +17,15 @@ */ package org.apache.drill.exec.physical.impl.join; -import org.apache.drill.test.BaseTestQuery; -import org.apache.drill.categories.OperatorTest; import org.apache.drill.test.TestTools; +import org.apache.drill.categories.OperatorTest; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.Ignore; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; @@ -38,13 +37,16 @@ import java.util.Random; @Category(OperatorTest.class) -public class TestMergeJoinAdvanced extends BaseTestQuery { +public class TestMergeJoinAdvanced extends JoinTestBase { private static final String LEFT = "merge-join-left.json"; private static final String RIGHT = "merge-join-right.json"; + private static final String MJ_PATTERN = "MergeJoin"; + private static File leftFile; private static File rightFile; + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual. 
@@ -253,4 +255,19 @@ public void testDrill4196() throws Exception {
         .baselineValues(6000*800L)
         .go();
   }
+
+  @Test
+  public void testMergeLeftJoinWithEmptyTable() throws Exception {
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", MJ_PATTERN, 1155L);
+  }
+
+  @Test
+  public void testMergeInnerJoinWithEmptyTable() throws Exception {
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", MJ_PATTERN, 0L);
+  }
+
+  @Test
+  public void testMergeRightJoinWithEmptyTable() throws Exception {
+    testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", MJ_PATTERN, 0L);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
index a67a4848a96..092a1a7ea10 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -19,21 +19,19 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.categories.OperatorTest;
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
 import java.nio.file.Paths;
-
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertThat;
 
 @Category(OperatorTest.class)
-public class TestNestedLoopJoin extends PlanTestBase {
+public class TestNestedLoopJoin extends JoinTestBase {
 
-  private static String nlpattern = "NestedLoopJoin";
+  private static final String NLJ_PATTERN = "NestedLoopJoin";
   private static final String DISABLE_HJ = "alter session set `planner.enable_hashjoin` = false";
   private static final String ENABLE_HJ = "alter session set `planner.enable_hashjoin` = true";
 
@@ -86,30 +84,30 @@ public static void setupTestFiles() {
 
   @Test
   public void testNlJoinExists_1_planning() throws Exception {
-    testPlanMatchingPatterns(testNlJoinExists_1, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(testNlJoinExists_1, new String[]{NLJ_PATTERN}, new String[]{});
   }
 
   @Test
   public void testNlJoinNotIn_1_planning() throws Exception {
-    testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{NLJ_PATTERN}, new String[]{});
   }
 
   @Test
   public void testNlJoinInequality_1() throws Exception {
-    testPlanMatchingPatterns(testNlJoinInequality_1, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(testNlJoinInequality_1, new String[]{NLJ_PATTERN}, new String[]{});
   }
 
   @Test
   public void testNlJoinInequality_2() throws Exception {
     test(DISABLE_NLJ_SCALAR);
-    testPlanMatchingPatterns(testNlJoinInequality_2, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(testNlJoinInequality_2, new String[]{NLJ_PATTERN}, new String[]{});
     test(ENABLE_NLJ_SCALAR);
   }
 
   @Test
   public void testNlJoinInequality_3() throws Exception {
     test(DISABLE_NLJ_SCALAR);
-    testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{NLJ_PATTERN}, new String[]{});
     test(ENABLE_NLJ_SCALAR);
   }
 
@@ -118,7 +116,7 @@ public void testNlJoinAggrs_1_planning() throws Exception {
     String query = "select total1, total2 from " +
         "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), " +
         "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300) ";
-    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN}, new String[]{});
   }
 
   @Test // equality join and scalar right input, hj and mj disabled
@@ -128,7 +126,7 @@ public void testNlJoinEqualityScalar_1_planning() throws Exception {
         + " where n_nationkey < 10)";
     test(DISABLE_HJ);
     test(DISABLE_MJ);
-    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN}, new String[]{});
     test(ENABLE_HJ);
     test(ENABLE_MJ);
   }
@@ -141,7 +139,7 @@ public void testNlJoinEqualityScalar_2_planning() throws Exception {
     test("alter session set `planner.slice_target` = 1");
     test(DISABLE_HJ);
     test(DISABLE_MJ);
-    testPlanMatchingPatterns(query, new String[]{nlpattern, "BroadcastExchange"}, new String[]{});
+    testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, "BroadcastExchange"}, new String[]{});
     test(ENABLE_HJ);
     test(ENABLE_MJ);
     test("alter session set `planner.slice_target` = 100000");
@@ -154,7 +152,7 @@ public void testNlJoinEqualityNonScalar_1_planning() throws Exception {
     test(DISABLE_HJ);
     test(DISABLE_MJ);
     test(DISABLE_NLJ_SCALAR);
-    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+    testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN}, new String[]{});
     test(ENABLE_HJ);
     test(ENABLE_MJ);
     test(ENABLE_NLJ_SCALAR);
@@ -169,7 +167,7 @@ public void testNlJoinEqualityNonScalar_2_planning() throws Exception {
     test(DISABLE_HJ);
     test(DISABLE_MJ);
     test(DISABLE_NLJ_SCALAR);
-    testPlanMatchingPatterns(query, new String[]{nlpattern, "BroadcastExchange"}, new String[]{});
+    testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN, "BroadcastExchange"}, new String[]{});
     test(ENABLE_HJ);
     test(ENABLE_MJ);
     test(ENABLE_NLJ_SCALAR);
@@ -274,7 +272,7 @@ public void testNlJoinInnerBetween() throws Exception {
     try {
       test(DISABLE_NLJ_SCALAR);
       String query = String.format(testNlJoinBetween, "INNER");
-      testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN}, new String[]{});
       testBuilder()
           .sqlQuery(query)
           .ordered()
@@ -292,7 +290,7 @@ public void testNlJoinLeftBetween() throws Exception {
     try {
       test(DISABLE_NLJ_SCALAR);
       String query = String.format(testNlJoinBetween, "LEFT");
-      testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+      testPlanMatchingPatterns(query, new String[]{NLJ_PATTERN}, new String[]{});
       testBuilder()
           .sqlQuery(query)
           .ordered()
@@ -327,10 +325,40 @@ public void testNlJoinWithLargeRightInputSuccess() throws Exception {
     try {
       test(DISABLE_NLJ_SCALAR);
       test(DISABLE_JOIN_OPTIMIZATION);
-      testPlanMatchingPatterns(testNlJoinWithLargeRightInput, new String[]{nlpattern}, new String[]{});
+      testPlanMatchingPatterns(testNlJoinWithLargeRightInput, new String[]{NLJ_PATTERN}, new String[]{});
     } finally {
       test(RESET_HJ);
       test(RESET_JOIN_OPTIMIZATION);
     }
   }
+
+  @Test
+  public void testNestedLeftJoinWithEmptyTable() throws Exception {
+    try {
+      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", NLJ_PATTERN, 1155L);
+    } finally {
+      resetSessionOption(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+    }
+  }
+
+  @Test
+  public void testNestedInnerJoinWithEmptyTable() throws Exception {
+    try {
+      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", NLJ_PATTERN, 0L);
+    } finally {
+      resetSessionOption(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+    }
+  }
+
+  @Test
+  public void testNestedRightJoinWithEmptyTable() throws Exception {
+    try {
+      alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
+      testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", NLJ_PATTERN, 0L);
+    } finally {
+      resetSessionOption(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
+    }
+  }
 }