From 1c9082500114ee2e3c97d15c72a4d9ddbb726eff Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 29 Nov 2015 23:54:10 +0800 Subject: [PATCH 1/3] init commit --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 50ae7ffeec4c5..254f1707707fb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -621,9 +621,13 @@ private[spark] class ApplicationMaster( } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - // In cluster mode, do not rely on the disassociated event to exit + // In cluster mode, shut down reporterThread in order to avoid to re-allocating containers + // and do not rely on the disassociated event to exit // This avoids potentially reporting incorrect exit codes if the driver fails - if (!isClusterMode) { + if (isClusterMode) { + reporterThread.interrupt() + reporterThread = null + } else { logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } From 38109039618d29c1e2f4530631a02549d018cd06 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 30 Nov 2015 20:37:22 +0800 Subject: [PATCH 2/3] stop reporterThread when SparkContext has been stopped. 
--- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 254f1707707fb..d7ee1f9a632a7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -259,6 +259,8 @@ private[spark] class ApplicationMaster( private def sparkContextStopped(sc: SparkContext) = { sparkContextRef.compareAndSet(sc, null) + reporterThread.interrupt() + reporterThread = null } private def registerAM( @@ -621,13 +623,9 @@ private[spark] class ApplicationMaster( } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - // In cluster mode, shut down reporterThread in order to avoid to re-allocating containers - // and do not rely on the disassociated event to exit + // In cluster mode, do not rely on the disassociated event to exit // This avoids potentially reporting incorrect exit codes if the driver fails - if (isClusterMode) { - reporterThread.interrupt() - reporterThread = null - } else { + if (!isClusterMode) { logInfo(s"Driver terminated or disconnected! Shutting down. 
$remoteAddress") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } From 0fdede23e1575d0dc3fae8253a6fb23239423b31 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 30 Nov 2015 21:06:28 +0800 Subject: [PATCH 3/3] fix minor error --- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index d7ee1f9a632a7..7b6c5f6d64d0f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -258,9 +258,9 @@ private[spark] class ApplicationMaster( } private def sparkContextStopped(sc: SparkContext) = { - sparkContextRef.compareAndSet(sc, null) reporterThread.interrupt() reporterThread = null + sparkContextRef.compareAndSet(sc, null) } private def registerAM(