
[SPARK-37217][SQL] The number of dynamic partitions should early check when writing to external tables #34493

Closed
wants to merge 10 commits

Conversation

cxzl25
Contributor

@cxzl25 cxzl25 commented Nov 5, 2021

What changes were proposed in this pull request?

SPARK-29295 introduced a mechanism by which dynamic-partition overwrites of external tables delete the data in the target partitions first.

Suppose 1001 partitions are written: the data of those 1001 partitions will be deleted first, but because hive.exec.max.dynamic.partitions is 1000 by default, loadDynamicPartitions will then fail, and by that point the data of the 1001 partitions has already been deleted.

So we can check whether the number of dynamic partitions is greater than hive.exec.max.dynamic.partitions before deleting anything; the job should fail fast at that point.
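
In code, a minimal sketch of the proposed check, assembled from the diff fragments quoted later in this conversation (variable names such as writtenParts and hadoopConf follow InsertIntoHiveTable; the error message was later moved into QueryExecutionErrors, so this is not the exact merged code):

  if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
    // Count the dynamic partitions about to be written and compare against the Hive
    // limit before any existing partition data is deleted.
    val numWrittenParts = writtenParts.size
    val maxDynamicPartitionsKey = "hive.exec.max.dynamic.partitions"
    val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey, 1000)
    if (numWrittenParts > maxDynamicPartitions) {
      val maxDynamicPartitionsErrMsg =
        s"Number of dynamic partitions created is $numWrittenParts" +
          s", which is more than $maxDynamicPartitions" +
          s". To solve this try to set $maxDynamicPartitionsKey" +
          s" to at least $numWrittenParts."
      throw new SparkException(maxDynamicPartitionsErrMsg)
    }
  }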

Why are the changes needed?

Avoid unrecoverable data loss when the job fails.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test.

@github-actions github-actions bot added the SQL label Nov 5, 2021
@AmplabJenkins

Can one of the admins verify this patch?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you for making a PR, @cxzl25.

s"Number of dynamic partitions created is $numWrittenParts" +
s", which is more than $maxDynamicPartitions" +
s". To solve this try to set $maxDynamicPartitionsKey" +
s" to at least $numWrittenParts."
Member

Do you think we should set hive.exec.max.dynamic.partitions automatically from the Spark side in this case?

Contributor Author

It is possible to adjust hive.exec.max.dynamic.partitions automatically.
However, if it is adjusted automatically, many partitions may be created by accident, and the parameter becomes meaningless.

https://github.com/apache/hive/blob/135629b8d6b538fed092641537034a9fbc59c7a0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1857-L1864

@dongjoon-hyun
Member

cc @sunchao

@sunchao
Member

sunchao commented Nov 7, 2021

cc @viirya too since this is related to your change in SPARK-29295

Member

@viirya viirya left a comment

Hmm, my question is: since we are going to overwrite the table partitions, why do we need to prevent the data from being deleted? For any other delete-like command, if a failure happens during deletion, some data will already have been deleted before the failure. I think we don't provide an atomicity guarantee for this command, right?

@cxzl25
Contributor Author

cxzl25 commented Nov 8, 2021

Hmm, my question is: since we are going to overwrite the table partitions, why do we need to prevent the data from being deleted? For any other delete-like command, if a failure happens during deletion, some data will already have been deleted before the failure. I think we don't provide an atomicity guarantee for this command, right?

Yes, I agree with you. The operation is not guaranteed to be atomic, and data deleted before a failure is not guaranteed to be restored.

But in this case, if the number of dynamic partitions exceeds hive.exec.max.dynamic.partitions, Spark deletes the partition data first, and only when client.loadDynamicPartitions loads the data does it detect that the number of partitions exceeds the configured limit and fail immediately. No data is written to the partitions at all.

From the user's point of view the operation simply failed, so in theory the original data should still be there.

The user then has to check whether the number of partitions matches expectations: if it does, the Hive configuration needs to be raised; if not, the SQL logic needs to be changed. Re-running the SQL also takes time, and the data cannot be read during that period.
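
For illustration only, a hypothetical way to hit this path (the table name, location, and SET statement are made up for the example):

  // Hypothetical reproduction: 1001 dynamic partitions exceed the default
  // hive.exec.max.dynamic.partitions of 1000.
  spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
  spark.sql(
    "CREATE EXTERNAL TABLE ext_t (id INT) PARTITIONED BY (p INT) LOCATION '/tmp/ext_t'")
  // Before this PR: Spark first clears the 1001 target partitions and only then fails in
  // client.loadDynamicPartitions, so any data those partitions held from a previous run
  // is lost. With this PR, the job fails before anything is deleted.
  spark.sql(
    "INSERT OVERWRITE TABLE ext_t PARTITION (p) " +
      "SELECT CAST(id AS INT) AS id, CAST(id AS INT) AS p FROM range(1001)")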

@sunchao
Member

sunchao commented Nov 8, 2021

I feel that even though we can't guarantee operations like delete to be atomic, we should make an effort to do so. This PR looks simple enough and fixes a potential issue that could corrupt an external Hive table, so I think it's well worth it?

Member

@viirya viirya left a comment

Yea, this looks simple and is just a check before the Hive operation. As an early check before running the query, it is fine. I just don't think preventing data deletion makes sense as the rationale, because this command is actually going to delete the data and there is no atomicity.

@cxzl25
Contributor Author

cxzl25 commented Nov 8, 2021

It may be that the PR title is not clear.
Maybe I can change it to
The number of dynamic partitions should early check when writing to external tables?

@viirya
Member

viirya commented Nov 8, 2021

Sounds good to me. Thanks.

@cxzl25 cxzl25 changed the title [SPARK-37217][SQL] Dynamic partitions should fail quickly when writing to external tables to prevent data deletion [SPARK-37217][SQL] The number of dynamic partitions should early check when writing to external tables Nov 8, 2021
# Conflicts:
#	sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@cxzl25
Contributor Author

cxzl25 commented Nov 22, 2021

Could we continue reviewing this PR? @dongjoon-hyun @sunchao @viirya

Member

@sunchao sunchao left a comment

Sorry @cxzl25, I forgot about this. Left a few minor comments, but this looks good to me.

s", which is more than $maxDynamicPartitions" +
s". To solve this try to set $maxDynamicPartitionsKey" +
s" to at least $numWrittenParts."
throw new SparkException(maxDynamicPartitionsErrMsg)
Member

nit: we may want to group this error message and define it in QueryExecutionErrors.
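
For illustration, a sketch of what such a grouped error could look like in QueryExecutionErrors (the signature mirrors the later diffs in this conversation and the message mirrors the string built inline above; not necessarily the exact merged code):

  // Error helper grouped in QueryExecutionErrors instead of building the message inline.
  def writePartitionExceedConfigSizeWhenDynamicPartitionError(
      numWrittenParts: Int,
      maxDynamicPartitions: Int,
      maxDynamicPartitionsKey: String): Throwable = {
    new SparkException(
      s"Number of dynamic partitions created is $numWrittenParts" +
        s", which is more than $maxDynamicPartitions" +
        s". To solve this try to set $maxDynamicPartitionsKey" +
        s" to at least $numWrittenParts.")
  }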

@@ -192,6 +192,17 @@ case class InsertIntoHiveTable(
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
val numWrittenParts = writtenParts.size
val maxDynamicPartitionsKey = "hive.exec.max.dynamic.partitions"
Member

can we use HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname for the key and HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal instead of 1000?
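
For reference, a sketch of that substitution (hadoopConf stands for the Hadoop configuration already available in InsertIntoHiveTable; this mirrors the suggestion rather than quoting the merged code):

  import org.apache.hadoop.hive.conf.HiveConf

  // Use Hive's own constant for the key and its default (1000) instead of hard-coding them.
  val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
  val maxDynamicPartitions = hadoopConf.getInt(
    maxDynamicPartitionsKey, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)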

Member

@sunchao sunchao left a comment

LGTM with one nit

@@ -1905,4 +1905,14 @@ object QueryExecutionErrors {
def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
}

def writePartitionExceedConfigSizeWhenDynamicPartitionError(numWrittenParts: Int,
Member

nit: format

  def writePartitionExceedConfigSizeWhenDynamicPartitionError(
      numWrittenParts: Int,
      maxDynamicPartitions: Int,
      maxDynamicPartitionsKey: String): Throwable = {
    ...
  }

@@ -1905,4 +1905,15 @@ object QueryExecutionErrors {
def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
}

def writePartitionExceedConfigSizeWhenDynamicPartitionError(
numWrittenParts: Int,
Member

nit: we need to use 4-space indentation here.

Contributor Author

Sorry, I did not notice the indentation problem here; you already provided an example above. Thank you.

Member

@sunchao sunchao left a comment

LGTM

@sunchao sunchao closed this in 4b849ef Dec 14, 2021
@sunchao
Member

sunchao commented Dec 14, 2021

Merged, thanks! Also going to cherry-pick to branch-3.2.

@sunchao
Member

sunchao commented Dec 14, 2021

@cxzl25 could you open another PR to backport this to branch-3.2? I tried to cherry-pick it but there's some conflict.

@cxzl25
Contributor Author

cxzl25 commented Dec 14, 2021

@cxzl25 could you open another PR to backport this to branch-3.2? I tried to cherry-pick it but there's some conflict.

OK, I will do it now.

cxzl25 added a commit to cxzl25/spark that referenced this pull request Dec 14, 2021
…k when writing to external tables


Closes apache#34493 from cxzl25/SPARK-37217.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Chao Sun <sunchao@apple.com>

(cherry picked from commit 4b849ef)
@cxzl25
Contributor Author

cxzl25 commented Dec 14, 2021

branch-3.2 #34889
