From d2555c245d17c61e725ff8c6915d2153a6697622 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Sun, 7 Jun 2020 15:20:34 +0800
Subject: [PATCH 1/8] fix

---
 .../scala/org/apache/spark/deploy/LocalSparkCluster.scala  | 4 ++++
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index fc849d7f4372f..2cc850f83bb23 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -74,6 +74,10 @@ class LocalSparkCluster(
 
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
+    // SPARK-31922: wait one more second before shutting down the rpcEnvs of the master and
+    // workers, in order to let the cluster have time to handle the `UnregisterApplication`
+    // message. Otherwise, we could hit an "RpcEnv already stopped" error.
+    Thread.sleep(1000)
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.shutdown())
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d2e65db970380..c19f00228db97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -337,7 +337,9 @@ private[deploy] class Master(
           }
           schedule()
         case None =>
-          logWarning(s"Got status update for unknown executor $appId/$execId")
+          if (completedApps.find(_.id == appId).map(_.executors.contains(execId)).isEmpty) {
+            logWarning(s"Got status update for unknown executor $appId/$execId")
+          }
       }
 
     case DriverStateChanged(driverId, state, exception) =>

From db435d2ebb10e2be820e60f91e043cfad76106fa Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 8 Jun 2020 11:51:57 +0800
Subject: [PATCH 2/8] fix

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e4f4000d3574d..c5f1aed2a97b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -560,7 +560,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  override def reviveOffers(): Unit = {
+  override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
     driverEndpoint.send(ReviveOffers)
   }
 
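PATCH 2 guards reviveOffers with Utils.tryLogNonFatalError, so a ReviveOffers message sent while the driver's RpcEnv is shutting down is logged instead of rethrown to the caller. Below is a minimal self-contained sketch of that guard pattern, assuming only the standard library; RpcGuard and its println logging are illustrative stand-ins, not Spark's actual Utils helper.

    import scala.util.control.NonFatal

    object RpcGuard {
      // Run a block, turning any non-fatal exception into a log line instead
      // of letting it propagate (fatal errors like OutOfMemoryError still do).
      def tryLogNonFatalError(block: => Unit): Unit = {
        try {
          block
        } catch {
          case NonFatal(t) =>
            println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        }
      }

      def main(args: Array[String]): Unit = {
        // A send that races with shutdown would throw; the guard downgrades it to a log line.
        tryLogNonFatalError {
          throw new IllegalStateException("RpcEnv already stopped.")
        }
      }
    }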
From 7a713ac994c95649950510783c155e904a160207 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Thu, 9 Jul 2020 22:28:02 +0800
Subject: [PATCH 3/8] switch the lines

---
 .../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 2cc850f83bb23..a130a8fdbda1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -80,8 +80,8 @@ class LocalSparkCluster(
     Thread.sleep(1000)
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
-    masterRpcEnvs.foreach(_.shutdown())
     workerRpcEnvs.foreach(_.awaitTermination())
+    masterRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()

From 09bbb03f53716d42432e7f9bf8db502db6301ced Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 10 Jul 2020 22:14:21 +0800
Subject: [PATCH 4/8] revert the change

---
 .../scala/org/apache/spark/deploy/LocalSparkCluster.scala  | 6 +-----
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +---
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index a130a8fdbda1e..fc849d7f4372f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -74,14 +74,10 @@ class LocalSparkCluster(
 
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
-    // SPARK-31922: wait one more second before shutting down the rpcEnvs of the master and
-    // workers, in order to let the cluster have time to handle the `UnregisterApplication`
-    // message. Otherwise, we could hit an "RpcEnv already stopped" error.
-    Thread.sleep(1000)
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
-    workerRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.foreach(_.shutdown())
+    workerRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c19f00228db97..d2e65db970380 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -337,9 +337,7 @@ private[deploy] class Master(
           }
           schedule()
         case None =>
-          if (completedApps.find(_.id == appId).map(_.executors.contains(execId)).isEmpty) {
-            logWarning(s"Got status update for unknown executor $appId/$execId")
-          }
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
 
     case DriverStateChanged(driverId, state, exception) =>

From 092c442b040321a8212dd20b7150976188490d16 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 10 Jul 2020 22:29:25 +0800
Subject: [PATCH 5/8] use debug

---
 .../main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 41d6d146a86d7..dbf4dcd70ab34 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -147,7 +147,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
   /** Posts a one-way message. */
   def postOneWayMessage(message: RequestMessage): Unit = {
     postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
-      (e) => throw e)
+      (e) => e match {
+        case re: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${re.getMessage}")
+        case _ => throw e
+      })
   }
 
   /**
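PATCH 5 changes the error callback that postOneWayMessage hands to postMessage: instead of rethrowing unconditionally, it pattern-matches and downgrades the expected RpcEnvStoppedException to a debug log while still rethrowing anything else. A self-contained sketch of that callback shape follows; the postMessage stand-in, the hard-coded "stopped" state, and the println logging are hypothetical, for illustration only.

    // Stand-in for org.apache.spark.rpc.RpcEnvStoppedException.
    class RpcEnvStoppedException extends IllegalStateException("RpcEnv already stopped.")

    object OneWaySketch {
      // Hypothetical stand-in for Dispatcher.postMessage: delivery failures are
      // reported through a caller-supplied Throwable => Unit callback.
      def postMessage(message: String, callbackIfStopped: Throwable => Unit): Unit = {
        val rpcEnvStopped = true // pretend the RpcEnv has already been shut down
        if (rpcEnvStopped) callbackIfStopped(new RpcEnvStoppedException)
      }

      def postOneWayMessage(message: String): Unit = {
        postMessage(message, {
          // The "stopped" race is expected during shutdown, so log it quietly...
          case re: RpcEnvStoppedException => println(s"DEBUG Message $message dropped. ${re.getMessage}")
          // ...but keep propagating anything unexpected.
          case e => throw e
        })
      }

      def main(args: Array[String]): Unit = {
        postOneWayMessage("UnregisterApplication(app-0)") // logs instead of throwing
      }
    }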
${re.getMessage}") + case _ => throw e + }) } /** From ef576c06eeaf584763e3c729bf058465f7631e69 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 22:35:14 +0800 Subject: [PATCH 6/8] exchange --- .../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index fc849d7f4372f..33851d9145d0a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -76,8 +76,8 @@ class LocalSparkCluster( logInfo("Shutting down local Spark cluster.") // Stop the workers before the master so they don't get upset that it disconnected workerRpcEnvs.foreach(_.shutdown()) - masterRpcEnvs.foreach(_.shutdown()) workerRpcEnvs.foreach(_.awaitTermination()) + masterRpcEnvs.foreach(_.shutdown()) masterRpcEnvs.foreach(_.awaitTermination()) masterRpcEnvs.clear() workerRpcEnvs.clear() From 4c8ceec09911ea3f62efc3d2add222636bffc2fd Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 10 Jul 2020 23:19:00 +0800 Subject: [PATCH 7/8] add comment --- .../main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index dbf4dcd70ab34..5f6cfc7e21210 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -148,6 +148,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte def postOneWayMessage(message: RequestMessage): Unit = { postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content), (e) => e match { + // SPARK-31922: in local cluster mode, there's always a RpcEnvStoppedException when + // stop is called due to some asynchronous message handling. We catch the exception + // and log it at debug level to avoid verbose error message when user stop a local + // cluster in spark shell. case re: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${re.getMessage}") case _ => throw e }) From 59a0595c8177ff801d0f7cbcc1fe6c458bcb844c Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 13 Jul 2020 10:05:36 +0800 Subject: [PATCH 8/8] no spacrs --- .../main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 4 ++-- .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 5f6cfc7e21210..4a9f551646fc7 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -122,7 +122,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte while (iter.hasNext) { val name = iter.next postMessage(name, message, (e) => { e match { - case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}") + case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}") case e: Throwable => logWarning(s"Message $message dropped. 
${e.getMessage}") }} )} @@ -152,7 +152,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte // stop is called due to some asynchronous message handling. We catch the exception // and log it at debug level to avoid verbose error message when user stop a local // cluster in spark shell. - case re: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${re.getMessage}") + case re: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${re.getMessage}") case _ => throw e }) } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 3886cc5baa48e..fcb9fe422c0d4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -213,7 +213,7 @@ private[netty] class NettyRpcEnv( def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { - case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") + case e : RpcEnvStoppedException => logDebug(s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } }