From 003dd5a70a31d9579dce483814edf46c399e7c49 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 20 Sep 2017 14:55:32 -0700
Subject: [PATCH] [SPARK-18838][hotfix][yarn] Check internal context state
 before stopping it.

The live listener bus now cleans up after itself and releases its
listeners once it is stopped, so code can no longer get references to
listeners after the Spark context has been stopped. Accordingly, the
log-URL checks in YarnClusterDriver are moved inside the try block, so
they run before sc.stop() executes in the finally clause.
---
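Note: as an illustration of the ordering this change enforces -- listener
state must be read while the SparkContext is still alive -- here is a
minimal standalone sketch. It is not part of the patch: CollectExecutorInfo
and ListenerLifecycleSketch are made-up names (CollectExecutorInfo stands in
for this suite's SaveExecutorInfo), and the local-mode setup is assumed
purely for the example.

    import scala.collection.mutable

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

    // Hypothetical stand-in for the suite's SaveExecutorInfo listener.
    class CollectExecutorInfo extends SparkListener {
      val added = mutable.Buffer[SparkListenerExecutorAdded]()
      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
        added += event
      }
    }

    object ListenerLifecycleSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("listener-sketch").setMaster("local[2]"))
        val listener = new CollectExecutorInfo
        sc.addSparkListener(listener)
        try {
          sc.parallelize(1 to 4, 2).count() // generate some listener events
          // Listener state must be inspected here, while the context (and
          // therefore its listener bus) is still alive.
          println(s"executor-added events seen: ${listener.added.size}")
        } finally {
          // After stop() the listener bus is shut down and its listeners are
          // released, so lookups against the bus would find nothing.
          sc.stop()
        }
      }
    }

The same reasoning applies to the diff below: every assertion that touches
the SaveExecutorInfo listener now runs inside the try block, ahead of the
sc.stop() call in the finally clause.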
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 74 +++++++++----------
 1 file changed, 37 insertions(+), 37 deletions(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index fc78bc488b116..d5de19072ce29 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -390,47 +390,47 @@ private object YarnClusterDriver extends Logging with Matchers {
         .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
         .collect()
       assert(configFromExecutors.find(_ == null) === None)
-    } finally {
-      Files.write(result, status, StandardCharsets.UTF_8)
-      sc.stop()
-    }
 
-    // verify log urls are present
-    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
-    assert(listeners.size === 1)
-    val listener = listeners(0)
-    val executorInfos = listener.addedExecutorInfos.values
-    assert(executorInfos.nonEmpty)
-    executorInfos.foreach { info =>
-      assert(info.logUrlMap.nonEmpty)
-      info.logUrlMap.values.foreach { url =>
-        val log = Source.fromURL(url).mkString
-        assert(
-          !log.contains(SECRET_PASSWORD),
-          s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
-        )
+      // verify log urls are present
+      val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+      assert(listeners.size === 1)
+      val listener = listeners(0)
+      val executorInfos = listener.addedExecutorInfos.values
+      assert(executorInfos.nonEmpty)
+      executorInfos.foreach { info =>
+        assert(info.logUrlMap.nonEmpty)
+        info.logUrlMap.values.foreach { url =>
+          val log = Source.fromURL(url).mkString
+          assert(
+            !log.contains(SECRET_PASSWORD),
+            s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+          )
+        }
       }
-    }
 
-    // If we are running in yarn-cluster mode, verify that driver logs links and present and are
-    // in the expected format.
-    if (conf.get("spark.submit.deployMode") == "cluster") {
-      assert(listener.driverLogs.nonEmpty)
-      val driverLogs = listener.driverLogs.get
-      assert(driverLogs.size === 2)
-      assert(driverLogs.contains("stderr"))
-      assert(driverLogs.contains("stdout"))
-      val urlStr = driverLogs("stderr")
-      driverLogs.foreach { kv =>
-        val log = Source.fromURL(kv._2).mkString
-        assert(
-          !log.contains(SECRET_PASSWORD),
-          s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
-        )
+      // If we are running in yarn-cluster mode, verify that driver log links are present and
+      // are in the expected format.
+      if (conf.get("spark.submit.deployMode") == "cluster") {
+        assert(listener.driverLogs.nonEmpty)
+        val driverLogs = listener.driverLogs.get
+        assert(driverLogs.size === 2)
+        assert(driverLogs.contains("stderr"))
+        assert(driverLogs.contains("stdout"))
+        val urlStr = driverLogs("stderr")
+        driverLogs.foreach { kv =>
+          val log = Source.fromURL(kv._2).mkString
+          assert(
+            !log.contains(SECRET_PASSWORD),
+            s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+          )
+        }
+        val containerId = YarnSparkHadoopUtil.get.getContainerId
+        val user = Utils.getCurrentUserName()
+        assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
       }
-      val containerId = YarnSparkHadoopUtil.get.getContainerId
-      val user = Utils.getCurrentUserName()
-      assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
     }
   }