Skip to content

Commit

Permalink
DRILL-6089: Removed ordering trait from HashJoin in planner and verif…
Browse files Browse the repository at this point in the history
…ied the planner does not assume HashJoin preserves ordering.

closes #1117
  • Loading branch information
ilooner authored and vdiravka committed Feb 16, 2018
1 parent 20185c9 commit 24a7acd
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 18 deletions.
Expand Up @@ -116,7 +116,6 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
}
}


// Create join plan with both left and right children hash distributed. If the physical join type
// is MergeJoin, a collation must be provided for both left and right child and the plan will contain
// sort converter if necessary to provide the collation.
Expand All @@ -126,8 +125,6 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
RelCollation collationLeft, RelCollation collationRight,
DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException {

//DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
//DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
RelTraitSet traitsLeft = null;
RelTraitSet traitsRight = null;

Expand All @@ -146,7 +143,8 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
DrillJoinRelBase newJoin = null;

if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
newJoin = new HashJoinPrel(join.getCluster(), traitsLeft,
final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
newJoin = new HashJoinPrel(join.getCluster(), traitSet,
convertedLeft, convertedRight, join.getCondition(),
join.getJoinType());

Expand Down Expand Up @@ -236,7 +234,8 @@ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws
call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
convertedRight, joinCondition, join.getJoinType()));
} else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft,
convertedRight, joinCondition, join.getJoinType()));
} else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
Expand All @@ -245,5 +244,4 @@ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws
}

}

}
Expand Up @@ -25,6 +25,7 @@
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
Expand Down Expand Up @@ -327,6 +328,20 @@ public static RelTraitSet fixTraits(RelOptPlanner cluster, RelTraitSet set) {
}
}

// DRILL-6089 make sure no collations are added to HashJoin
public static RelTraitSet removeCollation(RelTraitSet traitSet, RelOptRuleCall call)
{
RelTraitSet newTraitSet = call.getPlanner().emptyTraitSet();

for (RelTrait trait: traitSet) {
if (!trait.getTraitDef().getTraitClass().equals(RelCollation.class)) {
newTraitSet = newTraitSet.plus(trait);
}
}

return newTraitSet;
}

public static class InputRefRemap {
private int oldIndex;
private int newIndex;
Expand Down
37 changes: 28 additions & 9 deletions exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
Expand Up @@ -80,28 +80,47 @@ public static void testPhysicalPlan(String sql, String... expectedSubstrs)
* planning process throws an exception
*/
public static void testPlanMatchingPatterns(String query, String[] expectedPatterns, String[] excludedPatterns)
throws Exception {
throws Exception {
testPlanMatchingPatterns(query, stringsToPatterns(expectedPatterns), stringsToPatterns(excludedPatterns));
}

public static void testPlanMatchingPatterns(String query, Pattern[] expectedPatterns, Pattern[] excludedPatterns)
throws Exception {
final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);

// Check and make sure all expected patterns are in the plan
if (expectedPatterns != null) {
for (final String s : expectedPatterns) {
final Pattern p = Pattern.compile(s);
final Matcher m = p.matcher(plan);
assertTrue(EXPECTED_NOT_FOUND + s +"\n" + plan, m.find());
for (final Pattern expectedPattern: expectedPatterns) {
final Matcher m = expectedPattern.matcher(plan);
assertTrue(EXPECTED_NOT_FOUND + expectedPattern.pattern() +"\n" + plan, m.find());
}
}

// Check and make sure all excluded patterns are not in the plan
if (excludedPatterns != null) {
for (final String s : excludedPatterns) {
final Pattern p = Pattern.compile(s);
final Matcher m = p.matcher(plan);
assertFalse(UNEXPECTED_FOUND + s +"\n" + plan, m.find());
for (final Pattern excludedPattern: excludedPatterns) {
final Matcher m = excludedPattern.matcher(plan);
assertFalse(UNEXPECTED_FOUND + excludedPattern.pattern() +"\n" + plan, m.find());
}
}
}

private static Pattern[] stringsToPatterns(String[] strings)
{
if (strings == null) {
return null;
}

final Pattern[] patterns = new Pattern[strings.length];

for (int index = 0; index < strings.length; index++) {
final String string = strings[index];
patterns[index] = Pattern.compile(string);
}

return patterns;
}

/**
* Runs an explain plan including attributes query and check for expected regex patterns
* (in optiq text format), also ensure excluded patterns are not found. Either list can
Expand Down
Expand Up @@ -18,7 +18,6 @@

package org.apache.drill.exec.physical.impl.join;


import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.exec.ExecConstants;
Expand All @@ -31,8 +30,7 @@
import java.io.File;
import java.io.FileWriter;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;

import java.util.regex.Pattern;

@Category(OperatorTest.class)
public class TestHashJoinAdvanced extends JoinTestBase {
Expand All @@ -41,6 +39,8 @@ public class TestHashJoinAdvanced extends JoinTestBase {
@BeforeClass
public static void disableMergeJoin() throws Exception {
dirTestWatcher.copyResourceToRoot(Paths.get("join", "empty_part"));
dirTestWatcher.copyFileToRoot(Paths.get("sample-data", "region.parquet"));
dirTestWatcher.copyFileToRoot(Paths.get("sample-data", "nation.parquet"));
test(DISABLE_MJ);
}

Expand Down Expand Up @@ -197,4 +197,14 @@ public void emptyPartTest() throws Exception {
BaseTestQuery.resetSessionOption(ExecConstants.SLICE_TARGET);
}
}

@Test // DRILL-6089
public void testJoinOrdering() throws Exception {
final String query = "select * from dfs.`sample-data/nation.parquet` nation left outer join " +
"(select * from dfs.`sample-data/region.parquet`) " +
"as region on region.r_regionkey = nation.n_nationkey order by nation.n_name desc";

final Pattern sortHashJoinPattern = Pattern.compile(".*Sort.*HashJoin", Pattern.DOTALL);
testPlanMatchingPatterns(query, new Pattern[]{sortHashJoinPattern}, null);
}
}

0 comments on commit 24a7acd

Please sign in to comment.