From 626e7bd16769ee6dc42d7d04df10981e719c530d Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 5 Aug 2018 18:01:07 -0500 Subject: [PATCH 1/2] Initial fixes for 2.12 test issues --- .../spark/scheduler/DAGSchedulerSuite.scala | 65 ++++++++++--------- .../spark/util/AccumulatorV2Suite.scala | 3 +- .../graphx/util/BytecodeUtilsSuite.scala | 7 ++ 3 files changed, 42 insertions(+), 33 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index dad339e2cdb91..ff1339a969778 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2369,39 +2369,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + + test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " + "still behave correctly on fetch failures") { - // Runs a job that always encounters a fetch failure, so should eventually be aborted - def runJobWithPersistentFetchFailure: Unit = { - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { - case (x, _) if (x == 1) => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - case (x, _) => x - }.count() - } - - // Runs a job that encounters a single fetch failure but succeeds on the second attempt - def runJobWithTemporaryFetchFailure: Unit = { - object FailThisAttempt { - val _fail = new AtomicBoolean(true) - } - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { - case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - } - } failAfter(10.seconds) { val e = intercept[SparkException] { - runJobWithPersistentFetchFailure + runJobWithPersistentFetchFailure(sc) } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } @@ -2410,16 +2383,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // This job will hang without the fix for SPARK-17644. 
failAfter(10.seconds) { val e = intercept[SparkException] { - runJobWithPersistentFetchFailure + runJobWithPersistentFetchFailure(sc) } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } failAfter(10.seconds) { try { - runJobWithTemporaryFetchFailure + runJobWithTemporaryFetchFailure(sc) } catch { - case e: Throwable => fail("A job with one fetch failure should eventually succeed") + case e: Throwable => fail("A job with one fetch failure should eventually succeed", e) } } } @@ -2583,4 +2556,32 @@ object DAGSchedulerSuite { def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) + + // Runs a job that always encounters a fetch failure, so should eventually be aborted + def runJobWithPersistentFetchFailure(sc: SparkContext): Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if x == 1 => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + case (x, _) => x + }.count() + } + + // Runs a job that encounters a single fetch failure but succeeds on the second attempt + def runJobWithTemporaryFetchFailure(sc: SparkContext): Unit = { + object FailThisAttempt { + val _fail = new AtomicBoolean(true) + } + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + } + } } diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index fe0a9a471a651..94c79388e3639 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -165,7 +165,6 @@ class AccumulatorV2Suite extends SparkFunSuite { } test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { - class MyData(val i: Int) extends Serializable val param = new AccumulatorParam[MyData] { override def zero(initialValue: MyData): MyData = new MyData(0) override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) @@ -182,3 +181,5 @@ class AccumulatorV2Suite extends SparkFunSuite { ser.serialize(acc) } } + +class MyData(val i: Int) extends Serializable diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala index 61e44dcab578c..5325978a0a1ec 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.graphx.util import org.apache.spark.SparkFunSuite +import org.apache.spark.util.ClosureCleanerSuite2 // scalastyle:off println @@ -26,6 +27,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { import BytecodeUtilsSuite.TestClass test("closure invokes a method") { + assume(!ClosureCleanerSuite2.supportsLMFs) val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) assert(BytecodeUtils.invokedMethod(c1, 
classOf[TestClass], "bar")) @@ -43,6 +45,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { } test("closure inside a closure invokes a method") { + assume(!ClosureCleanerSuite2.supportsLMFs) val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } val c2 = {e: TestClass => c1(e); println(e.foo); } assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) @@ -51,6 +54,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { } test("closure inside a closure inside a closure invokes a method") { + assume(!ClosureCleanerSuite2.supportsLMFs) val c1 = {e: TestClass => println(e.baz); } val c2 = {e: TestClass => c1(e); println(e.foo); } val c3 = {e: TestClass => c2(e) } @@ -60,6 +64,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { } test("closure calling a function that invokes a method") { + assume(!ClosureCleanerSuite2.supportsLMFs) def zoo(e: TestClass) { println(e.baz) } @@ -70,6 +75,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { } test("closure calling a function that invokes a method which uses another closure") { + assume(!ClosureCleanerSuite2.supportsLMFs) val c2 = {e: TestClass => println(e.baz)} def zoo(e: TestClass) { c2(e) @@ -81,6 +87,7 @@ class BytecodeUtilsSuite extends SparkFunSuite { } test("nested closure") { + assume(!ClosureCleanerSuite2.supportsLMFs) val c2 = {e: TestClass => println(e.baz)} def zoo(e: TestClass, c: TestClass => Unit) { c(e) From 422c4ab259b5e27ef12c2d5093a4ae93f2b7f522 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 5 Aug 2018 21:39:05 -0500 Subject: [PATCH 2/2] More 2.12 fixes; revert DAGSchedulerSuite change in favor of a simpler one --- .../spark/scheduler/DAGSchedulerSuite.scala | 64 +++++++++---------- .../spark/ml/tree/impl/RandomForest.scala | 2 +- .../org/apache/spark/repl/ReplSuite.scala | 2 + 3 files changed, 35 insertions(+), 33 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ff1339a969778..8b2b6b6bede02 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2369,12 +2369,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " + + test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + "still behave correctly on fetch failures") { + // Runs a job that always encounters a fetch failure, so should eventually be aborted + def runJobWithPersistentFetchFailure: Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if (x == 1) => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + case (x, _) => x + }.count() + } + + // Runs a job that encounters a single fetch failure but succeeds on the second attempt + def runJobWithTemporaryFetchFailure: Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) => + throw new 
FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + } + } failAfter(10.seconds) { val e = intercept[SparkException] { - runJobWithPersistentFetchFailure(sc) + runJobWithPersistentFetchFailure } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } @@ -2383,16 +2407,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // This job will hang without the fix for SPARK-17644. failAfter(10.seconds) { val e = intercept[SparkException] { - runJobWithPersistentFetchFailure(sc) + runJobWithPersistentFetchFailure } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } failAfter(10.seconds) { try { - runJobWithTemporaryFetchFailure(sc) + runJobWithTemporaryFetchFailure } catch { - case e: Throwable => fail("A job with one fetch failure should eventually succeed", e) + case e: Throwable => fail("A job with one fetch failure should eventually succeed") } } } @@ -2556,32 +2580,8 @@ object DAGSchedulerSuite { def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) +} - // Runs a job that always encounters a fetch failure, so should eventually be aborted - def runJobWithPersistentFetchFailure(sc: SparkContext): Unit = { - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { - case (x, _) if x == 1 => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - case (x, _) => x - }.count() - } - - // Runs a job that encounters a single fetch failure but succeeds on the second attempt - def runJobWithTemporaryFetchFailure(sc: SparkContext): Unit = { - object FailThisAttempt { - val _fail = new AtomicBoolean(true) - } - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { - case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - } - } +object FailThisAttempt { + val _fail = new AtomicBoolean(true) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index bb3f3a015c715..918560a5988eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -77,7 +77,7 @@ import org.apache.spark.util.random.{SamplingUtils, XORShiftRandom} * the heaviest part of the computation. In general, this implementation is bound by either * the cost of statistics computation on workers or by communicating the sufficient statistics. */ -private[spark] object RandomForest extends Logging { +private[spark] object RandomForest extends Logging with Serializable { /** * Train a random forest. 
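
The `with Serializable` line above is the mllib side of the same Scala 2.12 closure issue: lambdas are now emitted through `LambdaMetafactory`, and a closure that calls a method of its enclosing object generally keeps a reference to that object, so once Spark ships the closure to executors the object itself has to be serializable. A minimal, Spark-free sketch of that failure mode (the `Helper`, `bump`, and `SerializationCheck` names are illustrative, not taken from the patch):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for an object whose methods are called from closures that later get
// serialized (as RandomForest's helpers are when tasks are shipped to executors).
object Helper extends Serializable { // drop `extends Serializable` and the write below can fail on 2.12
  def bump(x: Int): Int = x + 1

  // `bump(x)` is really `this.bump(x)`, so on 2.12 the LambdaMetafactory-built
  // function value can capture the Helper instance as the lambda's receiver.
  val f: Int => Int = x => bump(x)
}

object SerializationCheck {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(Helper.f) // succeeds because Helper, the captured receiver, is Serializable
    out.close()
    println("closure serialized OK")
  }
}
```

RandomForest's closures over its own helper methods hit the same pattern once they run inside tasks, which is what the one-line `with Serializable` addresses.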
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index cdd5cdd841740..4f3df729177fb 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -21,6 +21,7 @@ import java.io._ import java.net.URLClassLoader import scala.collection.mutable.ArrayBuffer +import scala.tools.nsc.interpreter.SimpleReader import org.apache.log4j.{Level, LogManager} @@ -84,6 +85,7 @@ class ReplSuite extends SparkFunSuite { settings = new scala.tools.nsc.Settings settings.usejavacp.value = true org.apache.spark.repl.Main.interp = this + in = SimpleReader() } val out = new StringWriter()
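
A note on the `assume(!ClosureCleanerSuite2.supportsLMFs)` guards and the top-level `FailThisAttempt` move in these patches: on Scala 2.12, closures are emitted through `LambdaMetafactory` at runtime instead of as dedicated `$anonfun` classes, so `BytecodeUtils` has no class file to scan with ASM and those graphx tests can only be skipped; likewise, a closure defined deep inside a test body is more likely to capture state tied to the non-serializable test suite. A standalone probe of the encoding difference the guards key on (`LambdaEncodingProbe` is an illustrative name, not part of Spark):

```scala
// Prints how the running Scala version encodes a simple closure: 2.11 gives it a
// dedicated $anonfun class with a .class file on the classpath, while 2.12 produces
// a LambdaMetafactory-generated class with no class file for ASM to read.
object LambdaEncodingProbe {
  def main(args: Array[String]): Unit = {
    val f = (x: Int) => x + 1
    val cls = f.getClass
    // 2.11: LambdaEncodingProbe$$anonfun$main$1 (or similar); 2.12: LambdaEncodingProbe$$$Lambda$NNN/...
    println(s"closure class: ${cls.getName}")
    val resourceName = cls.getName.replace('.', '/') + ".class"
    val bytecodeOnClasspath = cls.getClassLoader.getResource(resourceName) != null
    println(s"class file available for bytecode inspection: $bytecodeOnClasspath")
  }
}
```

`BytecodeUtils` works by loading the closure's class file and walking it with ASM, so when that lookup comes back empty under 2.12 there is nothing for the graphx tests to assert against, and guarding them with `assume` is the pragmatic fix here.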