From 200a5df5369a4f9e37d97466f057f25f3f155495 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 28 Jan 2016 16:59:14 -0800 Subject: [PATCH] TTT --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 + .../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 9 +++++++++ python/pyspark/streaming/tests.py | 4 ++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 12c7b2048a8c8..9461afdc54124 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -91,6 +91,7 @@ class SparkEnv ( metricsSystem.stop() outputCommitCoordinator.stop() rpcEnv.shutdown() + rpcEnv.awaitTermination() // Note that blockTransferService is stopped by BlockManager since it is started by it. diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 6ceff2c073998..7efa8a1501b78 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -179,6 +179,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { } def awaitTermination(): Unit = { + if (Thread.currentThread().getName.startsWith("dispatcher-event-loop")) { + logError("Wrong stop thread:!!!!!") + } + val id = Thread.currentThread().getId + for (t <- Thread.getAllStackTraces.keySet.asScala) { + if (!t.isDaemon && t.getId != id) { + logError("Non daemon thread!!!!: " + t.getName) + } + } threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS) } diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 24b812615cbb4..b33e8252a7d32 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1013,12 +1013,12 @@ def setUp(self): self._kafkaTestUtils.setup() def tearDown(self): + super(KafkaStreamTests, self).tearDown() + if self._kafkaTestUtils is not None: self._kafkaTestUtils.teardown() self._kafkaTestUtils = None - super(KafkaStreamTests, self).tearDown() - def _randomTopic(self): return "topic-%d" % random.randint(0, 10000)