Skip to content

[FLINK-39515][table] Fix compiled plan restore for built-in PTFs with default args#27996

Merged
twalthr merged 2 commits intoapache:masterfrom
confluentinc:FLINK-39515
Apr 23, 2026
Merged

[FLINK-39515][table] Fix compiled plan restore for built-in PTFs with default args#27996
twalthr merged 2 commits intoapache:masterfrom
confluentinc:FLINK-39515

Conversation

@gustavodemorais
Copy link
Copy Markdown
Contributor

What is the purpose of the change

When restoring a compiled plan whose query calls a built-in PTF with unprovided optional args (e.g. TO_CHANGELOG(input => TABLE t) without op / op_mapping), code generation fails: Unsupported call: DEFAULT()

Cause: Unprovided slots are filled by SqlDefaultArgOperator, a per-call-site instance that carries the expected return type. It's not registered in any operator table, so on restore RexNodeJsonDeserializer falls through to Calcite's stock SqlStdOperatorTable.DEFAULT_OPERATOR. Codegen's op instanceof SqlDefaultArgOperator check (StringCallGen.scala:242) misses, and the call hits the generic unsupported branch.

The same bug affects any built-in PTF that exposes optional user-level args (those survive toUdfCall and reach codegen). Existing user-defined PTF restore tests don't catch it because their DEFAULT() operands are system args (uid, on_time) that toUdfCall strips.

Fix: In RexNodeJsonDeserializer.deserializeCall, when the deserialized operator's kind is SqlKind.DEFAULT, rebuild it as new SqlDefaultArgOperator(callType) so the typed Flink instance reaches codegen.

Brief change log

  • Added fix to RexNodejsonDeserializer
  • Added restore tests

Verifying this change

Repro / coverage: New restore tests ToChangelogRestoreTest and FromChangelogRestoreTest, each running a RETRACT_RESTORE program. Both fail with Unsupported call: DEFAULT() without the fix and pass with it.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 22, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch @gustavodemorais.

Comment on lines +378 to +381
// SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
// registered in any operator table, so the lookup above falls through to Calcite's stock
// SqlStdOperatorTable.DEFAULT_OPERATOR. Rebuild the typed Flink instance here so codegen's
// `op instanceof SqlDefaultArgOperator` check (StringCallGen) recognizes it on restore.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
// registered in any operator table, so the lookup above falls through to Calcite's stock
// SqlStdOperatorTable.DEFAULT_OPERATOR. Rebuild the typed Flink instance here so codegen's
// `op instanceof SqlDefaultArgOperator` check (StringCallGen) recognizes it on restore.
// SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
// registered in any operator table. Rebuild the typed Flink instance here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +27 to +29
* Restore tests for the built-in FROM_CHANGELOG PTF. Verifies that {@link
* org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator} placeholders for unprovided
* optional args round-trip through compiled-plan serialization.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody cares about this specific bug in the future. So comments can be simplified.

Suggested change
* Restore tests for the built-in FROM_CHANGELOG PTF. Verifies that {@link
* org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator} placeholders for unprovided
* optional args round-trip through compiled-plan serialization.
* Restore tests for the built-in {@link BuiltInFunctionDefinitsions.FROM_CHANGELOG} PTF.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +192 to +194
* savepoint restore. The unprovided {@code op} and {@code op_mapping} args become {@code
* DEFAULT()} placeholders that survive {@code toUdfCall} and reach codegen on the restore path
* - exercising the {@link org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* savepoint restore. The unprovided {@code op} and {@code op_mapping} args become {@code
* DEFAULT()} placeholders that survive {@code toUdfCall} and reach codegen on the restore path
* - exercising the {@link org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator}
* savepoint restore. The unprovided {@code op} and {@code op_mapping} args become {@code
* DEFAULT()} placeholders.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified - also removed "The unprovided {@code op} and {@code op_mapping} args become {@code * DEFAULT()} placeholders."

callType = serdeContext.getRexBuilder().deriveReturnType(operator, rexOperands);
}
return serdeContext.getRexBuilder().makeCall(callType, operator, rexOperands);
// SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update RexNodeJsonSerdeTest

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test

@gustavodemorais gustavodemorais requested a review from twalthr April 22, 2026 15:28
@gustavodemorais
Copy link
Copy Markdown
Contributor Author

Thanks for the review, @twalthr! Simplified comments and added test

}

@Test
public void testDefaultArgOperatorRoundTrip() throws IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you added a dedicated test, and not just a new list entry in testRexNodeSerde?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there isn't. Thanks for the pointer

Copy link
Copy Markdown
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@twalthr twalthr merged commit de207e1 into apache:master Apr 23, 2026
@twalthr twalthr deleted the FLINK-39515 branch April 23, 2026 08:17
airlock-confluentinc Bot pushed a commit to confluentinc/flink-public that referenced this pull request Apr 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants