Skip to content

[FLINK-39259][table] Support TO_CHANGELOG built-in process table function (upsert stream, flat mode)#27777

Open
gustavodemorais wants to merge 8 commits intoapache:masterfrom
gustavodemorais:FLINK-39259
Open

[FLINK-39259][table] Support TO_CHANGELOG built-in process table function (upsert stream, flat mode)#27777
gustavodemorais wants to merge 8 commits intoapache:masterfrom
gustavodemorais:FLINK-39259

Conversation

@gustavodemorais
Copy link
Contributor

What is the purpose of the change

This is the first implementation of TO_CHANGELOG. The idea is to implement the use cases one by one interactively. With this ticket we will write the first version which allows flat mapping of upsert stream.

Implement TO_CHANGELOG built-in PTF as specified in FLIP-564, section 4.2.2.2 (retract/upsert stream to upsert stream, flat mode).

TO_CHANGELOG converts a dynamic table (retract/upsert stream) into an append-only stream with an explicit operation code column. Each input row is emitted as an INSERT-only row with a string op column indicating the original RowKind. This is stateless - each event maps directly to one output record.

Brief change log

  - Built-in function definition and type strategies
  - Runtime implementation 
  - Table API interface
  - MAP literal support in OperatorBindingCallContext and CallBindingCallContext
  - Plan tests, semantic tests (SQL, Table API, convenience API)
  - Use-case test: LAG on upsert stream (previously impossible without TO_CHANGELOG, like other aggregation functions)
  - Documentation

Verifying this change

  • Plan tests
  • Semantic tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 16, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback for 24a9bed

final List<SqlNode> operands = mapCall.getOperandList();
final Map<String, String> map = new LinkedHashMap<>();
for (int i = 0; i < operands.size(); i += 2) {
final String key = SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this unchaining fails? A map can only be literal if all children are also literal, otherwise the getArgumentValue must return Optional.empty()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how well is null supported in keys and values? officially we support them in Flink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's catch exceptions here and also check for nulls

}

@Override
public boolean isArgumentLiteral(int pos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update isArgumentLiteral as well? A map can only be literal if all children are also literal

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback for 6efff33

if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1) {
return callContext.fail(
throwOnFailure,
"OP descriptor must contain exactly one column name "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The descriptor for argument 'op' must contain exactly one column name

if (!(key instanceof String) || !VALID_ROW_KIND_NAMES.contains(key)) {
return callContext.fail(
throwOnFailure,
"Invalid RowKind name '%s' in op_mapping. Valid names are: %s.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Invalid target mapping for argument 'op_mapping'."

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback for 8d295e1

* limitations under the License.
*/

package org.apache.flink.table.planner.functions.ptf;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put into org.apache.flink.table.runtime.functions.ptf in table-runtime

* partition key columns automatically.
*/
@Internal
public class ToChangelogFunction extends ProcessTableFunction<Row> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BuiltInProcessTableFunction similar to BuiltInScalarFunction

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably mean we should add a new BuiltInProcessTableFunction similar to BuiltInScalarFunction as start using it as a base class?

collect(buildOutputRow(input, opCode));
}

private Row buildOutputRow(final Row input, final String opCode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once you use BuiltIn stack, everything should be RowData. as efficiently as possible with as little conversion as possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It was necessary to do one more change to the framework to support this: TypeInferenceUtil.inferInputTypes() (line 484) returns semantics.dataType() for table arguments, which always has Row as conversion class - it ignores the conversionClass specified in StaticArgument.table(). Adding bridgedTo(expectedArg.getConversionClass()).

#27777 (comment)

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @gustavodemorais. Sorry for being so picky here but this is the first real built-in PTF so we need to prepare a good template for the future.


@Override
public Table toChangelog() {
return process("TO_CHANGELOG");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't hard code the name here. use the built in function definition. I know this was an issue for ml predict, but we should fix it properly now.

}

@Override
public Table toChangelog() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add op_mapping parameter and op, same functionality as in sql. to avoid to many overloads, we can accept Expression and force people to use .asArgument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
* Converts this dynamic table into an append-only stream with an explicit operation code column
* using the built-in {@link BuiltInFunctionDefinitions#TO_CHANGELOG TO_CHANGELOG} PTF.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove references to BuiltInFunctionDefinitions#TO_CHANGELOG TO_CHANGELOG. BuiltInFunctionDefinitions is rather internal.

@gustavodemorais
Copy link
Contributor Author

Hey @twalthr, thanks for the review! I've addressed all your comments here ea476c3

@gustavodemorais gustavodemorais requested a review from twalthr March 17, 2026 17:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants