From 6da1d348ee5a14cb4cb34d6c2ca8be824aab5264 Mon Sep 17 00:00:00 2001
From: Mark Petruska
Date: Mon, 6 Nov 2017 13:43:42 +0100
Subject: [PATCH 1/3] fixes test case 'flakiness'

---
 .../spark/storage/BlockManagerSuite.scala     | 58 +++++++++++++------
 1 file changed, 40 insertions(+), 18 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d45c194d31adc..0b376088aafda 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -44,8 +43,9 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -1322,9 +1322,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
     val tryAgainMsg = "test_spark_20640_try_again"
+    val `timingoutExecutor` = "timingoutExecutor"
+    val `tryAgainExecutor` = "tryAgainExecutor"
+    val `succeedingExecutor` = "succeedingExecutor"
+
     // a server which delays response 50ms and must try twice for success.
     def newShuffleServer(port: Int): (TransportServer, Int) = {
-      val attempts = new mutable.HashMap[String, Int]()
+      val failure = new Exception(tryAgainMsg)
+      val success = ByteBuffer.wrap(new Array[Byte](0))
+
+      var secondExecutorFailedOnce = false
+      var thirdExecutorFailedOnce = false
+
       val handler = new NoOpRpcHandler {
         override def receive(
             client: TransportClient,
@@ -1332,23 +1341,36 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
             callback: RpcResponseCallback): Unit = {
           val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
           msgObj match {
-            case exec: RegisterExecutor =>
-              Thread.sleep(50)
-              val attempt = attempts.getOrElse(exec.execId, 0) + 1
-              attempts(exec.execId) = attempt
-              if (attempt < 2) {
-                callback.onFailure(new Exception(tryAgainMsg))
-                return
-              }
-              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+
+            case exec: RegisterExecutor if exec.execId == `timingoutExecutor` =>
+              () // No reply to generate client-side timeout
+
+            case exec: RegisterExecutor
+              if exec.execId == `tryAgainExecutor` && !secondExecutorFailedOnce =>
+              secondExecutorFailedOnce = true
+              callback.onFailure(failure)
+
+            case exec: RegisterExecutor if exec.execId == `tryAgainExecutor` =>
+              callback.onSuccess(success)
+
+            case exec: RegisterExecutor
+              if exec.execId == `succeedingExecutor` && !thirdExecutorFailedOnce =>
+              thirdExecutorFailedOnce = true
+              callback.onFailure(failure)
+
+            case exec: RegisterExecutor if exec.execId == `succeedingExecutor` =>
+              callback.onSuccess(success)
+
           }
         }
       }
 
-      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transConf: TransportConf =
+        SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
       val transCtx = new TransportContext(transConf, handler, true)
       (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port)
     }
+
     val candidatePort = RandomUtils.nextInt(1024, 65536)
     val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
       newShuffleServer, conf, "ShuffleServer")
@@ -1357,21 +1379,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf.set("spark.shuffle.service.port", shufflePort.toString)
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
-    var e = intercept[SparkException]{
-      makeBlockManager(8000, "executor1")
+    var e = intercept[SparkException] {
+      makeBlockManager(8000, `timingoutExecutor`)
     }.getMessage
     assert(e.contains("TimeoutException"))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
-    e = intercept[SparkException]{
-      makeBlockManager(8000, "executor2")
+    e = intercept[SparkException] {
+      makeBlockManager(8000, `tryAgainExecutor`)
     }.getMessage
     assert(e.contains(tryAgainMsg))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2")
-    makeBlockManager(8000, "executor3")
+    makeBlockManager(8000, `succeedingExecutor`)
 
     server.close()
   }

From 0532cc30e9ae2ab72b278da98cecf956f5a46013 Mon Sep 17 00:00:00 2001
From: Mark Petruska
Date: Thu, 16 Nov 2017 11:20:41 +0100
Subject: [PATCH 2/3] removes unneeded backticks

---
 .../spark/storage/BlockManagerSuite.scala     | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0b376088aafda..d8515f90c17e2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1322,9 +1322,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
     val tryAgainMsg = "test_spark_20640_try_again"
-    val `timingoutExecutor` = "timingoutExecutor"
-    val `tryAgainExecutor` = "tryAgainExecutor"
-    val `succeedingExecutor` = "succeedingExecutor"
+    val timingoutExecutor = "timingoutExecutor"
+    val tryAgainExecutor = "tryAgainExecutor"
+    val succeedingExecutor = "succeedingExecutor"
 
     // a server which delays response 50ms and must try twice for success.
     def newShuffleServer(port: Int): (TransportServer, Int) = {
@@ -1342,23 +1342,23 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
           val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
           msgObj match {
 
-            case exec: RegisterExecutor if exec.execId == `timingoutExecutor` =>
+            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
               () // No reply to generate client-side timeout
 
             case exec: RegisterExecutor
-              if exec.execId == `tryAgainExecutor` && !secondExecutorFailedOnce =>
+              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
               secondExecutorFailedOnce = true
               callback.onFailure(failure)
 
-            case exec: RegisterExecutor if exec.execId == `tryAgainExecutor` =>
+            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
               callback.onSuccess(success)
 
             case exec: RegisterExecutor
-              if exec.execId == `succeedingExecutor` && !thirdExecutorFailedOnce =>
+              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
               thirdExecutorFailedOnce = true
              callback.onFailure(failure)
 
-            case exec: RegisterExecutor if exec.execId == `succeedingExecutor` =>
+            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
               callback.onSuccess(success)
 
           }
@@ -1380,20 +1380,20 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
     var e = intercept[SparkException] {
-      makeBlockManager(8000, `timingoutExecutor`)
+      makeBlockManager(8000, timingoutExecutor)
     }.getMessage
     assert(e.contains("TimeoutException"))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
     e = intercept[SparkException] {
-      makeBlockManager(8000, `tryAgainExecutor`)
+      makeBlockManager(8000, tryAgainExecutor)
     }.getMessage
     assert(e.contains(tryAgainMsg))
 
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2")
-    makeBlockManager(8000, `succeedingExecutor`)
+    makeBlockManager(8000, succeedingExecutor)
 
     server.close()
   }

From 5319ae31f6fbba0b5ffe92ff934e170966fbe470 Mon Sep 17 00:00:00 2001
From: Mark Petruska
Date: Wed, 24 Jan 2018 09:18:13 +0100
Subject: [PATCH 3/3] removes unneeded type annotation

---
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d8515f90c17e2..d1c5fe314ce75 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1365,8 +1365,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         }
       }
 
-      val transConf: TransportConf =
-        SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
       val transCtx = new TransportContext(transConf, handler, true)
       (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port)
     }