From 3872cc68329862242058264bd4de2e9f2618d511 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 23 Oct 2015 10:34:50 -0700 Subject: [PATCH 1/7] [SPARK-10827] Changed AppClient receiveAndReply to non-blocking using a thread pool --- .../spark/deploy/client/AppClient.scala | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea6925434ab..6854fc87cef04 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -77,6 +77,16 @@ private[spark] class AppClient( private val registrationRetryThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + private val receiveAndReplyMaxPoolSize = conf.getInt( + "spark.appclient.receiveAndReply.maxThreads", 3) + + private val receiveAndReplyThreadPool = new ThreadPoolExecutor( + 1, + receiveAndReplyMaxPoolSize, + 60L, TimeUnit.SECONDS, + new SynchronousQueue[Runnable](), + ThreadUtils.namedThreadFactory("appclient-receive-and-reply-threadpool")) + override def onStart(): Unit = { try { registerWithMaster(1) @@ -200,7 +210,18 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](r)) + case Some(m) => + receiveAndReplyThreadPool.execute(new Runnable { + override def run(): Unit = { + try { + context.reply(m.askWithRetry[Boolean](r)) + } catch { + case ie: InterruptedException => // Cancelled + case NonFatal(t) => + context.sendFailure(t) + } + } + }) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -208,7 +229,18 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => context.reply(m.askWithRetry[Boolean](k)) + case Some(m) => + receiveAndReplyThreadPool.execute(new Runnable { + override def run(): Unit = { + try { + context.reply(m.askWithRetry[Boolean](k)) + } catch { + case ie: InterruptedException => // Cancelled + case NonFatal(t) => + context.sendFailure(t) + } + } + }) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) @@ -252,6 +284,7 @@ private[spark] class AppClient( registrationRetryThread.shutdownNow() registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() + receiveAndReplyThreadPool.shutdownNow() } } From 80794e826b48e979e8079d5482589c394ca1442d Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 27 Oct 2015 13:56:11 -0700 Subject: [PATCH 2/7] [SPARK-10827] Added unit tests for AppClient --- .../spark/deploy/client/AppClientSuite.scala | 205 ++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala new file mode 100644 index 0000000000000..ea36e1de7eda4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.client + +import org.apache.spark._ +import org.apache.spark.deploy.{Command, ApplicationDescription} +import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +import org.apache.spark.deploy.master.{ApplicationInfo, Master} +import org.apache.spark.deploy.worker.Worker +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.util.Utils +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually._ + +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.concurrent.duration._ + +/** + * End-to-end tests for application client in standalone mode. + */ +class AppClientSuite + extends SparkFunSuite + with LocalSparkContext + with BeforeAndAfterAll { + + private val numWorkers = 2 + private val conf = new SparkConf() + private val securityManager = new SecurityManager(conf) + + private var masterRpcEnv: RpcEnv = null + private var workerRpcEnvs: Seq[RpcEnv] = null + private var master: Master = null + private var workers: Seq[Worker] = null + + /** + * Start the local cluster. + * Note: local-cluster mode is insufficient because we want a reference to the Master. 
+ */ + override def beforeAll(): Unit = { + super.beforeAll() + masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) + workerRpcEnvs = (0 until numWorkers).map { i => + RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) + } + master = makeMaster() + workers = makeWorkers(10, 2048) + // Wait until all workers register with master successfully + eventually(timeout(60.seconds), interval(10.millis)) { + assert(getMasterState.workers.size === numWorkers) + } + } + + override def afterAll(): Unit = { + workerRpcEnvs.foreach(_.shutdown()) + masterRpcEnv.shutdown() + workers.foreach(_.stop()) + master.stop() + workerRpcEnvs = null + masterRpcEnv = null + workers = null + master = null + super.afterAll() + } + + test("interface methods of AppClient using local Master") { + val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) + + ci.client.start() + + // Client should connect with one Master which registers the application + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") + assert(apps.size === 1, "master should have 1 registered app") + } + + // Send message to Master to request Executors, verify request by change in executor limit + assert( ci.client.requestTotalExecutors(1) ) + + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.head.getExecutorLimit === 1, s"executor total request failed") + } + + // Send request to kill executor, verify request was made + assert { + val apps = getApplications() + val executorId: String = apps.head.executors.head._2.fullId + ci.client.killExecutors(Seq(executorId)) + } + + // Issue stop command for Client to disconnect from Master + ci.client.stop() + + // Verify Client is marked dead and unregistered from Master + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(ci.listener.deadReasonList.size === 1, "client should have been marked dead") + assert(apps.isEmpty, "master should have 0 registered apps") + } + } + + test("request from AppClient before initialized with master") { + val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) + + // requests to master should fail immediately + assert(ci.client.requestTotalExecutors(3) === false) + } + + // =============================== + // | Utility methods for testing | + // =============================== + + /** Return a SparkConf for applications that want to talk to our Master. */ + private def appConf: SparkConf = { + new SparkConf() + .setMaster(masterRpcEnv.address.toSparkURL) + .setAppName("test") + .set("spark.executor.memory", "256m") + } + + /** Make a master to which our application will send executor requests. */ + private def makeMaster(): Master = { + val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf) + masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) + master + } + + /** Make a few workers that talk to our master. 
*/ + private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = { + (0 until numWorkers).map { i => + val rpcEnv = workerRpcEnvs(i) + val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), + Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager) + rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker) + worker + } + } + + /** Get the Master state */ + private def getMasterState: MasterStateResponse = { + master.self.askWithRetry[MasterStateResponse](RequestMasterState) + } + + /** Get the applictions that are active from Master */ + private def getApplications(): Seq[ApplicationInfo] = { + getMasterState.activeApps + } + + /** Application Listener to collect events */ + private class AppClientCollector extends AppClientListener with Logging { + val connectedIdList = new ArrayBuffer[String] with SynchronizedBuffer[String] + @volatile var disconnectedCount: Int = 0 + val deadReasonList = new ArrayBuffer[String] with SynchronizedBuffer[String] + val execAddedList = new ArrayBuffer[String] with SynchronizedBuffer[String] + val execRemovedList = new ArrayBuffer[String] with SynchronizedBuffer[String] + + def connected(id: String): Unit = { + connectedIdList += id + } + + def disconnected(): Unit = { + disconnectedCount += 1 + } + + def dead(reason: String): Unit = { + deadReasonList += reason + } + + def executorAdded(id: String, workerId: String, hostPort: String, + cores: Int, memory: Int): Unit = { + execAddedList += id + } + + def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = { + execRemovedList += id + } + } + + /** Create AppClient and supporting objects */ + private class AppClientInst(masterUrl: String) { + val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, securityManager) + private val cmd = new Command(TestExecutor.getClass.getCanonicalName.stripSuffix("$"), + List(), Map(), Seq(), Seq(), Seq()) + private val desc = new ApplicationDescription("AppClientSuite", Some(1), 512, cmd, "ignored") + val listener = new AppClientCollector + val client = new AppClient(rpcEnv, Array(masterUrl), desc, listener, new SparkConf) + } + +} From 5e155cc2f43d98f365524ec6bac81bb6206a780e Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 27 Oct 2015 17:37:25 -0700 Subject: [PATCH 3/7] [SPARK-10827] Used variable to clarify what result the test case is checking --- .../org/apache/spark/deploy/client/AppClientSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index ea36e1de7eda4..c1914f4518ef7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -90,11 +90,12 @@ class AppClientSuite } // Send message to Master to request Executors, verify request by change in executor limit - assert( ci.client.requestTotalExecutors(1) ) + val numExecutorsRequested = 1 + assert( ci.client.requestTotalExecutors(numExecutorsRequested) ) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.head.getExecutorLimit === 1, s"executor total request failed") + assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") } // Send request to kill executor, verify request was made From 67bfd2a5eb1b9f42d68d1416bd64fbbf358f9764 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 30 
Oct 2015 12:40:56 -0700 Subject: [PATCH 4/7] [SPARK-10827] Moved receive and reply execution to helper function to reduce code --- .../spark/deploy/client/AppClient.scala | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 6854fc87cef04..75b001bb071ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -210,18 +210,7 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => - receiveAndReplyThreadPool.execute(new Runnable { - override def run(): Unit = { - try { - context.reply(m.askWithRetry[Boolean](r)) - } catch { - case ie: InterruptedException => // Cancelled - case NonFatal(t) => - context.sendFailure(t) - } - } - }) + case Some(m) => receiveAndReplyAsync(m, context, r) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -229,24 +218,28 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => - receiveAndReplyThreadPool.execute(new Runnable { - override def run(): Unit = { - try { - context.reply(m.askWithRetry[Boolean](k)) - } catch { - case ie: InterruptedException => // Cancelled - case NonFatal(t) => - context.sendFailure(t) - } - } - }) + case Some(m) => receiveAndReplyAsync(m, context, k) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) } } + private def receiveAndReplyAsync[T](masterRef: RpcEndpointRef, context: RpcCallContext, + msg: T): Unit = { + receiveAndReplyThreadPool.execute(new Runnable { + override def run(): Unit = { + try { + context.reply(masterRef.askWithRetry[Boolean](msg)) + } catch { + case ie: InterruptedException => // Cancelled + case NonFatal(t) => + context.sendFailure(t) + } + } + }) + } + override def onDisconnected(address: RpcAddress): Unit = { if (master.exists(_.address == address)) { logWarning(s"Connection to $address failed; waiting for master to reconnect...") From 3ae1151a0aa50ec217c9471a3b46ab2259a912f2 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Nov 2015 14:25:41 -0800 Subject: [PATCH 5/7] Fixed indentation, imports, and spacing --- .../spark/deploy/client/AppClient.scala | 6 ++-- .../spark/deploy/client/AppClientSuite.scala | 29 ++++++++++--------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 75b001bb071ee..1ae3bd1fae854 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -225,8 +225,10 @@ private[spark] class AppClient( } } - private def receiveAndReplyAsync[T](masterRef: RpcEndpointRef, context: RpcCallContext, - msg: T): Unit = { + private def receiveAndReplyAsync[T]( + masterRef: RpcEndpointRef, + context: RpcCallContext, + msg: T): Unit = { receiveAndReplyThreadPool.execute(new Runnable { override def run(): Unit = { try { diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index c1914f4518ef7..9da2854c7eb4c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala 
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -17,27 +17,24 @@ package org.apache.spark.deploy.client +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.concurrent.duration._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually._ + import org.apache.spark._ -import org.apache.spark.deploy.{Command, ApplicationDescription} +import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, Master} import org.apache.spark.deploy.worker.Worker import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.Utils -import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually._ - -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import scala.concurrent.duration._ /** * End-to-end tests for application client in standalone mode. */ -class AppClientSuite - extends SparkFunSuite - with LocalSparkContext - with BeforeAndAfterAll { - +class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll { private val numWorkers = 2 private val conf = new SparkConf() private val securityManager = new SecurityManager(conf) @@ -91,7 +88,7 @@ class AppClientSuite // Send message to Master to request Executors, verify request by change in executor limit val numExecutorsRequested = 1 - assert( ci.client.requestTotalExecutors(numExecutorsRequested) ) + assert(ci.client.requestTotalExecutors(numExecutorsRequested)) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() @@ -183,8 +180,12 @@ class AppClientSuite deadReasonList += reason } - def executorAdded(id: String, workerId: String, hostPort: String, - cores: Int, memory: Int): Unit = { + def executorAdded( + id: String, + workerId: String, + hostPort: String, + cores: Int, + memory: Int): Unit = { execAddedList += id } From cd5329cc43777596a0f75bb91d655ebf6de6f587 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 4 Nov 2015 11:19:55 -0800 Subject: [PATCH 6/7] Fixed threading issues, cleanup, added comments --- .../spark/deploy/client/AppClient.scala | 33 +++++++++---------- .../spark/deploy/client/AppClientSuite.scala | 4 ++- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 1ae3bd1fae854..91e7dc6ab0ed7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -49,8 +49,8 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var endpoint: RpcEndpointRef = null - private var appId: String = null + @volatile private var endpoint: RpcEndpointRef = null + @volatile private var appId: String = null @volatile private var registered = false private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint @@ -77,15 +77,10 @@ private[spark] class AppClient( private val registrationRetryThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") - private val receiveAndReplyMaxPoolSize = conf.getInt( - "spark.appclient.receiveAndReply.maxThreads", 3) - - private val receiveAndReplyThreadPool = new ThreadPoolExecutor( - 1, - receiveAndReplyMaxPoolSize, - 
60L, TimeUnit.SECONDS, - new SynchronousQueue[Runnable](), - ThreadUtils.namedThreadFactory("appclient-receive-and-reply-threadpool")) + // A thread pool to perform receive then reply actions in a thread so as not to block the + // event loop. + private val askAndReplyThreadPool = + ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool") override def onStart(): Unit = { try { @@ -210,7 +205,7 @@ private[spark] class AppClient( case r: RequestExecutors => master match { - case Some(m) => receiveAndReplyAsync(m, context, r) + case Some(m) => askAndReplyAsync(m, context, r) case None => logWarning("Attempted to request executors before registering with Master.") context.reply(false) @@ -218,21 +213,23 @@ private[spark] class AppClient( case k: KillExecutors => master match { - case Some(m) => receiveAndReplyAsync(m, context, k) + case Some(m) => askAndReplyAsync(m, context, k) case None => logWarning("Attempted to kill executors before registering with Master.") context.reply(false) } } - private def receiveAndReplyAsync[T]( - masterRef: RpcEndpointRef, + private def askAndReplyAsync[T]( + endpointRef: RpcEndpointRef, context: RpcCallContext, msg: T): Unit = { - receiveAndReplyThreadPool.execute(new Runnable { + // Create a thread to ask a message and reply with the result. Allow thread to be + // interrupted during shutdown, otherwise context must notified of NonFatal errors. + askAndReplyThreadPool.execute(new Runnable { override def run(): Unit = { try { - context.reply(masterRef.askWithRetry[Boolean](msg)) + context.reply(endpointRef.askWithRetry[Boolean](msg)) } catch { case ie: InterruptedException => // Cancelled case NonFatal(t) => @@ -279,7 +276,7 @@ private[spark] class AppClient( registrationRetryThread.shutdownNow() registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() - receiveAndReplyThreadPool.shutdownNow() + askAndReplyThreadPool.shutdownNow() } } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 9da2854c7eb4c..1e5c05a73f8aa 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -173,7 +173,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd } def disconnected(): Unit = { - disconnectedCount += 1 + synchronized { + disconnectedCount += 1 + } } def dead(reason: String): Unit = { From 4f46cfd093b7de6f811b0fb9891a02ebf5d14293 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 4 Nov 2015 13:54:08 -0800 Subject: [PATCH 7/7] Corrected grammar typo --- .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 91e7dc6ab0ed7..3f29da663b798 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -225,7 +225,7 @@ private[spark] class AppClient( context: RpcCallContext, msg: T): Unit = { // Create a thread to ask a message and reply with the result. Allow thread to be - // interrupted during shutdown, otherwise context must notified of NonFatal errors. + // interrupted during shutdown, otherwise context must be notified of NonFatal errors. 
askAndReplyThreadPool.execute(new Runnable { override def run(): Unit = { try {
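
The change common to all seven patches is that AppClient's receiveAndReply handler no longer calls the blocking askWithRetry on the RPC event loop; instead it hands the ask off to a daemon thread pool (a bounded ThreadPoolExecutor over a SynchronousQueue in patch 1, simplified to ThreadUtils.newDaemonCachedThreadPool in patch 6) and replies, or reports failure, from that worker thread. Below is a minimal, self-contained sketch of that pattern outside Spark; ReplyContext, MasterRef, and askBoolean are illustrative stand-ins for RpcCallContext, RpcEndpointRef, and askWithRetry, not real Spark APIs.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import scala.util.control.NonFatal

object AskAndReplyAsyncSketch {

  // Stand-ins for Spark's RpcCallContext and RpcEndpointRef; these names are
  // illustrative only and are not part of the Spark API.
  trait ReplyContext {
    def reply(response: Any): Unit
    def sendFailure(e: Throwable): Unit
  }

  trait MasterRef {
    // Blocking ask, playing the role of RpcEndpointRef.askWithRetry[Boolean].
    def askBoolean(msg: Any): Boolean
  }

  // A daemon cached thread pool, similar in spirit to the
  // ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
  // used in the final version of the patch.
  private val pool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "appclient-ask-and-reply")
      t.setDaemon(true)
      t
    }
  })

  // Offload the blocking ask so the caller (the single RPC event-loop thread)
  // returns immediately; the reply or failure is sent from the worker thread.
  def askAndReplyAsync(master: MasterRef, context: ReplyContext, msg: Any): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          context.reply(master.askBoolean(msg))
        } catch {
          case _: InterruptedException => // pool shut down; the reply is simply dropped
          case NonFatal(t) => context.sendFailure(t)
        }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    val master = new MasterRef {
      def askBoolean(msg: Any): Boolean = { Thread.sleep(100); true } // simulated slow Master
    }
    val context = new ReplyContext {
      def reply(response: Any): Unit = println(s"reply: $response")
      def sendFailure(e: Throwable): Unit = println(s"failure: $e")
    }
    askAndReplyAsync(master, context, "RequestExecutors")
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}

One design point visible across the series: patch 1's pool is capped by a custom spark.appclient.receiveAndReply.maxThreads setting and backed by a SynchronousQueue, so a submission can be rejected once all worker threads are busy; patch 6 switches to an unbounded daemon cached pool, which drops the custom configuration key and cannot reject tasks, at the cost of an uncapped thread count.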