Skip to content

Commit

Permalink
DRILL-4833: Insert exchanges on the inputs of union-all such that the…
Browse files Browse the repository at this point in the history
… parent and children can be independently parallelized.

Add planner option to enable/disable distribution for union-all.

close #566
  • Loading branch information
Aman Sinha committed Aug 24, 2016
1 parent d5e74b6 commit 0ccc81a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 10 deletions.
Expand Up @@ -80,6 +80,8 @@ public class PlannerSettings implements Context{
public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", true);
public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",
INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, DEFAULT_MAX_OFF_HEAP_ALLOCATION_IN_BYTES);
public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute";
public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, false);

public static final OptionValidator IDENTIFIER_MAX_LENGTH =
new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum length is needed because option names are identifiers themselves */,
Expand Down Expand Up @@ -241,6 +243,10 @@ public long getInSubqueryThreshold() {
return options.getOption(IN_SUBQUERY_THRESHOLD);
}

public boolean isUnionAllDistributeEnabled() {
return options.getOption(UNIONALL_DISTRIBUTE);
}

@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
Expand Down
Expand Up @@ -25,9 +25,11 @@
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.trace.CalciteTrace;
import org.apache.drill.exec.planner.logical.DrillUnionRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -53,19 +55,50 @@ public void onMatch(RelOptRuleCall call) {
final DrillUnionRel union = (DrillUnionRel) call.rel(0);
final List<RelNode> inputs = union.getInputs();
List<RelNode> convertedInputList = Lists.newArrayList();
RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
boolean allHashDistributed = true;

try {
for (int i = 0; i < inputs.size(); i++) {
RelNode convertedInput = convert(inputs.get(i), PrelUtil.fixTraits(call, traits));
convertedInputList.add(convertedInput);
for (int i = 0; i < inputs.size(); i++) {
RelNode child = inputs.get(i);
List<DistributionField> childDistFields = Lists.newArrayList();
RelNode convertedChild;

for (RelDataTypeField f : child.getRowType().getFieldList()) {
childDistFields.add(new DistributionField(f.getIndex()));
}

if (settings.isUnionAllDistributeEnabled()) {
/*
* Strictly speaking, union-all does not need re-distribution of data; but in Drill's execution
* model, the data distribution and parallelism operators are the same. Here, we insert a
* hash distribution operator to allow parallelism to be determined independently for the parent
* and children. (See DRILL-4833).
* Note that a round robin distribution would have sufficed but we don't have one.
*/
DrillDistributionTrait hashChild = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(childDistFields));
RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashChild);
convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
} else {
RelTraitSet traitsChild = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
convertedChild = convert(child, PrelUtil.fixTraits(call, traitsChild));
allHashDistributed = false;
}
convertedInputList.add(convertedChild);
}

try {

// output distribution trait is set to ANY since union-all inputs may be distributed in different ways
// and unlike a join there are no join keys that allow determining how the output would be distributed.
// Note that a downstream operator may impose a required distribution which would be satisfied by
// inserting an Exchange after the Union-All.
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
RelTraitSet traits;
if (allHashDistributed) {
// since all children of union-all are hash distributed, propagate the traits of the left child
traits = convertedInputList.get(0).getTraitSet();
} else {
// output distribution trait is set to ANY since union-all inputs may be distributed in different ways
// and unlike a join there are no join keys that allow determining how the output would be distributed.
// Note that a downstream operator may impose a required distribution which would be satisfied by
// inserting an Exchange after the Union-All.
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.ANY);
}

Preconditions.checkArgument(convertedInputList.size() >= 2, "Union list must be at least two items.");
RelNode left = convertedInputList.get(0);
Expand Down
Expand Up @@ -87,6 +87,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
PlannerSettings.FILTER_MAX_SELECTIVITY_ESTIMATE_FACTOR,
PlannerSettings.TYPE_INFERENCE,
PlannerSettings.IN_SUBQUERY_THRESHOLD,
PlannerSettings.UNIONALL_DISTRIBUTE,
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
Expand Down
69 changes: 69 additions & 0 deletions exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
Expand Up @@ -35,6 +35,8 @@ public class TestUnionAll extends BaseTestQuery{

private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
private static final String enableDistribute = "alter session set `planner.enable_unionall_distribute` = true";
private static final String defaultDistribute = "alter session reset `planner.enable_unionall_distribute`";

@Test // Simple Union-All over two scans
public void testUnionAll1() throws Exception {
Expand Down Expand Up @@ -1111,4 +1113,71 @@ public void testDrill4147_3() throws Exception {
}
}

@Test // DRILL-4833 // limit 1 is on RHS of union-all
public void testDrill4833_1() throws Exception {
final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();

final String query = String.format("SELECT o_custkey FROM \n" +
" ((select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
" Union All (SELECT o_custkey FROM dfs_test.`%s` limit 1))", l, r, l);

// Validate the plan
final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
final String[] excludedPlan = {};

try {
test(sliceTargetSmall);
test(enableDistribute);

PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);

testBuilder()
.optionSettingQueriesForTestQuery(sliceTargetSmall)
.optionSettingQueriesForBaseline(sliceTargetDefault)
.unOrdered()
.sqlQuery(query)
.sqlBaselineQuery(query)
.build()
.run();
} finally {
test(sliceTargetDefault);
test(defaultDistribute);
}
}

@Test // DRILL-4833 // limit 1 is on LHS of union-all
public void testDrill4833_2() throws Exception {
final String l = FileUtils.getResourceAsFile("/multilevel/parquet/1994").toURI().toString();
final String r = FileUtils.getResourceAsFile("/multilevel/parquet/1995").toURI().toString();

final String query = String.format("SELECT o_custkey FROM \n" +
" ((SELECT o_custkey FROM dfs_test.`%s` limit 1) \n" +
" union all \n" +
" (select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey))", l, r, l);

// Validate the plan
final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};
final String[] excludedPlan = {};

try {
test(sliceTargetSmall);
test(enableDistribute);

PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);

testBuilder()
.optionSettingQueriesForTestQuery(sliceTargetSmall)
.optionSettingQueriesForBaseline(sliceTargetDefault)
.unOrdered()
.sqlQuery(query)
.sqlBaselineQuery(query)
.build()
.run();
} finally {
test(sliceTargetDefault);
test(defaultDistribute);
}
}

}

0 comments on commit 0ccc81a

Please sign in to comment.