[SPARK-1010] Clean up uses of System.setProperty in unit tests
Several of our tests call System.setProperty (or invoke test code that implicitly sets system properties) and don't always reset or clear the modified properties afterwards, which can create ordering dependencies between tests and cause hard-to-diagnose failures.

This patch removes most uses of System.setProperty from our tests, since in most cases we can use SparkConf to set these configurations (there are a few exceptions, including the tests of SparkConf itself).
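
As a concrete illustration of that pattern (a minimal sketch assembled from the DistributedSuite changes below, not a complete test; `sc` is assumed to be the suite's SparkContext field provided by LocalSparkContext):

```scala
// Before: mutates global JVM state, so the setting leaks into any test that
// runs later in the same JVM unless it is explicitly cleared afterwards.
System.setProperty("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext("local-cluster[2,1,512]", "test")

// After: the setting is scoped to this SparkContext's own SparkConf,
// so there is nothing to clean up when the test finishes.
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
```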

For the cases where we continue to use System.setProperty, this patch introduces a `ResetSystemProperties` ScalaTest mixin which snapshots the system properties before each individual test and automatically restores them on test completion or failure. See the block comment at the top of the ResetSystemProperties class for more details.
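
For reference, here is a rough sketch of what such a fixture can look like (an illustrative re-implementation under the assumptions noted in its comments, not the exact code added by this patch; the real details live in the class's own block comment):

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.scalatest.{BeforeAndAfterEach, Suite}

// Sketch of a ResetSystemProperties-style fixture. Mix it in after other
// fixtures (e.g. `extends FunSuite with LocalSparkContext with
// ResetSystemProperties`) so the snapshot is taken before, and the restore
// runs after, the rest of the suite's per-test setup and teardown.
trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
  private var oldProperties: Properties = _

  override def beforeEach(): Unit = {
    // Copy (rather than alias) the current system properties so that
    // mutations made by the test body cannot affect the snapshot.
    val snapshot = new Properties()
    System.getProperties.asScala.foreach { case (k, v) => snapshot.setProperty(k, v) }
    oldProperties = snapshot
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      // Restore the snapshot even if the test body or another fixture threw.
      System.setProperties(oldProperties)
      oldProperties = null
    }
  }
}
```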

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3739 from JoshRosen/cleanup-system-properties-in-tests and squashes the following commits:

0236d66 [Josh Rosen] Replace setProperty uses in two example programs / tools
3888fe3 [Josh Rosen] Remove setProperty use in LocalJavaStreamingContext
4f4031d [Josh Rosen] Add note on why SparkSubmitSuite needs ResetSystemProperties
4742a5b [Josh Rosen] Clarify ResetSystemProperties trait inheritance ordering.
0eaf0b6 [Josh Rosen] Remove setProperty call in TaskResultGetterSuite.
7a3d224 [Josh Rosen] Fix trait ordering
3fdb554 [Josh Rosen] Remove setProperty call in TaskSchedulerImplSuite
bee20df [Josh Rosen] Remove setProperty calls in SparkContextSchedulerCreationSuite
655587c [Josh Rosen] Remove setProperty calls in JobCancellationSuite
3f2f955 [Josh Rosen] Remove System.setProperty calls in DistributedSuite
cfe9cce [Josh Rosen] Remove use of system properties in SparkContextSuite
8783ab0 [Josh Rosen] Remove TestUtils.setSystemProperty, since it is subsumed by the ResetSystemProperties trait.
633a84a [Josh Rosen] Remove use of system properties in FileServerSuite
25bfce2 [Josh Rosen] Use ResetSystemProperties in UtilsSuite
1d1aa5a [Josh Rosen] Use ResetSystemProperties in SizeEstimatorSuite
dd9492b [Josh Rosen] Use ResetSystemProperties in AkkaUtilsSuite
b0daff2 [Josh Rosen] Use ResetSystemProperties in BlockManagerSuite
e9ded62 [Josh Rosen] Use ResetSystemProperties in TaskSchedulerImplSuite
5b3cb54 [Josh Rosen] Use ResetSystemProperties in SparkListenerSuite
0995c4b [Josh Rosen] Use ResetSystemProperties in SparkContextSchedulerCreationSuite
c83ded8 [Josh Rosen] Use ResetSystemProperties in SparkConfSuite
51aa870 [Josh Rosen] Use withSystemProperty in ShuffleSuite
60a63a1 [Josh Rosen] Use ResetSystemProperties in JobCancellationSuite
14a92e4 [Josh Rosen] Use withSystemProperty in FileServerSuite
628f46c [Josh Rosen] Use ResetSystemProperties in DistributedSuite
9e3e0dd [Josh Rosen] Add ResetSystemProperties test fixture mixin; use it in SparkSubmitSuite.
4dcea38 [Josh Rosen] Move withSystemProperty to TestUtils class.
JoshRosen committed Dec 31, 2014
1 parent 035bac8 commit 352ed6b
Showing 23 changed files with 216 additions and 232 deletions.
21 changes: 5 additions & 16 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@

package org.apache.spark

import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
@@ -29,16 +28,10 @@ class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}


class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
with LocalSparkContext {
class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {

val clusterUrl = "local-cluster[2,1,512]"

after {
System.clearProperty("spark.reducer.maxMbInFlight")
System.clearProperty("spark.storage.memoryFraction")
}

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -84,15 +77,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("groupByKey where map output sizes exceed maxMbInFlight") {
System.setProperty("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
assert(groups.length === 16)
assert(groups.map(_._2).sum === 2000)
// Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
}

test("accumulators") {
@@ -210,28 +202,25 @@ }
}

test("compute without caching when no partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.0001")
sc = new SparkContext(clusterUrl, "test")
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("compute when only some partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test", conf)
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
// to make sure that *some* of them do fit though
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("passing environment variables to cluster") {
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _

def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")

override def beforeEach() {
super.beforeEach()
resetSparkContext()
System.setProperty("spark.authenticate", "false")
}

override def beforeAll() {
@@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
@@ -74,7 +74,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -108,7 +108,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {

test("Distributing files locally using URL as input") {
// addFile("file:///....")
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(new File(tmpFile.toString).toURI.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -122,7 +122,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
@@ -133,7 +133,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test("Distributing files on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -147,7 +147,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
@@ -158,7 +158,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
21 changes: 10 additions & 11 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -40,43 +40,42 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
override def afterEach() {
super.afterEach()
resetSparkContext()
System.clearProperty("spark.scheduler.mode")
}

test("local mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("local mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
22 changes: 9 additions & 13 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -35,19 +35,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
conf.set("spark.test.noStageRetry", "true")

test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
} finally {
System.setProperty("spark.shuffle.compress", "true")
}
val myConf = conf.clone().set("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", myConf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}

test("shuffle non-zero block size") {
51 changes: 19 additions & 32 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,27 +19,20 @@ package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("loading from system properties") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
}

test("initializing without loading defaults") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
}

test("named set methods") {
@@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {

test("nested property names") {
// This wasn't supported by some external conf parsing libraries
try {
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
} finally {
System.clearProperty("spark.test.a")
System.clearProperty("spark.test.a.b")
System.clearProperty("spark.test.a.b.c")
}
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}

test("register kryo classes through registerKryoClasses") {
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {

def createTaskScheduler(master: String): TaskSchedulerImpl = {
def createTaskScheduler(master: String): TaskSchedulerImpl =
createTaskScheduler(master, new SparkConf())

def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test", conf)
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
@@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite
}

test("local-default-parallelism") {
val defaultParallelism = System.getProperty("spark.default.parallelism")
System.setProperty("spark.default.parallelism", "16")
val sched = createTaskScheduler("local")
val conf = new SparkConf().set("spark.default.parallelism", "16")
val sched = createTaskScheduler("local", conf)

sched.backend match {
case s: LocalBackend => assert(s.defaultParallelism() === 16)
case _ => fail()
}

Option(defaultParallelism) match {
case Some(v) => System.setProperty("spark.default.parallelism", v)
case _ => System.clearProperty("spark.default.parallelism")
}
}

test("simr") {
@@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
}

def testMesos(master: String, expectedClass: Class[_]) {
def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
try {
val sched = createTaskScheduler(master)
val sched = createTaskScheduler(master, conf)
assert(sched.backend.getClass === expectedClass)
} catch {
case e: UnsatisfiedLinkError =>
@@ -168,17 +166,14 @@
}

test("mesos fine-grained") {
System.setProperty("spark.mesos.coarse", "false")
testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
}

test("mesos coarse-grained") {
System.setProperty("spark.mesos.coarse", "true")
testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
}

test("mesos with zookeeper") {
System.setProperty("spark.mesos.coarse", "false")
testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
}
}