Skip to content

Commit

Permalink
DRILL-3765: Move partitioning pruning to HepPlanner to avoid the performance overhead for redundant rule execution.
Browse files Browse the repository at this point in the history

Add fall back option in planner.

close #255
  • Loading branch information
jinfengni committed Nov 17, 2015
1 parent ea8e17d commit e0c9b84
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 68 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.sql.HivePartitionDescriptor;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveScan;
Expand All @@ -52,7 +53,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for Hive based partition pruning
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down Expand Up @@ -82,7 +87,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for Hive based partition pruning
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof HiveScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down
Expand Up @@ -74,7 +74,7 @@ public RelOptCost computeSelfCost(RelOptPlanner planner) {
* - Such filter on top of join might be pushed into JOIN, when LOPT planner is called.
* - Return non-infinite cost will give LOPT planner a chance to try to push the filters.
*/
if (PrelUtil.getPlannerSettings(planner).isHepJoinOptEnabled()) {
if (PrelUtil.getPlannerSettings(planner).isHepOptEnabled()) {
return computeCartesianJoinCost(planner);
} else {
return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
Expand Down
Expand Up @@ -31,12 +31,15 @@
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.JoinToMultiJoinRule;
import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
import org.apache.calcite.rel.rules.ReduceExpressionsRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.calcite.tools.RuleSet;
import org.apache.commons.digester.Rules;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
Expand Down Expand Up @@ -67,6 +70,12 @@
public class DrillRuleSets {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class);

/** Calcite {@code JoinToMultiJoinRule} instantiated for {@code DrillJoinRel}. */
public static final RelOptRule DRILL_JOIN_TO_MULTIJOIN_RULE = new JoinToMultiJoinRule(DrillJoinRel.class);
/**
 * Calcite {@code LoptOptimizeJoinRule} constructed with Drill's logical join, project and
 * filter factories, in that order.
 * NOTE(review): presumably paired with the multi-join rule above for HEP join planning — confirm at the call site.
 */
public static final RelOptRule DRILL_LOPT_OPTIMIZE_JOIN_RULE = new LoptOptimizeJoinRule(
DrillRelFactories.DRILL_LOGICAL_JOIN_FACTORY,
DrillRelFactories.DRILL_LOGICAL_PROJECT_FACTORY,
DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY);

/**
* Get a list of logical rules that can be turned on or off by session/system options.
*
Expand All @@ -90,7 +99,6 @@ public static RuleSet getDrillUserConfigurableLogicalRules(OptimizerRulesContext
if (ps.isConstantFoldingEnabled()) {
// TODO - DRILL-2218
userConfigurableRules.add(ReduceExpressionsRule.PROJECT_INSTANCE);

userConfigurableRules.add(DrillReduceExpressionsRule.FILTER_INSTANCE_DRILL);
userConfigurableRules.add(DrillReduceExpressionsRule.CALC_INSTANCE_DRILL);
}
Expand Down Expand Up @@ -183,16 +191,27 @@ public static RuleSet getDrillBasicRules(OptimizerRulesContext optimizerRulesCon
.addAll(staticRuleSet)
.add(
DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
optimizerRulesContext.getFunctionRegistry()),
optimizerRulesContext.getFunctionRegistry())
)
.build();

return new DrillRuleSet(basicRules);
}

/**
* Get an immutable list of partition pruning rules that will be used in logical planning.
*/
public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add(
PruneScanRule.getFilterOnProject(optimizerRulesContext),
PruneScanRule.getFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
)
)
.build();

return new DrillRuleSet(basicRules);
return new DrillRuleSet(pruneRules);
}

// Ruleset for join permutation, used only in VolcanoPlanner.
Expand Down
Expand Up @@ -54,19 +54,31 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
private GroupScan groupScan;
private List<SchemaPath> columns;
private PlannerSettings settings;
private final boolean partitionFilterPushdown;

/**
 * Creates a DrillScan with the {@code partitionFilterPushdown} flag set to {@code false}.
 *
 * <p>Delegates to the constructor that takes the flag explicitly.
 */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
    final RelOptTable table) {
  this(cluster, traits, table, false);
}
/**
 * Creates a DrillScan using the table's own row type and {@code GroupScan.ALL_COLUMNS}
 * as the column list.
 *
 * @param partitionFilterPushdown flag forwarded unchanged to the delegated constructor
 */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
    final RelOptTable table, boolean partitionFilterPushdown) {
  // By default, scan does not support project pushdown.
  // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule.
  this(cluster, traits, table, table.getRowType(), GroupScan.ALL_COLUMNS, partitionFilterPushdown);
  // NOTE(review): the delegated constructor also assigns settings; kept to preserve existing behavior.
  this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
}

