From 114ac05102c9d563c922447423ec8445bb37e9ef Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 12 Mar 2018 21:23:59 -0700
Subject: [PATCH 1/5] SPARK-23660: Fix exception in yarn cluster mode when the
 application ends fast

---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 +++--
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 16 ++++++++++++++--
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a537243d641cb..93d9baf47dfc9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -98,8 +98,9 @@ private[yarn] class YarnAllocator(
    *
    * @see SPARK-12864
    */
-  private var executorIdCounter: Int =
+  private lazy val initialExecutorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
+  private var executorIdCounter: Int = 0
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = (initialExecutorIdCounter + executorIdCounter).toString
       assert(container.getResource.getMemory >= resource.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index cb1e3c5268510..f2bca0bda06e4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.concurrent.RejectedExecutionException
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -33,6 +35,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ManualClock
 
 class MockResolver extends SparkRackResolver {
@@ -83,6 +86,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   def createAllocator(
       maxExecutors: Int = 5,
+      driverRef: RpcEndpointRef = mock(classOf[RpcEndpointRef]),
       rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
     val args = Array(
       "--jar", "somejar.jar",
@@ -94,7 +98,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       .set("spark.executor.memory", "2048")
     new YarnAllocator(
       "not used",
-      mock(classOf[RpcEndpointRef]),
+      driverRef,
       conf,
       sparkConfClone,
       rmClient,
@@ -284,7 +288,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     // Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
    // to the blacklist. This makes sure we are sending the right updates.
    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val handler = createAllocator(4, mockAmClient)
+    val driverRef = mock(classOf[RpcEndpointRef])
+    val handler = createAllocator(4, driverRef, mockAmClient)
     handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
     verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
@@ -350,4 +355,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-23660: allocator should be created even if the driver is not reachable") {
+    val driverRef = mock(classOf[RpcEndpointRef])
+    when(driverRef.askSync[Int](RetrieveLastAllocatedExecutorId))
+      .thenThrow(new RejectedExecutionException)
+    createAllocator(4, driverRef)
+  }
 }

From 442cfb424f1150beb8d70c2a073da6ab15f25226 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Tue, 13 Mar 2018 00:47:17 -0700
Subject: [PATCH 2/5] Pause user class thread until proper initialisation

---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 ++++++++-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 ++---
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 16 ++--------------
 3 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f88feb0f1fdf..51d976537d1ff 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
+    // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
+    // Pause the user class thread in order to make proper initialisation in runDriver function
+    wait()
   }
 
   private def registerAM(
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           // if the user app did not create a SparkContext.
          throw new IllegalStateException("User did not initialize spark context!")
         }
+        // After initialisation notify user class thread to continue
+        synchronized { notify() }
         userClassThread.join()
       } catch {
         case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
@@ -506,6 +511,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           finish(FinalApplicationStatus.FAILED,
             ApplicationMaster.EXIT_SC_NOT_INITED,
             "Timed out waiting for SparkContext.")
+      } finally {
+        synchronized { notify() }
       }
     }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 93d9baf47dfc9..a537243d641cb 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -98,9 +98,8 @@ private[yarn] class YarnAllocator(
    *
    * @see SPARK-12864
    */
-  private lazy val initialExecutorIdCounter: Int =
+  private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
-  private var executorIdCounter: Int = 0
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()
@@ -497,7 +496,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = (initialExecutorIdCounter + executorIdCounter).toString
+      val executorId = executorIdCounter.toString
       assert(container.getResource.getMemory >= resource.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f2bca0bda06e4..cb1e3c5268510 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.concurrent.RejectedExecutionException
-
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -35,7 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ManualClock
 
 class MockResolver extends SparkRackResolver {
@@ -86,7 +83,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   def createAllocator(
       maxExecutors: Int = 5,
-      driverRef: RpcEndpointRef = mock(classOf[RpcEndpointRef]),
       rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
     val args = Array(
       "--jar", "somejar.jar",
@@ -98,7 +94,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       .set("spark.executor.memory", "2048")
     new YarnAllocator(
       "not used",
-      driverRef,
+      mock(classOf[RpcEndpointRef]),
       conf,
       sparkConfClone,
       rmClient,
@@ -288,8 +284,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     // Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
    // to the blacklist. This makes sure we are sending the right updates.
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val driverRef = mock(classOf[RpcEndpointRef])
-    val handler = createAllocator(4, driverRef, mockAmClient)
+    val handler = createAllocator(4, mockAmClient)
     handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
     verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
@@ -355,11 +350,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
-
-  test("SPARK-23660: allocator should be created even if the driver is not reachable") {
-    val driverRef = mock(classOf[RpcEndpointRef])
-    when(driverRef.askSync[Int](RetrieveLastAllocatedExecutorId))
-      .thenThrow(new RejectedExecutionException)
-    createAllocator(4, driverRef)
-  }
 }

From 5b304d158f6a53483cef2b6c00b2546004f976c4 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 15 Mar 2018 19:11:57 -0700
Subject: [PATCH 3/5] Use sparkContextPromise as lock

---
 .../spark/deploy/yarn/ApplicationMaster.scala      | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 51d976537d1ff..25dbfd04c47b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -417,11 +417,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = synchronized {
+  private def sparkContextInitialized(sc: SparkContext) = {
     // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
-    // Pause the user class thread in order to make proper initialisation in runDriver function
-    wait()
+    // Pause the user class thread so that runDriver can perform proper initialization.
+    // Once that is done, the thread has to be resumed with the resumeDriver function.
+    sparkContextPromise.synchronized {
+      sparkContextPromise.wait()
+    }
+  }
+
+  private def resumeDriver(): Unit = {
+    sparkContextPromise.synchronized {
+      sparkContextPromise.notify()
+    }
   }
 
   private def registerAM(
@@ -501,7 +510,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           throw new IllegalStateException("User did not initialize spark context!")
         }
         // After initialisation notify user class thread to continue
-        synchronized { notify() }
+        resumeDriver()
         userClassThread.join()
       } catch {
         case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
@@ -512,7 +521,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
             ApplicationMaster.EXIT_SC_NOT_INITED,
             "Timed out waiting for SparkContext.")
       } finally {
-        synchronized { notify() }
+        resumeDriver()
       }
     }
 
From 284ed686398673337a5208510c61a6051d3e719f Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 16 Mar 2018 10:07:56 -0700
Subject: [PATCH 4/5] Race fix + comment move

---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 25dbfd04c47b8..e7ec90b798458 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -418,16 +418,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def sparkContextInitialized(sc: SparkContext) = {
-    // Notify runDriver function that SparkContext is available
-    sparkContextPromise.success(sc)
-    // Pause the user class thread so that runDriver can perform proper initialization.
-    // Once that is done, the thread has to be resumed with the resumeDriver function.
     sparkContextPromise.synchronized {
+      // Notify runDriver function that SparkContext is available
+      sparkContextPromise.success(sc)
+      // Pause the user class thread so that runDriver can perform proper initialization.
       sparkContextPromise.wait()
     }
   }
 
   private def resumeDriver(): Unit = {
+    // Once the initialization in runDriver is done, the user class thread has to be resumed.
     sparkContextPromise.synchronized {
       sparkContextPromise.notify()
     }

From 33ca59d912a8eb5d787c4b8eb93aac669ea6b9cb Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Fri, 16 Mar 2018 12:45:43 -0700
Subject: [PATCH 5/5] Remove unnecessary comment

---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e7ec90b798458..6e35d23def6f0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -509,7 +509,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           // if the user app did not create a SparkContext.
           throw new IllegalStateException("User did not initialize spark context!")
         }
-        // After initialisation notify user class thread to continue
         resumeDriver()
         userClassThread.join()
       } catch {
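
For reviewers, the synchronization the series converges on is a classic monitor
handshake: sparkContextInitialized() publishes the SparkContext and then wait()s
on the promise's monitor, runDriver() registers the AM and creates the allocator,
and resumeDriver() notify()s the same monitor from a finally block, so the user
class thread is always released. Because success() is called while holding the
monitor (the patch 4 race fix), the notify cannot be lost: the driver side can
only reach resumeDriver() after success(), and it cannot enter the monitor until
the user class thread is already wait()ing. Below is a minimal, runnable Scala
sketch of that handshake. It is illustrative only: HandshakeSketch,
contextInitialized and the thread names are hypothetical stand-ins, and a plain
Promise[String] stands in for sparkContextPromise.

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

object HandshakeSketch {
  // Stand-in for sparkContextPromise; the Promise object doubles as the lock.
  private val contextPromise = Promise[String]()

  // Runs on the "user class" thread: publish the context, then block on the
  // promise's monitor until the "driver" side finishes its initialization.
  // Completing the promise inside the synchronized block is what closes the
  // wait/notify race that patch 4 fixes.
  private def contextInitialized(ctx: String): Unit = {
    contextPromise.synchronized {
      contextPromise.success(ctx) // tell the driver side the context exists
      contextPromise.wait()       // pause until resumeDriver() is called
    }
  }

  // Runs on the "driver" thread once its initialization is done; called from
  // a finally block so the user class thread is never left hanging.
  private def resumeDriver(): Unit = {
    contextPromise.synchronized {
      contextPromise.notify()
    }
  }

  def main(args: Array[String]): Unit = {
    val userThread = new Thread(() => contextInitialized("SparkContext"), "user-class")
    userThread.start()
    try {
      // Mirrors runDriver(): wait for the context, then do driver-side setup.
      val ctx = Await.result(contextPromise.future, 10.seconds)
      println(s"driver side saw: $ctx, initializing allocator...")
    } finally {
      resumeDriver() // mirrors the finally { resumeDriver() } in runDriver
    }
    userThread.join()
    println("user class thread resumed and finished")
  }
}

Like the real patch, the sketch calls wait() without a guarding predicate loop,
so a spurious wakeup would resume the paused thread early; in this sketch that
only reorders startup and cannot deadlock.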