[FLINK-19694][table-planner-blink] Support upsert ChangelogMode for ScanTableSource in planner

This closes #13721
wuchong committed Oct 27, 2020
1 parent bca8e47 commit 3c67cce
Showing 9 changed files with 719 additions and 80 deletions.
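
For context, the feature being added: an upsert source is a ScanTableSource whose changelog contains UPDATE_AFTER but no UPDATE_BEFORE. A minimal sketch of how a connector declares that mode, assuming the standard ChangelogMode builder API (the connector class itself is hypothetical):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.types.RowKind;

// Hypothetical upsert connector: it emits the latest row per key (UPDATE_AFTER)
// and deletions, but never UPDATE_BEFORE. This commit teaches the planner to
// accept such sources and normalize their stream into a full changelog.
public abstract class MyUpsertSource implements ScanTableSource {
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }
}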
@@ -20,17 +20,21 @@

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
import org.apache.flink.table.planner.plan.utils.RexNodeRewriter;
import org.apache.flink.table.planner.plan.utils.ScanUtil;
import org.apache.flink.table.planner.sources.DynamicSourceUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
@@ -40,6 +44,7 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -87,7 +92,22 @@ public void onMatch(RelOptRuleCall call) {
final List<String> fieldNames = scan.getRowType().getFieldNames();
final int fieldCount = fieldNames.size();

final int[] usedFields = RexNodeExtractor.extractRefInputFields(project.getProjects());
final int[] refFields = RexNodeExtractor.extractRefInputFields(project.getProjects());
final int[] usedFields;

TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
if (isUpsertSource(oldTableSourceTable)) {
// primary key fields are needed for upsert source
List<String> keyFields = oldTableSourceTable.catalogTable().getSchema()
.getPrimaryKey().get().getColumns();
// we should get source fields from scan node instead of CatalogTable,
// because projection may have been pushed down
List<String> sourceFields = scan.getRowType().getFieldNames();
int[] primaryKey = ScanUtil.getPrimaryKeyIndices(sourceFields, keyFields);
usedFields = mergeFields(refFields, primaryKey);
} else {
usedFields = refFields;
}
// if no fields can be pruned, we keep the original plan
if (usedFields.length == fieldCount) {
return;
@@ -97,8 +117,6 @@ public void onMatch(RelOptRuleCall call) {
.mapToObj(fieldNames::get)
.collect(Collectors.toList());

TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);

final TableSchema oldSchema = oldTableSourceTable.catalogTable().getSchema();
final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
final List<String> metadataKeys = DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
@@ -203,4 +221,30 @@ private void applyUpdatedMetadata(
((SupportsReadingMetadata) newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType);
}
}

/**
* Returns true if the table is an upsert source when it works in scan mode.
*/
private static boolean isUpsertSource(TableSourceTable table) {
TableSchema schema = table.catalogTable().getSchema();
if (!schema.getPrimaryKey().isPresent()) {
return false;
}
DynamicTableSource tableSource = table.tableSource();
if (tableSource instanceof ScanTableSource) {
ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
return mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
}
return false;
}

private static int[] mergeFields(int[] fields1, int[] fields2) {
List<Integer> results = Arrays.stream(fields1).boxed().collect(Collectors.toList());
Arrays.stream(fields2).forEach(idx -> {
if (!results.contains(idx)) {
results.add(idx);
}
});
return results.stream().mapToInt(Integer::intValue).toArray();
}
}
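
To make the projection merge above concrete: if the query only references field 2 but the upsert source's primary key is fields {0, 1}, the pushed-down projection must still read all three so that the changelog normalization can key on the full primary key. A standalone sketch with the same semantics as mergeFields (illustration only, not the planner class):

import java.util.Arrays;
import java.util.stream.IntStream;

public class MergeFieldsDemo {
    // keep the referenced fields in order, then append missing key fields
    static int[] merge(int[] refFields, int[] primaryKey) {
        return IntStream.concat(
                Arrays.stream(refFields),
                Arrays.stream(primaryKey)
                        .filter(i -> Arrays.stream(refFields).noneMatch(j -> j == i)))
                .toArray();
    }

    public static void main(String[] args) {
        int[] refFields = {2};      // fields referenced by the projection
        int[] primaryKey = {0, 1};  // primary key indices of the upsert source
        System.out.println(Arrays.toString(merge(refFields, primaryKey))); // [2, 0, 1]
    }
}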
@@ -241,30 +241,31 @@ private static void validateScanSource(
validateWatermarks(sourceIdentifier, schema);

if (isStreamingMode) {
validateScanSourceForStreaming(sourceIdentifier, scanSource, changelogMode);
validateScanSourceForStreaming(sourceIdentifier, schema, scanSource, changelogMode);
} else {
validateScanSourceForBatch(sourceIdentifier, changelogMode, provider);
}
}

private static void validateScanSourceForStreaming(
ObjectIdentifier sourceIdentifier,
TableSchema schema,
ScanTableSource scanSource,
ChangelogMode changelogMode) {
// sanity check for produced ChangelogMode
final boolean hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE);
final boolean hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER);
if (!hasUpdateBefore && hasUpdateAfter) {
// only UPDATE_AFTER
throw new TableException(
String.format(
"Unsupported source for table '%s'. Currently, a %s doesn't support a changelog which contains " +
"UPDATE_AFTER but no UPDATE_BEFORE. Please adapt the implementation of class '%s'.",
sourceIdentifier.asSummaryString(),
ScanTableSource.class.getSimpleName(),
scanSource.getClass().getName()
)
);
if (!schema.getPrimaryKey().isPresent()) {
throw new TableException(
String.format(
"Table '%s' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. " +
"This requires defining a primary key constraint on the table.",
sourceIdentifier.asSummaryString()
)
);
}
} else if (hasUpdateBefore && !hasUpdateAfter) {
// only UPDATE_BEFORE
throw new ValidationException(
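The relaxed check above means an upsert scan source is now accepted in streaming mode as long as its schema declares a primary key. A sketch of a schema that passes the new validation, using the TableSchema builder from this codebase (field names are made up):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

class UpsertSchemaExample {
    // getPrimaryKey() is present, so a source whose changelog contains
    // UPDATE_AFTER but no UPDATE_BEFORE passes validateScanSourceForStreaming
    static final TableSchema SCHEMA = TableSchema.builder()
            .field("user_id", DataTypes.BIGINT().notNull())
            .field("balance", DataTypes.DECIMAL(10, 2))
            .primaryKey("user_id")
            .build();
}
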
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.physical.stream

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils, KeySelectorUtil}
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator
import org.apache.flink.table.runtime.operators.deduplicate.{DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepLastRowFunction}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo

import java.util

import scala.collection.JavaConversions._

/**
* Stream physical RelNode which normalizes a changelog stream which may be an upsert stream or
* a changelog stream containing duplicate events. This node normalizes such a stream into a
* regular changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records
* without duplication.
*/
class StreamExecChangelogNormalize(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
val uniqueKeys: Array[Int])
extends SingleRel(cluster, traitSet, input)
with StreamPhysicalRel
with StreamExecNode[RowData] {

override def requireWatermark: Boolean = false

override def deriveRowType(): RelDataType = getInput.getRowType

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamExecChangelogNormalize(
cluster,
traitSet,
inputs.get(0),
uniqueKeys)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val fieldNames = getRowType.getFieldNames
super.explainTerms(pw)
.item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
}

//~ ExecNode methods -----------------------------------------------------------

override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] = {
List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]])
}

override def replaceInputNode(
ordinalInParent: Int,
newInputNode: ExecNode[StreamPlanner, _]): Unit = {
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}

override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {

val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]

val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
val tableConfig = planner.getTableConfig
val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
val operator = if (isMiniBatchEnabled) {
val exeConfig = planner.getExecEnv.getConfig
val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
val processFunction = new MiniBatchDeduplicateKeepLastRowFunction(
rowTypeInfo,
generateUpdateBefore,
true, // generateInsert
false, // inputInsertOnly
rowSerializer,
// disable state TTL: changelog normalize must keep all state to guarantee data integrity
// we can enable state TTL if this is really needed in some cases
-1)
val trigger = AggregateUtil.createMiniBatchTrigger(tableConfig)
new KeyedMapBundleOperator(
processFunction,
trigger)
} else {
val processFunction = new DeduplicateKeepLastRowFunction(
-1, // disable state ttl
rowTypeInfo,
generateUpdateBefore,
true, // generateInsert
false) // inputInsertOnly
new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
}

val ret = new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
rowTypeInfo,
inputTransform.getParallelism)

if (inputsContainSingleton()) {
ret.setParallelism(1)
ret.setMaxParallelism(1)
}

val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo)
ret.setStateKeySelector(selector)
ret.setStateKeyType(selector.getProducedType)
ret
}
}
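
At runtime the node above is keep-last-row deduplication keyed by the primary key: the latest row per key is held in state, and an UPDATE_BEFORE for the previous version is emitted when the downstream requires it. A much-simplified, state-backend-free model of that logic (hypothetical helper, not the actual DeduplicateKeepLastRowFunction):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of changelog normalization: per-key keep-last-row.
// The real operator keeps rows in keyed state, handles DELETE rows,
// supports mini-batching, and works on serialized RowData.
class ChangelogNormalizeModel<K, R> {
    private final Map<K, R> lastRowPerKey = new HashMap<>();
    private final boolean generateUpdateBefore;

    ChangelogNormalizeModel(boolean generateUpdateBefore) {
        this.generateUpdateBefore = generateUpdateBefore;
    }

    /** Returns the normalized changelog records for one upsert input row. */
    List<String> onUpsert(K key, R row) {
        List<String> out = new ArrayList<>();
        R previous = lastRowPerKey.put(key, row);
        if (previous == null) {
            out.add("+I " + row);          // first row seen for this key
        } else {
            if (generateUpdateBefore) {
                out.add("-U " + previous); // retract the previous version
            }
            out.add("+U " + row);          // emit the new version
        }
        return out;
    }
}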
@@ -292,6 +292,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(
union, children, new ModifyKindSetTrait(providedKindSet), requiredTrait, requester)

case materialize: StreamExecChangelogNormalize =>
// changelog normalize supports UPDATE and DELETE inputs
val children = visitChildren(materialize, ModifyKindSetTrait.ALL_CHANGES)
// changelog normalize will output all changes
val providedTrait = ModifyKindSetTrait.ALL_CHANGES
createNewNode(materialize, children, providedTrait, requiredTrait, requester)

case ts: StreamExecTableSourceScan =>
// ScanTableSource supports producing updates and deletions
val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
@@ -625,15 +632,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(union, Some(children.flatten), providedTrait)
}

case materialize: StreamExecChangelogNormalize =>
// changelog normalize currently only supports an input that sends UPDATE_AFTER only
val children = visitChildren(materialize, UpdateKindTrait.ONLY_UPDATE_AFTER)
// use requiredTrait as providedTrait,
// because changelog normalize supports all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)

case ts: StreamExecTableSourceScan =>
// currently only supports BEFORE_AND_AFTER if the source produces updates
val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
if (providedTrait == UpdateKindTrait.ONLY_UPDATE_AFTER) {
throw new UnsupportedOperationException(
"Currently, ScanTableSource doesn't support producing ChangelogMode " +
"which contains UPDATE_AFTER but no UPDATE_BEFORE. Please update the " +
"implementation of '" + ts.tableSource.asSummaryString() + "' source.")
}
createNewNode(rel, Some(List()), providedTrait)

case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
@@ -672,7 +680,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val providedTrait = newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
if (!providedTrait.satisfies(requiredChildrenTrait)) {
// the provided trait can't satisfy required trait, thus we should return None.
// for example, the changelog source can't provide ONLY_UPDATE_AFTER.
return None
}
newChild
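In the update-kind phase above, ChangelogNormalize pins its input requirement to UPDATE_AFTER only (the upsert shape) and passes the downstream requirement through as its own provided trait. A toy Java model of that negotiation (the enum and methods are illustrative; the real traits live in the planner's `trait` package):

// Illustrative only: models how the node's input requirement is fixed
// while its output adapts to whatever the requester asks for.
enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

final class ChangelogNormalizeTraits {
    // what ChangelogNormalize always requires from its input
    static UpdateKind requiredFromInput() {
        return UpdateKind.ONLY_UPDATE_AFTER; // upsert stream, no UPDATE_BEFORE
    }

    // what it provides downstream: the requirement is simply passed through,
    // because the operator can generate UPDATE_BEFORE from state on demand
    static UpdateKind provided(UpdateKind requiredByDownstream) {
        return requiredByDownstream;
    }
}
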
@@ -18,19 +18,25 @@

package org.apache.flink.table.planner.plan.rules.physical.stream

import org.apache.flink.table.api.TableException
import org.apache.flink.table.connector.source.ScanTableSource
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecChangelogNormalize, StreamExecTableSourceScan}
import org.apache.flink.table.planner.plan.schema.TableSourceTable

import org.apache.calcite.plan.{RelOptRuleCall, RelTraitSet}
import org.apache.flink.types.RowKind
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
import org.apache.flink.table.planner.plan.utils.ScanUtil

/**
* Rule that converts [[FlinkLogicalTableSourceScan]] to [[StreamExecTableSourceScan]].
*
* <p>Depending on whether the source is an upsert source, this rule will also generate
* [[StreamExecChangelogNormalize]] to materialize the upsert stream.
*/
class StreamExecTableSourceScanRule
extends ConverterRule(
@@ -56,12 +62,40 @@ class StreamExecTableSourceScanRule
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)

new StreamExecTableSourceScan(
val newScan = new StreamExecTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable.asInstanceOf[TableSourceTable]
)
scan.getTable.asInstanceOf[TableSourceTable])

val table = scan.getTable.asInstanceOf[TableSourceTable]
val tableSource = table.tableSource.asInstanceOf[ScanTableSource]
val changelogMode = tableSource.getChangelogMode
if (changelogMode.contains(RowKind.UPDATE_AFTER) &&
!changelogMode.contains(RowKind.UPDATE_BEFORE)) {
// generate changelog normalize node for upsert source
val primaryKey = table.catalogTable.getSchema.getPrimaryKey
if (!primaryKey.isPresent) {
throw new TableException(s"Table '${table.tableIdentifier.asSummaryString()}' produces" +
" a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE;" +
" this requires defining a primary key on the table.")
}
val keyFields = primaryKey.get().getColumns
val inputFieldNames = newScan.getRowType.getFieldNames
val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
.replace(requiredDistribution)
.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)

new StreamExecChangelogNormalize(
scan.getCluster,
traitSet,
newInput,
primaryKeyIndices)
} else {
newScan
}
}
}
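
The net effect of the rule for upsert sources is the physical plan Scan -> hash-by-primary-key Exchange -> ChangelogNormalize, so that all versions of a key land on the same task. The key-index lookup it relies on is plain name matching; a sketch of what ScanUtil.getPrimaryKeyIndices plausibly does (assumed behavior, not the actual implementation):

import java.util.List;

final class PrimaryKeyIndices {
    // assumed behavior: map each primary key column name to its position
    // in the scan's produced row type
    static int[] get(List<String> fieldNames, List<String> keyFields) {
        return keyFields.stream().mapToInt(fieldNames::indexOf).toArray();
    }
}

For a scan producing [user_id, ts, balance] with primary key [user_id], this yields [0], which feeds both the hash distribution and the normalize node's uniqueKeys.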

