-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-24388][table] Modules can provide a table source/sink factory #17384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
*/ | ||
public static DynamicTableSource createTableSource( | ||
@Nullable Catalog catalog, | ||
public static DynamicTableSource createSource( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I shortened the name here because simply adding an overload (replacing the first argument) would be a breaking change when null
is passed (as it would cause an ambiguity).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name it createDynamicTableSource
instead? Source
sounds like the core Source
abstraction.
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 607bc15 (Wed Sep 29 12:23:54 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
607bc15
to
53268da
Compare
53268da
to
999215d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @Airblader. I added some comments. Please also update each commit message with a component tag that the commit mostly touches.
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
Show resolved
Hide resolved
*/ | ||
public static DynamicTableSource createTableSource( | ||
@Nullable Catalog catalog, | ||
public static DynamicTableSource createSource( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name it createDynamicTableSource
instead? Source
sounds like the core Source
abstraction.
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
Show resolved
Hide resolved
...planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
Outdated
Show resolved
Hide resolved
...lanner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
Outdated
Show resolved
Hide resolved
...nner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
Outdated
Show resolved
Hide resolved
999215d
to
7af1ce3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More comments ;-)
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
Outdated
Show resolved
Hide resolved
...nner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
Show resolved
Hide resolved
7af1ce3
to
1c3a1b0
Compare
1c3a1b0
to
f26b757
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I had one last minor thing but can do this while merging once the build is green.
boolean upsertMaterialize) { | ||
final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); | ||
final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner); | ||
final ChangelogMode inputChangelogMode = tableSink.getChangelogMode(changelogMode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, but I still fine this confusing. Shouldn't this variable be called changelogMode
and the member inputChangelogMode
What is the purpose of the change
This introduces
Module#getTableSourceFactory
andModule#getTableSinkFactory
, which allow providing a corresponding factory when sources/sinks are created. The precedence is as follows:It may seem more appropriate to give modules the higher precedence, but in practice it's more useful to prefer the specificity of the catalog and have the module act as a fallback (that can override the discovery process fallback).
The changes in
FactoryUtil
have been kept backwards-compatible, but the old methods have been deprecated.Brief change log
ModuleManager
inFlinkContext
Verifying this change
(Please pick either of the following options)
Compatibility is covered through existing tests. A new
ModuleITCase
has been added to verify the new module APIs.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yesDocumentation