/**
 * Creates a DrillScan for the given row type and columns with the
 * {@code partitionFilterPushdown} flag set to {@code false}.
 *
 * <p>Delegates to the constructor that takes the flag explicitly.
 */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
    final RelOptTable table, final RelDataType rowType, final List<SchemaPath> columns) {
  this(cluster, traits, table, rowType, columns, false);
}

/** Creates a DrillScan. */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
final RelOptTable table, final RelDataType rowType, final List<SchemaPath> columns, boolean partitionFilterPushdown) {
super(DRILL_LOGICAL, cluster, traits, table);
this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
this.rowType = rowType;
Expand All @@ -77,6 +89,7 @@ public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
} else { // planner asks to scan some columns
this.columns = ColumnList.some(columns);
}
this.partitionFilterPushdown = partitionFilterPushdown;
try {
this.groupScan = drillTable.getGroupScan().clone(this.columns);
} catch (final IOException e) {
Expand All @@ -86,12 +99,19 @@ public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,

/**
 * Creates a DrillScanRel for a particular GroupScan with the
 * {@code partitionFilterPushdown} flag set to {@code false}.
 *
 * <p>Delegates to the constructor that takes the flag explicitly.
 */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
    final RelOptTable table, final GroupScan groupScan, final RelDataType rowType,
    final List<SchemaPath> columns) {
  this(cluster, traits, table, groupScan, rowType, columns, false);
}

/**
 * Creates a DrillScanRel for a particular GroupScan.
 *
 * <p>The supplied group scan, row type and column list are stored as given; planner settings
 * are looked up from the cluster's planner.
 *
 * @param partitionFilterPushdown value later reported by {@code partitionFilterPushdown()}
 */
public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
    final RelOptTable table, final GroupScan groupScan, final RelDataType rowType,
    final List<SchemaPath> columns, boolean partitionFilterPushdown) {
  super(DRILL_LOGICAL, cluster, traits, table);
  this.groupScan = groupScan;
  this.columns = columns;
  this.rowType = rowType;
  this.partitionFilterPushdown = partitionFilterPushdown;
  this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
}

//
Expand Down Expand Up @@ -180,4 +200,8 @@ public GroupScan getGroupScan() {
return groupScan;
}

/**
 * Returns the {@code partitionFilterPushdown} flag this scan was constructed with.
 *
 * <p>The prune-scan rules' {@code matches} methods consult this flag when HEP partition
 * pruning is enabled and decline scans for which it is {@code true}.
 * NOTE(review): PruneScanRule appears to pass {@code true} for the scan it builds after
 * pruning — confirm against its {@code doOnMatch}.
 */
public boolean partitionFilterPushdown() {
  return partitionFilterPushdown;
}

}
Expand Up @@ -29,6 +29,7 @@
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;

public class ParquetPruneScanRule {
Expand All @@ -49,7 +50,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down Expand Up @@ -77,7 +82,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down
Expand Up @@ -94,7 +94,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(2);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down Expand Up @@ -122,7 +126,11 @@ public boolean matches(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(1);
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for dfs based partition pruning
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}

@Override
Expand Down Expand Up @@ -319,7 +327,8 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
scanRel.getTable(),
descriptor.createNewGroupScan(newFiles),
scanRel.getRowType(),
scanRel.getColumns());
scanRel.getColumns(),
true /*filter pushdown*/);

RelNode inputRel = newScanRel;

Expand Down
Expand Up @@ -72,7 +72,8 @@ public class PlannerSettings implements Context{
public static final OptionValidator HASH_JOIN_SWAP_MARGIN_FACTOR = new RangeDoubleValidator("planner.join.hash_join_swap_margin_factor", 0, 100, 10d);
public static final String ENABLE_DECIMAL_DATA_TYPE_KEY = "planner.enable_decimal_data_type";
public static final OptionValidator ENABLE_DECIMAL_DATA_TYPE = new BooleanValidator(ENABLE_DECIMAL_DATA_TYPE_KEY, false);
public static final OptionValidator HEP_JOIN_OPT = new BooleanValidator("planner.enable_hep_join_opt", true);
public static final OptionValidator HEP_OPT = new BooleanValidator("planner.enable_hep_opt", true);
public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", true);
public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",
INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, DEFAULT_MAX_OFF_HEAP_ALLOCATION_IN_BYTES);

Expand Down Expand Up @@ -172,7 +173,9 @@ public boolean isHashJoinSwapEnabled() {
return options.getOption(HASH_JOIN_SWAP.getOptionName()).bool_val;
}

/** Returns the boolean value of the option named by {@code HEP_JOIN_OPT}. */
public boolean isHepJoinOptEnabled() {
  return options.getOption(HEP_JOIN_OPT.getOptionName()).bool_val;
}
/**
 * Returns the boolean value of the option named by {@code HEP_PARTITION_PRUNING}
 * ("planner.enable_hep_partition_pruning").
 */
public boolean isHepPartitionPruningEnabled() {
  return options.getOption(HEP_PARTITION_PRUNING.getOptionName()).bool_val;
}

/**
 * Returns the boolean value of the option named by {@code HEP_OPT}
 * ("planner.enable_hep_opt").
 */
public boolean isHepOptEnabled() {
  return options.getOption(HEP_OPT.getOptionName()).bool_val;
}

public double getHashJoinSwapMarginFactor() {
return options.getOption(HASH_JOIN_SWAP_MARGIN_FACTOR.getOptionName()).float_val / 100d;
Expand Down
Expand Up @@ -74,7 +74,8 @@ public class DrillSqlWorker {
private final HepPlanner hepPlanner;
public final static int LOGICAL_RULES = 0;
public final static int PHYSICAL_MEM_RULES = 1;
public final static int LOGICAL_CONVERT_RULES = 2;
public final static int LOGICAL_HEP_JOIN_RULES = 2;
public final static int LOGICAL_HEP_JOIN__PP_RULES = 3;

private final QueryContext context;

Expand Down Expand Up @@ -116,20 +117,32 @@ public DrillSqlWorker(QueryContext context) {

private RuleSet[] getRules(QueryContext context) {
StoragePluginRegistry storagePluginRegistry = context.getStorage();
RuleSet drillLogicalRules = DrillRuleSets.mergedRuleSets(

// Ruleset for the case where VolcanoPlanner is used for everything : join, filter/project pushdown, partition pruning.
RuleSet drillLogicalVolOnlyRules = DrillRuleSets.mergedRuleSets(
DrillRuleSets.getDrillBasicRules(context),
DrillRuleSets.getPruneScanRules(context),
DrillRuleSets.getJoinPermRules(context),
DrillRuleSets.getDrillUserConfigurableLogicalRules(context));

// Ruleset for the case where join planning is done in Hep-LOPT, filter/project pushdown and parttion pruning are done in VolcanoPlanner
RuleSet drillLogicalHepJoinRules = DrillRuleSets.mergedRuleSets(
DrillRuleSets.getDrillBasicRules(context),
DrillRuleSets.getPruneScanRules(context),
DrillRuleSets.getDrillUserConfigurableLogicalRules(context));

// Ruleset for the case where join planning and partition pruning is done in Hep, filter/project pushdown are done in VolcanoPlanner
RuleSet drillLogicalHepJoinPPRules = DrillRuleSets.mergedRuleSets(
DrillRuleSets.getDrillBasicRules(context),
DrillRuleSets.getDrillUserConfigurableLogicalRules(context));

// Ruleset for physical planning rules
RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
DrillRuleSets.getPhysicalRules(context),
storagePluginRegistry.getStoragePluginRuleSet(context));

// Following is used in LOPT join OPT.
RuleSet logicalConvertRules = DrillRuleSets.mergedRuleSets(
DrillRuleSets.getDrillBasicRules(context),
DrillRuleSets.getDrillUserConfigurableLogicalRules(context));

RuleSet[] allRules = new RuleSet[] {drillLogicalRules, drillPhysicalMem, logicalConvertRules};
RuleSet[] allRules = new RuleSet[] {drillLogicalVolOnlyRules, drillPhysicalMem, drillLogicalHepJoinRules, drillLogicalHepJoinPPRules};

return allRules;
}
Expand Down

0 comments on commit e0c9b84

Please sign in to comment.