diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java index 18e2316b7d4..bec5d3a4860 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java @@ -1130,6 +1130,7 @@ public void testHashIndexNoRemovingSort() throws Exception { ); } + @Ignore @Test public void testCastTimestampPlan() throws Exception { String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t " + @@ -1698,8 +1699,7 @@ public void testHangForSimpleDistinct() throws Exception { public void testRowkeyJoinPushdown_1() throws Exception { // _id IN (select col ...) String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " + - " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))"; + " from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs' and t2.address.state = 'pc')"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); @@ -1707,6 +1707,7 @@ public void testRowkeyJoinPushdown_1() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1717,8 +1718,7 @@ public void testRowkeyJoinPushdown_1() throws Exception { public void testRowkeyJoinPushdown_2() throws Exception { // _id = col String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + " where t1._id = t2._id and t2.address.city = 'pfrrs' and t2.address.state = 'pc'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); @@ -1726,6 +1726,7 @@ public void testRowkeyJoinPushdown_2() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1736,9 +1737,7 @@ public void testRowkeyJoinPushdown_2() throws Exception { public void testRowkeyJoinPushdown_3() throws Exception { // filters on both sides of the join String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') and cast(t1.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') "; + " where t1._id = t2._id and t1.address.city = 'pfrrs' and t2.address.city = 'pfrrs'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); @@ -1746,6 +1745,7 @@ public void testRowkeyJoinPushdown_3() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1756,8 +1756,7 @@ public void testRowkeyJoinPushdown_3() throws Exception { public void testRowkeyJoinPushdown_4() throws Exception { // _id = cast(col as int) works since the rowids are internally cast to string! String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - " where t1._id = cast(t2.rowid as int) and cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + " where t1._id = cast(t2.rowid as int) and t2.address.city = 'pfrrs'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); @@ -1765,6 +1764,7 @@ public void testRowkeyJoinPushdown_4() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1775,8 +1775,7 @@ public void testRowkeyJoinPushdown_4() throws Exception { public void testRowkeyJoinPushdown_5() throws Exception { // _id = cast(cast(col as int) as varchar(10) String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - " where t1._id = cast(cast(t2.rowid as int) as varchar(10)) " + - " and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + " where t1._id = cast(cast(t2.rowid as int) as varchar(10)) and t2.address.city = 'pfrrs'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); @@ -1784,6 +1783,7 @@ public void testRowkeyJoinPushdown_5() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1795,15 +1795,14 @@ public void testRowkeyJoinPushdown_6() throws Exception { // _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...) String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in " + "(select cast(cast(t2.rowid as int) as varchar(10)) from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 " + - "where t2.address.city = t3.address.city and cast(t2.activity.irs.firstlogin as timestamp) = " + - "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))"; + "where t2.address.city = t3.address.city and t2.name.fname = 'ubar')"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {}); testBuilder() .sqlQuery(query) .ordered() - .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100001382") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1814,17 +1813,17 @@ public void testRowkeyJoinPushdown_6() throws Exception { public void testRowkeyJoinPushdown_7() throws Exception { // with non-covering index String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - "where t1._id = t2.rowid and cast(t2.activity.irs.firstlogin as timestamp) = " + - "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + "where t1._id = t2.rowid and t2.address.city = 'pfrrs'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + incrnonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, - new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "indexName=hash_i_cast_timestamp_firstlogin"}, + new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "Scan.*condition=\\(address.city = \"pfrrs\"\\)"}, new String[] {}); testBuilder() .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1855,8 +1854,7 @@ public void testRowkeyJoinPushdown_8() throws Exception { public void testRowkeyJoinPushdown_9() throws Exception { // Negative test - rowkey join should not be present String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " + - "(select t2._id from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))"; + "(select t2._id from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"}); @@ -1864,6 +1862,7 @@ public void testRowkeyJoinPushdown_9() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1874,8 +1873,7 @@ public void testRowkeyJoinPushdown_9() throws Exception { public void testRowkeyJoinPushdown_10() throws Exception { // Negative test - rowkey join should not be present String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " + - " where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')"; + " where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and t2.address.city = 'pfrrs'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"}); @@ -1883,6 +1881,7 @@ public void testRowkeyJoinPushdown_10() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); @@ -1894,7 +1893,7 @@ public void testRowkeyJoinPushdown_11() throws Exception { // Negative test - rowkey join should not be present String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " + "(select t2._id from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " + - "and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))"; + "and t2.address.city = 'pfrrs')"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"}); @@ -1902,20 +1901,21 @@ public void testRowkeyJoinPushdown_11() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";"); } } + @Ignore @Test public void testRowkeyJoinPushdown_12() throws Exception { // JOIN _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...) - rowkey join appears in intermediate join order String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t4 " + "where t1.address.city = t4.address.city and t1._id in (select cast(cast(t2.rowid as int) as varchar(10)) " + "from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " + - "and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')) " + - "and t4.address.state = 'pc'"; + "and t2.address.state = 'pc') and t4.address.state = 'pc'"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, @@ -1932,12 +1932,12 @@ public void testRowkeyJoinPushdown_12() throws Exception { } } + @Ignore @Test public void testRowkeyJoinPushdown_13() throws Exception { // Check option planner.rowkeyjoin_conversion_using_hashjoin works as expected! String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " + - " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " + - " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))"; + " from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')"; try { test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";"); PlanTestBase.testPlanMatchingPatterns(query, new String[]{"RowKeyJoin"}, new String[]{}); @@ -1945,6 +1945,7 @@ public void testRowkeyJoinPushdown_13() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";" + forceRowKeyJoinConversionUsingHashJoin + ";"); @@ -1953,6 +1954,7 @@ public void testRowkeyJoinPushdown_13() throws Exception { .sqlQuery(query) .ordered() .baselineColumns("ssn").baselineValues("100007423") + .baselineColumns("ssn").baselineValues("100008861") .go(); } finally { test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";" + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java index 30067dab9c6..6be723479fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java @@ -45,4 +45,7 @@ public interface DrillJoin extends DrillRelNode { /* Right RelNode of the Join Relation */ RelNode getRight(); + + /* Does semi-join? */ + boolean isSemiJoin(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java index 2559d28d7fd..724d5cb7143 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java @@ -189,4 +189,9 @@ public static DrillJoinRel convert(Join join, ConversionContext context) throws inputs.left, inputs.right, rexCondition, join.getJoinType()); return joinRel; } + + @Override + public boolean isSemiJoin() { + return false; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java index 7c0a9b7c0cc..b06c58f18ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java @@ -87,7 +87,7 @@ public void onMatch(RelOptRuleCall call) { } public static DrillPushRowKeyJoinToScanRule JOIN = new DrillPushRowKeyJoinToScanRule( - RelOptHelper.any(DrillJoinRel.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ()); + RelOptHelper.any(DrillJoin.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ()); public static class MatchRelJ implements MatchFunction { /* @@ -150,7 +150,7 @@ private void findRelSequenceInternal(Class[] classes, int idx, RelNode rel, List * plan nodes. It tries to identify some RelNode sequences e.g. Filter-Project-Scan and generates * the context based on the identified sequence. */ - private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel joinRel, + private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoin joinRel, RelNode joinChildRel, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs) { List matchingRels; // Sequence of rels (PFPS, FPS, PS, FS, S) matched for this rule @@ -199,7 +199,7 @@ private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel @Override public boolean match(RelOptRuleCall call) { - DrillJoinRel joinRel = call.rel(0); + DrillJoin joinRel = call.rel(0); //Perform validity checks logger.debug("DrillPushRowKeyJoinToScanRule begin()"); return canPushRowKeyJoinToScan(joinRel, call.getPlanner()).left; @@ -207,7 +207,7 @@ public boolean match(RelOptRuleCall call) { @Override public RowKeyJoinCallContext onMatch(RelOptRuleCall call) { - DrillJoinRel joinRel = call.rel(0); + DrillJoin joinRel = call.rel(0); /* * Find which side of the join (left/right) has the primary-key column. Then find which sequence of rels * is present on that side of the join. We will need this sequence to correctly transform the left @@ -234,7 +234,7 @@ public RowKeyJoinCallContext onMatch(RelOptRuleCall call) { /* Assumption : Only the non-rowkey side needs to be checked. The row-key side does not have * any blocking operators for the transformation to work */ - private static boolean canSwapJoinInputs(DrillJoinRel joinRel, RowKey rowKeyLocation) { + private static boolean canSwapJoinInputs(DrillJoin joinRel, RowKey rowKeyLocation) { // We cannot swap the join inputs if the join is a semi-join. We determine it indirectly, by // checking for the presence of a aggregating Aggregate Rel (computes aggregates e.g. sum). if (rowKeyLocation == RowKey.LEFT @@ -281,7 +281,7 @@ private static boolean canSwapJoinInputsInternal(RelNode rel) { * whether the rowkey is present on the left/right side of the join and its 0-based index in the projection of that * side. */ - private static Pair> canPushRowKeyJoinToScan(DrillJoinRel joinRel, RelOptPlanner planner) { + private static Pair> canPushRowKeyJoinToScan(DrillJoin joinRel, RelOptPlanner planner) { RowKey rowKeyLoc = RowKey.NONE; logger.debug("canPushRowKeyJoinToScan(): Check: Rel={}", joinRel); @@ -506,7 +506,7 @@ protected void doOnMatch(RowKeyJoinCallContext rkjCallContext) { } } - private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoinRel joinRel, + private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoin joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, DrillScanRel scanRel) { // Swap the inputs, when necessary (i.e. when the primary-key col is on the right-side of the join) logger.debug("Transforming: Swapping of join inputs is required!"); @@ -537,7 +537,7 @@ private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInpu logger.debug("Transforming: LeftKeys={}, LeftRowType={}, RightKeys={}, RightRowType={}", leftJoinKeys, leftRel.getRowType(), rightJoinKeys, right.getRowType()); RowKeyJoinRel rowKeyJoin = new RowKeyJoinRel(joinRel.getCluster(), joinRel.getTraitSet(), leftRel, right, - joinCondition, joinRel.getJoinType()); + joinCondition, joinRel.getJoinType(), joinRel instanceof DrillSemiJoinRel); logger.info("Transforming: SUCCESS: Register runtime filter pushdown plan (rowkeyjoin)"); call.transformTo(rowKeyJoin); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java index 09e4be9de25..527b7446442 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java @@ -98,4 +98,9 @@ public LogicalOperator implement(DrillImplementor implementor) { return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType); } + + @Override + public boolean isSemiJoin() { + return true; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java index b82e77cc669..abaf2abd33d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java @@ -34,7 +34,7 @@ public enum RowKey {NONE, LEFT, RIGHT, BOTH}; private int rowKeyPos; // swapping of row-key join inputs necessary private boolean swapInputs; - private DrillJoinRel joinRel; + private DrillJoin joinRel; // rels on the rowkey side of the join to be transformed private DrillProjectRel upperProjectRel; private DrillFilterRel filterRel; @@ -42,7 +42,7 @@ public enum RowKey {NONE, LEFT, RIGHT, BOTH}; private DrillScanRel scanRel; public RowKeyJoinCallContext (RelOptRuleCall call, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs, - DrillJoinRel joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, + DrillJoin joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, DrillScanRel scanRel) { this.call = call; this.rowKeyLoc = rowKeyLoc; @@ -71,7 +71,7 @@ public boolean mustSwapInputs() { return swapInputs; } - public DrillJoinRel getJoinRel() { + public DrillJoin getJoinRel() { return joinRel; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java index 2f73526b59c..10520291290 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java @@ -18,26 +18,42 @@ package org.apache.drill.exec.planner.logical; +import java.util.List; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexChecker; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.util.Litmus; import org.apache.calcite.util.Pair; import org.apache.drill.common.logical.data.Join; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.exec.planner.torel.ConversionContext; - -import java.util.List; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; public class RowKeyJoinRel extends DrillJoinRel implements DrillRel { + /* Whether this join represents a semi-join. This is done to skip creating another logical join + * RowKeySemiJoinRel + */ + boolean isSemiJoin; + public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType) { super(cluster, traits, left, right, condition, joinType); } + public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + JoinRelType joinType, boolean isSemiJoin) { + super(cluster, traits, left, right, condition, joinType); + this.isSemiJoin = isSemiJoin; + } + public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType, int joinControl) { super(cluster, traits, left, right, condition, joinType, joinControl); @@ -51,7 +67,7 @@ public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, Re @Override public RowKeyJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { - return new RowKeyJoinRel(getCluster(), traitSet, left, right, condition, joinType); + return new RowKeyJoinRel(getCluster(), traitSet, left, right, condition, joinType, isSemiJoin()); } @Override @@ -59,6 +75,25 @@ public LogicalOperator implement(DrillImplementor implementor) { return super.implement(implementor); } + /** + * Returns whether this RowKeyJoin represents a {@link org.apache.calcite.rel.core.SemiJoin} + * @return true if join represents a {@link org.apache.calcite.rel.core.SemiJoin}, false otherwise. + */ + public boolean isSemiJoin() { + return isSemiJoin; + } + + @Override + public RelDataType deriveRowType() { + return SqlValidatorUtil.deriveJoinRowType( + left.getRowType(), + isSemiJoin() ? null : right.getRowType(), + JoinRelType.INNER, + getCluster().getTypeFactory(), + null, + ImmutableList.of()); + } + public static RowKeyJoinRel convert(Join join, ConversionContext context) throws InvalidRelException { Pair inputs = getJoinInputs(join, context); RexNode rexCondition = getJoinCondition(join, context); @@ -66,4 +101,40 @@ public static RowKeyJoinRel convert(Join join, ConversionContext context) throws inputs.left, inputs.right, rexCondition, join.getJoinType()); return joinRel; } + + /** The parent method relies the class being an instance of {@link org.apache.calcite.rel.core.SemiJoin} + * in deciding row-type validity. We override this method to account for the RowKeyJoinRel logical rel + * representing both regular and semi-joins */ + @Override public boolean isValid(Litmus litmus, Context context) { + if (getRowType().getFieldCount() + != getSystemFieldList().size() + + left.getRowType().getFieldCount() + + ((this.isSemiJoin()) ? 0 : right.getRowType().getFieldCount())) { + return litmus.fail("field count mismatch"); + } + if (condition != null) { + if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) { + return litmus.fail("condition must be boolean: {}", + condition.getType()); + } + // The input to the condition is a row type consisting of system + // fields, left fields, and right fields. Very similar to the + // output row type, except that fields have not yet been made due + // due to outer joins. + RexChecker checker = + new RexChecker( + getCluster().getTypeFactory().builder() + .addAll(getSystemFieldList()) + .addAll(getLeft().getRowType().getFieldList()) + .addAll(getRight().getRowType().getFieldList()) + .build(), + context, litmus); + condition.accept(checker); + if (checker.getFailureCount() > 0) { + return litmus.fail(checker.getFailureCount() + + " failures in condition " + condition); + } + } + return litmus.succeed(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java index 0d7f5caa483..9ba7ba4ddd1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java @@ -73,11 +73,11 @@ public void onMatch(RelOptRuleCall call) { if(isDist){ createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, - left, right, null /* left collation */, null /* right collation */, hashSingleKey, isSemi); + left, right, null /* left collation */, null /* right collation */, hashSingleKey); }else{ if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN, - left, right, null /* left collation */, null /* right collation */, isSemi); + left, right, null /* left collation */, null /* right collation */); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java index a589fccf4eb..56787987471 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java @@ -118,13 +118,14 @@ protected void createRangePartitionRightPlan(RelOptRuleCall call, RowKeyJoinRel if (implementAsRowKeyJoin) { newJoin = new RowKeyJoinPrel(join.getCluster(), traitsLeft, convertedLeft, convertedRight, join.getCondition(), - join.getJoinType()); + join.getJoinType(), join.isSemiJoin()); } else { newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, convertedLeft, convertedRight, join.getCondition(), join.getJoinType(), false /* no swap */, null /* no runtime filter */, - true /* useful for join-restricted scans */, JoinControl.DEFAULT); + true /* useful for join-restricted scans */, + JoinControl.DEFAULT, join.isSemiJoin()); } } if (newJoin != null) { @@ -136,7 +137,7 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join, PhysicalJoinType physicalJoinType, RelNode left, RelNode right, RelCollation collationLeft, RelCollation collationRight, - boolean hashSingleKey, boolean semiJoin)throws InvalidRelException { + boolean hashSingleKey)throws InvalidRelException { /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan: * 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side. @@ -153,7 +154,7 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join, DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); - createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin); + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); assert (join.getLeftKeys().size() == join.getRightKeys().size()); @@ -167,7 +168,7 @@ protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join, hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1)))); hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1)))); - createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin); + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); } } } @@ -179,7 +180,7 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoin join, PhysicalJoinType physicalJoinType, RelNode left, RelNode right, RelCollation collationLeft, RelCollation collationRight, - DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition, boolean isSemiJoin) throws InvalidRelException { + DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException { RelTraitSet traitsLeft = null; RelTraitSet traitsRight = null; @@ -202,12 +203,12 @@ private void createDistBothPlan(RelOptRuleCall call, DrillJoin join, final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call); newJoin = new HashJoinPrel(join.getCluster(), traitSet, convertedLeft, convertedRight, join.getCondition(), - join.getJoinType(), isSemiJoin); + join.getJoinType(), join.isSemiJoin()); } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, convertedLeft, convertedRight, join.getCondition(), - join.getJoinType()); + join.getJoinType(), join.isSemiJoin()); } call.transformTo(newJoin); } @@ -219,7 +220,7 @@ protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoin jo final RexNode joinCondition, final PhysicalJoinType physicalJoinType, final RelNode left, final RelNode right, - final RelCollation collationLeft, final RelCollation collationRight, boolean semiJoin) throws InvalidRelException { + final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException { DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED); RelTraitSet traitsRight = null; @@ -265,7 +266,7 @@ public RelNode convertChild(final DrillJoin join, final RelNode rel) throws Inv RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist); RelNode newLeft = convert(left, newTraitsLeft); return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition, - join.getJoinType(), semiJoin); + join.getJoinType(), join.isSemiJoin()); } @@ -288,11 +289,11 @@ public RelNode convertChild(final DrillJoin join, final RelNode rel) throws Inv } else { if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, - convertedRight, joinCondition, join.getJoinType())); + convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin())); } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call); call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft, - convertedRight, joinCondition, join.getJoinType(), semiJoin)); + convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin())); } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) { call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition, join.getJoinType())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index b293c47ccd7..195abbb5d71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -50,6 +50,11 @@ public MergeJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, Re joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls); } + public MergeJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + JoinRelType joinType, boolean semijoin) throws InvalidRelException { + super(cluster, traits, left, right, condition, joinType, semijoin); + joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls); + } @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java index 0bd25685dd1..f06b66d2bc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java @@ -70,11 +70,11 @@ public void onMatch(RelOptRuleCall call) { RelCollation collationRight = getCollation(join.getRightKeys()); if(isDist){ - createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey, false); + createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey); }else{ if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.MERGE_JOIN, - left, right, collationLeft, collationRight, false); + left, right, collationLeft, collationRight); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java index b184eab88e1..283b36d76a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java @@ -46,6 +46,11 @@ public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode lef super(cluster, traits, left, right, condition, joinType); } + public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + JoinRelType joinType, boolean semijoin) throws InvalidRelException { + super(cluster, traits, left, right, condition, joinType, semijoin); + } + @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java index e7fc032af33..6caea820d2f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java @@ -94,7 +94,7 @@ public void onMatch(RelOptRuleCall call) { if (checkBroadcastConditions(call.getPlanner(), join, left, right)) { createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.NESTEDLOOP_JOIN, - left, right, null /* left collation */, null /* right collation */, false); + left, right, null /* left collation */, null /* right collation */); } } catch (InvalidRelException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java index 7e8f77e5791..54c22307464 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java @@ -47,6 +47,12 @@ public RowKeyJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, R Preconditions.checkArgument(joinType == JoinRelType.INNER); } + public RowKeyJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + RexNode condition, JoinRelType joinType, boolean isSemiJoin) throws InvalidRelException { + super(cluster, traits, left, right, condition, joinType, isSemiJoin); + Preconditions.checkArgument(joinType == JoinRelType.INNER); + } + @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator); @@ -67,7 +73,8 @@ public double estimateRowCount(RelMetadataQuery mq) { public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { - RowKeyJoinPrel rkj = new RowKeyJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType); + RowKeyJoinPrel rkj = new RowKeyJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, + joinType, isSemiJoin()); rkj.setEstimatedRowCount(this.estimatedRowCount); return rkj; } catch (InvalidRelException e) {