[SPARK-9284] [TESTS] Allow all tests to run without an assembly.
This change aims to speed up the dev cycle a little by making sure that
all tests behave the same w.r.t. where the code under test is loaded
from. Concretely, tests no longer rely on the assembly; instead they
load all needed classes from the build directories.

The main change is to make sure all build directories (classes and test-classes)
are added to the classpath of child processes when running tests.
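
As a rough illustration (not code from this patch), the classpath handed to
child test processes can be assembled from each module's build output; the
module list and scala-2.10 layout below are examples that mirror what the
launcher already scans:

  import java.io.File

  // Hypothetical helper: collect per-module build output (classes and
  // test-classes) into a single classpath string for forked test JVMs.
  def buildDirClasspath(sparkHome: String, scalaVersion: String = "2.10"): String = {
    val modules = Seq("core", "launcher", "yarn", "sql/catalyst", "sql/core", "sql/hive")
    modules.flatMap { m =>
      Seq(s"$sparkHome/$m/target/scala-$scalaVersion/classes",
          s"$sparkHome/$m/target/scala-$scalaVersion/test-classes")
    }.mkString(File.pathSeparator)
  }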

YarnClusterSuite required some custom code, since its executors are
launched differently (i.e. not through the launcher library, as the
standalone and Mesos backends do).
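
The gist of the YARN-specific change, sketched below: build the driver and
executor classpath with the launcher's classpath builder (the
TestClasspathBuilder added in this patch, which expects spark.test.home to be
set) and pass it through the extraClassPath settings. The helper name here is
made up:

  import java.io.File
  import java.util.Properties
  import scala.collection.JavaConverters._

  import org.apache.spark.launcher.TestClasspathBuilder

  // Sketch: derive the test classpath for the mini YARN cluster from the
  // launcher's builder instead of the assembly jar.
  def yarnTestProps(logConfDir: File, extraClassPath: Seq[String]): Properties = {
    val cp = new TestClasspathBuilder()
      .buildClassPath((logConfDir.getAbsolutePath() +: extraClassPath).mkString(File.pathSeparator))
      .asScala
      .mkString(File.pathSeparator)
    val props = new Properties()
    props.setProperty("spark.driver.extraClassPath", cp)
    props.setProperty("spark.executor.extraClassPath", cp)
    props
  }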

I also found a couple of tests that could leak a SparkContext on failure,
and added code to handle those.
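
The fix follows a simple pattern: if setup performed after creating a
SparkContext throws, stop the context before rethrowing. A sketch (the
wrapper name is made up):

  import org.apache.spark.SparkContext

  // Hypothetical wrapper: run extra setup against a freshly created context
  // and stop it if that setup fails, so a failed test doesn't leak it.
  def orStop[T](sc: SparkContext)(setup: SparkContext => T): SparkContext = {
    try {
      setup(sc)
      sc
    } catch {
      case e: Throwable =>
        sc.stop()
        throw e
    }
  }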

With this patch, it's possible to run the following command from a clean
source directory and have all tests pass:

  mvn -Pyarn -Phadoop-2.4 -Phive-thriftserver install

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7629 from vanzin/SPARK-9284.
Marcelo Vanzin committed Aug 28, 2015
1 parent d3f87dc commit c53c902
Showing 10 changed files with 122 additions and 37 deletions.
16 changes: 9 additions & 7 deletions bin/spark-class
@@ -43,17 +43,19 @@ else
fi

num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
if [ -d "$ASSEMBLY_DIR" ]; then
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -310,8 +310,14 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf)
// Wait until all slaves are up
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
try {
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
} catch {
case e: Throwable =>
_sc.stop()
throw e
}
} else {
new SparkContext("local", "test", broadcastConf)
}
launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -169,9 +169,11 @@ List<String> buildClassPath(String appClassPath) throws IOException {
"streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
"yarn", "launcher");
if (prependClasses) {
System.err.println(
"NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
"assembly.");
if (!isTesting) {
System.err.println(
"NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
"assembly.");
}
for (String project : projects) {
addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
scala));
@@ -200,7 +202,7 @@ List<String> buildClassPath(String appClassPath) throws IOException {
// For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME.
// That duplicates some of the code in the shell scripts that look for the assembly, though.
String assembly = getenv(ENV_SPARK_ASSEMBLY);
if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) {
if (assembly == null && !isTesting) {
assembly = findAssembly();
}
addToClassPath(cp, assembly);
@@ -215,12 +217,14 @@ List<String> buildClassPath(String appClassPath) throws IOException {
libdir = new File(sparkHome, "lib_managed/jars");
}

checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
libdir.getAbsolutePath());
for (File jar : libdir.listFiles()) {
if (jar.getName().startsWith("datanucleus-")) {
addToClassPath(cp, jar.getAbsolutePath());
if (libdir.isDirectory()) {
for (File jar : libdir.listFiles()) {
if (jar.getName().startsWith("datanucleus-")) {
addToClassPath(cp, jar.getAbsolutePath());
}
}
} else {
checkState(isTesting, "Library directory '%s' does not exist.", libdir.getAbsolutePath());
}

addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
@@ -256,15 +260,15 @@ String getScalaVersion() {
return scala;
}
String sparkHome = getSparkHome();
File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
File scala210 = new File(sparkHome, "launcher/target/scala-2.10");
File scala211 = new File(sparkHome, "launcher/target/scala-2.11");
checkState(!scala210.isDirectory() || !scala211.isDirectory(),
"Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
"Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
if (scala210.isDirectory()) {
return "2.10";
} else {
checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
checkState(scala211.isDirectory(), "Cannot find any build directories.");
return "2.11";
}
}
8 changes: 8 additions & 0 deletions pom.xml
@@ -1421,6 +1421,10 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -1892,6 +1896,8 @@
launched by the tests have access to the correct test-time classpath.
-->
<SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
<SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
<SPARK_TESTING>1</SPARK_TESTING>
<JAVA_HOME>${test.java.home}</JAVA_HOME>
</environmentVariables>
<systemProperties>
@@ -1929,6 +1935,8 @@
launched by the tests have access to the correct test-time classpath.
-->
<SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
<SPARK_PREPEND_CLASSES>1</SPARK_PREPEND_CLASSES>
<SPARK_TESTING>1</SPARK_TESTING>
<JAVA_HOME>${test.java.home}</JAVA_HOME>
</environmentVariables>
<systemProperties>
2 changes: 2 additions & 0 deletions project/SparkBuild.scala
@@ -547,6 +547,8 @@ object TestSettings {
envVars in Test ++= Map(
"SPARK_DIST_CLASSPATH" ->
(fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
"SPARK_PREPEND_CLASSES" -> "1",
"SPARK_TESTING" -> "1",
"JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir",
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}

import org.apache.spark._
import org.apache.spark.launcher.TestClasspathBuilder
import org.apache.spark.util.Utils

abstract class BaseYarnClusterSuite
@@ -43,6 +44,9 @@ abstract class BaseYarnClusterSuite
|log4j.appender.console.target=System.err
|log4j.appender.console.layout=org.apache.log4j.PatternLayout
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
|log4j.logger.org.apache.hadoop=WARN
|log4j.logger.org.eclipse.jetty=WARN
|log4j.logger.org.spark-project.jetty=WARN
""".stripMargin

private var yarnCluster: MiniYARNCluster = _
@@ -51,8 +55,7 @@ abstract class BaseYarnClusterSuite
private var hadoopConfDir: File = _
private var logConfDir: File = _


def yarnConfig: YarnConfiguration
def newYarnConfig(): YarnConfiguration

override def beforeAll() {
super.beforeAll()
@@ -65,8 +68,14 @@
val logConfFile = new File(logConfDir, "log4j.properties")
Files.write(LOG4J_CONF, logConfFile, UTF_8)

// Disable the disk utilization check to avoid the test hanging when people's disks are
// getting full.
val yarnConf = newYarnConfig()
yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
"100.0")

yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(yarnConfig)
yarnCluster.init(yarnConf)
yarnCluster.start()

// There's a race in MiniYARNCluster in which start() may return before the RM has updated
@@ -114,19 +123,23 @@ abstract class BaseYarnClusterSuite
sparkArgs: Seq[String] = Nil,
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
extraConf: Map[String, String] = Map()): Unit = {
extraConf: Map[String, String] = Map(),
extraEnv: Map[String, String] = Map()): Unit = {
val master = if (clientMode) "yarn-client" else "yarn-cluster"
val props = new Properties()

props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())

val childClasspath = logConfDir.getAbsolutePath() +
File.pathSeparator +
sys.props("java.class.path") +
File.pathSeparator +
extraClassPath.mkString(File.pathSeparator)
props.setProperty("spark.driver.extraClassPath", childClasspath)
props.setProperty("spark.executor.extraClassPath", childClasspath)
val testClasspath = new TestClasspathBuilder()
.buildClassPath(
logConfDir.getAbsolutePath() +
File.pathSeparator +
extraClassPath.mkString(File.pathSeparator))
.asScala
.mkString(File.pathSeparator)

props.setProperty("spark.driver.extraClassPath", testClasspath)
props.setProperty("spark.executor.extraClassPath", testClasspath)

// SPARK-4267: make sure java options are propagated correctly.
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
@@ -168,7 +181,7 @@ abstract class BaseYarnClusterSuite
appArgs

Utils.executeAndGetOutput(argv,
extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv)
}

/**
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers

import org.apache.spark._
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded}
import org.apache.spark.launcher.TestClasspathBuilder
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils

@@ -39,7 +41,7 @@ import org.apache.spark.util.Utils
*/
class YarnClusterSuite extends BaseYarnClusterSuite {

override def yarnConfig: YarnConfiguration = new YarnConfiguration()
override def newYarnConfig(): YarnConfiguration = new YarnConfiguration()

private val TEST_PYFILE = """
|import mod1, mod2
@@ -111,6 +113,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, UTF_8)

// When running tests, let's not assume the user has built the assembly module, which also
// creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
// needed locations.
val sparkHome = sys.props("spark.test.home");
val pythonPath = Seq(
s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator))

val moduleDir =
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
@@ -130,7 +143,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {

runSpark(clientMode, primaryPyFile.getAbsolutePath(),
sparkArgs = Seq("--py-files", pyFiles),
appArgs = Seq(result.getAbsolutePath()))
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnv)
checkResult(result)
}

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
*/
class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {

override def yarnConfig: YarnConfiguration = {
override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala (new file)
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher

import java.util.{List => JList, Map => JMap}

/**
* Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same
* way other cluster managers do.
*/
private[spark] class TestClasspathBuilder extends AbstractCommandBuilder {

childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home"))

override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp)

/** Not used by the YARN tests. */
override def buildCommand(env: JMap[String, String]): JList[String] =
throw new UnsupportedOperationException()

}
