-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-13068][hive] HiveTableSink should implement PartitionableTable… #8965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 8e18c2d (Wed Aug 07 08:15:00 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
...flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
Outdated
Show resolved
Hide resolved
| Preconditions.checkArgument(numStaticPart == 0, | ||
| "Dynamic partition cannot appear before static partition"); | ||
| } else { | ||
| numStaticPart--; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line doesn't seem having any effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually does. We'll use it to make sure we don't see any dynamic partition columns before we have seen all the static ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. After re-read the code, the logic becomes clear. Looks good.
| int numStaticPart = staticPartitionSpec.size(); | ||
| if (numStaticPart < partitionCols.size()) { | ||
| for (String partitionCol : partitionCols) { | ||
| if (!staticPartitionSpec.containsKey(partitionCol)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this if condition will be true (so the assertion will fail) at a certain point as long as it's dynamic partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. And when it's true, we will verify that we have checked all the static partition columns.
xuefuz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution. I left a couple of minor comments for consideration.
| // make it a LinkedHashMap to maintain partition column order | ||
| staticPartitionSpec = new LinkedHashMap<>(); | ||
| for (String partitionCol : getPartitionFieldNames()) { | ||
| if (partitions.containsKey(partitionCol)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if it doesn't contain the key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means that partition column is not contained in the static spec, and therefore it's a dynamic partition column.
|
|
||
| private void validatePartitionSpec() { | ||
| List<String> partitionCols = getPartitionFieldNames(); | ||
| Preconditions.checkArgument(new HashSet<>(partitionCols).containsAll( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments here:
-
can we output which column is unknown to the error message?
-
the check of
new HashSet<>(partitionCols).containsAll(staticPartitionSpec.keySet())will lose order of the partitions, which seems to be not enough for a legit validation. Shall it bepartitionCols.equals(new ArrayList(staticPartitionSpec.keySet()))? since staticPartitionSpec is a linked hashmap, keys from itskeySet()should be ordered -
nit: can we reformat it to be more readable?
Preconditions.checkArgument(
...
"...";
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'll print the specific columns and reformat the code.
We don't need the check the order here. Partition columns order is defined by getPartitionFieldNames(). We can always reorder a partition spec (which is a map) as long as it only contains valid partition columns.
|
Thanks @xuefuz and @bowenli86 for the review. Please take another look. |
|
CI passed on my personal repo: |
xuefuz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
The CI failure is due to flink-table-planner-blink failed checkstyle and thus not related to this PR. |
|
LGTM, thanks for your contribution! @flinkbot approve all I've restarted the CI since it's failing at the compile stage. Will merge it once it passes |
|
@lirui-apache can you rebase the PR? |
|
The test failure cannot be reproduced locally. |
|
PR rebased. @bowenli86 please take another look, thanks. |
|
LGTM. Merging |
…Sink
What is the purpose of the change
Make
HiveTableSinkimplementPartitionableTableSink, so thatHiveTableSinksupports static partitioning.Brief change log
PartitionableTableSink.HiveTableOutputFormatTesttoHiveTableSinkTest.Verifying this change
Existing test case.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation