[FLINK-14490][table] Add methods for interacting with temporary objects #9971
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks: last check on commit 87124ec (Wed Dec 04 14:50:47 UTC 2019). Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
+1 for 3424926
Some feedback for eab35b0.
[Resolved review threads on CalciteParser.scala (×2) and StreamPlanner.scala]
" 'connector' = 'kafka', \n" + | ||
" 'kafka.topic' = 'log.test'\n" + | ||
")\n"; | ||
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); |
Is the planner by dialect still necessary if we also create a parser by dialect?
It uses the dialect to create a parser for view expansion (though we do not support view expansion yet).
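A hedged illustration of the two dialect-dependent objects in question; getParserBySqlDialect is a hypothetical counterpart to the helper used in the snippet above:

// The parser only turns SQL text into SqlNodes; the planner is still needed
// for validation and, eventually, view expansion.
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);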
Some comments for b4030de
[Resolved review threads on UnresolvedIdentifier.java (×3)]
Minor comments for 4e9ee4e.
[Resolved review threads on Parser.java, Planner.java, StreamTableEnvironmentImplTest.scala, and ParserImpl.scala (×2)]
class ParserImpl(
    catalogManager: CatalogManager,
    validatorProvider: () => FlinkPlannerImpl,
plannerProvider
In the context of the parser it serves only as a validator; we do not use the toRel conversion part. Ideally it would be best to further split the FlinkPlanner class into subclasses, but I did not have enough time.
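A minimal Java sketch of the provider pattern discussed here; the holder class and method bodies are illustrative, only the supplier idea mirrors the PR:

import java.util.function.Supplier;

// The parser does not cache the planner: it asks the supplier for a fresh
// instance per parsing session and uses only its validation capability,
// never the toRel conversion.
final class ValidatorHolder {
    private final Supplier<FlinkPlannerImpl> validatorProvider;

    ValidatorHolder(Supplier<FlinkPlannerImpl> validatorProvider) {
        this.validatorProvider = validatorProvider;
    }

    SqlNode validate(SqlNode parsed) {
        FlinkPlannerImpl validator = validatorProvider.get(); // fresh per session
        return validator.validate(parsed);
    }
}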
[Resolved review thread on ParserImpl.scala]
// wrapper contains only sink (not source)
ConnectorCatalogTable sourceAndSink = ConnectorCatalogTable
    .sourceAndSink(sourceSinkTable.getTableSource().get(), tableSink, !IS_STREAM_TABLE);
catalogManager.alterTable(sourceAndSink, getTemporaryObjectIdentifier(name), false);
alterTable is not used anymore, right?
Correct, alterTable works with permanent tables. We do not have an API for altering temporary tables.
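A hedged sketch of what the temporary path uses instead; the CatalogManager method name follows this PR's changes, but treat the exact signature as an assumption:

// Re-register the wrapper under the same identifier with replace semantics
// instead of altering a permanent table.
catalogManager.createTemporaryTable(sourceAndSink, getTemporaryObjectIdentifier(name), true);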
[Review threads on CatalogManager.java (×4), CatalogCalciteSchema.java (×2), CatalogManagerCalciteSchema.java, and DatabaseCalciteSchema.java]
Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
assertNotNull(catalog);
catalog.createTable(
    ObjectPath.fromString("default_database.T1"),
Side comment: we should get rid of this method; it doesn't even support identifier escaping.
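For context, a sketch of why the lack of escaping matters; the behavior described is my reading of the method, not a spec:

// fromString splits the string on the dot, so this works ...
ObjectPath path = ObjectPath.fromString("default_database.T1");
// ... but an object whose name itself contains a dot (e.g. `my.table`)
// cannot be expressed, since there is no escaping syntax to protect the dot.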
We should improve the docs a bit further in 7d6721c.
[Resolved review threads on BatchTableEnvironment.java (×4), StreamTableEnvironment.java (×2), TableEnvironment.java, TableEnvironmentImpl.java, and BatchTableEnvironment.scala]
One little comment for 90bf886
[Resolved review thread on CatalogManager.java]
some suggestions for 64d8ca6
[Resolved review threads on TableEnvironment.java (×2)]
Some feedback for 754bf46.
[Review threads on TableEnvironment.java (×2), CalciteParser.scala, BatchTableDescriptor.java, and ConnectTableDescriptor.java (×2)]
Thanks for this important API refactoring. The code changes look mostly good but we should definitely further improve the JavaDocs to avoid any confusion for our existing and future users. We should also update the website documentation and add good release notes to JIRA when closing this issue. This change will annoy existing users. We need to communicate the reasons for these changes.
[Review threads on CatalogManager.java and CatalogCalciteSchema.java (×2)]
Thank you @bowenli86 @twalthr @zjuwangg for the reviews. I did a first pass over your comments; I still need to address a few remaining ones.
Force-pushed from ce946ec to 0fbaa98.
Hey @bowenli86, could you have another pass over the PR? I think @twalthr will not be able to do it any time soon. I tried to address all of his comments as well as yours and @zjuwangg's.
private Optional<TableLookupResult> getPermanentTable(ObjectIdentifier objectIdentifier)
        throws TableNotExistException {
    Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName());
    ObjectPath objectPath = new ObjectPath(
this call can be saved by calling objectIdentifier.toObjectPath()
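That is, the manual construction can collapse to a single call (sketch):

ObjectPath objectPath = objectIdentifier.toObjectPath();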
[Review threads on CatalogManager.java (×3) and DatabaseCalciteSchema.java (×2)]
* <p>The field names of the {@link Table} are automatically derived
* from the type of the {@link DataSet}.
*
* <p>The view is registered in the current catalog and database. To register the view in
This comment can be a bit misleading, as the view is actually registered only in the namespace of the current catalog and database, but isn't physically located in the catalog/database, right?
Will change it to: "The view is registered in the namespace of the current catalog and database."
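A hedged usage sketch of the namespace semantics being documented; tEnv, the catalog/database names, and table are assumed to exist:

tEnv.useCatalog("cat");
tEnv.useDatabase("db");
tEnv.createTemporaryView("v", table); // addressable as cat.db.v ...
tEnv.from("v");                       // ... but stored in the session, not in catalog cat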
    return doDropTemporaryTable(identifier, (table) -> table instanceof CatalogView);
}

private boolean doDropTemporaryTable(
The API naming of "doXxx()" feels a bit weird. Rename to dropTemporaryTableInternal()?
 * does not exist.
 */
public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
    if (temporaryTables.containsKey(objectIdentifier)) {
This behavior feels a bit abnormal to me. Why can't users drop a catalog table when a temp table with the same name exists?
Table and temp table creation and deletion should be independent of each other; e.g., from a SQL perspective, users should be able to run "CREATE/DROP TABLE" and "CREATE/DROP TEMP TABLE" independently.
Just curious about this, I took a look at Spark:
* If a database is specified in `name`, this will drop the table from that database.
* If no database is specified, this will first attempt to drop a temporary view with
* the same name, then, if that does not exist, drop the table from the current database.
Curious about other databases, I don't know how they behave.
Spark does not have 3-part qualified temporary objects. We followed MySQL on that topic.
This behavior is introduced to prevent unintentional drops of temporary tables, which might have very costly undesirable effects. Users are usually accustomed to DROP TABLE, but not necessarily to DROP TEMPORARY TABLE. If a user issues a DROP TABLE with the intent of dropping the temporary table, it would actually result in a drop of the permanent one.
Dropping tables is a very invasive operation. I tried to design the behaviour in a way that forces the user to make a really conscious decision on which table should be dropped.
BTW, as a side note, this behaviour is described in FLIP-64. There were no objections towards that behaviour during the discussion.
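A compact sketch of these semantics, following the dropTable snippet above; the exception type and message are illustrative:

public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
    if (temporaryTables.containsKey(objectIdentifier)) {
        // refuse to touch the permanent table while a temporary one shadows it;
        // the user must issue an explicit DROP TEMPORARY TABLE first
        throw new ValidationException(String.format(
            "A temporary table with identifier '%s' exists. Drop it first before "
                + "removing the permanent table.", objectIdentifier));
    }
    // ... otherwise delegate the drop of the permanent table to the catalog
}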
Hi Dawid, I was only able to review the last five commits, as I don't really know much about the planner. This PR, as well as several individual commits, is quite huge, which makes it really hard to review and follow. It would be nice to break such a good amount of work down into several smaller PRs next time. @KurtYoung @JingsongLi @xuefuz it would be good to have your reviews if you have time.
Some minor comments for 988bd85
[Resolved review threads on TableEnvironmentImpl.java, CatalogSinkModifyOperation.java, CreateTableOperation.java, and DropTableOperation.java]
one question for b426a97
case e: CSqlParseException =>
    throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
}
val sqlNode: SqlNode = parser.parse(queryString)
Is it intended to re-use the parser here? I noticed in PlannerBase::parser you pointed out that you didn't want to reuse the parser since the config might have changed.
Yes, because the whole FlinkPlannerImpl is used only once per planning session.
Thanks Dawid, left some comments.
.toJava(scanInternal(Array(name)).map(t => new TableReferenceExpression(name, t)))
.toJava(
    Try({
        val unresolvedIdentifier = UnresolvedIdentifier.of(name)
Why use Try? If it is an illegal name, should we throw an exception?
This gives positive results when resolving an IN expression, but it will be invoked for all UnresolvedExpressions; that's why it might not necessarily be a table identifier. The contract of the ExpressionResolver is that it actually never throws exceptions.
As a side note, the way TableIdentifier is currently implemented is actually wrong. This method is primarily for resolving SELECT * FROM t WHERE f IN (tableName), which is actually an incorrect SQL statement: it is not allowed to use identifiers in an IN clause. Therefore, once we introduce the Java Expression DSL, we should get rid of this catalog lookup for a TableReference.
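A Java rendering of the Try-based contract above (the PR's code is Scala; the wrapper method is hypothetical, the names follow the snippet):

Optional<TableReferenceExpression> lookupTableRef(String name) {
    try {
        // may fail for arbitrary strings that are not valid identifiers
        UnresolvedIdentifier identifier = UnresolvedIdentifier.of(name);
        return scanInternal(identifier)
            .map(t -> new TableReferenceExpression(name, t));
    } catch (Exception e) {
        // the ExpressionResolver must never throw, so failures become "no match"
        return Optional.empty();
    }
}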
Got it. You can put these comments into the code.
val unresolvedIdentifier = UnresolvedIdentifier.of(parser.parseIdentifier(path).names: _*)
scanInternal(unresolvedIdentifier) match {
    case Some(table) => createTable(table)
    case None => throw new TableException(s"Table '$path' was not found.")
Can we have a unified exception message? Use unresolvedIdentifier?
Map<String, String> properties = new HashMap<>(toProperties());
schemaProperties.keySet().forEach(properties::remove);

CatalogTableImpl catalogTable = new CatalogTableImpl(
CatalogTable has partitionKeys too; consider adding partition keys in CatalogTableImpl.toProperties and parsing them here? (We can add this later too.)
Yes, I agree we should do that. I think we can do that in a follow-up? If I understand it correctly, it will still use the ProjectableTableSource if we do not expose the partitions through the CatalogTable, right? WDYT?
OK. After https://issues.apache.org/jira/browse/FLINK-14381, we removed partition support for temporary tables to clean up PartitionableTableSource/Sink.
I created a JIRA to track it: https://issues.apache.org/jira/browse/FLINK-14543; it will be fixed in 1.10.
Map<String, String> properties = new HashMap<>(toProperties());
schemaProperties.keySet().forEach(properties::remove);

CatalogTableImpl catalogTable = new CatalogTableImpl(
Maybe we can use CatalogTableBuilder here?
Looks like there are some bugs in CatalogTableBuilder; it does not remove schemaProperties from properties?
We could use the CatalogTableBuilder, yes; I wasn't aware of the class. I am not sure, though, if we need the CatalogTableBuilder class in the long run. I think its functionality actually duplicates the one in ConnectTableDescriptor.
comment for 77880fc
[Resolved review thread on UnresolvedIdentifier.java]
Some comments for Parser's interface: dfc8f07
[Resolved review thread on Parser.java]
I tried to address all your comments, @KurtYoung @JingsongLi @bowenli86. Could you have another look?
Thanks @dawidwys for this big effort; it indeed cleaned up lots of APIs. The changes LGTM now and +1 to merge.
Thanks for the reviews. Merging.
What is the purpose of the change
This PR adds methods for interacting with temporary objects as described in FLIP-64:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
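A brief, hedged tour of the API surface added here (environment settings and identifiers are illustrative; see FLIP-64 for the authoritative list):

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

tEnv.createTemporaryView("cat.db.v", someTable);      // register a temporary view
Table t = tEnv.from("cat.db.v");                      // resolve it through the catalog manager
boolean dropped = tEnv.dropTemporaryView("cat.db.v"); // true if such a view existed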
Verifying this change
This change added tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation