
Conversation


@xkrogen xkrogen commented Apr 16, 2021

What changes were proposed in this pull request?

Clean up code in HadoopMapReduceCommitProtocol#commitJob to avoid renames that will always fail (usually silently).

Why are the changes needed?

The renames in this block will always fail when dynamicPartitionOverwrite == true:

if (dynamicPartitionOverwrite) {
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
  absPartitionPaths.foreach(fs.delete(_, true))
}
for ((src, dst) <- filesToMove) {
  fs.rename(new Path(src), new Path(dst))
}
if (dynamicPartitionOverwrite) {
  val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
  logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      // According to the official hadoop FileSystem API spec, delete op should assume
      // the destination is no longer present regardless of return value, thus we do not
      // need to double check if finalPartPath exists before rename.
      // Also in our case, based on the spec, delete returns false only when finalPartPath
      // does not exist. When this happens, we need to take action if parent of finalPartPath
      // also does not exist(e.g. the scenario described on SPARK-23815), because
      // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
      // a parent that exists, otherwise we may get unexpected result on the rename.
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
}

We have the following sequence of events:

  1. The first block deletes all parent directories of filesToMove.values
  2. The for-loop block attempts to rename all filesToMove.keys to filesToMove.values
  3. The third block does directory-level renames to place files into their final locations

All renames in the for-loop (step 2) will always fail, since every parent directory of filesToMove.values was just deleted in step 1. On HDFS, the contract of fs.rename is to return false in such a failure scenario rather than throw an exception, so dynamic partition overwrite still works, just with a batch of silently failed renames in the middle. Really, we should only run the for-loop renames in the dynamicPartitionOverwrite == false case, and consolidate the two if-blocks for the true case.
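
For illustration, a rough sketch of the consolidation being proposed (illustrative only, not the exact diff in this PR):

// Sketch: only run the per-file renames when dynamicPartitionOverwrite == false;
// in the dynamic case the directory-level renames in the last block move the
// staged data into place, and the per-file renames can never succeed anyway.
if (dynamicPartitionOverwrite) {
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
  absPartitionPaths.foreach(fs.delete(_, true))
} else {
  for ((src, dst) <- filesToMove) {
    fs.rename(new Path(src), new Path(dst))
  }
}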

Does this PR introduce any user-facing change?

In almost all cases, no. However, if you happen to use a FileSystem implementation that throws an exception on this kind of fs.rename failure, dynamicPartitionOverwrite was unusable prior to this PR and will start working after it.

How was this patch tested?

No tests were added or modified. I didn't see existing test cases for this file; open to suggestions on where/how to add such tests.

@github-actions github-actions bot added the CORE label Apr 16, 2021
@github-actions

Test build #756210704 for PR 32207 at commit 07f9189.


xkrogen commented Apr 16, 2021

cc @mridulm and @cloud-fan


SparkQA commented Apr 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42065/


SparkQA commented Apr 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42065/


SparkQA commented Apr 16, 2021

Test build #137490 has finished for PR 32207 at commit 07f9189.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


xkrogen commented Apr 16, 2021

I took a look at the failing test:

org.apache.spark.sql.sources.InsertSuite.SPARK-20236: dynamic partition overwrite with customer partition path

== Results ==
!== Correct Answer - 3 ==   == Spark Answer - 2 ==
!struct<>                   struct<i:int,part1:int,part2:int>
 [2,2,2]                    [2,2,2]
 [3,1,2]                    [3,1,2]
![4,1,1]                    

The failure does seem legitimate, but it left me even more confused about this functionality. The test fails at this last step:

        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

I ran through this code in a debugger and it appears that the unit test is relying on behavior of LocalFileSystem that differs from HDFS: if you rename a file into a directory that does not exist, the destination directory is silently created for you. I double-checked in a Spark shell to be sure, and this is not what HDFS does; it returns false from rename, as expected.
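
As a standalone illustration of that behavior difference (paths here are hypothetical and this snippet is not part of the patch), one can poke at the FileSystem API directly from a Spark shell:

import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative: rename a file into a destination whose parent directory does not exist.
val fs = new Path("file:/tmp/rename-check").getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(new Path("file:/tmp/rename-check/src"))
fs.create(new Path("file:/tmp/rename-check/src/f")).close()
// "file:/tmp/rename-check/missing" has not been created.
// Per the observation above: LocalFileSystem silently creates it and the rename
// succeeds, while HDFS (swap in an hdfs:// path) returns false and moves nothing.
val renamed = fs.rename(
  new Path("file:/tmp/rename-check/src/f"),
  new Path("file:/tmp/rename-check/missing/f"))
println(s"rename returned: $renamed")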

This means that the unit test works properly on a local FS, but fails when run against HDFS. I verified this by executing the unit test code (slightly modified) in a Spark Shell instance:

scala> val scheme = "file"
scala> :paste
// Entering paste mode (ctrl-D to finish)

        val basepath = s"$scheme:/tmp/ekrogentest/base"
        val path1 = s"$scheme:/tmp/ekrogentest/1"
        val path2 = s"$scheme:/tmp/ekrogentest/2"

        // refresh everything
        sql("DROP TABLE IF EXISTS t")
        val fs = new Path(basepath).getFileSystem(sc.hadoopConfiguration)
        fs.delete(new Path(basepath).getParent, true)
        Seq(basepath, path1, path2).foreach(p => fs.mkdirs(new Path(p)))

        sql(
          s"""
            |create table t(i int, part1 int, part2 int) using parquet
            |partitioned by (part1, part2) location '$basepath'
          """.stripMargin)

        //val path1 = Utils.createTempDir()
        sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
        sql(s"insert into t partition(part1=1, part2=1) select 1")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(1, 1, 1))

        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1))

        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

        //val path2 = Utils.createTempDir()
        sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

// Exiting paste mode, now interpreting.

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  1|    1|    1|
+---+-----+-----+

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  2|    1|    1|
+---+-----+-----+

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  2|    2|    2|
|  2|    1|    1|
+---+-----+-----+

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  3|    1|    2|
|  2|    2|    2|
|  2|    1|    1|
+---+-----+-----+

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  3|    1|    2|
|  2|    2|    2|
|  4|    1|    1|
+---+-----+-----+

Works fine when using the local file system.

However, when I rerun the same code against HDFS:

scala> val scheme = "hdfs"
scheme: String = hdfs

scala> :paste
// Entering paste mode (ctrl-D to finish)

        val basepath = s"$scheme:/tmp/ekrogentest/base"
        val path1 = s"$scheme:/tmp/ekrogentest/1"
        val path2 = s"$scheme:/tmp/ekrogentest/2"

        sql("DROP TABLE IF EXISTS t")
        val fs = new Path(basepath).getFileSystem(sc.hadoopConfiguration)
        fs.delete(new Path(basepath).getParent, true)
        Seq(basepath, path1, path2).foreach(p => fs.mkdirs(new Path(p)))

        sql(
          s"""
            |create table t(i int, part1 int, part2 int) using parquet
            |partitioned by (part1, part2) location '$basepath'
          """.stripMargin)

        //val path1 = Utils.createTempDir()
        sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
        sql(s"insert into t partition(part1=1, part2=1) select 1")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(1, 1, 1))

        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1))

        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

        //val path2 = Utils.createTempDir()
        sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
        sql("SELECT * FROM t").show()
        //checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

// Exiting paste mode, now interpreting.

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  1|    1|    1|
+---+-----+-----+

21/04/16 22:43:38 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/1 was not found. Was it deleted very recently?
+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
+---+-----+-----+

21/04/16 22:43:39 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/1 was not found. Was it deleted very recently?

+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  2|    2|    2|
+---+-----+-----+

21/04/16 22:43:39 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/2 was not found. Was it deleted very recently?
21/04/16 22:43:39 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/1 was not found. Was it deleted very recently?
+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  2|    2|    2|
+---+-----+-----+

21/04/16 22:43:39 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/2 was not found. Was it deleted very recently?
21/04/16 22:43:40 WARN HadoopFSUtils: The directory hdfs://.../tmp/ekrogentest/1 was not found. Was it deleted very recently?
+---+-----+-----+
|  i|part1|part2|
+---+-----+-----+
|  2|    2|    2|
+---+-----+-----+

basepath: String = hdfs:/tmp/ekrogentest/base
path1: String = hdfs:/tmp/ekrogentest/1
path2: String = hdfs:/tmp/ekrogentest/2
fs: org.apache.hadoop.fs.FileSystem = DFS[DFSClient[...]]

Now everything is broken.

@cloud-fan, it looks like you added this; what is the expected behavior here? I can't tell if I'm missing something.


cloud-fan commented Apr 19, 2021

@xkrogen the test does seem to be legitimate, but this is likely a long-standing bug. Can you look into it and see if we can support it? If not, we need to throw a clear error instead of relying on LocalFileSystem behavior.
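
For concreteness, a minimal sketch of that "clear error" option (names and message purely illustrative, not an agreed design):

// Illustrative only: fail fast with a clear message instead of relying on
// LocalFileSystem silently creating the missing parent during rename.
for ((src, dst) <- filesToMove) {
  val dstPath = new Path(dst)
  if (dynamicPartitionOverwrite && !fs.exists(dstPath.getParent)) {
    throw new java.io.IOException(
      s"Cannot move $src to $dst: parent ${dstPath.getParent} does not exist; " +
        "dynamic partition overwrite with custom partition locations is not supported")
  }
  fs.rename(new Path(src), dstPath)
}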


xkrogen commented Apr 19, 2021

I don't quite understand the commit sequence under the various modes, so I can't provide any quick input on whether this is easily fixable. I've already spent more time on this issue than I was expecting, and it's pretty far outside my normal scope, so I can't devote more time currently; if I do find some spare cycles in the future, I will try to circle back here. Thanks for your input so far!

@cloud-fan
Contributor

Or we can ignore the test for now and move forward. It's a long-standing bug and not caused by this patch.


YuzhouSun commented May 13, 2021

Hello, regarding "we should only run the for-loop renames in the dynamicPartitionOverwrite == false case": that block is actually meant for custom partition paths (i.e. absolute partitions), in both the dynamic and the static partition overwrite cases. That is probably why InsertSuite.test("SPARK-20236: dynamic partition overwrite with custom partition path") failed with these changes.

The fix could be to re-create the parent directories when required. We created another PR for this JIRA.
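
A hedged sketch of that approach (not necessarily the exact change in the follow-up PR):

// Illustrative sketch: re-create the destination's parent if the earlier
// cleanup step deleted it, so the per-file rename can succeed.
for ((src, dst) <- filesToMove) {
  val dstPath = new Path(dst)
  if (dynamicPartitionOverwrite && !fs.exists(dstPath.getParent)) {
    fs.mkdirs(dstPath.getParent)
  }
  fs.rename(new Path(src), dstPath)
}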

@cloud-fan
Contributor

@YuzhouSun Can you help take over this PR?

@YuzhouSun

Created #32530. @cloud-fan Could you review it? Thanks!

@xkrogen xkrogen closed this May 13, 2021

xkrogen commented May 13, 2021

Closing in favor of #32530.

@xkrogen xkrogen deleted the xkrogen-SPARK-35106-mapreducecommitprotocol-bug branch September 15, 2021 22:00