From 3c67cce3f01442fb67d9c3b823312afcfe752095 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Oct 2020 17:33:43 +0800 Subject: [PATCH] [FLINK-19694][table-planner-blink] Support upsert ChangelogMode for ScanTableSource in planner This closes #13721 --- .../PushProjectIntoTableSourceScanRule.java | 50 +++- .../planner/sources/DynamicSourceUtils.java | 21 +- .../stream/StreamExecChangelogNormalize.scala | 139 +++++++++ .../FlinkChangelogModeInferenceProgram.scala | 21 +- .../StreamExecTableSourceScanRule.scala | 48 ++- .../table/planner/plan/utils/ScanUtil.scala | 27 +- .../planner/plan/stream/sql/TableScanTest.xml | 273 +++++++++++++++--- .../plan/batch/sql/TableScanTest.scala | 23 ++ .../plan/stream/sql/TableScanTest.scala | 197 ++++++++++++- 9 files changed, 719 insertions(+), 80 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java index 96d11244a4f2c..9db75d4b01c34 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java @@ -20,17 +20,21 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; import org.apache.flink.table.planner.plan.utils.RexNodeRewriter; +import org.apache.flink.table.planner.plan.utils.ScanUtil; import org.apache.flink.table.planner.sources.DynamicSourceUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowKind; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -40,6 +44,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -87,7 +92,22 @@ public void onMatch(RelOptRuleCall call) { final List fieldNames = scan.getRowType().getFieldNames(); final int fieldCount = fieldNames.size(); - final int[] usedFields = RexNodeExtractor.extractRefInputFields(project.getProjects()); + final int[] refFields = RexNodeExtractor.extractRefInputFields(project.getProjects()); + final int[] usedFields; + + TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + if (isUpsertSource(oldTableSourceTable)) { + // primary key fields are needed for upsert source + List 
keyFields = oldTableSourceTable.catalogTable().getSchema() + .getPrimaryKey().get().getColumns(); + // we should get source fields from scan node instead of CatalogTable, + // because projection may have been pushed down + List sourceFields = scan.getRowType().getFieldNames(); + int[] primaryKey = ScanUtil.getPrimaryKeyIndices(sourceFields, keyFields); + usedFields = mergeFields(refFields, primaryKey); + } else { + usedFields = refFields; + } // if no fields can be projected, we keep the original plan. if (usedFields.length == fieldCount) { return; @@ -97,8 +117,6 @@ public void onMatch(RelOptRuleCall call) { .mapToObj(fieldNames::get) .collect(Collectors.toList()); - TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class); - final TableSchema oldSchema = oldTableSourceTable.catalogTable().getSchema(); final DynamicTableSource oldSource = oldTableSourceTable.tableSource(); final List metadataKeys = DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource); @@ -203,4 +221,30 @@ private void applyUpdatedMetadata( ((SupportsReadingMetadata) newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType); } } + + /** + * Returns true if the table is a upsert source when it is works in scan mode. + */ + private static boolean isUpsertSource(TableSourceTable table) { + TableSchema schema = table.catalogTable().getSchema(); + if (!schema.getPrimaryKey().isPresent()) { + return false; + } + DynamicTableSource tableSource = table.tableSource(); + if (tableSource instanceof ScanTableSource) { + ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode(); + return mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE); + } + return false; + } + + private static int[] mergeFields(int[] fields1, int[] fields2) { + List results = Arrays.stream(fields1).boxed().collect(Collectors.toList()); + Arrays.stream(fields2).forEach(idx -> { + if (!results.contains(idx)) { + results.add(idx); + } + }); + return results.stream().mapToInt(Integer::intValue).toArray(); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java index 700a99062b096..14d73a29f7f24 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java @@ -241,7 +241,7 @@ private static void validateScanSource( validateWatermarks(sourceIdentifier, schema); if (isStreamingMode) { - validateScanSourceForStreaming(sourceIdentifier, scanSource, changelogMode); + validateScanSourceForStreaming(sourceIdentifier, schema, scanSource, changelogMode); } else { validateScanSourceForBatch(sourceIdentifier, changelogMode, provider); } @@ -249,6 +249,7 @@ private static void validateScanSource( private static void validateScanSourceForStreaming( ObjectIdentifier sourceIdentifier, + TableSchema schema, ScanTableSource scanSource, ChangelogMode changelogMode) { // sanity check for produced ChangelogMode @@ -256,15 +257,15 @@ private static void validateScanSourceForStreaming( final boolean hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER); if (!hasUpdateBefore && hasUpdateAfter) { // only UPDATE_AFTER - throw new TableException( - String.format( - "Unsupported source for table '%s'. 
Currently, a %s doesn't support a changelog which contains " + - "UPDATE_AFTER but no UPDATE_BEFORE. Please adapt the implementation of class '%s'.", - sourceIdentifier.asSummaryString(), - ScanTableSource.class.getSimpleName(), - scanSource.getClass().getName() - ) - ); + if (!schema.getPrimaryKey().isPresent()) { + throw new TableException( + String.format( + "Table '%s' produces a changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. " + + "This requires to define primary key constraint on the table.", + sourceIdentifier.asSummaryString() + ) + ); + } } else if (hasUpdateBefore && !hasUpdateAfter) { // only UPDATE_BEFORE throw new ValidationException( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala new file mode 100644 index 0000000000000..bc4596135a734 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.physical.stream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.dag.Transformation +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.delegation.StreamPlanner +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode} +import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils, KeySelectorUtil} +import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator +import org.apache.flink.table.runtime.operators.deduplicate.{DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepLastRowFunction} +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or + * a changelog stream containing duplicate events. This node normalize such stream into a regular + * changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without + * duplication. 
+ */ +class StreamExecChangelogNormalize( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + val uniqueKeys: Array[Int]) + extends SingleRel(cluster, traitSet, input) + with StreamPhysicalRel + with StreamExecNode[RowData] { + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = getInput.getRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecChangelogNormalize( + cluster, + traitSet, + inputs.get(0), + uniqueKeys) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val fieldNames = getRowType.getFieldNames + super.explainTerms(pw) + .item("key", uniqueKeys.map(fieldNames.get).mkString(", ")) + } + + //~ ExecNode methods ----------------------------------------------------------- + + override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] = { + List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]]) + } + + override def replaceInputNode( + ordinalInParent: Int, + newInputNode: ExecNode[StreamPlanner, _]): Unit = { + replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) + } + + override protected def translateToPlanInternal( + planner: StreamPlanner): Transformation[RowData] = { + + val inputTransform = getInputNodes.get(0).translateToPlan(planner) + .asInstanceOf[Transformation[RowData]] + + val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]] + val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) + val tableConfig = planner.getTableConfig + val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) + val operator = if (isMiniBatchEnabled) { + val exeConfig = planner.getExecEnv.getConfig + val rowSerializer = rowTypeInfo.createSerializer(exeConfig) + val processFunction = new MiniBatchDeduplicateKeepLastRowFunction( + rowTypeInfo, + generateUpdateBefore, + true, // generateInsert + false, // inputInsertOnly + rowSerializer, + // disable state ttl, the changelog normalize should keep all state to have data integrity + // we can enable state ttl if this is really needed in some cases + -1) + val trigger = AggregateUtil.createMiniBatchTrigger(tableConfig) + new KeyedMapBundleOperator( + processFunction, + trigger) + } else { + val processFunction = new DeduplicateKeepLastRowFunction( + -1, // disable state ttl + rowTypeInfo, + generateUpdateBefore, + true, // generateInsert + false) // inputInsertOnly + new KeyedProcessOperator[RowData, RowData, RowData](processFunction) + } + + val ret = new OneInputTransformation( + inputTransform, + getRelDetailedDescription, + operator, + rowTypeInfo, + inputTransform.getParallelism) + + if (inputsContainSingleton()) { + ret.setParallelism(1) + ret.setMaxParallelism(1) + } + + val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo) + ret.setStateKeySelector(selector) + ret.setStateKeyType(selector.getProducedType) + ret + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 513e989a7401e..ec57fefa26869 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -292,6 +292,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode( union, children, new ModifyKindSetTrait(providedKindSet), requiredTrait, requester) + case materialize: StreamExecChangelogNormalize => + // changelog normalize support update&delete input + val children = visitChildren(materialize, ModifyKindSetTrait.ALL_CHANGES) + // changelog normalize will output all changes + val providedTrait = ModifyKindSetTrait.ALL_CHANGES + createNewNode(materialize, children, providedTrait, requiredTrait, requester) + case ts: StreamExecTableSourceScan => // ScanTableSource supports produces updates and deletions val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode) @@ -625,15 +632,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode(union, Some(children.flatten), providedTrait) } + case materialize: StreamExecChangelogNormalize => + // changelog normalize currently only supports input only sending UPDATE_AFTER + val children = visitChildren(materialize, UpdateKindTrait.ONLY_UPDATE_AFTER) + // use requiredTrait as providedTrait, + // because changelog normalize supports all kinds of UpdateKind + createNewNode(rel, children, requiredTrait) + case ts: StreamExecTableSourceScan => // currently only support BEFORE_AND_AFTER if source produces updates val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode) - if (providedTrait == UpdateKindTrait.ONLY_UPDATE_AFTER) { - throw new UnsupportedOperationException( - "Currently, ScanTableSource doesn't support producing ChangelogMode " + - "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please update the " + - "implementation of '" + ts.tableSource.asSummaryString() + "' source.") - } createNewNode(rel, Some(List()), providedTrait) case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan | @@ -672,7 +680,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val providedTrait = newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) if (!providedTrait.satisfies(requiredChildrenTrait)) { // the provided trait can't satisfy required trait, thus we should return None. - // for example, the changelog source can't provide ONLY_UPDATE_AFTER. 
return None } newChild diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala index 2ac1ab7c2f8e7..62f35286d65e7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala @@ -18,19 +18,25 @@ package org.apache.flink.table.planner.plan.rules.physical.stream +import org.apache.flink.table.api.TableException import org.apache.flink.table.connector.source.ScanTableSource +import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecChangelogNormalize, StreamExecTableSourceScan} import org.apache.flink.table.planner.plan.schema.TableSourceTable - -import org.apache.calcite.plan.{RelOptRuleCall, RelTraitSet} +import org.apache.flink.types.RowKind +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.TableScan +import org.apache.flink.table.planner.plan.utils.ScanUtil /** * Rule that converts [[FlinkLogicalTableSourceScan]] to [[StreamExecTableSourceScan]]. + * + *

Depends whether this is a scan source, this rule will also generate + * [[StreamExecChangelogNormalize]] to materialize the upsert stream. */ class StreamExecTableSourceScanRule extends ConverterRule( @@ -56,12 +62,40 @@ class StreamExecTableSourceScanRule def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) - - new StreamExecTableSourceScan( + val newScan = new StreamExecTableSourceScan( rel.getCluster, traitSet, - scan.getTable.asInstanceOf[TableSourceTable] - ) + scan.getTable.asInstanceOf[TableSourceTable]) + + val table = scan.getTable.asInstanceOf[TableSourceTable] + val tableSource = table.tableSource.asInstanceOf[ScanTableSource] + val changelogMode = tableSource.getChangelogMode + if (changelogMode.contains(RowKind.UPDATE_AFTER) && + !changelogMode.contains(RowKind.UPDATE_BEFORE)) { + // generate changelog normalize node for upsert source + val primaryKey = table.catalogTable.getSchema.getPrimaryKey + if (!primaryKey.isPresent) { + throw new TableException(s"Table '${table.tableIdentifier.asSummaryString()}' produces" + + " a changelog stream contains UPDATE_AFTER but no UPDATE_BEFORE," + + " this requires to define primary key on the table.") + } + val keyFields = primaryKey.get().getColumns + val inputFieldNames = newScan.getRowType.getFieldNames + val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields) + val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true) + val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet() + .replace(requiredDistribution) + .replace(FlinkConventions.STREAM_PHYSICAL) + val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet) + + new StreamExecChangelogNormalize( + scan.getCluster, + traitSet, + newInput, + primaryKeyIndices) + } else { + newScan + } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala index 03d2014af2e7d..0272e0688557e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.data.{RowData, GenericRowData} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.data.{GenericRowData, RowData} import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, GENERIC_ROW} import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect @@ -33,11 +33,12 @@ import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.DataType import org.apache.flink.table.types.logical.RowType import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo - import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rex.RexNode +import java.util + import scala.collection.JavaConversions._ /** @@ -135,4 +136,24 @@ object ScanUtil { val fieldNames = rowType.getFieldNames.mkString(", 
") s"SourceConversion(table=[$tableQualifiedName], fields=[$fieldNames])" } + + /** + * Returns the field indices of primary key in given fields. + */ + def getPrimaryKeyIndices( + fieldNames: util.List[String], + keyFields: util.List[String]): Array[Int] = { + // we must use the output field names of scan node instead of the original schema + // to calculate the primary key indices, because the scan node maybe projection pushed down + keyFields.map { k => + val index = fieldNames.indexOf(k) + if (index < 0) { + // primary key shouldn't be pruned, otherwise it's a bug + throw new TableException( + s"Can't find primary key field $k in the input fields $fieldNames. " + + s"This is a bug, please file an issue.") + } + index + }.toArray + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index ade3ae19dc520..b3616bae47cfe 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -34,6 +34,49 @@ GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D]) +- Exchange(distribution=[single], changelogMode=[I,UB,UA]) +- Calc(select=[a], where=[>(a, 1)], changelogMode=[I,UB,UA]) +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[], project=[a]]], fields=[a], changelogMode=[I,UB,UA]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -87,6 +130,39 @@ LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0) + + + + + + + + + + + @@ -126,41 +202,6 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metada - - - - - - - - - - - @@ -243,6 +284,55 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3], changelogMode=[ : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency], changelogMode=[I]) +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA]) +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate], changelogMode=[I,UB,UA]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -297,23 +387,118 @@ Calc(select=[b, a, ts], changelogMode=[I,UB,UA,D]) ]]> - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1]]> + + + ($1, 1)]) + +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)]) + +- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + (a, 1)], changelogMode=[I,UB,UA,D]) ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UB,UA,D]) + +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UB,UA,D]) + +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala 
index aa347b15e5197..a44d1bf57fb38 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala @@ -130,6 +130,29 @@ class TableScanTest extends TableTestBase { util.verifyPlan("SELECT * FROM src WHERE a > 1") } + @Test + def testScanOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | id STRING, + | a INT, + | b DOUBLE, + | PRIMARY KEY (id) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'bounded' = 'true', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + thrown.expect(classOf[TableException]) + thrown.expectMessage( + "Querying a table in batch mode is currently only possible for INSERT-only table sources. " + + "But the source for table 'default_catalog.default_database.src' produces other changelog " + + "messages than just INSERT.") + util.verifyPlan("SELECT * FROM src WHERE a > 1") + } + @Test def testDDLWithComputedColumn(): Unit = { util.verifyPlan("SELECT * FROM computed_column_t") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index 1e0243942b5b7..ad1cf95622c1f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -328,6 +328,193 @@ class TableScanTest extends TableTestBase { util.verifyPlan("SELECT * FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) } + @Test + def testScanOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | id1 STRING, + | a INT, + | id2 BIGINT, + | b DOUBLE, + | PRIMARY KEY (id2, id1) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA' + |) + """.stripMargin) + // projection should be pushed down, but the full primary key (id2, id1) should be kept + util.verifyPlan("SELECT id1, a, b FROM src", ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testUpsertSourceWithComputedColumnAndWatermark(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | id STRING, + | a INT, + | b AS a + 1, + | c STRING, + | ts as to_timestamp(c), + | PRIMARY KEY (id) NOT ENFORCED, + | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the last node should keep UB because there is a filter on the changelog stream + util.verifyPlan("SELECT a, b, c FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testUnionUpsertSourceAndAggregation(): Unit = { + util.addTable( + """ + |CREATE TABLE upsert_src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + util.addTable( + """ + |CREATE TABLE append_src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + """.stripMargin) + + val query = + """ + |SELECT b, ts, a + |FROM ( + | SELECT * FROM upsert_src + | UNION ALL + | SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a + |) + |""".stripMargin + util.verifyPlan(query, 
ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testAggregateOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | c STRING, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the MAX and MIN should work in retract mode + util.verifyPlan( + "SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", + ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testAggregateOnUpsertSourcePrimaryKey(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | c STRING, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the MAX and MIN should work in retract mode + util.verifyPlan( + "SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", + ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testJoinOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE orders ( + | amount BIGINT, + | currency STRING + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + |""".stripMargin) + util.addTable( + """ + |CREATE TABLE rates_history ( + | currency STRING PRIMARY KEY NOT ENFORCED, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + + val sql = + """ + |SELECT o.currency, o.amount, r.rate, o.amount * r.rate + |FROM orders AS o JOIN rates_history AS r + |ON o.currency = r.currency + |""".stripMargin + util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testTemporalJoinOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE orders ( + | amount BIGINT, + | currency STRING, + | proctime AS PROCTIME() + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + |""".stripMargin) + util.addTable( + """ + |CREATE TABLE rates_history ( + | currency STRING PRIMARY KEY NOT ENFORCED, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'disable-lookup' = 'true' + |) + """.stripMargin) + + val sql = + """ + |SELECT o.currency, o.amount, r.rate, o.amount * r.rate + |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r + |ON o.currency = r.currency + |""".stripMargin + util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + @Test def testUnsupportedWindowAggregateOnChangelogSource(): Unit = { util.addTable( @@ -377,7 +564,7 @@ class TableScanTest extends TableTestBase { } @Test - def testUnsupportedSourceChangelogMode(): Unit = { + def testMissingPrimaryKeyForUpsertSource(): Unit = { util.addTable( """ |CREATE TABLE src ( @@ -390,11 +577,9 @@ class TableScanTest extends TableTestBase { |) """.stripMargin) thrown.expect(classOf[TableException]) - thrown.expectMessage( - "Unsupported source for table 'default_catalog.default_database.src'. Currently, a " + - "ScanTableSource doesn't support a changelog which contains UPDATE_AFTER but no " + - "UPDATE_BEFORE. Please adapt the implementation of class 'org.apache.flink.table.planner." + - "factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.") + thrown.expectMessage("Table 'default_catalog.default_database.src' produces a " + + "changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. " + + "This requires to define primary key constraint on the table.") util.verifyPlan("SELECT * FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) }
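
---

A note on the projection push-down change above: a scan source is treated as an upsert source when its changelog mode contains UPDATE_AFTER but not UPDATE_BEFORE, and in that case the primary-key indices are merged into the referenced fields so the key columns are never pruned (if no primary key is declared at all, planning now fails with the new TableException). Below is a minimal, self-contained Java sketch of that idea; it has no Flink dependencies, RowKind is re-declared locally, and the indices in main() mirror the testScanOnUpsertSource schema (ts, id1, a, id2, b) with PRIMARY KEY (id2, id1). It is an illustration of the logic, not the rule implementation itself.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

public class UpsertProjectionSketch {

    // Stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A source counts as "upsert" if it emits UPDATE_AFTER but never UPDATE_BEFORE. */
    static boolean isUpsertMode(EnumSet<RowKind> changelogMode) {
        return changelogMode.contains(RowKind.UPDATE_AFTER)
                && !changelogMode.contains(RowKind.UPDATE_BEFORE);
    }

    /** Appends the primary-key indices that the projection would otherwise drop. */
    static int[] mergeFields(int[] projectedFields, int[] primaryKeyIndices) {
        List<Integer> result = new ArrayList<>();
        Arrays.stream(projectedFields).forEach(result::add);
        Arrays.stream(primaryKeyIndices).forEach(idx -> {
            if (!result.contains(idx)) {
                result.add(idx);
            }
        });
        return result.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // Schema (ts, id1, a, id2, b) with PRIMARY KEY (id2, id1):
        // the query projects (id1, a, b) = indices {1, 2, 4}; the key indices are {3, 1}.
        EnumSet<RowKind> mode = EnumSet.of(RowKind.UPDATE_AFTER, RowKind.DELETE);
        int[] projected = {1, 2, 4};
        int[] keys = {3, 1};
        if (isUpsertMode(mode)) {
            projected = mergeFields(projected, keys);
        }
        System.out.println(Arrays.toString(projected)); // [1, 2, 4, 3]
    }
}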
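
The new StreamExecChangelogNormalize node translates to DeduplicateKeepLastRowFunction (or its mini-batch variant) keyed by the primary key, with state TTL disabled to preserve data integrity. Conceptually it keeps the last row per key and turns the upsert input (UPDATE_AFTER and DELETE only) into a complete changelog of INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records; whether UPDATE_BEFORE is actually emitted depends on the generateUpdateBefore flag derived from the downstream requirement. The following plain-Java sketch illustrates that semantics with a HashMap standing in for Flink keyed state; class and field names are illustrative only, and UPDATE_BEFORE is always emitted for clarity.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogNormalizeSketch {

    // Stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Row {
        final RowKind kind;
        final String key;
        final String value;

        Row(RowKind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + ", " + value + ")";
        }
    }

    // Last seen row per primary key; the real operator keeps this in Flink keyed state.
    private final Map<String, Row> lastRow = new HashMap<>();

    /** Turns one upsert record (UPDATE_AFTER or DELETE) into full changelog records. */
    List<Row> normalize(Row in) {
        List<Row> out = new ArrayList<>();
        Row previous = lastRow.get(in.key);
        if (in.kind == RowKind.UPDATE_AFTER) {
            if (previous == null) {
                out.add(new Row(RowKind.INSERT, in.key, in.value));
            } else {
                out.add(new Row(RowKind.UPDATE_BEFORE, previous.key, previous.value));
                out.add(new Row(RowKind.UPDATE_AFTER, in.key, in.value));
            }
            lastRow.put(in.key, in);
        } else if (in.kind == RowKind.DELETE) {
            if (previous != null) {
                out.add(new Row(RowKind.DELETE, previous.key, previous.value));
                lastRow.remove(in.key);
            }
        } else {
            throw new IllegalArgumentException("An upsert source only emits UPDATE_AFTER and DELETE");
        }
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch op = new ChangelogNormalizeSketch();
        System.out.println(op.normalize(new Row(RowKind.UPDATE_AFTER, "Euro", "rate=114"))); // [INSERT(...)]
        System.out.println(op.normalize(new Row(RowKind.UPDATE_AFTER, "Euro", "rate=116"))); // [UPDATE_BEFORE(...), UPDATE_AFTER(...)]
        System.out.println(op.normalize(new Row(RowKind.DELETE, "Euro", "rate=116")));       // [DELETE(...)]
    }
}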
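
Finally, ScanUtil.getPrimaryKeyIndices resolves the primary-key column names against the field names the scan actually produces rather than the catalog schema, because projection push-down may already have removed or reordered columns. A stand-alone sketch of that lookup follows, with IllegalStateException standing in for Flink's TableException.

import java.util.Arrays;
import java.util.List;

public class PrimaryKeyIndicesSketch {

    /**
     * Resolves the primary-key columns against the field names that the scan actually
     * produces (which may differ from the catalog schema after projection push-down).
     */
    static int[] getPrimaryKeyIndices(List<String> scanFieldNames, List<String> keyFields) {
        return keyFields.stream()
                .mapToInt(k -> {
                    int index = scanFieldNames.indexOf(k);
                    if (index < 0) {
                        // Key columns are guaranteed to survive projection push-down,
                        // so a missing key here indicates a planner bug.
                        throw new IllegalStateException("Primary key field '" + k
                                + "' not found in scan fields " + scanFieldNames);
                    }
                    return index;
                })
                .toArray();
    }

    public static void main(String[] args) {
        // Scan output after push-down: (id1, a, b, id2); PRIMARY KEY (id2, id1) -> [3, 0]
        System.out.println(Arrays.toString(
                getPrimaryKeyIndices(
                        Arrays.asList("id1", "a", "b", "id2"),
                        Arrays.asList("id2", "id1"))));
    }
}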