From 80e9ad9e02fbfd24bbd6d97e03b1bdf01e4c922c Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Thu, 25 May 2017 01:42:43 +0800 Subject: [PATCH 01/14] Make rpc timeout and retry for shuffle registration configurable. --- .../network/shuffle/ExternalShuffleClient.java | 14 ++++++++++++-- .../org/apache/spark/storage/BlockManager.scala | 8 ++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 2c5827bf7dc56..7c555ce822653 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -48,6 +48,7 @@ public class ExternalShuffleClient extends ShuffleClient { private final TransportConf conf; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; + private final int registrationTimeoutMilli; protected TransportClientFactory clientFactory; protected String appId; @@ -59,10 +60,19 @@ public class ExternalShuffleClient extends ShuffleClient { public ExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, - boolean authEnabled) { + boolean authEnabled, + int registrationTimeoutMilli) { this.conf = conf; this.secretKeyHolder = secretKeyHolder; this.authEnabled = authEnabled; + this.registrationTimeoutMilli = registrationTimeoutMilli; + } + + public ExternalShuffleClient( + TransportConf conf, + SecretKeyHolder secretKeyHolder, + boolean authEnabled) { + this(conf, secretKeyHolder, authEnabled, 5000); } protected void checkInit() { @@ -129,7 +139,7 @@ public void registerWithShuffleServer( checkInit(); try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) { ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer(); - client.sendRpcSync(registerMessage, 5000 /* timeoutMs */); + client.sendRpcSync(registerMessage, registrationTimeoutMilli); } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1689baa832d52..1a903ef01ca68 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -174,7 +174,7 @@ private[spark] class BlockManager( // standard BlockTransferService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) - new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) + new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), registrationTimeout) } else { blockTransferService } @@ -182,6 +182,10 @@ private[spark] class BlockManager( // Max number of failures before this block manager refreshes the block locations from the driver private val maxFailuresBeforeLocationRefresh = conf.getInt("spark.block.failures.beforeLocationRefresh", 5) + private val registrationTimeout = + conf.getInt("spark.shuffle.registration.timeout", 5000) + private val registrationMaxAttempts = + conf.getInt("spark.shuffle.registration.maxAttempts", 3) private val slaveEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, @@ -254,7 +258,7 @@ private[spark] class BlockManager( diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName) - val MAX_ATTEMPTS = 3 + val MAX_ATTEMPTS = registrationMaxAttempts val SLEEP_TIME_SECS = 5 for (i <- 1 to MAX_ATTEMPTS) { From aa512614f303bd2de3144fe452d5bb62616e9756 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Thu, 25 May 2017 16:10:15 +0800 Subject: [PATCH 02/14] Use getTimeAsMs to parse time string. --- .../spark/network/shuffle/ExternalShuffleClient.java | 4 ++-- .../scala/org/apache/spark/storage/BlockManager.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 7c555ce822653..65fecbccbdc42 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -48,7 +48,7 @@ public class ExternalShuffleClient extends ShuffleClient { private final TransportConf conf; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; - private final int registrationTimeoutMilli; + private final long registrationTimeoutMilli; protected TransportClientFactory clientFactory; protected String appId; @@ -61,7 +61,7 @@ public ExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled, - int registrationTimeoutMilli) { + long registrationTimeoutMilli) { this.conf = conf; this.secretKeyHolder = secretKeyHolder; this.authEnabled = authEnabled; diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1a903ef01ca68..daaa43cf84e1a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -170,6 +170,11 @@ private[spark] class BlockManager( // service, or just our own Executor's BlockManager. private[spark] var shuffleServerId: BlockManagerId = _ + private val registrationTimeout = + conf.getTimeAsMs("spark.shuffle.registration.timeout", "5s") + private val registrationMaxAttempts = + conf.getInt("spark.shuffle.registration.maxAttempts", 3) + // Client to read other executors' shuffle files. This is either an external service, or just the // standard BlockTransferService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { @@ -182,10 +187,6 @@ private[spark] class BlockManager( // Max number of failures before this block manager refreshes the block locations from the driver private val maxFailuresBeforeLocationRefresh = conf.getInt("spark.block.failures.beforeLocationRefresh", 5) - private val registrationTimeout = - conf.getInt("spark.shuffle.registration.timeout", 5000) - private val registrationMaxAttempts = - conf.getInt("spark.shuffle.registration.maxAttempts", 3) private val slaveEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, From bb1801e278609ba378632c533665960cc80bd007 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Thu, 25 May 2017 16:23:25 +0800 Subject: [PATCH 03/14] Fix scala style. --- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index daaa43cf84e1a..199a93c63f154 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -179,7 +179,8 @@ private[spark] class BlockManager( // standard BlockTransferService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) - new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), registrationTimeout) + new ExternalShuffleClient(transConf, securityManager, + securityManager.isAuthenticationEnabled(), registrationTimeout) } else { blockTransferService } From fb2b7061c1775e7e502228c3f551010cca6b001c Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Fri, 26 May 2017 17:58:46 +0800 Subject: [PATCH 04/14] Add unit test and move config to config.package. --- .../spark/internal/config/package.scala | 9 +++ .../apache/spark/storage/BlockManager.scala | 11 +--- .../spark/storage/BlockManagerSuite.scala | 63 +++++++++++++++++-- 3 files changed, 71 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e193ed222e228..8d5005fae0e32 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -287,4 +287,13 @@ package object config { .bytesConf(ByteUnit.BYTE) .createWithDefault(100 * 1024 * 1024) + private[spark] val SHUFFLE_REGISTRATION_TIMEOUT = + ConfigBuilder("spark.shuffle.registration.timeout") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(5000) + + private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS = + ConfigBuilder("spark.shuffle.registration.maxAttempts") + .intConf + .createWithDefault(3) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 199a93c63f154..74be70348305c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -31,7 +31,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer @@ -170,17 +170,12 @@ private[spark] class BlockManager( // service, or just our own Executor's BlockManager. private[spark] var shuffleServerId: BlockManagerId = _ - private val registrationTimeout = - conf.getTimeAsMs("spark.shuffle.registration.timeout", "5s") - private val registrationMaxAttempts = - conf.getInt("spark.shuffle.registration.maxAttempts", 3) - // Client to read other executors' shuffle files. This is either an external service, or just the // standard BlockTransferService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) new ExternalShuffleClient(transConf, securityManager, - securityManager.isAuthenticationEnabled(), registrationTimeout) + securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) } else { blockTransferService } @@ -260,7 +255,7 @@ private[spark] class BlockManager( diskBlockManager.subDirsPerLocalDir, shuffleManager.getClass.getName) - val MAX_ATTEMPTS = registrationMaxAttempts + val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS) val SLEEP_TIME_SECS = 5 for (i <- 1 to MAX_ATTEMPTS) { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 1e7bcdb6740f6..f4fd5f5bfe554 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.storage import java.nio.ByteBuffer +import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.Future -import scala.language.implicitConversions -import scala.language.postfixOps +import scala.language.{implicitConversions, postfixOps} import scala.reflect.ClassTag import org.mockito.{Matchers => mc} @@ -37,10 +38,13 @@ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.config._ import org.apache.spark.memory.UnifiedMemoryManager -import org.apache.spark.network.{BlockDataManager, BlockTransferService} +import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} -import org.apache.spark.network.netty.NettyBlockTransferService +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} +import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} import org.apache.spark.network.shuffle.BlockFetchingListener +import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite} @@ -1280,6 +1284,57 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("item").isEmpty) } + test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") { + val shufflePort = 10000 + val tryAgainMsg = "test_spark_20640_try_again" + conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.shuffle.service.port", shufflePort.toString) + // a server which delays response 50ms and must try twice for success. + def newShuffleServer(): TransportServer = { + val attempts = new mutable.HashMap[String, Int]() + val handler = new NoOpRpcHandler { + override def receive(client: TransportClient, message: ByteBuffer, + callback: RpcResponseCallback): Unit = { + val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message) + msgObj match { + case exec: RegisterExecutor => + Thread.sleep(50) + val attempt = attempts.getOrElse(exec.execId, 0) + 1 + attempts(exec.execId) = attempt + if (attempt < 2) { + callback.onFailure(new Exception(tryAgainMsg)) + return + } + callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0))) + } + } + } + + val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0) + val transCtx = new TransportContext(transConf, handler, true) + transCtx.createServer(shufflePort, Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava) + } + newShuffleServer() + + conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40") + conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1") + var e = intercept[SparkException]{ + makeBlockManager(8000, "executor1") + }.getMessage + assert(e.contains("TimeoutException")) + + conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000") + conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1") + e = intercept[SparkException]{ + makeBlockManager(8000, "executor2") + }.getMessage + assert(e.contains(tryAgainMsg)) + + conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000") + conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2") + makeBlockManager(8000, "executor3") + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 From f0d1c083a04ca29c500333965b9c8622b04f7db4 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Sun, 11 Jun 2017 21:15:23 +0800 Subject: [PATCH 05/14] Avoid flaky test and remove old constructor. --- .../network/shuffle/ExternalShuffleClient.java | 7 ------- .../shuffle/mesos/MesosExternalShuffleClient.java | 2 +- .../shuffle/ExternalShuffleIntegrationSuite.java | 4 ++-- .../shuffle/ExternalShuffleSecuritySuite.java | 2 +- .../apache/spark/storage/BlockManagerSuite.scala | 15 +++++++++------ 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index a785cb67f09ee..ca9c71bf8ce5b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -69,13 +69,6 @@ public ExternalShuffleClient( this.registrationTimeoutMilli = registrationTimeoutMilli; } - public ExternalShuffleClient( - TransportConf conf, - SecretKeyHolder secretKeyHolder, - boolean authEnabled) { - this(conf, secretKeyHolder, authEnabled, 5000); - } - protected void checkInit() { assert appId != null : "Called before init()"; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index dbc1010847fb1..46ede394ed41e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -61,7 +61,7 @@ public MesosExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled) { - super(conf, secretKeyHolder, authEnabled); + super(conf, secretKeyHolder, authEnabled, 5000); } public void registerDriverWithShuffleService( diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index d1d8f5b4e188a..aec17983e2fd6 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -133,7 +133,7 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); - ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false); + ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); client.init(APP_ID); client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, new BlockFetchingListener() { @@ -242,7 +242,7 @@ public void testFetchNoServer() throws Exception { private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { - ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false); + ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, 5000); client.init(APP_ID); client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), executorId, executorInfo); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index bf20c577ed420..16bad9f1b319d 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -97,7 +97,7 @@ private void validate(String appId, String secretKey, boolean encrypt) } ExternalShuffleClient client = - new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true); + new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000); client.init(appId); // Registration either succeeds or throws an exception. client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f7d528e55f39e..67c1b445deb8d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -28,6 +28,7 @@ import scala.concurrent.Future import scala.language.{implicitConversions, postfixOps} import scala.reflect.ClassTag +import org.apache.commons.lang3.RandomUtils import org.mockito.{Matchers => mc} import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest._ @@ -1286,12 +1287,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") { - val shufflePort = 10000 val tryAgainMsg = "test_spark_20640_try_again" - conf.set("spark.shuffle.service.enabled", "true") - conf.set("spark.shuffle.service.port", shufflePort.toString) // a server which delays response 50ms and must try twice for success. - def newShuffleServer(): TransportServer = { + def newShuffleServer(port: Int): (TransportServer, Int) = { val attempts = new mutable.HashMap[String, Int]() val handler = new NoOpRpcHandler { override def receive(client: TransportClient, message: ByteBuffer, @@ -1313,10 +1311,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0) val transCtx = new TransportContext(transConf, handler, true) - transCtx.createServer(shufflePort, Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava) + (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port) } - newShuffleServer() + val candidatePort = RandomUtils.nextInt(1024, 65536) + val (server, shufflePort) = Utils.startServiceOnPort(candidatePort, + newShuffleServer, conf, "ShuffleServer") + conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.shuffle.service.port", shufflePort.toString) conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40") conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1") var e = intercept[SparkException]{ @@ -1334,6 +1336,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000") conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2") makeBlockManager(8000, "executor3") + server.close() } class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { From c06e871e46d174f8812e3b3ed2a61809de0ca794 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Fri, 16 Jun 2017 09:57:57 +0800 Subject: [PATCH 06/14] Make mesos shuffle client timeout configurable. --- .../network/shuffle/mesos/MesosExternalShuffleClient.java | 5 +++-- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 46ede394ed41e..c3ccd6b99e6f2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -60,8 +60,9 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient { public MesosExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, - boolean authEnabled) { - super(conf, secretKeyHolder, authEnabled, 5000); + boolean authEnabled, + long registrationTimeoutMilli) { + super(conf, secretKeyHolder, authEnabled, registrationTimeoutMilli); } public void registerDriverWithShuffleService( diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 871685c6cccc0..d55e4386e56ec 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -28,6 +28,7 @@ import scala.concurrent.Future import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver +import org.apache.spark.internal.config import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -150,7 +151,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( new MesosExternalShuffleClient( SparkTransportConf.fromSparkConf(conf, "shuffle"), securityManager, - securityManager.isAuthenticationEnabled()) + securityManager.isAuthenticationEnabled(), + conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) } private var nextMesosTaskId = 0 From d01134ef92401a5275c7388c8e6d65c82785acfa Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Fri, 16 Jun 2017 11:40:25 +0800 Subject: [PATCH 07/14] Fix scala style check for importing order. --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index d55e4386e56ec..7dd42c41aa7c2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -28,8 +28,8 @@ import scala.concurrent.Future import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver -import org.apache.spark.internal.config import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointAddress From c8e7c64d7e599c3f6283f2390c1ea188e4ed899a Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Mon, 19 Jun 2017 12:42:05 +0800 Subject: [PATCH 08/14] Update style. --- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 40d7f489f4b0f..f9f51a19f5a01 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1292,8 +1292,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def newShuffleServer(port: Int): (TransportServer, Int) = { val attempts = new mutable.HashMap[String, Int]() val handler = new NoOpRpcHandler { - override def receive(client: TransportClient, message: ByteBuffer, - callback: RpcResponseCallback): Unit = { + override def receive( + client: TransportClient, + message: ByteBuffer, + callback: RpcResponseCallback): Unit = { val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message) msgObj match { case exec: RegisterExecutor => From d31d8da7952e1db527fa892087b2feb85799cae4 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Mon, 19 Jun 2017 13:05:56 +0800 Subject: [PATCH 09/14] Fix style. --- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f9f51a19f5a01..88f18294aa015 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1293,9 +1293,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val attempts = new mutable.HashMap[String, Int]() val handler = new NoOpRpcHandler { override def receive( - client: TransportClient, - message: ByteBuffer, - callback: RpcResponseCallback): Unit = { + client: TransportClient, + message: ByteBuffer, + callback: RpcResponseCallback): Unit = { val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message) msgObj match { case exec: RegisterExecutor => From ca308bcc12243d1a3011997688883c58f0b2d801 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Mon, 19 Jun 2017 21:30:20 +0800 Subject: [PATCH 10/14] Update Milli to Millis. --- .../spark/network/shuffle/ExternalShuffleClient.java | 8 ++++---- .../network/shuffle/mesos/MesosExternalShuffleClient.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index ca9c71bf8ce5b..327ac4063548b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -49,7 +49,7 @@ public class ExternalShuffleClient extends ShuffleClient { private final TransportConf conf; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; - private final long registrationTimeoutMilli; + private final long registrationTimeoutMillis; protected TransportClientFactory clientFactory; protected String appId; @@ -62,11 +62,11 @@ public ExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled, - long registrationTimeoutMilli) { + long registrationTimeoutMillis) { this.conf = conf; this.secretKeyHolder = secretKeyHolder; this.authEnabled = authEnabled; - this.registrationTimeoutMilli = registrationTimeoutMilli; + this.registrationTimeoutMillis = registrationTimeoutMillis; } protected void checkInit() { @@ -135,7 +135,7 @@ public void registerWithShuffleServer( checkInit(); try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) { ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer(); - client.sendRpcSync(registerMessage, registrationTimeoutMilli); + client.sendRpcSync(registerMessage, registrationTimeoutMillis); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index c3ccd6b99e6f2..10936bb48ef93 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -61,8 +61,8 @@ public MesosExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled, - long registrationTimeoutMilli) { - super(conf, secretKeyHolder, authEnabled, registrationTimeoutMilli); + long registrationTimeoutMillis) { + super(conf, secretKeyHolder, authEnabled, registrationTimeoutMillis); } public void registerDriverWithShuffleService( From 020f9e21f40f93a7a3b93f3e698ae4ecc0f94685 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Tue, 20 Jun 2017 18:01:55 +0800 Subject: [PATCH 11/14] Add some doc. --- .../org/apache/spark/internal/config/package.scala | 2 ++ docs/configuration.md | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index da01589d80c74..fb4d1fa600a11 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -305,11 +305,13 @@ package object config { private[spark] val SHUFFLE_REGISTRATION_TIMEOUT = ConfigBuilder("spark.shuffle.registration.timeout") + .doc("Timeout in milliseconds for registration to the external service.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(5000) private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS = ConfigBuilder("spark.shuffle.registration.maxAttempts") + .doc("When we fail to register to the external service, we will retry for maxAttempts times.") .intConf .createWithDefault(3) diff --git a/docs/configuration.md b/docs/configuration.md index f777811a93f62..998af14190176 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -638,6 +638,20 @@ Apart from these, the following properties are also available, and may be useful underestimating shuffle block size when fetch shuffle blocks. + + spark.shuffle.registration.timeout + 5000 + + Timeout in milliseconds for registration to the external service. + + + + spark.shuffle.registration.maxAttempts + 3 + + When we fail to register to the external service, we will retry for maxAttempts times. + + spark.io.encryption.enabled false From 97f825e4e29b2f892f3c104848f9f9086e8b608f Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Wed, 21 Jun 2017 12:23:39 +0800 Subject: [PATCH 12/14] Update doc. --- .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++-- docs/configuration.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index fb4d1fa600a11..9db837e0b4d9d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -305,13 +305,13 @@ package object config { private[spark] val SHUFFLE_REGISTRATION_TIMEOUT = ConfigBuilder("spark.shuffle.registration.timeout") - .doc("Timeout in milliseconds for registration to the external service.") + .doc("Timeout in milliseconds for registration to the external shuffle service.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(5000) private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS = ConfigBuilder("spark.shuffle.registration.maxAttempts") - .doc("When we fail to register to the external service, we will retry for maxAttempts times.") + .doc("When we fail to register to the external shuffle service, we will retry for maxAttempts times.") .intConf .createWithDefault(3) diff --git a/docs/configuration.md b/docs/configuration.md index 998af14190176..53eb13383c3ee 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -642,14 +642,14 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.registration.timeout 5000 - Timeout in milliseconds for registration to the external service. + Timeout in milliseconds for registration to the external shuffle service. spark.shuffle.registration.maxAttempts 3 - When we fail to register to the external service, we will retry for maxAttempts times. + When we fail to register to the external shuffle service, we will retry for maxAttempts times. From 59a9ebd41fbe3a657bfe8cc6348561f46c0aaa6d Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Wed, 21 Jun 2017 12:26:49 +0800 Subject: [PATCH 13/14] Update Millis to Ms. --- .../spark/network/shuffle/ExternalShuffleClient.java | 8 ++++---- .../network/shuffle/mesos/MesosExternalShuffleClient.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 327ac4063548b..6ac9302517ee0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -49,7 +49,7 @@ public class ExternalShuffleClient extends ShuffleClient { private final TransportConf conf; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; - private final long registrationTimeoutMillis; + private final long registrationTimeoutMs; protected TransportClientFactory clientFactory; protected String appId; @@ -62,11 +62,11 @@ public ExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled, - long registrationTimeoutMillis) { + long registrationTimeoutMs) { this.conf = conf; this.secretKeyHolder = secretKeyHolder; this.authEnabled = authEnabled; - this.registrationTimeoutMillis = registrationTimeoutMillis; + this.registrationTimeoutMs = registrationTimeoutMs; } protected void checkInit() { @@ -135,7 +135,7 @@ public void registerWithShuffleServer( checkInit(); try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) { ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer(); - client.sendRpcSync(registerMessage, registrationTimeoutMillis); + client.sendRpcSync(registerMessage, registrationTimeoutMs); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 10936bb48ef93..60179f126bc44 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -61,8 +61,8 @@ public MesosExternalShuffleClient( TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled, - long registrationTimeoutMillis) { - super(conf, secretKeyHolder, authEnabled, registrationTimeoutMillis); + long registrationTimeoutMs) { + super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs); } public void registerDriverWithShuffleService( From 4e0169fec340543fbd3c9d680ecc35ece7fca5d9 Mon Sep 17 00:00:00 2001 From: Li Yichao Date: Wed, 21 Jun 2017 18:35:53 +0800 Subject: [PATCH 14/14] Fix style. --- .../main/scala/org/apache/spark/internal/config/package.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9db837e0b4d9d..615497d36fd14 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -311,7 +311,8 @@ package object config { private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS = ConfigBuilder("spark.shuffle.registration.maxAttempts") - .doc("When we fail to register to the external shuffle service, we will retry for maxAttempts times.") + .doc("When we fail to register to the external shuffle service, we will " + + "retry for maxAttempts times.") .intConf .createWithDefault(3)