
[FLINK-12663] Implement HiveTableSource to read Hive tables #8809

Closed · wants to merge 3 commits

Conversation

@zjuwangg (Contributor)

What is the purpose of the change

Implement HiveTableSource to read Hive tables

Brief change log

  • Add HiveTableSource to read Hive tables (a rough usage sketch follows at the end of this description)

Verifying this change

This change added tests and can be verified as follows:

  • Added HiveTableSourceTest, which verifies that reading a Hive table works as expected

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)
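A rough, hypothetical usage sketch of the new source. The HiveTableSource constructor shape below is an assumption inferred from the code excerpts quoted later in this conversation, not the actual API; only the generic TableEnvironment calls are standard Flink (1.9-era) API.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.hive.HiveTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.hadoop.mapred.JobConf;

public class HiveTableSourceUsageSketch {  // hypothetical example class, not part of the PR
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Assumed constructor, loosely following the (JobConf, dbName, tableName, partitionColNames)
        // parameters visible in the review excerpts below; the real signature may differ.
        HiveTableSource hiveSource =
                new HiveTableSource(new JobConf(), "default", "src", new String[0]);

        // Register the source and query it like any other table.
        tEnv.registerTableSource("hive_src", hiveSource);
        Table result = tEnv.sqlQuery("SELECT * FROM hive_src");
        System.out.println(result);  // in a real job, convert to a DataSet and collect/print
    }
}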

@zjuwangg (Contributor, Author)

cc @xuefuz @bowenli86 @lirui-apache to review.

@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@xuefuz (Contributor) left a comment

LGTM other than a minor comment.

@bowenli86 (Member) left a comment

@zjuwangg Thanks for the PR!

private final String dbName;
private final String tableName;
private final Boolean isPartitionTable;
private final String[] partitionColNames;
@bowenli86 (Member):
use List instead?

@zjuwangg (Contributor, Author):
Why is List better than String[]? Is there a benefit to doing so?

@bowenli86 (Member):
I thought of that just to be consistent with HiveTableSink, CatalogTable, and the Hive table API, which all use List to store partition column keys/names. Using a String array means you will need to convert the list to an array somewhere in the code path (very likely in HiveTableFactory), which is not necessary.

JobConf jobConf,
String dbName,
String tableName,
String[] partitionColNames) {
@bowenli86 (Member):
use List?

@zjuwangg (Contributor, Author):
as above

@bowenli86 (Member):
ditto
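To make the suggestion concrete, here is a minimal sketch of the fields and constructor using List, with hypothetical names mirroring the excerpts above (the real class also implements Flink's table source interfaces, omitted here):

import java.util.List;
import org.apache.hadoop.mapred.JobConf;

public class HiveTableSource {  // sketch only, not the actual class from this PR
    private final JobConf jobConf;
    private final String dbName;
    private final String tableName;
    private final boolean isPartitionTable;
    private final List<String> partitionColNames;  // List instead of String[], as suggested

    public HiveTableSource(JobConf jobConf, String dbName, String tableName, List<String> partitionColNames) {
        this.jobConf = jobConf;
        this.dbName = dbName;
        this.tableName = tableName;
        this.partitionColNames = partitionColNames;
        // No conversion to an array is needed anywhere in the code path.
        this.isPartitionTable = partitionColNames != null && !partitionColNames.isEmpty();
    }
}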

// For now we use the metastore client to create the Hive table instead of going through HiveCatalog,
// because HiveCatalog doesn't yet support setting the SerDe.
HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
@bowenli86 (Member):
minor: why use the fully qualified name of Table? I didn't find any class name conflict.

@zjuwangg (Contributor, Author):
It conflicts with org.apache.flink.table.api.Table.
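For illustration, a minimal sketch of the conflict, assuming a hypothetical test class that also imports Flink's Table API:

import org.apache.flink.table.api.Table;  // Flink's Table, imported for use elsewhere in the (hypothetical) test

public class HiveTableSourceTestSketch {  // hypothetical class name, not the actual test
    void createHiveTable() {
        // Only one class named "Table" can be imported, so the Hive metastore Table
        // has to be referenced by its fully qualified name.
        org.apache.hadoop.hive.metastore.api.Table tbl =
                new org.apache.hadoop.hive.metastore.api.Table();
        tbl.setDbName("default");    // hypothetical values
        tbl.setTableName("src");
    }
}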

@bowenli86 (Member)

The other thing I noticed is that the Hive table source and sink implementations have quite a bit of duplicated logic. Maybe that's something we can unify. It doesn't have to happen immediately, though.

flink-connectors/flink-connector-hive/pom.xml (outdated review comment, resolved)
* limitations under the License.
*/

package org.apache.flink.batch.connectors.hive;
@KurtYoung (Contributor):
I would suggest not using the org.apache.flink.batch prefix for the package name.

@zjuwangg (Contributor, Author):
That's better done in another PR.

@zjuwangg (Contributor, Author)

cc @bowenli86 to review again

@bowenli86 (Member) commented Jun 24, 2019

@KurtYoung regarding your comment on the package name, what's your suggestion on a proper name?

It's been brought up by @zjffdu before, too. I think @zjuwangg named it this way because most connector packages are named org.apache.flink.streaming.connectors.xxx and he is just following that convention. However, as we are moving toward streaming-batch unification, we probably don't need "streaming/batch" in the package names any more, because, like the file source/sink, the Hive source/sink can (which doesn't mean we necessarily will) be made streaming in the future. I'm thinking of just org.apache.flink.connectors.hive. What do you think?
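In other words, a sketch of the proposed rename (to be settled in a follow-up ticket, not decided in this PR):

// currently: package org.apache.flink.batch.connectors.hive;
package org.apache.flink.connectors.hive;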

@zjuwangg can you please create a JIRA ticket to track this discussion? We probably need to finalize the package name before releasing 1.9 (not necessarily in this PR), otherwise it's hard to change.

cc @xuefuz @lirui-apache

@bowenli86 (Member) left a comment

@zjuwangg Thanks for the update! Only one issue left (the same List vs. String[] comment as above).


@xuefuz (Contributor) commented Jun 24, 2019

For the package names, I think we can do that as a follow-up. For the last change request, I think I can make the changes, as I will have to refactor it a little bit for the HiveTableFactory work that I'm doing.

If possible, let's get this in first, as it's kind of blocking me.

@bowenli86 (Member) commented Jun 24, 2019

> For the package names, I think we can do that as a follow-up. For the last change request, I think I can make the changes, as I will have to refactor it a little bit for the HiveTableFactory work that I'm doing.
>
> If possible, let's get this in first, as it's kind of blocking me.

Sounds good. @KurtYoung @zjffdu @xuefuz @lirui-apache @zjuwangg I've created FLINK-12966 to track the effort of finalizing the package name.

I will merge this PR to unblock @xuefuz, given that the build has passed.

@asfgit asfgit closed this in 2949166 Jun 24, 2019
@zjuwangg zjuwangg deleted the FLINK-12663 branch June 25, 2019 01:59