* Regardless of whether this interface is implemented, a final aggregation is always applied in a
* subsequent operation after the source.
- *
- * Note: currently, the {@link SupportsAggregatePushDown} is not supported by planner.
*/
@PublicEvolving
public interface SupportsAggregatePushDown {
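
A minimal sketch (not part of this patch) of a bounded source opting into this ability. The class
name and the rejection condition are illustrative; the applyAggregates signature matches the call
site in AggregatePushDownSpec below. Returning false keeps the original local aggregation in the
plan, so a connector can accept only what its external system can evaluate.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.types.DataType;

import java.util.List;

public class AggPushDownSourceSketch implements ScanTableSource, SupportsAggregatePushDown {

    private List<int[]> groupingSets;
    private List<AggregateExpression> aggregates;
    private DataType producedDataType;

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregateExpressions,
            DataType producedDataType) {
        // Reject anything the external system cannot evaluate; returning false keeps
        // the original local aggregation in the plan.
        for (AggregateExpression agg : aggregateExpressions) {
            if (agg.getFilterExpression().isPresent()) {
                return false;
            }
        }
        this.groupingSets = groupingSets;
        this.aggregates = aggregateExpressions;
        this.producedDataType = producedDataType;
        return true;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // A real connector would translate `aggregates` into a pre-aggregated scan here,
        // e.g. "SELECT key, SUM(v) ... GROUP BY key" against the external system.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        AggPushDownSourceSketch copy = new AggPushDownSourceSketch();
        copy.groupingSets = groupingSets;
        copy.aggregates = aggregates;
        copy.producedDataType = producedDataType;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "AggPushDownSourceSketch";
    }
}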
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
index ce111796db771..897ab8aa985c6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/AggregateExpression.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.expressions;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -47,9 +46,6 @@
*
* <li>{@code ignoreNulls} indicates whether this aggregate function ignores null values.
* </ul>
- *
- * Note: currently, the {@link AggregateExpression} is only used in {@link
- * SupportsAggregatePushDown}.
*/
@PublicEvolving
public class AggregateExpression implements ResolvedExpression {
@@ -107,7 +103,6 @@ public List<FieldReferenceExpression> getArgs() {
return args;
}
- @Nullable
public Optional<CallExpression> getFilterExpression() {
return Optional.ofNullable(filterExpression);
}
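
For reference, a sketch (outside this patch) of how such an expression is assembled for
COUNT(amount). The field position and flag values are illustrative; the constructor argument
order follows its use in AggregatePushDownSpec below.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;

import java.util.Collections;

public class AggregateExpressionSketch {
    public static void main(String[] args) {
        // `amount` is assumed to be field 2 of input 0 of the scan.
        FieldReferenceExpression amount =
                new FieldReferenceExpression("amount", DataTypes.BIGINT(), 0, 2);
        AggregateExpression countAmount =
                new AggregateExpression(
                        new CountAggFunction(),
                        Collections.singletonList(amount),
                        null, // no FILTER clause
                        DataTypes.BIGINT(), // result type of COUNT
                        false, // not DISTINCT
                        false, // not approximate
                        true); // COUNT ignores null input values
        System.out.println(countAmount.asSummaryString()); // e.g. count(amount)
    }
}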
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
new file mode 100644
index 0000000000000..f8fe88720914f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.plan.utils.AggregateInfo;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation
+ * to/from JSON, but can also push a local aggregate into a {@link SupportsAggregatePushDown}.
+ */
+@JsonTypeName("AggregatePushDown")
+public class AggregatePushDownSpec extends SourceAbilitySpecBase {
+
+ public static final String FIELD_NAME_INPUT_TYPE = "inputType";
+
+ public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";
+
+ public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";
+
+ @JsonProperty(FIELD_NAME_INPUT_TYPE)
+ private final RowType inputType;
+
+ @JsonProperty(FIELD_NAME_GROUPING_SETS)
+ private final List<int[]> groupingSets;
+
+ @JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
+ private final List<AggregateCall> aggregateCalls;
+
+ @JsonCreator
+ public AggregatePushDownSpec(
+ @JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
+ @JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
+ @JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls,
+ @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
+ super(producedType);
+
+ this.inputType = inputType;
+ this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
+ this.aggregateCalls = aggregateCalls;
+ }
+
+ @Override
+ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+ checkArgument(getProducedType().isPresent());
+ apply(
+ inputType,
+ groupingSets,
+ aggregateCalls,
+ getProducedType().get(),
+ tableSource,
+ context);
+ }
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ int[] grouping = groupingSets.get(0);
+ String groupingStr =
+ Arrays.stream(grouping)
+ .mapToObj(index -> inputType.getFieldNames().get(index))
+ .collect(Collectors.joining(","));
+
+ List<AggregateExpression> aggregateExpressions =
+ buildAggregateExpressions(inputType, aggregateCalls);
+ String aggFunctionsStr =
+ aggregateExpressions.stream()
+ .map(AggregateExpression::asSummaryString)
+ .collect(Collectors.joining(","));
+
+ return "aggregates=[grouping=["
+ + groupingStr
+ + "], aggFunctions=["
+ + aggFunctionsStr
+ + "]]";
+ }
+
+ public static boolean apply(
+ RowType inputType,
+ List<int[]> groupingSets,
+ List<AggregateCall> aggregateCalls,
+ RowType producedType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ assert context.isBatchMode() && groupingSets.size() == 1;
+
+ List<AggregateExpression> aggregateExpressions =
+ buildAggregateExpressions(inputType, aggregateCalls);
+
+ if (tableSource instanceof SupportsAggregatePushDown) {
+ DataType producedDataType = TypeConversions.fromLogicalToDataType(producedType);
+ return ((SupportsAggregatePushDown) tableSource)
+ .applyAggregates(groupingSets, aggregateExpressions, producedDataType);
+ } else {
+ throw new TableException(
+ String.format(
+ "%s does not support SupportsAggregatePushDown.",
+ tableSource.getClass().getName()));
+ }
+ }
+
+ private static List<AggregateExpression> buildAggregateExpressions(
+ RowType inputType, List<AggregateCall> aggregateCalls) {
+ AggregateInfoList aggInfoList =
+ AggregateUtil.transformToBatchAggregateInfoList(
+ inputType, JavaScalaConversionUtil.toScala(aggregateCalls), null, null);
+ if (aggInfoList.aggInfos().length == 0) {
+ // no aggregate functions need to be pushed down
+ return Collections.emptyList();
+ }
+
+ List<AggregateExpression> aggExpressions = new ArrayList<>();
+ for (AggregateInfo aggInfo : aggInfoList.aggInfos()) {
+ List<FieldReferenceExpression> arguments = new ArrayList<>(1);
+ for (int argIndex : aggInfo.argIndexes()) {
+ DataType argType =
+ TypeConversions.fromLogicalToDataType(
+ inputType.getFields().get(argIndex).getType());
+ FieldReferenceExpression field =
+ new FieldReferenceExpression(
+ inputType.getFieldNames().get(argIndex), argType, 0, argIndex);
+ arguments.add(field);
+ }
+ if (aggInfo.function() instanceof AvgAggFunction) {
+ Tuple2<Sum0AggFunction, CountAggFunction> sum0AndCountFunction =
+ AggregateUtil.deriveSumAndCountFromAvg((AvgAggFunction) aggInfo.function());
+ AggregateExpression sum0Expression =
+ new AggregateExpression(
+ sum0AndCountFunction._1(),
+ arguments,
+ null,
+ aggInfo.externalResultType(),
+ aggInfo.agg().isDistinct(),
+ aggInfo.agg().isApproximate(),
+ aggInfo.agg().ignoreNulls());
+ aggExpressions.add(sum0Expression);
+ AggregateExpression countExpression =
+ new AggregateExpression(
+ sum0AndCountFunction._2(),
+ arguments,
+ null,
+ aggInfo.externalResultType(),
+ aggInfo.agg().isDistinct(),
+ aggInfo.agg().isApproximate(),
+ aggInfo.agg().ignoreNulls());
+ aggExpressions.add(countExpression);
+ } else {
+ AggregateExpression aggregateExpression =
+ new AggregateExpression(
+ aggInfo.function(),
+ arguments,
+ null,
+ aggInfo.externalResultType(),
+ aggInfo.agg().isDistinct(),
+ aggInfo.agg().isApproximate(),
+ aggInfo.agg().ignoreNulls());
+ aggExpressions.add(aggregateExpression);
+ }
+ }
+ return aggExpressions;
+ }
+}
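
To illustrate the digest produced by getDigests above: for a hypothetical table t(id, name,
amount) and a pushed-down "SELECT name, SUM(amount) FROM t GROUP BY name", the string is
assembled roughly as in this self-contained sketch mirroring the logic above.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DigestSketch {
    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("id", "name", "amount");
        int[] grouping = {1}; // GROUP BY name
        String groupingStr =
                Arrays.stream(grouping).mapToObj(fieldNames::get).collect(Collectors.joining(","));
        String aggFunctionsStr = "sum(amount)"; // via AggregateExpression#asSummaryString
        System.out.println(
                "aggregates=[grouping=[" + groupingStr + "], aggFunctions=[" + aggFunctionsStr + "]]");
        // prints: aggregates=[grouping=[name], aggFunctions=[sum(amount)]]
    }
}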
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java
index 1fbb61a468f88..e3431c706518b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java
@@ -40,6 +40,7 @@
* project push down (SupportsProjectionPushDown)
* partition push down (SupportsPartitionPushDown)
* watermark push down (SupportsWatermarkPushDown)
+ * aggregate push down (SupportsAggregatePushDown)
* reading metadata (SupportsReadingMetadata)
*
*/
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
index 92326f0ac52c4..453ee4cedfcbb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
@@ -40,7 +40,8 @@
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
@JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
- @JsonSubTypes.Type(value = SourceWatermarkSpec.class)
+ @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
+ @JsonSubTypes.Type(value = AggregatePushDownSpec.class)
})
@Internal
public interface SourceAbilitySpec {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
new file mode 100644
index 0000000000000..fe9a8584b02aa
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoScanRuleBase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.AggregatePushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Planner rule that tries to push a local aggregator into an {@link BatchPhysicalTableSourceScan}
+ * whose table is a {@link TableSourceTable} with a source supporting {@link
+ * SupportsAggregatePushDown}.
+ *
+ * <p>The aggregate push down does not support a number of more complex statements at present:
+ *
+ * <ul>
+ * <li>complex grouping operations such as ROLLUP, CUBE, or GROUPING SETS.
+ * <li>expressions inside the aggregation function call, such as sum(a * b).
+ * <li>aggregations with ordering.
+ * <li>aggregations with filter.
+ * </ul>
+ */
+public abstract class PushLocalAggIntoScanRuleBase extends RelOptRule {
+
+ public PushLocalAggIntoScanRuleBase(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ protected boolean canPushDown(
+ RelOptRuleCall call,
+ BatchPhysicalGroupAggregateBase aggregate,
+ BatchPhysicalTableSourceScan tableSourceScan) {
+ TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
+ if (!tableConfig
+ .getConfiguration()
+ .getBoolean(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
+ return false;
+ }
+
+ if (aggregate.isFinal() || aggregate.getAggCallList().isEmpty()) {
+ return false;
+ }
+ List<AggregateCall> aggCallList =
+ JavaScalaConversionUtil.toJava(aggregate.getAggCallList());
+ for (AggregateCall aggCall : aggCallList) {
+ if (aggCall.isDistinct()
+ || aggCall.isApproximate()
+ || aggCall.getArgList().size() > 1
+ || aggCall.hasFilter()
+ || !aggCall.getCollation().getFieldCollations().isEmpty()) {
+ return false;
+ }
+ }
+ TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+ // we cannot push down aggregates twice
+ return tableSourceTable != null
+ && tableSourceTable.tableSource() instanceof SupportsAggregatePushDown
+ && Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof AggregatePushDownSpec);
+ }
+
+ protected void pushLocalAggregateIntoScan(
+ RelOptRuleCall call,
+ BatchPhysicalGroupAggregateBase localAgg,
+ BatchPhysicalTableSourceScan oldScan) {
+ pushLocalAggregateIntoScan(call, localAgg, oldScan, null);
+ }
+
+ protected void pushLocalAggregateIntoScan(
+ RelOptRuleCall call,
+ BatchPhysicalGroupAggregateBase localAgg,
+ BatchPhysicalTableSourceScan oldScan,
+ int[] calcRefFields) {
+ RowType inputType = FlinkTypeFactory.toLogicalRowType(oldScan.getRowType());
+ List<int[]> groupingSets =
+ Collections.singletonList(
+ ArrayUtils.addAll(localAgg.grouping(), localAgg.auxGrouping()));
+ List<AggregateCall> aggCallList = JavaScalaConversionUtil.toJava(localAgg.getAggCallList());
+
+ // map argument indexes in the aggregate to field indexes in the scan via the fields referenced by the calc.
+ if (calcRefFields != null) {
+ groupingSets = translateGroupingArgIndex(groupingSets, calcRefFields);
+ aggCallList = translateAggCallArgIndex(aggCallList, calcRefFields);
+ }
+
+ RowType producedType = FlinkTypeFactory.toLogicalRowType(localAgg.getRowType());
+
+ TableSourceTable oldTableSourceTable = oldScan.tableSourceTable();
+ DynamicTableSource newTableSource = oldScan.tableSource().copy();
+
+ boolean isPushDownSuccess =
+ AggregatePushDownSpec.apply(
+ inputType,
+ groupingSets,
+ aggCallList,
+ producedType,
+ newTableSource,
+ SourceAbilityContext.from(oldScan));
+
+ if (!isPushDownSuccess) {
+ // aggregate push down failed, just return without changing any nodes.
+ return;
+ }
+
+ // create new source table with new spec and statistic.
+ AggregatePushDownSpec aggregatePushDownSpec =
+ new AggregatePushDownSpec(inputType, groupingSets, aggCallList, producedType);
+
+ TableSourceTable newTableSourceTable =
+ oldTableSourceTable
+ .copy(
+ newTableSource,
+ localAgg.getRowType(),
+ new SourceAbilitySpec[] {aggregatePushDownSpec})
+ .copy(FlinkStatistic.UNKNOWN());
+
+ // transform to new nodes.
+ BatchPhysicalTableSourceScan newScan =
+ oldScan.copy(oldScan.getTraitSet(), newTableSourceTable);
+ BatchPhysicalExchange oldExchange = call.rel(0);
+ BatchPhysicalExchange newExchange =
+ oldExchange.copy(oldExchange.getTraitSet(), newScan, oldExchange.getDistribution());
+ call.transformTo(newExchange);
+ }
+
+ protected boolean isProjectionNotPushedDown(BatchPhysicalTableSourceScan tableSourceScan) {
+ TableSourceTable tableSourceTable = tableSourceScan.tableSourceTable();
+ return tableSourceTable != null
+ && Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+ }
+
+ /**
+ * Currently, we only support pushing down an aggregate above a calc whose projection
+ * contains input references only.
+ *
+ * @param calc BatchPhysicalCalc
+ * @return true if the aggregate can be pushed down
+ */
+ protected boolean isInputRefOnly(BatchPhysicalCalc calc) {
+ RexProgram program = calc.getProgram();
+
+ // check if condition exists. All filters should have been pushed down.
+ if (program.getCondition() != null) {
+ return false;
+ }
+
+ return !program.getProjectList().isEmpty()
+ && program.getProjectList().stream()
+ .map(calc.getProgram()::expandLocalRef)
+ .allMatch(RexInputRef.class::isInstance);
+ }
+
+ protected int[] getRefFieldIndex(BatchPhysicalCalc calc) {
+ List<RexNode> projects =
+ calc.getProgram().getProjectList().stream()
+ .map(calc.getProgram()::expandLocalRef)
+ .collect(Collectors.toList());
+
+ return RexNodeExtractor.extractRefInputFields(projects);
+ }
+
+ protected List<int[]> translateGroupingArgIndex(List<int[]> groupingSets, int[] refFields) {
+ List<int[]> newGroupingSets = new ArrayList<>();
+ groupingSets.forEach(
+ grouping -> {
+ int[] newGrouping = new int[grouping.length];
+ for (int i = 0; i < grouping.length; i++) {
+ int argIndex = grouping[i];
+ newGrouping[i] = refFields[argIndex];
+ }
+ newGroupingSets.add(newGrouping);
+ });
+
+ return newGroupingSets;
+ }
+
+ protected List<AggregateCall> translateAggCallArgIndex(
+ List<AggregateCall> aggCallList, int[] refFields) {
+ List<AggregateCall> newAggCallList = new ArrayList<>();
+ aggCallList.forEach(
+ aggCall -> {
+ List<Integer> argList = new ArrayList<>();
+ for (int i = 0; i < aggCall.getArgList().size(); i++) {
+ int argIndex = aggCall.getArgList().get(i);
+ argList.add(refFields[argIndex]);
+ }
+ newAggCallList.add(aggCall.copy(argList, aggCall.filterArg, aggCall.collation));
+ });
+
+ return newAggCallList;
+ }
+}
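
The two translate helpers above remap indexes from the calc's output positions back to scan field
positions. A standalone sketch with hypothetical fields:

import java.util.Arrays;

public class IndexTranslationSketch {
    public static void main(String[] args) {
        // The scan produces (id, name, amount, ts); the calc projects only (name, amount),
        // so calc output position -> scan field index is {1, 2}.
        int[] refFields = {1, 2};
        // The local aggregate groups by $0 of the calc output, i.e. `name`.
        int[] grouping = {0};
        int[] translated = Arrays.stream(grouping).map(i -> refFields[i]).toArray();
        System.out.println(Arrays.toString(translated)); // [1] -> scan field `name`
    }
}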
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggIntoScanRule.java
new file mode 100644
index 0000000000000..0678162929ed7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggIntoScanRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash aggregate (without sort) into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalHashAggregate (local)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }</pre>
+ */
+public class PushLocalHashAggIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalHashAggIntoScanRule INSTANCE = new PushLocalHashAggIntoScanRule();
+
+ public PushLocalHashAggIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalHashAggregate.class,
+ operand(BatchPhysicalTableSourceScan.class, none()))),
+ "PushLocalHashAggIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalLocalHashAggregate localAggregate = call.rel(1);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(2);
+ return canPushDown(call, localAggregate, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalLocalHashAggregate localHashAgg = call.rel(1);
+ BatchPhysicalTableSourceScan oldScan = call.rel(2);
+ pushLocalAggregateIntoScan(call, localHashAgg, oldScan);
+ }
+}
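
A sketch of how a user would activate these rules; the table and query in the comment are
hypothetical. Without the option, canPushDown above bails out immediately.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class EnableAggPushDownSketch {
    public static void main(String[] args) {
        // The rules only fire for batch plans.
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        env.getConfig()
                .getConfiguration()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, true);
        // For a source implementing SupportsAggregatePushDown, the plan of e.g.
        // "SELECT name, SUM(amount) FROM t GROUP BY name" should now show the local
        // aggregate folded into the table scan.
    }
}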
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggWithCalcIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggWithCalcIntoScanRule.java
new file mode 100755
index 0000000000000..87f47c5154181
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalHashAggWithCalcIntoScanRule.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local hash aggregate (with a calc) into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalHashAggregate (local)
+ * +- BatchPhysicalCalc (field projection only)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalHashAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }</pre>
+ */
+public class PushLocalHashAggWithCalcIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalHashAggWithCalcIntoScanRule INSTANCE =
+ new PushLocalHashAggWithCalcIntoScanRule();
+
+ public PushLocalHashAggWithCalcIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalHashAggregate.class,
+ operand(
+ BatchPhysicalCalc.class,
+ operand(BatchPhysicalTableSourceScan.class, none())))),
+ "PushLocalHashAggWithCalcIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalLocalHashAggregate localHashAgg = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(2);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(3);
+
+ return isInputRefOnly(calc)
+ && isProjectionNotPushedDown(tableSourceScan)
+ && canPushDown(call, localHashAgg, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalLocalHashAggregate localHashAgg = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(2);
+ BatchPhysicalTableSourceScan oldScan = call.rel(3);
+
+ int[] calcRefFields = getRefFieldIndex(calc);
+
+ pushLocalAggregateIntoScan(call, localHashAgg, oldScan, calcRefFields);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggIntoScanRule.java
new file mode 100755
index 0000000000000..ca101ca3cd8cb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggIntoScanRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local sort aggregate (without sort) into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalSortAggregate (local)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }</pre>
+ */
+public class PushLocalSortAggIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalSortAggIntoScanRule INSTANCE = new PushLocalSortAggIntoScanRule();
+
+ public PushLocalSortAggIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalSortAggregate.class,
+ operand(BatchPhysicalTableSourceScan.class, none()))),
+ "PushLocalSortAggIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalLocalSortAggregate localAggregate = call.rel(1);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(2);
+ return canPushDown(call, localAggregate, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalLocalSortAggregate localHashAgg = call.rel(1);
+ BatchPhysicalTableSourceScan oldScan = call.rel(2);
+ pushLocalAggregateIntoScan(call, localHashAgg, oldScan);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithCalcIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithCalcIntoScanRule.java
new file mode 100755
index 0000000000000..e56e3aa9dd866
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithCalcIntoScanRule.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local sort aggregate (with a calc, but without sort) into a
+ * {@link BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source
+ * supporting {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalSortAggregate (local)
+ * +- BatchPhysicalCalc (field projection only)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }</pre>
+ */
+public class PushLocalSortAggWithCalcIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalSortAggWithCalcIntoScanRule INSTANCE =
+ new PushLocalSortAggWithCalcIntoScanRule();
+
+ public PushLocalSortAggWithCalcIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalSortAggregate.class,
+ operand(
+ BatchPhysicalCalc.class,
+ operand(BatchPhysicalTableSourceScan.class, none())))),
+ "PushLocalSortAggWithCalcIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalLocalSortAggregate localAggregate = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(2);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(3);
+
+ return isInputRefOnly(calc)
+ && isProjectionNotPushedDown(tableSourceScan)
+ && canPushDown(call, localAggregate, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalLocalSortAggregate localHashAgg = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(2);
+ BatchPhysicalTableSourceScan oldScan = call.rel(3);
+
+ int[] calcRefFields = getRefFieldIndex(calc);
+
+ pushLocalAggregateIntoScan(call, localHashAgg, oldScan, calcRefFields);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortAndCalcIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortAndCalcIntoScanRule.java
new file mode 100755
index 0000000000000..d9c340a00edad
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortAndCalcIntoScanRule.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local sort aggregate (with sort and calc) into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalSortAggregate (local)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalCalc (field projection only)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }
+ */
+public class PushLocalSortAggWithSortAndCalcIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalSortAggWithSortAndCalcIntoScanRule INSTANCE =
+ new PushLocalSortAggWithSortAndCalcIntoScanRule();
+
+ public PushLocalSortAggWithSortAndCalcIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalSortAggregate.class,
+ operand(
+ BatchPhysicalSort.class,
+ operand(
+ BatchPhysicalCalc.class,
+ operand(
+ BatchPhysicalTableSourceScan.class,
+ none()))))),
+ "PushLocalSortAggWithSortAndCalcIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalGroupAggregateBase localAggregate = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(3);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(4);
+
+ return isInputRefOnly(calc)
+ && isProjectionNotPushedDown(tableSourceScan)
+ && canPushDown(call, localAggregate, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalGroupAggregateBase localSortAgg = call.rel(1);
+ BatchPhysicalCalc calc = call.rel(3);
+ BatchPhysicalTableSourceScan oldScan = call.rel(4);
+
+ int[] calcRefFields = getRefFieldIndex(calc);
+
+ pushLocalAggregateIntoScan(call, localSortAgg, oldScan, calcRefFields);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortIntoScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortIntoScanRule.java
new file mode 100644
index 0000000000000..9d952b253afd7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalSortAggWithSortIntoScanRule.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/**
+ * Planner rule that tries to push a local sort aggregate (with sort) into a {@link
+ * BatchPhysicalTableSourceScan} whose table is a {@link TableSourceTable} with a source supporting
+ * {@link SupportsAggregatePushDown}. The {@link
+ * OptimizerConfigOptions#TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED} option needs to be true.
+ *
+ * <p>Suppose we have the original physical plan:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalLocalSortAggregate (local)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalTableSourceScan
+ * }</pre>
+ *
+ * <p>This physical plan will be rewritten to:
+ *
+ * <pre>{@code
+ * BatchPhysicalSortAggregate (global)
+ * +- BatchPhysicalSort (exists if group keys are not empty)
+ * +- BatchPhysicalExchange (hash by group keys if group keys is not empty, else singleton)
+ * +- BatchPhysicalTableSourceScan (with local aggregate pushed down)
+ * }
+ */
+public class PushLocalSortAggWithSortIntoScanRule extends PushLocalAggIntoScanRuleBase {
+ public static final PushLocalSortAggWithSortIntoScanRule INSTANCE =
+ new PushLocalSortAggWithSortIntoScanRule();
+
+ public PushLocalSortAggWithSortIntoScanRule() {
+ super(
+ operand(
+ BatchPhysicalExchange.class,
+ operand(
+ BatchPhysicalLocalSortAggregate.class,
+ operand(
+ BatchPhysicalSort.class,
+ operand(BatchPhysicalTableSourceScan.class, none())))),
+ "PushLocalSortAggWithSortIntoScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ BatchPhysicalGroupAggregateBase localAggregate = call.rel(1);
+ BatchPhysicalTableSourceScan tableSourceScan = call.rel(3);
+ return canPushDown(call, localAggregate, tableSourceScan);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalGroupAggregateBase localSortAgg = call.rel(1);
+ BatchPhysicalTableSourceScan oldScan = call.rel(3);
+ pushLocalAggregateIntoScan(call, localSortAgg, oldScan);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
index 3021002c83449..0a3563d397e76 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala
@@ -49,6 +49,12 @@ class BatchPhysicalTableSourceScan(
new BatchPhysicalTableSourceScan(cluster, traitSet, getHints, tableSourceTable)
}
+ def copy(
+ traitSet: RelTraitSet,
+ tableSourceTable: TableSourceTable): BatchPhysicalTableSourceScan = {
+ new BatchPhysicalTableSourceScan(cluster, traitSet, getHints, tableSourceTable)
+ }
+
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val rowCnt = mq.getRowCount(this)
if (rowCnt == null) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 83fa93b04b81b..28db5c8143eb3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -448,6 +448,12 @@ object FlinkBatchRuleSets {
*/
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
EnforceLocalHashAggRule.INSTANCE,
- EnforceLocalSortAggRule.INSTANCE
+ EnforceLocalSortAggRule.INSTANCE,
+ PushLocalHashAggIntoScanRule.INSTANCE,
+ PushLocalHashAggWithCalcIntoScanRule.INSTANCE,
+ PushLocalSortAggIntoScanRule.INSTANCE,
+ PushLocalSortAggWithSortIntoScanRule.INSTANCE,
+ PushLocalSortAggWithCalcIntoScanRule.INSTANCE,
+ PushLocalSortAggWithSortAndCalcIntoScanRule.INSTANCE
)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
index cc96c108e104b..2ef50b4114d4a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
@@ -131,4 +131,23 @@ class TableSourceTable(
flinkContext,
abilitySpecs ++ newAbilitySpecs)
}
+
+ /**
+ * Creates a copy of this table, changing the statistic.
+ *
+ * @param newStatistic new table statistic
+ * @return new TableSourceTable instance with the new statistic
+ */
+ def copy(newStatistic: FlinkStatistic): TableSourceTable = {
+ new TableSourceTable(
+ relOptSchema,
+ tableIdentifier,
+ rowType,
+ newStatistic,
+ tableSource,
+ isStreamingMode,
+ catalogTable,
+ flinkContext,
+ abilitySpecs)
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 7294d24d7f4d0..8c820930fbfaf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -27,7 +27,9 @@ import org.apache.flink.table.planner.JLong
import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.expressions._
-import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
+import org.apache.flink.table.planner.functions.aggfunctions.{AvgAggFunction, CountAggFunction, DeclarativeAggregateFunction, Sum0AggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.{ByteAvgAggFunction, DoubleAvgAggFunction, FloatAvgAggFunction, IntAvgAggFunction, LongAvgAggFunction, ShortAvgAggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction.{ByteSum0AggFunction, DoubleSum0AggFunction, FloatSum0AggFunction, IntSum0AggFunction, LongSum0AggFunction, ShortSum0AggFunction}
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction
import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlFirstLastValueAggFunction, SqlListAggFunction}
@@ -50,7 +52,6 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import org.apache.flink.table.types.logical.{LogicalTypeRoot, _}
import org.apache.flink.table.types.utils.DataTypeUtils
-
import org.apache.calcite.rel.`type`._
import org.apache.calcite.rel.core.Aggregate.AggCallBinding
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
@@ -62,7 +63,6 @@ import org.apache.calcite.tools.RelBuilder
import java.time.Duration
import java.util
-
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -278,6 +278,21 @@ object AggregateUtil extends Enumeration {
isBounded = false)
}
+ def deriveSumAndCountFromAvg(
+ avgAggFunction: AvgAggFunction): (Sum0AggFunction, CountAggFunction) = {
+ avgAggFunction match {
+ case _: ByteAvgAggFunction => (new ByteSum0AggFunction, new CountAggFunction)
+ case _: ShortAvgAggFunction => (new ShortSum0AggFunction, new CountAggFunction)
+ case _: IntAvgAggFunction => (new IntSum0AggFunction, new CountAggFunction)
+ case _: LongAvgAggFunction => (new LongSum0AggFunction, new CountAggFunction)
+ case _: FloatAvgAggFunction => (new FloatSum0AggFunction, new CountAggFunction)
+ case _: DoubleAvgAggFunction => (new DoubleSum0AggFunction, new CountAggFunction)
+ case _ =>
+ throw new TableException(s"Avg aggregate function does not support: ''$avgAggFunction''" +
+ s"Please re-check the function or data type.")
+ }
+ }
+
def transformToBatchAggregateFunctions(
inputRowType: RowType,
aggregateCalls: Seq[AggregateCall],
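
A sketch of the derivation above as used from Java in AggregatePushDownSpec: AVG cannot be
finalized from partial AVGs, so it is pushed down as the SUM0/COUNT pair and recombined by the
final aggregation (conceptually AVG = SUM0 / COUNT).

import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;

import scala.Tuple2;

public class DeriveAvgSketch {
    public static void main(String[] args) {
        // For AVG over BIGINT the planner derives LongSum0AggFunction and CountAggFunction.
        Tuple2<Sum0AggFunction, CountAggFunction> pair =
                AggregateUtil.deriveSumAndCountFromAvg(new AvgAggFunction.LongAvgAggFunction());
        System.out.println(pair._1().getClass().getSimpleName()); // LongSum0AggFunction
        System.out.println(pair._2().getClass().getSimpleName()); // CountAggFunction
    }
}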
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index d71787621b1f8..404d05a3c3e19 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -54,6 +54,7 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
@@ -62,11 +63,14 @@
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.AggregateExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingOutputFormat;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingSinkFunction;
@@ -74,10 +78,17 @@
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.RetractingSinkFunction;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.MaxAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.MinAggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
+import org.apache.flink.table.planner.functions.aggfunctions.SumAggFunction;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -95,6 +106,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -286,6 +298,9 @@ private static RowKind parseRowKind(String rowKindShortString) {
private static final ConfigOption<Integer> SINK_EXPECTED_MESSAGES_NUM =
ConfigOptions.key("sink-expected-messages-num").intType().defaultValue(-1);
+ private static final ConfigOption<Boolean> ENABLE_PROJECTION_PUSH_DOWN =
+ ConfigOptions.key("enable-projection-push-down").booleanType().defaultValue(true);
+
private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED =
ConfigOptions.key("nested-projection-supported").booleanType().defaultValue(false);
@@ -361,6 +376,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boolean isAsync = helper.getOptions().get(ASYNC_ENABLED);
String lookupFunctionClass = helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
boolean disableLookup = helper.getOptions().get(DISABLE_LOOKUP);
+ boolean enableProjectionPushDown = helper.getOptions().get(ENABLE_PROJECTION_PUSH_DOWN);
boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
boolean enableWatermarkPushDown = helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
@@ -398,6 +414,25 @@ public DynamicTableSource createDynamicTableSource(Context context) {
partition2Rows.put(Collections.emptyMap(), data);
}
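+ // projection push down disabled: fall back to the source variant that omits
+ // SupportsProjectionPushDown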
+ if (!enableProjectionPushDown) {
+ return new TestValuesScanTableSourceWithoutProjectionPushDown(
+ producedDataType,
+ changelogMode,
+ isBounded,
+ runtimeSource,
+ failingSource,
+ partition2Rows,
+ nestedProjectionSupported,
+ null,
+ Collections.emptyList(),
+ filterableFieldsSet,
+ numElementToSkip,
+ Long.MAX_VALUE,
+ partitions,
+ readableMetadata,
+ null);
+ }
+
if (disableLookup) {
if (enableWatermarkPushDown) {
return new TestValuesScanTableSourceWithWatermarkPushDown(
@@ -541,6 +576,7 @@ public Set<ConfigOption<?>> optionalOptions() {
SINK_INSERT_ONLY,
RUNTIME_SINK,
SINK_EXPECTED_MESSAGES_NUM,
+ ENABLE_PROJECTION_PUSH_DOWN,
NESTED_PROJECTION_SUPPORTED,
FILTERABLE_FIELDS,
PARTITION_LIST,
@@ -679,14 +715,14 @@ private static Map<String, DataType> convertToMetadataMap(
// Table sources
// --------------------------------------------------------------------------------------------
- /** Values {@link ScanTableSource} for testing. */
- private static class TestValuesScanTableSource
+ /** Values {@link ScanTableSource} for testing that disables projection push down. */
+ private static class TestValuesScanTableSourceWithoutProjectionPushDown
implements ScanTableSource,
- SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown,
SupportsPartitionPushDown,
- SupportsReadingMetadata {
+ SupportsReadingMetadata,
+ SupportsAggregatePushDown {
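+ // note: unlike the projection-capable test source, this variant implements
+ // SupportsAggregatePushDown but deliberately not SupportsProjectionPushDown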
protected DataType producedDataType;
protected final ChangelogMode changelogMode;
@@ -705,7 +741,10 @@ private static class TestValuesScanTableSource
protected final Map<String, DataType> readableMetadata;
protected @Nullable int[] projectedMetadataFields;
- private TestValuesScanTableSource(
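+ // state populated when the planner pushes a local aggregate into this source:
+ // the grouping-key field indices and the pushed-down aggregate calls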
+ private @Nullable int[] groupingSet;
+ private List<AggregateExpression> aggregateExpressions;
+
+ private TestValuesScanTableSourceWithoutProjectionPushDown(
DataType producedDataType,
ChangelogMode changelogMode,
boolean bounded,
@@ -736,6 +775,8 @@ private TestValuesScanTableSource(
this.allPartitions = allPartitions;
this.readableMetadata = readableMetadata;
this.projectedMetadataFields = projectedMetadataFields;
+ this.groupingSet = null;
+ this.aggregateExpressions = Collections.emptyList();
}
@Override
@@ -802,17 +843,6 @@ public boolean isBounded() {
}
}
- @Override
- public boolean supportsNestedProjection() {
- return nestedProjectionSupported;
- }
-
- @Override
- public void applyProjection(int[][] projectedFields) {
- this.producedDataType = DataTypeUtils.projectRow(producedDataType, projectedFields);
- this.projectedPhysicalFields = projectedFields;
- }
-
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
List acceptedFilters = new ArrayList<>();
@@ -838,7 +868,7 @@ private Function<String, Comparable<?>> getValueGetter(Row row) {
@Override
public DynamicTableSource copy() {
- return new TestValuesScanTableSource(
+ return new TestValuesScanTableSourceWithoutProjectionPushDown(
producedDataType,
changelogMode,
bounded,
@@ -867,27 +897,129 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter)
List<Map<String, String>> keys =
        allPartitions.isEmpty()
                ? Collections.singletonList(Collections.emptyMap())
                : allPartitions;
- int numRetained = 0;
+
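+ // total number of source rows skipped so far across all partitions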
+ int numSkipped = 0;
for (Map<String, String> partition : keys) {
- for (Row row : data.get(partition)) {
+ Collection<Row> rowsInPartition = data.get(partition);
+
+ // handle element skipping
+ int numToSkipInPartition = 0;
+ if (numSkipped < numElementToSkip) {
+ numToSkipInPartition =
+ Math.min(rowsInPartition.size(), numElementToSkip - numSkipped);
+ }
+ numSkipped += numToSkipInPartition;
+
+ // handle predicates and projection
+ List<Row> rowsRetained =
+ rowsInPartition.stream()
+ .skip(numToSkipInPartition)
+ .filter(
+ row ->
+ FilterUtils.isRetainedAfterApplyingFilterPredicates(
+ filterPredicates, getValueGetter(row)))
+ .map(
+ row -> {
+ Row projectedRow = projectRow(row);
+ projectedRow.setKind(row.getKind());
+ return projectedRow;
+ })
+ .collect(Collectors.toList());
+
+ // handle aggregates
+ if (!aggregateExpressions.isEmpty()) {
+ rowsRetained = applyAggregatesToRows(rowsRetained);
+ }
+
+ // handle row data
+ for (Row row : rowsRetained) {
+ final RowData rowData = (RowData) converter.toInternal(row);
+ if (rowData != null) {
+ rowData.setRowKind(row.getKind());
+ result.add(rowData);
+ }
+
+ // handle limit. No aggregates will be pushed down when there is a limit.
if (result.size() >= limit) {
return result;
}
- boolean isRetained =
- FilterUtils.isRetainedAfterApplyingFilterPredicates(
- filterPredicates, getValueGetter(row));
- if (isRetained) {
- final Row projectedRow = projectRow(row);
- final RowData rowData = (RowData) converter.toInternal(projectedRow);
- if (rowData != null) {
- if (numRetained >= numElementToSkip) {
- rowData.setRowKind(row.getKind());
- result.add(rowData);
- }
- numRetained++;
- }
+ }
+ }
+
+ return result;
+ }
+
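+ // groups the rows by the grouping set (if present) and folds every group into a
+ // single result row via accumulateRows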
+ private List<Row> applyAggregatesToRows(List<Row> rows) {
+ if (groupingSet != null && groupingSet.length > 0) {
+ // GROUP BY present: bucket the rows by their grouping-key values first
+ Map<Row, List<Row>> buffer = new HashMap<>();
+ for (Row row : rows) {
+ Row bufferKey = new Row(groupingSet.length);
+ for (int i = 0; i < groupingSet.length; i++) {
+ bufferKey.setField(i, row.getField(groupingSet[i]));
+ }
+ if (buffer.containsKey(bufferKey)) {
+ buffer.get(bufferKey).add(row);
+ } else {
+ buffer.put(bufferKey, new ArrayList<>(Collections.singletonList(row)));
}
}
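+ // one output row per group: the grouping-key fields first, then the aggregate results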
+ List<Row> result = new ArrayList<>();
+ for (Map.Entry<Row, List<Row>> entry : buffer.entrySet()) {
+ result.add(Row.join(entry.getKey(), accumulateRows(entry.getValue())));
+ }
+ return result;
+ } else {
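+ // no GROUP BY: the entire input collapses into a single result row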
+ return Collections.singletonList(accumulateRows(rows));
+ }
+ }
+
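+ // illustrative example: for "SELECT b, SUM(a), COUNT(*) FROM T GROUP BY b" the planner
+ // would push groupingSet = {index of b} and the aggregates [SUM(a), COUNT1()], and each
+ // group then produces Row.join(groupingKey, accumulateRows(groupRows))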
+ // for testing only: supports min/max/sum/sum0/count/count(1); sum and sum0 assume long-typed fields
+ private Row accumulateRows(List<Row> rows) {
+ Row result = new Row(aggregateExpressions.size());
+ for (int i = 0; i < aggregateExpressions.size(); i++) {
+ FunctionDefinition aggFunction =
+         aggregateExpressions.get(i).getFunctionDefinition();
+ List<FieldReferenceExpression> arguments = aggregateExpressions.get(i).getArgs();
+ if (aggFunction instanceof MinAggFunction) {
+ int argIndex = arguments.get(0).getFieldIndex();
+ Row minRow =
+ rows.stream()
+ .min(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+ .orElse(null);
+ result.setField(i, minRow != null ? minRow.getField(argIndex) : null);
+ } else if (aggFunction instanceof MaxAggFunction) {
+ int argIndex = arguments.get(0).getFieldIndex();
+ Row maxRow =
+ rows.stream()
+ .max(Comparator.comparing(row -> row.getFieldAs(argIndex)))
+ .orElse(null);
+ result.setField(i, maxRow != null ? maxRow.getField(argIndex) : null);
+ } else if (aggFunction instanceof SumAggFunction) {
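+ // SUM ignores nulls but returns null when every input value is null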
+ int argIndex = arguments.get(0).getFieldIndex();
+ Object finalSum =
+ rows.stream()
+ .filter(row -> row.getField(argIndex) != null)
+ .mapToLong(row -> row.getFieldAs(argIndex))
+ .sum();
+
+ boolean allNull = rows.stream().noneMatch(r -> r.getField(argIndex) != null);
+ result.setField(i, allNull ? null : finalSum);
+ } else if (aggFunction instanceof Sum0AggFunction) {
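+ // SUM0 behaves like SUM except that it returns 0 when every input value is null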
+ int argIndex = arguments.get(0).getFieldIndex();
+ Object finalSum0 =
+ rows.stream()
+ .filter(row -> row.getField(argIndex) != null)
+ .mapToLong(row -> row.getFieldAs(argIndex))
+ .sum();
+ result.setField(i, finalSum0);
+ } else if (aggFunction instanceof CountAggFunction) {
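+ // COUNT(col) counts only rows whose argument is non-null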
+ int argIndex = arguments.get(0).getFieldIndex();
+ long count = rows.stream().filter(r -> r.getField(argIndex) != null).count();
+ result.setField(i, count);
+ } else if (aggFunction instanceof Count1AggFunction) {
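+ // COUNT(*) / COUNT(1) counts every row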
+ result.setField(i, (long) rows.size());
+ }
}
return result;
}
@@ -953,6 +1085,52 @@ public void applyPartitions(List<Map<String, String>> remainingPartitions) {