[FLINK-25990][table] Expose uid generator for DataStream/Transformation providers #18667
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review pull requests. Automated checks: last check on commit 8926e35 (Tue Feb 08 16:39:17 UTC 2022).
Mention the bot in a comment to re-run the automated checks.
Review progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. The @flinkbot bot supports commands such as `@flinkbot run azure`.
Force-pushed from db2efa4 to 4af7f82.

@flinkbot run azure
Thanks for the PR @slinkydeveloper. So far we have not added a single test to the uid business. I'm wondering whether it makes sense to have a temporary solution by adapting org.apache.flink.table.planner.utils.JsonPlanTestBase#compileSqlAndExecutePlan and traversing the transformations tree to check that all transformations have a uid assigned. What do you think?
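The check suggested above could look roughly like the sketch below: walk the transformation DAG from the sinks and collect every node without a uid. `Transformation` here is a minimal stand-in for Flink's `org.apache.flink.api.dag.Transformation` (the real class exposes `getUid()` and `getInputs()`), so the names and fields are illustrative, not the actual Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal stand-in for Flink's Transformation; only the fields the check needs.
class Transformation {
    final String name;
    final String uid; // null means "no uid assigned"
    final List<Transformation> inputs = new ArrayList<>();
    Transformation(String name, String uid) { this.name = name; this.uid = uid; }
}

public class UidCheck {
    /** Returns the names of all transformations reachable from the sinks that lack a uid. */
    static List<String> findMissingUids(List<Transformation> sinks) {
        List<String> missing = new ArrayList<>();
        Set<Transformation> visited = new HashSet<>();
        Deque<Transformation> stack = new ArrayDeque<>(sinks);
        while (!stack.isEmpty()) {
            Transformation t = stack.pop();
            if (!visited.add(t)) continue; // each node is checked once, even in a DAG
            if (t.uid == null) missing.add(t.name);
            stack.addAll(t.inputs);
        }
        return missing;
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("source", "uid-1");
        Transformation map = new Transformation("map", null); // uid forgotten
        map.inputs.add(source);
        System.out.println(findMissingUids(List.of(map))); // [map]
    }
}
```

A test base could run this after compiling the plan and fail if the returned list is non-empty.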
...nk-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java (outdated; resolved)
...nner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java (outdated; resolved)
...nner/src/main/java/org/apache/flink/table/planner/connectors/TransformationSinkProvider.java (outdated; resolved)
     Transformation<?> createTransformation(Context context);

     /** Context for {@link #createTransformation(Context)}. */
-    interface Context {
+    interface Context extends ProviderContext {
Rename to TransformationProviderContext?
Isn't this already implied by the fact that this context is within TransformationSinkProvider?
True, but if you implement a connector, you have a couple of Context classes and the code could become quite messy.
But then the solution is just not to have a static import for Context?
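The naming concern above can be made concrete with a small sketch (these are illustrative interfaces, not the real Flink classes): when two providers each declare a nested `Context`, a connector that implements both must qualify the names rather than import `Context` directly.

```java
// Shared base, mirroring the idea of ProviderContext from this PR (shape is assumed).
interface ProviderContext {
    String generateUid(String name);
}

interface ScanProvider {
    interface Context extends ProviderContext {} // nested Context #1
    String createTransformation(Context ctx);
}

interface SinkProvider {
    interface Context extends ProviderContext {} // nested Context #2, same simple name
    String createTransformation(Context ctx);
}

public class ContextNaming {
    public static void main(String[] args) {
        // Qualified names keep the two Contexts apart in a connector using both:
        ScanProvider scan = (ScanProvider.Context ctx) -> "scan:" + ctx.generateUid("source");
        SinkProvider sink = (SinkProvider.Context ctx) -> "sink:" + ctx.generateUid("writer");
        System.out.println(scan.createTransformation(name -> "uid-" + name)); // scan:uid-source
        System.out.println(sink.createTransformation(name -> "uid-" + name)); // sink:uid-writer
    }
}
```

Avoiding the static import (as suggested) keeps the code unambiguous without renaming the interfaces.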
@@ -93,39 +98,40 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {

     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         return (DataStreamSinkProvider)
                 inputStream -> {
did we break the API for lambdas now?
yes 😢 the only way to not break it is to not provide a default implementation for the new method
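The breakage being discussed can be reproduced outside Flink in a few lines (the interface names below are illustrative, not the real `DataStreamSinkProvider`): an interface with exactly one abstract method can be implemented as a lambda, and adding a second abstract method, rather than a default method, stops such lambdas from compiling.

```java
// Before the change: a single abstract method, so lambdas work.
interface SinkProviderV1 {
    String consumeDataStream(String input);
}

// After the change: a second abstract method with no default implementation.
// SinkProviderV1-style lambdas no longer compile against this interface.
interface SinkProviderV2 {
    String consumeDataStream(String input);
    String generateUid(String name);
}

public class LambdaBreak {
    public static void main(String[] args) {
        SinkProviderV1 ok = s -> "sink:" + s;          // compiles: functional interface
        // SinkProviderV2 broken = s -> "sink:" + s;   // would NOT compile: two abstract methods
        System.out.println(ok.consumeDataStream("rows")); // sink:rows
    }
}
```

This is why the trade-off above is real: a default implementation preserves lambda compatibility but silently skips uid generation; omitting it breaks existing lambda implementations at compile time.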
...e-planner/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java (outdated; resolved)
...e-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java (outdated; resolved)
...e-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java (outdated; resolved)
…xt for generating uid and use it in DataStreamScanProvider, DataStreamSinkProvider, TransformationScanProvider, TransformationSinkProvider Signed-off-by: slinkydeveloper <francescoguard@gmail.com>
Force-pushed from 4af7f82 to f8e3980.
…ormation uids Signed-off-by: slinkydeveloper <francescoguard@gmail.com>
Force-pushed from f8e3980 to ca7a596.
Thanks for the update @slinkydeveloper. I added the last comments. Should be good in the next iteration.
@@ -71,6 +71,7 @@ public void testDeduplication() throws Exception {
         tableEnv.getConfig()
                 .getConfiguration()
                 .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        checkTransformationUids(compiledPlan);
Is there a reason not to use compileSqlAndExecutePlan? I think the configuration can also be set beforehand without side effects?
I honestly didn't know whether this affects the plan or not, so I defaulted to keeping the code as it is. I can change it if you want.
     }

     @Test
     public void testBatchTransformationScanProvider() {
The tests are important; they test the propagation of boundedness. Please re-add them.
As discussed offline, we can remove them, as they're only testing mocks now.
LGTM
…ng uids in DataStream/Transformation Scan/SinkProvider This might break existing implementations if the provider was implemented as a Java lambda. Please note that the interface was not annotated as a @FunctionalInterface. This closes apache#18667.
What is the purpose of the change
Expose a uid generator for DataStream/Transformation providers. Also update the collect, print, Kafka, Hive, and file connectors to properly set uids on the generated DataStreams/Transformations.
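From a connector author's point of view, the change boils down to the planner handing providers a context that can mint stable, plan-scoped uids. The sketch below illustrates the idea; the interface shape is assumed from the PR description (Flink's actual `ProviderContext` may differ in details), and the `"13_source"` namespacing scheme is purely hypothetical.

```java
import java.util.Optional;

// Assumed shape of the new context: returns a uid for a named transformation,
// or empty when uids should not be set (e.g. for non-compiled-plan execution).
interface ProviderContext {
    Optional<String> generateUid(String name);
}

public class UidUsage {
    // A planner-side context might namespace uids by the plan node id,
    // e.g. node 13 + "source" -> "13_source" (illustrative scheme).
    static ProviderContext contextFor(int nodeId) {
        return name -> Optional.of(nodeId + "_" + name);
    }

    public static void main(String[] args) {
        ProviderContext ctx = contextFor(13);
        // A connector would call this while building its DataStream/Transformation
        // and pass the result to Transformation#setUid:
        ctx.generateUid("source").ifPresent(uid -> System.out.println("uid = " + uid));
    }
}
```

Stable uids matter because Flink maps operator state to operators by uid on restore; without them, a recompiled plan could silently lose state.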