[SPARK-21138][YARN] Cannot delete staging dir when the clusters of "spark.yarn.stagingDir" and "spark.hadoop.fs.defaultFS" are different

## What changes were proposed in this pull request?

When I set "spark.hadoop.fs.defaultFS" and "spark.yarn.stagingDir" to different clusters, as follows:
```
spark.hadoop.fs.defaultFS  hdfs://tl-nn-tdw.tencent-distribute.com:54310
spark.yarn.stagingDir hdfs://ss-teg-2-v2/tmp/spark
```
The staging directory cannot be deleted, and the following error is raised:
```
java.lang.IllegalArgumentException: Wrong FS: hdfs://ss-teg-2-v2/tmp/spark/.sparkStaging/application_1496819138021_77618, expected: hdfs://tl-nn-tdw.tencent-distribute.com:54310
```
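
For context (this sketch is mine, not part of the patch): Hadoop's `FileSystem.get(conf)` always returns the filesystem configured as `fs.defaultFS`, and operations on it run `checkPath`, which rejects any path whose scheme or authority differ; that is exactly the `Wrong FS` failure above. Resolving the filesystem from the path itself avoids the mismatch. A minimal Scala sketch using the hostnames above (the object name is hypothetical, and both clusters would need to be reachable to actually run it):
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object WrongFsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // The default filesystem points at one cluster...
    conf.set("fs.defaultFS", "hdfs://tl-nn-tdw.tencent-distribute.com:54310")
    // ...while the staging directory lives on another.
    val stagingDir =
      new Path("hdfs://ss-teg-2-v2/tmp/spark/.sparkStaging/application_1496819138021_77618")

    // Bound to fs.defaultFS: delete() first calls checkPath(), which throws
    // java.lang.IllegalArgumentException: Wrong FS for this path.
    val defaultFs = FileSystem.get(conf)
    // defaultFs.delete(stagingDir, true)  // fails exactly as reported above

    // Resolved from the path's own scheme and authority, so the delete is
    // sent to the cluster that actually owns the directory.
    val stagingFs = stagingDir.getFileSystem(conf)
    stagingFs.delete(stagingDir, true)
  }
}
```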

## How was this patch tested?

Existing tests

Author: sharkdtu <sharkdtu@tencent.com>

Closes #18352 from sharkdtu/master.
sharkdtu authored and Marcelo Vanzin committed Jun 19, 2017
1 parent 581565d commit 3d4d11a
Showing 1 changed file with 3 additions and 4 deletions.
```
@@ -209,8 +209,6 @@ private[spark] class ApplicationMaster(
 
     logInfo("ApplicationAttemptId: " + appAttemptId)
 
-    val fs = FileSystem.get(yarnConf)
-
     // This shutdown hook should run *after* the SparkContext is shut down.
     val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
     ShutdownHookManager.addShutdownHook(priority) { () =>
@@ -232,7 +230,7 @@
       // we only want to unregister if we don't want the RM to retry
       if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
         unregister(finalStatus, finalMsg)
-        cleanupStagingDir(fs)
+        cleanupStagingDir()
       }
     }
   }
@@ -533,7 +531,7 @@
   /**
    * Clean up the staging directory.
    */
-  private def cleanupStagingDir(fs: FileSystem) {
+  private def cleanupStagingDir(): Unit = {
     var stagingDirPath: Path = null
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
@@ -544,6 +542,7 @@
           return
         }
         logInfo("Deleting staging directory " + stagingDirPath)
+        val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
       }
     } catch {
```
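
Read together, the hunks move the `FileSystem` lookup out of `ApplicationMaster` initialization and into `cleanupStagingDir()`, deriving it from the staging path itself; that is also why the shutdown hook no longer needs to capture an eagerly created `fs`. For orientation, here is my reconstruction of the patched method with the diff's elided lines filled in; the `preserveFiles` guard, the `SPARK_YARN_STAGING_DIR` lookup, and the `IOException` handler are assumptions, not shown in the hunks above:
```
// Sketch only: lives inside ApplicationMaster, so sparkConf, yarnConf,
// PRESERVE_STAGING_FILES, logInfo and logError come from the enclosing class.
private def cleanupStagingDir(): Unit = {
  var stagingDirPath: Path = null
  try {
    val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
    if (!preserveFiles) {  // reconstructed guard, elided from the diff
      stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))  // reconstructed
      if (stagingDirPath == null) {
        logError("Staging directory is null")  // reconstructed
        return
      }
      logInfo("Deleting staging directory " + stagingDirPath)
      // The actual fix: bind to the filesystem that owns the staging path
      // instead of the one configured as fs.defaultFS.
      val fs = stagingDirPath.getFileSystem(yarnConf)
      fs.delete(stagingDirPath, true)
    }
  } catch {
    case ioe: IOException =>  // reconstructed handler
      logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
  }
}
```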
