From aecb305577a7d16065738afcf4bbeee6397b4f53 Mon Sep 17 00:00:00 2001 From: Marcin Tustin Date: Sat, 16 Apr 2016 19:41:22 -0400 Subject: [PATCH 1/3] [SPARK-14685] Document inheritability of localProperties --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +++++ .../org/apache/spark/api/java/JavaSparkContext.scala | 9 +++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e41088f7c8f69..f90c63360a9e4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -606,6 +606,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * scheduler pool. User-defined properties may also be set here. These properties are propagated * through to worker tasks and can be accessed there via * [[org.apache.spark.TaskContext#getLocalProperty]]. + * + * These properties are inherited by child threads spawned from this thread. This + * may have unexpected consequences when working with thread pools. The standard Java + * implementation of thread pools has worker threads spawn other worker threads. + * As a result, local properties may propagate unpredictably. */ def setLocalProperty(key: String, value: String) { if (value == null) { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index dfd91ae338e89..fb6323413e3ea 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -712,8 +712,13 @@ class JavaSparkContext(val sc: SparkContext) } /** - * Set a local property that affects jobs submitted from this thread, such as the - * Spark fair scheduler pool. 
+ * Set a local property that affects jobs submitted from this thread, and all child + * threads, such as the Spark fair scheduler pool. + * + * These properties are inherited by child threads spawned from this thread. This + * may have unexpected consequences when working with thread pools. The standard Java + * implementation of thread pools has worker threads spawn other worker threads. + * As a result, local properties may propagate unpredictably. */ def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value) From 9964e2e937718194521c6705ba345deba11f1f3d Mon Sep 17 00:00:00 2001 From: Marcin Tustin Date: Sun, 17 Apr 2016 10:49:51 -0400 Subject: [PATCH 2/3] Add test for heritability of local properties --- .../scala/org/apache/spark/SparkContextSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 841fd02ae8bb6..41c8627fd4d62 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -319,4 +319,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { assert(sc.getConf.getInt("spark.executor.instances", 0) === 6) } } + + + test("localPropertyies are inherited by spawned threads.") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.setLocalProperty("testProperty", "testValue") + var result = "unset"; + val thread = new Thread(){ override def run() = {result = sc.getLocalProperty("testProperty")}} + thread.start() + thread.join() + sc.stop() + assert(result == "testValue") + } } From b96cde118c1265bf37ac7036581b8bb1bef80ee0 Mon Sep 17 00:00:00 2001 From: Marcin Tustin Date: Sun, 17 Apr 2016 11:09:38 -0400 Subject: [PATCH 3/3] SPARK-14685 add test to ensure no crosstalk between threads on localProperties Work with me in NYC: 
https://www.handy.com/careers/73115?gh_jid=73115&gh_src=o5qcxn --- .../org/apache/spark/SparkContextSuite.scala | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 41c8627fd4d62..e6e30f77e4b09 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -321,14 +321,30 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } - test("localPropertyies are inherited by spawned threads.") { + test("localProperties are inherited by spawned threads.") { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) sc.setLocalProperty("testProperty", "testValue") var result = "unset"; - val thread = new Thread(){ override def run() = {result = sc.getLocalProperty("testProperty")}} + val thread = new Thread() { override def run() = {result = sc.getLocalProperty("testProperty")}} thread.start() thread.join() sc.stop() assert(result == "testValue") } + + test("localProperties do not cross-talk between threads.") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + var result = "unset"; + val thread1 = new Thread() { + override def run() = {sc.setLocalProperty("testProperty", "testValue")}} + // testProperty should be unset and thus return null + val thread2 = new Thread() { + override def run() = {result = sc.getLocalProperty("testProperty")}} + thread1.start() + thread1.join() + thread2.start() + thread2.join() + sc.stop() + assert(result == null) + } }