diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index 494dcc60be892..34945439f92b0 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -268,6 +268,25 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Partitioning by a key
+
+```sql
+-- Input table 'my_aggregation' with columns (name, id, cnt)
+-- Default output schema: [op, name, id, cnt]
+-- Output schema with PARTITION BY: [id, op, name, cnt]
+
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id
+)
+```
+When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns. The order becomes:
+
+```
+[partition_keys, op_column, non_partition_input_columns]
+```
+
+Prefer row semantics when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
+
 #### Table API
 
 ```java
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
new file mode 100644
index 0000000000000..e9608ee779524
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */
+@Internal
+public final class ChangelogTypeStrategyUtils {
+
+    /**
+     * Returns the input column indices that pass through to the function's output, excluding the
+     * partition key columns (the PTF framework prepends them when the input has set semantics).
+     */
+    public static int[] computeOutputIndices(final TableSemantics tableSemantics) {
+        return computeOutputIndices(tableSemantics, -1);
+    }
+
+    /**
+     * Returns the input column indices that pass through to the function's output, excluding the
+     * partition key columns and the operation column matching {@code opColumnName}.
+     *
+     * <p>If no input column matches {@code opColumnName}, only the partition keys are excluded.
+     */
+    public static int[] computeOutputIndices(
+            final TableSemantics tableSemantics, final String opColumnName) {
+        final int opIndex = DataType.getFieldNames(tableSemantics.dataType()).indexOf(opColumnName);
+        return computeOutputIndices(tableSemantics, opIndex);
+    }
+
+    private static int[] computeOutputIndices(
+            final TableSemantics tableSemantics, final int extraExcludedIndex) {
+        final Set<Integer> excluded = collectPartitionKeyIndices(tableSemantics);
+        if (extraExcludedIndex >= 0) {
+            excluded.add(extraExcludedIndex);
+        }
+        final int inputFieldCount = DataType.getFieldCount(tableSemantics.dataType());
+        return IntStream.range(0, inputFieldCount).filter(i -> !excluded.contains(i)).toArray();
+    }
+
+    /** Boxes the partition key positions into a mutable set so callers can add exclusions. */
+    private static Set<Integer> collectPartitionKeyIndices(final TableSemantics tableSemantics) {
+        return Arrays.stream(tableSemantics.partitionByColumns())
+                .boxed()
+                .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    private ChangelogTypeStrategyUtils() {}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index c910885d69e8f..d977deab4a93b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -35,6 +35,7 @@
 import org.apache.flink.types.ColumnList;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -96,10 +97,12 @@ public List<Signature> getExpectedSignatures(final FunctionDefinition definition
 
         final String opColumnName = resolveOpColumnName(callContext);
         final List<DataTypes.Field> inputFields = DataType.getFields(semantics.dataType());
+        final int[] outputIndices =
+                ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
 
         final List<DataTypes.Field> outputFields = new ArrayList<>();
         outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
-        outputFields.addAll(inputFields);
+        Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
 
         return Optional.of(DataTypes.ROW(outputFields).notNull());
     };
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index da3c9270ebaad..ca34b79974a3d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -39,9 +39,10 @@ protected void applyDefaultEnvironmentOptions(TableConfig config) {
     @Override
     public List<TableTestProgram> programs() {
         return List.of(
-                ToChangelogTestPrograms.INSERT_ONLY_INPUT,
-                ToChangelogTestPrograms.UPDATING_INPUT,
-                ToChangelogTestPrograms.UPSERT_INPUT,
+                ToChangelogTestPrograms.INSERT,
+                ToChangelogTestPrograms.RETRACT,
+                ToChangelogTestPrograms.UPSERT,
+                ToChangelogTestPrograms.RETRACT_PARTITION_BY,
                 ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 ToChangelogTestPrograms.CUSTOM_OP_NAME,
                 ToChangelogTestPrograms.TABLE_API_DEFAULT,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index d6da85f8c0ca0..36f602225dd89 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -42,7 +42,7 @@ public class ToChangelogTestPrograms {
     // SQL tests
     // --------------------------------------------------------------------------------------------
 
-    public static final TableTestProgram INSERT_ONLY_INPUT =
+    public static final TableTestProgram INSERT =
             TableTestProgram.of("to-changelog-insert-only", "insert-only input produces op=INSERT")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
@@ -60,7 +60,7 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
-    public static final TableTestProgram UPDATING_INPUT =
+    public static final TableTestProgram RETRACT =
             TableTestProgram.of(
                     "to-changelog-updating-input",
                     "retract input produces all op codes including UPDATE_BEFORE")
@@ -121,7 +121,38 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    /** Partitions by a non-leading column ({@code id}, the middle column of three). */
+    public static final TableTestProgram RETRACT_PARTITION_BY =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-middle-column",
+                            "PARTITION BY a non-leading column drops it from the function output "
+                                    + "without disturbing the order of the remaining columns")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED",
+                                            "id STRING",
+                                            "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", "EU", 30L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "id STRING", "op STRING", "name STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[EU, INSERT, Alice, 10]",
+                                            "+I[EU, UPDATE_BEFORE, Alice, 10]",
+                                            "+I[EU, UPDATE_AFTER, Alice, 30]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY id)")
+                    .build();
+
-    public static final TableTestProgram UPSERT_INPUT =
+    public static final TableTestProgram UPSERT =
             TableTestProgram.of(
                     "to-changelog-upsert-input",
                     "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index e98e28fc53af9..1279031868820 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -77,7 +77,7 @@ void testUpsertSource() {
     }
 
     @Test
-    void testInsertOnlySource() {
+    void testInsertSource() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE insert_only_source ("
@@ -89,7 +89,7 @@ void testInsertOnlySource() {
     }
 
     @Test
-    void testSetSemanticsWithPartitionBy() {
+    void testRetractPartitionBy() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE retract_source ("
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 77133f4fe419c..b477039f2da3c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testInsertOnlySource">
+  <TestCase name="testInsertSource">
     <Resource name="sql">
       <![CDATA[INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
@@ -35,42 +35,42 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT())]
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testUpdatingSource">
+  <TestCase name="testRetractPartitionBy">
     <Resource name="sql">
-      <![CDATA[INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
+      <![CDATA[INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
     </Resource>
 ... (ast and optimized plan resources of this test case regenerated; irrecoverable from the mangled paste) ...
-  <TestCase name="testSetSemanticsWithPartitionBy">
+  <TestCase name="testRetractSource">
     <Resource name="sql">
-      <![CDATA[INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
+      <![CDATA[INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
     </Resource>
 ... (ast and optimized plan resources of this test case regenerated; irrecoverable from the mangled paste) ...
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 247f565228b6d..36ed1a615bbc7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -24,10 +24,13 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
 
@@ -57,19 +60,24 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction {
                     RowKind.DELETE, "DELETE");
 
     private final Map<RowKind, String> rawOpMap;
+    private final int[] outputIndices;
 
     private transient Map<RowKind, StringData> opMap;
     private transient GenericRowData opRow;
    private transient JoinedRowData output;
+    private transient ProjectedRowData projectedOutput;
 
     @SuppressWarnings("unchecked")
     public ToChangelogFunction(final SpecializedContext context) {
         super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
         final CallContext callContext = context.getCallContext();
+        // Table argument is guaranteed by the type strategy's validation phase.
+        final TableSemantics tableSemantics = callContext.getTableSemantics(0).get();
         final Map<RowKind, String> opMapping =
                 callContext.getArgumentValue(2, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
+        this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
     }
 
     @Override
@@ -79,6 +87,7 @@ public void open(final FunctionContext context) throws Exception {
         rawOpMap.forEach((kind, code) -> opMap.put(kind, StringData.fromString(code)));
         opRow = new GenericRowData(1);
         output = new JoinedRowData();
+        projectedOutput = ProjectedRowData.from(outputIndices);
     }
 
     /**
@@ -110,6 +119,6 @@ public void eval(
         }
 
         opRow.setField(0, opCode);
-        collect(output.replace(opRow, input));
+        collect(output.replace(opRow, projectedOutput.replaceRow(input)));
     }
 }