-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12235][hive] Support Hive partition in HiveCatalog #8449
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@xuefuz @bowenli86 please take a look. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!Thanks your effort on this.
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
Outdated
Show resolved
Hide resolved
...link-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
Outdated
Show resolved
Hide resolved
...link-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went over the changes once. Had some comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lirui-apache thanks for the PR, I left some comments besides xuefu's
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Outdated
Show resolved
Hide resolved
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Show resolved
Hide resolved
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
* @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have different sizes, | ||
* or any key in partitionKeys doesn't exist in partitionSpec. | ||
*/ | ||
List<String> getFullPartitionValues(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, List<String> partitionKeys) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does the "full" mean here? probably rename to "getOrdered/ArrangedPartitionValues" to conform to its logic?
nit: can we move tablePath
to the end as the last param given it's here to only build a message without any real effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means a full spec, which contains values for all partition keys. And a partial spec can only contain values for a subset of partition keys. Operations like createPartition
, dropPartition
require a full spec. Operations like listPartitions
can accept partial spec. Let me rename to to getOrderedFullPartitionValues, to indicate it orders the values and requires a full spec.
...link-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
Show resolved
Hide resolved
...k-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
.../flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
Outdated
Show resolved
Hide resolved
@bowenli86 @xuefuz thanks for your comments. Please take a look at the updated PR. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a FYI, PR #8490 will remove some classes that are changed here. Not sure which PR should get in first, but it seems one of them has to rebase.
Rebased. |
...link-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
Show resolved
Hide resolved
...link-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
Show resolved
Hide resolved
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lirui-apache Thank you very much for the update!
I just spotted that, currently how several APIs impl work like this: 1) get a raw hive table 2) parse part of the raw table. The latter step actually duplicate with logic in instantiateHiveCatalogTable()
. E.g. ensureTableAndPartitionMatch()
parses FLINK_PROPERTY_IS_GENERIC
, instantiateHivePartition()
parses partition keys, ensurePartitionedTable()
parses the raw table's partition key size, all of which we can get by just parsing the raw table to a CatalogTable
thru instantiateHiveCatalogTable()
in advance. The current duplication also means if we change some general logic in parsing a hive table, we need to change two places. Thus I wonder if it makes sense to just parse the raw table as whole at the beginning rather than having scattered places each parsing only part of it themselves. And we can remove util methods such as getFieldNames()
which is only used to get the partition keys which is already available in CatalogTable
.
For example, change
public void createPartition(...) {
Table hiveTable = getHiveTable(tablePath);
ensureTableAndPartitionMatch(hiveTable, partition);
ensurePartitionedTable(tablePath, hiveTable);
try {
client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
} ...
}
to something like:
public void createPartition(...) {
Table hiveTable = getHiveTable(tablePath);
CatalogBaseTable catalogTable = instantiateHiveCatalogTable(hiveTable);
... check whether catalogTabe and catalogPartition type matches would be much easier here ...
... check whether catalogTable is partitioned would be easier here ...
try {
client.add_partition(
instantiateHivePartition(catalogTable, partitionSpec, partition, hiveTable.getSd()));
} ...
}
@@ -639,8 +640,12 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partition | |||
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); | |||
checkNotNull(partition, "Partition cannot be null"); | |||
|
|||
checkArgument(partition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently throw CatalogException
if the type doesn't match. checkArgument()
will throw IllegalArgumentException
.
@@ -740,10 +745,13 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS | |||
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); | |||
checkNotNull(newPartition, "New partition cannot be null"); | |||
|
|||
checkArgument(newPartition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC)); | ||
if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) || | ||
(!isGeneric && catalogPartition instanceof GenericCatalogPartition)) { | ||
throw new IllegalArgumentException(String.format("Cannot handle %s partition for %s table", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw CatalogException
* or any key in partitionKeys doesn't exist in partitionSpec. | ||
*/ | ||
private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath) | ||
throws PartitionSpecInvalidException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: one more tab
I'm not sure how much benefit this can bring us. It might make |
ok, let's leave that part as it for now |
Rebase and address comments. |
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Show resolved
Hide resolved
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Show resolved
Hide resolved
...tors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static?
return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation()); | ||
} | ||
|
||
private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static
* @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have different sizes, | ||
* or any key in partitionKeys doesn't exist in partitionSpec. | ||
*/ | ||
private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. A few minor comments for consideration.
Made |
No. I think we only need to make what is static in nature static in their current forms. |
Thanks @lirui-apache very much for the PR. LGTM, merging |
What is the purpose of the change
To implement partition related operations in
HiveCatalogBase
.Brief change log
HiveCatalogPartition
to represent a partition that can be handled byHiveCatalog
.HiveCatalogBase
. Although we intend to letHiveCatalog
andGenericHiveMetastoreCatalog
share the implementations, this PR only enables/tests these operations forHiveCatalog
.GenericInMemoryCatalogTest
toCatalogTestBase
, so thatGenericInMemoryCatalogTest
andHiveCatalogTest
can share these test cases.Verifying this change
This PR is tested using partition related test cases in
CatalogTestBase
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation