From 1d2af46d1620a18ed438413e5288030e3725bb5e Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Oct 2020 17:33:43 +0800 Subject: [PATCH 1/3] [FLINK-19694][table-planner-blink] Support upsert ChangelogMode for ScanTableSource in planner This closes #13721 --- .../PushProjectIntoTableSourceScanRule.java | 50 +++- .../planner/sources/DynamicSourceUtils.java | 21 +- .../stream/StreamExecChangelogNormalize.scala | 139 +++++++++ .../FlinkChangelogModeInferenceProgram.scala | 21 +- .../StreamExecTableSourceScanRule.scala | 48 ++- .../table/planner/plan/utils/ScanUtil.scala | 27 +- .../planner/plan/stream/sql/TableScanTest.xml | 273 +++++++++++++++--- .../plan/batch/sql/TableScanTest.scala | 23 ++ .../plan/stream/sql/TableScanTest.scala | 197 ++++++++++++- 9 files changed, 719 insertions(+), 80 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java index 96d11244a4f2c..9db75d4b01c34 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java @@ -20,17 +20,21 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; import org.apache.flink.table.planner.plan.utils.RexNodeRewriter; +import org.apache.flink.table.planner.plan.utils.ScanUtil; import org.apache.flink.table.planner.sources.DynamicSourceUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowKind; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -40,6 +44,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -87,7 +92,22 @@ public void onMatch(RelOptRuleCall call) { final List fieldNames = scan.getRowType().getFieldNames(); final int fieldCount = fieldNames.size(); - final int[] usedFields = RexNodeExtractor.extractRefInputFields(project.getProjects()); + final int[] refFields = RexNodeExtractor.extractRefInputFields(project.getProjects()); + final int[] usedFields; + + TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + if (isUpsertSource(oldTableSourceTable)) { + // primary key fields are needed for upsert source + List 
keyFields = oldTableSourceTable.catalogTable().getSchema() + .getPrimaryKey().get().getColumns(); + // we should get source fields from scan node instead of CatalogTable, + // because projection may have been pushed down + List sourceFields = scan.getRowType().getFieldNames(); + int[] primaryKey = ScanUtil.getPrimaryKeyIndices(sourceFields, keyFields); + usedFields = mergeFields(refFields, primaryKey); + } else { + usedFields = refFields; + } // if no fields can be projected, we keep the original plan. if (usedFields.length == fieldCount) { return; @@ -97,8 +117,6 @@ public void onMatch(RelOptRuleCall call) { .mapToObj(fieldNames::get) .collect(Collectors.toList()); - TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class); - final TableSchema oldSchema = oldTableSourceTable.catalogTable().getSchema(); final DynamicTableSource oldSource = oldTableSourceTable.tableSource(); final List metadataKeys = DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource); @@ -203,4 +221,30 @@ private void applyUpdatedMetadata( ((SupportsReadingMetadata) newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType); } } + + /** + * Returns true if the table is a upsert source when it is works in scan mode. + */ + private static boolean isUpsertSource(TableSourceTable table) { + TableSchema schema = table.catalogTable().getSchema(); + if (!schema.getPrimaryKey().isPresent()) { + return false; + } + DynamicTableSource tableSource = table.tableSource(); + if (tableSource instanceof ScanTableSource) { + ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode(); + return mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE); + } + return false; + } + + private static int[] mergeFields(int[] fields1, int[] fields2) { + List results = Arrays.stream(fields1).boxed().collect(Collectors.toList()); + Arrays.stream(fields2).forEach(idx -> { + if (!results.contains(idx)) { + results.add(idx); + } + }); + return results.stream().mapToInt(Integer::intValue).toArray(); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java index 700a99062b096..14d73a29f7f24 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java @@ -241,7 +241,7 @@ private static void validateScanSource( validateWatermarks(sourceIdentifier, schema); if (isStreamingMode) { - validateScanSourceForStreaming(sourceIdentifier, scanSource, changelogMode); + validateScanSourceForStreaming(sourceIdentifier, schema, scanSource, changelogMode); } else { validateScanSourceForBatch(sourceIdentifier, changelogMode, provider); } @@ -249,6 +249,7 @@ private static void validateScanSource( private static void validateScanSourceForStreaming( ObjectIdentifier sourceIdentifier, + TableSchema schema, ScanTableSource scanSource, ChangelogMode changelogMode) { // sanity check for produced ChangelogMode @@ -256,15 +257,15 @@ private static void validateScanSourceForStreaming( final boolean hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER); if (!hasUpdateBefore && hasUpdateAfter) { // only UPDATE_AFTER - throw new TableException( - String.format( - "Unsupported source for table '%s'. 
Currently, a %s doesn't support a changelog which contains " + - "UPDATE_AFTER but no UPDATE_BEFORE. Please adapt the implementation of class '%s'.", - sourceIdentifier.asSummaryString(), - ScanTableSource.class.getSimpleName(), - scanSource.getClass().getName() - ) - ); + if (!schema.getPrimaryKey().isPresent()) { + throw new TableException( + String.format( + "Table '%s' produces a changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. " + + "This requires to define primary key constraint on the table.", + sourceIdentifier.asSummaryString() + ) + ); + } } else if (hasUpdateBefore && !hasUpdateAfter) { // only UPDATE_BEFORE throw new ValidationException( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala new file mode 100644 index 0000000000000..bc4596135a734 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecChangelogNormalize.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.physical.stream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.dag.Transformation +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.delegation.StreamPlanner +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode} +import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils, KeySelectorUtil} +import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator +import org.apache.flink.table.runtime.operators.deduplicate.{DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepLastRowFunction} +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or + * a changelog stream containing duplicate events. This node normalize such stream into a regular + * changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without + * duplication. 
+ */ +class StreamExecChangelogNormalize( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + val uniqueKeys: Array[Int]) + extends SingleRel(cluster, traitSet, input) + with StreamPhysicalRel + with StreamExecNode[RowData] { + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = getInput.getRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecChangelogNormalize( + cluster, + traitSet, + inputs.get(0), + uniqueKeys) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val fieldNames = getRowType.getFieldNames + super.explainTerms(pw) + .item("key", uniqueKeys.map(fieldNames.get).mkString(", ")) + } + + //~ ExecNode methods ----------------------------------------------------------- + + override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] = { + List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]]) + } + + override def replaceInputNode( + ordinalInParent: Int, + newInputNode: ExecNode[StreamPlanner, _]): Unit = { + replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) + } + + override protected def translateToPlanInternal( + planner: StreamPlanner): Transformation[RowData] = { + + val inputTransform = getInputNodes.get(0).translateToPlan(planner) + .asInstanceOf[Transformation[RowData]] + + val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]] + val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) + val tableConfig = planner.getTableConfig + val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) + val operator = if (isMiniBatchEnabled) { + val exeConfig = planner.getExecEnv.getConfig + val rowSerializer = rowTypeInfo.createSerializer(exeConfig) + val processFunction = new MiniBatchDeduplicateKeepLastRowFunction( + rowTypeInfo, + generateUpdateBefore, + true, // generateInsert + false, // inputInsertOnly + rowSerializer, + // disable state ttl, the changelog normalize should keep all state to have data integrity + // we can enable state ttl if this is really needed in some cases + -1) + val trigger = AggregateUtil.createMiniBatchTrigger(tableConfig) + new KeyedMapBundleOperator( + processFunction, + trigger) + } else { + val processFunction = new DeduplicateKeepLastRowFunction( + -1, // disable state ttl + rowTypeInfo, + generateUpdateBefore, + true, // generateInsert + false) // inputInsertOnly + new KeyedProcessOperator[RowData, RowData, RowData](processFunction) + } + + val ret = new OneInputTransformation( + inputTransform, + getRelDetailedDescription, + operator, + rowTypeInfo, + inputTransform.getParallelism) + + if (inputsContainSingleton()) { + ret.setParallelism(1) + ret.setMaxParallelism(1) + } + + val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo) + ret.setStateKeySelector(selector) + ret.setStateKeyType(selector.getProducedType) + ret + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 513e989a7401e..ec57fefa26869 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -292,6 +292,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode( union, children, new ModifyKindSetTrait(providedKindSet), requiredTrait, requester) + case materialize: StreamExecChangelogNormalize => + // changelog normalize support update&delete input + val children = visitChildren(materialize, ModifyKindSetTrait.ALL_CHANGES) + // changelog normalize will output all changes + val providedTrait = ModifyKindSetTrait.ALL_CHANGES + createNewNode(materialize, children, providedTrait, requiredTrait, requester) + case ts: StreamExecTableSourceScan => // ScanTableSource supports produces updates and deletions val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode) @@ -625,15 +632,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode(union, Some(children.flatten), providedTrait) } + case materialize: StreamExecChangelogNormalize => + // changelog normalize currently only supports input only sending UPDATE_AFTER + val children = visitChildren(materialize, UpdateKindTrait.ONLY_UPDATE_AFTER) + // use requiredTrait as providedTrait, + // because changelog normalize supports all kinds of UpdateKind + createNewNode(rel, children, requiredTrait) + case ts: StreamExecTableSourceScan => // currently only support BEFORE_AND_AFTER if source produces updates val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode) - if (providedTrait == UpdateKindTrait.ONLY_UPDATE_AFTER) { - throw new UnsupportedOperationException( - "Currently, ScanTableSource doesn't support producing ChangelogMode " + - "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please update the " + - "implementation of '" + ts.tableSource.asSummaryString() + "' source.") - } createNewNode(rel, Some(List()), providedTrait) case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan | @@ -672,7 +680,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val providedTrait = newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) if (!providedTrait.satisfies(requiredChildrenTrait)) { // the provided trait can't satisfy required trait, thus we should return None. - // for example, the changelog source can't provide ONLY_UPDATE_AFTER. 
return None } newChild diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala index 2ac1ab7c2f8e7..62f35286d65e7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala @@ -18,19 +18,25 @@ package org.apache.flink.table.planner.plan.rules.physical.stream +import org.apache.flink.table.api.TableException import org.apache.flink.table.connector.source.ScanTableSource +import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecChangelogNormalize, StreamExecTableSourceScan} import org.apache.flink.table.planner.plan.schema.TableSourceTable - -import org.apache.calcite.plan.{RelOptRuleCall, RelTraitSet} +import org.apache.flink.types.RowKind +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.TableScan +import org.apache.flink.table.planner.plan.utils.ScanUtil /** * Rule that converts [[FlinkLogicalTableSourceScan]] to [[StreamExecTableSourceScan]]. + * + *

Depends whether this is a scan source, this rule will also generate + * [[StreamExecChangelogNormalize]] to materialize the upsert stream. */ class StreamExecTableSourceScanRule extends ConverterRule( @@ -56,12 +62,40 @@ class StreamExecTableSourceScanRule def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) - - new StreamExecTableSourceScan( + val newScan = new StreamExecTableSourceScan( rel.getCluster, traitSet, - scan.getTable.asInstanceOf[TableSourceTable] - ) + scan.getTable.asInstanceOf[TableSourceTable]) + + val table = scan.getTable.asInstanceOf[TableSourceTable] + val tableSource = table.tableSource.asInstanceOf[ScanTableSource] + val changelogMode = tableSource.getChangelogMode + if (changelogMode.contains(RowKind.UPDATE_AFTER) && + !changelogMode.contains(RowKind.UPDATE_BEFORE)) { + // generate changelog normalize node for upsert source + val primaryKey = table.catalogTable.getSchema.getPrimaryKey + if (!primaryKey.isPresent) { + throw new TableException(s"Table '${table.tableIdentifier.asSummaryString()}' produces" + + " a changelog stream contains UPDATE_AFTER but no UPDATE_BEFORE," + + " this requires to define primary key on the table.") + } + val keyFields = primaryKey.get().getColumns + val inputFieldNames = newScan.getRowType.getFieldNames + val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields) + val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true) + val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet() + .replace(requiredDistribution) + .replace(FlinkConventions.STREAM_PHYSICAL) + val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet) + + new StreamExecChangelogNormalize( + scan.getCluster, + traitSet, + newInput, + primaryKeyIndices) + } else { + newScan + } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala index 03d2014af2e7d..0272e0688557e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.data.{RowData, GenericRowData} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.data.{GenericRowData, RowData} import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, GENERIC_ROW} import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect @@ -33,11 +33,12 @@ import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.DataType import org.apache.flink.table.types.logical.RowType import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo - import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rex.RexNode +import java.util + import scala.collection.JavaConversions._ /** @@ -135,4 +136,24 @@ object ScanUtil { val fieldNames = rowType.getFieldNames.mkString(", 
") s"SourceConversion(table=[$tableQualifiedName], fields=[$fieldNames])" } + + /** + * Returns the field indices of primary key in given fields. + */ + def getPrimaryKeyIndices( + fieldNames: util.List[String], + keyFields: util.List[String]): Array[Int] = { + // we must use the output field names of scan node instead of the original schema + // to calculate the primary key indices, because the scan node maybe projection pushed down + keyFields.map { k => + val index = fieldNames.indexOf(k) + if (index < 0) { + // primary key shouldn't be pruned, otherwise it's a bug + throw new TableException( + s"Can't find primary key field $k in the input fields $fieldNames. " + + s"This is a bug, please file an issue.") + } + index + }.toArray + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index ade3ae19dc520..b3616bae47cfe 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -34,6 +34,49 @@ GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D]) +- Exchange(distribution=[single], changelogMode=[I,UB,UA]) +- Calc(select=[a], where=[>(a, 1)], changelogMode=[I,UB,UA]) +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[], project=[a]]], fields=[a], changelogMode=[I,UB,UA]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -87,6 +130,39 @@ LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0) + + + + + + + + + + + @@ -126,41 +202,6 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metada - - - - - - - - - - - @@ -243,6 +284,55 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3], changelogMode=[ : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency], changelogMode=[I]) +- Exchange(distribution=[hash[currency]], changelogMode=[I,UB,UA]) +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate], changelogMode=[I,UB,UA]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -297,23 +387,118 @@ Calc(select=[b, a, ts], changelogMode=[I,UB,UA,D]) ]]> - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1]]> + + + ($1, 1)]) + +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)]) + +- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + (a, 1)], changelogMode=[I,UB,UA,D]) ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UB,UA,D]) + +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UB,UA,D]) + +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala 
index aa347b15e5197..a44d1bf57fb38 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala @@ -130,6 +130,29 @@ class TableScanTest extends TableTestBase { util.verifyPlan("SELECT * FROM src WHERE a > 1") } + @Test + def testScanOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | id STRING, + | a INT, + | b DOUBLE, + | PRIMARY KEY (id) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'bounded' = 'true', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + thrown.expect(classOf[TableException]) + thrown.expectMessage( + "Querying a table in batch mode is currently only possible for INSERT-only table sources. " + + "But the source for table 'default_catalog.default_database.src' produces other changelog " + + "messages than just INSERT.") + util.verifyPlan("SELECT * FROM src WHERE a > 1") + } + @Test def testDDLWithComputedColumn(): Unit = { util.verifyPlan("SELECT * FROM computed_column_t") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index 1e0243942b5b7..ad1cf95622c1f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -328,6 +328,193 @@ class TableScanTest extends TableTestBase { util.verifyPlan("SELECT * FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) } + @Test + def testScanOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | id1 STRING, + | a INT, + | id2 BIGINT, + | b DOUBLE, + | PRIMARY KEY (id2, id1) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA' + |) + """.stripMargin) + // projection should be pushed down, but the full primary key (id2, id1) should be kept + util.verifyPlan("SELECT id1, a, b FROM src", ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testUpsertSourceWithComputedColumnAndWatermark(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | id STRING, + | a INT, + | b AS a + 1, + | c STRING, + | ts as to_timestamp(c), + | PRIMARY KEY (id) NOT ENFORCED, + | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the last node should keep UB because there is a filter on the changelog stream + util.verifyPlan("SELECT a, b, c FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testUnionUpsertSourceAndAggregation(): Unit = { + util.addTable( + """ + |CREATE TABLE upsert_src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + util.addTable( + """ + |CREATE TABLE append_src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + """.stripMargin) + + val query = + """ + |SELECT b, ts, a + |FROM ( + | SELECT * FROM upsert_src + | UNION ALL + | SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a + |) + |""".stripMargin + util.verifyPlan(query, 
ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testAggregateOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | c STRING, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the MAX and MIN should work in retract mode + util.verifyPlan( + "SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", + ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testAggregateOnUpsertSourcePrimaryKey(): Unit = { + util.addTable( + """ + |CREATE TABLE src ( + | ts TIMESTAMP(3), + | a INT, + | b DOUBLE, + | c STRING, + | PRIMARY KEY (a) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + // the MAX and MIN should work in retract mode + util.verifyPlan( + "SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", + ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testJoinOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE orders ( + | amount BIGINT, + | currency STRING + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + |""".stripMargin) + util.addTable( + """ + |CREATE TABLE rates_history ( + | currency STRING PRIMARY KEY NOT ENFORCED, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D' + |) + """.stripMargin) + + val sql = + """ + |SELECT o.currency, o.amount, r.rate, o.amount * r.rate + |FROM orders AS o JOIN rates_history AS r + |ON o.currency = r.currency + |""".stripMargin + util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testTemporalJoinOnUpsertSource(): Unit = { + util.addTable( + """ + |CREATE TABLE orders ( + | amount BIGINT, + | currency STRING, + | proctime AS PROCTIME() + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + |""".stripMargin) + util.addTable( + """ + |CREATE TABLE rates_history ( + | currency STRING PRIMARY KEY NOT ENFORCED, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'disable-lookup' = 'true' + |) + """.stripMargin) + + val sql = + """ + |SELECT o.currency, o.amount, r.rate, o.amount * r.rate + |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r + |ON o.currency = r.currency + |""".stripMargin + util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + @Test def testUnsupportedWindowAggregateOnChangelogSource(): Unit = { util.addTable( @@ -377,7 +564,7 @@ class TableScanTest extends TableTestBase { } @Test - def testUnsupportedSourceChangelogMode(): Unit = { + def testMissingPrimaryKeyForUpsertSource(): Unit = { util.addTable( """ |CREATE TABLE src ( @@ -390,11 +577,9 @@ class TableScanTest extends TableTestBase { |) """.stripMargin) thrown.expect(classOf[TableException]) - thrown.expectMessage( - "Unsupported source for table 'default_catalog.default_database.src'. Currently, a " + - "ScanTableSource doesn't support a changelog which contains UPDATE_AFTER but no " + - "UPDATE_BEFORE. Please adapt the implementation of class 'org.apache.flink.table.planner." + - "factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.") + thrown.expectMessage("Table 'default_catalog.default_database.src' produces a " + + "changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. 
" + + "This requires to define primary key constraint on the table.") util.verifyPlan("SELECT * FROM src WHERE a > 1", ExplainDetail.CHANGELOG_MODE) } From f5eb931dc909e8585dc8326c42852ca0c9d77037 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Oct 2020 17:35:32 +0800 Subject: [PATCH 2/3] [FLINK-19694][table-planner-blink] Update MetadataHandlers for the new introduced StreamExecUpsertMaterialize node This closes #13721 --- .../metadata/FlinkRelMdColumnUniqueness.scala | 8 ++++++ .../FlinkRelMdModifiedMonotonicity.scala | 22 ++++++++++------ .../plan/metadata/FlinkRelMdUniqueKeys.scala | 7 ++++++ .../plan/stream/sql/join/SemiAntiJoinTest.xml | 2 +- .../FlinkRelMdColumnUniquenessTest.scala | 9 +++++++ .../metadata/FlinkRelMdHandlerTestBase.scala | 14 ++++++++++- .../FlinkRelMdModifiedMonotonicityTest.scala | 25 +++++++++++++++++++ .../metadata/FlinkRelMdUniqueKeysTest.scala | 7 +++++- 8 files changed, 84 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index f4912f9923e96..3b237a69c0ac9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -304,6 +304,14 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata columns != null && util.Arrays.equals(columns.toArray, rel.getUniqueKeys) } + def areColumnsUnique( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery, + columns: ImmutableBitSet, + ignoreNulls: Boolean): JBoolean = { + columns != null && ImmutableBitSet.of(rel.uniqueKeys: _*).equals(columns) + } + def areColumnsUnique( rel: Aggregate, mq: RelMetadataQuery, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 675a317ef7570..20ca5903f3422 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -164,8 +164,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } // if partitionBy a update field or partitionBy a field whose mono is null, just return null - if (rel.partitionKey.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } @@ -209,7 +208,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon rel: StreamExecDeduplicate, mq: RelMetadataQuery): RelModifiedMonotonicity = { if (allAppend(mq, rel.getInput)) { - val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(MONOTONIC)) + val mono = new RelModifiedMonotonicity( + Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC)) rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) mono } else { @@ 
-217,6 +217,14 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } } + def getRelModifiedMonotonicity( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery): RelModifiedMonotonicity = { + val mono = new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC)) + rel.uniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT) + mono + } + def getRelModifiedMonotonicity( rel: StreamExecWatermarkAssigner, mq: RelMetadataQuery): RelModifiedMonotonicity = { @@ -336,8 +344,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon val inputMonotonicity = fmq.getRelModifiedMonotonicity(input) // if group by an update field or group by a field mono is null, just return null - if (grouping.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (inputMonotonicity == null || + grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } @@ -357,8 +365,8 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon val inputMonotonicity = fmq.getRelModifiedMonotonicity(input) // if group by a update field or group by a field mono is null, just return null - if (grouping.exists(e => - inputMonotonicity == null || inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { + if (inputMonotonicity == null || + grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { return null } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 88d6cb01142a7..44f7b5be6939e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -314,6 +314,13 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList)) } + def getUniqueKeys( + rel: StreamExecChangelogNormalize, + mq: RelMetadataQuery, + ignoreNulls: Boolean): JSet[ImmutableBitSet] = { + ImmutableSet.of(ImmutableBitSet.of(rel.uniqueKeys.map(Integer.valueOf).toList)) + } + def getUniqueKeys( rel: Aggregate, mq: RelMetadataQuery, diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml index 1d9b8d57d272b..a846c2a6906b1 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml @@ -307,7 +307,7 @@ Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUn : +- LegacyTableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- Exchange(distribution=[single]) +- Calc(select=[IS NOT NULL(m) AS $f0]) - +- GroupAggregate(select=[MIN(i) AS m]) + +- GroupAggregate(select=[MIN_RETRACT(i) AS m]) +- 
Exchange(distribution=[single]) +- Calc(select=[true AS i]) +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[d], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala index d9f6042b44bab..d942bff04f4fa 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala @@ -275,6 +275,15 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase { assertFalse(mq.areColumnsUnique(streamDeduplicateLastRow, ImmutableBitSet.of(0, 1, 2))) } + @Test + def testAreColumnsUniqueCountOnStreamExecChangelogNormalize(): Unit = { + assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(0, 1))) + assertTrue(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 0))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(2))) + assertFalse(mq.areColumnsUnique(streamChangelogNormalize, ImmutableBitSet.of(1, 2))) + } + @Test def testAreColumnsUniqueOnAggregate(): Unit = { Array(logicalAgg, flinkLogicalAgg).foreach { agg => diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index fc5d15cb78bdd..46e14c8a0aaa6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -691,6 +691,18 @@ class FlinkRelMdHandlerTestBase { (calcOfFirstRow, calcOfLastRow) } + protected lazy val streamChangelogNormalize = { + val key = Array(1, 0) + val hash1 = FlinkRelDistribution.hash(key, requireStrict = true) + val streamExchange = new StreamExecExchange( + cluster, studentStreamScan.getTraitSet.replace(hash1), studentStreamScan, hash1) + new StreamExecChangelogNormalize( + cluster, + streamPhysicalTraits, + streamExchange, + key) + } + // equivalent SQL is // select a, b, c from ( // select a, b, c, rowtime @@ -703,7 +715,7 @@ class FlinkRelMdHandlerTestBase { cluster, flinkLogicalTraits, temporalLogicalScan, - ImmutableBitSet.of(5), + ImmutableBitSet.of(1), RelCollations.of(4), RankType.ROW_NUMBER, new ConstantRankRange(1, 1), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala index 64a9ff2c1189e..d49df3859de71 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala +++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala @@ -315,5 +315,30 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase { assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys)) } + @Test + def testGetRelMonotonicityOnDeduplicate(): Unit = { + assertEquals( + new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamDeduplicateFirstRow)) + + assertEquals( + new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, CONSTANT)), + mq.getRelModifiedMonotonicity(streamDeduplicateLastRow)) + + assertEquals( + new RelModifiedMonotonicity(Array( + NOT_MONOTONIC, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(rowtimeDeduplicate)) + } + + @Test + def testGetRelMonotonicityOnChangelogNormalize(): Unit = { + assertEquals( + new RelModifiedMonotonicity(Array( + CONSTANT, CONSTANT, NOT_MONOTONIC, NOT_MONOTONIC, + NOT_MONOTONIC, NOT_MONOTONIC, NOT_MONOTONIC)), + mq.getRelModifiedMonotonicity(streamChangelogNormalize)) + } + } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala index e195205b5ea8f..e662c59ec0c1d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala @@ -155,7 +155,12 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { def testGetUniqueKeysOnStreamExecDeduplicate(): Unit = { assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(streamDeduplicateFirstRow).toSet) assertEquals(uniqueKeys(Array(1, 2)), mq.getUniqueKeys(streamDeduplicateLastRow).toSet) - assertEquals(uniqueKeys(Array(5)), mq.getUniqueKeys(rowtimeDeduplicate).toSet) + assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(rowtimeDeduplicate).toSet) + } + + @Test + def testGetUniqueKeysOnStreamExecChangelogNormalize(): Unit = { + assertEquals(uniqueKeys(Array(1, 0)), mq.getUniqueKeys(streamChangelogNormalize).toSet) } @Test From 5ee687b03b2e9cbc2369055ea935f54e8c38b9ed Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 27 Oct 2020 17:35:42 +0800 Subject: [PATCH 3/3] [FLINK-19694][table-runtime-blink] Support upsert ChangelogMode for ScanTableSource in runtime This closes #13721 --- .../stream/StreamExecDeduplicate.scala | 22 +- .../stream/sql/ChangelogSourceITCase.scala | 297 +++++++++++++++--- .../runtime/stream/sql/JoinITCase.scala | 44 --- .../planner/runtime/utils/TestData.scala | 18 ++ .../DeduplicateFunctionHelper.java | 56 +++- .../DeduplicateKeepLastRowFunction.java | 15 +- ...niBatchDeduplicateKeepLastRowFunction.java | 13 +- .../DeduplicateKeepLastRowFunctionTest.java | 3 +- ...tchDeduplicateKeepLastRowFunctionTest.java | 1 + 9 files changed, 357 insertions(+), 112 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala index 997e962cfb153..109e3beb22979 100644 --- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala @@ -117,6 +117,7 @@ class StreamExecDeduplicate( rowTypeInfo, generateUpdateBefore, generateInsert, + true, rowSerializer, minRetentionTime) } else { @@ -134,7 +135,8 @@ class StreamExecDeduplicate( minRetentionTime, rowTypeInfo, generateUpdateBefore, - generateInsert) + generateInsert, + true) } else { new DeduplicateKeepFirstRowFunction(minRetentionTime) } @@ -163,12 +165,14 @@ object StreamExecDeduplicate { @Experimental val TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE: ConfigOption[JBoolean] = - key("table.exec.insert-and-updateafter-sensitive") - .defaultValue(JBoolean.valueOf(true)) - .withDescription("Set whether the job (especially the sinks) is sensitive to " + - "INSERT messages and UPDATE_AFTER messages. " + - "If false, Flink may send UPDATE_AFTER instead of INSERT for the first row " + - "at some times (e.g. deduplication for last row). " + - "If true, Flink will guarantee to send INSERT for the first row. " + - "Default is true.") + key("table.exec.insert-and-updateafter-sensitive") + .booleanType() + .defaultValue(JBoolean.valueOf(true)) + .withDescription("Set whether the job (especially the sinks) is sensitive to " + + "INSERT messages and UPDATE_AFTER messages. " + + "If false, Flink may send UPDATE_AFTER instead of INSERT for the first row " + + "at some times (e.g. deduplication for last row). " + + "If true, Flink will guarantee to send INSERT for the first row, " + + "but there will be additional overhead." + + "Default is true.") } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala index 3993f743e1a0e..86e9cea35ab8e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala @@ -20,46 +20,60 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory -import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestData, TestingRetractSink} +import org.apache.flink.table.planner.runtime.utils.{StreamingWithMiniBatchTestBase, TestData, TestingRetractSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.planner.runtime.stream.sql.ChangelogSourceITCase.SourceMode +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND} +import org.apache.flink.table.planner.runtime.stream.sql.ChangelogSourceITCase.{CHANGELOG_SOURCE, NO_UPDATE_SOURCE, UPSERT_SOURCE} +import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchOff, MiniBatchOn} +import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode import org.apache.flink.types.{Row, RowKind} import org.junit.Assert.assertEquals import 
org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized +import java.util + import scala.collection.JavaConversions._ import scala.collection.Seq +/** + * Integration tests for operations on changelog source, including upsert source. + */ @RunWith(classOf[Parameterized]) -class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) { - - val dataId: String = TestValuesTableFactory.registerData(TestData.userChangelog) +class ChangelogSourceITCase( + sourceMode: SourceMode, + miniBatch: MiniBatchMode, + state: StateBackendMode) + extends StreamingWithMiniBatchTestBase(miniBatch, state) { @Before override def before(): Unit = { super.before() - val ddl = + val orderDataId = TestValuesTableFactory.registerData(TestData.ordersData) + tEnv.executeSql( s""" - |CREATE TABLE user_logs ( - | user_id STRING, - | user_name STRING, - | email STRING, - | balance DECIMAL(18,2), - | balance2 AS balance * 2 + |CREATE TABLE orders ( + | amount BIGINT, + | currency STRING |) WITH ( | 'connector' = 'values', - | 'data-id' = '$dataId', - | 'changelog-mode' = 'I,UA,UB,D' + | 'data-id' = '$orderDataId', + | 'changelog-mode' = 'I' |) - |""".stripMargin - tEnv.executeSql(ddl) + |""".stripMargin) + sourceMode match { + case CHANGELOG_SOURCE => registerChangelogSource() + case UPSERT_SOURCE => registerUpsertSource() + case NO_UPDATE_SOURCE => registerNoUpdateSource() + } } @Test - def testChangelogSourceAndToRetractStream(): Unit = { - val result = tEnv.sqlQuery("SELECT * FROM user_logs").toRetractStream[Row] + def testToRetractStream(): Unit = { + val result = tEnv.sqlQuery(s"SELECT * FROM ${sourceMode.usersTable}").toRetractStream[Row] val sink = new TestingRetractSink() result.addSink(sink).setParallelism(result.parallelism) env.execute() @@ -72,7 +86,7 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT } @Test - def testChangelogSourceAndUpsertSink(): Unit = { + def testToUpsertSink(): Unit = { val sinkDDL = s""" |CREATE TABLE user_sink ( @@ -89,7 +103,7 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT val dml = s""" |INSERT INTO user_sink - |SELECT * FROM user_logs + |SELECT * FROM ${sourceMode.usersTable} |""".stripMargin tEnv.executeSql(sinkDDL) tEnv.executeSql(dml).await() @@ -102,11 +116,11 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT } @Test - def testAggregateOnChangelogSource(): Unit = { + def testAggregate(): Unit = { val query = s""" |SELECT count(*), sum(balance), max(email) - |FROM user_logs + |FROM ${sourceMode.usersTable} |""".stripMargin val result = tEnv.sqlQuery(query).toRetractStream[Row] @@ -119,7 +133,7 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT } @Test - def testAggregateOnChangelogSourceAndUpsertSink(): Unit = { + def testAggregateToUpsertSink(): Unit = { val sinkDDL = s""" |CREATE TABLE user_sink ( @@ -137,7 +151,7 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT s""" |INSERT INTO user_sink |SELECT 'ALL', count(*), sum(balance), max(email) - |FROM user_logs + |FROM ${sourceMode.usersTable} |GROUP BY 'ALL' |""".stripMargin tEnv.executeSql(sinkDDL) @@ -148,9 +162,193 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT } @Test - def testAggregateOnInsertDeleteChangelogSource(): Unit = { + def testGroupByNonPrimaryKey(): Unit = { + val sinkDDL = + s""" + |CREATE TABLE user_sink ( + | balance DECIMAL(18,2), + | 
cnt BIGINT, + | max_email STRING, + | PRIMARY KEY (balance) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin + val dml = + s""" + |INSERT INTO user_sink + |SELECT balance2, count(*), max(email) + |FROM ${sourceMode.usersTable} + |GROUP BY balance2 + |""".stripMargin + tEnv.executeSql(sinkDDL) + tEnv.executeSql(dml).await() + + val expected = Seq( + "16.20,1,tom123@gmail.com", + "19.98,1,bailey@qq.com", + "22.60,1,tina@gmail.com") + assertEquals(expected.sorted, TestValuesTableFactory.getResults("user_sink").sorted) + } + + @Test + def testFilter(): Unit = { + val sinkDDL = + s""" + |CREATE TABLE user_sink ( + | user_id STRING PRIMARY KEY NOT ENFORCED, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | balance2 DECIMAL(18,2) + |) WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin + + // the sink is an upsert sink, but the update_before must be sent, + // otherwise "user1=8.10" can't be removed + val dml = + s""" + |INSERT INTO user_sink + |SELECT * FROM ${sourceMode.usersTable} WHERE balance > 9 + |""".stripMargin + tEnv.executeSql(sinkDDL) + tEnv.executeSql(dml).await() + + val expected = Seq( + "user3,Bailey,bailey@qq.com,9.99,19.98", + "user4,Tina,tina@gmail.com,11.30,22.60") + assertEquals(expected.sorted, TestValuesTableFactory.getResults("user_sink").sorted) + } + + @Test + def testRegularJoin(): Unit = { + val sql = + s""" + |SELECT o.currency, o.amount, r.rate, o.amount * r.rate + |FROM orders AS o JOIN ${sourceMode.ratesTable} AS r + |ON o.currency = r.currency + |""".stripMargin + + val sink = new TestingRetractSink + val result = tEnv.sqlQuery(sql).toRetractStream[Row] + result.addSink(sink).setParallelism(result.parallelism) + env.execute() + + val expected = Seq( + "Euro,2,119,238", "Euro,3,119,357", + "US Dollar,1,102,102", "US Dollar,5,102,510") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + // ------------------------------------------------------------------------------------------ + + private def registerChangelogSource(): Unit = { + val userDataId: String = TestValuesTableFactory.registerData(TestData.userChangelog) + tEnv.executeSql( + s""" + |CREATE TABLE ${CHANGELOG_SOURCE.usersTable} ( + | user_id STRING, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | balance2 AS balance * 2 + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$userDataId', + | 'changelog-mode' = 'I,UA,UB,D', + | 'disable-lookup' = 'true' + |) + |""".stripMargin) + val ratesDataId = TestValuesTableFactory.registerData(TestData.ratesHistoryData) + tEnv.executeSql( + s""" + |CREATE TABLE ${CHANGELOG_SOURCE.ratesTable} ( + | currency STRING, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$ratesDataId', + | 'changelog-mode' = 'I,UB,UA,D', + | 'disable-lookup' = 'true' + |) + """.stripMargin) + } + + private def registerUpsertSource(): Unit = { + val userDataId = TestValuesTableFactory.registerData(TestData.userUpsertlog) + tEnv.executeSql( + s""" + |CREATE TABLE ${UPSERT_SOURCE.usersTable} ( + | user_id STRING, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | balance2 AS balance * 2, + | PRIMARY KEY (user_name, user_id) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$userDataId', + | 'changelog-mode' = 'UA,D', + | 'disable-lookup' = 'true' + |) + |""".stripMargin) + val ratesDataId = TestValuesTableFactory.registerData(TestData.ratesUpsertData) + tEnv.executeSql( + 
s""" + |CREATE TABLE ${UPSERT_SOURCE.ratesTable} ( + | currency STRING, + | rate BIGINT, + | PRIMARY KEY (currency) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$ratesDataId', + | 'changelog-mode' = 'UA,D', + | 'disable-lookup' = 'true' + |) + """.stripMargin) + } + + private def registerNoUpdateSource(): Unit = { // only contains INSERT and DELETE - val userChangelog = TestData.userChangelog.map { row => + val userChangelog = convertToNoUpdateData(TestData.userChangelog) + val userDataId = TestValuesTableFactory.registerData(userChangelog) + tEnv.executeSql( + s""" + |CREATE TABLE ${NO_UPDATE_SOURCE.usersTable} ( + | user_id STRING, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | balance2 AS balance * 2 + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$userDataId', + | 'changelog-mode' = 'I,D', + | 'disable-lookup' = 'true' + |) + |""".stripMargin) + val ratesChangelog = convertToNoUpdateData(TestData.ratesHistoryData) + val ratesDataId = TestValuesTableFactory.registerData(ratesChangelog) + tEnv.executeSql( + s""" + |CREATE TABLE ${NO_UPDATE_SOURCE.ratesTable} ( + | currency STRING, + | rate BIGINT + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$ratesDataId', + | 'changelog-mode' = 'I,D', + | 'disable-lookup' = 'true' + |) + """.stripMargin) + } + + private def convertToNoUpdateData(data: Seq[Row]): Seq[Row] = { + data.map { row => row.getKind match { case RowKind.INSERT | RowKind.DELETE => row case RowKind.UPDATE_BEFORE => @@ -163,34 +361,31 @@ class ChangelogSourceITCase(state: StateBackendMode) extends StreamingWithStateT ret } } - val dataId = TestValuesTableFactory.registerData(userChangelog) - val ddl = - s""" - |CREATE TABLE user_logs2 ( - | user_id STRING, - | user_name STRING, - | email STRING, - | balance DECIMAL(18,2) - |) WITH ( - | 'connector' = 'values', - | 'data-id' = '$dataId', - | 'changelog-mode' = 'I,D' - |) - |""".stripMargin - tEnv.executeSql(ddl) + } - val query = - s""" - |SELECT count(*), sum(balance), max(email) - |FROM user_logs2 - |""".stripMargin +} - val result = tEnv.sqlQuery(query).toRetractStream[Row] - val sink = new TestingRetractSink() - result.addSink(sink).setParallelism(result.parallelism) - env.execute() +object ChangelogSourceITCase { - val expected = Seq("3,29.39,tom123@gmail.com") - assertEquals(expected.sorted, sink.getRetractResults.sorted) + case class SourceMode(mode: String, usersTable: String, ratesTable: String) { + override def toString: String = mode.toString + } + + val CHANGELOG_SOURCE: SourceMode = SourceMode("CHANGELOG", "users_changelog", "rates_changelog") + val UPSERT_SOURCE: SourceMode = SourceMode("UPSERT", "users_upsert", "rates_upsert") + val NO_UPDATE_SOURCE: SourceMode = SourceMode("NO_UPDATE", "users_no_update", "rates_no_update") + + @Parameterized.Parameters(name = "Source={0}, StateBackend={1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(CHANGELOG_SOURCE, MiniBatchOff, HEAP_BACKEND), + Array(CHANGELOG_SOURCE, MiniBatchOff, ROCKSDB_BACKEND), + Array(UPSERT_SOURCE, MiniBatchOff, HEAP_BACKEND), + Array(UPSERT_SOURCE, MiniBatchOff, ROCKSDB_BACKEND), + // upsert source supports minibatch, we enable minibatch only for RocksDB to save time + Array(UPSERT_SOURCE, MiniBatchOn, ROCKSDB_BACKEND), + // we only test not_update for RocksDB to save time + Array(NO_UPDATE_SOURCE, MiniBatchOff, ROCKSDB_BACKEND) + ) } } diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala index 5fb304d685049..9916730d9ebdc 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala @@ -1140,48 +1140,4 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") assertEquals(expected.sorted, sink.getRetractResults.sorted) } - - @Test - def testJoinOnChangelogSource(): Unit = { - val orderDataId = TestValuesTableFactory.registerData(TestData.ordersData) - val ratesDataId = TestValuesTableFactory.registerData(TestData.ratesHistoryData) - tEnv.executeSql( - s""" - |CREATE TABLE orders ( - | amount BIGINT, - | currency STRING - |) WITH ( - | 'connector' = 'values', - | 'data-id' = '$orderDataId', - | 'changelog-mode' = 'I' - |) - |""".stripMargin) - tEnv.executeSql( - s""" - |CREATE TABLE rates_history ( - | currency STRING, - | rate BIGINT - |) WITH ( - | 'connector' = 'values', - | 'data-id' = '$ratesDataId', - | 'changelog-mode' = 'I,UB,UA' - |) - """.stripMargin) - - val sql = - """ - |SELECT o.currency, o.amount, r.rate, o.amount * r.rate - |FROM orders AS o JOIN rates_history AS r - |ON o.currency = r.currency - |""".stripMargin - - val sink = new TestingRetractSink - tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) - env.execute() - - val expected = Seq( - "Euro,2,119,238", "Euro,3,119,357", - "US Dollar,1,102,102", "US Dollar,5,102,510") - assertEquals(expected.sorted, sink.getRetractResults.sorted) - } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala index ae067863802b6..a03a1cfbf05f3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala @@ -490,6 +490,15 @@ object TestData { changelogRow("-U", "user3", "Bailey", "bailey@gmail.com", new JBigDecimal("9.99")), changelogRow("+U", "user3", "Bailey", "bailey@qq.com", new JBigDecimal("9.99"))) + val userUpsertlog: Seq[Row] = Seq( + changelogRow("+U", "user1", "Tom", "tom@gmail.com", new JBigDecimal("10.02")), + changelogRow("+U", "user2", "Jack", "jack@hotmail.com", new JBigDecimal("71.2")), + changelogRow("+U", "user1", "Tom", "tom123@gmail.com", new JBigDecimal("8.1")), + changelogRow("+U", "user3", "Bailey", "bailey@gmail.com", new JBigDecimal("9.99")), + changelogRow("-D", "user2", "Jack", "jack@hotmail.com", new JBigDecimal("71.2")), + changelogRow("+U", "user4", "Tina", "tina@gmail.com", new JBigDecimal("11.3")), + changelogRow("+U", "user3", "Bailey", "bailey@qq.com", new JBigDecimal("9.99"))) + // [amount, currency] val ordersData: Seq[Row] = Seq( row(2L, "Euro"), @@ -511,6 +520,15 @@ object TestData { changelogRow("-D", "Yen", JLong.valueOf(1L)) ) + val ratesUpsertData: Seq[Row] = Seq( + changelogRow("+U", "US Dollar", JLong.valueOf(102L)), + changelogRow("+U", "Euro", 
JLong.valueOf(114L)), + changelogRow("+U", "Yen", JLong.valueOf(1L)), + changelogRow("+U", "Euro", JLong.valueOf(116L)), + changelogRow("+U", "Euro", JLong.valueOf(119L)), + changelogRow("-D", "Yen", JLong.valueOf(1L)) + ) + val fullDataTypesData: Seq[Row] = { val bools = List(true, false, true, false, null) val bytes = List(Byte.MaxValue, Byte.MinValue, 0.byteValue(), 5.byteValue(), null) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java index ba896e6c4de83..e770343e4291c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java @@ -38,7 +38,7 @@ class DeduplicateFunctionHelper { * @param state state of function, null if generateUpdateBefore is false * @param out underlying collector */ - static void processLastRow( + static void processLastRowOnInsertOnly( RowData currentRow, boolean generateUpdateBefore, boolean generateInsert, @@ -71,6 +71,60 @@ static void processLastRow( } } + /** + * Processes element to deduplicate on keys, sends current element as last row, retracts previous element if + * needed. + * + *

Note: we don't support stateless mode yet, because this is not safe for Kafka tombstone + * messages, which don't contain the full content. This can be a future improvement if the + * downstream (e.g. sink) doesn't require full content for DELETE messages. + * + * @param currentRow latest row received by deduplicate function + * @param generateUpdateBefore whether UPDATE_BEFORE messages need to be sent for updates + * @param state state of function + * @param out underlying collector + */ + static void processLastRowOnChangelog( + RowData currentRow, + boolean generateUpdateBefore, + ValueState state, + Collector out) throws Exception { + RowData preRow = state.value(); + RowKind currentKind = currentRow.getRowKind(); + if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) { + if (preRow == null) { + // the first row, send INSERT message + currentRow.setRowKind(RowKind.INSERT); + out.collect(currentRow); + } else { + if (generateUpdateBefore) { + preRow.setRowKind(RowKind.UPDATE_BEFORE); + out.collect(preRow); + } + currentRow.setRowKind(RowKind.UPDATE_AFTER); + out.collect(currentRow); + } + // normalize row kind + currentRow.setRowKind(RowKind.INSERT); + // save to state + state.update(currentRow); + } else { + // DELETE or UPDATE_BEFORE + if (preRow != null) { + // always set to DELETE because this row has been removed + // even if the input is UPDATE_BEFORE, there may be no UPDATE_AFTER after it. + preRow.setRowKind(RowKind.DELETE); + // output the preRow instead of currentRow, + // because preRow always contains the full content. + // currentRow may only contain key parts (e.g. Kafka tombstone records). + out.collect(preRow); + // clear state as the row has been removed + state.clear(); + } + // nothing to do if removing a non-existent row + } + } + /** + * Processes element to deduplicate on keys, sends current element if it is first row. + * diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunction.java index 42cedc3d7a350..66e4842ae972c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunction.java @@ -27,7 +27,8 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.util.Collector; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRow; +import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog; +import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnInsertOnly; import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; /** @@ -40,6 +41,7 @@ public class DeduplicateKeepLastRowFunction private final InternalTypeInfo rowTypeInfo; private final boolean generateUpdateBefore; private final boolean generateInsert; + private final boolean inputIsInsertOnly; private final long minRetentionTime; // state stores complete row.
@@ -49,11 +51,13 @@ public DeduplicateKeepLastRowFunction( long minRetentionTime, InternalTypeInfo rowTypeInfo, boolean generateUpdateBefore, - boolean generateInsert) { + boolean generateInsert, + boolean inputInsertOnly) { this.minRetentionTime = minRetentionTime; this.rowTypeInfo = rowTypeInfo; this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; + this.inputIsInsertOnly = inputInsertOnly; } @Override @@ -69,7 +73,10 @@ public void open(Configuration configure) throws Exception { @Override public void processElement(RowData input, Context ctx, Collector out) throws Exception { - processLastRow(input, generateUpdateBefore, generateInsert, state, out); + if (inputIsInsertOnly) { + processLastRowOnInsertOnly(input, generateUpdateBefore, generateInsert, state, out); + } else { + processLastRowOnChangelog(input, generateUpdateBefore, state, out); + } } - } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java index 5185480847c5f..4d54671c5c3b1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunction.java @@ -32,7 +32,8 @@ import java.util.Map; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRow; +import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog; +import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnInsertOnly; import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; /** @@ -46,6 +47,8 @@ public class MiniBatchDeduplicateKeepLastRowFunction private final InternalTypeInfo rowTypeInfo; private final boolean generateUpdateBefore; private final boolean generateInsert; + private final boolean inputInsertOnly; + private final TypeSerializer typeSerializer; private final long minRetentionTime; // state stores complete row. 
@@ -55,12 +58,14 @@ public MiniBatchDeduplicateKeepLastRowFunction( InternalTypeInfo rowTypeInfo, boolean generateUpdateBefore, boolean generateInsert, + boolean inputInsertOnly, TypeSerializer typeSerializer, long minRetentionTime) { this.minRetentionTime = minRetentionTime; this.rowTypeInfo = rowTypeInfo; this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; + this.inputInsertOnly = inputInsertOnly; this.typeSerializer = typeSerializer; } @@ -88,7 +93,11 @@ public void finishBundle( RowData currentKey = entry.getKey(); RowData currentRow = entry.getValue(); ctx.setCurrentKey(currentKey); - processLastRow(currentRow, generateUpdateBefore, generateInsert, state, out); + if (inputInsertOnly) { + processLastRowOnInsertOnly(currentRow, generateUpdateBefore, generateInsert, state, out); + } else { + processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out); + } } } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunctionTest.java index 163f254624754..51d6906c86f30 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunctionTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunctionTest.java @@ -42,7 +42,8 @@ private DeduplicateKeepLastRowFunction createFunction(boolean generateUpdateBefo minTime.toMilliseconds(), inputRowType, generateUpdateBefore, - generateInsert); + generateInsert, + true); } private OneInputStreamOperatorTestHarness createTestHarness( diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java index 82e7fc92c3d40..3bb5cb7cd7a57 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepLastRowFunctionTest.java @@ -52,6 +52,7 @@ private MiniBatchDeduplicateKeepLastRowFunction createFunction( inputRowType, generateUpdateBefore, generateInsert, + true, typeSerializer, minRetentionTime); }
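For reference, a minimal sketch of the connector side that triggers the new planner behavior: a ScanTableSource whose getChangelogMode() omits UPDATE_BEFORE, i.e. the "UA,D" mode used by the values connector in the test DDLs above. The class name UpsertExampleSource and the omitted runtime provider are hypothetical; only ChangelogMode, RowKind and the ScanTableSource interface are actual Flink APIs. On such a source the planner restores the missing retractions with the keyed deduplication shown in processLastRowOnChangelog.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.types.RowKind;

/** Hypothetical upsert source: emits only UPDATE_AFTER and DELETE rows keyed by its primary key. */
public class UpsertExampleSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // corresponds to 'changelog-mode' = 'UA,D' in the tests above
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // the actual reader is irrelevant to the changelog-mode contract sketched here
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        return new UpsertExampleSource();
    }

    @Override
    public String asSummaryString() {
        return "upsert-example";
    }
}

Because UPDATE_BEFORE never arrives from such a source, a DELETE may carry only the key parts (e.g. a Kafka tombstone); the deduplication above therefore re-emits the previously stored full row instead of the incoming one.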