7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -752,11 +752,10 @@ private[spark] class Executor(
         if (currentTimeStamp < timestamp) {
           logInfo("Fetching " + name + " with timestamp " + timestamp)
           // Fetch file with useCache mode, close cache for local mode.
-          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          val url = Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
+            env.securityManager, hadoopConf, timestamp, useCache = !isLocal,
+            conf.get(SPARK_JARS_DECORATE_NAME)).toURI.toURL
           currentJars(name) = timestamp
-          // Add it to our class loader
-          val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
           if (!urlClassLoader.getURLs().contains(url)) {
             logInfo("Adding " + url + " to class loader")
             urlClassLoader.addURL(url)
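The essential change above: the classloader URL is now derived from the File returned by Utils.fetchFile, rather than rebuilt by hand from the jar's base name. A minimal sketch of why, assuming the new withMD5Prefix behavior (placeholder values, not part of the patch):

import java.io.File

// With decoration on, the fetched file is named "<md5-of-url>-udfs.jar", so
// reconstructing the path from the base name alone would point at a file
// that does not exist:
//   new File(SparkFiles.getRootDirectory(), localName)  // old approach
// Instead, derive the URL from what fetchFile actually wrote to disk:
val fetched: File = ???          // the File returned by Utils.fetchFile
val url = fetched.toURI.toURL    // always matches the real on-disk name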
@@ -525,4 +525,12 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1g")
 
+  private[spark] val SPARK_JARS_DECORATE_NAME =
+    ConfigBuilder("spark.jars.decorateName")
+      .doc("When an executor updates dependencies, it may fetch different jars that share " +
+        "the same file name (e.g. /pathA/udfs.jar and /pathB/udfs.jar). To avoid this " +
+        "conflict, users can enable this config so that every jar fetched by an executor " +
+        "is renamed with an MD5 prefix.")
+      .booleanConf
+      .createWithDefault(false)
 }
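For reference, enabling the new flag is ordinary Spark configuration; a usage sketch (the application name and jar paths are made up for illustration):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("udf-demo")                  // hypothetical app name
  .set("spark.jars.decorateName", "true")  // fetched jars get an MD5-of-URL prefix

// or equivalently on the command line (paths illustrative):
//   spark-submit --conf spark.jars.decorateName=true \
//     --jars /pathA/udfs.jar app.jar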
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,6 +46,7 @@ import _root_.io.netty.channel.unix.Errors.NativeIoException
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
+import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -451,8 +452,13 @@ private[spark] object Utils extends Logging {
       securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
-      useCache: Boolean): File = {
-    val fileName = decodeFileNameInURI(new URI(url))
+      useCache: Boolean,
+      withMD5Prefix: Boolean = false): File = {
+    val fileName = if (withMD5Prefix) {
+      s"${DigestUtils.md5Hex(url)}-${decodeFileNameInURI(new URI(url))}"
+    } else {
+      decodeFileNameInURI(new URI(url))
+    }
     val targetFile = new File(targetDir, fileName)
     val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
     if (useCache && fetchCacheEnabled) {
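To make the naming scheme concrete, here is a standalone sketch of what the withMD5Prefix branch produces; decoratedName is a hypothetical helper standing in for the logic above (the real code uses decodeFileNameInURI to handle URI decoding):

import org.apache.commons.codec.digest.DigestUtils

// Hypothetical helper mirroring the withMD5Prefix branch.
def decoratedName(url: String): String = {
  val base = url.substring(url.lastIndexOf('/') + 1) // stand-in for decodeFileNameInURI
  s"${DigestUtils.md5Hex(url)}-$base"
}

decoratedName("hdfs://nn/pathA/udfs.jar") // "<md5 of the pathA URL>-udfs.jar"
decoratedName("hdfs://nn/pathB/udfs.jar") // "<md5 of the pathB URL>-udfs.jar"
// The two full URLs hash to different prefixes, so the jars land in distinct
// target files even though both end in "udfs.jar".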
5 changes: 5 additions & 0 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -29,6 +29,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
+import org.apache.commons.codec.digest.DigestUtils
 import org.mockito.Matchers.any
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.BeforeAndAfterAll
@@ -873,6 +874,10 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
       assert(Files.equal(f, destFile))
     }
+    val jarWithMD5Prefix = Utils.fetchFile(jarUri, destDir, conf, sm, hc, 0L, false, true)
+    assert(jarWithMD5Prefix.getAbsolutePath ==
+      new File(destDir, DigestUtils.md5Hex(jarUri) + "-jar").getAbsolutePath)
+    assert(Files.equal(jarWithMD5Prefix, jar))
 
     // Try to download files that do not exist.
     Seq("files", "jars", "dir1").foreach { root =>