[SPARK-31675][CORE] Fix rename and delete files with different filesystem #36070
Conversation
cc @cloud-fan could you help take a look when you have time? Thanks.
Can one of the admins verify this patch?
Cross-file-system table writing sounds like a big feature to me. Currently Spark fails to write, so this feature is not supported yet, but I'm wondering what the best way to do it is, e.g. shall we put the staging dir on the same file system as the target path?
The staging dir is generated based on the table location.
Hive's approach is to keep the path to the partition location and move files across file systems; see Hive.java.
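A minimal sketch of that move, assuming Hadoop's FileSystem/FileUtil APIs (moveAcrossFs is a hypothetical helper name, not Hive's actual method):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

def moveAcrossFs(conf: Configuration, src: Path, dst: Path): Boolean = {
  val srcFs = src.getFileSystem(conf)
  val dstFs = dst.getFileSystem(conf)
  if (srcFs.getUri == dstFs.getUri) {
    // Same file system: a cheap metadata-only rename is enough.
    srcFs.rename(src, dst)
  } else {
    // rename() cannot cross file systems, so copy the data and delete the source.
    FileUtil.copy(srcFs, src, dstFs, dst,
      true /* deleteSource */, true /* overwrite */, conf)
  }
}
```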
The current DS insert only supports writing to a staging dir for dynamic partition overwrite; this PR's case seems to use a Hive SerDe (since Hive SerDe supports configuring a staging dir on a different file system), and Spark's commit protocol does not support a staging dir on a different file system. Adding that support requires considering a lot; you can check the points mentioned in #33828. Also, I am writing a new built-in commit protocol in #36056; it behaves like Hive and makes all overwrites use a staging dir on the same file system as the target path.
Do we need a wrapper file system to handle all the files in Spark, including cross-file-system operations?
Yes, this PR's case uses a Hive SerDe (Hive SerDe supports configuring a staging dir on a different file system); see #33828 and #36056. If we have multiple file systems that differ from the staging dir's file system, we still get exceptions.
You really should not be using the classic FileOutputCommitter against S3; as well as the performance being awful, it lacks correctness and resilience against failure of task commit. Problems with file rename here are essentially second order.
Which committer are you using, and what file systems?
The code does look good for cross-EZ (encryption zone) copies in HDFS.
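As context for that advice, a minimal sketch of pointing a job at the S3A committers instead, assuming the spark-hadoop-cloud module is on the classpath (the keys come from the Hadoop S3A committer documentation; verify them against your Spark/Hadoop versions):

```scala
import org.apache.spark.sql.SparkSession

// Route Spark's file commit through the cloud-optimized PathOutputCommitProtocol
// and select the S3A "magic" committer instead of the classic FileOutputCommitter.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```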
```scala
val dstFs = dstPath.getFileSystem(hadoopConf)
// Copying files across different file systems
if (needCopy(srcPath, dstPath, srcFs, dstFs)) {
  if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath,
```
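For context, a hedged sketch of what the needCopy helper above might check (its actual body is not shown in this excerpt): copy only when the source and destination resolve to different file systems.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed URI-based check: a scheme or authority mismatch means a plain
// rename cannot work, so the caller must copy instead.
def needCopy(srcPath: Path, dstPath: Path,
    srcFs: FileSystem, dstFs: FileSystem): Boolean = {
  srcFs.getUri.getScheme != dstFs.getUri.getScheme ||
    srcFs.getUri.getAuthority != dstFs.getUri.getAuthority
}
```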
You may want to think about parallelizing the copy, as for each file it is now going to take time proportional to data.length / (download_bandwidth + upload_bandwidth).
Shame copy() returns false sometimes; it looks like that only happens if mkdirs() on the dest or delete(src) fails.
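One rough way to act on both points, sketched with a fixed thread pool (the helper name, pool size, and flags are assumptions, not the PR's code): run the per-file copies concurrently and fail the batch if any copy returns false.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy each source file on its own pool thread so per-file latency overlaps.
def parallelCopy(srcFs: FileSystem, srcs: Seq[Path], dstFs: FileSystem,
    dstDir: Path, conf: Configuration, threads: Int = 8): Boolean = {
  val pool = Executors.newFixedThreadPool(threads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    val copies = srcs.map { src =>
      Future(FileUtil.copy(srcFs, src, dstFs, new Path(dstDir, src.getName),
        false /* deleteSource */, true /* overwrite */, conf))
    }
    // Surface a single false (e.g. a failed mkdirs on the dest) as a batch failure.
    Await.result(Future.sequence(copies), Duration.Inf).forall(identity)
  } finally {
    pool.shutdown()
  }
}
```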
Can I highlight something I've noticed here: copy() stops when read() on src returns -1, without doing any checks to validate the file length. Not great.
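A hedged sketch of the kind of validation being suggested (copyWithLengthCheck is a hypothetical helper, not a Hadoop API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy one file, then compare source and destination lengths, since
// FileUtil.copy itself stops at read() == -1 without verifying the size.
def copyWithLengthCheck(srcFs: FileSystem, src: Path, dstFs: FileSystem,
    dst: Path, conf: Configuration): Boolean = {
  val copied = FileUtil.copy(srcFs, src, dstFs, dst,
    false /* deleteSource */, true /* overwrite */, conf)
  copied && srcFs.getFileStatus(src).getLen == dstFs.getFileStatus(dst).getLen
}
```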
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
When we use a partitioned table, if the file system of the partition location is different from the file system of the table location, we will get an exception like this:

java.lang.IllegalArgumentException: Wrong FS: s3a://path/to/spark3_snap/dt=2020-09-10, expected: hdfs://cluster

This happens because HadoopMapReduceCommitProtocol uses the file system of the table location to operate on the files. For example, SQL along the lines of the sketch below will cause the above exception.
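A hypothetical reproduction, reconstructed from the exception message above (table on HDFS, one partition relocated to S3A; all names and paths are illustrative, and spark is assumed to be a Hive-enabled SparkSession):

```scala
// Partitioned table whose data lives on HDFS.
spark.sql(
  """CREATE TABLE t (c1 INT) PARTITIONED BY (dt STRING)
    |LOCATION 'hdfs://cluster/warehouse/t'""".stripMargin)
// One partition is pointed at a different file system (S3A).
spark.sql(
  """ALTER TABLE t ADD PARTITION (dt='2020-09-10')
    |LOCATION 's3a://path/to/spark3_snap/dt=2020-09-10'""".stripMargin)
// Writing into that partition makes the commit protocol resolve the S3A path
// against the table's HDFS file system, raising the "Wrong FS" exception.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (dt='2020-09-10') SELECT 1")
```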
See details in the JIRA: SPARK-31675.
Why are the changes needed?
Without this change, we cannot operate on partitions whose location is on a different file system from the table location.
Does this PR introduce any user-facing change?
Yes. Before this PR, an exception was thrown when the user operated on a partition location whose file system differed from the table location's. After this PR, such files are copied across file systems instead of renamed, and the operation succeeds.
How was this patch tested?
Manual testing; I'm not sure how to use unit tests in Spark to verify this patch.