From 91c5bfd3b91eec99ab39910de2a1e2fc49050d71 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 18 Apr 2016 14:04:21 -0700
Subject: [PATCH 1/3] Stop endpoints before closing the connections

---
 .../apache/spark/rpc/netty/NettyRpcEnv.scala  |  6 +--
 .../org/apache/spark/rpc/RpcEnvSuite.scala    | 45 +++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 7f2192e1f5a70..7d7b4c82fa392 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -287,15 +287,15 @@ private[netty] class NettyRpcEnv(
     if (timeoutScheduler != null) {
       timeoutScheduler.shutdownNow()
     }
+    if (dispatcher != null) {
+      dispatcher.stop()
+    }
     if (server != null) {
       server.close()
     }
     if (clientFactory != null) {
       clientFactory.close()
     }
-    if (dispatcher != null) {
-      dispatcher.stop()
-    }
     if (clientConnectionExecutor != null) {
       clientConnectionExecutor.shutdownNow()
     }
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index cebac2097f380..6cdc8785ced6c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -841,6 +841,51 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") {
+    env.setupEndpoint("SPARK-14699", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case m => context.reply(m)
+      }
+    })
+
+    @volatile var onStopCalled = false
+    @volatile var onDisconnectedCalled = false
+    @volatile var onNetworkErrorCalled = false
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
+    anotherEnv.setupEndpoint("SPARK-14699", new RpcEndpoint {
+      override val rpcEnv = anotherEnv
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case m =>
+      }
+
+      override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+        onDisconnectedCalled = true
+      }
+
+      override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+        onNetworkErrorCalled = true
+      }
+
+      override def onStop(): Unit = {
+        onStopCalled = true
+      }
+    })
+
+    val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
+    // Make sure the connection is set up
+    assert(ref.askWithRetry[String]("hello") === "hello")
+    anotherEnv.shutdown()
+    anotherEnv.awaitTermination()
+
+    env.stop(ref)
+
+    assert(onStopCalled === true)
+    assert(onDisconnectedCalled === false)
+    assert(onNetworkErrorCalled === false)
+  }
 }
 
 class UnserializableClass

From 7c78927f8da7064c4f16819473d6d02acbd9c320 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 20 Apr 2016 12:05:47 -0700
Subject: [PATCH 2/3] Don't close the client in Outbox

---
 core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 56499c639f292..6c090ada5ae9d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -241,10 +241,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
   }
 
   private def closeClient(): Unit = synchronized {
-    // Not sure if `client.close` is idempotent. Just for safety.
-    if (client != null) {
-      client.close()
-    }
+    // Just set client to null. Don't close it in order to reuse the connection.
     client = null
   }
 

From cee0e427623abaeada5f7fb09a9896535b5525bf Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 20 Apr 2016 12:54:34 -0700
Subject: [PATCH 3/3] Use Mockito to simplify the code

---
 .../org/apache/spark/rpc/RpcEnvSuite.scala | 33 ++++++---------
 1 file changed, 7 insertions(+), 26 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 6cdc8785ced6c..16f089dcf5bfc 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -29,7 +29,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._
 
@@ -850,29 +851,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     })
 
-    @volatile var onStopCalled = false
-    @volatile var onDisconnectedCalled = false
-    @volatile var onNetworkErrorCalled = false
     val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
-    anotherEnv.setupEndpoint("SPARK-14699", new RpcEndpoint {
-      override val rpcEnv = anotherEnv
-
-      override def receive: PartialFunction[Any, Unit] = {
-        case m =>
-      }
-
-      override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-        onDisconnectedCalled = true
-      }
-
-      override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
-        onNetworkErrorCalled = true
-      }
-
-      override def onStop(): Unit = {
-        onStopCalled = true
-      }
-    })
+    val endpoint = mock(classOf[RpcEndpoint])
+    anotherEnv.setupEndpoint("SPARK-14699", endpoint)
 
     val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
     // Make sure the connection is set up
@@ -882,9 +863,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     env.stop(ref)
 
-    assert(onStopCalled === true)
-    assert(onDisconnectedCalled === false)
-    assert(onNetworkErrorCalled === false)
+    verify(endpoint).onStop()
+    verify(endpoint, never()).onDisconnected(any())
+    verify(endpoint, never()).onNetworkError(any(), any())
   }
 }