From 5bee290e357d8f855bcf22393fd076a8301f1001 Mon Sep 17 00:00:00 2001 From: echo2mei <534384876@qq.com> Date: Thu, 17 Dec 2015 03:28:31 -0500 Subject: [PATCH 1/4] Once driver register successfully, stop it to connect master again. --- .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 25ea6925434ab..f42147050acab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -125,6 +125,7 @@ private[spark] class AppClient( if (registered) { registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() + registrationRetryTimer.cancel(true) } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { From 7959c1f75cd34e46ceda011ec11ce56e8e166fd1 Mon Sep 17 00:00:00 2001 From: echo2mei <534384876@qq.com> Date: Sun, 20 Dec 2015 20:57:25 -0500 Subject: [PATCH 2/4] [SPARK-12396][Core] Cancel the driver retry thread once it register successfull. --- .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index f42147050acab..992b3da03db80 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -125,7 +125,7 @@ private[spark] class AppClient( if (registered) { registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() - registrationRetryTimer.cancel(true) + registrationRetryTimer.cancel(true) } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { From 3a38c14c6f87e730c66701c7e7d58c05ea107a89 Mon Sep 17 00:00:00 2001 From: echo2mei <534384876@qq.com> Date: Mon, 21 Dec 2015 20:34:35 -0500 Subject: [PATCH 3/4] [SPARK-12396][Core] Cancel the driver retry thread once it register successfull. Delete the white space at the end of the line. --- .../main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 992b3da03db80..f42147050acab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -125,7 +125,7 @@ private[spark] class AppClient( if (registered) { registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() - registrationRetryTimer.cancel(true) + registrationRetryTimer.cancel(true) } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { From 93c005de65a0d889681883794ff7ed8dc280e8ca Mon Sep 17 00:00:00 2001 From: echo2mei <534384876@qq.com> Date: Wed, 23 Dec 2015 02:04:39 -0500 Subject: [PATCH 4/4] [SPARK-12396][Core] Modify the function scheduleAtFixedRate to schedule. It is no need to register to master iteratively. --- .../scala/org/apache/spark/deploy/client/AppClient.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index f42147050acab..6945d3f728875 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -119,13 +119,12 @@ private[spark] class AppClient( */ private def registerWithMaster(nthRetry: Int) { registerMasterFutures = tryRegisterAllMasters() - registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable { + registrationRetryTimer = registrationRetryThread.schedule(new Runnable { override def run(): Unit = { Utils.tryOrExit { if (registered) { registerMasterFutures.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() - registrationRetryTimer.cancel(true) } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { @@ -134,7 +133,7 @@ private[spark] class AppClient( } } } - }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS) + }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS) } /**