Skip to content

Commit

Permalink
DRILL-6878: Use DrillPushRowKeyJoinToScan rule on DrillJoin pattern t…
Browse files Browse the repository at this point in the history
…o account for DrillSemiJoin

closes #1568
  • Loading branch information
Gautam Parai authored and vdiravka committed Dec 12, 2018
1 parent e67ac2d commit b954c57
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 58 deletions.
Expand Up @@ -1130,6 +1130,7 @@ public void testHashIndexNoRemovingSort() throws Exception {
);
}

@Ignore
@Test
public void testCastTimestampPlan() throws Exception {
String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
Expand Down Expand Up @@ -1698,15 +1699,15 @@ public void testHangForSimpleDistinct() throws Exception {
public void testRowkeyJoinPushdown_1() throws Exception {
// _id IN (select col ...)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
" from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
" from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs' and t2.address.state = 'pc')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1717,15 +1718,15 @@ public void testRowkeyJoinPushdown_1() throws Exception {
public void testRowkeyJoinPushdown_2() throws Exception {
// _id = col
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
" where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
" where t1._id = t2._id and t2.address.city = 'pfrrs' and t2.address.state = 'pc'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1736,16 +1737,15 @@ public void testRowkeyJoinPushdown_2() throws Exception {
public void testRowkeyJoinPushdown_3() throws Exception {
// filters on both sides of the join
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
" where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') and cast(t1.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') ";
" where t1._id = t2._id and t1.address.city = 'pfrrs' and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1756,15 +1756,15 @@ public void testRowkeyJoinPushdown_3() throws Exception {
public void testRowkeyJoinPushdown_4() throws Exception {
// _id = cast(col as int) works since the rowids are internally cast to string!
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
" where t1._id = cast(t2.rowid as int) and cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
" where t1._id = cast(t2.rowid as int) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1775,15 +1775,15 @@ public void testRowkeyJoinPushdown_4() throws Exception {
public void testRowkeyJoinPushdown_5() throws Exception {
// _id = cast(cast(col as int) as varchar(10)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
" where t1._id = cast(cast(t2.rowid as int) as varchar(10)) " +
" and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
" where t1._id = cast(cast(t2.rowid as int) as varchar(10)) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1795,15 +1795,14 @@ public void testRowkeyJoinPushdown_6() throws Exception {
// _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in " +
"(select cast(cast(t2.rowid as int) as varchar(10)) from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 " +
"where t2.address.city = t3.address.city and cast(t2.activity.irs.firstlogin as timestamp) = " +
"to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
"where t2.address.city = t3.address.city and t2.name.fname = 'ubar')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100001382")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1814,17 +1813,17 @@ public void testRowkeyJoinPushdown_6() throws Exception {
public void testRowkeyJoinPushdown_7() throws Exception {
// with non-covering index
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
"where t1._id = t2.rowid and cast(t2.activity.irs.firstlogin as timestamp) = " +
"to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
"where t1._id = t2.rowid and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + incrnonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query,
new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "indexName=hash_i_cast_timestamp_firstlogin"},
new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "Scan.*condition=\\(address.city = \"pfrrs\"\\)"},
new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand Down Expand Up @@ -1855,15 +1854,15 @@ public void testRowkeyJoinPushdown_8() throws Exception {
public void testRowkeyJoinPushdown_9() throws Exception {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
"(select t2._id from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
"(select t2._id from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1874,15 +1873,15 @@ public void testRowkeyJoinPushdown_9() throws Exception {
public void testRowkeyJoinPushdown_10() throws Exception {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
" where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
" where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
Expand All @@ -1894,28 +1893,29 @@ public void testRowkeyJoinPushdown_11() throws Exception {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
"(select t2._id from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
"and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
"and t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
}
}

@Ignore
@Test
public void testRowkeyJoinPushdown_12() throws Exception {
// JOIN _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...) - rowkey join appears in intermediate join order
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t4 " +
"where t1.address.city = t4.address.city and t1._id in (select cast(cast(t2.rowid as int) as varchar(10)) " +
"from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
"and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')) " +
"and t4.address.state = 'pc'";
"and t2.address.state = 'pc') and t4.address.state = 'pc'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query,
Expand All @@ -1932,19 +1932,20 @@ public void testRowkeyJoinPushdown_12() throws Exception {
}
}

@Ignore
@Test
public void testRowkeyJoinPushdown_13() throws Exception {
// Check option planner.rowkeyjoin_conversion_using_hashjoin works as expected!
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
" from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
" to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
" from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[]{"RowKeyJoin"}, new String[]{});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";" +
forceRowKeyJoinConversionUsingHashJoin + ";");
Expand All @@ -1953,6 +1954,7 @@ public void testRowkeyJoinPushdown_13() throws Exception {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
.baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";" +
Expand Down
Expand Up @@ -45,4 +45,7 @@ public interface DrillJoin extends DrillRelNode {

/* Right RelNode of the Join Relation */
RelNode getRight();

/* Does semi-join? */
boolean isSemiJoin();
}
Expand Up @@ -189,4 +189,9 @@ public static DrillJoinRel convert(Join join, ConversionContext context) throws
inputs.left, inputs.right, rexCondition, join.getJoinType());
return joinRel;
}

@Override
public boolean isSemiJoin() {
return false;
}
}
Expand Up @@ -87,7 +87,7 @@ public void onMatch(RelOptRuleCall call) {
}

public static DrillPushRowKeyJoinToScanRule JOIN = new DrillPushRowKeyJoinToScanRule(
RelOptHelper.any(DrillJoinRel.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ());
RelOptHelper.any(DrillJoin.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ());

public static class MatchRelJ implements MatchFunction<RowKeyJoinCallContext> {
/*
Expand Down Expand Up @@ -150,7 +150,7 @@ private void findRelSequenceInternal(Class[] classes, int idx, RelNode rel, List
* plan nodes. It tries to identify some RelNode sequences e.g. Filter-Project-Scan and generates
* the context based on the identified sequence.
*/
private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel joinRel,
private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoin joinRel,
RelNode joinChildRel, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs) {
List<RelNode> matchingRels;
// Sequence of rels (PFPS, FPS, PS, FS, S) matched for this rule
Expand Down Expand Up @@ -199,15 +199,15 @@ private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel

@Override
public boolean match(RelOptRuleCall call) {
DrillJoinRel joinRel = call.rel(0);
DrillJoin joinRel = call.rel(0);
//Perform validity checks
logger.debug("DrillPushRowKeyJoinToScanRule begin()");
return canPushRowKeyJoinToScan(joinRel, call.getPlanner()).left;
}

@Override
public RowKeyJoinCallContext onMatch(RelOptRuleCall call) {
DrillJoinRel joinRel = call.rel(0);
DrillJoin joinRel = call.rel(0);
/*
* Find which side of the join (left/right) has the primary-key column. Then find which sequence of rels
* is present on that side of the join. We will need this sequence to correctly transform the left
Expand All @@ -234,7 +234,7 @@ public RowKeyJoinCallContext onMatch(RelOptRuleCall call) {
/* Assumption : Only the non-rowkey side needs to be checked. The row-key side does not have
* any blocking operators for the transformation to work
*/
private static boolean canSwapJoinInputs(DrillJoinRel joinRel, RowKey rowKeyLocation) {
private static boolean canSwapJoinInputs(DrillJoin joinRel, RowKey rowKeyLocation) {
// We cannot swap the join inputs if the join is a semi-join. We determine it indirectly, by
// checking for the presence of a aggregating Aggregate Rel (computes aggregates e.g. sum).
if (rowKeyLocation == RowKey.LEFT
Expand Down Expand Up @@ -281,7 +281,7 @@ private static boolean canSwapJoinInputsInternal(RelNode rel) {
* whether the rowkey is present on the left/right side of the join and its 0-based index in the projection of that
* side.
*/
private static Pair<Boolean, Pair<RowKey, Integer>> canPushRowKeyJoinToScan(DrillJoinRel joinRel, RelOptPlanner planner) {
private static Pair<Boolean, Pair<RowKey, Integer>> canPushRowKeyJoinToScan(DrillJoin joinRel, RelOptPlanner planner) {
RowKey rowKeyLoc = RowKey.NONE;
logger.debug("canPushRowKeyJoinToScan(): Check: Rel={}", joinRel);

Expand Down Expand Up @@ -506,7 +506,7 @@ protected void doOnMatch(RowKeyJoinCallContext rkjCallContext) {
}
}

private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoinRel joinRel,
private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoin joinRel,
DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, DrillScanRel scanRel) {
// Swap the inputs, when necessary (i.e. when the primary-key col is on the right-side of the join)
logger.debug("Transforming: Swapping of join inputs is required!");
Expand Down Expand Up @@ -537,7 +537,7 @@ private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInpu
logger.debug("Transforming: LeftKeys={}, LeftRowType={}, RightKeys={}, RightRowType={}",
leftJoinKeys, leftRel.getRowType(), rightJoinKeys, right.getRowType());
RowKeyJoinRel rowKeyJoin = new RowKeyJoinRel(joinRel.getCluster(), joinRel.getTraitSet(), leftRel, right,
joinCondition, joinRel.getJoinType());
joinCondition, joinRel.getJoinType(), joinRel instanceof DrillSemiJoinRel);
logger.info("Transforming: SUCCESS: Register runtime filter pushdown plan (rowkeyjoin)");
call.transformTo(rowKeyJoin);
}
Expand Down
Expand Up @@ -98,4 +98,9 @@ public LogicalOperator implement(DrillImplementor implementor) {

return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
}

@Override
public boolean isSemiJoin() {
return true;
}
}

0 comments on commit b954c57

Please sign in to comment.