From 4eaee2c8b99c73bd1891b57c46b2e1f204b8b5f3 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Mon, 21 Dec 2015 14:52:05 -0800
Subject: [PATCH 1/3] [SPARK-12471] [Core] Spark daemons will log their PID on
 startup.

---
 .../spark/deploy/ExternalShuffleService.scala |  5 +++--
 .../apache/spark/deploy/master/Master.scala   |  1 +
 .../apache/spark/deploy/worker/Worker.scala   | 18 ++++++++----------
 .../scala/org/apache/spark/util/Utils.scala   |  8 ++++++++
 .../hive/thriftserver/HiveThriftServer2.scala |  1 +
 5 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 7fc96e4f764b..825679d940da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -21,14 +21,14 @@ import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslServerBootstrap
-import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
+import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -108,6 +108,7 @@ object ExternalShuffleService extends Logging {
   private[spark] def main(
       args: Array[String],
       newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = {
+    log.info(s"Started shuffle service with process name: ${Utils.getProcessName()}")
     val sparkConf = new SparkConf
     Utils.loadDefaultSparkProperties(sparkConf)
     val securityManager = new SecurityManager(sparkConf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fc42bf06e40a..8fe94f3a3977 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -1083,6 +1083,7 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
+    log.info(s"Started master with process name: ${Utils.getProcessName()}")
     SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f41efb097b4b..f17b16723e27 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,27 +17,24 @@
 
 package org.apache.spark.deploy.worker
 
-import java.io.File
-import java.io.IOException
+import java.io.{File, IOException}
 import java.text.SimpleDateFormat
-import java.util.{UUID, Date}
-import java.util.concurrent._
-import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
+import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture, _}
+import java.util.{Date, UUID}
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal
+import scala.util.{Failure, Random, Success}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState, ExternalShuffleService}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
+import org.apache.spark.util.{SignalLogger, ThreadUtils, Utils}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -686,6 +683,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
+    log.info(s"Started worker with process name: ${Utils.getProcessName()}")
     SignalLogger.register(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fce89dfccfe2..d8a44f5ff022 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2221,6 +2221,14 @@ private[spark] object Utils extends Logging {
   def tempFileWith(path: File): File = {
     new File(path.getAbsolutePath + "." + UUID.randomUUID())
   }
+
+  /**
+   * Returns the name of this JVM process. This is OS dependent, but on typical platforms
+   * (OS X, Linux, Windows) it is formatted as PID@hostname.
+   */
+  def getProcessName(): String = {
+    ManagementFactory.getRuntimeMXBean().getName()
+  }
 }
 
 /**
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index a4fd0c3ce970..33bc8a1a6e06 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -67,6 +67,7 @@ object HiveThriftServer2 extends Logging {
   }
 
   def main(args: Array[String]) {
+    log.info(s"Started HiveThriftServer with process name: ${Utils.getProcessName()}")
    val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
     if (!optionsProcessor.process(args)) {
       System.exit(-1)

From a850ce5bdaba37be7bed27a6709ee91f853a947f Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Mon, 21 Dec 2015 15:39:43 -0800
Subject: [PATCH 2/3] Add executors as well.
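
Both executor backends (coarse-grained and Mesos) now log their process
name at the top of main(), using Utils.getProcessName() from the previous
patch. For illustration only (the PID and hostname below are
hypothetical), the underlying JMX call returns a string like this:

    scala> java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
    res0: String = 12345@example-host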
--- .../spark/executor/CoarseGrainedExecutorBackend.scala | 10 +++++----- .../apache/spark/executor/MesosExecutorBackend.scala | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c2ebf3059621..ec3b95a6a1a0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -20,20 +20,18 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer -import org.apache.hadoop.conf.Configuration - import scala.collection.mutable import scala.util.{Failure, Success} -import org.apache.spark.rpc._ -import org.apache.spark._ import org.apache.spark.TaskState.TaskState +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher +import org.apache.spark.rpc._ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{SignalLogger, ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -203,6 +201,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } def main(args: Array[String]) { + log.info(s"Started executor with process name: ${Utils.getProcessName()}") + var driverUrl: String = null var executorId: String = null var hostname: String = null diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index c9f18ebc7f0e..1334fe962c3f 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -121,6 +121,7 @@ private[spark] class MesosExecutorBackend */ private[spark] object MesosExecutorBackend extends Logging { def main(args: Array[String]) { + log.info(s"Started executor with process name: ${Utils.getProcessName()}") SignalLogger.register(log) // Create a new Executor and start it running val runner = new MesosExecutorBackend() From a74ac7ee3fb62758f4a56daecde56d5c1c073061 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Tue, 22 Dec 2015 11:12:52 -0800 Subject: [PATCH 3/3] Combine this logging with registering signal handler into common util. 
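
Every daemon main() previously repeated the same two lines: a "Started
... with process name" log message followed by SignalLogger.register(log).
Both are folded into Utils.initDaemon(log), so each entry point reduces
to one call. A minimal sketch of the resulting pattern (argument handling
elided):

    def main(argStrings: Array[String]) {
      // Logs "Started daemon with process name: <pid>@<host>" and
      // registers the signal-logging handler.
      Utils.initDaemon(log)
      // ... parse args, start the RpcEnv, await termination ...
    }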
--- .../spark/deploy/ExternalShuffleService.scala | 4 ++-- .../spark/deploy/history/HistoryServer.scala | 8 +++----- .../apache/spark/deploy/master/Master.scala | 5 ++--- .../deploy/mesos/MesosClusterDispatcher.scala | 4 ++-- .../apache/spark/deploy/worker/Worker.scala | 20 ++++++++++--------- .../CoarseGrainedExecutorBackend.scala | 11 ++++------ .../spark/executor/MesosExecutorBackend.scala | 7 +++---- .../scala/org/apache/spark/util/Utils.scala | 10 ++++++++++ .../hive/thriftserver/HiveThriftServer2.scala | 2 +- 9 files changed, 38 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 825679d940da..c514a1a86bab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.sasl.SaslServerBootstrap @@ -28,7 +29,6 @@ import org.apache.spark.network.server.{TransportServer, TransportServerBootstra import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf import org.apache.spark.util.{ShutdownHookManager, Utils} -import org.apache.spark.{Logging, SecurityManager, SparkConf} /** * Provides a server from which Executors can read shuffle files (rather than reading directly from @@ -108,7 +108,7 @@ object ExternalShuffleService extends Logging { private[spark] def main( args: Array[String], newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = { - log.info(s"Started shuffle service with process name: ${Utils.getProcessName()}") + Utils.initDaemon(log) val sparkConf = new SparkConf Utils.loadDefaultSparkProperties(sparkConf) val securityManager = new SecurityManager(sparkConf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index f31fef0eccc3..0bc0cb1c15eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -23,14 +23,12 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import com.google.common.cache._ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} - import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, - UIRoot} +import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils} +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * A web server that renders SparkUIs of completed applications. 
@@ -223,7 +221,7 @@ object HistoryServer extends Logging { val UI_PATH_PREFIX = "/history" def main(argStrings: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) new HistoryServerArguments(conf, argStrings) initSecurity() val securityManager = new SecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8fe94f3a3977..93738851805d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -45,7 +45,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{ThreadUtils, Utils} private[deploy] class Master( override val rpcEnv: RpcEnv, @@ -1083,8 +1083,7 @@ private[deploy] object Master extends Logging { val ENDPOINT_NAME = "Master" def main(argStrings: Array[String]) { - log.info(s"Started master with process name: ${Utils.getProcessName()}") - SignalLogger.register(log) + Utils.initDaemon(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 389eff5e0645..89f1a8671fdb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.{ShutdownHookManager, SignalLogger} +import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.{Logging, SecurityManager, SparkConf} /* @@ -92,7 +92,7 @@ private[mesos] class MesosClusterDispatcher( private[mesos] object MesosClusterDispatcher extends Logging { def main(args: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) val conf = new SparkConf val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) conf.setMaster(dispatcherArgs.masterUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f17b16723e27..84e7b366bc96 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -17,24 +17,27 @@ package org.apache.spark.deploy.worker -import java.io.{File, IOException} +import java.io.File +import java.io.IOException import java.text.SimpleDateFormat -import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture, _} -import java.util.{Date, UUID} +import java.util.{UUID, Date} +import java.util.concurrent._ +import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture} import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap} import scala.concurrent.ExecutionContext -import scala.util.control.NonFatal import scala.util.{Failure, Random, Success} 
+import scala.util.control.NonFatal
 
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState, ExternalShuffleService}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{SignalLogger, ThreadUtils, Utils}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -683,8 +686,7 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
-    log.info(s"Started worker with process name: ${Utils.getProcessName()}")
-    SignalLogger.register(log)
+    Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
     val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ec3b95a6a1a0..9d7f86348253 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,16 +22,15 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
-
-import org.apache.spark.TaskState.TaskState
+import org.apache.spark.rpc._
 import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
-import org.apache.spark.rpc._
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, ThreadUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -144,7 +143,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       workerUrl: Option[String],
       userClassPath: Seq[URL]) {
 
-    SignalLogger.register(log)
+    Utils.initDaemon(log)
 
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
@@ -201,8 +200,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   }
 
   def main(args: Array[String]) {
-    log.info(s"Started executor with process name: ${Utils.getProcessName()}")
-
     var driverUrl: String = null
     var executorId: String = null
     var hostname: String = null
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 1334fe962c3f..d85465eb2568 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -25,11 +25,11 @@ import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -121,8 +121,7 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend extends Logging {
   def main(args: Array[String]) {
-    log.info(s"Started executor with process name: ${Utils.getProcessName()}")
-    SignalLogger.register(log)
+    Utils.initDaemon(log)
     // Create a new Executor and start it running
     val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d8a44f5ff022..1a07f7ca7eaf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,6 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
+import org.slf4j.Logger
 import tachyon.TachyonURI
 import tachyon.client.{TachyonFS, TachyonFile}
 
@@ -2229,6 +2230,15 @@ private[spark] object Utils extends Logging {
   def getProcessName(): String = {
     ManagementFactory.getRuntimeMXBean().getName()
   }
+
+  /**
+   * Utility function that should be called early in `main()` for daemons to set up some common
+   * diagnostic state.
+   */
+  def initDaemon(log: Logger): Unit = {
+    log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
+    SignalLogger.register(log)
+  }
 }
 
 /**
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 33bc8a1a6e06..3e3f0382f6a3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -67,7 +67,7 @@ object HiveThriftServer2 extends Logging {
   }
 
   def main(args: Array[String]) {
-    log.info(s"Started HiveThriftServer with process name: ${Utils.getProcessName()}")
+    Utils.initDaemon(log)
     val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
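
Note: with patch 3 applied, every daemon emits a single startup line
through its own logger. An illustrative example under Spark's default
log4j pattern (timestamp, PID, and hostname are hypothetical):

    15/12/22 11:13:01 INFO Master: Started daemon with process name: 12345@example-host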