From 2c55fc202f3780a9090ed17a9f41bea4c887f048 Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Tue, 25 Oct 2016 18:19:46 +0000 Subject: [PATCH 1/5] Dist cache yarn during submit should throw error for adding same file under archives and files --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6e4f68c74c365..4bb51d307124d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -598,8 +598,12 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => flist.foreach { file => val (_, localizedPath) = distribute(file, resType = resType) - if (addToClasspath && localizedPath != null) { - cachedSecondaryJarLinks += localizedPath + if (addToClasspath) { + if (localizedPath != null) { + cachedSecondaryJarLinks += localizedPath + } + } else { + require(localizedPath !=null) } } } From a1dc8583f09d01ab611b4e2992fb7ec00138bcef Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Fri, 28 Oct 2016 19:25:19 +0000 Subject: [PATCH 2/5] Convert required to IllegalArgumentException --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4bb51d307124d..5eec60739f04d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -600,10 +600,14 @@ private[spark] class Client( val (_, localizedPath) = distribute(file, resType = resType) if (addToClasspath) { if (localizedPath != null) { - cachedSecondaryJarLinks += localizedPath + cachedSecondaryJarLinks += localizedPath } } 
else { - require(localizedPath !=null) + if (localizedPath != null) { + throw new IllegalArgumentException(s"Attempt to add ($file) multiple times. " + + "Please check the values of 'spark.yarn.dist.files' and/or " + + "'spark.yarn.dist.archives'.") + } } } } From 33f95abc8277dc0925a73712bb9d729a5c9c625b Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Fri, 28 Oct 2016 19:48:10 +0000 Subject: [PATCH 3/5] Reword exception message --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5eec60739f04d..92eae3fc67bfb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -604,9 +604,8 @@ private[spark] class Client( } } else { if (localizedPath != null) { - throw new IllegalArgumentException(s"Attempt to add ($file) multiple times. 
" + - "Please check the values of 'spark.yarn.dist.files' and/or " + - "'spark.yarn.dist.archives'.") + throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" + + " to the distributed cache.") } } } From f7974812a5cc76cf98bba1c70e739bbc770d7dde Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Mon, 31 Oct 2016 19:04:56 +0000 Subject: [PATCH 4/5] Adding comment and unit tests --- .../org/apache/spark/deploy/yarn/Client.scala | 1 + .../spark/deploy/yarn/ClientSuite.scala | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 92eae3fc67bfb..4adf9b75baf39 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -598,6 +598,7 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => flist.foreach { file => val (_, localizedPath) = distribute(file, resType = resType) + // If addToClassPath, we ignore adding jar multiple times to distitrbuted cache. 
if (addToClasspath) { if (localizedPath != null) { cachedSecondaryJarLinks += localizedPath diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 0a4f291e25fb0..4032c67995135 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -282,6 +282,37 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } } + test("distribute archive multiple times") { + val libs = Utils.createTempDir() + val jarsDir = new File(libs, "jars") + assert(jarsDir.mkdir()) + new FileOutputStream(new File(libs, "RELEASE")).close() + val userLib1 = Utils.createTempDir() + val userLib2 = Utils.createTempDir() + + val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) + val jar2 = TestUtils.createJarWithFiles(Map(), userLib1) + // Copy jar2 to jar3 with same name + val jar3 = { + val target = new File(userLib2, new File(jar2.toURI).getName) + val input = new FileInputStream(jar2.getPath) + val output = new FileOutputStream(target) + Utils.copyStream(input, output, closeStreams = true) + target.toURI.toURL + } + + val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(FILES_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) + .set(ARCHIVES_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) + + + val client = createClient(sparkConf) + val tempDir = Utils.createTempDir() + intercept[IllegalArgumentException] { + client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) + } + } + test("distribute local spark jars") { val temp = Utils.createTempDir() val jarsDir = new File(temp, "jars") From 51eefa50faeb26c6c1fd9d689b8450358291d569 Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Thu, 3 Nov 2016 15:22:22 -0500 Subject: [PATCH 5/5] Updating unit test to cover all three scenarios.
--- .../spark/deploy/yarn/ClientSuite.scala | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 4032c67995135..06516c1baf1cc 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -284,33 +284,44 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll test("distribute archive multiple times") { val libs = Utils.createTempDir() + // Create jars dir and RELEASE file to avoid IllegalStateException. val jarsDir = new File(libs, "jars") assert(jarsDir.mkdir()) new FileOutputStream(new File(libs, "RELEASE")).close() - val userLib1 = Utils.createTempDir() - val userLib2 = Utils.createTempDir() - val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) - val jar2 = TestUtils.createJarWithFiles(Map(), userLib1) - // Copy jar2 to jar3 with same name - val jar3 = { - val target = new File(userLib2, new File(jar2.toURI).getName) - val input = new FileInputStream(jar2.getPath) - val output = new FileOutputStream(target) - Utils.copyStream(input, output, closeStreams = true) - target.toURI.toURL - } + val userLib1 = Utils.createTempDir() + val testJar = TestUtils.createJarWithFiles(Map(), userLib1) + // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(FILES_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) - .set(ARCHIVES_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) - + .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath)) + .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath)) val client = createClient(sparkConf) val tempDir = Utils.createTempDir() intercept[IllegalArgumentException] { client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) } + + // Case 2: 
FILES_TO_DISTRIBUTE can't have duplicate files. + val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) + + val clientFiles = createClient(sparkConfFiles) + val tempDirForFiles = Utils.createTempDir() + intercept[IllegalArgumentException] { + clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil) + } + + // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files. + val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) + + val clientArchives = createClient(sparkConfArchives) + val tempDirForArchives = Utils.createTempDir() + intercept[IllegalArgumentException] { + clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil) + } } test("distribute local spark jars") {