@@ -59,6 +59,12 @@
<td>Boolean</td>
<td>When it is true, the optimizer will try to find duplicated sub-plans and reuse them.</td>
</tr>
<tr>
<td><h5>table.optimizer.source.aggregate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When it is true, the optimizer will push down local aggregates into a TableSource that implements SupportsAggregatePushDown.</td>
</tr>
<tr>
<td><h5>table.optimizer.source.predicate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
@@ -93,6 +93,15 @@ public class OptimizerConfigOptions {
+ TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
+ " is true.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED =
key("table.optimizer.source.aggregate-pushdown-enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"When it is true, the optimizer will push down the local aggregates into "
+ "the TableSource which implements SupportsAggregatePushDown.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED =
key("table.optimizer.source.predicate-pushdown-enabled")
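For reference, a minimal sketch of toggling the new option programmatically; the class name is made up, and the same effect can be achieved in the SQL client via SET 'table.optimizer.source.aggregate-pushdown-enabled' = 'false';

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class AggregatePushDownConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // The option defaults to true; setting it to false keeps local aggregates
        // out of sources that implement SupportsAggregatePushDown.
        tEnv.getConfig()
                .getConfiguration()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, false);
    }
}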
@@ -124,8 +124,6 @@
*
* <p>Regardless of whether this interface is implemented, a final aggregation is always applied in a
* subsequent operation after the source.
*
* <p>Note: currently, the {@link SupportsAggregatePushDown} is not supported by planner.
Contributor (@twalthr):
@iyupeng are we sure that the JavaDocs don't need more updates for this PR?

For example, is this description still correct?

 * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
 * all aggregate functions are supported.

Also maybe we should mention the config option that was added in this PR.

Contributor Author (@iyupeng):
Hi @twalthr, the strategy is still all or nothing here. applyAggregates returns a boolean that indicates whether the underlying data source accepts all of the local aggregates or none at all.

The docs could be improved, e.g. by mentioning the new option.

I could push a hotfix for docs.
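
For illustration, an implementation honoring this all-or-nothing contract might look roughly like the sketch below. The class name and the capability checks are hypothetical; a real connector would also implement ScanTableSource and use the pushed-down information when creating its scan runtime provider.

import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.types.DataType;

public class ExampleAggregatingSource implements SupportsAggregatePushDown {

    private List<AggregateExpression> pushedAggregates;
    private DataType producedDataType;

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregateExpressions,
            DataType producedDataType) {
        // All or nothing: if any aggregate cannot be evaluated by the external
        // system, reject the whole push-down so the planner keeps the local
        // aggregation in the plan.
        for (AggregateExpression agg : aggregateExpressions) {
            // Example capability check only; a real connector would consult the
            // external system's capabilities here.
            if (agg.isDistinct() || agg.getFilterExpression().isPresent()) {
                return false;
            }
        }
        // Remember what was pushed so the scan can later emit pre-aggregated
        // rows of the given producedDataType.
        this.pushedAggregates = aggregateExpressions;
        this.producedDataType = producedDataType;
        return true;
    }
}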

Contributor (@twalthr):
That would be great. The more information in the interface description, the better for all implementers.

Contributor Author (@iyupeng):
Hi @twalthr, I pushed this doc improvement: #17630

*/
@PublicEvolving
public interface SupportsAggregatePushDown {
@@ -19,7 +19,6 @@
package org.apache.flink.table.expressions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -47,9 +46,6 @@
* <li>{@code approximate} indicates whether this is an approximate aggregate function.
* <li>{@code ignoreNulls} indicates whether this aggregate function ignores null values.
* </ul>
*
* <p>Note: currently, the {@link AggregateExpression} is only used in {@link
* SupportsAggregatePushDown}.
*/
@PublicEvolving
public class AggregateExpression implements ResolvedExpression {
@@ -107,7 +103,6 @@ public List<FieldReferenceExpression> getArgs() {
return args;
}

@Nullable
public Optional<CallExpression> getFilterExpression() {
return Optional.ofNullable(filterExpression);
}
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.abilities.source;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
import org.apache.flink.table.planner.plan.utils.AggregateInfo;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

import org.apache.calcite.rel.core.AggregateCall;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import scala.Tuple2;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A sub-class of {@link SourceAbilitySpec} that can not only serialize/deserialize the aggregation
* to/from JSON, but also push the local aggregate into a {@link SupportsAggregatePushDown}.
*/
@JsonTypeName("AggregatePushDown")
public class AggregatePushDownSpec extends SourceAbilitySpecBase {

public static final String FIELD_NAME_INPUT_TYPE = "inputType";

public static final String FIELD_NAME_GROUPING_SETS = "groupingSets";

public static final String FIELD_NAME_AGGREGATE_CALLS = "aggregateCalls";

@JsonProperty(FIELD_NAME_INPUT_TYPE)
private final RowType inputType;

@JsonProperty(FIELD_NAME_GROUPING_SETS)
private final List<int[]> groupingSets;

@JsonProperty(FIELD_NAME_AGGREGATE_CALLS)
private final List<AggregateCall> aggregateCalls;

@JsonCreator
public AggregatePushDownSpec(
@JsonProperty(FIELD_NAME_INPUT_TYPE) RowType inputType,
@JsonProperty(FIELD_NAME_GROUPING_SETS) List<int[]> groupingSets,
@JsonProperty(FIELD_NAME_AGGREGATE_CALLS) List<AggregateCall> aggregateCalls,
@JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
super(producedType);

this.inputType = inputType;
this.groupingSets = new ArrayList<>(checkNotNull(groupingSets));
this.aggregateCalls = aggregateCalls;
}

@Override
public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
checkArgument(getProducedType().isPresent());
apply(
inputType,
groupingSets,
aggregateCalls,
getProducedType().get(),
tableSource,
context);
}

@Override
public String getDigests(SourceAbilityContext context) {
int[] grouping = groupingSets.get(0);
String groupingStr =
Arrays.stream(grouping)
.mapToObj(index -> inputType.getFieldNames().get(index))
.collect(Collectors.joining(","));

List<AggregateExpression> aggregateExpressions =
buildAggregateExpressions(inputType, aggregateCalls);
String aggFunctionsStr =
aggregateExpressions.stream()
.map(AggregateExpression::asSummaryString)
.collect(Collectors.joining(","));

return "aggregates=[grouping=["
+ groupingStr
+ "], aggFunctions=["
+ aggFunctionsStr
+ "]]";
}

public static boolean apply(
RowType inputType,
List<int[]> groupingSets,
List<AggregateCall> aggregateCalls,
RowType producedType,
DynamicTableSource tableSource,
SourceAbilityContext context) {
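// Aggregate push-down is only applied in batch mode and for exactly one grouping set.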
assert context.isBatchMode() && groupingSets.size() == 1;

List<AggregateExpression> aggregateExpressions =
buildAggregateExpressions(inputType, aggregateCalls);

if (tableSource instanceof SupportsAggregatePushDown) {
DataType producedDataType = TypeConversions.fromLogicalToDataType(producedType);
return ((SupportsAggregatePushDown) tableSource)
.applyAggregates(groupingSets, aggregateExpressions, producedDataType);
} else {
throw new TableException(
String.format(
"%s does not support SupportsAggregatePushDown.",
tableSource.getClass().getName()));
}
}

private static List<AggregateExpression> buildAggregateExpressions(
RowType inputType, List<AggregateCall> aggregateCalls) {
AggregateInfoList aggInfoList =
AggregateUtil.transformToBatchAggregateInfoList(
inputType, JavaScalaConversionUtil.toScala(aggregateCalls), null, null);
if (aggInfoList.aggInfos().length == 0) {
// no agg function needs to be pushed down
return Collections.emptyList();
}

List<AggregateExpression> aggExpressions = new ArrayList<>();
for (AggregateInfo aggInfo : aggInfoList.aggInfos()) {
List<FieldReferenceExpression> arguments = new ArrayList<>(1);
for (int argIndex : aggInfo.argIndexes()) {
DataType argType =
TypeConversions.fromLogicalToDataType(
inputType.getFields().get(argIndex).getType());
FieldReferenceExpression field =
new FieldReferenceExpression(
inputType.getFieldNames().get(argIndex), argType, 0, argIndex);
arguments.add(field);
}
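// An AVG call is not pushed down as-is; derive the corresponding SUM0 and COUNT
// calls instead, so that the final aggregation can combine them into the average.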
if (aggInfo.function() instanceof AvgAggFunction) {
Tuple2<Sum0AggFunction, CountAggFunction> sum0AndCountFunction =
AggregateUtil.deriveSumAndCountFromAvg((AvgAggFunction) aggInfo.function());
AggregateExpression sum0Expression =
new AggregateExpression(
sum0AndCountFunction._1(),
arguments,
null,
aggInfo.externalResultType(),
aggInfo.agg().isDistinct(),
aggInfo.agg().isApproximate(),
aggInfo.agg().ignoreNulls());
aggExpressions.add(sum0Expression);
AggregateExpression countExpression =
new AggregateExpression(
sum0AndCountFunction._2(),
arguments,
null,
aggInfo.externalResultType(),
aggInfo.agg().isDistinct(),
aggInfo.agg().isApproximate(),
aggInfo.agg().ignoreNulls());
aggExpressions.add(countExpression);
} else {
AggregateExpression aggregateExpression =
new AggregateExpression(
aggInfo.function(),
arguments,
null,
aggInfo.externalResultType(),
aggInfo.agg().isDistinct(),
aggInfo.agg().isApproximate(),
aggInfo.agg().ignoreNulls());
aggExpressions.add(aggregateExpression);
}
}
return aggExpressions;
}
}
@@ -40,6 +40,7 @@
* <li>project push down (SupportsProjectionPushDown)
* <li>partition push down (SupportsPartitionPushDown)
* <li>watermark push down (SupportsWatermarkPushDown)
* <li>aggregate push down (SupportsAggregatePushDown)
* <li>reading metadata (SupportsReadingMetadata)
* </ul>
*/
@@ -40,7 +40,8 @@
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
@JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
@JsonSubTypes.Type(value = SourceWatermarkSpec.class)
@JsonSubTypes.Type(value = SourceWatermarkSpec.class),
@JsonSubTypes.Type(value = AggregatePushDownSpec.class)
})
@Internal
public interface SourceAbilitySpec {