[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901
[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901raminqaf wants to merge 4 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Thanks for the PR, @raminqaf! In general, it looks very consistent with to_changelog.
Here is my first set of reviews. In this one, I think there are some minor details we have to decide on and two bigger things:
- Going with retract by default as the output mode
- Finding a nice way of expressing the changelog mode with the BuiltInFunctionsDefinition
See below for more details
| SELECT * FROM FROM_CHANGELOG( | ||
| input => TABLE source_table PARTITION BY key_col, | ||
| [op => DESCRIPTOR(op_column_name),] | ||
| [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']] |
There was a problem hiding this comment.
The invalid_op_handling param would already be relevant here but we can add it in a follow up PR
There was a problem hiding this comment.
Good point and good idea, let's keep this PR not grow too big.
|
|
||
| #### Default op_mapping | ||
|
|
||
| When `op_mapping` is omitted, the following standard names are used: |
There was a problem hiding this comment.
We're missing the UPDATE_BEFORE -> UPDATE_BEFORE
We're actually not going to drop UPDATE_BEFORE in the default implementations. We want a simply flat mapping in the default behavior
There was a problem hiding this comment.
Added a dynamic way to detect the changelog mode based on the op_mapping:
- no op_mapping (default) -> retract
- op_mapping not containing UPDATE_BEFORE -> upsert
- op_mapping containing UPDATE_BEFORE -> retract
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
Outdated
Show resolved
Hide resolved
| @Override | ||
| public ChangelogMode getChangelogMode( | ||
| ChangelogContext changelogContext) { | ||
| return ChangelogMode.upsert(false); |
There was a problem hiding this comment.
I think we'll go with ChangelogMode.ALL
| String runtimeClass, | ||
| boolean isInternal) { | ||
| boolean isInternal, | ||
| @Nullable ChangelogFunction changelogFunction) { |
There was a problem hiding this comment.
We need to add a way for BultInFunctions to set the ChangelogMode but this adding it like this doesn't feel very clean. We also have to add some dead code below in the func definition when creating 'new ChangelogFunction()' and it's very specific.
A simple alternative would be just going adding changelogMode to the BuiltInFunctionDefinition, but this might not be enough for eventual optimizations where we want to decide between append/upsert/retract stream output depending on the args inside opMapping
BuiltInFunctionDefinition.newBuilder()
.name("FROM_CHANGELOG")
.outputChangelogMode(ChangelogMode.upsert(false))
.build();
I think we have to sync with Timo on this. Do you have other alternatives you think could be better to solve this?
There was a problem hiding this comment.
I have added a new changelogModeResolver that takes in a Function with the CallContext and outputs a ChangelogMode
/**
* Sets a resolver that dynamically determines the output {@link ChangelogMode} for this
* built-in PTF. The resolver receives the {@link CallContext} and can inspect function
* arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit
* updates (e.g., FROM_CHANGELOG).
*/
public Builder changelogModeResolver(
Function<CallContext, ChangelogMode> changelogModeResolver) {
this.changelogModeResolver = changelogModeResolver;
return this;
}
gustavodemorais
left a comment
There was a problem hiding this comment.
Thanks for the updates, Ramin! Added some comments, take a look
| -- +U[id:1, name:'Alice2'] | ||
| -- -D[id:2, name:'Bob'] | ||
|
|
||
| -- Table state after all events: {id:1, name:'Alice2'} |
There was a problem hiding this comment.
nit: this looks more like json. Can you use a table format?
There was a problem hiding this comment.
Can't we implement "ChangelogFunction" and then have a default implementation keep the current behavior and then just change this for our new builtin function
There was a problem hiding this comment.
I tried this but, implementing ChangelogFunction on BuiltInFunctionDefinition can't give us dynamic changelog mode based on (input) arguments, because ChangelogContext only provides input changelog modes and downstream requirements and not the function's arguments like op_mapping.
We need the CallContext to inspect the inputs (i.e., from op_mapping), which is what the resolver provides.
There was a problem hiding this comment.
I see. What do you think about extending ChangelogContext and adding new functionality to it like getScalarValue to make it able to access arguments?
| val inputChangelogModes = children.map(toChangelogMode(_, None, None)) | ||
| val changelogModeOpt: Option[ChangelogMode] = definition match { | ||
| // User-defined PTFs that implement ChangelogFunction | ||
| case cf: ChangelogFunction => |
There was a problem hiding this comment.
Ideally we don't touch this file. If you implement ChangelogFunction that might be enough and we can then skip these changes
|
|
||
| ```sql | ||
| SELECT * FROM FROM_CHANGELOG( | ||
| input => TABLE source_table PARTITION BY key_col, |
There was a problem hiding this comment.
We're moving to row semantics as the first default version - no partition by. I'm finishing a new PR that should help you see the changes that you'll have to make
|
|
||
| private final SqlCallSyntax sqlCallSyntax; | ||
|
|
||
| private final @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode> |
There was a problem hiding this comment.
| private final @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode> | |
| private final @Nullable Function<ChangelogContext, ChangelogMode> |
| */ | ||
| @SuppressWarnings({"unchecked", "rawtypes"}) | ||
| public static ChangelogMode resolveChangelogMode( | ||
| final ChangelogFunction.ChangelogContext changelogContext) { |
There was a problem hiding this comment.
I think we almost always follow this pattern
| final ChangelogFunction.ChangelogContext changelogContext) { | |
| final ChangelogContext changelogContext) { |
You've done this in multiple locations. Can you take a look?
| definition match { | ||
| case changelogFunction: ChangelogFunction => | ||
| val inputChangelogModes = children.map(toChangelogMode(_, None, None)) | ||
| val inputChangelogModes = children.map(toChangelogMode(_, None, None)) |
There was a problem hiding this comment.
can we undo the changes to this file? This is one of the trickiest files so we usually want to change it if we have a good reason
| throwOnFailure, | ||
| String.format( | ||
| "Invalid target mapping for argument 'op_mapping'. " | ||
| + "Duplicate change operation: '%s'.", |
There was a problem hiding this comment.
Some users may think they can they need two entries mapping different user codes to the same RowKind (e.g., 'c' -> INSERT, 'r'
-> INSERT).
I think here we can have a better message: "If you need mulitple change codes to map to the same type, use a comma separated list, e.g. 'c, r' -> 'INSERT'. What do you think?
| FromChangelogTestPrograms.DEBEZIUM_MAPPING, | ||
| FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, | ||
| FromChangelogTestPrograms.TABLE_API_DEFAULT, | ||
| FromChangelogTestPrograms.MISSING_PARTITION_BY); |
There was a problem hiding this comment.
I think there are two good tests we could add here
- round-trip FROM_CHANGELOG(TO_CHANGELOG(table)) (important, might work better when we merge [FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before #27911 which is approved)
- custom op column name - There's no test for op => DESCRIPTOR(operation)
What is the purpose of the change
Implement the FROM_CHANGELOG built-in process table function as specified in FLIP-564, section 4.1.3.1 (append-only stream to upsert stream, flat mode).
FROM_CHANGELOG converts an append-only table with an explicit operation code column (e.g., Debezium's
'c','u','d') into a dynamic table backed by a Flink upsert stream ({+I, +U, -D}). This is the reverse of TO_CHANGELOG. The implementation is stateless — each input record maps directly to one output record with the appropriate RowKind.Brief change log
ChangelogFunctiondelegate toBuiltInFunctionDefinitionso built-in PTFs can declare their output changelog modeFlinkChangelogModeInferenceProgramto handle the new delegateFromChangelogTypeStrategywith input validation (op column existence, STRING type, op_mapping value validation) and output type inference (removes op column from output)FROM_CHANGELOGbuilt-in function definition withChangelogMode.upsert(false)outputFromChangelogFunctionruntime implementation usingProjectedRowDatafor zero-copy projectionfromChangelog()convenience method toPartitionedTableTable APIVerifying this change
This change added tests and can be verified as follows:
FromChangelogTypeStrategyinput validation (valid mapping, op column not found, wrong type, invalid descriptor, invalid RowKind, UPDATE_BEFORE rejected, duplicate RowKind)changelogMode=[I,UA,D]for PTF output,changelogMode=[I]for source input)Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes (PartitionedTable.fromChangelog())Documentation