From 99b35b89789fd4d1521b6dd2557c9f6709d7e0ff Mon Sep 17 00:00:00 2001 From: "zhengyunhong.zyh" <337361684@qq.com> Date: Mon, 12 Dec 2022 12:06:38 +0800 Subject: [PATCH 1/5] [FLINK-30365][table-planner] New dynamic partition pruning strategy to support more dpp patterns --- ...namicPartitionPruningConverterProgram.java | 147 ++++ .../batch/DynamicPartitionPruningRule.java | 739 ------------------ .../utils/DynamicPartitionPruningUtils.java | 395 +++++++--- .../plan/metadata/FlinkRelMdRowCount.scala | 18 +- .../optimize/program/FlinkBatchProgram.scala | 6 + .../program/FlinkChainedProgram.scala | 4 + .../plan/rules/FlinkBatchRuleSets.scala | 21 +- ...PartitionPruningConverterProgramTest.java} | 103 ++- ...cPartitionPruningConverterProgramTest.xml} | 268 ++++++- 9 files changed, 779 insertions(+), 922 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{rules/physical/batch/DynamicPartitionPruningRuleTest.java => optimize/program/DynamicPartitionPruningConverterProgramTest.java} (86%) rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/{rules/physical/batch/DynamicPartitionPruningRuleTest.xml => optimize/program/DynamicPartitionPruningConverterProgramTest.xml} (79%) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java new file mode 100644 index 0000000000000..4ed9b1c8f5b43 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Planner program that tries to do partition pruning in the execution phase, and can translate a
+ * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan}
+ * whose source is a partitioned source. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * LogicalProject(...)
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # This is a partitioned table.
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
+ * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # This is a partitioned table.
+ * :  +- DynamicFilteringDataCollector(fields=[dim_key])
+ * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
+ * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * +- Exchange(distribution=[broadcast])
+ *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Needs an arbitrary filter condition.
+ *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
+ * }</pre>
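+ *
+ * <p>For illustration only, a query of the following shape is the kind of input that can
+ * trigger this rewrite. The table and column names here are hypothetical; the example assumes
+ * that {@code fact} is a partitioned table whose source implements
+ * {@code SupportsDynamicFiltering} and that dynamic filtering is enabled:
+ *
+ * <pre>{@code
+ * SELECT * FROM fact
+ * JOIN dim ON fact.fact_partition_key = dim.dim_key
+ * WHERE dim.price < 100
+ * }</pre>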
+ */ +public class FlinkDynamicPartitionPruningConverterProgram + implements FlinkOptimizeProgram { + + @Override + public RelNode optimize(RelNode root, BatchOptimizeContext context) { + DefaultRelShuttle shuttle = + new DefaultRelShuttle() { + @Override + public RelNode visit(RelNode rel) { + if (!(rel instanceof Join)) { + List newInputs = new ArrayList<>(); + for (RelNode input : rel.getInputs()) { + RelNode newInput = input.accept(this); + newInputs.add(newInput); + } + return rel.copy(rel.getTraitSet(), newInputs); + } + Join join = (Join) rel; + if (!ShortcutUtils.unwrapContext(join) + .getTableConfig() + .get( + OptimizerConfigOptions + .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { + return join; + } + + JoinInfo joinInfo = join.analyzeCondition(); + RelNode leftSide = join.getLeft(); + RelNode rightSide = join.getRight(); + Join newJoin = join; + boolean changed = false; + if (DynamicPartitionPruningUtils.isDimSide(leftSide)) { + if (join.getJoinType() != JoinRelType.RIGHT) { + Tuple2 relTuple = + DynamicPartitionPruningUtils + .canConvertAndConvertDppFactSide( + rightSide, + joinInfo.rightKeys, + leftSide, + joinInfo.leftKeys, + join); + changed = relTuple.f0; + newJoin = + join.copy( + join.getTraitSet(), + Arrays.asList(leftSide, relTuple.f1.accept(this))); + } + } + if (DynamicPartitionPruningUtils.isDimSide(rightSide)) { + if (join.getJoinType() != JoinRelType.LEFT) { + Tuple2 relTuple = + DynamicPartitionPruningUtils + .canConvertAndConvertDppFactSide( + leftSide, + joinInfo.leftKeys, + rightSide, + joinInfo.rightKeys, + join); + changed = relTuple.f0; + newJoin = + join.copy( + join.getTraitSet(), + Arrays.asList(relTuple.f1.accept(this), rightSide)); + } + } + + if (changed) { + return newJoin; + } else { + return newJoin.copy( + newJoin.getTraitSet(), + Arrays.asList( + newJoin.getLeft().accept(this), + newJoin.getRight().accept(this))); + } + } + }; + return shuttle.visit(root); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java deleted file mode 100644 index b0767d27f0510..0000000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java +++ /dev/null @@ -1,739 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.planner.plan.rules.physical.batch; - -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalJoinBase; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan; -import org.apache.flink.table.planner.plan.schema.TableSourceTable; -import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils; - -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Exchange; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.JoinInfo; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; -import org.apache.calcite.tools.RuleSet; -import org.apache.calcite.tools.RuleSets; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Planner rule that tries to do partition prune in the execution phase, which can translate a - * {@link BatchPhysicalTableSourceScan} to a {@link BatchPhysicalDynamicFilteringTableSourceScan} - * whose source is a partition source. The {@link - * OptimizerConfigOptions#TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED} need to be true. - * - *

- * <p>Suppose we have the original physical plan:
- *
- * <pre>{@code
- * LogicalProject(...)
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- *  * :- TableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key],) # Is a partition table.
- *  * +- Exchange(distribution=[broadcast])
- *  *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- *  *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- *
- * <p>This physical plan will be rewritten to:
- *
- * <pre>{@code
- * HashJoin(joinType=[InnerJoin], where=[=(fact_partition_key, dim_key)], select=[xxx])
- * :- DynamicFilteringTableSourceScan(table=[[fact]], fields=[xxx, fact_partition_key]) # Is a partition table.
- * :  +- DynamicFilteringDataCollector(fields=[dim_key])
- * :     +- Calc(select=[xxx], where=[<(xxx, xxx)])
- * :        +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * +- Exchange(distribution=[broadcast])
- *    +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition.
- *       +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key])
- * }</pre>
- */ -public abstract class DynamicPartitionPruningRule extends RelRule { - - // To support more patterns of dynamic partition pruning, in this rule base, we provide eight - // different matching rules according to the different situation of the fact side (partition - // table side) and the different order of left/right join. - public static final RuleSet DYNAMIC_PARTITION_PRUNING_RULES = - RuleSets.ofList( - DynamicPartitionPruningFactInRightRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInLeftRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInRightWithExchangeRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInLeftWithExchangeRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInRightWithCalcRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInLeftWithCalcRule.Config.DEFAULT.toRule(), - DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config.DEFAULT - .toRule(), - DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config.DEFAULT - .toRule()); - - protected DynamicPartitionPruningRule(RelRule.Config config) { - super(config); - } - - private static List getAcceptedFieldIndices( - List factJoinKeys, - @Nullable BatchPhysicalCalc factCalc, - BatchPhysicalTableSourceScan factScan, - DynamicTableSource tableSource) { - List candidateFields; - if (factCalc == null) { - candidateFields = - factJoinKeys.stream() - .map(i -> factScan.getRowType().getFieldNames().get(i)) - .collect(Collectors.toList()); - } else { - // Changing the fact key index in fact table calc output to fact key index in fact - // table, and filtering these fields that computing in calc node. - RexProgram program = factCalc.getProgram(); - List joinKeysIndexInFactTable = new ArrayList<>(); - for (int joinKeyIdx : factJoinKeys) { - RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx)); - if (node instanceof RexInputRef) { - joinKeysIndexInFactTable.add(((RexInputRef) node).getIndex()); - } - } - - if (joinKeysIndexInFactTable.isEmpty()) { - return Collections.emptyList(); - } - - candidateFields = - joinKeysIndexInFactTable.stream() - .map(i -> factScan.getRowType().getFieldNames().get(i)) - .collect(Collectors.toList()); - } - - List acceptedFilterFields = - DynamicPartitionPruningUtils.getSuitableDynamicFilteringFieldsInFactSide( - tableSource, candidateFields); - // Apply suitable accepted filter fields to source. 
- ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields); - - if (factCalc == null) { - return acceptedFilterFields.stream() - .map(f -> factScan.getRowType().getFieldNames().indexOf(f)) - .collect(Collectors.toList()); - } else { - return getAcceptedFieldsIndicesInCalc( - acceptedFilterFields, factJoinKeys, factCalc, factScan); - } - } - - private static List getAcceptedFieldsIndicesInCalc( - List acceptedFields, - List factJoinKeys, - BatchPhysicalCalc factCalc, - BatchPhysicalTableSourceScan factScan) { - List acceptedFieldsIndicesInFactScan = - acceptedFields.stream() - .map(f -> factScan.getRowType().getFieldNames().indexOf(f)) - .collect(Collectors.toList()); - RexProgram program = factCalc.getProgram(); - List acceptedFieldsIndicesInCalc = new ArrayList<>(); - for (int joinKeyIdx : factJoinKeys) { - RexNode node = program.expandLocalRef(program.getProjectList().get(joinKeyIdx)); - if (node instanceof RexInputRef - && acceptedFieldsIndicesInFactScan.contains(((RexInputRef) node).getIndex())) { - acceptedFieldsIndicesInCalc.add(joinKeyIdx); - } - } - - return acceptedFieldsIndicesInCalc; - } - - protected BatchPhysicalDynamicFilteringTableSourceScan createDynamicFilteringTableSourceScan( - BatchPhysicalTableSourceScan factScan, - BatchPhysicalRel dimSide, - BatchPhysicalJoinBase join, - @Nullable BatchPhysicalCalc factCalc, - boolean factInLeft) { - JoinInfo joinInfo = join.analyzeCondition(); - TableSourceTable tableSourceTable = factScan.getTable().unwrap(TableSourceTable.class); - DynamicTableSource tableSource = tableSourceTable.tableSource(); - - List factJoinKeys = factInLeft ? joinInfo.leftKeys : joinInfo.rightKeys; - List dimJoinKeys = factInLeft ? joinInfo.rightKeys : joinInfo.leftKeys; - - List acceptedFieldIndices = - getAcceptedFieldIndices(factJoinKeys, factCalc, factScan, tableSource); - - List dynamicFilteringFieldIndices = new ArrayList<>(); - for (int i = 0; i < joinInfo.leftKeys.size(); ++i) { - if (acceptedFieldIndices.contains(factJoinKeys.get(i))) { - dynamicFilteringFieldIndices.add(dimJoinKeys.get(i)); - } - } - final BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector = - createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices); - return new BatchPhysicalDynamicFilteringTableSourceScan( - factScan.getCluster(), - factScan.getTraitSet(), - factScan.getHints(), - factScan.tableSourceTable(), - dynamicFilteringDataCollector); - } - - private BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector( - RelNode dimSide, List dynamicFilteringFieldIndices) { - final RelDataType outputType = - ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory()) - .projectStructType( - dimSide.getRowType(), - dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray()); - return new BatchPhysicalDynamicFilteringDataCollector( - dimSide.getCluster(), - dimSide.getTraitSet(), - ignoreExchange(dimSide), - outputType, - dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray()); - } - - private RelNode ignoreExchange(RelNode dimSide) { - if (dimSide instanceof Exchange) { - return dimSide.getInput(0); - } else { - return dimSide; - } - } - - /** Simple dynamic filtering pattern with fact side in join right. */ - protected static class DynamicPartitionPruningFactInRightRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInRightRule(RelRule.Config config) { - super(config); - } - - /** Config. 
*/ - public interface Config extends RelRule.Config { - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalRel - .class) - .anyInputs(), - r -> - r.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs())) - .as(DynamicPartitionPruningFactInRightRule.Config.class); - - @Override - default DynamicPartitionPruningFactInRightRule toRule() { - return new DynamicPartitionPruningFactInRightRule(this); - } - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalRel dimSide = call.rel(1); - final BatchPhysicalTableSourceScan factScan = call.rel(2); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newFactScan)); - call.transformTo(newJoin); - } - } - - /** Simple dynamic filtering pattern with fact side in join left. */ - protected static class DynamicPartitionPruningFactInLeftRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInLeftRule(RelRule.Config config) { - super(config); - } - - /** Config. */ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInLeftRule toRule() { - return new DynamicPartitionPruningFactInLeftRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs(), - r -> - r.operand( - BatchPhysicalRel - .class) - .anyInputs())) - .as(DynamicPartitionPruningFactInLeftRule.Config.class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalTableSourceScan factScan = call.rel(1); - final BatchPhysicalRel dimSide = call.rel(2); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newFactScan, dimSide)); - call.transformTo(newJoin); - } - } - - /** Dynamic filtering pattern with exchange node in fact side while fact side in join right. */ - protected static class DynamicPartitionPruningFactInRightWithExchangeRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInRightWithExchangeRule(RelRule.Config config) { - super(config); - } - - /** Config. 
*/ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInRightWithExchangeRule toRule() { - return new DynamicPartitionPruningFactInRightWithExchangeRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalRel - .class) - .anyInputs(), - r -> - r.operand( - BatchPhysicalExchange - .class) - .oneInput( - e -> - e.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs()))) - .as(DynamicPartitionPruningFactInRightWithExchangeRule.Config.class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalRel dimSide = call.rel(1); - final BatchPhysicalExchange exchange = call.rel(2); - final BatchPhysicalTableSourceScan factScan = call.rel(3); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, false); - final BatchPhysicalExchange newExchange = - (BatchPhysicalExchange) - exchange.copy( - exchange.getTraitSet(), Collections.singletonList(newFactScan)); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange)); - call.transformTo(newJoin); - } - } - - /** Dynamic filtering pattern with exchange node in fact side while fact side in join left. */ - protected static class DynamicPartitionPruningFactInLeftWithExchangeRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInLeftWithExchangeRule(RelRule.Config config) { - super(config); - } - - /** Config. */ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInLeftWithExchangeRule toRule() { - return new DynamicPartitionPruningFactInLeftWithExchangeRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalExchange - .class) - .oneInput( - e -> - e.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs()), - r -> - r.operand( - BatchPhysicalRel - .class) - .anyInputs())) - .as(DynamicPartitionPruningFactInLeftWithExchangeRule.Config.class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalExchange exchange = call.rel(1); - final BatchPhysicalTableSourceScan factScan = call.rel(2); - final BatchPhysicalRel dimSide = call.rel(3); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, null, true); - final BatchPhysicalExchange newExchange = - (BatchPhysicalExchange) - exchange.copy( - exchange.getTraitSet(), Collections.singletonList(newFactScan)); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide)); - call.transformTo(newJoin); - } - } - - /** Dynamic filtering pattern with calc node in fact side while fact side in join right. 
*/ - protected static class DynamicPartitionPruningFactInRightWithCalcRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInRightWithCalcRule(RelRule.Config config) { - super(config); - } - - /** Config. */ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInRightWithCalcRule toRule() { - return new DynamicPartitionPruningFactInRightWithCalcRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalRel - .class) - .anyInputs(), - r -> - r.operand( - BatchPhysicalCalc - .class) - .oneInput( - f -> - f.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs()))) - .as(DynamicPartitionPruningFactInRightWithCalcRule.Config.class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalRel dimSide = call.rel(1); - final BatchPhysicalCalc factCalc = call.rel(2); - final BatchPhysicalTableSourceScan factScan = call.rel(3); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false); - final BatchPhysicalCalc newCalc = - (BatchPhysicalCalc) - factCalc.copy( - factCalc.getTraitSet(), newFactScan, factCalc.getProgram()); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newCalc)); - call.transformTo(newJoin); - } - } - - /** Dynamic filtering pattern with calc node in fact side while fact side in join left. */ - protected static class DynamicPartitionPruningFactInLeftWithCalcRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInLeftWithCalcRule(RelRule.Config config) { - super(config); - } - - /** Config. 
*/ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInLeftWithCalcRule toRule() { - return new DynamicPartitionPruningFactInLeftWithCalcRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalCalc - .class) - .oneInput( - f -> - f.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs()), - r -> - r.operand( - BatchPhysicalRel - .class) - .anyInputs())) - .as(DynamicPartitionPruningFactInLeftWithCalcRule.Config.class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalCalc factCalc = call.rel(1); - final BatchPhysicalTableSourceScan factScan = call.rel(2); - final BatchPhysicalRel dimSide = call.rel(3); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true); - final BatchPhysicalCalc newCalc = - (BatchPhysicalCalc) - factCalc.copy( - factCalc.getTraitSet(), newFactScan, factCalc.getProgram()); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newCalc, dimSide)); - call.transformTo(newJoin); - } - } - - /** - * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in - * join right. - */ - protected static class DynamicPartitionPruningFactInRightWithExchangeAndCalcRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(RelRule.Config config) { - super(config); - } - - /** Config. 
*/ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInRightWithExchangeAndCalcRule toRule() { - return new DynamicPartitionPruningFactInRightWithExchangeAndCalcRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalRel - .class) - .anyInputs(), - r -> - r.operand( - BatchPhysicalExchange - .class) - .oneInput( - e -> - e.operand( - BatchPhysicalCalc - .class) - .oneInput( - f -> - f.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs())))) - .as( - DynamicPartitionPruningFactInRightWithExchangeAndCalcRule.Config - .class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, false); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalRel dimSide = call.rel(1); - final BatchPhysicalExchange exchange = call.rel(2); - final BatchPhysicalCalc factCalc = call.rel(3); - final BatchPhysicalTableSourceScan factScan = call.rel(4); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, false); - final BatchPhysicalCalc newCalc = - (BatchPhysicalCalc) - factCalc.copy( - factCalc.getTraitSet(), newFactScan, factCalc.getProgram()); - final BatchPhysicalExchange newExchange = - (BatchPhysicalExchange) - exchange.copy( - exchange.getTraitSet(), Collections.singletonList(newCalc)); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(dimSide, newExchange)); - call.transformTo(newJoin); - } - } - - /** - * Dynamic filtering pattern with exchange node and calc node in fact side while fact side in - * join left. - */ - protected static class DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule - extends DynamicPartitionPruningRule { - - public DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(RelRule.Config config) { - super(config); - } - - /** Config. 
*/ - public interface Config extends RelRule.Config { - @Override - default DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule toRule() { - return new DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule(this); - } - - Config DEFAULT = - EMPTY.withOperandSupplier( - b0 -> - b0.operand(BatchPhysicalJoinBase.class) - .inputs( - l -> - l.operand( - BatchPhysicalExchange - .class) - .oneInput( - e -> - e.operand( - BatchPhysicalCalc - .class) - .oneInput( - f -> - f.operand( - BatchPhysicalTableSourceScan - .class) - .noInputs())), - r -> - r.operand( - BatchPhysicalRel - .class) - .anyInputs())) - .as( - DynamicPartitionPruningFactInLeftWithExchangeAndCalcRule.Config - .class); - } - - @Override - public boolean matches(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join, true); - } - - @Override - public void onMatch(RelOptRuleCall call) { - final BatchPhysicalJoinBase join = call.rel(0); - final BatchPhysicalExchange exchange = call.rel(1); - final BatchPhysicalCalc factCalc = call.rel(2); - final BatchPhysicalTableSourceScan factScan = call.rel(3); - final BatchPhysicalRel dimSide = call.rel(4); - - final BatchPhysicalDynamicFilteringTableSourceScan newFactScan = - createDynamicFilteringTableSourceScan(factScan, dimSide, join, factCalc, true); - final BatchPhysicalCalc newCalc = - (BatchPhysicalCalc) - factCalc.copy( - factCalc.getTraitSet(), newFactScan, factCalc.getProgram()); - final BatchPhysicalExchange newExchange = - (BatchPhysicalExchange) - exchange.copy( - exchange.getTraitSet(), Collections.singletonList(newCalc)); - final Join newJoin = join.copy(join.getTraitSet(), Arrays.asList(newExchange, dimSide)); - call.transformTo(newJoin); - } - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java index 9f2649cdf8bf3..ab3b11b565802 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java @@ -19,18 +19,22 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.TransformationScanProvider; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector; import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; @@ -45,6 +49,8 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -52,6 +58,8 @@ import org.apache.calcite.util.ImmutableIntList; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -60,111 +68,79 @@ public class DynamicPartitionPruningUtils { /** - * For the input join node, judge whether the join left side and join right side meet the - * requirements of dynamic partition pruning. Fact side in left or right join is not clear. + * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we + * need not consider the join keys in dim side, which already deal by dynamic partition pruning + * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if + * changed, this RelNode is not dim side. */ - public static boolean supportDynamicPartitionPruning(Join join) { - return supportDynamicPartitionPruning(join, true) - || supportDynamicPartitionPruning(join, false); + public static boolean isDimSide(RelNode rel) { + DppDimSideFactors dimSideFactors = new DppDimSideFactors(); + visitDimSide(rel, dimSideFactors); + return dimSideFactors.isDimSide(); } - /** - * For the input join node, judge whether the join left side and join right side meet the - * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the - * requirements, return true. - */ - public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) { - if (!ShortcutUtils.unwrapContext(join) - .getTableConfig() - .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { - return false; - } - // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi - // join can not join reorder. - if (join.getJoinType() == JoinRelType.LEFT) { - if (factInLeft) { - return false; - } - } else if (join.getJoinType() == JoinRelType.RIGHT) { - if (!factInLeft) { - return false; - } - } else if (join.getJoinType() != JoinRelType.INNER - && join.getJoinType() != JoinRelType.SEMI) { - return false; + public static List getSuitableDynamicFilteringFieldsInFactSide( + DynamicTableSource tableSource, List candidateFields) { + List acceptedFilterFields = + ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields(); + if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) { + return new ArrayList<>(); } - JoinInfo joinInfo = join.analyzeCondition(); - if (joinInfo.leftKeys.isEmpty()) { - return false; + List suitableFields = new ArrayList<>(); + // If candidateField not in acceptedFilterFields means dpp rule will not be matched, + // because we can not prune any partitions according to non-accepted filter fields + // provided by partition table source. 
+ for (String candidateField : candidateFields) { + if (acceptedFilterFields.contains(candidateField)) { + suitableFields.add(candidateField); + } } - RelNode left = join.getLeft(); - RelNode right = join.getRight(); - - // TODO Now fact side and dim side don't support many complex patterns, like join inside - // fact/dim side, agg inside fact/dim side etc. which will support next. - return factInLeft - ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys) - : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys); - } - private static boolean isDynamicPartitionPruningPattern( - RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) { - return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey); + return suitableFields; } - /** make a dpp fact side factor to recurrence in fact side. */ - private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) { + public static Tuple2 canConvertAndConvertDppFactSide( + RelNode rel, + ImmutableIntList joinKeys, + RelNode dimSide, + ImmutableIntList dimSideJoinKey, + Join join) { DppFactSideFactors factSideFactors = new DppFactSideFactors(); - visitFactSide(rel, factSideFactors, joinKeys); - return factSideFactors.isFactSide(); + RelNode newRel = + convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join); + return Tuple2.of(factSideFactors.isChanged, newRel); } - /** - * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we - * need not consider the join keys in dim side, which already deal by dynamic partition pruning - * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if - * changed, this RelNode is not dim side. - */ - private static boolean isDimSide(RelNode rel) { - DppDimSideFactors dimSideFactors = new DppDimSideFactors(); - visitDimSide(rel, dimSideFactors); - return dimSideFactors.isDimSide(); - } - - /** - * Visit fact side to judge whether fact side has partition table, partition table source meets - * the condition of dpp table source and dynamic filtering keys changed in fact side. 
- */ - private static void visitFactSide( - RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) { + public static RelNode convertDppFactSide( + RelNode rel, + ImmutableIntList joinKeys, + RelNode dimSide, + ImmutableIntList dimSideJoinKey, + DppFactSideFactors factSideFactors, + Join join) { if (rel instanceof TableScan) { TableScan scan = (TableScan) rel; if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) { // rule applied - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); if (tableSourceTable == null) { - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable(); List partitionKeys = catalogTable.getPartitionKeys(); if (partitionKeys.isEmpty()) { - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } DynamicTableSource tableSource = tableSourceTable.tableSource(); if (!(tableSource instanceof SupportsDynamicFiltering) || !(tableSource instanceof ScanTableSource)) { - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } if (!isNewSource((ScanTableSource) tableSource)) { - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } List candidateFields = @@ -172,26 +148,73 @@ private static void visitFactSide( .map(i -> scan.getRowType().getFieldNames().get(i)) .collect(Collectors.toList()); if (candidateFields.isEmpty()) { - factSideFactors.isSuitableFactScanSource = false; - return; + return rel; } - factSideFactors.isSuitableFactScanSource = - !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields) - .isEmpty(); - } else if (rel instanceof HepRelVertex) { - visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys); + List acceptedFilterFields = + getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields); + + if (acceptedFilterFields.size() == 0) { + return rel; + } + + // Apply suitable accepted filter fields to source. 
+ ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields); + + List acceptedFieldIndices = + acceptedFilterFields.stream() + .map(f -> scan.getRowType().getFieldNames().indexOf(f)) + .collect(Collectors.toList()); + List dynamicFilteringFieldIndices = new ArrayList<>(); + for (int i = 0; i < joinKeys.size(); ++i) { + if (acceptedFieldIndices.contains(joinKeys.get(i))) { + dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i)); + } + } + + BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector = + createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices); + + factSideFactors.isChanged = true; + + if (!isSuitableJoin(join)) { + return rel; + } + + return new BatchPhysicalDynamicFilteringTableSourceScan( + scan.getCluster(), + scan.getTraitSet(), + scan.getHints(), + tableSourceTable, + dynamicFilteringDataCollector); } else if (rel instanceof Exchange || rel instanceof Filter) { - visitFactSide(rel.getInput(0), factSideFactors, joinKeys); + return rel.copy( + rel.getTraitSet(), + Collections.singletonList( + convertDppFactSide( + rel.getInput(0), + joinKeys, + dimSide, + dimSideJoinKey, + factSideFactors, + join))); } else if (rel instanceof Project) { List projects = ((Project) rel).getProjects(); ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys); if (inputJoinKeys.isEmpty()) { - factSideFactors.isSuitableJoinKey = false; - return; + return rel; } - visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys); + return rel.copy( + rel.getTraitSet(), + Collections.singletonList( + convertDppFactSide( + rel.getInput(0), + inputJoinKeys, + dimSide, + dimSideJoinKey, + factSideFactors, + join))); } else if (rel instanceof Calc) { Calc calc = (Calc) rel; RexProgram program = calc.getProgram(); @@ -201,33 +224,128 @@ private static void visitFactSide( .collect(Collectors.toList()); ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys); if (inputJoinKeys.isEmpty()) { - factSideFactors.isSuitableJoinKey = false; - return; + return rel; } - visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys); + return rel.copy( + rel.getTraitSet(), + Collections.singletonList( + convertDppFactSide( + rel.getInput(0), + inputJoinKeys, + dimSide, + dimSideJoinKey, + factSideFactors, + join))); + } else if (rel instanceof Join) { + Join currentJoin = (Join) rel; + return currentJoin.copy( + currentJoin.getTraitSet(), + Arrays.asList( + convertDppFactSide( + currentJoin.getLeft(), + getInputIndices(currentJoin, joinKeys, true), + dimSide, + dimSideJoinKey, + factSideFactors, + join), + convertDppFactSide( + currentJoin.getRight(), + getInputIndices(currentJoin, joinKeys, false), + dimSide, + dimSideJoinKey, + factSideFactors, + join))); + } else if (rel instanceof Union) { + Union union = (Union) rel; + List newInputs = new ArrayList<>(); + for (RelNode input : union.getInputs()) { + newInputs.add( + convertDppFactSide( + input, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join)); + } + return union.copy(union.getTraitSet(), newInputs, union.all); + } else if (rel instanceof BatchPhysicalGroupAggregateBase) { + BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel; + List newInputs = new ArrayList<>(); + for (RelNode input : agg.getInputs()) { + newInputs.add( + convertDppFactSide( + input, + getInputIndices(agg, input, joinKeys), + dimSide, + dimSideJoinKey, + factSideFactors, + join)); + } + + return agg.copy(agg.getTraitSet(), newInputs); } + + return rel; } - public static List 
getSuitableDynamicFilteringFieldsInFactSide( - DynamicTableSource tableSource, List candidateFields) { - List acceptedFilterFields = - ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields(); - if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) { - return new ArrayList<>(); + private static boolean isSuitableJoin(Join join) { + // Now dynamic partition pruning supports left/right join, inner and semi + // join. but now semi + // join can not join reorder. + if (join.getJoinType() != JoinRelType.INNER + && join.getJoinType() != JoinRelType.SEMI + && join.getJoinType() != JoinRelType.LEFT + && join.getJoinType() != JoinRelType.RIGHT) { + return false; } - List suitableFields = new ArrayList<>(); - // If candidateField not in acceptedFilterFields means dpp rule will not be matched, - // because we can not prune any partitions according to non-accepted filter fields - // provided by partition table source. - for (String candidateField : candidateFields) { - if (acceptedFilterFields.contains(candidateField)) { - suitableFields.add(candidateField); + JoinInfo joinInfo = join.analyzeCondition(); + return !joinInfo.leftKeys.isEmpty(); + } + + private static ImmutableIntList getInputIndices( + BatchPhysicalGroupAggregateBase agg, RelNode aggInput, ImmutableIntList joinKeys) { + int[] indexMap = new int[aggInput.getRowType().getFieldCount()]; + int[] grouping = agg.grouping(); + if (grouping.length == 0) { + return joinKeys; + } + int beginIndex = grouping[0] - 1; + for (int i = 0; i < indexMap.length; i++) { + indexMap[i] = i; + } + + System.arraycopy(grouping, 0, indexMap, 0, grouping.length); + if (beginIndex >= 0) { + for (int i = 0; i <= beginIndex; i++) { + indexMap[grouping.length + i] = i; } } + List indices = new ArrayList<>(); + for (int k : joinKeys) { + indices.add(indexMap[k]); + } + return ImmutableIntList.copyOf(indices); + } - return suitableFields; + private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector( + RelNode dimSide, List dynamicFilteringFieldIndices) { + final RelDataType outputType = + ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory()) + .projectStructType( + dimSide.getRowType(), + dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray()); + return new BatchPhysicalDynamicFilteringDataCollector( + dimSide.getCluster(), + dimSide.getTraitSet(), + ignoreExchange(dimSide), + outputType, + dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray()); + } + + private static RelNode ignoreExchange(RelNode dimSide) { + if (dimSide instanceof Exchange) { + return dimSide.getInput(0); + } else { + return dimSide; + } } /** @@ -258,7 +376,14 @@ private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) } } CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable(); - dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned(); + if (catalogTable.isPartitioned()) { + dimSideFactors.hasPartitionedScan = true; + return; + } + + if (!dimSideFactors.setTables(table.contextResolvedTable())) { + return; + } } else if (rel instanceof HepRelVertex) { visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors); } else if (rel instanceof Exchange || rel instanceof Project) { @@ -275,6 +400,19 @@ && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) { dimSideFactors.hasFilter = true; } visitDimSide(rel.getInput(0), dimSideFactors); + } else if (rel instanceof Join) { + Join join = (Join) rel; + visitDimSide(join.getLeft(), 
dimSideFactors); + visitDimSide(join.getRight(), dimSideFactors); + } else if (rel instanceof BatchPhysicalGroupAggregateBase) { + for (RelNode input : rel.getInputs()) { + visitDimSide(input, dimSideFactors); + } + } else if (rel instanceof Union) { + Union union = (Union) rel; + for (RelNode input : union.getInputs()) { + visitDimSide(input, dimSideFactors); + } } } @@ -354,24 +492,51 @@ private static ImmutableIntList getInputIndices( return ImmutableIntList.copyOf(indices); } - /** This class is used to remember dim side messages while recurring in dim side. */ + private static ImmutableIntList getInputIndices( + Join join, ImmutableIntList joinKeys, boolean isLeft) { + List indices = new ArrayList<>(); + RelNode left = join.getLeft(); + int leftSize = left.getRowType().getFieldCount(); + for (int k : joinKeys) { + if (isLeft) { + if (k < leftSize) { + indices.add(k); + } + } else { + if (k >= leftSize) { + indices.add(k - leftSize); + } + } + } + return ImmutableIntList.copyOf(indices); + } + private static class DppDimSideFactors { private boolean hasFilter; - private boolean hasNonPartitionedScan; + private boolean hasPartitionedScan; + private final List tables = new ArrayList<>(); + + public boolean setTables(ContextResolvedTable catalogTable) { + if (tables.size() == 0) { + tables.add(catalogTable); + } else { + for (ContextResolvedTable thisTable : tables) { + if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { + return false; + } + } + } + return true; + } public boolean isDimSide() { - return hasFilter && hasNonPartitionedScan; + return hasFilter && !hasPartitionedScan && tables.size() == 1; } } /** This class is used to remember fact side messages while recurring in fact side. */ private static class DppFactSideFactors { - private boolean isSuitableFactScanSource; // If join key is not changed in fact side, this value is always true. - private boolean isSuitableJoinKey = true; - - public boolean isFactSide() { - return isSuitableFactScanSource && isSuitableJoinKey; - } + private boolean isChanged; } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala index b20868b183597..f5dec96af6801 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala @@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch._ import org.apache.flink.table.planner.plan.stats.ValueInterval import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, SortUtil} import org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasTimeIntervalType, toLong} -import org.apache.flink.table.planner.utils.DynamicPartitionPruningUtils import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.adapter.enumerable.EnumerableLimit @@ -336,22 +335,11 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun fmq.getSelectivity(joinWithOnlyEquiPred, nonEquiPred) } - // Currently, join-reorder is before dynamic partition pruning rewrite. This factor - // is adding to adjust join cost for these join node which meets dynamic partition - // pruning pattern. 
Try best to reorder the fact table and fact table together to - // make DPP succeed. - val dynamicPartitionPruningFactor = - if (DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join)) { - 0.0001 - } else { - 1 - } - if (leftNdv != null && rightNdv != null) { // selectivity of equi part is 1 / Max(leftNdv, rightNdv) val selectivityOfEquiPred = Math.min(1d, 1d / Math.max(leftNdv, rightNdv)) return leftRowCount * rightRowCount * selectivityOfEquiPred * - selectivityOfNonEquiPred * dynamicPartitionPruningFactor + selectivityOfNonEquiPred } val leftKeysAreUnique = fmq.areColumnsUnique(leftChild, leftKeySet) @@ -369,14 +357,14 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun } else { leftRowCount * selectivityOfNonEquiPred } - return outputRowCount * dynamicPartitionPruningFactor + return outputRowCount } // if joinCondition has no ndv stats and no uniqueKeys stats, // rowCount = (leftRowCount + rightRowCount) * join condition selectivity val crossJoin = copyJoinWithNewCondition(join, rexBuilder.makeLiteral(true)) val selectivity = fmq.getSelectivity(crossJoin, condition) - (leftRowCount + rightRowCount) * selectivity * dynamicPartitionPruningFactor + (leftRowCount + rightRowCount) * selectivity } private def copyJoinWithNewCondition(join: Join, newCondition: RexNode): Join = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala index f64197c383661..06125b3a7bdb8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala @@ -40,6 +40,7 @@ object FlinkBatchProgram { val TIME_INDICATOR = "time_indicator" val PHYSICAL = "physical" val PHYSICAL_REWRITE = "physical_rewrite" + val DYNAMIC_PARTITION_PRUNING_CONVERTER = "dynamic_partition_pruning_converter" def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]() @@ -288,6 +289,11 @@ object FlinkBatchProgram { .build() ) + // convert dynamic partition pruning scan source + chainedProgram.addLast( + DYNAMIC_PARTITION_PRUNING_CONVERTER, + new FlinkDynamicPartitionPruningConverterProgram) + chainedProgram } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala index 005c6bfe7dcba..8b871a63d7bd3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala @@ -59,6 +59,10 @@ class FlinkChainedProgram[OC <: FlinkOptimizeContext] val result = program.optimize(input, context) val end = System.currentTimeMillis() + println( + s"optimize $name cost ${end - start} ms.\n" + + s"optimize result: \n${FlinkRelOptUtil.toString(result)}") + if (LOG.isDebugEnabled) { LOG.debug( s"optimize $name cost ${end - start} ms.\n" + diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 39fa34cbaf689..6551d936c586e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -433,18 +433,13 @@ object FlinkBatchRuleSets {

   /** RuleSet to optimize plans after batch exec execution. */
   val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
-    (RuleSets
-      .ofList(
-        EnforceLocalHashAggRule.INSTANCE,
-        EnforceLocalSortAggRule.INSTANCE,
-        PushLocalHashAggIntoScanRule.INSTANCE,
-        PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
-        PushLocalSortAggIntoScanRule.INSTANCE,
-        PushLocalSortAggWithSortIntoScanRule.INSTANCE,
-        PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
-        PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
-      )
-      .asScala ++
-      DynamicPartitionPruningRule.DYNAMIC_PARTITION_PRUNING_RULES.asScala).asJava
+    EnforceLocalHashAggRule.INSTANCE,
+    EnforceLocalSortAggRule.INSTANCE,
+    PushLocalHashAggIntoScanRule.INSTANCE,
+    PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
+    PushLocalSortAggIntoScanRule.INSTANCE,
+    PushLocalSortAggWithSortIntoScanRule.INSTANCE,
+    PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
+    PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
   )
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java
similarity index 86%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java
index 0a7f0c7bc9d4e..6252a0891d9e2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.planner.plan.rules.physical.batch;
+package org.apache.flink.table.planner.plan.optimize.program;

 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -40,10 +40,10 @@
 import java.util.Map;

 /**
- * Test for rules that extend {@link DynamicPartitionPruningRule} to create {@link
+ * Tests for {@link FlinkDynamicPartitionPruningConverterProgram}, which creates {@link
 * org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan}.
*/ -public class DynamicPartitionPruningRuleTest extends TableTestBase { +public class DynamicPartitionPruningConverterProgramTest extends TableTestBase { private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); private final TestValuesCatalog catalog = new TestValuesCatalog("testCatalog", "test_database", true); @@ -381,7 +381,7 @@ public void testMultiJoin() { @Test public void testComplexDimSideWithJoinInDimSide() { - // Dim side contains join will not succeed in this version, it will improve later. + // Dpp will succeed util.tableEnv() .executeSql( "CREATE TABLE sales (\n" @@ -434,7 +434,100 @@ public void testComplexDimSideWithAggInDimSide() { util.verifyRelPlan(query); } - // --------------------------dpp factor test --------------------------------------------- + @Test + public void testDppWithoutJoinReorder() { + // Dpp will succeed + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + TableConfig tableConfig = util.tableEnv().getConfig(); + // Join reorder is not enabled. + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false); + + String query = + "Select * from fact_part, item, dim" + + " where fact_part.fact_date_sk = dim.dim_date_sk" + + " and fact_part.id = item.id" + + " and dim.id = item.id " + + " and dim.price < 500 and dim.price > 300"; + util.verifyRelPlan(query); + } + + @Test + public void testDppWithSubQuery() { + // Dpp will succeed + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + TableConfig tableConfig = util.tableEnv().getConfig(); + // Join reorder is not enabled. 
+ tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, false); + + String query = + "Select * from fact_part, item, dim" + + " where fact_part.id = item.id" + + " and dim.price in (select price from dim where amount = (select amount from dim where amount = 2000))" + + " and fact_part.fact_date_sk = dim.dim_date_sk"; + util.verifyRelPlan(query); + } + + @Test + public void testDppWithUnionInFactSide() { + // Dpp will succeed + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String query = + "Select * from (select id, fact_date_sk, amount + 1 as amount1 from fact_part where price = 1 union all " + + "select id, fact_date_sk, amount + 1 from fact_part where price = 2) fact_part2, item, dim" + + " where fact_part2.fact_date_sk = dim.dim_date_sk" + + " and fact_part2.id = item.id" + + " and dim.price < 500 and dim.price > 300"; + util.verifyRelPlan(query); + } + + @Test + public void testDppWithAggInFactSide() { + // Dpp will succeed + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String query = + "Select * from (Select fact_date_sk, item.amount, sum(fact_part.price) from fact_part " + + "join item on fact_part.id = item.id group by fact_date_sk, item.amount) t1 " + + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 "; + util.verifyRelPlan(query); + } @Test public void testDPPFactorToReorderTableWithoutStats() { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml similarity index 79% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml index b9d9dfc726b90..60100e3634233 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml @@ -91,7 +91,13 @@ LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], d (price, 300)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, sales, filter=[]]], fields=[id, amount, price]) +: +- Exchange(distribution=[hash[id]]) +: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price]) +- Exchange(distribution=[hash[dim_date_sk]]) +- HashJoin(joinType=[InnerJoin], where=[=(id0, id1)], select=[id, male, amount, price, dim_date_sk, id0, amount0, price0, id1, amount1, price1], build=[left]) :- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, male, amount, price, dim_date_sk, id0, amount0, price0], build=[right]) @@ -396,6 +413,169 @@ HashJoin(joinType=[InnerJoin], 
where=[=(id, id0)], select=[id, name, amount, pri +- Exchange(distribution=[hash[id]]) +- Calc(select=[id, male, amount, price, dim_date_sk], where=[AND(SEARCH(price, Sarg[(300..500)]), IS NOT NULL(id))]) +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +]]> + + + + + 200 and dim.price < 500]]> + + + ($8, 200), <($3, 500))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) +]]> + + + (price, 200)]) + +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) + +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +]]> + + + + + 300 ]]> + + + ($6, 300))]) + +- LogicalJoin(condition=[=($0, $7)], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) + : +- LogicalProject(fact_date_sk=[$4], amount=[$6], price=[$3]) + : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + + + + + + 300]]> + + + ($11, 300))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + + + + + + + + + + + + @@ -560,17 +740,17 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], selec ]]> - + - 200 and dim.price < 500]]> + ($8, 200), <($3, 500))]) ++- LogicalFilter(condition=[AND(=($9, $4), <($3, 500))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalTableScan(table=[[testCatalog, test_database, dim]]) - +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, legacy_source]]) ]]> @@ -580,11 +760,53 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +- Exchange(distribution=[hash[fact_date_sk]]) - +- Calc(select=[id, name, amount, price, fact_date_sk], where=[>(price, 200)]) - +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) - +- DynamicFilteringDataCollector(fields=[dim_date_sk]) - +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) - +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) + +- TableSourceScan(table=[[testCatalog, test_database, legacy_source]], fields=[id, name, amount, price, fact_date_sk]) +]]> + + + + + 300]]> + + + ($9, 300))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalUnion(all=[true]) + : : :- 
LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[+($2, 1)]) + : : : +- LogicalFilter(condition=[=($3, 1)]) + : : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalProject(id=[$0], fact_date_sk=[$4], EXPR$2=[+($2, 1)]) + : : +- LogicalFilter(condition=[=($3, 2)]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + @@ -640,30 +862,6 @@ NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =( +- DynamicFilteringDataCollector(fields=[dim_date_sk]) +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) -]]> - - - - - - - - - - - From 59480d7372997f253512a7f70309fb977d63bf3e Mon Sep 17 00:00:00 2001 From: "zhengyunhong.zyh" <337361684@qq.com> Date: Mon, 26 Dec 2022 20:44:42 +0800 Subject: [PATCH 2/5] Some minor changes --- ... FlinkDynamicPartitionPruningProgram.java} | 29 ++- .../utils/DynamicPartitionPruningUtils.java | 95 ++++----- .../optimize/program/FlinkBatchProgram.scala | 6 +- .../program/FlinkChainedProgram.scala | 4 - ...> DynamicPartitionPruningProgramTest.java} | 30 ++- ...=> DynamicPartitionPruningProgramTest.xml} | 187 +++++++++--------- 6 files changed, 154 insertions(+), 197 deletions(-) rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/{FlinkDynamicPartitionPruningConverterProgram.java => FlinkDynamicPartitionPruningProgram.java} (93%) rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/{DynamicPartitionPruningConverterProgramTest.java => DynamicPartitionPruningProgramTest.java} (97%) rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/{DynamicPartitionPruningConverterProgramTest.xml => DynamicPartitionPruningProgramTest.xml} (97%) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java similarity index 93% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java index 4ed9b1c8f5b43..52590d32731ad 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningConverterProgram.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java @@ -65,7 +65,7 @@ * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key]) * } */ -public class FlinkDynamicPartitionPruningConverterProgram +public class FlinkDynamicPartitionPruningProgram implements FlinkOptimizeProgram { @Override @@ -74,7 +74,15 @@ public RelNode optimize(RelNode root, BatchOptimizeContext context) { new DefaultRelShuttle() { @Override public RelNode visit(RelNode rel) { - if (!(rel instanceof Join)) { + if 
(!ShortcutUtils.unwrapContext(rel) + .getTableConfig() + .get( + OptimizerConfigOptions + .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { + return rel; + } + if (!(rel instanceof Join) + || !DynamicPartitionPruningUtils.isSuitableJoin((Join) rel)) { List newInputs = new ArrayList<>(); for (RelNode input : rel.getInputs()) { RelNode newInput = input.accept(this); @@ -83,14 +91,6 @@ public RelNode visit(RelNode rel) { return rel.copy(rel.getTraitSet(), newInputs); } Join join = (Join) rel; - if (!ShortcutUtils.unwrapContext(join) - .getTableConfig() - .get( - OptimizerConfigOptions - .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { - return join; - } - JoinInfo joinInfo = join.analyzeCondition(); RelNode leftSide = join.getLeft(); RelNode rightSide = join.getRight(); @@ -104,16 +104,14 @@ public RelNode visit(RelNode rel) { rightSide, joinInfo.rightKeys, leftSide, - joinInfo.leftKeys, - join); + joinInfo.leftKeys); changed = relTuple.f0; newJoin = join.copy( join.getTraitSet(), Arrays.asList(leftSide, relTuple.f1.accept(this))); } - } - if (DynamicPartitionPruningUtils.isDimSide(rightSide)) { + } else if (DynamicPartitionPruningUtils.isDimSide(rightSide)) { if (join.getJoinType() != JoinRelType.LEFT) { Tuple2 relTuple = DynamicPartitionPruningUtils @@ -121,8 +119,7 @@ public RelNode visit(RelNode rel) { leftSide, joinInfo.leftKeys, rightSide, - joinInfo.rightKeys, - join); + joinInfo.rightKeys); changed = relTuple.f0; newJoin = join.copy( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java index ab3b11b565802..ef8506a7c6fdd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java @@ -79,7 +79,18 @@ public static boolean isDimSide(RelNode rel) { return dimSideFactors.isDimSide(); } - public static List getSuitableDynamicFilteringFieldsInFactSide( + public static Tuple2 canConvertAndConvertDppFactSide( + RelNode rel, + ImmutableIntList joinKeys, + RelNode dimSide, + ImmutableIntList dimSideJoinKey) { + DppFactSideFactors factSideFactors = new DppFactSideFactors(); + RelNode newRel = + convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors); + return Tuple2.of(factSideFactors.isChanged, newRel); + } + + private static List getSuitableDynamicFilteringFieldsInFactSide( DynamicTableSource tableSource, List candidateFields) { List acceptedFilterFields = ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields(); @@ -100,25 +111,12 @@ public static List getSuitableDynamicFilteringFieldsInFactSide( return suitableFields; } - public static Tuple2 canConvertAndConvertDppFactSide( + private static RelNode convertDppFactSide( RelNode rel, ImmutableIntList joinKeys, RelNode dimSide, ImmutableIntList dimSideJoinKey, - Join join) { - DppFactSideFactors factSideFactors = new DppFactSideFactors(); - RelNode newRel = - convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join); - return Tuple2.of(factSideFactors.isChanged, newRel); - } - - public static RelNode convertDppFactSide( - RelNode rel, - ImmutableIntList joinKeys, - RelNode dimSide, - ImmutableIntList dimSideJoinKey, - DppFactSideFactors factSideFactors, - Join join) { + DppFactSideFactors factSideFactors) { if (rel 
instanceof TableScan) { TableScan scan = (TableScan) rel; if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) { @@ -176,11 +174,6 @@ public static RelNode convertDppFactSide( createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices); factSideFactors.isChanged = true; - - if (!isSuitableJoin(join)) { - return rel; - } - return new BatchPhysicalDynamicFilteringTableSourceScan( scan.getCluster(), scan.getTraitSet(), @@ -196,8 +189,7 @@ public static RelNode convertDppFactSide( joinKeys, dimSide, dimSideJoinKey, - factSideFactors, - join))); + factSideFactors))); } else if (rel instanceof Project) { List projects = ((Project) rel).getProjects(); ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys); @@ -213,8 +205,7 @@ public static RelNode convertDppFactSide( inputJoinKeys, dimSide, dimSideJoinKey, - factSideFactors, - join))); + factSideFactors))); } else if (rel instanceof Calc) { Calc calc = (Calc) rel; RexProgram program = calc.getProgram(); @@ -235,8 +226,7 @@ public static RelNode convertDppFactSide( inputJoinKeys, dimSide, dimSideJoinKey, - factSideFactors, - join))); + factSideFactors))); } else if (rel instanceof Join) { Join currentJoin = (Join) rel; return currentJoin.copy( @@ -247,48 +237,41 @@ public static RelNode convertDppFactSide( getInputIndices(currentJoin, joinKeys, true), dimSide, dimSideJoinKey, - factSideFactors, - join), + factSideFactors), convertDppFactSide( currentJoin.getRight(), getInputIndices(currentJoin, joinKeys, false), dimSide, dimSideJoinKey, - factSideFactors, - join))); + factSideFactors))); } else if (rel instanceof Union) { Union union = (Union) rel; List newInputs = new ArrayList<>(); for (RelNode input : union.getInputs()) { newInputs.add( convertDppFactSide( - input, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join)); + input, joinKeys, dimSide, dimSideJoinKey, factSideFactors)); } return union.copy(union.getTraitSet(), newInputs, union.all); } else if (rel instanceof BatchPhysicalGroupAggregateBase) { BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel; - List newInputs = new ArrayList<>(); - for (RelNode input : agg.getInputs()) { - newInputs.add( - convertDppFactSide( - input, - getInputIndices(agg, input, joinKeys), - dimSide, - dimSideJoinKey, - factSideFactors, - join)); - } - - return agg.copy(agg.getTraitSet(), newInputs); + RelNode input = agg.getInput(); + RelNode convertedRel = + convertDppFactSide( + input, + getInputIndices(agg, input, joinKeys), + dimSide, + dimSideJoinKey, + factSideFactors); + return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel)); } return rel; } - private static boolean isSuitableJoin(Join join) { + public static boolean isSuitableJoin(Join join) { // Now dynamic partition pruning supports left/right join, inner and semi - // join. but now semi - // join can not join reorder. + // join. but now semi join can not join reorder. if (join.getJoinType() != JoinRelType.INNER && join.getJoinType() != JoinRelType.SEMI && join.getJoinType() != JoinRelType.LEFT @@ -381,9 +364,8 @@ private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) return; } - if (!dimSideFactors.setTables(table.contextResolvedTable())) { - return; - } + // To ensure there is only one source on the dim side. 
+ dimSideFactors.setTables(table.contextResolvedTable()); } else if (rel instanceof HepRelVertex) { visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors); } else if (rel instanceof Exchange || rel instanceof Project) { @@ -405,9 +387,7 @@ && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) { visitDimSide(join.getLeft(), dimSideFactors); visitDimSide(join.getRight(), dimSideFactors); } else if (rel instanceof BatchPhysicalGroupAggregateBase) { - for (RelNode input : rel.getInputs()) { - visitDimSide(input, dimSideFactors); - } + visitDimSide(((BatchPhysicalGroupAggregateBase) rel).getInput(), dimSideFactors); } else if (rel instanceof Union) { Union union = (Union) rel; for (RelNode input : union.getInputs()) { @@ -516,17 +496,16 @@ private static class DppDimSideFactors { private boolean hasPartitionedScan; private final List tables = new ArrayList<>(); - public boolean setTables(ContextResolvedTable catalogTable) { + public void setTables(ContextResolvedTable catalogTable) { if (tables.size() == 0) { tables.add(catalogTable); } else { - for (ContextResolvedTable thisTable : tables) { + for (ContextResolvedTable thisTable : new ArrayList<>(tables)) { if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) { - return false; + tables.add(catalogTable); } } } - return true; } public boolean isDimSide() { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala index 06125b3a7bdb8..a1784867149ff 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala @@ -40,7 +40,7 @@ object FlinkBatchProgram { val TIME_INDICATOR = "time_indicator" val PHYSICAL = "physical" val PHYSICAL_REWRITE = "physical_rewrite" - val DYNAMIC_PARTITION_PRUNING_CONVERTER = "dynamic_partition_pruning_converter" + val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning" def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]() @@ -290,9 +290,7 @@ object FlinkBatchProgram { ) // convert dynamic partition pruning scan source - chainedProgram.addLast( - DYNAMIC_PARTITION_PRUNING_CONVERTER, - new FlinkDynamicPartitionPruningConverterProgram) + chainedProgram.addLast(DYNAMIC_PARTITION_PRUNING, new FlinkDynamicPartitionPruningProgram) chainedProgram } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala index 8b871a63d7bd3..005c6bfe7dcba 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgram.scala @@ -59,10 +59,6 @@ class FlinkChainedProgram[OC <: FlinkOptimizeContext] val result = program.optimize(input, context) val end = System.currentTimeMillis() - println( - s"optimize $name cost ${end - start} ms.\n" + - s"optimize result: 
\n${FlinkRelOptUtil.toString(result)}") - if (LOG.isDebugEnabled) { LOG.debug( s"optimize $name cost ${end - start} ms.\n" + diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java similarity index 97% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java index 6252a0891d9e2..01f5c6ef0574f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java @@ -40,10 +40,10 @@ import java.util.Map; /** - * Tests for {@link FlinkDynamicPartitionPruningConverterProgram}, which creates {@link + * Tests for {@link FlinkDynamicPartitionPruningProgram}, which creates {@link * org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringTableSourceScan}. */ -public class DynamicPartitionPruningConverterProgramTest extends TableTestBase { +public class DynamicPartitionPruningProgramTest extends TableTestBase { private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); private final TestValuesCatalog catalog = new TestValuesCatalog("testCatalog", "test_database", true); @@ -381,7 +381,7 @@ public void testMultiJoin() { @Test public void testComplexDimSideWithJoinInDimSide() { - // Dpp will succeed + // TODO Dpp will not succeed with a complex dim side. util.tableEnv() .executeSql( "CREATE TABLE sales (\n" @@ -487,7 +487,7 @@ public void testDppWithSubQuery() { @Test public void testDppWithUnionInFactSide() { - // Dpp will succeed + // Dpp will succeed. String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" @@ -530,10 +530,8 @@ public void testDppWithAggInFactSide() { } @Test - public void testDPPFactorToReorderTableWithoutStats() { - // While there are several joins, and fact table not adjacent to dim table directly. dynamic - // partition pruning factor will try best to reorder join relations to make fact table - // adjacent to dim table. + public void testDPPWithJoinReorderTableWithoutStats() { + // Dpp will succeed. String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" @@ -558,7 +556,7 @@ public void testDPPFactorToReorderTableWithoutStats() { } @Test - public void testDPPFactorToReorderTableWithStats() throws TableNotExistException { + public void testDPPWithJoinReorderTableWithStats() throws TableNotExistException { String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" @@ -611,8 +609,8 @@ public void testDPPFactorToReorderTableWithStats() throws TableNotExistException } @Test - public void testDPPFactorWithFactSideJoinKeyChanged() { - // If partition keys changed in fact side. DPP factor will not work. + public void testDPPWithFactSideJoinKeyChanged() { + // If partition keys are changed in the fact side, DPP will not succeed. 
String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" @@ -636,8 +634,8 @@ public void testDPPFactorWithFactSideJoinKeyChanged() { } @Test - public void testDPPFactorWithDimSideJoinKeyChanged() { - // Although partition keys changed in dim side. DPP factor will work. + public void testDPPWithDimSideJoinKeyChanged() { + // Although partition keys are changed in the dim side, DPP will succeed. String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" @@ -661,9 +659,9 @@ public void testDPPFactorWithDimSideJoinKeyChanged() { } @Test - public void testDPPFactorWithJoinKeysNotIncludePartitionKeys() { - // If join keys of partition table join with dim table not include partition keys, dpp - // factor will not be adjusted and dpp will not succeed. + public void testDPPWithJoinKeysNotIncludePartitionKeys() { + // If the join keys between the partitioned table and the dim table do not include the + // partition keys, dpp will not succeed. String ddl = "CREATE TABLE test_database.item (\n" + " id BIGINT,\n" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml similarity index 97% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml index 60100e3634233..4c6757a7ca4ee 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningConverterProgramTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml @@ -128,18 +128,7 @@ LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], i (price, 300)]) -: : +- TableSourceScan(table=[[testCatalog, test_database, sales, filter=[]]], fields=[id, amount, price]) -: +- Exchange(distribution=[hash[id]]) -: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price]) +: +- TableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk]) +- Exchange(distribution=[hash[dim_date_sk]]) +- HashJoin(joinType=[InnerJoin], where=[=(id0, id1)], select=[id, male, amount, price, dim_date_sk, id0, amount0, price0, id1, amount1, price1], build=[left]) :- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, male, amount, price, dim_date_sk, id0, amount0, price0], build=[right]) @@ -254,72 +243,74 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, ]]> - + - 300]]> + 200 and dim.price < 500]]> ($11, 300))]) +LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9]) ++- LogicalFilter(condition=[AND(=($9, $4), >($8, 200), <($3, 500))]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- 
LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) ]]> (price, 200)]) + +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) + +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) ]]> - + - 300]]> + 300 ]]> ($11, 300))]) - +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalJoin(condition=[true], joinType=[inner]) - : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) - : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) +LogicalProject(fact_date_sk=[$0], amount=[$1], EXPR$2=[$2], id=[$3], male=[$4], amount0=[$5], price=[$6], dim_date_sk=[$7]) ++- LogicalFilter(condition=[AND(<($6, 500), >($6, 300))]) + +- LogicalJoin(condition=[=($0, $7)], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) + : +- LogicalProject(fact_date_sk=[$4], amount=[$6], price=[$3]) + : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) ]]> - + 300]]> @@ -354,7 +345,7 @@ Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, ]]> - + 300]]> @@ -387,7 +378,7 @@ Calc(select=[fact_date_sk, id0 AS id, id AS id0, amount, price, id00 AS id1, mal ]]> - + 300]]> @@ -416,70 +407,68 @@ HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, amount, pri ]]> - + - 200 and dim.price < 500]]> + 300]]> ($8, 200), <($3, 500))]) +LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12]) ++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalTableScan(table=[[testCatalog, test_database, dim]]) - +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) ]]> (price, 200)]) - +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) - +- DynamicFilteringDataCollector(fields=[dim_date_sk]) - +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) - +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +Calc(select=[id0 AS id, name, amount0 AS amount, price0 AS price, fact_date_sk, id AS id0, amount AS amount0, price AS price0, id00 AS id1, male, amount00 AS amount1, price00 AS price1, dim_date_sk]) ++- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, price, id0, name, amount0, price0, fact_date_sk, id00, male, amount00, price00, dim_date_sk], build=[right]) + :- Exchange(distribution=[hash[id]]) + : +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, 
price]) + +- HashJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =(id0, id))], select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], build=[right]) + :- Exchange(distribution=[hash[id]]) + : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk]) + : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])]) + : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) + +- Exchange(distribution=[hash[id]]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) ]]> - + - 300 ]]> + 300]]> ($6, 300))]) - +- LogicalJoin(condition=[=($0, $7)], joinType=[inner]) - :- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) - : +- LogicalProject(fact_date_sk=[$4], amount=[$6], price=[$3]) - : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner]) - : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) - : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) +LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], price1=[$11], dim_date_sk=[$12]) ++- LogicalFilter(condition=[AND(=($4, $12), =($0, $5), =($8, $5), <($11, 500), >($11, 300))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) ]]> From 1b52b01f726206fe54d186ded75acc176ccda599 Mon Sep 17 00:00:00 2001 From: "zhengyunhong.zyh" <337361684@qq.com> Date: Thu, 29 Dec 2022 13:59:35 +0800 Subject: [PATCH 3/5] Some minor changes2 --- .../program/FlinkDynamicPartitionPruningProgram.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java index 52590d32731ad..25311764442ef 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java @@ -64,6 +64,13 @@ * +- Calc(select=[xxx], where=[<(xxx, xxx)]) # Need have an arbitrary filter condition. * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key]) * } + * + *

We use a {@link FlinkOptimizeProgram} instead of a {@link org.apache.calcite.plan.RelRule} to + * realize dynamic partition pruning because the {@link org.apache.calcite.plan.hep.HepPlanner} in + * Flink doesn't support matching a join, replacing a node on one side of the join and then + * rebuilding the join node. This is a limitation of the existing optimizer, whose matching + * pattern needs to become more flexible before we can use a {@link org.apache.calcite.plan.RelRule} + * to achieve dpp. */ public class FlinkDynamicPartitionPruningProgram implements FlinkOptimizeProgram { From 1bda3812ca4cad3d6e650aaa4c79608c6bdd9d0c Mon Sep 17 00:00:00 2001 From: "zhengyunhong.zyh" <337361684@qq.com> Date: Wed, 4 Jan 2023 11:54:05 +0800 Subject: [PATCH 4/5] dpp will not succeed if agg push down is enabled --- .../utils/DynamicPartitionPruningUtils.java | 59 +++-- .../DynamicPartitionPruningProgramTest.java | 73 +++++- .../DynamicPartitionPruningProgramTest.xml | 236 +++++++++++++----- 3 files changed, 272 insertions(+), 96 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java index ef8506a7c6fdd..3bbeec455d2df 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java @@ -21,15 +21,18 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.TransformationScanProvider; +import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalDynamicFilteringDataCollector; @@ -137,6 +140,20 @@ private static RelNode convertDppFactSide( || !(tableSource instanceof ScanTableSource)) { return rel; } + + // Dpp cannot succeed if the source supports aggregate push down, aggregate push + // down is enabled, and the aggregation has already been pushed down. 
+ if (tableSource instanceof SupportsAggregatePushDown + && ShortcutUtils.unwrapContext(rel) + .getTableConfig() + .get( + OptimizerConfigOptions + .TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED) + && Arrays.stream(tableSourceTable.abilitySpecs()) + .anyMatch(spec -> spec instanceof AggregatePushDownSpec)) { + return rel; + } + if (!isNewSource((ScanTableSource) tableSource)) { return rel; } @@ -256,14 +273,29 @@ private static RelNode convertDppFactSide( } else if (rel instanceof BatchPhysicalGroupAggregateBase) { BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel; RelNode input = agg.getInput(); + int[] grouping = agg.grouping(); + + // If any join key in joinKeys is an aggregate function field, dpp will not succeed. + for (int k : joinKeys) { + if (k >= grouping.length) { + return rel; + } + } + RelNode convertedRel = convertDppFactSide( input, - getInputIndices(agg, input, joinKeys), + ImmutableIntList.copyOf( + joinKeys.stream() + .map(joinKey -> agg.grouping()[joinKey]) + .collect(Collectors.toList())), dimSide, dimSideJoinKey, factSideFactors); return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel)); + } else { + // TODO In the future, we need to support more operators to enrich the matchable dpp + // patterns. } return rel; } @@ -283,31 +315,6 @@ public static boolean isSuitableJoin(Join join) { return !joinInfo.leftKeys.isEmpty(); } - private static ImmutableIntList getInputIndices( - BatchPhysicalGroupAggregateBase agg, RelNode aggInput, ImmutableIntList joinKeys) { - int[] indexMap = new int[aggInput.getRowType().getFieldCount()]; - int[] grouping = agg.grouping(); - if (grouping.length == 0) { - return joinKeys; - } - int beginIndex = grouping[0] - 1; - for (int i = 0; i < indexMap.length; i++) { - indexMap[i] = i; - } - - System.arraycopy(grouping, 0, indexMap, 0, grouping.length); - if (beginIndex >= 0) { - for (int i = 0; i <= beginIndex; i++) { - indexMap[grouping.length + i] = i; - } - } - List indices = new ArrayList<>(); - for (int k : joinKeys) { - indices.add(indexMap[k]); - } - return ImmutableIntList.copyOf(indices); - } - private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector( RelNode dimSide, List dynamicFilteringFieldIndices) { final RelDataType outputType = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java index 01f5c6ef0574f..cef12b156aaf5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.java @@ -509,7 +509,7 @@ public void testDppWithUnionInFactSide() { } @Test - public void testDppWithAggInFactSide() { + public void testDppWithAggInFactSideAndJoinKeyInGrouping() { // Dpp will succeed String ddl = "CREATE TABLE test_database.item (\n" @@ -529,6 +529,77 @@ public void testDppWithAggInFactSide() { util.verifyRelPlan(query); } + @Test + public void testDppWithAggInFactSideAndJoinKeyInGroupFunction() { + // Dpp will not succeed because the join key is produced by an aggregate function. 
+ String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String query = + "Select * from (Select fact_part.id, item.amount, fact_part.name, sum(fact_part.price), sum(item.price), sum(fact_date_sk) as fact_date_sk1 " + + "from fact_part join item on fact_part.id = item.id " + + "group by fact_part.id, fact_part.name, item.amount) t1 " + + "join dim on t1.fact_date_sk1 = dim.dim_date_sk where dim.price < 500 and dim.price > 300 "; + util.verifyRelPlan(query); + } + + @Test + public void testDppWithAggInFactSideWithAggPushDownEnable() { + // Dpp will not succeed if the fact side source supports agg push down and source agg push + // down is enabled. + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String query = + "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) " + + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 " + + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 "; + util.verifyRelPlan(query); + } + + @Test + public void testDppWithAggInFactSideWithAggPushDownDisable() { + // Dpp will succeed if the fact side source supports agg push down but source agg push down + // is disabled. + TableConfig tableConfig = util.tableEnv().getConfig(); + // Disable source agg push down. + tableConfig.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, false); + + String ddl = + "CREATE TABLE test_database.item (\n" + + " id BIGINT,\n" + + " amount BIGINT,\n" + + " price BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl); + + String query = + "Select * from (Select id, amount, fact_date_sk, count(name), sum(price) " + + "from fact_part where fact_date_sk > 100 group by id, amount, fact_date_sk) t1 " + + "join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 "; + util.verifyRelPlan(query); + } + @Test public void testDPPWithJoinReorderTableWithoutStats() { // Dpp will succeed. 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml index 4c6757a7ca4ee..5ff1bafdb97df 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/DynamicPartitionPruningProgramTest.xml @@ -243,35 +243,43 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, ]]> - + - 200 and dim.price < 500]]> + 300 ]]> ($8, 200), <($3, 500))]) - +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalTableScan(table=[[testCatalog, test_database, dim]]) - +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) +LogicalProject(id=[$0], amount=[$1], name=[$2], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5], id0=[$6], male=[$7], amount0=[$8], price=[$9], dim_date_sk=[$10]) ++- LogicalFilter(condition=[AND(<($9, 500), >($9, 300))]) + +- LogicalJoin(condition=[=($5, $10)], joinType=[inner]) + :- LogicalProject(id=[$0], amount=[$2], name=[$1], EXPR$3=[$3], EXPR$4=[$4], fact_date_sk1=[$5]) + : +- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)], EXPR$4=[SUM($4)], fact_date_sk1=[SUM($5)]) + : +- LogicalProject(id=[$0], name=[$1], amount=[$6], price=[$3], price0=[$7], fact_date_sk=[$4]) + : +- LogicalJoin(condition=[=($0, $5)], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) ]]> (price, 200)]) - +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) - +- DynamicFilteringDataCollector(fields=[dim_date_sk]) - +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) - +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk1, dim_date_sk)], select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1, id0, male, amount0, price, dim_date_sk], build=[left]) +:- Exchange(distribution=[hash[fact_date_sk1]]) +: +- Calc(select=[id, amount, name, EXPR$3, EXPR$4, fact_date_sk1]) +: +- HashAggregate(isMerge=[false], groupBy=[id, name, amount], select=[id, name, amount, SUM(price) AS EXPR$3, SUM(price0) AS EXPR$4, SUM(fact_date_sk) AS fact_date_sk1]) +: +- Calc(select=[id, name, amount, price, price0, fact_date_sk]) +: +- HashJoin(joinType=[InnerJoin], where=[=(id, id0)], select=[id, name, price, fact_date_sk, id0, amount, price0], build=[right]) +: :- Exchange(distribution=[hash[id]]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, project=[id, name, price, fact_date_sk], metadata=[]]], fields=[id, name, price, fact_date_sk]) +: +- Exchange(distribution=[hash[id]]) +: +- TableSourceScan(table=[[testCatalog, test_database, item]], fields=[id, amount, price]) ++- Exchange(distribution=[hash[dim_date_sk]]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, 
dim_date_sk]) ]]> - + 300 ]]> @@ -304,6 +312,68 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[fac : : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) : +- Exchange(distribution=[hash[id]]) : +- TableSourceScan(table=[[testCatalog, test_database, item, project=[id, amount], metadata=[]]], fields=[id, amount]) ++- Exchange(distribution=[hash[dim_date_sk]]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[SEARCH(price, Sarg[(300..500)])]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +]]> + + + + + 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]> + + + ($8, 300))]) + +- LogicalJoin(condition=[=($2, $9)], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)]) + : +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3]) + : +- LogicalFilter(condition=[>($4, 100)]) + : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + + + + + + 100 group by id, amount, fact_date_sk) t1 join dim on t1.fact_date_sk = dim.dim_date_sk where dim.price < 500 and dim.price > 300 ]]> + + + ($8, 300))]) + +- LogicalJoin(condition=[=($2, $9)], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)], EXPR$4=[SUM($4)]) + : +- LogicalProject(id=[$0], amount=[$2], fact_date_sk=[$4], name=[$1], price=[$3]) + : +- LogicalFilter(condition=[>($4, 100)]) + : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + + + + + + 300]]> + + + ($9, 300))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalUnion(all=[true]) + : : :- LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[+($2, 1)]) + : : : +- LogicalFilter(condition=[=($3, 1)]) + : : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalProject(id=[$0], fact_date_sk=[$4], EXPR$2=[+($2, 1)]) + : : +- LogicalFilter(condition=[=($3, 2)]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + + + @@ -729,17 +845,17 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], selec ]]> - + - + 200 and dim.price < 500]]> ($8, 200), <($3, 500))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalTableScan(table=[[testCatalog, test_database, dim]]) - +- LogicalTableScan(table=[[testCatalog, test_database, legacy_source]]) + +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) ]]> @@ -749,53 +865,11 @@ HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], select=[id, : +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) : +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +- Exchange(distribution=[hash[fact_date_sk]]) - +- TableSourceScan(table=[[testCatalog, test_database, legacy_source]], fields=[id, name, amount, price, fact_date_sk]) -]]> - - - - - 300]]> - - - ($9, 300))]) - +- LogicalJoin(condition=[true], joinType=[inner]) - 
:- LogicalJoin(condition=[true], joinType=[inner]) - : :- LogicalUnion(all=[true]) - : : :- LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[+($2, 1)]) - : : : +- LogicalFilter(condition=[=($3, 1)]) - : : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) - : : +- LogicalProject(id=[$0], fact_date_sk=[$4], EXPR$2=[+($2, 1)]) - : : +- LogicalFilter(condition=[=($3, 2)]) - : : +- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) - : +- LogicalTableScan(table=[[testCatalog, test_database, item]]) - +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) -]]> - - - (price, 200)]) + +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk]) + +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) + +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) ]]> @@ -851,6 +925,30 @@ NestedLoopJoin(joinType=[InnerJoin], where=[AND(=(fact_date_sk, dim_date_sk), =( +- DynamicFilteringDataCollector(fields=[dim_date_sk]) +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 500)]) +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk]) +]]> + + + + + + + + + + + From 96569781df60b2f8175d2c0a76d726d46a9a3bd9 Mon Sep 17 00:00:00 2001 From: "zhengyunhong.zyh" <337361684@qq.com> Date: Fri, 6 Jan 2023 19:16:44 +0800 Subject: [PATCH 5/5] Reformat the dynamic partition pruning utils --- .../FlinkDynamicPartitionPruningProgram.java | 27 +- .../utils/DynamicPartitionPruningUtils.java | 794 +++++++++--------- 2 files changed, 414 insertions(+), 407 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java index 25311764442ef..22aebd7fcab20 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkDynamicPartitionPruningProgram.java @@ -65,29 +65,26 @@ * +- TableSourceScan(table=[[dim, filter=[]]], fields=[xxx, dim_key]) * } * - *

We use a {@link FlinkOptimizeProgram} instead of a {@link org.apache.calcite.plan.RelRule} to - * realize dynamic partition pruning because the {@link org.apache.calcite.plan.hep.HepPlanner} in - * Flink doesn't support matching a join, replacing a node on one side of the join and then - * rebuilding the join node. This is a limitation of the existing optimizer, whose matching - * pattern needs to become more flexible before we can use a {@link org.apache.calcite.plan.RelRule} - * to achieve dpp. + *

Note: We use a {@link + * org.apache.calcite.plan.RelRule} here because the {@link org.apache.calcite.plan.hep.HepPlanner} + * doesn't support matching a partially determined pattern or dynamically replacing the inputs of + * matched nodes. Once we improve the {@link org.apache.calcite.plan.hep.HepPlanner}, this class can + * be converted to {@link org.apache.calcite.plan.RelRule}. */ public class FlinkDynamicPartitionPruningProgram implements FlinkOptimizeProgram { @Override public RelNode optimize(RelNode root, BatchOptimizeContext context) { + if (!ShortcutUtils.unwrapContext(root) + .getTableConfig() + .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { + return root; + } DefaultRelShuttle shuttle = new DefaultRelShuttle() { @Override public RelNode visit(RelNode rel) { - if (!ShortcutUtils.unwrapContext(rel) - .getTableConfig() - .get( - OptimizerConfigOptions - .TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) { - return rel; - } if (!(rel instanceof Join) || !DynamicPartitionPruningUtils.isSuitableJoin((Join) rel)) { List newInputs = new ArrayList<>(); @@ -103,7 +100,7 @@ public RelNode visit(RelNode rel) { RelNode rightSide = join.getRight(); Join newJoin = join; boolean changed = false; - if (DynamicPartitionPruningUtils.isDimSide(leftSide)) { + if (DynamicPartitionPruningUtils.isDppDimSide(leftSide)) { if (join.getJoinType() != JoinRelType.RIGHT) { Tuple2 relTuple = DynamicPartitionPruningUtils @@ -118,7 +115,7 @@ public RelNode visit(RelNode rel) { join.getTraitSet(), Arrays.asList(leftSide, relTuple.f1.accept(this))); } - } else if (DynamicPartitionPruningUtils.isDimSide(rightSide)) { + } else if (DynamicPartitionPruningUtils.isDppDimSide(rightSide)) { if (join.getJoinType() != JoinRelType.LEFT) { Tuple2 relTuple = DynamicPartitionPruningUtils diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java index 3bbeec455d2df..90f7b40bc0b42 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java @@ -21,14 +21,12 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceProvider; -import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown; import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.TransformationScanProvider; @@ -71,458 +69,470 @@ public class DynamicPartitionPruningUtils { /** - * Judge whether input RelNode meets the conditions of dimSide. 
If joinKeys is null means we + * Judge whether the input RelNode meets the conditions of dimSide. If joinKeys is null means we * need not consider the join keys in dim side, which already deal by dynamic partition pruning * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if * changed, this RelNode is not dim side. */ - public static boolean isDimSide(RelNode rel) { - DppDimSideFactors dimSideFactors = new DppDimSideFactors(); - visitDimSide(rel, dimSideFactors); - return dimSideFactors.isDimSide(); + public static boolean isDppDimSide(RelNode rel) { + DppDimSideChecker dimSideChecker = new DppDimSideChecker(rel); + return dimSideChecker.isDppDimSide(); } + /** + * Judge whether the input RelNode can be converted to the dpp fact side. If the input RelNode + * can be converted, this method will return the converted fact side whose partitioned table + * source will be converted to {@link BatchPhysicalDynamicFilteringTableSourceScan}, If not, + * this method will return the origin RelNode. + */ public static Tuple2 canConvertAndConvertDppFactSide( RelNode rel, ImmutableIntList joinKeys, RelNode dimSide, ImmutableIntList dimSideJoinKey) { - DppFactSideFactors factSideFactors = new DppFactSideFactors(); - RelNode newRel = - convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors); - return Tuple2.of(factSideFactors.isChanged, newRel); + DppFactSideChecker dppFactSideChecker = + new DppFactSideChecker(rel, joinKeys, dimSide, dimSideJoinKey); + return dppFactSideChecker.canConvertAndConvertDppFactSide(); } - private static List getSuitableDynamicFilteringFieldsInFactSide( - DynamicTableSource tableSource, List candidateFields) { - List acceptedFilterFields = - ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields(); - if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) { - return new ArrayList<>(); + /** Judge whether the join node is suitable one for dpp pattern. */ + public static boolean isSuitableJoin(Join join) { + // Now dynamic partition pruning supports left/right join, inner and semi + // join. but now semi join can not join reorder. + if (join.getJoinType() != JoinRelType.INNER + && join.getJoinType() != JoinRelType.SEMI + && join.getJoinType() != JoinRelType.LEFT + && join.getJoinType() != JoinRelType.RIGHT) { + return false; } - List suitableFields = new ArrayList<>(); - // If candidateField not in acceptedFilterFields means dpp rule will not be matched, - // because we can not prune any partitions according to non-accepted filter fields - // provided by partition table source. - for (String candidateField : candidateFields) { - if (acceptedFilterFields.contains(candidateField)) { - suitableFields.add(candidateField); - } + JoinInfo joinInfo = join.analyzeCondition(); + return !joinInfo.leftKeys.isEmpty(); + } + + /** This class is used to check whether the relNode is dpp dim side. 
+    private static class DppDimSideChecker {
+        private final RelNode relNode;
+        private boolean hasFilter;
+        private boolean hasPartitionedScan;
+        private final List<ContextResolvedTable> tables = new ArrayList<>();
+
+        public DppDimSideChecker(RelNode relNode) {
+            this.relNode = relNode;
         }
 
-        return suitableFields;
-    }
+        public boolean isDppDimSide() {
+            visitDimSide(this.relNode);
+            return hasFilter && !hasPartitionedScan && tables.size() == 1;
+        }
 
-    private static RelNode convertDppFactSide(
-            RelNode rel,
-            ImmutableIntList joinKeys,
-            RelNode dimSide,
-            ImmutableIntList dimSideJoinKey,
-            DppFactSideFactors factSideFactors) {
-        if (rel instanceof TableScan) {
-            TableScan scan = (TableScan) rel;
-            if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
-                // rule applied
-                return rel;
-            }
-            TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-            if (tableSourceTable == null) {
-                return rel;
-            }
-            CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable();
-            List<String> partitionKeys = catalogTable.getPartitionKeys();
-            if (partitionKeys.isEmpty()) {
-                return rel;
-            }
-            DynamicTableSource tableSource = tableSourceTable.tableSource();
-            if (!(tableSource instanceof SupportsDynamicFiltering)
-                    || !(tableSource instanceof ScanTableSource)) {
-                return rel;
-            }
+        /**
+         * Visit dim side to judge whether dim side has a filter condition and whether dim side's
+         * source table scan is a non-partitioned scan.
+         */
+        private void visitDimSide(RelNode rel) {
+            // TODO Make visitDimSide more efficient and more accurate, e.g. for a filter on the
+            // dim table or a filter on the partition field of the fact table.
+            if (rel instanceof TableScan) {
+                TableScan scan = (TableScan) rel;
+                TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+                if (table == null) {
+                    return;
+                }
+                if (!hasFilter
+                        && table.abilitySpecs() != null
+                        && table.abilitySpecs().length != 0) {
+                    for (SourceAbilitySpec spec : table.abilitySpecs()) {
+                        if (spec instanceof FilterPushDownSpec) {
+                            List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
+                            for (RexNode predicate : predicates) {
+                                if (isSuitableFilter(predicate)) {
+                                    hasFilter = true;
+                                }
+                            }
+                        }
+                    }
+                }
+                CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
+                if (catalogTable.isPartitioned()) {
+                    hasPartitionedScan = true;
+                    return;
+                }
 
-            // Dpp cannot success if source support aggregate push down, source aggregate push
-            // down enabled is true and aggregate push down success.
-            if (tableSource instanceof SupportsAggregatePushDown
-                    && ShortcutUtils.unwrapContext(rel)
-                            .getTableConfig()
-                            .get(
-                                    OptimizerConfigOptions
-                                            .TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)
-                    && Arrays.stream(tableSourceTable.abilitySpecs())
-                            .anyMatch(spec -> spec instanceof AggregatePushDownSpec)) {
-                return rel;
+                // To ensure there is only one source on the dim side.
+                setTables(table.contextResolvedTable());
+            } else if (rel instanceof HepRelVertex) {
+                visitDimSide(((HepRelVertex) rel).getCurrentRel());
+            } else if (rel instanceof Exchange || rel instanceof Project) {
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Calc) {
+                RexProgram origProgram = ((Calc) rel).getProgram();
+                if (origProgram.getCondition() != null
+                        && isSuitableFilter(
+                                origProgram.expandLocalRef(origProgram.getCondition()))) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Filter) {
+                if (isSuitableFilter(((Filter) rel).getCondition())) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Join) {
+                Join join = (Join) rel;
+                visitDimSide(join.getLeft());
+                visitDimSide(join.getRight());
+            } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+                visitDimSide(((BatchPhysicalGroupAggregateBase) rel).getInput());
+            } else if (rel instanceof Union) {
+                Union union = (Union) rel;
+                for (RelNode input : union.getInputs()) {
+                    visitDimSide(input);
+                }
             }
         }
 
-            if (!isNewSource((ScanTableSource) tableSource)) {
-                return rel;
+        /**
+         * Not all filter conditions are suitable for filtering partitions by the dynamic
+         * partition pruning rules. For example, NOT NULL can only filter the one default
+         * partition, which has a small impact on filtering data.
+         */
+        private static boolean isSuitableFilter(RexNode filterCondition) {
+            switch (filterCondition.getKind()) {
+                case AND:
+                    List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
+                    return isSuitableFilter(conjunctions.get(0))
+                            || isSuitableFilter(conjunctions.get(1));
+                case OR:
+                    List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
+                    return isSuitableFilter(disjunctions.get(0))
+                            && isSuitableFilter(disjunctions.get(1));
+                case NOT:
+                    return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
+                case EQUALS:
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                case NOT_EQUALS:
+                case IN:
+                case LIKE:
+                case CONTAINS:
+                case SEARCH:
+                case IS_FALSE:
+                case IS_NOT_FALSE:
+                case IS_NOT_TRUE:
+                case IS_TRUE:
+                    // TODO adding more suitable filters which can filter enough partitions after
+                    // using this filter in dynamic partition pruning.
+                    return true;
+                default:
+                    return false;
             }
         }
 
-            List<String> candidateFields =
-                    joinKeys.stream()
-                            .map(i -> scan.getRowType().getFieldNames().get(i))
-                            .collect(Collectors.toList());
-            if (candidateFields.isEmpty()) {
-                return rel;
+        private void setTables(ContextResolvedTable catalogTable) {
+            if (tables.size() == 0) {
+                tables.add(catalogTable);
+            } else {
+                for (ContextResolvedTable thisTable : new ArrayList<>(tables)) {
+                    if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
+                        tables.add(catalogTable);
+                    }
+                }
             }
         }
     }
 
-            List<String> acceptedFilterFields =
-                    getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
+    /** This class is used to check whether the RelNode can be used as the fact side in dpp. */
+    private static class DppFactSideChecker {
+        private final RelNode relNode;
+        private final ImmutableIntList joinKeys;
+        private final RelNode dimSide;
+        private final ImmutableIntList dimSideJoinKey;
 
-            if (acceptedFilterFields.size() == 0) {
-                return rel;
-            }
+        // Whether the fact side has been converted; if the join keys are not changed in the
+        // fact side, this value is always true.
+        private boolean isChanged;
 
-            // Apply suitable accepted filter fields to source.
-            ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
-
-            List<Integer> acceptedFieldIndices =
-                    acceptedFilterFields.stream()
-                            .map(f -> scan.getRowType().getFieldNames().indexOf(f))
-                            .collect(Collectors.toList());
-            List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
-            for (int i = 0; i < joinKeys.size(); ++i) {
-                if (acceptedFieldIndices.contains(joinKeys.get(i))) {
-                    dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
-                }
-            }
+        public DppFactSideChecker(
+                RelNode relNode,
+                ImmutableIntList joinKeys,
+                RelNode dimSide,
+                ImmutableIntList dimSideJoinKey) {
+            this.relNode = relNode;
+            this.joinKeys = joinKeys;
+            this.dimSide = dimSide;
+            this.dimSideJoinKey = dimSideJoinKey;
+        }
 
-            BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
-                    createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
-
-            factSideFactors.isChanged = true;
-            return new BatchPhysicalDynamicFilteringTableSourceScan(
-                    scan.getCluster(),
-                    scan.getTraitSet(),
-                    scan.getHints(),
-                    tableSourceTable,
-                    dynamicFilteringDataCollector);
-        } else if (rel instanceof Exchange || rel instanceof Filter) {
-            return rel.copy(
-                    rel.getTraitSet(),
-                    Collections.singletonList(
-                            convertDppFactSide(
-                                    rel.getInput(0),
-                                    joinKeys,
-                                    dimSide,
-                                    dimSideJoinKey,
-                                    factSideFactors)));
-        } else if (rel instanceof Project) {
-            List<RexNode> projects = ((Project) rel).getProjects();
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                return rel;
-            }
+        public Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide() {
+            return Tuple2.of(
+                    isChanged, convertDppFactSide(relNode, joinKeys, dimSide, dimSideJoinKey));
+        }
 
-            return rel.copy(
-                    rel.getTraitSet(),
-                    Collections.singletonList(
-                            convertDppFactSide(
-                                    rel.getInput(0),
-                                    inputJoinKeys,
-                                    dimSide,
-                                    dimSideJoinKey,
-                                    factSideFactors)));
-        } else if (rel instanceof Calc) {
-            Calc calc = (Calc) rel;
-            RexProgram program = calc.getProgram();
-            List<RexNode> projects =
-                    program.getProjectList().stream()
-                            .map(program::expandLocalRef)
-                            .collect(Collectors.toList());
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                return rel;
-            }
+        private RelNode convertDppFactSide(
+                RelNode rel,
+                ImmutableIntList joinKeys,
+                RelNode dimSide,
+                ImmutableIntList dimSideJoinKey) {
+            if (rel instanceof TableScan) {
+                TableScan scan = (TableScan) rel;
+                if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
+                    // rule applied
+                    return rel;
+                }
+                TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+                if (tableSourceTable == null) {
+                    return rel;
+                }
+                CatalogTable catalogTable =
+                        tableSourceTable.contextResolvedTable().getResolvedTable();
+                List<String> partitionKeys = catalogTable.getPartitionKeys();
+                if (partitionKeys.isEmpty()) {
+                    return rel;
+                }
+                DynamicTableSource tableSource = tableSourceTable.tableSource();
+                if (!(tableSource instanceof SupportsDynamicFiltering)
+                        || !(tableSource instanceof ScanTableSource)) {
+                    return rel;
+                }
 
-            return rel.copy(
-                    rel.getTraitSet(),
-                    Collections.singletonList(
-                            convertDppFactSide(
-                                    rel.getInput(0),
-                                    inputJoinKeys,
-                                    dimSide,
-                                    dimSideJoinKey,
-                                    factSideFactors)));
-        } else if (rel instanceof Join) {
-            Join currentJoin = (Join) rel;
-            return currentJoin.copy(
-                    currentJoin.getTraitSet(),
-                    Arrays.asList(
-                            convertDppFactSide(
-                                    currentJoin.getLeft(),
-                                    getInputIndices(currentJoin, joinKeys, true),
-                                    dimSide,
-                                    dimSideJoinKey,
-                                    factSideFactors),
-                            convertDppFactSide(
-                                    currentJoin.getRight(),
-                                    getInputIndices(currentJoin, joinKeys, false),
-                                    dimSide,
-                                    dimSideJoinKey,
-                                    factSideFactors)));
-        } else if (rel instanceof Union) {
-            Union union = (Union) rel;
-            List<RelNode> newInputs = new ArrayList<>();
-            for (RelNode input : union.getInputs()) {
-                newInputs.add(
-                        convertDppFactSide(
-                                input, joinKeys, dimSide, dimSideJoinKey, factSideFactors));
-            }
-            return union.copy(union.getTraitSet(), newInputs, union.all);
-        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
-            BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
-            RelNode input = agg.getInput();
-            int[] grouping = agg.grouping();
+                // Dpp cannot succeed if the source has an aggregate push-down spec applied.
+                if (Arrays.stream(tableSourceTable.abilitySpecs())
+                        .anyMatch(spec -> spec instanceof AggregatePushDownSpec)) {
+                    return rel;
+                }
 
-            // If one of joinKey in joinKeys are aggregate function field, dpp will not success.
-            for (int k : joinKeys) {
-                if (k >= grouping.length) {
+                if (!isNewSource((ScanTableSource) tableSource)) {
                     return rel;
                 }
-            }
 
-            RelNode convertedRel =
-                    convertDppFactSide(
-                            input,
-                            ImmutableIntList.copyOf(
-                                    joinKeys.stream()
-                                            .map(joinKey -> agg.grouping()[joinKey])
-                                            .collect(Collectors.toList())),
-                            dimSide,
-                            dimSideJoinKey,
-                            factSideFactors);
-            return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel));
-        } else {
-            // TODO In the future, we need to support more operators to enrich matchable dpp
-            // pattern.
-        }
+                List<String> candidateFields =
+                        joinKeys.stream()
+                                .map(i -> scan.getRowType().getFieldNames().get(i))
+                                .collect(Collectors.toList());
+                if (candidateFields.isEmpty()) {
+                    return rel;
+                }
 
-        return rel;
-    }
+                List<String> acceptedFilterFields =
+                        getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
 
-    public static boolean isSuitableJoin(Join join) {
-        // Now dynamic partition pruning supports left/right join, inner and semi
-        // join. but now semi join can not join reorder.
-        if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI
-                && join.getJoinType() != JoinRelType.LEFT
-                && join.getJoinType() != JoinRelType.RIGHT) {
-            return false;
-        }
+                if (acceptedFilterFields.size() == 0) {
+                    return rel;
+                }
 
-        JoinInfo joinInfo = join.analyzeCondition();
-        return !joinInfo.leftKeys.isEmpty();
-    }
+                // Apply suitable accepted filter fields to source.
+                ((SupportsDynamicFiltering) tableSource)
+                        .applyDynamicFiltering(acceptedFilterFields);
+
+                List<Integer> acceptedFieldIndices =
+                        acceptedFilterFields.stream()
+                                .map(f -> scan.getRowType().getFieldNames().indexOf(f))
+                                .collect(Collectors.toList());
+                List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
+                for (int i = 0; i < joinKeys.size(); ++i) {
+                    if (acceptedFieldIndices.contains(joinKeys.get(i))) {
+                        dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
+                    }
+                }
 
-    private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
-            RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
-        final RelDataType outputType =
-                ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
-                        .projectStructType(
-                                dimSide.getRowType(),
-                                dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
-        return new BatchPhysicalDynamicFilteringDataCollector(
-                dimSide.getCluster(),
-                dimSide.getTraitSet(),
-                ignoreExchange(dimSide),
-                outputType,
-                dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
-    }
+                BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                        createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
+
+                isChanged = true;
+                return new BatchPhysicalDynamicFilteringTableSourceScan(
+                        scan.getCluster(),
+                        scan.getTraitSet(),
+                        scan.getHints(),
+                        tableSourceTable,
+                        dynamicFilteringDataCollector);
+            } else if (rel instanceof Exchange || rel instanceof Filter) {
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), joinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Project) {
+                List<RexNode> projects = ((Project) rel).getProjects();
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    return rel;
+                }
 
-    private static RelNode ignoreExchange(RelNode dimSide) {
-        if (dimSide instanceof Exchange) {
-            return dimSide.getInput(0);
-        } else {
-            return dimSide;
-        }
-    }
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Calc) {
+                Calc calc = (Calc) rel;
+                RexProgram program = calc.getProgram();
+                List<RexNode> projects =
+                        program.getProjectList().stream()
+                                .map(program::expandLocalRef)
+                                .collect(Collectors.toList());
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    return rel;
+                }
 
-    /**
-     * Visit dim side to judge whether dim side has filter condition and whether dim side's source
-     * table scan is non partitioned scan.
-     */
-    private static void visitDimSide(RelNode rel, DppDimSideFactors dimSideFactors) {
-        // TODO Let visitDimSide more efficient and more accurate. Like a filter on dim table or a
-        // filter for the partition field on fact table.
-        if (rel instanceof TableScan) {
-            TableScan scan = (TableScan) rel;
-            TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
-            if (table == null) {
-                return;
-            }
-            if (!dimSideFactors.hasFilter
-                    && table.abilitySpecs() != null
-                    && table.abilitySpecs().length != 0) {
-                for (SourceAbilitySpec spec : table.abilitySpecs()) {
-                    if (spec instanceof FilterPushDownSpec) {
-                        List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
-                        for (RexNode predicate : predicates) {
-                            if (isSuitableFilter(predicate)) {
-                                dimSideFactors.hasFilter = true;
-                            }
-                        }
+                return rel.copy(
+                        rel.getTraitSet(),
+                        Collections.singletonList(
+                                convertDppFactSide(
+                                        rel.getInput(0), inputJoinKeys, dimSide, dimSideJoinKey)));
+            } else if (rel instanceof Join) {
+                Join currentJoin = (Join) rel;
+                return currentJoin.copy(
+                        currentJoin.getTraitSet(),
+                        Arrays.asList(
+                                convertDppFactSide(
+                                        currentJoin.getLeft(),
+                                        getInputIndices(currentJoin, joinKeys, true),
+                                        dimSide,
+                                        dimSideJoinKey),
+                                convertDppFactSide(
+                                        currentJoin.getRight(),
+                                        getInputIndices(currentJoin, joinKeys, false),
+                                        dimSide,
+                                        dimSideJoinKey)));
+            } else if (rel instanceof Union) {
+                Union union = (Union) rel;
+                List<RelNode> newInputs = new ArrayList<>();
+                for (RelNode input : union.getInputs()) {
+                    newInputs.add(convertDppFactSide(input, joinKeys, dimSide, dimSideJoinKey));
+                }
+                return union.copy(union.getTraitSet(), newInputs, union.all);
+            } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+                BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel;
+                RelNode input = agg.getInput();
+                int[] grouping = agg.grouping();
+
+                // If one of the joinKeys is an aggregate function field, dpp will not succeed.
+                for (int k : joinKeys) {
+                    if (k >= grouping.length) {
+                        return rel;
                     }
                 }
-            }
-            CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
-            if (catalogTable.isPartitioned()) {
-                dimSideFactors.hasPartitionedScan = true;
-                return;
-            }
+                RelNode convertedRel =
+                        convertDppFactSide(
+                                input,
+                                ImmutableIntList.copyOf(
+                                        joinKeys.stream()
+                                                .map(joinKey -> agg.grouping()[joinKey])
+                                                .collect(Collectors.toList())),
+                                dimSide,
+                                dimSideJoinKey);
+                return agg.copy(agg.getTraitSet(), Collections.singletonList(convertedRel));
+            } else {
 
-            // To ensure there is only one source on the dim side.
-            dimSideFactors.setTables(table.contextResolvedTable());
-        } else if (rel instanceof HepRelVertex) {
-            visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors);
-        } else if (rel instanceof Exchange || rel instanceof Project) {
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        } else if (rel instanceof Calc) {
-            RexProgram origProgram = ((Calc) rel).getProgram();
-            if (origProgram.getCondition() != null
-                    && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
-                dimSideFactors.hasFilter = true;
+                // TODO In the future, we need to support more operators to enrich the matchable
+                // dpp patterns.
             }
 
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        } else if (rel instanceof Filter) {
-            if (isSuitableFilter(((Filter) rel).getCondition())) {
-                dimSideFactors.hasFilter = true;
+
+            return rel;
+        }
+
+        private static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+                DynamicTableSource tableSource, List<String> candidateFields) {
+            List<String> acceptedFilterFields =
+                    ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+            if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+                return new ArrayList<>();
             }
-            visitDimSide(rel.getInput(0), dimSideFactors);
-        } else if (rel instanceof Join) {
-            Join join = (Join) rel;
-            visitDimSide(join.getLeft(), dimSideFactors);
-            visitDimSide(join.getRight(), dimSideFactors);
-        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
-            visitDimSide(((BatchPhysicalGroupAggregateBase) rel).getInput(), dimSideFactors);
-        } else if (rel instanceof Union) {
-            Union union = (Union) rel;
-            for (RelNode input : union.getInputs()) {
-                visitDimSide(input, dimSideFactors);
+
+            List<String> suitableFields = new ArrayList<>();
+            // If a candidateField is not in acceptedFilterFields, the dpp rule will not be
+            // matched, because we can not prune any partitions according to non-accepted filter
+            // fields provided by the partition table source.
+            for (String candidateField : candidateFields) {
+                if (acceptedFilterFields.contains(candidateField)) {
+                    suitableFields.add(candidateField);
+                }
             }
+
+            return suitableFields;
         }
-    }
 
+        private static BatchPhysicalDynamicFilteringDataCollector createDynamicFilteringConnector(
+                RelNode dimSide, List<Integer> dynamicFilteringFieldIndices) {
+            final RelDataType outputType =
+                    ((FlinkTypeFactory) dimSide.getCluster().getTypeFactory())
+                            .projectStructType(
+                                    dimSide.getRowType(),
+                                    dynamicFilteringFieldIndices.stream()
+                                            .mapToInt(i -> i)
+                                            .toArray());
+            return new BatchPhysicalDynamicFilteringDataCollector(
+                    dimSide.getCluster(),
+                    dimSide.getTraitSet(),
+                    ignoreExchange(dimSide),
+                    outputType,
+                    dynamicFilteringFieldIndices.stream().mapToInt(i -> i).toArray());
+        }
 
-    /**
-     * Not all filter condition suitable for using to filter partitions by dynamic partition pruning
-     * rules. For example, NOT NULL can only filter one default partition which have a small impact
-     * on filtering data.
-     */
-    private static boolean isSuitableFilter(RexNode filterCondition) {
-        switch (filterCondition.getKind()) {
-            case AND:
-                List<RexNode> conjunctions = RelOptUtil.conjunctions(filterCondition);
-                return isSuitableFilter(conjunctions.get(0))
-                        || isSuitableFilter(conjunctions.get(1));
-            case OR:
-                List<RexNode> disjunctions = RelOptUtil.disjunctions(filterCondition);
-                return isSuitableFilter(disjunctions.get(0))
-                        && isSuitableFilter(disjunctions.get(1));
-            case NOT:
-                return isSuitableFilter(((RexCall) filterCondition).operands.get(0));
-            case EQUALS:
-            case GREATER_THAN:
-            case GREATER_THAN_OR_EQUAL:
-            case LESS_THAN:
-            case LESS_THAN_OR_EQUAL:
-            case NOT_EQUALS:
-            case IN:
-            case LIKE:
-            case CONTAINS:
-            case SEARCH:
-            case IS_FALSE:
-            case IS_NOT_FALSE:
-            case IS_NOT_TRUE:
-            case IS_TRUE:
-                // TODO adding more suitable filters which can filter enough partitions after using
-                // this filter in dynamic partition pruning.
-                return true;
-            default:
-                return false;
+        private static RelNode ignoreExchange(RelNode dimSide) {
+            if (dimSide instanceof Exchange) {
+                return dimSide.getInput(0);
+            } else {
+                return dimSide;
+            }
         }
-    }
 
-    /** Returns true if the source is FLIP-27 source, else false. */
-    private static boolean isNewSource(ScanTableSource scanTableSource) {
-        ScanTableSource.ScanRuntimeProvider provider =
-                scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        if (provider instanceof SourceProvider) {
-            return true;
-        } else if (provider instanceof TransformationScanProvider) {
-            Transformation<?> transformation =
-                    ((TransformationScanProvider) provider)
-                            .createTransformation(name -> Optional.empty());
-            return transformation instanceof SourceTransformation;
-        } else if (provider instanceof DataStreamScanProvider) {
-            // Suppose DataStreamScanProvider of sources that support dynamic filtering will use new
-            // Source. It's not reliable and should be checked.
-            // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is actually a
-            // new source.
-            // This situation will not generate wrong result because it's handled when translating
-            // BatchTableSourceScan. The only effect is the physical plan and the exec node plan
-            // have DPP nodes, but they do not work in runtime.
-            return true;
+        /** Returns true if the source is a FLIP-27 source, else false. */
+        private static boolean isNewSource(ScanTableSource scanTableSource) {
+            ScanTableSource.ScanRuntimeProvider provider =
+                    scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+            if (provider instanceof SourceProvider) {
+                return true;
+            } else if (provider instanceof TransformationScanProvider) {
+                Transformation<?> transformation =
+                        ((TransformationScanProvider) provider)
+                                .createTransformation(name -> Optional.empty());
+                return transformation instanceof SourceTransformation;
+            } else if (provider instanceof DataStreamScanProvider) {
+                // Suppose the DataStreamScanProvider of sources that support dynamic filtering
+                // uses the new Source. This is not reliable and should be checked.
+                // TODO FLINK-28864 check if the source used by the DataStreamScanProvider is
+                // actually a new source. This situation will not generate a wrong result because
+                // it's handled when translating BatchTableSourceScan. The only effect is that the
+                // physical plan and the exec node plan have DPP nodes which do not work at runtime.
+                return true;
+            }
+            // TODO supports more
+            return false;
         }
-        // TODO supports more
-        return false;
-    }
 
-    private static ImmutableIntList getInputIndices(
-            List<RexNode> projects, ImmutableIntList joinKeys) {
-        List<Integer> indices = new ArrayList<>();
-        for (int k : joinKeys) {
-            RexNode rexNode = projects.get(k);
-            if (rexNode instanceof RexInputRef) {
-                indices.add(((RexInputRef) rexNode).getIndex());
+        private static ImmutableIntList getInputIndices(
+                List<RexNode> projects, ImmutableIntList joinKeys) {
+            List<Integer> indices = new ArrayList<>();
+            for (int k : joinKeys) {
+                RexNode rexNode = projects.get(k);
+                if (rexNode instanceof RexInputRef) {
+                    indices.add(((RexInputRef) rexNode).getIndex());
+                }
             }
+            return ImmutableIntList.copyOf(indices);
         }
-        return ImmutableIntList.copyOf(indices);
-    }
 
-    private static ImmutableIntList getInputIndices(
-            Join join, ImmutableIntList joinKeys, boolean isLeft) {
-        List<Integer> indices = new ArrayList<>();
-        RelNode left = join.getLeft();
-        int leftSize = left.getRowType().getFieldCount();
-        for (int k : joinKeys) {
-            if (isLeft) {
-                if (k < leftSize) {
-                    indices.add(k);
-                }
-            } else {
-                if (k >= leftSize) {
-                    indices.add(k - leftSize);
+        private static ImmutableIntList getInputIndices(
+                Join join, ImmutableIntList joinKeys, boolean isLeft) {
+            List<Integer> indices = new ArrayList<>();
+            RelNode left = join.getLeft();
+            int leftSize = left.getRowType().getFieldCount();
+            for (int k : joinKeys) {
+                if (isLeft) {
+                    if (k < leftSize) {
+                        indices.add(k);
+                    }
+                } else {
+                    if (k >= leftSize) {
+                        indices.add(k - leftSize);
+                    }
                 }
             }
+            return ImmutableIntList.copyOf(indices);
         }
-        return ImmutableIntList.copyOf(indices);
-    }
 
-    private static class DppDimSideFactors {
-        private boolean hasFilter;
-        private boolean hasPartitionedScan;
-        private final List<ContextResolvedTable> tables = new ArrayList<>();
-
-        public void setTables(ContextResolvedTable catalogTable) {
-            if (tables.size() == 0) {
-                tables.add(catalogTable);
-            } else {
-                for (ContextResolvedTable thisTable : new ArrayList<>(tables)) {
-                    if (!thisTable.getIdentifier().equals(catalogTable.getIdentifier())) {
-                        tables.add(catalogTable);
-                    }
-                }
-            }
-        }
-
-        public boolean isDimSide() {
-            return hasFilter && !hasPartitionedScan && tables.size() == 1;
-        }
-    }
-
-    /** This class is used to remember fact side messages while recurring in fact side. */
-    private static class DppFactSideFactors {
-        // If join key is not changed in fact side, this value is always true.
-        private boolean isChanged;
     }
 }
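
For reference, the end-to-end setup this program expects looks roughly like the sketch below. The table names `fact` and `dim` and the class name are hypothetical and their DDL is omitted; `fact` is assumed to be a partitioned table whose connector implements SupportsDynamicFiltering, `dim` a small non-partitioned table. Since the config check is now the first statement of optimize(), the whole program is a no-op unless the dynamic-filtering option is enabled:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.OptimizerConfigOptions;

    public class DppQueryExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // The converter program bails out immediately unless this option is true.
            tEnv.getConfig()
                    .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);

            // The dpp pattern: a join on the partition key, with a suitable filter
            // on the dim side (see isSuitableFilter above).
            tEnv.executeSql(
                    "SELECT * FROM fact JOIN dim ON fact.day = dim.day "
                            + "WHERE dim.price < 100");
        }
    }

Whether the conversion fired can be checked with tEnv.explainSql(...) on the same query: a converted plan contains a DynamicFilteringDataCollector feeding the fact-side scan instead of a plain table scan.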
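
On the connector side, the two SupportsDynamicFiltering hooks that convertDppFactSide() calls, listAcceptedFilterFields() and applyDynamicFiltering(), are all a partitioned source needs to expose. A minimal sketch with hypothetical field names; the rest of the ScanTableSource contract is elided by keeping the class abstract:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.connector.source.ScanTableSource;
    import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;

    /** A hypothetical partitioned source; only the dynamic-filtering hooks are shown. */
    public abstract class MyPartitionedTableSource
            implements ScanTableSource, SupportsDynamicFiltering {

        // Fields the planner may use as dynamic-filtering keys. The utility
        // intersects the fact-side join keys with this list; an empty intersection
        // means the scan is left untouched.
        @Override
        public List<String> listAcceptedFilterFields() {
            return Arrays.asList("day", "region");
        }

        // Called once suitable fields are found; the source should remember them
        // and prune partitions when the collected dim-side data arrives at runtime.
        @Override
        public void applyDynamicFiltering(List<String> candidateFilterFields) {
            // remember the fields; the actual pruning happens during execution
        }
    }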
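
The index bookkeeping in getInputIndices is easiest to see on a toy example. The following self-contained sketch mimics only the Project case with plain ints (the real code walks RexNode projections): a join key survives a projection only if its expression is a direct input reference, otherwise dpp gives up on that key.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    /** Standalone mock of the Project case of getInputIndices, for illustration only. */
    public class InputIndicesDemo {
        // projectedInputs[i] is the input column referenced by output column i,
        // with -1 standing for a computed expression (no RexInputRef).
        static List<Integer> throughProject(int[] projectedInputs, List<Integer> joinKeys) {
            List<Integer> indices = new ArrayList<>();
            for (int k : joinKeys) {
                if (projectedInputs[k] >= 0) {
                    indices.add(projectedInputs[k]);
                }
            }
            return indices;
        }

        public static void main(String[] args) {
            // SELECT c2, UPPER(c0), c5 FROM t  →  outputs reference inputs {2, -1, 5}.
            // A join key on output column 0 maps back to input column 2; a key on
            // output column 1 is a computed field and is dropped.
            System.out.println(throughProject(new int[] {2, -1, 5}, Arrays.asList(0, 1)));
            // prints [2]
        }
    }

The Join overload of getInputIndices works the same way but splits the keys by position: a key k belongs to the left input if k < leftFieldCount, otherwise it maps to k - leftFieldCount on the right input.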
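
The asymmetry between the AND and OR cases of isSuitableFilter() can be seen in isolation with a toy model of the recursion (this is not the planner's RexNode version, just the boolean skeleton):

    /** Toy model of the isSuitableFilter() recursion, for illustration only. */
    public class SuitableFilterDemo {

        abstract static class Pred {}

        static class Leaf extends Pred {
            final boolean suitable; // e.g. price < 100 → true, id IS NOT NULL → false
            Leaf(boolean suitable) { this.suitable = suitable; }
        }

        static class And extends Pred {
            final Pred left, right;
            And(Pred l, Pred r) { left = l; right = r; }
        }

        static class Or extends Pred {
            final Pred left, right;
            Or(Pred l, Pred r) { left = l; right = r; }
        }

        // AND is suitable if either conjunct is: one good conjunct already prunes
        // partitions. OR needs both disjuncts, otherwise rows from unpruned
        // partitions could still pass the other branch.
        static boolean isSuitable(Pred p) {
            if (p instanceof And) {
                return isSuitable(((And) p).left) || isSuitable(((And) p).right);
            }
            if (p instanceof Or) {
                return isSuitable(((Or) p).left) && isSuitable(((Or) p).right);
            }
            return ((Leaf) p).suitable;
        }

        public static void main(String[] args) {
            Pred good = new Leaf(true);  // price < 100
            Pred bad = new Leaf(false);  // id IS NOT NULL
            System.out.println(isSuitable(new And(good, bad))); // true
            System.out.println(isSuitable(new Or(good, bad)));  // false
        }
    }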