Skip to content

Commit

Permalink
[SPARK-18099][YARN] Fail if same files added to distributed cache for…
Browse files Browse the repository at this point in the history
… --files and --archives

## What changes were proposed in this pull request?

During spark-submit, if the YARN distributed cache is instructed to add the same file under both --files and --archives, this code change ensures the existing Spark YARN distributed cache behaviour is retained, i.e. to warn and fail if the same file is mentioned in both --files and --archives.
## How was this patch tested?

Manually tested:
1. if same jar is mentioned in --jars and --files it will continue to submit the job.
- basically functionality [SPARK-14423] #12203 is unchanged
  2. if the same file is mentioned in --files and --archives it will fail to submit the job.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

… under archives and files

Author: Kishor Patil <kpatil@yahoo-inc.com>

Closes #15627 from kishorvpatil/spark18099.
  • Loading branch information
kishorvpatil authored and Tom Graves committed Nov 3, 2016
1 parent 1629331 commit 098e4ca
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
12 changes: 10 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Expand Up @@ -598,8 +598,16 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
if (addToClasspath && localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
// If addToClasspath, we ignore adding the same jar multiple times to the distributed cache.
if (addToClasspath) {
if (localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
}
} else {
if (localizedPath != null) {
throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
" to the distributed cache.")
}
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
Expand Up @@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
}

test("distribute archive multiple times") {
  // Lay out a fake SPARK_HOME: prepareLocalResources expects a jars/
  // directory and a RELEASE marker file, otherwise it throws
  // IllegalStateException before reaching the duplicate check under test.
  val sparkHome = Utils.createTempDir()
  val distJarsDir = new File(sparkHome, "jars")
  assert(distJarsDir.mkdir())
  new FileOutputStream(new File(sparkHome, "RELEASE")).close()

  // A single jar that we will try to localize more than once.
  val userLibDir = Utils.createTempDir()
  val dupJar = TestUtils.createJarWithFiles(Map(), userLibDir)

  // Case 1: the same path under both FILES_TO_DISTRIBUTE and
  // ARCHIVES_TO_DISTRIBUTE must be rejected.
  val confFilesAndArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> sparkHome.getAbsolutePath))
    .set(FILES_TO_DISTRIBUTE, Seq(dupJar.getPath))
    .set(ARCHIVES_TO_DISTRIBUTE, Seq(dupJar.getPath))
  val clientFilesAndArchives = createClient(confFilesAndArchives)
  intercept[IllegalArgumentException] {
    clientFilesAndArchives.prepareLocalResources(
      new Path(Utils.createTempDir().getAbsolutePath()), Nil)
  }

  // Case 2: FILES_TO_DISTRIBUTE may not list the same path twice.
  val confDupFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> sparkHome.getAbsolutePath))
    .set(FILES_TO_DISTRIBUTE, Seq(dupJar.getPath, dupJar.getPath))
  val clientDupFiles = createClient(confDupFiles)
  intercept[IllegalArgumentException] {
    clientDupFiles.prepareLocalResources(
      new Path(Utils.createTempDir().getAbsolutePath()), Nil)
  }

  // Case 3: ARCHIVES_TO_DISTRIBUTE may not list the same path twice.
  val confDupArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> sparkHome.getAbsolutePath))
    .set(ARCHIVES_TO_DISTRIBUTE, Seq(dupJar.getPath, dupJar.getPath))
  val clientDupArchives = createClient(confDupArchives)
  intercept[IllegalArgumentException] {
    clientDupArchives.prepareLocalResources(
      new Path(Utils.createTempDir().getAbsolutePath()), Nil)
  }
}

test("distribute local spark jars") {
val temp = Utils.createTempDir()
val jarsDir = new File(temp, "jars")
Expand Down

0 comments on commit 098e4ca

Please sign in to comment.