[FLINK-25174][table] Introduce managed table interfaces and callback #18088

Closed
wants to merge 19 commits into from

Conversation

JingsongLi
Contributor

@JingsongLi JingsongLi commented Dec 13, 2021

What is the purpose of the change

Introduce the managed table proposed in FLIP-188: https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage

Brief change log

  • 4565223 Introduce CatalogBaseTable.TableKind.MANAGED and serialization for CatalogTable
  • e9f5cdc Introduce ManagedTableFactory interface
  • 5bce725 Introduce Catalog.supportsManagedTable
  • 8af06de Introduce TableDescriptor.forConnector.
  • 801da05 Implement ManagedTableListener for managed table and callback in CatalogManager.

Verifying this change

  • CatalogBaseTableResolutionTest.testManagedPropertyDeSerialization
  • TableEnvironmentTest.testCreateManagedTableFromDescriptor

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Dec 13, 2021

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 801da05 (Mon Dec 13 08:05:17 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

}
CatalogBaseTable origin = table.getOrigin();
// only DefaultCatalogTable or CatalogTableImpl is managed table
return origin instanceof DefaultCatalogTable || origin instanceof CatalogTableImpl;
Contributor

What's the motivation for this restriction? How does a catalog implementor learn about this?

Contributor Author

You are right, we should exclude the two troublemakers, InlineCatalogTable and ConnectorCatalogTable.

Contributor Author

@JingsongLi JingsongLi Dec 14, 2021

For InlineCatalogTable, it is a little tricky, something like:

try {
  origin.getOptions();
} catch (TableException ignore) {
  // exclude abnormal tables, such as InlineCatalogTable that does not have the options
  return false;
}

if (ignoreIfExists) {
tEnv.executeInternal(createOperation);
} else {
try {
Contributor

This entire try-catch block could be written in a nicer way using AssertJ's assertThatThrownBy.

@@ -110,4 +118,96 @@ public void testTableFromDescriptor() {

assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector"));
}

@Test
public void testCreateManagedTableFromDescriptor() {
Contributor

I would prefer splitting this into separate test cases. If one case fails, we'll immediately know which one. Right now this is one giant test case that actually tests several cases, and several things per case.

import static org.apache.flink.table.factories.ManagedTableFactory.discoverManagedTableFactory;

/** The listener for managed table operations. */
public class ManagedTableListener {
Contributor

Please add an API stability annotation here.

@@ -311,6 +311,14 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE
void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;

/**
* If return true, the Table without specified connector will be translated to the Flink managed
* table. See {@link CatalogBaseTable.TableKind#MANAGED}
Contributor

Use @see?

@@ -39,6 +39,7 @@
@PublicEvolving
enum TableKind {
TABLE,
MANAGED,
Contributor

It'd be nice to add some JavaDoc for this value explaining what a managed table is.

this.schema = checkNotNull(schema, "Schema must not be null.");
this.comment = comment;
this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
this.options = checkNotNull(options, "Options must not be null.");
this.tableKind = checkNotNull(tableKind, "Table kind must not be null.");
Contributor

We should validate that it isn't VIEW either. In fact, would it maybe make sense to introduce a separate DefaultManagedTable (which maybe inherits from DefaultCatalogTable) instead of adding the table kind here?

Contributor Author

We can introduce a DefaultManagedTable.

JingsongLi and others added 3 commits December 14, 2021 11:01
…table/catalog/Catalog.java

Co-authored-by: Ingo Bürk <admin@airblader.de>
…k/table/api/TestManagedTableFactory.java

Co-authored-by: Ingo Bürk <admin@airblader.de>
@JingsongLi
Contributor Author

@flinkbot run azure

Contributor

@twalthr twalthr left a comment

Thanks for the PR @JingsongLi. Maybe it was not a good idea to introduce an additional table kind for managed tables. From the changes, I don't think it actually provides much benefit. A utility method that simply checks the getOptions() of a CatalogTable could also do the job. It would reduce the number of changes in this PR to a minimum. What do you think?

In the end the only thing that matters is whether connector= is present or not. We can neglect the legacy.
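For readers following the thread, the utility-method idea can be sketched roughly as below. This is only an illustration under stated assumptions: `ManagedTableOptions` and `isManagedTable` are hypothetical names, not Flink API, the helper works on a plain options map rather than a real `CatalogTable`, and treating a blank `connector` value like a missing one is an assumption as well.

```java
import java.util.Map;

/** Hypothetical helper (not part of Flink): decides "managed or not" from the options alone. */
final class ManagedTableOptions {

    // Option key discussed in the review; the constant name is an assumption.
    static final String CONNECTOR = "connector";

    private ManagedTableOptions() {}

    /** Treats a table as managed when no non-blank "connector" option is present. */
    static boolean isManagedTable(Map<String, String> options) {
        String connector = options.get(CONNECTOR);
        return connector == null || connector.trim().isEmpty();
    }

    public static void main(String[] args) {
        // A table with an explicit connector is a regular table.
        System.out.println(isManagedTable(Map.of("connector", "kafka")));
        // A table without a connector option would be treated as managed.
        System.out.println(isManagedTable(Map.of("path", "/tmp/t")));
    }
}
```

With a check like this, no extra table kind has to be stored or serialized, which is the reduction in changes the comment above argues for.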

getCatalog(objectIdentifier.getCatalogName()).orElse(null),
objectIdentifier,
resolveCatalogBaseTable(table),
true,
Contributor

aren't the properties "temporary" and "catalog = null" related? I don't think we need the temporary flag. we can still add it on the internal API if necessary.

Contributor Author

Our temporary tables can be catalog-owned, I remember this is why we do: https://issues.apache.org/jira/browse/FLINK-21185

Contributor

FYI: We also do org.apache.flink.table.catalog.CatalogManager.TableLookupResult#isTemporary. So the temporary table might be catalog owned during creation but will look like a non-temporary one currently in the factory.

But I'm fine with the change.

Contributor Author

I got it. This is here for temporary managed tables.

managedTableListener.notifyTableCreation(
catalog,
objectIdentifier,
resolveCatalogBaseTable(table),
Contributor

nit: introduce a local variable here and below, it avoids too much nesting and makes it easier to set breakpoints during debugging

return false;
}

if (!StringUtils.isNullOrWhitespaceOnly(
Contributor

we call table.getOptions() very often here. how about calling it once in a try-catch and then just checking the remaining cases?
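A rough sketch of that suggestion, under assumptions: `TableLike` and the nested `TableException` are stand-ins defined here for illustration (they are not the Flink classes), and the legacy key `connector.type` is taken from the discussion elsewhere in this review. The options are fetched exactly once, and the remaining checks operate on the local variable.

```java
import java.util.Map;

/** Hypothetical sketch: fetch the options once, then check the remaining cases. */
final class OptionsOnce {

    /** Stand-in for a table whose getOptions() may throw, e.g. an inline table. */
    interface TableLike {
        Map<String, String> getOptions();
    }

    /** Stand-in for Flink's TableException; defined here only to keep the sketch self-contained. */
    static final class TableException extends RuntimeException {
        TableException(String message) {
            super(message);
        }
    }

    private OptionsOnce() {}

    static boolean isManagedTable(TableLike table) {
        final Map<String, String> options;
        try {
            // Single call; tables that cannot provide options are never managed.
            options = table.getOptions();
        } catch (TableException e) {
            return false;
        }
        // A regular table declares a connector, either with the current or the legacy key.
        return isBlank(options.get("connector")) && isBlank(options.get("connector.type"));
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
}
```

Keeping the `try` block limited to the one call that can throw also makes it obvious which failure is being swallowed.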

(k, v) -> {
if (v != null) {
if (v.get() == null) {
v.set(context.getCatalogTable().toProperties());
Contributor

isn't calling getOptions() enough, do we really need to serialize the schema as well?

@@ -79,6 +80,10 @@
properties.put(COMMENT, comment);
}

if (resolvedTable.getTableKind() == TableKind.MANAGED) {
Contributor

why do we need this? can't we derive the "managed" property from the options? looking for a missing connector should already be enough? I think we can assume that connector.type is not used by now.


/** Default implementation of a managed {@link CatalogTable}. */
@Internal
public class DefaultManagedTable extends DefaultCatalogTable {
Contributor

let's drop this class and enrich DefaultCatalogTable with an additional constructor argument. it is internal anyway and implementers should not do instanceof checks against this class.

@@ -86,6 +86,7 @@ private FlinkStatistic getStatistic(
final ResolvedCatalogBaseTable<?> resolvedBaseTable = lookupResult.getResolvedTable();
switch (resolvedBaseTable.getTableKind()) {
case TABLE:
case MANAGED:
Contributor

I thought about my previous comment around a special TableKind again. I fear we are mixing metadata encoded in table kind and metadata encoded in options.

@JingsongLi
Contributor Author

Thanks for the PR @JingsongLi. Maybe it was not a good idea to introduce an additional table kind for managed tables. From the changes, I don't think it actually provides much benefit. A utility method that simply checks the getOptions() of a CatalogTable could also do the job. It would reduce the number of changes in this PR to a minimum. What do you think?

In the end the only thing that matters is whether connector= is present or not. We can neglect the legacy.

+1. The biggest benefit is the distinction between table and managed, but looking at the implementation:

  • planner identifies managed table by just looking at the catalog and options
  • catalog identifies managed table by just looking at the options

So yes, the benefits are not big.

Comment on lines 173 to 175
assertThat(resolvedTable.toProperties(), equalTo(properties));

assertThat(resolvedTable.getResolvedSchema(), equalTo(RESOLVED_TABLE_SCHEMA));
Contributor

Please use only assertj assertions, and convert the tests here to use assertj

*
* @see CatalogBaseTable.TableKind#MANAGED
*/
default boolean supportsManagedTable() {
Contributor

I might be missing a point in the original design, but why does the catalog need to support managed tables? Isn't the support already provided by CatalogManager itself?

Contributor Author

Some catalogs already use tables without a connector as their catalog-internal tables.

Contributor Author

You can see the first few replies of the FLIP discussion.

if (ignoreIfExists) {
tEnv.executeInternal(createOperation);
} else {
Assertions.assertThrows(
Contributor

Please use assertj for testing

String.format(
"Table options do not contain an option key '%s' for discovering a connector.",
CONNECTOR.key()));
ManagedTableFactory factory =
Contributor

Rather than having an empty connector id and hardcoding the discovery of ManagedTableFactory, I wonder if we can have a connector id for that, and then inject it from TableEnvironmentImpl every time we parse a table which doesn't provide a connector value in the table options.

Contributor Author

I prefer the empty string to other ids because the empty string itself is a special symbol that is often mixed up with "none".

Contributor

tbh I also don't like the empty string. A managed-table or even default (which we use for planner and executor discovery) would be nicer and looks less like a bug. default would ensure consistency.

Contributor Author

You have convinced me. I think we can use default.

Contributor

@twalthr twalthr left a comment

Thanks for the update @JingsongLi. The changes look much better now. I just had a last comment on the user experience in case of errors.

if (!factoryClass.isAssignableFrom(factory.getClass())) {
throw new ValidationException(
String.format(
"The managed table factory '%s' dose not implement '%s'.",
Contributor

the exception contains a typo and is not very nice. in the end we should have a helpful exception that includes the old exception:

Table options do not contain an option key '%s' for discovering a connector. Therefore, Flink assumes a managed table. However, a managed table factory is not in the classpath. 

Either we wrap the original exception or call some internal discoverFactory with an optional return type.

Contributor Author

Nice! I prefer to throw a new exception directly; we don't need to repeat the old exception.

Contributor Author

I created FactoryUtil.discoverManagedTableFactory; it throws exceptions with dedicated messages for the not-found and multiple-found cases.
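To make the error-handling discussion concrete, here is a minimal sketch of discovery with an optional lookup and a helpful not-found message. It is not the code from this PR: `ManagedFactoryDiscovery`, its nested `Factory` interface, and its `ValidationException` are hypothetical stand-ins, candidates are passed in as a list instead of being loaded from the classpath, and the `default` identifier follows the outcome of the discussion above. The wording of the multiple-found message is an assumption.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical sketch of managed table factory discovery; not Flink's FactoryUtil. */
final class ManagedFactoryDiscovery {

    static final String CONNECTOR_KEY = "connector";
    static final String DEFAULT_IDENTIFIER = "default";

    /** Stand-in for a table factory that exposes an identifier. */
    interface Factory {
        String factoryIdentifier();
    }

    /** Stand-in for Flink's ValidationException. */
    static final class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    private ManagedFactoryDiscovery() {}

    /** Internal lookup with an optional result; fails only when the match is ambiguous. */
    static Optional<Factory> findManagedFactory(List<Factory> candidates) {
        List<Factory> matches =
                candidates.stream()
                        .filter(f -> DEFAULT_IDENTIFIER.equals(f.factoryIdentifier()))
                        .collect(Collectors.toList());
        if (matches.size() > 1) {
            throw new ValidationException(
                    String.format(
                            "Multiple managed table factories for identifier '%s' found.",
                            DEFAULT_IDENTIFIER));
        }
        return matches.stream().findFirst();
    }

    /** Public entry point: turns "not found" into an exception that explains why it was needed. */
    static Factory discoverManagedTableFactory(List<Factory> candidates) {
        return findManagedFactory(candidates)
                .orElseThrow(
                        () ->
                                new ValidationException(
                                        String.format(
                                                "Table options do not contain an option key '%s' for "
                                                        + "discovering a connector. Therefore, Flink assumes a "
                                                        + "managed table. However, a managed table factory is "
                                                        + "not in the classpath.",
                                                CONNECTOR_KEY)));
    }
}
```

Splitting the lookup from the throwing entry point keeps the user-facing message in one place, so no original exception has to be wrapped or repeated.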

@JingsongLi
Contributor Author

@flinkbot run azure

@JingsongLi
Contributor Author

Hi @twalthr @slinkydeveloper @Airblader Can you see if there are any other questions? (Or maybe I missed something)

Contributor

@twalthr twalthr left a comment

LGTM

@JingsongLi
Contributor Author

Thanks for your review! Merged.

niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
@JingsongLi JingsongLi deleted the managed_interface branch March 8, 2022 07:12