From 6b997bfcf44f8b73b3bcd453e805d16593e7c6ff Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Mon, 28 Jul 2014 09:41:11 +0800
Subject: [PATCH 01/13] Executors of the same application on the same host
 should only download files & jars once

---
 .../org/apache/spark/executor/Executor.scala  |  9 ++++----
 .../scala/org/apache/spark/util/Utils.scala   | 22 +++++++++++++++++++
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc65d204..da1121879c780 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,14 +322,13 @@ private[spark] class Executor(
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-          hadoopConf)
-        currentFiles(name) = timestamp
+        Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp)
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-          hadoopConf)
+        Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0ae28f911e302..8c321300faa88 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -312,6 +312,28 @@ private[spark] object Utils extends Logging {
       uri.getQuery(), uri.getFragment())
   }
 
+  /**
+   * Copy cached file to targetDir, if not exists, download it from url.
+   */
+  def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+    timestamp: Long) {
+    val fileName = url.split("/").last
+    val cachedFileName = fileName + timestamp
+    val targetFile = new File(targetDir, fileName)
+    val lockFileName = fileName + timestamp + "_lock"
+    val localDir = new File(getLocalDir(conf))
+    val lockFile = new File(localDir, lockFileName)
+    val raf = new RandomAccessFile(lockFile, "rw")
+    val lock = raf.getChannel().lock() // only one executor entry
+    val cachedFile = new File(localDir, cachedFileName)
+    if (!cachedFile.exists()) {
+      fetchFile(url, localDir, conf, securityMgr)
+      Files.move(new File(localDir, fileName), cachedFile)
+    }
+    Files.copy(cachedFile, targetFile)
+    lock.release()
+  }
+
   /**
    * Download a file requested by the executor. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
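
The core mechanism of this first patch is an OS-level file lock used as a
cross-process mutex: every executor JVM opens the same <fileName><timestamp>_lock
file and calls FileChannel.lock(), so exactly one process performs the download
while the rest block and then find the populated cache. A minimal standalone
sketch of the pattern (editorial illustration; withFileLock is not a Spark API):

    import java.io.{File, RandomAccessFile}

    // Run `body` while holding an exclusive inter-process lock on `lockFile`.
    // lock() blocks until no other process holds the lock; the finally block
    // frees it even when body throws.
    def withFileLock[T](lockFile: File)(body: => T): T = {
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel.lock()
      try {
        body
      } finally {
        lock.release()
        raf.close()
      }
    }

Note that patch 01 itself does not yet have this try/finally shape: it copies the
cached file while still holding the lock, and never releases the lock if the
download throws. The next two patches address exactly that.
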
From 7fb7c0b1c03b350a18770210f276dfd1fa76b9f5 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Mon, 28 Jul 2014 09:54:20 +0800
Subject: [PATCH 02/13] Release lock before copying files

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8c321300faa88..c64bfd909c264 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -330,8 +330,8 @@ private[spark] object Utils extends Logging {
       fetchFile(url, localDir, conf, securityMgr)
       Files.move(new File(localDir, fileName), cachedFile)
     }
-    Files.copy(cachedFile, targetFile)
     lock.release()
+    Files.copy(cachedFile, targetFile)
   }
 
   /**

From e0ebd485636722749d196a79999f235665b97360 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Mon, 28 Jul 2014 14:07:20 +0800
Subject: [PATCH 03/13] Try and finally lock.release

---
 .../main/scala/org/apache/spark/util/Utils.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c64bfd909c264..67e5636689f57 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Copy cached file to targetDir, if not exists, download it from url.
+   * Copy cached file to targetDir, if not exists, download it from url firstly.
    */
   def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
     timestamp: Long) {
@@ -326,11 +326,14 @@ private[spark] object Utils extends Logging {
     val raf = new RandomAccessFile(lockFile, "rw")
     val lock = raf.getChannel().lock() // only one executor entry
     val cachedFile = new File(localDir, cachedFileName)
-    if (!cachedFile.exists()) {
-      fetchFile(url, localDir, conf, securityMgr)
-      Files.move(new File(localDir, fileName), cachedFile)
+    try {
+      if (!cachedFile.exists()) {
+        fetchFile(url, localDir, conf, securityMgr)
+        Files.move(new File(localDir, fileName), cachedFile)
+      }
+    } finally {
+      lock.release()
     }
-    lock.release()
     Files.copy(cachedFile, targetFile)
   }
 

From 2ffd7424384fc7dee594b00528e8991622ffbb45 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Mon, 4 Aug 2014 17:29:06 +0800
Subject: [PATCH 04/13] Add comment for FileLock

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 67e5636689f57..5105bede24c35 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -324,7 +324,10 @@ private[spark] object Utils extends Logging {
     val localDir = new File(getLocalDir(conf))
     val lockFile = new File(localDir, lockFileName)
     val raf = new RandomAccessFile(lockFile, "rw")
-    val lock = raf.getChannel().lock() // only one executor entry
+    // Only one executor entry.
+    // The FileLock is only used to control synchronization for executors download file,
+    // it's always safe regardless of lock type(mandatory or advisory).
+    val lock = raf.getChannel().lock()
     val cachedFile = new File(localDir, cachedFileName)
     try {
       if (!cachedFile.exists()) {
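
Patches 02-04 settle the shape of the critical section: only the cache check and
download run under the lock, the release happens in a finally block, and the
per-executor copy runs after release so waiting executors do not serialize on it.
A distilled sketch (editorial; `download` stands in for the real fetch logic, and
Files is Guava's com.google.common.io.Files, as in the patch itself):

    import java.io.{File, RandomAccessFile}
    import com.google.common.io.Files

    def fetchThroughCache(cachedFile: File, targetFile: File, lockFile: File,
        download: File => Unit): Unit = {
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel.lock()
      try {
        if (!cachedFile.exists()) {
          download(cachedFile) // first process to arrive populates the cache
        }
      } finally {
        lock.release() // patch 03: released even if the download throws
      }
      Files.copy(cachedFile, targetFile) // patch 02: copy outside the lock
    }

As the comment added in patch 04 says, correctness does not depend on whether the
OS provides mandatory or advisory locking, because every participant cooperates
through the same lock() call; the lock file itself carries no data.
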
From 3510eb056d302a15fb244c9b8eec04f47c568215 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Tue, 5 Aug 2014 10:21:18 +0800
Subject: [PATCH 05/13] Keep fetchFile private

---
 .../scala/org/apache/spark/SparkContext.scala |  7 +-
 .../org/apache/spark/executor/Executor.scala  |  5 +-
 .../scala/org/apache/spark/util/Utils.scala   | 64 ++++++++++---------
 3 files changed, 42 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6eaf6794764c7..255e5d0e98369 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
       case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+    Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+      timestamp, hadoopConfiguration, false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index da1121879c780..fac21582ffb1a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,12 +323,13 @@ private[spark] class Executor(
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp)
+          env.securityManager, hadoopConf, timestamp, true)
+        currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp)
+          env.securityManager, hadoopConf, timestamp, true)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5105bede24c35..55c23c46be722 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -314,30 +314,46 @@ private[spark] object Utils extends Logging {
 
   /**
    * Copy cached file to targetDir, if not exists, download it from url firstly.
+   * If useCache == false, download file to targetDir directly.
    */
   def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    timestamp: Long) {
+    timestamp: Long, useCache: Boolean) {
     val fileName = url.split("/").last
-    val cachedFileName = fileName + timestamp
     val targetFile = new File(targetDir, fileName)
-    val lockFileName = fileName + timestamp + "_lock"
-    val localDir = new File(getLocalDir(conf))
-    val lockFile = new File(localDir, lockFileName)
-    val raf = new RandomAccessFile(lockFile, "rw")
-    // Only one executor entry.
-    // The FileLock is only used to control synchronization for executors download file,
-    // it's always safe regardless of lock type(mandatory or advisory).
-    val lock = raf.getChannel().lock()
-    val cachedFile = new File(localDir, cachedFileName)
-    try {
-      if (!cachedFile.exists()) {
-        fetchFile(url, localDir, conf, securityMgr)
-        Files.move(new File(localDir, fileName), cachedFile)
+    if (useCache) {
+      val cachedFileName = fileName + timestamp
+      val lockFileName = fileName + timestamp + "_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type(mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          fetchFile(url, localDir, conf, securityMgr)
+          Files.move(new File(localDir, fileName), cachedFile)
+        }
+      } finally {
+        lock.release()
      }
-    } finally {
-      lock.release()
+      Files.copy(cachedFile, targetFile)
+    } else {
+      fetchFile(url, targetDir, conf, securityMgr)
+    }
+
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
     }
-    Files.copy(cachedFile, targetFile)
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
@@ -347,7 +363,7 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+  private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
     hadoopConf: Configuration) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -437,16 +453,6 @@ private[spark] object Utils extends Logging {
       }
       Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
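
A weakness worth noting at this stage: the cache key is still fileName + timestamp,
so two different URLs that share a basename and a timestamp would collide in the
cache. An editorial snippet showing the collision and the keying that patches 07-08
switch to (the URLs and the timestamp value are illustrative):

    val timestamp = 1414108800000L
    val urls = Seq("http://hostA/jobs/app.jar", "http://hostB/jobs/app.jar")
    val fileNameKeys = urls.map(u => u.split("/").last + timestamp)      // patch 05 keying
    val hashCodeKeys = urls.map(u => s"${u.hashCode}${timestamp}_cache") // patch 07/08 keying
    println(fileNameKeys.distinct.size) // 1 -- both map to "app.jar1414108800000"
    println(hashCodeKeys.distinct.size) // 2 -- distinct, barring String.hashCode collisions
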
From 76a7b66eed73594ad425213b8a230b837f7c4da4 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Thu, 4 Sep 2014 16:40:32 +0800
Subject: [PATCH 06/13] Clean code & use application work directory as cache
 directory

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../scala/org/apache/spark/util/Utils.scala   | 26 ++++++++++++++-----
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 255e5d0e98369..ef4fe0c297d7b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -810,7 +810,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
     Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      timestamp, hadoopConfiguration, false)
+      hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 55c23c46be722..6bbde1953f714 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,11 +313,23 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Copy cached file to targetDir, if not exists, download it from url firstly.
-   * If useCache == false, download file to targetDir directly.
+   * Download a file requested by the executor . Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
    */
-  def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    timestamp: Long, useCache: Boolean) {
+  def fetchCachedFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
     val fileName = url.split("/").last
     val targetFile = new File(targetDir, fileName)
     if (useCache) {
@@ -330,10 +342,10 @@ private[spark] object Utils extends Logging {
       // The FileLock is only used to control synchronization for executors download file,
       // it's always safe regardless of lock type(mandatory or advisory).
       val lock = raf.getChannel().lock()
-      val cachedFile = new File(localDir, cachedFileName)
+      val cachedFile = new File(SparkFiles.getRootDirectory + "../", cachedFileName)
       try {
         if (!cachedFile.exists()) {
-          fetchFile(url, localDir, conf, securityMgr)
+          fetchFile(url, localDir, conf, securityMgr, hadoopConf)
           Files.move(new File(localDir, fileName), cachedFile)
         }
       } finally {
@@ -341,7 +353,7 @@ private[spark] object Utils extends Logging {
       }
       Files.copy(cachedFile, targetFile)
     } else {
-      fetchFile(url, targetDir, conf, securityMgr)
+      fetchFile(url, targetDir, conf, securityMgr, hadoopConf)
     }
 
     // Decompress the file if it's a .tar or .tar.gz

From 27660556fbc87863d6a433e3913d7fd290d68b20 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Fri, 5 Sep 2014 16:38:57 +0800
Subject: [PATCH 07/13] Use url.hashCode + timestamp as cachedFileName

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala  |  4 ++--
 .../scala/org/apache/spark/util/Utils.scala   | 20 +++++++++++--------
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ef4fe0c297d7b..37aa208a5838f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -809,7 +809,7 @@ class SparkContext(config: SparkConf) extends Logging {
     addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
       hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fac21582ffb1a..4a900d46c0da9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,13 +322,13 @@ private[spark] class Executor(
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
           env.securityManager, hadoopConf, timestamp, true)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
           env.securityManager, hadoopConf, timestamp, true)
         currentJars(name) = timestamp
         // Add it to our class loader
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6bbde1953f714..3b1907ece83b6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -322,7 +322,7 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchCachedFile(
+  def fetchFile(
       url: String,
       targetDir: File,
       conf: SparkConf,
@@ -333,8 +333,8 @@ private[spark] object Utils extends Logging {
     val fileName = url.split("/").last
     val targetFile = new File(targetDir, fileName)
     if (useCache) {
-      val cachedFileName = fileName + timestamp
-      val lockFileName = fileName + timestamp + "_lock"
+      val cachedFileName = url.hashCode + timestamp + "_cach"
+      val lockFileName = url.hashCode + timestamp + "_lock"
       val localDir = new File(getLocalDir(conf))
       val lockFile = new File(localDir, lockFileName)
       val raf = new RandomAccessFile(lockFile, "rw")
@@ -342,10 +342,10 @@ private[spark] object Utils extends Logging {
       // The FileLock is only used to control synchronization for executors download file,
       // it's always safe regardless of lock type(mandatory or advisory).
       val lock = raf.getChannel().lock()
-      val cachedFile = new File(SparkFiles.getRootDirectory + "../", cachedFileName)
+      val cachedFile = new File(localDir, cachedFileName)
       try {
         if (!cachedFile.exists()) {
-          fetchFile(url, localDir, conf, securityMgr, hadoopConf)
+          doFetchFile(url, localDir, conf, securityMgr, hadoopConf)
           Files.move(new File(localDir, fileName), cachedFile)
         }
       } finally {
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
       }
       Files.copy(cachedFile, targetFile)
     } else {
-      fetchFile(url, targetDir, conf, securityMgr, hadoopConf)
+      doFetchFile(url, targetDir, conf, securityMgr, hadoopConf)
     }
 
     // Decompress the file if it's a .tar or .tar.gz
@@ -375,8 +375,12 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    hadoopConf: Configuration) {
+  private def doFetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))

From 03ed3a839a83fdd658ee62a389cad1805c221cc3 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Tue, 9 Sep 2014 10:07:45 +0800
Subject: [PATCH 08/13] Rename cache file name to XXXXXXXXX_cache

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3b1907ece83b6..46b3e88ff78cc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -333,7 +333,7 @@ private[spark] object Utils extends Logging {
     val fileName = url.split("/").last
     val targetFile = new File(targetDir, fileName)
     if (useCache) {
-      val cachedFileName = url.hashCode + timestamp + "_cach"
+      val cachedFileName = url.hashCode + timestamp + "_cache"
       val lockFileName = url.hashCode + timestamp + "_lock"
       val localDir = new File(getLocalDir(conf))
       val lockFile = new File(localDir, lockFileName)
From 074a42282087c7e44e44a69a026809251943d63b Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Fri, 12 Sep 2014 10:46:37 +0800
Subject: [PATCH 09/13] Fix: deal with spark.files.overwrite

---
 .../scala/org/apache/spark/util/Utils.scala | 21 +++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 46b3e88ff78cc..610f423ce2b1c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -333,8 +333,8 @@ private[spark] object Utils extends Logging {
     val fileName = url.split("/").last
     val targetFile = new File(targetDir, fileName)
     if (useCache) {
-      val cachedFileName = url.hashCode + timestamp + "_cache"
-      val lockFileName = url.hashCode + timestamp + "_lock"
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
       val localDir = new File(getLocalDir(conf))
       val lockFile = new File(localDir, lockFileName)
       val raf = new RandomAccessFile(lockFile, "rw")
@@ -345,15 +345,24 @@ private[spark] object Utils extends Logging {
       val cachedFile = new File(localDir, cachedFileName)
       try {
         if (!cachedFile.exists()) {
-          doFetchFile(url, localDir, conf, securityMgr, hadoopConf)
-          Files.move(new File(localDir, fileName), cachedFile)
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
         }
       } finally {
         lock.release()
       }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo(("File %s exists and does not match contents of %s, " +
+            "replacing it with %s").format(targetFile, url, url))
+        } else {
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of" + " " + url)
+        }
+      }
       Files.copy(cachedFile, targetFile)
     } else {
-      doFetchFile(url, targetDir, conf, securityMgr, hadoopConf)
+      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
     }
 
     // Decompress the file if it's a .tar or .tar.gz
@@ -378,10 +387,10 @@ private[spark] object Utils extends Logging {
   private def doFetchFile(
       url: String,
       targetDir: File,
+      filename: String,
       conf: SparkConf,
       securityMgr: SecurityManager,
       hadoopConf: Configuration) {
-    val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)

From 7050d468a4019516e5c06735c1228010be94f2a2 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Mon, 15 Sep 2014 09:38:41 +0800
Subject: [PATCH 10/13] Clean code

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 610f423ce2b1c..e09a647cbf318 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -356,8 +356,7 @@ private[spark] object Utils extends Logging {
           logInfo(("File %s exists and does not match contents of %s, " +
             "replacing it with %s").format(targetFile, url, url))
         } else {
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
+          throw new SparkException(s"File $targetFile exists and does not match contents of $url")
         }
       }
       Files.copy(cachedFile, targetFile)

From f9330d447dd749d530de96739f5e3598f973bd60 Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Wed, 17 Sep 2014 11:00:10 +0800
Subject: [PATCH 11/13] Clean code again

---
 .../main/scala/org/apache/spark/executor/Executor.scala | 4 ++--
 core/src/main/scala/org/apache/spark/util/Utils.scala   | 9 +++++----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4a900d46c0da9..36dd3c79394a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,13 +323,13 @@ private[spark] class Executor(
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp, true)
+          env.securityManager, hadoopConf, timestamp, useCache = true)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp, true)
+          env.securityManager, hadoopConf, timestamp, useCache = true)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e09a647cbf318..c7c5f4bcf07bf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,11 +313,12 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor . Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
-   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
-   * executors running the same application.
+   * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly for
+   * the the executors, not in local mode.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
@@ -377,7 +378,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
    * Throws SparkException if the target file already exists and has different contents than
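
The spark.files.overwrite handling added in patch 09 gives the cached path the same
target-file semantics the uncached path already had. Distilled from the hunks above
(a paraphrase, not the verbatim Spark code):

    if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
      if (conf.getBoolean("spark.files.overwrite", false)) {
        targetFile.delete() // stale copy of an earlier version of the file: replace it
      } else {
        throw new SparkException(s"File $targetFile exists and does not match contents of $url")
      }
    }
    Files.copy(cachedFile, targetFile)

Patch 09 also threads the destination name through doFetchFile, so the download
lands directly on the cache file instead of being moved into place afterwards.
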
val lock = raf.getChannel().lock() val cachedFile = new File(localDir, cachedFileName) try { @@ -354,8 +354,8 @@ private[spark] object Utils extends Logging { if (targetFile.exists && !Files.equal(cachedFile, targetFile)) { if (conf.getBoolean("spark.files.overwrite", false)) { targetFile.delete() - logInfo(("File %s exists and does not match contents of %s, " + - "replacing it with %s").format(targetFile, url, url)) + logInfo((s"File $targetFile exists and does not match contents of $url, " + + s"replacing it with $url")) } else { throw new SparkException(s"File $targetFile exists and does not match contents of $url") } From 36940df56f2af4071bf903559be8ae64d82a1808 Mon Sep 17 00:00:00 2001 From: Li Zhihui Date: Fri, 24 Oct 2014 08:46:37 +0800 Subject: [PATCH 13/13] Close cache for local mode --- .../src/main/scala/org/apache/spark/executor/Executor.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 36dd3c79394a9..964417c6147d5 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -322,14 +322,16 @@ private[spark] class Executor( // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) + // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, - env.securityManager, hadoopConf, timestamp, useCache = true) + env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentFiles(name) = timestamp } for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) + // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, - env.securityManager, hadoopConf, timestamp, useCache = true) + env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentJars(name) = timestamp // Add it to our class loader val localName = name.split("/").last