[FLINK-29406][table] Expose finish method for TableFunction #20899

Merged
merged 3 commits into from
Oct 8, 2022

lincoln-lil

What is the purpose of the change

The task lifecycle was changed in FLINK-22972: a new finish() phase was introduced (the 'finish' part was extracted out of 'close'), and the behavior of the close method changed. This was effectively a breaking change for TableFunction users who did flush work in the close method in earlier versions (< 1.14). This PR aims to provide a migration path that supports this usage again. If necessary, we can also consider backporting it to the corresponding older versions, 1.14 through 1.16.
However, since this method is not recommended for most users, I would prefer not to add it to the user documentation (just as we did not actively encourage users to flush data in the close method before). It is therefore described only in the method comments of TableFunction; users who relied on this 'advanced' usage in previous versions should still be able to find out how to use the new method.
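The migration described above can be sketched without any Flink dependency. The classes below are hypothetical stand-ins (`SketchTableFunction`, `BatchingSplitFunction`, and `FinishLifecycleDemo` are not Flink classes); they only model the call order open, eval, finish, close and assume, as the description implies, that rows must be emitted no later than finish().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the lifecycle of a table function; NOT the real Flink class.
abstract class SketchTableFunction<T> {
    private Consumer<T> collector;

    void setCollector(Consumer<T> collector) { this.collector = collector; }

    protected void collect(T row) { collector.accept(row); }

    public void open() throws Exception {}

    // New lifecycle hook: last chance to emit rows.
    public void finish() throws Exception {}

    // Resource cleanup only; assumed to be too late for emitting rows.
    public void close() throws Exception {}
}

// Buffers words and emits them in batches; the remainder is flushed in finish().
class BatchingSplitFunction extends SketchTableFunction<String> {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    BatchingSplitFunction(int batchSize) { this.batchSize = batchSize; }

    public void eval(String line) {
        for (String word : line.split(" ")) {
            buffer.add(word);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }
    }

    @Override
    public void finish() {
        // Before the lifecycle change, this flush would have lived in close().
        flush();
    }

    @Override
    public void close() {
        buffer.clear(); // release state only, no output here
    }

    private void flush() {
        for (String word : buffer) {
            collect(word);
        }
        buffer.clear();
    }
}

class FinishLifecycleDemo {
    static List<String> run() throws Exception {
        List<String> out = new ArrayList<>();
        BatchingSplitFunction fn = new BatchingSplitFunction(3);
        fn.setCollector(out::add);
        fn.open();
        fn.eval("a b c d"); // emits a, b, c; d stays buffered
        fn.eval("e");       // d and e stay buffered
        fn.finish();        // flushes d, e
        fn.close();
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [a, b, c, d, e]
    }
}
```

In a real job the framework, not user code, would drive these calls; the demo class only makes the ordering visible.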

Brief change log

  • new finish() method in TableFunction
  • add finish() method invocation in the related code-generated operators
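
As a rough illustration of the second item, the sketch below collects one `finish()` statement per function field and splices the statements into a generated `finish()` method. It is a simplified Java model, not the planner's Scala code: `SketchCodeGenContext`, `SketchOperatorTemplate`, and the field names `function_0` and `function_1` are hypothetical, and the use of an insertion-ordered set for de-duplication is an assumption.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of a code generation context.
class SketchCodeGenContext {
    // Insertion-ordered set: each statement is kept once, in registration order (assumption).
    private final Set<String> reusableFinishStatements = new LinkedHashSet<>();

    // Registers the finish() call for one function field.
    void addReusableFunctionFinish(String fieldTerm) {
        reusableFinishStatements.add(fieldTerm + ".finish();");
    }

    Set<String> finishStatements() {
        return reusableFinishStatements;
    }
}

class SketchOperatorTemplate {
    // Renders the finish() method of a generated operator as Java source text.
    static String renderFinishMethod(SketchCodeGenContext ctx) {
        StringBuilder sb = new StringBuilder();
        sb.append("@Override\n");
        sb.append("public void finish() throws Exception {\n");
        for (String statement : ctx.finishStatements()) {
            sb.append("  ").append(statement).append("\n");
        }
        sb.append("  super.finish();\n");
        sb.append("}\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        SketchCodeGenContext ctx = new SketchCodeGenContext();
        ctx.addReusableFunctionFinish("function_0");
        ctx.addReusableFunctionFinish("function_0"); // duplicate registration is ignored
        ctx.addReusableFunctionFinish("function_1");
        System.out.print(renderFinishMethod(ctx));
    }
}
```

Registering the same field twice yields a single statement, so a function shared by several expressions would be finished only once in this model.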

Verifying this change

CorrelateITCase#testTableFunctionWithFinishMethod

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot

flinkbot commented Sep 26, 2022


s"""
|$fieldTerm.finish();
""".stripMargin
reusableFinishStatements.add(finishFunction)

nit: just s"$fieldTerm.finish();"?

@Override
public void finish() throws Exception {
${ctx.reuseFinishCode()}
super.finish();

NIT: why call reuseFinishCode first and super.finish second?

Similar to the close sequence, it is generally safer (though not strictly always) to handle the child class's actions first and call the parent's common logic last. By the way, I initially changed the current close call as well, but reverted it to keep this PR focused.
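
One hypothetical way the order could matter is sketched below. `SketchBaseOperator` and `SketchGeneratedOperator` are illustrative stand-ins, and the rule that the base class rejects output after its own finish() is an assumption made for the example, not a statement about Flink's operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: once its finish() has run, emitting is rejected (assumption).
class SketchBaseOperator {
    protected final List<String> events = new ArrayList<>();
    private boolean finished = false;

    protected void emit(String row) {
        if (finished) {
            throw new IllegalStateException("cannot emit after finish");
        }
        events.add("emit:" + row);
    }

    public void finish() throws Exception {
        finished = true;
        events.add("base-finish");
    }
}

// Mirrors the generated operator: subclass work first, parent logic last.
class SketchGeneratedOperator extends SketchBaseOperator {
    @Override
    public void finish() throws Exception {
        emit("buffered-row"); // stands in for the generated function finish() statements
        super.finish();
    }
}

class FinishOrderDemo {
    static List<String> run() throws Exception {
        SketchGeneratedOperator op = new SketchGeneratedOperator();
        op.finish();
        return op.events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [emit:buffered-row, base-finish]
    }
}
```

With the two statements swapped, `emit` would throw in this model, which is the kind of failure the child-first order avoids.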

@@ -25,7 +25,7 @@ LogicalJoin(condition=[=($3, $1)], joinType=[inner])
: +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
: :- LogicalProject(o_rowtime=[AS($0, _UTF-16LE'o_rowtime')], o_comment=[AS($1, _UTF-16LE'o_comment')], o_amount=[AS($2, _UTF-16LE'o_amount')], o_currency=[AS($3, _UTF-16LE'o_currency')], o_secondary_key=[AS($4, _UTF-16LE'o_secondary_key')])
: : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
: +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)])

NIT: Can we add a serialVersionUID to TableFunction to stay compatible with previous versions?

I did try to add the UID, but it doesn't work with the current TemporalTableFunction implementation: the function is wrapped into a BridgingSqlFunction (a subclass of Calcite's SqlFunction, which is not serializable) during the conversion, so I gave up on that change for now.

@lincoln-lil lincoln-lil left a comment

Thanks for reviewing this, @JingsongLi. I've updated the PR according to your comments.

@JingsongLi JingsongLi left a comment

Looks good to me!

@JingsongLi JingsongLi merged commit 1f2001c into apache:master Oct 8, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022