From f30dcffe6770ae00fdf3f771857a0a052bbb9008 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Fri, 29 Sep 2017 18:21:49 -0700 Subject: [PATCH 1/2] [SPARK-22172][CORE] Worker hangs when the external shuffle service port is already in use --- .../scala/org/apache/spark/deploy/worker/Worker.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ed5fa4b839cd4..c9ad214e5ccf7 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -199,7 +199,7 @@ private[deploy] class Worker( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome) createWorkDir() - shuffleService.startIfEnabled() + startExternalShuffleService webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() @@ -367,6 +367,14 @@ private[deploy] class Worker( } } + private def startExternalShuffleService() { + try { + shuffleService.startIfEnabled + } catch { + case NonFatal(e) => logError("Failed to start external shuffle service", e) + } + } + private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { masterEndpoint.send(RegisterWorker( workerId, From 335e676ddb000978389ac0700a25f8e743d437e9 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Tue, 31 Oct 2017 14:23:18 -0700 Subject: [PATCH 2/2] Throwing the exception and exiting the process --- .../scala/org/apache/spark/deploy/worker/Worker.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index c9ad214e5ccf7..3962d422f81d3 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -199,7 +199,7 @@ private[deploy] class Worker( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome) createWorkDir() - startExternalShuffleService + startExternalShuffleService() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() @@ -369,9 +369,11 @@ private[deploy] class Worker( private def startExternalShuffleService() { try { - shuffleService.startIfEnabled + shuffleService.startIfEnabled() } catch { - case NonFatal(e) => logError("Failed to start external shuffle service", e) + case e: Exception => + logError("Failed to start external shuffle service", e) + System.exit(1) } }