From 2c07186a723385b5711a4392d572a5071e4ab584 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Fri, 22 Jan 2016 14:22:03 -0800 Subject: [PATCH 1/4] Avoid Netty RPC error messages when sparkContext stops --- .../main/scala/org/apache/spark/SparkContext.scala | 7 +++++++ .../org/apache/spark/rpc/netty/Dispatcher.scala | 13 ++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d7c605a583c4f..3c514d16789bf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2182,6 +2182,13 @@ object SparkContext extends Logging { getOrCreate(new SparkConf()) } + /** + * Return SparkContext + */ + def get(): SparkContext = { + activeContext.get() + } + /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is * running. Throws an exception if a running context is detected and logs a warning if another diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 19259e0e800c3..b9f9fa5f09725 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -28,6 +28,7 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.network.client.RpcResponseCallback import org.apache.spark.rpc._ import org.apache.spark.util.ThreadUtils +import org.apache.spark.SparkContext /** * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s). @@ -145,12 +146,18 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { callbackIfStopped: (Exception) => Unit): Unit = { val shouldCallOnStop = synchronized { val data = endpoints.get(endpointName) - if (stopped || data == null) { - true - } else { + val sc = SparkContext.get() + if (!stopped && data != null) { data.inbox.post(message) receivers.offer(data) false + } else if (sc != null && sc.isStopped) { + // in the middle of executing SparkContext.stop() + logWarning("Unable to post RPC message, SparkContext is shutting down") + false + } else { + // callback with error + true } } if (shouldCallOnStop) { From b61ac280e8ef3b52c2e26313e9f3abf549a54c49 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Fri, 22 Jan 2016 23:51:09 -0800 Subject: [PATCH 2/4] Update SparkContext.scala --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3c514d16789bf..edefb4f4b5d2d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2185,7 +2185,7 @@ object SparkContext extends Logging { /** * Return SparkContext */ - def get(): SparkContext = { + private[spark] def get(): SparkContext = { activeContext.get() } From 3376bfbb508599c9f1489fa9a39266596e07f084 Mon Sep 17 00:00:00 2001 From: Nishkam Ravi Date: Sun, 24 Jan 2016 21:45:55 -0800 Subject: [PATCH 3/4] Update (remove stack trace from postToAll callback, add RpcEnvStoppedException) --- .../scala/org/apache/spark/SparkContext.scala | 7 ------- .../spark/rpc/RpcEnvStoppedException.scala | 20 +++++++++++++++++++ .../apache/spark/rpc/netty/Dispatcher.scala | 17 +++++----------- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 6 +++++- .../org/apache/spark/rpc/netty/Outbox.scala | 7 +++++-- 5 files changed, 35 insertions(+), 22 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index edefb4f4b5d2d..d7c605a583c4f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2182,13 +2182,6 @@ object SparkContext extends Logging { getOrCreate(new SparkConf()) } - /** - * Return SparkContext - */ - private[spark] def get(): SparkContext = { - activeContext.get() - } - /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is * running. Throws an exception if a running context is detected and logs a warning if another diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala new file mode 100644 index 0000000000000..c296cc23f12b7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.rpc + +private[rpc] class RpcEnvStoppedException() + extends IllegalStateException("RpcEnv already stopped.") diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index b9f9fa5f09725..1f8990d666bd7 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -28,7 +28,6 @@ import org.apache.spark.{Logging, SparkException} import org.apache.spark.network.client.RpcResponseCallback import org.apache.spark.rpc._ import org.apache.spark.util.ThreadUtils -import org.apache.spark.SparkContext /** * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s). @@ -107,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e)) + postMessage(name, message, (e) => logWarning(s"Message $message dropped.")) } } @@ -146,24 +145,18 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { callbackIfStopped: (Exception) => Unit): Unit = { val shouldCallOnStop = synchronized { val data = endpoints.get(endpointName) - val sc = SparkContext.get() - if (!stopped && data != null) { + if (stopped || data == null) { + true + } else { data.inbox.post(message) receivers.offer(data) false - } else if (sc != null && sc.isStopped) { - // in the middle of executing SparkContext.stop() - logWarning("Unable to post RPC message, SparkContext is shutting down") - false - } else { - // callback with error - true } } if (shouldCallOnStop) { // We don't need to call `onStop` in the `synchronized` block val error = if (stopped) { - new IllegalStateException("RpcEnv already stopped.") + new RpcEnvStoppedException() } else { new SparkException(s"Could not find $endpointName or it has been stopped.") } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index ef876b1d8c15a..d29387ba1869c 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -182,7 +182,11 @@ private[netty] class NettyRpcEnv( val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. - dispatcher.postOneWayMessage(message) + try { + dispatcher.postOneWayMessage(message) + } catch { + case e: RpcEnvStoppedException => logWarning(e.getMessage) + } } else { // Message to a remote RPC endpoint. postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 2316ebe347bb7..9fd64e8535752 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import org.apache.spark.{Logging, SparkException} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} -import org.apache.spark.rpc.RpcAddress +import org.apache.spark.rpc.{RpcAddress, RpcEnvStoppedException} private[netty] sealed trait OutboxMessage { @@ -43,7 +43,10 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo } override def onFailure(e: Throwable): Unit = { - logWarning(s"Failed to send one-way RPC.", e) + e match { + case e1: RpcEnvStoppedException => logWarning(e1.getMessage) + case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1) + } } } From f91afed2063db789a73449e1aa94ea55e8d951f3 Mon Sep 17 00:00:00 2001 From: nishkamravi2 Date: Tue, 26 Jan 2016 19:12:01 -0800 Subject: [PATCH 4/4] Update Dispatcher.scala --- core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 1f8990d666bd7..6ceff2c073998 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - postMessage(name, message, (e) => logWarning(s"Message $message dropped.")) + postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}")) } }