Skip to content

Commit

Permalink
DRILL-5851: Empty table during a join operation with a non-empty table produces cast exception.
Browse files Browse the repository at this point in the history

close #1059
  • Loading branch information
Hanumath Maduri authored and Aman Sinha committed Jan 23, 2018
1 parent a2a56af commit 7d1e81a
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 33 deletions.
Expand Up @@ -181,7 +181,9 @@ protected void buildSchema() throws SchemaChangeException {
hyperContainer = new ExpandableHyperContainer(vectors);
hjHelper.addNewBatch(0);
buildBatchIndex++;
setupHashTable();
if (isFurtherProcessingRequired(rightUpstream) && this.right.getRecordCount() > 0) {
setupHashTable();
}
hashJoinProbe = setupHashJoinProbe();
// Build the container schema and set the counts
for (final VectorWrapper<?> w : container) {
Expand Down Expand Up @@ -212,7 +214,7 @@ public IterOutcome innerNext() {
}

// Store the number of records projected
if (!hashTable.isEmpty() || joinType != JoinRelType.INNER) {
if ((hashTable != null && !hashTable.isEmpty()) || joinType != JoinRelType.INNER) {

// Allocate the memory for the vectors in the output container
allocateVectors();
Expand Down Expand Up @@ -305,11 +307,15 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio
//Setup the underlying hash table

// skip first batch if count is zero, as it may be an empty schema batch
if (right.getRecordCount() == 0) {
if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
for (final VectorWrapper<?> w : right) {
w.clear();
}
rightUpstream = next(right);
if (isFurtherProcessingRequired(rightUpstream) &&
right.getRecordCount() > 0 && hashTable == null) {
setupHashTable();
}
}

boolean moreData = true;
Expand Down Expand Up @@ -535,4 +541,13 @@ public void close() {
}
super.close();
}

/**
 * Checks whether join processing should continue with the current upstream batch.
 *
 * @param upStream the upstream operator's iteration outcome.
 * @return true if the upstream status is OK or OK_NEW_SCHEMA, false otherwise.
 */
private boolean isFurtherProcessingRequired(IterOutcome upStream) {
  return upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA;
}
}
Expand Up @@ -136,7 +136,9 @@ public void executeProbePhase() throws SchemaChangeException {
case OK_NEW_SCHEMA:
if (probeBatch.getSchema().equals(probeSchema)) {
doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, outgoingJoinBatch);
hashTable.updateBatches();
if (hashTable != null) {
hashTable.updateBatches();
}
} else {
throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
probeSchema,
Expand All @@ -155,7 +157,7 @@ public void executeProbePhase() throws SchemaChangeException {

// Check if we need to drain the next row in the probe side
if (getNextRecord) {
if (hashTable != null) {
if (hashTable != null && !hashTable.isEmpty()) {
probeIndex = hashTable.containsKey(recordsProcessed, true);
}

Expand Down
Expand Up @@ -45,7 +45,6 @@
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
Expand All @@ -54,6 +53,7 @@
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
Expand Down Expand Up @@ -500,5 +500,4 @@ private LogicalExpression materializeExpression(LogicalExpression expression, It
}
return materializedExpr;
}

}
Expand Up @@ -78,6 +78,7 @@ protected void killIncoming(boolean sendUpstream) {

protected void buildSchema() throws SchemaChangeException {
if (! prefetchFirstBatchFromBothSides()) {
state = BatchState.DONE;
return;
}

Expand Down
Expand Up @@ -137,6 +137,7 @@ private void validateReadState(String operation) {
switch (batchState) {
case OK:
case OK_NEW_SCHEMA:
case NONE:
return;
default:
throw new IllegalStateException(
Expand Down
Expand Up @@ -65,11 +65,20 @@ protected boolean prefetchFirstBatchFromBothSides() {
return false;
}

if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
if (checkForEarlyFinish()) {
state = BatchState.DONE;
return false;
}

return true;
}

/**
 * Checks the operator-specific early terminal condition.
 *
 * @return true if further processing can stop (both upstream sides have
 *         reported NONE), false if further processing is needed.
 */
protected boolean checkForEarlyFinish() {
  return (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE);
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.categories.OperatorTest;
import org.apache.drill.PlanTestBase;
import org.junit.experimental.categories.Category;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;


@Category(OperatorTest.class)
public class JoinTestBase extends PlanTestBase {

  // Query template: the join type (e.g. "inner", "left outer") is substituted for %s.
  // dfs.`dept.json` is generated as an empty file by testJoinWithEmptyFile().
  private static final String TEST_EMPTY_JOIN = "select count(*) as cnt from cp.`employee.json` emp %s join dfs.`dept.json` " +
      "as dept on dept.manager = emp.`last_name`";

  /**
   * Runs a join query in which one of the tables is generated as an empty
   * json file, and verifies both the physical plan and the result row count.
   *
   * @param testDir directory in which the empty json file is generated.
   * @param joinType join type to execute (substituted into the query template).
   * @param joinPattern physical-plan pattern expected in a successful run.
   * @param result expected number of output rows.
   * @throws Exception if the query fails to plan or execute.
   */
  public void testJoinWithEmptyFile(File testDir, String joinType,
      String joinPattern, long result) throws Exception {
    buildFile("dept.json", new String[0], testDir);
    String query = String.format(TEST_EMPTY_JOIN, joinType);
    // Ensure the plan actually uses the requested join operator.
    testPlanMatchingPatterns(query, new String[]{joinPattern}, new String[]{});
    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("cnt")
        .baselineValues(result)
        .build().run();
  }

  /**
   * Writes the given lines to a new file in the test directory; an empty
   * data array produces an empty file.
   *
   * @param fileName name of the file to create.
   * @param data lines to write, one per line.
   * @param testDir directory in which the file is created.
   * @throws IOException if the file cannot be written.
   */
  private void buildFile(String fileName, String[] data, File testDir) throws IOException {
    try (PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
      for (String line : data) {
        out.println(line);
      }
    }
  }
}
Expand Up @@ -19,20 +19,22 @@
package org.apache.drill.exec.physical.impl.join;


import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.UnlikelyTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;


@Category(OperatorTest.class)
public class TestHashJoinAdvanced extends BaseTestQuery {
public class TestHashJoinAdvanced extends JoinTestBase {

private static final String HJ_PATTERN = "HashJoin";


// Have to disable merge join, if this testcase is to test "HASH-JOIN".
@BeforeClass
Expand Down Expand Up @@ -160,4 +162,19 @@ public void testJoinWithMapAndDotField() throws Exception {
.baselineValues("1", "2", "1", null, "a")
.go();
}

// Left outer hash join against an empty build-side table (DRILL-5851); expects 1155 rows.
@Test
public void testHashLeftJoinWithEmptyTable() throws Exception {
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", HJ_PATTERN, 1155L);
}

// Inner hash join against an empty table (DRILL-5851); expects 0 rows.
@Test
public void testHashInnerJoinWithEmptyTable() throws Exception {
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", HJ_PATTERN, 0L);
}

// Right outer hash join against an empty table (DRILL-5851); expects 0 rows.
@Test
public void testHashRightJoinWithEmptyTable() throws Exception {
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", HJ_PATTERN, 0L);
}
}
Expand Up @@ -17,16 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.test.TestTools;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;

Expand All @@ -38,13 +37,16 @@
import java.util.Random;

@Category(OperatorTest.class)
public class TestMergeJoinAdvanced extends BaseTestQuery {
public class TestMergeJoinAdvanced extends JoinTestBase {
private static final String LEFT = "merge-join-left.json";
private static final String RIGHT = "merge-join-right.json";
private static final String MJ_PATTERN = "MergeJoin";


private static File leftFile;
private static File rightFile;


@Rule
public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual.

Expand Down Expand Up @@ -253,4 +255,19 @@ public void testDrill4196() throws Exception {
.baselineValues(6000*800L)
.go();
}

// Left outer merge join against an empty right-side table (DRILL-5851); expects 1155 rows.
@Test
public void testMergeLeftJoinWithEmptyTable() throws Exception {
  // Space added after the comma to match the formatting of the sibling tests.
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "left outer", MJ_PATTERN, 1155L);
}

// Inner merge join against an empty table (DRILL-5851); expects 0 rows.
@Test
public void testMergeInnerJoinWithEmptyTable() throws Exception {
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "inner", MJ_PATTERN, 0L);
}

// Right outer merge join against an empty table (DRILL-5851); expects 0 rows.
@Test
public void testMergeRightJoinWithEmptyTable() throws Exception {
  testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", MJ_PATTERN, 0L);
}
}

0 comments on commit 7d1e81a

Please sign in to comment.