Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog #8222

Merged
merged 6 commits into from
Apr 25, 2019

Conversation

bowenli86
Copy link
Member

What is the purpose of the change

This PR adds support for partition related operations to Catalog APIs

Brief change log

  • adds partition related APIs in both ReadableCatalog and ReadableWritableCatalog
  • implemented them in GenericInMemoryCatalog

Verifying this change

This change added tests and can be verified as follows:

  • added corresponding unit tests in GenericInMemoryCatalogTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes )
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 19, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@bowenli86
Copy link
Member Author

bowenli86 commented Apr 19, 2019

@bowenli86 bowenli86 changed the title [FLINK-11518] [SQL/TABLE] Add partition related catalog APIs and implement them in GenericInMemoryCatalog [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog Apr 21, 2019
@JingsongLi
Copy link
Contributor

@flinkbot attention @wuchong

@rmetzger rmetzger requested a review from wuchong April 23, 2019 05:39
Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pull request looks good to me. I only have some doubts, as I'm not familiar with Hive and catalog.

* Get the partition keys of the table. This will be an empty set if the table is not partitioned.
* @return partition keys of the table.
*/
LinkedHashSet<String> getPartitionKeys() throws TableNotPartitionedException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find that Hive doesn't allow partitioned keys in table schema, do we have this restriction?

Copy link
Member Author

@bowenli86 bowenli86 Apr 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand the question. What do you mean restriction? Can you elaborate?

You are right that Hive's Table uses a separate field dedicated for partition keys.

@rmetzger rmetzger requested a review from wuchong April 23, 2019 12:44
Copy link
Contributor

@xuefuz xuefuz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Some suggestions for consideration.

@bowenli86
Copy link
Member Author

bowenli86 commented Apr 24, 2019

Thanks for your reivew @JingsongLi @wuchong @xuefuz @zjffdu . To summarize major feedbacks I've addressed:

  • make CatalogPartitionSpec its own class, and use it as id for partitions in APIs, like ObjectPath as id for tables
  • removed its copy() method, since it internally keeps an unmodifiable map

@KurtYoung Would be great to have you take a look and merge this PR if everything looks good

@bowenli86
Copy link
Member Author

@flinkbot attention @KurtYoung

* @throws PartitionNotExistException thrown if the partition is not partitioned
* @throws CatalogException in case of any runtime exception
*/
CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This functionality can be covered by "listPartitions"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really. PartitionSpec in listPartition can be a partial spec to indicate a subset of all partitions in the table, while it must be a full spec in getPartition to represent a single partition.

E.g., say we have a table with partition keys(k1, k2) and 2 partitions (k1=a, k2=b) and (k1=a, k2=c), listPartitions(tablePath, (k1=a)) returns both partitions, and of course as an extreme case, listPartitions(tablePath, (k1=a, k2=b)) returns a list with the first partition as the only element inside. getPartition requires a full spec not partial ones, e.g. you can pass (k1=a, k2=b), but not just (k1=a) in which case it throws exception

* @throws PartitionAlreadyExistsException thrown if the target partition already exists
* @throws CatalogException in case of any runtime exception
*/
void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain more about this interface? In which case this API would be invoked. I mean, does there exists some partition-only operations? My feeling is most operations to partition is through table.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example from a SQL perspective is "ALTER TABLE ADD PARTITION". Operations on partitions will be using and only using table as an identifier to locate the partition, and that's why this API includes tablePath.

@KurtYoung
Copy link
Contributor

LGTM, will merge this after travis

@KurtYoung KurtYoung merged commit 6cbc9bf into apache:master Apr 25, 2019
@bowenli86 bowenli86 deleted the partitionapi branch May 9, 2019 00:07
tianchen92 pushed a commit to tianchen92/flink that referenced this pull request May 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants