[#3362] feat(flink-connector): Add the code skeleton for flink-connector #2635
Conversation
Force-pushed 6b4dff5 to d37b64b.
@coolderli can we please move forward the Flink support?

@jerryshao Of course. I will finish the first one today.

@jerryshao @FANNG1 Could you please help review this? Thanks.
```java
@Override
public Catalog createCatalog(Context context) {
  this.hiveCatalogFactory = new HiveCatalogFactory();
```
We use the HiveCatalogFactory from flink-connector-hive to create the HiveCatalog.
```
com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
```
If supporting Iceberg, would we just provide another GravitinoIcebergCatalogFactory?
@FANNG1 Yes. This is what I want to talk about. Do we implement one CatalogFactory for each storage, or just one GravitinoCatalogFactory? If we use a single GravitinoCatalogFactory, we would need another property to identify which real catalog should be used.

Another question: do we need to use GravitinoCatalog for all storages? I think the CatalogFactory should be consistent with the Catalog. Flink supports registering a catalog manually, so if we only provide a GravitinoCatalog, we can also simplify the usage.

The original usage: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/catalogs/#using-sql-ddl
```java
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
```
Use one GravitinoCatalog:

```java
Map<String, String> properties = new HashMap<>();
properties.put("catalog.type", "hive"); // another property used to identify the real catalog
Catalog catalog = new GravitinoCatalog("gravitino", properties);
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
```
Use a different GravitinoCatalog for each storage:

```java
Catalog catalog = new GravitinoHiveCatalog("gravitino", xxx);
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
```
Which do you prefer? Can you share your thoughts? Thanks.
Can you also help review it? Thanks. @hackergin
I prefer a different GravitinoCatalog per storage, because this solution seems simpler for users.
From my understanding, the Catalog Store is essentially for storing the configuration of a Flink Catalog. Therefore, if users create a catalog using the following DDL, we should support saving the catalog's configuration:

```sql
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
);
```

At the same time, when constructing a real Catalog instance, we can automatically create a GravitinoCatalog instead of a HiveCatalog. And we should prohibit the direct creation of Catalog instances:

```java
tableEnv.registerCatalog("myhive", catalog);
```

When a Catalog instance is registered with the table environment this way, it will not be persisted to the Catalog Store.
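The idea above can be sketched in a few lines of plain Java (purely illustrative; `SimpleCatalogStore` and `resolveFactory` are hypothetical names, not the connector's or Flink's actual API): the store persists the `WITH` options, and the stored `type` later selects a Gravitino-backed factory instead of the vanilla `HiveCatalog`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a minimal in-memory catalog store mirroring the
// idea above. SimpleCatalogStore and resolveFactory are hypothetical names,
// not the actual Flink or Gravitino APIs.
class SimpleCatalogStore {
  private final Map<String, Map<String, String>> store = new HashMap<>();

  // Persist the options from "CREATE CATALOG myhive WITH (...)".
  void storeCatalog(String name, Map<String, String> options) {
    store.put(name, new HashMap<>(options));
  }

  // When the catalog is later materialized, the stored "type" option decides
  // which Gravitino-backed factory to use instead of the vanilla HiveCatalog.
  String resolveFactory(String name) {
    String type = store.get(name).get("type");
    return "hive".equals(type) ? "GravitinoHiveCatalogFactory" : "GravitinoCatalogFactory";
  }
}
```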
@FANNG1 I have updated the MR. Please help review it when you have time. Thanks.
Force-pushed 77806f4 to 7e20a11.
@FANNG1 Could you help review this MR again? Thanks.
```java
this.hiveCatalogFactory = new HiveCatalogFactory();
final FactoryUtil.CatalogFactoryHelper helper =
    FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept(
```
This will skip validation for options with the specified prefix.
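A rough sketch of what prefix-skipping validation does (loosely modeled on Flink's `FactoryUtil.validateExcept`; the class and method here are illustrative, not the real implementation): keys under an excluded prefix are never checked, while every other key must be a known option.

```java
import java.util.Map;
import java.util.Set;

// Sketch of prefix-skipping validation, loosely modeled on what Flink's
// FactoryUtil.validateExcept does. All names here are illustrative.
class PrefixValidator {
  static void validateExcept(
      Map<String, String> options, Set<String> knownKeys, String... excludedPrefixes) {
    for (String key : options.keySet()) {
      boolean excluded = false;
      for (String prefix : excludedPrefixes) {
        if (key.startsWith(prefix)) {
          excluded = true;
          break;
        }
      }
      // Keys under an excluded prefix (e.g. "flink.bypass.") are never
      // validated; everything else must be a known required/optional option.
      if (!excluded && !knownKeys.contains(key)) {
        throw new IllegalArgumentException("Unsupported option: " + key);
      }
    }
  }
}
```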
What if a user adds a property like `flink.bypass.aa` by mistake? Will that cause all Flink jobs to fail? I prefer to ignore it.
@FANNG1 It's good for users. How about just removing the validation? Otherwise, I have to remove these useless keys before the validation.
@FANNG1 I thought about it again. The validation is needed; it can be used to validate required options such as `metastore.uris`. Let me check how to finish it.
Please add a comment before `helper.validateExcept` to describe how it works.
@FANNG1 I think I have already rebased the main branch, but the GitHub CI still failed. Can you help me with it? Thanks.
@FANNG1 I have finished it. `hive.metastore.uris` is now required, and unknown keys are ignored. Please take a look. Thanks.
```java
Assertions.assertEquals(
    "unknown.value",
    flinkProperties.get(flinkByPass("unknown.key")),
    "The unknown.key will not cause failure and will be saved in Gravitino.");
```
The unknown key will be saved in Gravitino rather than being ignored.
Why does it work? The `unknown.key` exists in neither `requiredOptions` nor `optionalOptions`, and the custom CatalogFactoryHelper only works when loading a catalog from Gravitino. Am I missing something?
@FANNG1 When creating the catalog, the `unknown.key` is passed to the Context of GravitinoHiveCatalog and the CatalogDescriptor of the GravitinoCatalogStore. The GravitinoCatalogStore does not validate `unknown.key`, and we use `FactoryUtils.GravitinoCatalogFactoryHelper` instead of `FactoryUtil.CatalogFactoryHelper` to skip the validation, so `unknown.key` is saved to Gravitino successfully.

Of course, it is a little weird that a key absent from `optionalOptions` does not fail the Flink job.
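The test above relies on a `flinkByPass` helper. A plausible sketch of the pass-through (assuming the helper simply adds a `flink.bypass.` prefix when persisting Flink options to Gravitino; the real converter in the connector may differ):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: this assumes the flinkByPass helper simply adds a
// "flink.bypass." prefix so arbitrary Flink options survive the round trip
// through Gravitino. The actual converter in the connector may differ.
class PropertyConverter {
  static final String FLINK_BYPASS_PREFIX = "flink.bypass.";

  static String flinkByPass(String key) {
    return FLINK_BYPASS_PREFIX + key;
  }

  // Wrap every Flink option before storing it as a Gravitino catalog property.
  static Map<String, String> toGravitinoProperties(Map<String, String> flinkOptions) {
    Map<String, String> result = new HashMap<>();
    flinkOptions.forEach((k, v) -> result.put(flinkByPass(k), v));
    return result;
  }
}
```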
@FANNG1 Do you think this is reasonable?
It is a little weird, but let's go ahead.
@coolderli, thanks for proposing the base PR for Flink, a big step!
Merged as: [#3362] feat(flink-connector): Add the code skeleton for flink-connector (apache#2635)

What changes were proposed in this pull request?
- Support GravitinoCatalogStore to register the catalog. In this MR, we support creating the Hive catalog.

Why are the changes needed?
- Fix apache#3362

Does this PR introduce any user-facing change?
- Support Flink in Gravitino.

How was this patch tested?
- Added UTs.