[SPARK-14423][YARN] Avoid same name files added to distributed cache again

## What changes were proposed in this pull request?

In the current implementation of assembly-free Spark deployment, the jars under `assembly/target/scala-xxx/jars` are uploaded to the distributed cache by default. There is a chance that some of these jars have the same names as jars specified with `--jars`, which causes an exception when the application starts:

```
client token: N/A
	 diagnostics: Application application_1459907402325_0004 failed 2 times due to AM Container for appattempt_1459907402325_0004_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hw12100.local:8088/proxy/application_1459907402325_0004/Then, click on links to logs of each attempt.
Diagnostics: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590
java.io.IOException: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
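The failure comes from the way resources are localized: each file is copied into the application's staging directory under just its base file name, so two different jars that happen to share a name resolve to the same destination path. The second upload overwrites the first, and YARN's timestamp check then rejects the resource. A minimal sketch of that collision, using hypothetical local paths and a simplified destination rule (not Spark's exact code):

```scala
// Illustrative only: how two different jars with the same base name map to the
// same destination in the YARN staging directory (simplified, hypothetical paths).
import java.io.File
import java.net.URI

def stagingDestination(stagingDir: String, jar: URI): String =
  // The destination is keyed by the bare file name, not the full source path.
  s"$stagingDir/${new File(jar.getPath).getName}"

val stagingDir = "hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004"
val fromSparkJars = new URI("file:/opt/spark/jars/avro-mapred-1.7.7-hadoop2.jar")
val fromUserJars  = new URI("file:/home/user/libs/avro-mapred-1.7.7-hadoop2.jar")

// Both resolve to the same HDFS path, so the second upload overwrites the first and
// YARN later fails with "Resource ... changed on src filesystem".
assert(stagingDestination(stagingDir, fromSparkJars) == stagingDestination(stagingDir, fromUserJars))
```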

So this change checks the file name before adding a resource, to avoid uploading files with the same name more than once.
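Concretely, the patch keeps a second `HashSet` keyed by file name next to the existing URI set, so a resource is skipped when either its full URI or its bare file name has already been added. A rough sketch of the resulting behavior with hypothetical paths (the actual logic is the `addDistributedUri` change in `Client.scala` shown below):

```scala
// Sketch of the name-based de-duplication (hypothetical paths; the real
// implementation is the addDistributedUri change in Client.scala below).
import java.io.File
import java.net.URI
import scala.collection.mutable.HashSet

val distributedUris = new HashSet[String]   // full URIs already added
val distributedNames = new HashSet[String]  // bare file names already added

def addDistributedUri(uri: URI): Boolean = {
  val uriStr = uri.toString()
  val fileName = new File(uri.getPath).getName
  if (distributedUris.contains(uriStr) || distributedNames.contains(fileName)) {
    false  // duplicate path or duplicate name: skip the upload
  } else {
    distributedUris += uriStr
    distributedNames += fileName
    true
  }
}

// The first jar is accepted; a different jar with the same file name is skipped.
assert(addDistributedUri(new URI("file:/opt/spark/jars/avro-mapred-1.7.7-hadoop2.jar")))
assert(!addDistributedUri(new URI("file:/home/user/libs/avro-mapred-1.7.7-hadoop2.jar")))
```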

## How was this patch tested?

A unit test was added, and a manual integration test was done locally.

Author: jerryshao <sshao@hortonworks.com>

Closes #12203 from jerryshao/SPARK-14423.
jerryshao authored and Marcelo Vanzin committed Apr 18, 2016
1 parent 1a39664 commit d6fb485
Showing 2 changed files with 42 additions and 4 deletions.
14 changes: 11 additions & 3 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -364,6 +364,10 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
// Used to keep track of the names of the files added to the distributed cache. If files
// with the same name but different paths are added multiple times, YARN will fail to launch
// containers for the app with an internal error.
val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)

@@ -376,11 +380,16 @@ private[spark] class Client(

def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
val fileName = new File(uri.getPath).getName
if (distributedUris.contains(uriStr)) {
logWarning(s"Resource $uri added multiple times to distributed cache.")
logWarning(s"Same path resource $uri added multiple times to distributed cache.")
false
} else if (distributedNames.contains(fileName)) {
logWarning(s"Same name resource $uri added multiple times to distributed cache")
false
} else {
distributedUris += uriStr
distributedNames += fileName
true
}
}
@@ -519,8 +528,7 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
require(localizedPath != null)
if (addToClasspath) {
if (addToClasspath && localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
}
}
32 changes: 31 additions & 1 deletion yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.{File, FileOutputStream}
import java.io.{File, FileInputStream, FileOutputStream}
import java.net.URI
import java.util.Properties

@@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}

test("ignore same name jars") {
val libs = Utils.createTempDir()
val jarsDir = new File(libs, "jars")
assert(jarsDir.mkdir())
new FileOutputStream(new File(libs, "RELEASE")).close()
val userLibs = Utils.createTempDir()

val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
// Copy jar2 to jar3, which has the same name as jar1
val jar3 = {
val target = new File(userLibs, new File(jar1.toURI).getName)
val input = new FileInputStream(jar2.getPath)
val output = new FileOutputStream(target)
Utils.copyStream(input, output, closeStreams = true)
target.toURI.toURL
}

val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
.set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))

val client = createClient(sparkConf)
val tempDir = Utils.createTempDir()
client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)

// Only jar2 will be added to SECONDARY_JARS; jar3, which has the same name as jar1, will be
// ignored.
sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
}

object Fixtures {

val knownDefYarnAppCP: Seq[String] =
