-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory #11047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 86c4939 (Mon Feb 10 08:03:11 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
CI report:
Bot commandsThe @flinkbot bot supports the following commands:
|
| CatalogTable getTable(); | ||
|
|
||
| /** | ||
| * @return readable config of this table environment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a description that the configuration gives the factory instance the ability to access TableConfig#getConfiguration() which holds the current TableEnvironment session configurations.
| * @param context context of this table sink. | ||
| * @return the configured table sink. | ||
| */ | ||
| default TableSink<T> createTableSink(Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we deprecate the other createTableSink and createTableSource interfaces?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know... we can deprecate createTableSink(ObjectPath, CatalogTable). CC: @twalthr
| return Optional.empty(); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make sure TableSinkFactory#createTableSink(context) is the only entry to be invoked by the planner. However, the above createTableSinkForCatalogTable method still calling the old createTableSink method in old planner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll push another commit to Support create table source/sink by context in legacy planner
| def testTableSourceSinkFactory(): Unit = { | ||
| val factory = new TestContextTableFactory | ||
| util.tableEnv.getConfig.getConfiguration.setBoolean(factory.needContain, true) | ||
| util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only guarantees one of the paths, i.e. via TableFactoryUtil#createTableSinkForCatalogTable, which is only used by Hive. However, the another path is not covered, i.e. via CatalogSourceTable#findAndCreateTableSource which is used by most users.
Maybe you can upgrade the existing TestCollectionTableFactory to use the new interface and expose a session configuration, e.g. collection.is-bounded, the flag will be accessed via Context#getConfiguration and pass to the created CollectionTableSource.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, but I want to modify this step by step.
The previous method createTableSink(ObjectPath tablePath, CatalogTable table) is only work in createTableSinkForCatalogTable too.
after these commits looks good to you, I will create following commit to fix this.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But without a test to cover the new interface, we have no idea whether it works as expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean? Is this not a test cover?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned above, this test doesn't cover another path, i.e. CatalogSourceTable#findAndCreateTableSource.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll modify these things by multi commits.
18beb9a to
a9125de
Compare
|
We can wait this one: #11055 , to be more cleaner. |
8c7c673 to
b1f4018
Compare
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @JingsongLi . It looks good in general. I left some minor comments.
...link-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
Outdated
Show resolved
Hide resolved
...able/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
Outdated
Show resolved
Hide resolved
...able/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
Show resolved
Hide resolved
...able/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
Show resolved
Hide resolved
| */ | ||
| @Override | ||
| default TableSource<T> createTableSource(Context context) { | ||
| return createStreamTableSource(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should validate the return value of createStreamTableSource is not null. If is null, we should throw an exception to indicate users to implement createTableSource(context), because createStreamTableSource(Map) is default implemented now, and users may not override it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indicate users to implement createStreamTableSource(context)?
| * @param context context of this table source. | ||
| * @return the configured table source. | ||
| */ | ||
| default StreamTableSource<T> createStreamTableSource(Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somebody implements StreamTableSourceFactory looks like need this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But he can still use the TableSource<T> createTableSource(context) interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if user use TableSource, there is no need to use StreamTableSourceFactory..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. StreamTableSourceFactory should be deprecated. It is confused why there is a StreamTableSourceFactory and TableSourceFactory. But for compatibility, users can still use StreamTableSourceFactory but indicate users to use TableSource<T> createTableSource(context).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will remove this method.
...er-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableFactoryTest.scala
Outdated
Show resolved
Hide resolved
...blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
Outdated
Show resolved
Hide resolved
...link-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
Outdated
Show resolved
Hide resolved
...link-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
Outdated
Show resolved
Hide resolved
b1f4018 to
3428261
Compare
2db8738 to
daf7fda
Compare
…y context in blink planner
…ext in legacy planner
daf7fda to
eba92d7
Compare
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Will merge it after tests passed in my branch.
What is the purpose of the change
Discussion in: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-TableFactory-td36647.html
Vote in: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Improve-TableFactory-to-add-Context-td37211.html
Motivation:
Now the main needs and problems are:
Connector can't get TableConfig[1], and some behaviors really need to be
controlled by the user's table configuration. In the era of catalog, we
can't put these config in connector properties, which is too inconvenient.
A context class also allows for future modifications without touching the TableFactory interface again.
Brief change log
Verifying this change
SinkTest.testTableSourceSinkFactoryDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation