[FLINK-39576][table] Expose NO_PASS_THROUGH as a PTF trait#28065
[FLINK-39576][table] Expose NO_PASS_THROUGH as a PTF trait#28065gustavodemorais wants to merge 2 commits intoapache:masterfrom
Conversation
| break; | ||
| case NONE: | ||
| prefix = EMPTY_PREFIX; | ||
| break; |
There was a problem hiding this comment.
missing default branch
better to have it otherwise adding new mode will lead to painful debug
| case ALL: | ||
| return tableArg.getType().getFieldCount(); | ||
| default: | ||
| // KEY |
There was a problem hiding this comment.
can we have dedicated case for KEY?
better to have it otherwise adding new mode will lead to painful debug
| callContext | ||
| .getTableSemantics(0) | ||
| .orElseThrow(IllegalStateException::new) | ||
| .dataType() | ||
| .notNull())) |
There was a problem hiding this comment.
how about extraction into a separate var to reduce formatting
| input -> | ||
| input.hasSetSemantics() | ||
| && input.passThroughMode() != PassThroughMode.ALL); |
| .withConditionalTrait(StaticArgumentTrait.NO_PASS_THROUGH, ctx -> false); | ||
|
|
||
| final StaticArgument resolved = base.applyConditionalTraits(noopContext()); | ||
| assertThat(resolved.getPassThroughMode()).isEqualTo(PassThroughMode.KEY); |
There was a problem hiding this comment.
here and in other places better to compare enums with ==
| assertThat(resolved.getPassThroughMode()).isEqualTo(PassThroughMode.KEY); | |
| assertThat(resolved.getPassThroughMode()).isSameAs(PassThroughMode.KEY); |
| void passColumnsThroughTraitMapsToAll() { | ||
| final StaticArgument arg = | ||
| StaticArgument.table( | ||
| "in", | ||
| Row.class, | ||
| false, | ||
| EnumSet.of( | ||
| StaticArgumentTrait.TABLE, | ||
| StaticArgumentTrait.SET_SEMANTIC_TABLE, | ||
| StaticArgumentTrait.PASS_COLUMNS_THROUGH)); | ||
| assertThat(arg.getPassThroughMode()).isEqualTo(PassThroughMode.ALL); | ||
| } | ||
|
|
||
| @Test | ||
| void noPassThroughTraitMapsToNone() { | ||
| final StaticArgument arg = | ||
| StaticArgument.table( | ||
| "in", | ||
| Row.class, | ||
| false, | ||
| EnumSet.of( | ||
| StaticArgumentTrait.TABLE, | ||
| StaticArgumentTrait.SET_SEMANTIC_TABLE, | ||
| StaticArgumentTrait.NO_PASS_THROUGH)); | ||
| assertThat(arg.getPassThroughMode()).isEqualTo(PassThroughMode.NONE); | ||
| } |
There was a problem hiding this comment.
there is a number of tests with input StaticArgument and checking for
assertThat(arg.getPassThroughMode())can we just parameterized them?
probably some others as well
| trait -> | ||
| trait.getIncompatibleWith() | ||
| .forEach( | ||
| incompatible -> { | ||
| if (traits.contains(incompatible)) { | ||
| throw new ValidationException( | ||
| String.format( | ||
| "Invalid argument traits for argument '%s'. Trait %s is mutually exclusive with %s.", | ||
| name, trait, incompatible)); | ||
| } | ||
| })); |
There was a problem hiding this comment.
this and another snippet above are very similar, can we extract common logic or at least vars to void heavy formatting?
| The `ArgumentTrait.NO_PASS_THROUGH` instructs the system to omit all framework-added columns. The output is fully | ||
| controlled by the function's declared output type. | ||
|
|
||
| By default, the framework prepends the `PARTITION BY` columns and appends a `rowtime` column when `on_time` is provided. |
There was a problem hiding this comment.
The rowtime column is not a pass-through column. The value depends on both the incoming on_time value OR the timer firing value. If we don't output it, following temporal operations become unavailable. It is possible to disable on_time for this purpose (via TypeInference.disableSystemArgs) or just don't pass on_time to the call. Therefore, there is an alternative for the user I would vote for only handling the partition key.
|
I've though long and had a deep look at our plans with TO and FROM Changelog and changed my mind. My goal was to make this not impact the metadata that comes out the of PTFs but this is not easily possible - partition keys, unique keys and upsert keys. We need these metadata to be preserved and it's better to keep this specific ptf change out of scope from this init. Thus I'm closing this and we'll move forward with the schema changes while preserving the partition by columns with the help of the ptf framework, which pre appends them at the beginning. |
What is the purpose of the change
Adds a new public PTF argument trait
ArgumentTrait.NO_PASS_THROUGHthat lets a function fully own its output schema. By default the framework prepends thePARTITION BYcolumns and appends arowtimesuffix whenon_timeisprovided; with this trait, neither is added. Useful when a PTF emits its input verbatim or otherwise produces a schema that should not be augmented by the framework.
Brief change log
ArgumentTrait.NO_PASS_THROUGHand internalStaticArgumentTrait.NO_PASS_THROUGH.PassThroughModeenum (KEY,ALL,NONE) derived per table arg from the declared traits.PassAllCollector,PassPartitionKeysCollector) into a singlePassThroughCollectorthat dispatches per-arg by mode.StaticArgument.checkTraitsviaStaticArgumentTrait.getIncompatibleWith().Verifying this change
StaticArgumentTestandStaticArgumentTraitTestcover trait-to-mode derivation, conditional-trait resolution, mutual-exclusion validation, and symmetry of incompatibility.ProcessTableFunctionTest#testNoPassThroughFunctionOwnsOutputSchemaexercises the end-to-end planner path and verifies the output rowtype omits the partition key.ProcessTableFunctionRestoreTestscontinue to pass against both the new code and the pre-existing JSON plans + savepoints.Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes - newArgumentTrait.NO_PASS_THROUGHconstant)Documentation
ArgumentTrait.NO_PASS_THROUGHand updates todocs/content/docs/dev/table/functions/ptfs.mdand the Chinese translation)Was generative AI tooling used to co-author this PR?
2.1.117 (Claude Code)