From 9d6af4026e8039655419ea4d1e55fa9eb529c44c Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 6 May 2026 16:10:51 +0200 Subject: [PATCH 1/5] [FLINK-39614][table] Support TO_CHANGELOG: retract stream -> retract stream with set semantics --- .../ChangelogTypeStrategyUtils.java | 61 +++++++++++++++++++ .../strategies/ToChangelogTypeStrategy.java | 7 ++- .../exec/stream/ToChangelogSemanticTests.java | 7 ++- .../exec/stream/ToChangelogTestPrograms.java | 37 ++++++++++- .../plan/stream/sql/ToChangelogTest.java | 4 +- .../plan/stream/sql/ToChangelogTest.xml | 28 ++++----- .../functions/ptf/ToChangelogFunction.java | 16 ++++- 7 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java new file mode 100644 index 0000000000000..2857e9e48469f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; + +/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */ +@Internal +public final class ChangelogTypeStrategyUtils { + + /** + * Returns the input column indices that pass through to the function's output, excluding: - + * Partition keys, if present (the PTF framework prepends them when the input has set semantics) + * - The operation column {@code opColumnName} when non-null and present. 
+ */ + public static int[] computeOutputIndices( + final TableSemantics tableSemantics, final @Nullable String opColumnName) { + final List inputFields = DataType.getFields(tableSemantics.dataType()); + final Set excluded = new HashSet<>(); + for (final int idx : tableSemantics.partitionByColumns()) { + excluded.add(idx); + } + if (opColumnName != null) { + for (int i = 0; i < inputFields.size(); i++) { + if (inputFields.get(i).getName().equals(opColumnName)) { + excluded.add(i); + break; + } + } + } + return IntStream.range(0, inputFields.size()).filter(i -> !excluded.contains(i)).toArray(); + } + + private ChangelogTypeStrategyUtils() {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index c910885d69e8f..00b376354ced0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -35,6 +35,7 @@ import org.apache.flink.types.ColumnList; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -96,10 +97,14 @@ public List getExpectedSignatures(final FunctionDefinition definition final String opColumnName = resolveOpColumnName(callContext); final List inputFields = DataType.getFields(semantics.dataType()); + // Excludes partition keys when set semantics; the framework prepends them so + // including them again here would duplicate the columns. 
+ final int[] outputIndices = + ChangelogTypeStrategyUtils.computeOutputIndices(semantics, null); final List outputFields = new ArrayList<>(); outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); - outputFields.addAll(inputFields); + Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); return Optional.of(DataTypes.ROW(outputFields).notNull()); }; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index da3c9270ebaad..ca34b79974a3d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -39,9 +39,10 @@ protected void applyDefaultEnvironmentOptions(TableConfig config) { @Override public List programs() { return List.of( - ToChangelogTestPrograms.INSERT_ONLY_INPUT, - ToChangelogTestPrograms.UPDATING_INPUT, - ToChangelogTestPrograms.UPSERT_INPUT, + ToChangelogTestPrograms.INSERT, + ToChangelogTestPrograms.RETRACT, + ToChangelogTestPrograms.UPSERT, + ToChangelogTestPrograms.RETRACT_PARTITION_BY, ToChangelogTestPrograms.CUSTOM_OP_MAPPING, ToChangelogTestPrograms.CUSTOM_OP_NAME, ToChangelogTestPrograms.TABLE_API_DEFAULT, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index d6da85f8c0ca0..36f602225dd89 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -42,7 +42,7 @@ public class ToChangelogTestPrograms { // SQL tests // -------------------------------------------------------------------------------------------- - public static final TableTestProgram INSERT_ONLY_INPUT = + public static final TableTestProgram INSERT = TableTestProgram.of("to-changelog-insert-only", "insert-only input produces op=INSERT") .setupTableSource( SourceTestStep.newBuilder("t") @@ -60,7 +60,7 @@ public class ToChangelogTestPrograms { .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)") .build(); - public static final TableTestProgram UPDATING_INPUT = + public static final TableTestProgram RETRACT = TableTestProgram.of( "to-changelog-updating-input", "retract input produces all op codes including UPDATE_BEFORE") @@ -121,7 +121,38 @@ public class ToChangelogTestPrograms { .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)") .build(); - public static final TableTestProgram UPSERT_INPUT = + /** Partitions by a non-leading column ({@code id}, the middle column of three). 
*/ + public static final TableTestProgram RETRACT_PARTITION_BY = + TableTestProgram.of( + "to-changelog-retract-partition-by-middle-column", + "PARTITION BY a non-leading column drops it from the function output " + + "without disturbing the order of the remaining columns") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", + "id STRING", + "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", "EU", 10L), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", "EU", 10L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", "EU", 30L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id STRING", "op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[EU, INSERT, Alice, 10]", + "+I[EU, UPDATE_BEFORE, Alice, 10]", + "+I[EU, UPDATE_AFTER, Alice, 30]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY id)") + .build(); + + public static final TableTestProgram UPSERT = TableTestProgram.of( "to-changelog-upsert-input", "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index e98e28fc53af9..1279031868820 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -77,7 +77,7 @@ void testUpsertSource() { } @Test - void testInsertOnlySource() { + void testInsertSource() { util.tableEnv() .executeSql( "CREATE TABLE insert_only_source (" @@ -89,7 +89,7 @@ void testInsertOnlySource() { } @Test - void testSetSemanticsWithPartitionBy() { + void 
testRetractPartitionBy() { util.tableEnv() .executeSql( "CREATE TABLE retract_source (" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index 77133f4fe419c..b477039f2da3c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. --> - + TABLE insert_only_source)]]> @@ -35,42 +35,42 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D ]]> - + - TABLE retract_source)]]> + TABLE retract_source PARTITION BY id)]]> - + - TABLE retract_source PARTITION BY id)]]> + TABLE retract_source)]]> diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 247f565228b6d..71bc86c2a8802 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -24,10 +24,13 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.data.utils.ProjectedRowData; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionContext; import 
org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils; import org.apache.flink.types.ColumnList; import org.apache.flink.types.RowKind; @@ -57,10 +60,12 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction { RowKind.DELETE, "DELETE"); private final Map rawOpMap; + private final int[] outputIndices; private transient Map opMap; private transient GenericRowData opRow; private transient JoinedRowData output; + private transient ProjectedRowData projectedOutput; @SuppressWarnings("unchecked") public ToChangelogFunction(final SpecializedContext context) { @@ -70,6 +75,14 @@ public ToChangelogFunction(final SpecializedContext context) { final Map opMapping = callContext.getArgumentValue(2, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); + + // Drop partition keys when set semantics: the framework prepends them automatically. + // For row semantics, partitionByColumns is empty and this is a no-op identity projection. 
+ final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow(() -> new IllegalStateException("Table argument expected.")); + this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics, null); } @Override @@ -79,6 +92,7 @@ public void open(final FunctionContext context) throws Exception { rawOpMap.forEach((kind, code) -> opMap.put(kind, StringData.fromString(code))); opRow = new GenericRowData(1); output = new JoinedRowData(); + projectedOutput = ProjectedRowData.from(outputIndices); } /** @@ -110,6 +124,6 @@ public void eval( } opRow.setField(0, opCode); - collect(output.replace(opRow, input)); + collect(output.replace(opRow, projectedOutput.replaceRow(input))); } } From 1f70352e5d814546f6aaa0a2b557c7e6e98d133c Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 6 May 2026 17:43:01 +0200 Subject: [PATCH 2/5] [FLINK-39614][table] Refactor ChangelogTypeStrategyUtils.computeOutputIndices into two overloads --- .../ChangelogTypeStrategyUtils.java | 48 ++++++++++++------- .../strategies/ToChangelogTypeStrategy.java | 4 +- .../functions/ptf/ToChangelogFunction.java | 11 ++--- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java index 2857e9e48469f..dfaf09d1fce79 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java @@ -23,8 +23,6 @@ import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; -import javax.annotation.Nullable; - import java.util.HashSet; import java.util.List; import 
java.util.Set; @@ -35,26 +33,42 @@ public final class ChangelogTypeStrategyUtils { /** - * Returns the input column indices that pass through to the function's output, excluding: - - * Partition keys, if present (the PTF framework prepends them when the input has set semantics) - * - The operation column {@code opColumnName} when non-null and present. + * Returns the input column indices that pass through to the function's output, excluding the + * partition key columns (the PTF framework prepends them when the input has set semantics). + */ + public static int[] computeOutputIndices(final TableSemantics tableSemantics) { + final int inputFieldCount = DataType.getFields(tableSemantics.dataType()).size(); + final Set excluded = collectPartitionKeyIndices(tableSemantics); + return filterIndices(inputFieldCount, excluded); + } + + /** + * Returns the input column indices that pass through to the function's output, excluding the + * partition key columns and the operation column matching {@code opColumnName}. 
*/ public static int[] computeOutputIndices( - final TableSemantics tableSemantics, final @Nullable String opColumnName) { + final TableSemantics tableSemantics, final String opColumnName) { final List inputFields = DataType.getFields(tableSemantics.dataType()); - final Set excluded = new HashSet<>(); - for (final int idx : tableSemantics.partitionByColumns()) { - excluded.add(idx); - } - if (opColumnName != null) { - for (int i = 0; i < inputFields.size(); i++) { - if (inputFields.get(i).getName().equals(opColumnName)) { - excluded.add(i); - break; - } + final Set excluded = collectPartitionKeyIndices(tableSemantics); + for (int i = 0; i < inputFields.size(); i++) { + if (inputFields.get(i).getName().equals(opColumnName)) { + excluded.add(i); + break; } } - return IntStream.range(0, inputFields.size()).filter(i -> !excluded.contains(i)).toArray(); + return filterIndices(inputFields.size(), excluded); + } + + private static Set collectPartitionKeyIndices(final TableSemantics tableSemantics) { + final Set indices = new HashSet<>(); + for (final int idx : tableSemantics.partitionByColumns()) { + indices.add(idx); + } + return indices; + } + + private static int[] filterIndices(final int total, final Set excluded) { + return IntStream.range(0, total).filter(i -> !excluded.contains(i)).toArray(); } private ChangelogTypeStrategyUtils() {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index 00b376354ced0..d977deab4a93b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -97,10 +97,8 @@ public List getExpectedSignatures(final FunctionDefinition 
definition final String opColumnName = resolveOpColumnName(callContext); final List inputFields = DataType.getFields(semantics.dataType()); - // Excludes partition keys when set semantics; the framework prepends them so - // including them again here would duplicate the columns. final int[] outputIndices = - ChangelogTypeStrategyUtils.computeOutputIndices(semantics, null); + ChangelogTypeStrategyUtils.computeOutputIndices(semantics); final List outputFields = new ArrayList<>(); outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 71bc86c2a8802..36ed1a615bbc7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -71,18 +71,13 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction { public ToChangelogFunction(final SpecializedContext context) { super(BuiltInFunctionDefinitions.TO_CHANGELOG, context); final CallContext callContext = context.getCallContext(); + // Table argument is guaranteed by the type strategy's validation phase. + final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); final Map opMapping = callContext.getArgumentValue(2, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); - - // Drop partition keys when set semantics: the framework prepends them automatically. - // For row semantics, partitionByColumns is empty and this is a no-op identity projection. 
- final TableSemantics tableSemantics = - callContext - .getTableSemantics(0) - .orElseThrow(() -> new IllegalStateException("Table argument expected.")); - this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics, null); + this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); } @Override From b9e8d1f4089c912f2824bf2b003fcc14d89c1b60 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Wed, 6 May 2026 17:58:02 +0200 Subject: [PATCH 3/5] [FLINK-39614][table] Use DataType.getFieldNames in ChangelogTypeStrategyUtils --- .../strategies/ChangelogTypeStrategyUtils.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java index dfaf09d1fce79..3f27b8182c87a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.table.types.inference.strategies; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; @@ -37,7 +36,7 @@ public final class ChangelogTypeStrategyUtils { * partition key columns (the PTF framework prepends them when the input has set semantics). 
*/ public static int[] computeOutputIndices(final TableSemantics tableSemantics) { - final int inputFieldCount = DataType.getFields(tableSemantics.dataType()).size(); + final int inputFieldCount = DataType.getFieldNames(tableSemantics.dataType()).size(); final Set excluded = collectPartitionKeyIndices(tableSemantics); return filterIndices(inputFieldCount, excluded); } @@ -48,15 +47,13 @@ public static int[] computeOutputIndices(final TableSemantics tableSemantics) { */ public static int[] computeOutputIndices( final TableSemantics tableSemantics, final String opColumnName) { - final List inputFields = DataType.getFields(tableSemantics.dataType()); + final List inputFieldNames = DataType.getFieldNames(tableSemantics.dataType()); final Set excluded = collectPartitionKeyIndices(tableSemantics); - for (int i = 0; i < inputFields.size(); i++) { - if (inputFields.get(i).getName().equals(opColumnName)) { - excluded.add(i); - break; - } + final int opIndex = inputFieldNames.indexOf(opColumnName); + if (opIndex >= 0) { + excluded.add(opIndex); } - return filterIndices(inputFields.size(), excluded); + return filterIndices(inputFieldNames.size(), excluded); } private static Set collectPartitionKeyIndices(final TableSemantics tableSemantics) { From b5b1722df09031a6758491e1403da6973e285b58 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Thu, 7 May 2026 11:13:47 +0200 Subject: [PATCH 4/5] [FLINK-39614][table] Reduce duplication and use DataType.getFieldCount in ChangelogTypeStrategyUtils --- .../ChangelogTypeStrategyUtils.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java index 3f27b8182c87a..e9608ee779524 100644 --- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java @@ -22,9 +22,10 @@ import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; +import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; /** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */ @@ -36,9 +37,7 @@ public final class ChangelogTypeStrategyUtils { * partition key columns (the PTF framework prepends them when the input has set semantics). */ public static int[] computeOutputIndices(final TableSemantics tableSemantics) { - final int inputFieldCount = DataType.getFieldNames(tableSemantics.dataType()).size(); - final Set excluded = collectPartitionKeyIndices(tableSemantics); - return filterIndices(inputFieldCount, excluded); + return computeOutputIndices(tableSemantics, -1); } /** @@ -47,25 +46,25 @@ public static int[] computeOutputIndices(final TableSemantics tableSemantics) { */ public static int[] computeOutputIndices( final TableSemantics tableSemantics, final String opColumnName) { - final List inputFieldNames = DataType.getFieldNames(tableSemantics.dataType()); - final Set excluded = collectPartitionKeyIndices(tableSemantics); - final int opIndex = inputFieldNames.indexOf(opColumnName); - if (opIndex >= 0) { - excluded.add(opIndex); - } - return filterIndices(inputFieldNames.size(), excluded); + final int opIndex = DataType.getFieldNames(tableSemantics.dataType()).indexOf(opColumnName); + return computeOutputIndices(tableSemantics, opIndex); } - private static Set collectPartitionKeyIndices(final TableSemantics tableSemantics) { - final Set indices = new HashSet<>(); - for (final int idx : 
tableSemantics.partitionByColumns()) { - indices.add(idx); + private static int[] computeOutputIndices( + final TableSemantics tableSemantics, final int extraExcludedIndex) { + final Set excluded = collectPartitionKeyIndices(tableSemantics); + if (extraExcludedIndex >= 0) { + excluded.add(extraExcludedIndex); } - return indices; + final int inputFieldCount = DataType.getFieldCount(tableSemantics.dataType()); + return IntStream.range(0, inputFieldCount).filter(i -> !excluded.contains(i)).toArray(); } - private static int[] filterIndices(final int total, final Set excluded) { - return IntStream.range(0, total).filter(i -> !excluded.contains(i)).toArray(); + private static Set collectPartitionKeyIndices(final TableSemantics tableSemantics) { + return new HashSet<>( + Arrays.stream(tableSemantics.partitionByColumns()) + .boxed() + .collect(Collectors.toSet())); } private ChangelogTypeStrategyUtils() {} From ed4c5d7845ae5f6487b39b4ca3449f4c01902f65 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Thu, 7 May 2026 15:16:07 +0200 Subject: [PATCH 5/5] [FLINK-39614][docs] Document partitioning by a key in TO_CHANGELOG --- .../docs/sql/reference/queries/changelog.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 494dcc60be892..34945439f92b0 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -268,6 +268,25 @@ SELECT * FROM TO_CHANGELOG( -- UPDATE_BEFORE is dropped (not in the mapping) ``` +#### Partitioning by a key + +```sql +-- Input table 'my_aggregation' with columns (name, id, cnt) +-- Default output schema: [op, name, id, cnt] +-- Output schema with PARTITION BY: [id, op, name, cnt] + +SELECT * FROM TO_CHANGELOG( + input => TABLE my_aggregation PARTITION BY id +) +``` +When `PARTITION BY` is provided, **the output schema changes**. 
The partition key columns are moved to the front by the engine, and the function emits the remaining input columns. The order becomes: + +``` +[partition_keys, op_column, non_partition_input_columns] +``` + +Prefer row semantics when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance. + #### Table API ```java