-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-11476][table] Create CatalogManager to manage multiple catalogs #8404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
@flinkbot approve-until consensus |
f839911 to
e58c242
Compare
|
fyi: @bowenli86 @xuefuz |
| * @return the current default catalog that is used for path resolution | ||
| * @see TableEnvironment#setCurrentCatalog(String) | ||
| */ | ||
| String getCurrentCatalogName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I wonder if we just name it getCurrentCatalog(), which returns the name of the current catalog. (Same for get current database name. I understand that the given name is more explicit, but getCurrentCatalog() is a mirror of the set method below and is more consistent with catalog APIs in which, every get (table, database) is named getXxx() instead of getXxxName().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also in favor of being consistent. If we have a setCurrentCatalog, we should have a getCurrentCatalog. On the other hand, I also find it confusing that "catalog" sometimes describes a catalog instance and sometimes a catalog name. However, the SQL standard does the same with USE DATABASE. How about we also introduce useCatalog and useDatabase?
| * when looking for unqualified object names. | ||
| * | ||
| * <p>This is used during resolution of object paths. The default path is constructed as | ||
| * {@code [current-catalog].[current.database]}. During the resolution, first we try to look for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be [current-database]?
|
|
||
| @Override | ||
| public Map<String, String> getProperties() { | ||
| throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better just to return an empty map instead of throwing an exception. By the definition, getProperties() return any additional properties a table might have. It doesn't mean the property form of the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this method represents a serialized (toProperties) representation of the Table, but apparently I was wrong. I will fix this, as I was explained those are just additional properties.
|
|
||
| @Override | ||
| public CatalogBaseTable copy() { | ||
| return this; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we throw an exception here because we are not able to provide a copy. Return just this might be dangerous because caller assumes a copy and make changes to the returned object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this would be better. Unfortunately this makes it impossible to register the table in the catalog as it creates a copy on either createTable or getTable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind then.
|
|
||
| @Override | ||
| public RelProtoDataType getType(String name) { | ||
| return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning "null" means no type with the given name is found. I'm not sure if that's desired, but feel free to add an TODO item for this if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK we do not support user defined types to be stored in Catalog, do we? That's why I think it is desired to return null here until we do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this was to resolve any type in the type system and I saw in Blink something other than null was returned. Anyway, I wasn't clear what this API is for.
| * @return the current default catalog | ||
| * @see CatalogManager#resolveTable(String...) | ||
| */ | ||
| public String getCurrentCatalogName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just getCurrentCatalog()?
| * @return the current default database | ||
| * @see CatalogManager#resolveTable(String...) | ||
| */ | ||
| public String getCurrentDatabaseName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
| if (table instanceof CalciteCatalogTable) { | ||
| return ((CalciteCatalogTable) table).getTable(); | ||
| } else { | ||
| throw new TableException("Unsupported table type: " + table); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about CatalogTable and CatalogView types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the CatalogTable we need to agree how do we retrieve/create TableSourceFactory, shall we get it from the catalog or via service discovery.
For a CatalogView we can add it already, but in this PR I wanted to focus on just the existing functionalities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay.
|
|
||
| @Override | ||
| public RelProtoDataType getType(String name) { | ||
| return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
| case _ => | ||
| } else { | ||
| throw new ValidationException( | ||
| "External catalog table does not support the current environment for a table source.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This message might need to change because it covers two cases: isBatch==true && externalTable.isStreamTable, and isBatch==false && externalTable.isBatchTable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but I think this message applies to both of those. BTW the logic has not changed, just the type of the flag has changed.
xuefuz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dawidwys Thanks for the PR. I have posted some minor comments for consideration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also wondering how much value we bring by retaining original ExternalCatalog stuff because 1, it's never used anywhere 2. the implementation (such as table resolution, table manage implementation) would be much simplified if without it. I'd like to hear alternative thoughts though.
bowenli86
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @dawidwys! I've reviewed till DatabaseCalciteSchema.java, will continue reviewing later.
My biggest concern, as we chatted before, is that there's so much overhead in making CatalogManager support the old ExternalCatalog. In the CatalogManager itself, this PR adds several new APIs that are immediately annotated as "Deprecated". The table resolution and calcite schema mapping is also quite complicated and prone to error because of that.
Thus, I doubt whether it's worth making CatalogManager support old ExternalCatalog. Given that the official user of ExternalCatalog in previous release seems to be only SQL CLI and SQL CLI is marked as a beta feature, I think dropping support for ExternalCatalog is reasonable. It will save us quite some bandwidth from developing, testing, and maintaining it and work on higher priority features. What do you think? @xuefuz @twalthr
| /** | ||
| * Gets a registered {@link Catalog} by name. | ||
| * | ||
| * @param catalogName The name to look up the {@link Catalog} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "the name ..."
| public static final String DEFAULT_DB = "default"; | ||
|
|
||
| private String currentDatabase = DEFAULT_DB; | ||
| private String currentDatabase; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you rebase the in-memory catalog to #8390 ?
| import java.util.Set; | ||
|
|
||
| /** | ||
| * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"...access tables and views in SQL queries without registering them in advance"?
|
|
||
| /** | ||
| * A CatalogManager that encapsulates all available catalogs. It also implements the logic of | ||
| * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Catalog", not "ReadableCatalog"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also add )
|
|
||
| private String currentDatabaseName; | ||
|
|
||
| public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate defaultCatalogName and defaultCatalog?
| * @throws ExternalCatalogAlreadyExistException thrown if the name is already taken | ||
| * @deprecated {@link ExternalCatalog} APIs will be dropped | ||
| */ | ||
| @Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it's a bit weird to add a new API and immediately marked as “Deprecated”
| } | ||
|
|
||
| /** | ||
| * Sets the current default catalog name that will be used when resolving table path. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * Sets the current default catalog name that will be used when resolving table path. | |
| * Sets the current default database name that will be used when resolving table path. |
| currentDatabaseName = databaseName; | ||
|
|
||
| LOG.info( | ||
| "Sets the current default catalog as '{}' and the current default database as '{}'", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we rephrase the logging msg to be different than that in "setCurrentCatalog"? because 1) it doesn't actually set the current catalog, 2) to differentiate from calling of "setCurrentCatalog"
can be sth like "Sets the current default database as '{}' in the current default catalog '{}'"?
| * @return {@link CatalogTableOperation} containing both fully qualified table identifier and its | ||
| * {@link TableSchema}. | ||
| */ | ||
| public Optional<CatalogTableOperation> resolveTable(String... tablePath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method seems resolving tables registered either in Catalogs or ExternalCatalogs. What about those registered in neither of them, such as those registered by registerTable()?
Also, it's unclear where this method is going to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side comment: In the end we should be able to register everything in the new catalog APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As Timo said, all objects in the end should be registered in one of the Catalogs. Right now we maintain the ExternalCatalogs just for backwards compatibility.
This method is used whenever we have to look up any object exclusively in the API module. This means e.g. in the TableEnvironment#scan method.
bowenli86
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I finished 1st round of review on all the 4 commits.
This PR seems to focus on only integrating CatalogManager with flink-table-planner, but not with flink-table-planner-blink. I wonder what's the reason?
| // set the executor to evaluate constant expressions | ||
| .executor(new ExpressionReducer(config)) | ||
| .build | ||
| private val BUILTIN_CATALOG_NAME = "builtin" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does TableEnvironment need to know the default catalog's name in CatalogManager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does CatalogManager needs to know about it? This is just a initial value for the current catalog, so I think it fits better to the TableEnvironment rather than CatalogManager.
| * <p>It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 to Calcite's schema. | ||
| */ | ||
| @Internal | ||
| public class CatalogManagerSchema implements Schema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use a better name for this class? Not sure if we should support ExternalCatalog yet, but even if we do, its naming has been a bit confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe CatalogManagerCalciteSchema? to be in sync with DatabaseCalciteSchema?
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for 4250798
| return new FlinkRelBuilder(context, cluster, relOptSchema, expressionBridge); | ||
| } | ||
|
|
||
| /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update comment here and below to Java.
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @dawidwys. I added some feedback. In general, I like the design and tests. My biggest concern is the exception handling that looks inconsistent to me right now.
@bowenli86 We had the discussion about API breaking changes in the past. We agreed on at least keep old interfaces for one or two releases. So that users have an alternative for migration. The external catalog interface was added a long time ago and it was used not only in the SQL Client but was available through the API.
| "TableSourceSink"); | ||
| "builtin.default.TableNumber1", | ||
| "builtin.default.TableNumber2", | ||
| "builtin.default.TableSourceSink"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also test what happens if one of the components contains a space?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One additional comment: could we rename the default names to something more meaningful such as: default-catalog.default-database.table
| * @param catalog the catalog to register | ||
| * @throws CatalogAlreadyExistsException thrown if catalog with given name already exists | ||
| */ | ||
| void registerCatalog(String name, Catalog catalog) throws CatalogAlreadyExistsException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: catalogName
| * <p>A table to scan must be registered in the TableEnvironment. It can be either directly | ||
| * registered or be a member of an {@link ExternalCatalog}. | ||
| * <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly | ||
| * registered or be a member of an {@link ExternalCatalog} or {@link Catalog}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove external catalog already in the comments.
| * <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly | ||
| * registered or be a member of an {@link ExternalCatalog} or {@link Catalog}. | ||
| * | ||
| * <p>First we try to look for {@code [default-path].[table-path]} if no object is found we assume the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add an example here. How can a default-path and table-pathlook like? And you mention object path and object, I think this is unrelated in this method.
| * @return the current default catalog that is used for path resolution | ||
| * @see TableEnvironment#setCurrentCatalog(String) | ||
| */ | ||
| String getCurrentCatalogName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also in favor of being consistent. If we have a setCurrentCatalog, we should have a getCurrentCatalog. On the other hand, I also find it confusing that "catalog" sometimes describes a catalog instance and sometimes a catalog name. However, the SQL standard does the same with USE DATABASE. How about we also introduce useCatalog and useDatabase?
| * Tables are registered as tables in the schema. | ||
| */ | ||
| class DatabaseCalciteSchema implements Schema { | ||
| private final String dbName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use a consistent naming databaseName
| * <p>It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 to Calcite's schema. | ||
| */ | ||
| @Internal | ||
| public class CatalogManagerSchema implements Schema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe CatalogManagerCalciteSchema? to be in sync with DatabaseCalciteSchema?
| throw new TableException("Unsupported table type: " + table); | ||
| } | ||
| } catch (Exception e) { | ||
| throw new TableException("Could not find table: " + tableName, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception design is very inconsistent. In theory we could also use TableNotExistException here. But actually this would be more like a ValidationException, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we should never end up here with TableNotExistException as we are checking catalog.tableExist() before. That's why I went with TableExceptin here.
I added appropriate comment there.
| break | ||
| } | ||
| else if (expectedLine != actualLine) { | ||
| } else if (!verifyCatalogPath && actualLine.contains("table=[[")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain these changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not want to change all of the tests. That's why I added an option to strip the default catalog & database during the validation.
| return new DatabaseCalciteSchema(schemaName, catalog); | ||
| } else { | ||
| LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName)); | ||
| throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we return null here?
|
@bowenli86 The goal is to have a common At some point though we have to figure out how do we want to share e.g. the |
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
Outdated
Show resolved
Hide resolved
| public Optional<CatalogTableOperation> resolveTable(String... tablePath) { | ||
| checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty."); | ||
|
|
||
| List<String> defaultPath = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method seems assume that user input cannot be "[current_db].[table_name]" for new Catalogs and doesn't handle that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I also spotted that and I'm currently fixing it.
| } | ||
|
|
||
| private Optional<TableSchema> lookupCatalogTable(List<String> path) throws TableNotExistException { | ||
| if (path.size() >= 3) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got a bit confused here: why can size of a valid path be larger than 3 for catalog table name resolution?
And shall we validate the size of input with checkArgument rather than swallowing all kinds of path?
| String tableName = String.join(".", path.subList(2, path.size())); | ||
| ObjectPath objectPath = new ObjectPath(currentDatabaseName, tableName); | ||
|
|
||
| if (currentCatalog != null && currentCatalog.tableExists(objectPath)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can check whether the current catalog exists right after getting it, and not proceed with extra steps
| } | ||
| } | ||
|
|
||
| private Optional<TableSchema> lookupCatalogTable(List<String> path) throws TableNotExistException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is TableNotExistException thrown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In currentCatalog.getTable(objectPath)
| * @param catalogName The name of the catalog to set as the current default catalog. | ||
| * @throws CatalogException thrown if a catalog with given name could not be set as the default one | ||
| */ | ||
| void useCatalog(String catalogName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we add the exception (even though it's runtime exception) to signature to inform users this may throw exception, we've done that for catalog APIs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is against majority (if not all) code style guidelines. The throws cause should declare only those exception that we expect the user to handle.
BTW, me & @twalthr had a chat few times before and we fill we should revisit the exception structure in the catalog API anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding CatalogException to signature is actually suggested by Timo at #8007 (comment) .
I agree we should revisit the exceptions as I too find it inconsistent and problematic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I still support my comment there. For implementers of the interface, I think it is an important information that all methods could throw a CatalogException if the communication fails. But we don't need to expose it to the user in the API.
Right now we have a binary exception structure (ValidationException if user did something wrong, TableException if something general went wrong). It should serve most use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comment can be a bit misunderstanding. Yeah, my point is we just need to be consistent with how it's exposed. An example is that now, contrary to useCatalog(), Catalog's APIs not only document CatalogException in javadoc but also throws CatalogException in their signatures.
Seems that we all agree upon the javadoc part. What do you guys think of the signature part? If we all feel it's not necessary to expose it to users in the API, I feel it's better to remove throws CatalogException from Catalog's API signatures to be consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong opinion here. Personally, I would add it to the signature if it is an important interface to highlight the exception. I mean there is a reason why Java also support declaring runtime exceptions explicitly. But I would definitely not bubble the runtime exception up in the stack of signatures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think the root cause here is that CatalogException is not a checked exception. I suggested it to handle communication failures similar to a IOException.
| * @param databaseName The name of the database to set as the current database. | ||
| * @throws CatalogException thrown if the given catalog and database could not be set as the default ones | ||
| */ | ||
| void useDatabase( |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
|
Can we add to the deprecated APIs some comments of which version they can be dropped? e.g. "Can be dropped after release 1.10" |
d0518eb to
a563d1c
Compare
bowenli86
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dawidwys , I left some further comments to consider
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
||
| import static java.lang.String.format; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we revert the newly added static import besides Preconditions? I think we should avoid static import as much as possible as they make the code not noticeably shorter but less readable and maintainable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should only allow two cases to use static import: a) static constants b) extraordinarily common static helpers that don't cause confusions at all for any java developers in some specific context, like assertXxxx in junit tests, and checkArgument/checkNotNull for validating input args.
When I read this code, it indeed confused me that I had to look around for where the format() and asList() come from.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should apply common sense here. In my experience it makes it much cleaner when we statically import well known methods or constants (like e.g. String.format()/Assert.assertTrue`Collectors.toList()` etc.). There is very little chance anyone will confuse
CatalogException(format(
"A database with name [%s] does not exist in the catalog: [%s].",
databaseName,
currentCatalogName))
with any other format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rephrased my previous comment and I think we agree in the principal. I guess the only difference is if we assume String#format and Arrays#asList to be well known by majority of java developers.
They really broadly used in Flink's code base(and not only in Flink's) and I truly believe they are rarely confused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyway, after a quick scan to be remain consistent with other places, I will change it.
| List<String> path = new ArrayList<>(prefix); | ||
| path.addAll(userPath); | ||
|
|
||
| Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can check whether the path is of size 3 before calling lookupCatalogTable() because we are fully sure that catalog table's full path must be of size 3.
And in lookupCatalogTable(), we can change the check from if (path.size() == 3) to checkArgument(path.size() == 3) and add a comment to let callers be aware that they should make sure their input args are valid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
External catalogs support arbitrary nesting, so we are not sure about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which part we are not sure about? isn't table lookup for external catalogs in lookupExternalTable rather than lookupCatalogTable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what I mean is, code can be written as
private Optional<CatalogTableOperation> lookupPath(...) {
...
Optional<CatalogTableOperation> potentialTable
// if the path is not of size 3, we don't even need to bother with looking it up in catalogs
if (path.size() == 3) {
potentialTable = lookupCatalogTable(path);
}
if (!potentialTable.isPresent()) {
potentialTable = lookupExternalTable(path);
}
return potentialTable
}
private Optional<CatalogTableOperation> lookupCatalogTable(List<String> path) {
checkArgument(path.size() == 3, "xxx")
try {
...
} catch () {}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I misunderstood you.
I don't like this approach as this way both lookupPath and lookupCatalogTable validate structure of the Catalog path. In my approach the whole logic is enclosed in a single lookupCatalogTable method.
This is not a performance critical code. We do not need to optimize the number of function calls, but should rather optimize for readability and separation of concerns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't need to do the checkArgument but I do think the way I suggested for lookupPath is more readable because readers can immediately understand that catalog table path should be of size 3.
I'll leave this up to you
|
|
||
| private Optional<TableSchema> lookupCatalogTable(List<String> path) throws TableNotExistException { | ||
| if (path.size() >= 3) { | ||
| private Optional<CatalogTableOperation> lookupCatalogTable(List<String> path) throws TableNotExistException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the signature has been confusing. Returning an Optional implies that it will be empty if the operation doesn't find the target, but the method is also throwing TableNotExistException when the target table is not found.
I'd suggest try-catch TableNotExistException in lookupCatalogTable() and, in case of it, return an empty Optional. And remove the try-catch in lookupPath()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
| if (path.size() == 3) { | ||
| Catalog currentCatalog = catalogs.get(path.get(0)); | ||
| String currentDatabaseName = path.get(1); | ||
| String tableName = String.join(".", path.subList(2, path.size())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that the path will be of size 3, tableName should just be path.get(2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
4f979a4 to
7aa0c68
Compare
| * Specifies the name of the default database in the initial catalog to be created when instantiating | ||
| * TableEnvironment. | ||
| */ | ||
| private String bultinDatabaseName = "default-database"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having '-' in the names might confuse the parser as it can be read as a numeric minus operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was also my first thought. We could use an underscore if that is better?
| .setCaseSensitive(caseSensitive) | ||
| .build(); | ||
|
|
||
| return new CatalogReader( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this is taken care of somewhere else, but I'm wondering how we tell Calcite to look up in new catalogs that are registered later on in user's session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say this is the core difference from the previous PR.
We do not create a separate Calcite schema, but we use the CatalogManagerCalciteSchema as the root one (that's why we had to create the CalciteSchemaBuilder). At the same time CatalogManagerCalciteSchema is just a thin adapter around CatalogManager. That means any changes to CatalogManager are directly visible by Calcite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great!
xuefuz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good to me now. Just a couple of new comment/questions.
|
Just passed it with another round. LGTM +1 to merge |
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dawidwys. I added some final comments about the user-facing API.
| Class<?> clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl"); | ||
| Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class); | ||
| CatalogManager catalogManager = new CatalogManager( | ||
| tableConfig.getBultinCatalogName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: getBuiltInCatalogName and the same for database name
| * written. | ||
| */ | ||
| void insertInto(String tableName); | ||
| void insertInto(String... tablePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be insertInto(String tablePath, String... tablePathContinued) also other methods, to enforce at least one argument.
| * Specifies the name of the default database in the initial catalog to be created when instantiating | ||
| * TableEnvironment. | ||
| */ | ||
| private String bultinDatabaseName = "default-database"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was also my first thought. We could use an underscore if that is better?
|
|
||
| /** | ||
| * Specifies the name of the initial catalog to be created when instantiating | ||
| * TableEnvironment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: link to the class here and in other methods?
| * see {@link TableEnvironment#useDatabase(String, String)}. | ||
| * | ||
| * <p>This is used during the resolution of object paths. Both the catalog and database are optional | ||
| * when referencing catalog objects(tables, views etc.). The algorithm looks for requested objects in following |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
catalog objects such as tables, views etc.
| * @param catalogName The name of the catalog to set as the current default catalog. | ||
| * @throws CatalogException thrown if a catalog with given name could not be set as the default one | ||
| */ | ||
| void useCatalog(String catalogName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I still support my comment there. For implementers of the interface, I think it is an important information that all methods could throw a CatalogException if the communication fails. But we don't need to expose it to the user in the API.
Right now we have a binary exception structure (ValidationException if user did something wrong, TableException if something general went wrong). It should serve most use cases.
| * @throws CatalogException thrown if the given catalog and database could not be set as the default ones | ||
| */ | ||
| void useDatabase( | ||
| String catalogName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this combining the functionality of useCatalog?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, sort of. Underneath they translate to:
useCatalog = CatalogManager.setCatalog
useDatabase = CatalogManager.setCatalog + CatalogManager.setDatabase
I think it makes sense to leave it as it is, as the alternative would be to provide just useDatabase(databaseName). I find this method error prone though. As the catalog name would be implicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This form is useful, as our DDL will have "use catalogName.databaseName" as well as "use databaseName". Additionally, it might make sense also provide useDatabase(databaseName), without which, user has to figure out the catalog name in order to switch databases in the current catalog.
| currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase(); | ||
|
|
||
| LOG.info( | ||
| "Sets the current default catalog as [{}] and the current default database as [{}].", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Set the
| * Table source factory for testing. It creates a dummy {@link TableSource} | ||
| * that returns an empty {@link TableSchema}. | ||
| */ | ||
| public class TestExternalTableSourceFactory implements TableSourceFactory<Row> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I added this comment before. But why not using the existing testing table source factory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of the existing factories actually create a TableSource, whereas it is needed to get a TableSchema. This is required for the path resolution, as the output of a path resolution is the full resolved path and the TableSchema of the entry.
|
|
||
| // majority of tests did not assume existence of Catalog API. | ||
| // this enables disabling catalog path verification | ||
| val acutalWithAdjustedPath = if (!verifyCatalogPath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo
27dda5e to
1545c0b
Compare
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dawidwys. I had only minor comments. Maybe @sunjincheng121 can take a quick look at the python changes to verify if everything was integrated as expected.
flink-python/pyflink/table/table.py
Outdated
| """ | ||
| Writes the :class:`Table` to a :class:`TableSink` that was registered under | ||
| the specified name. | ||
| the specified name. For the path resolution algorithm :func:`~TableEnvironment.useDatabase`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the path resolution algorithm see :func:~TableEnvironment.use_database
| * @param catalogName The name of the catalog to set as the current default catalog. | ||
| * @throws CatalogException thrown if a catalog with given name could not be set as the default one | ||
| */ | ||
| void useCatalog(String catalogName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong opinion here. Personally, I would add it to the signature if it is an important interface to highlight the exception. I mean there is a reason why Java also support declaring runtime exceptions explicitly. But I would definitely not bubble the runtime exception up in the stack of signatures.
| * | ||
| * @param catalogName The name of the catalog to set as the current catalog. | ||
| * @param databaseName The name of the database to set as the current database. | ||
| * @throws CatalogException thrown if the given catalog and database could not be set as the default ones |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this @throws?
| * | ||
| * @param tablePath The path of the table to scan. | ||
| * @return The resulting {@link Table}. | ||
| * @throws ValidationException if no table is found using the given table path. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if we should document these exceptions. Usually, there is a lot that can go wrong under the hood.
| * @param catalogName The name of the catalog to set as the current default catalog. | ||
| * @throws CatalogException thrown if a catalog with given name could not be set as the default one | ||
| */ | ||
| void useCatalog(String catalogName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think the root cause here is that CatalogException is not a checked exception. I suggested it to handle communication failures similar to a IOException.
…talogs and encapsulate Calcite schema
In this commit both registration & tables lookup go through CatalogManager with respect to the current default catalog & database. It is not decoupled from Calcite yet. The next step will be to register the CatalogTables built exclusively in the table api rather than converted already to the Calcite's Tables. This closes apache#8404
What is the purpose of the change
This PR adds a
CatalogManagerthat stores tables and resolves paths accordingly. It subsumes #8214Brief change log
CatalogManagerimplementation with logic for resolving tables using also the default pathTableEnvironmentgo throughCatalogManagerSchemastructure corresponding to theCatalogManager. This enables Calcite to lookup for Tables throughCatalogManagerCatalogrelated APIs toTableEnvironmentVerifying this change
This change added tests and can be verified as follows:
org.apache.flink.table.catalog.CatalogManagerTestorg.apache.flink.table.catalog.CatalogManagerPathResolutionTestDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation