From 73fec720c6fa3f80216520a74b8ae426e43fd338 Mon Sep 17 00:00:00 2001
From: jinxing
Date: Tue, 13 Mar 2018 22:15:56 +0800
Subject: [PATCH 1/2] [SPARK-23669] Executors fetch jars and name the jars with md5 prefix

---
 .../scala/org/apache/spark/executor/Executor.scala    |  7 +++----
 core/src/main/scala/org/apache/spark/util/Utils.scala | 10 ++++++++--
 .../test/scala/org/apache/spark/rpc/RpcEnvSuite.scala |  5 +++++
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dcec3ec21b546..26e1df773ef15 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -752,11 +752,10 @@ private[spark] class Executor(
         if (currentTimeStamp < timestamp) {
           logInfo("Fetching " + name + " with timestamp " + timestamp)
           // Fetch file with useCache mode, close cache for local mode.
-          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          val url = Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
+            env.securityManager, hadoopConf, timestamp, useCache = !isLocal,
+            conf.getBoolean("spark.jars.withDecoratedName", false)).toURI.toURL
           currentJars(name) = timestamp
-          // Add it to our class loader
-          val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
           if (!urlClassLoader.getURLs().contains(url)) {
             logInfo("Adding " + url + " to class loader")
             urlClassLoader.addURL(url)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5caedeb526469..dc83455ec4119 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,6 +46,7 @@ import _root_.io.netty.channel.unix.Errors.NativeIoException
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
+import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -451,8 +452,13 @@ private[spark] object Utils extends Logging {
       securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
-      useCache: Boolean): File = {
-    val fileName = decodeFileNameInURI(new URI(url))
+      useCache: Boolean,
+      withMD5Prefix: Boolean = false): File = {
+    val fileName = if (withMD5Prefix) {
+      DigestUtils.md5Hex(url) + "-" + decodeFileNameInURI(new URI(url))
+    } else {
+      decodeFileNameInURI(new URI(url))
+    }
     val targetFile = new File(targetDir, fileName)
     val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
     if (useCache && fetchCacheEnabled) {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index a799b1cfb0765..5ac66eb91dd5f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -29,6 +29,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
+import org.apache.commons.codec.digest.DigestUtils
 import org.mockito.Matchers.any
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.BeforeAndAfterAll
@@ -873,6 +874,10 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
       assert(Files.equal(f, destFile))
     }
+    val jarWithMD5Prefix = Utils.fetchFile(jarUri, destDir, conf, sm, hc, 0L, false, true)
+    assert(jarWithMD5Prefix.getAbsolutePath ==
+      new File(destDir, DigestUtils.md5Hex(jarUri) + "-jar").getAbsolutePath)
+    assert(Files.equal(jarWithMD5Prefix, jar))
 
     // Try to download files that do not exist.
     Seq("files", "jars", "dir1").foreach { root =>

From 4473878d4bfc457a10522248798429465310adaa Mon Sep 17 00:00:00 2001
From: jinxing
Date: Fri, 16 Mar 2018 15:37:52 +0800
Subject: [PATCH 2/2] add 'internal/config'

---
 .../main/scala/org/apache/spark/executor/Executor.scala  | 2 +-
 .../scala/org/apache/spark/internal/config/package.scala | 8 ++++++++
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 26e1df773ef15..f51773885f581 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -754,7 +754,7 @@ private[spark] class Executor(
           // Fetch file with useCache mode, close cache for local mode.
           val url = Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
             env.securityManager, hadoopConf, timestamp, useCache = !isLocal,
-            conf.getBoolean("spark.jars.withDecoratedName", false)).toURI.toURL
+            conf.get(SPARK_JARS_DECORATE_NAME)).toURI.toURL
           currentJars(name) = timestamp
           if (!urlClassLoader.getURLs().contains(url)) {
             logInfo("Adding " + url + " to class loader")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a313ad0554a3a..da8ccc1151686 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -525,4 +525,12 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1g")
 
+  private[spark] val SPARK_JARS_DECORATE_NAME =
+    ConfigBuilder("spark.jars.decorateName")
+      .doc("When the executor updates dependencies, it may fetch different jars that have" +
+        " the same file name (e.g. /pathA/udfs.jar and /pathB/udfs.jar). To avoid this" +
+        " conflict, users can enable this config so that all jars fetched by the executor" +
+        " are renamed with an MD5 prefix.")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index dc83455ec4119..60ebf0dbdbf68 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -455,7 +455,7 @@ private[spark] object Utils extends Logging {
       useCache: Boolean,
       withMD5Prefix: Boolean = false): File = {
     val fileName = if (withMD5Prefix) {
-      DigestUtils.md5Hex(url) + "-" + decodeFileNameInURI(new URI(url))
+      s"${DigestUtils.md5Hex(url)}-${decodeFileNameInURI(new URI(url))}"
     } else {
       decodeFileNameInURI(new URI(url))
     }
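
Note for reviewers (not part of the patch): to make the renaming scheme concrete, here is a minimal, self-contained Scala sketch of the name decoration that `Utils.fetchFile` performs when `withMD5Prefix` is true. `DecoratedNameSketch`, `decoratedName`, and `simpleFileName` are hypothetical names for this illustration only; `simpleFileName` merely approximates Spark's private `decodeFileNameInURI` helper.

```scala
import java.net.URI
import java.nio.file.Paths

import org.apache.commons.codec.digest.DigestUtils

object DecoratedNameSketch {
  // Rough stand-in for Spark's private Utils.decodeFileNameInURI:
  // take the last path segment of the URI as the file name.
  def simpleFileName(url: String): String =
    Paths.get(new URI(url).getPath).getFileName.toString

  // The patch saves the fetched file as "<md5 of full URL>-<file name>",
  // so jars that share a base name but live at different paths no longer
  // collide on the executor's disk.
  def decoratedName(url: String): String =
    s"${DigestUtils.md5Hex(url)}-${simpleFileName(url)}"

  def main(args: Array[String]): Unit = {
    // Same base name, different paths => distinct local names.
    println(decoratedName("hdfs:///pathA/udfs.jar"))
    println(decoratedName("hdfs:///pathB/udfs.jar"))
  }
}
```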
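And a sketch of how a user would opt in once this lands, assuming the `spark.jars.decorateName` key defined in the second commit (the jar paths are the example paths from the config doc, not real artifacts):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Jar-name decoration is off by default; enable it explicitly so that
// /pathA/udfs.jar and /pathB/udfs.jar can coexist on each executor.
val conf = new SparkConf()
  .setAppName("decorated-jar-names")
  .set("spark.jars", "hdfs:///pathA/udfs.jar,hdfs:///pathB/udfs.jar")
  .set("spark.jars.decorateName", "true")
val sc = new SparkContext(conf)
```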