From 5b336bbfe92eabca7f4c20e5d49e51bb3721da4d Mon Sep 17 00:00:00 2001 From: chesterxgchen Date: Sun, 24 May 2015 20:23:29 -0700 Subject: [PATCH] Spark Yarn Client API Improvement/Requirements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The problem description The following description of the problem and requests are mainly focused on Spark Yarn Application, in particular, spark on Yarn-Cluster Mode. Use Case: Our application doesn't use spark-submit command line to run spark. We submit both hadoop and spark job directly from our servlet application (jetty). We are deploying in Yarn Cluster Mode. We invoke the Spark Client ( in yarn module) directly. Client can't call System.exists, which will shut down the jetty JVM. Our application will submit and stop spark job, monitoring the spark job progress, get the states from the spark jobs ( for example, bad data counters ), logging and exceptions. So far the communication is one way (direction) after the job is submitted; we will move to two-ways communication soon. ( for example, a long running spark context, with different short spark job or actions for interactive analysis and visualization) In our use cases, we have the following requirements Requirements: 1) Get Yarn Container Capacities before submit Yarn Applications. Before spark 1.3.1, the spark requires user to specify the memory and number of executors before start the spark job. In the case Yarn application, the memory is set beyond the yarn memory capacity, the spark job is simply get killed. So we try to get the yarn container capacity first before start the spark job. We use the information in two ways: A) we cap the request memory usage if the request is too large. For example, if the spark.executor.memory supplied by client is larger than the Yarn Container max memory, we reset the spark.executor.memory to yarn max container max memory minus over head and send a message to the Application ( UI message) tell the user that we reset the memory. Or we could simply throw exception without submit the job. users might be use the information about virtual cores to do other validation. B) We can dynamically estimate the executor memory based on data size ( if you have the information from prev processing steps) and max memory available; rather than directly use the fix memory size and potentially get kill if they are too large. This requirement could be eliminated by Spark 1.3.1 Dynamic Resource Allocation Feature. But we haven’t tried this out yet. 2) Add some callback via listener to monitoring Yarn application progress So far, the Spark Job in Yarn Cluster mode is essentially a batch job. No progress is reported back to the user. We like to have some callback listener to provide feedbacks during yarn application lifecycle (initialization, in progress, complete, killed etc.) , even though the information is very limited. A) We can get tracking URL from the yarn application listener call back. The URL allows client to go to the Hadoop Cluster Application management page directly if they need to check the job status As soon as the Yarn container is created and job is submitted, we have tracking URL from Yarn ( we need to watch out for invalid URL), at this point you can put the URL in the UI, even though the Spark job is not started yet. B) We display the progress bar on the UI with the callback For example, in CDH5, we only got 0%, 10% and 100% from Yarn, not very useful, but still some earlier feedback to customer. C) We get the Yarn Application ID when the spark job is submitted, which can be used for tracking progress or kill the app. 3) expose Yarn Kill Application API The Spark Yarn Client has a stop method. But doesn’t seem to work. We need a killApplication() method, which simply kill the Yarn Application with application Id. Yes, you can directly invoke from command line with yarn kill -applicationId appId. But since we need to call from our application, we need a API to do this. In our application, if client start the job and then decided to stop it ( running too long, change parameters etc.), we have to use kill API to kill it, as stop API doesn't stop it. 4) be able to show user spark job progress When spark job starts to run, the application like to show progress bar or stage status (similar to Spark UI) in user’s application. This means we need to re-direct the spark job status (reported by Spark Job Listener) to the end application. 5) be able to log spark job print/error/exception back to clients for easy debugging. When spark job runs, the print statements and logs are captured in Yarn Container log. But they are not communicated back to the client. Users usually read the log on the application cluster management page before the job stops. But once the job stops, the container is destroyed and logs are not longer available for end-user. Although the log history is still in the hadoop cluster, but it is hard for end-user to get it out. This requires the all the print or error logs are re-directed to the client’s application log that can be easily read by the end user. 6) Support long input arguments. The current spark job inputs are in the form of command line arguments. In our application, we run into the limitation of the command line argument length limit. When we run our application with large number of column names, we serialize the input into string and pass it as argument. But the length is too long for Command line argument. One solution is simply use HDFS, i.e. pass the argument as HDFS file, and pass the path in spark conf and load the file into memory as input before spark starts. But this doesn’t seem to be an best option, as now every spark job needs to write to HDFS first and then read back in in the cluster, then has to delete it when job is finished or killed. We need alternative channel to pass the argument. 7) create a communication channel that allows to run interactive command Pull Request Design Part 1 -- it was originally SPARK-3913 The part 1 design is address the Resource Capacity, and Yarn Application LIstener as well as kill application API. Since the Spark 1.3.1 Dynamic Resource Allocation address the resource allocation issue, so we don’t need to address #1) Resource Capacity requirement. The main changes are the followings: case class YarnAppInfo(appId: ApplicationId, user: String, queue: String, name: String, masterHost: String, masterRpcPort: Int, state: String, diagnostics: String, trackingUrl: String, startTime: Long) sealed trait YarnApplicationEvent case class YarnApplicationStart(time: Long) extends YarnApplicationEvent case class YarnApplicationProgress(time: Long, progress: YarnAppProgress) extends YarnApplicationEvent case class YarnApplicationEnd(time: Long) extends YarnApplicationEvent trait YarnApplicationListener { def onApplicationInit(time:Long, appId: ApplicationId) def onApplicationStart(time:Long, info: YarnAppInfo) def onApplicationProgress(time:Long, progress: YarnAppProgress) def onApplicationEnd(time:Long, progress: YarnAppProgress) def onApplicationFailed(time:Long, progress: YarnAppProgress) def onApplicationKilled(time:Long, progress: YarnAppProgress) } case class YarnAppResource(memory: Int, virtualCores: Int) case class YarnResourceUsage(numUsedContainers : Int, numReservedContainers : Int, usedResource : YarnAppResource, reservedResource : YarnAppResource, neededResource : YarnAppResource) { } case class YarnAppProgress(appId: ApplicationId, trackingUrl: String, usage: YarnResourceUsage, progress: Float = 0) In Client.scala private val listeners = ListBuffer[YarnApplicationListener]() def killApplication(appId: ApplicationId ) = { yarnClient.killApplication(appId) } def monitorApplication( appId: ApplicationId, returnOnRunning: Boolean = false, logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) val initialReport = getApplicationReport(appId) var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) val report: ApplicationReport = …. //code skipped val state = report.getYarnApplicationState state match { case YarnApplicationState.RUNNING => notifyAppProgress(report) case YarnApplicationState.FINISHED => notifyAppFinished(report) case YarnApplicationState.FAILED => notifyAppFailed(report) case YarnApplicationState.KILLED => notifyAppKilled(report) case _ => notifyAppProgress(report) } //code skipped ... } Part 2 -- Create A communication channel to address requirements #4-7 The main idea of the design is composed of followings: create a communication channel before the spark job started. The client can pass the connection URL (with host and port). The Spark Job will try to establish the connection (Ping-Pong) before launch the spark job. All logging (print, error) statements will be also send back and log on the application log (outside the cluster) Application can send different type of messages to the client application LogMessage -- Info, Error, Warn, Debug messages are redirected to corresponding application log. AppMessage -- message will redirect to the application. For example, UpdateMessage will indicate to the application program to update the corresponding job state. VisualMessage -- message will indicate the client application that the message will used for visual display. 4) All spark job will need to the implements a trait that allow the spark job to compose pre- and post- spark job actions. Here is the main code trait YarnSparkApp { // all spark jobs should implements SparkMain method def sparkMain(appCtx: ApplicationContext) //this is called by Spark Yarn Client.run(conf) def run(conf: SparkConf) { var logger = new ChannelMessageLogger("spark app", None) logTime { //capture the runtime var failed = false var appContext: Option[ApplicationContext] = None try { // initialize the application context. appContext = Some(new ApplicationContext(conf)) //update logger update logger = appContext.get.logger logger.logInfo(s"starting ${appContext.get.appName}") sparkMain(appContext.get) } catch { case e: Throwable => failed = true val t = wrapThrowable(appContext, e) printStackTrace(t) throw t } finally { if (!failed) { logger.sendUpdateMessage(SPARK_APP_COMPLETED, true) logger.logInfo(s"spark app finished.") } waitToFinish(failed) appContext.foreach(_.stop()) appContext.foreach(_.restoreConsoleOut()) } …. } The run() method first initialize the ApplicationContext which will established the communication channel, prepare the logger as well as get the extra arguments ( such as wide column arguments if any) and then run sparkMain(appCtx: ApplicationContext) once the job failed or completed, the finally clause will perform the cleanup. If the job is completed, the UpdateMessage is sent to the application side to indicate the spark app is completed. otherwise, the application is consider the job terminated abnormally. If the job failed or for the purpose of debugging (via configuration), the finally clause calls waitToFinish(failed) so user can check the log before the container is destroyed. Here is the ApplicationContext class ApplicationContext(val conf: SparkConf) { import ApplicationContext._ val appName = conf.get(SPARK_APP_NAME, "Spark Application") val deployMode = conf.get(SPARK_DEPLOY_MODE) val sparkCtx: SparkContext = new SparkContext(deployMode, appName, conf) val messenger = createAppChannelMessenger() val logger = new ChannelMessageLogger(appName, Some(this)) val stdOut = Console.out val stdErr = Console.err //add spark listener sparkCtx.addSparkListener(new JobProgressRelayListener(this)) Console.setOut(new RedirectPrintStream(logger, stdOut)) Console.setErr(new RedirectPrintStream(logger, stdErr)) //todo: get extra arguments from client, not implemented. // todo : call method here //show the spark conf specified showConf() def showConf(): Unit = { if (conf.get(SPARK_SHOW_CONF, "true").toBoolean) { println(conf.toDebugString) } } def restoreConsoleOut() = { Console.setOut(stdOut) Console.setOut(stdErr) } def addSparkListener(listener: SparkListener) { sparkCtx.addSparkListener(listener) } def stop() { println("[ApplicationContext] stopping ") CommunicationHelper.stopRelayMessenger(Some(sparkCtx), messenger) sparkCtx.stop() } private def createAppChannelMessenger(): ChannelMessenger = CommunicationHelper.createRelayMessenger(this) } The communication channel In our design, we leverage the Akka for the messaging channel. private[server] def createRelayMessenger(appCtx: ApplicationContext): ChannelMessenger = { val protocol = appCtx.conf.get(SPARK_APP_CHANNEL_PROTOCOL, AKKA) protocol match { case AKKA => AkkaChannelUtils.createRelayMessenger(appCtx) case NETTY => NettyMessenger(None) case _ => ActorMessenger(None) } } case class ChannelMessageLogger(appName: String = "", appCtx: Option[ApplicationContext]) { def logInfo(message: String) { sendMessage(InfoMessage(message, appName)) } def logError(message: String, e: Throwable) { sendMessage(ErrorMessage(message, appName, e)) } def logWarn(message: String) { sendMessage(WarnMessage(message, appName)) } def logDebug(message: String) { sendMessage(DebugMessage(message, appName)) } def sendVisualMessage(message: VisualMessage) { sendMessage(message) } def sendUpdateMessage(key: String, value: Any) { sendMessage(UpdateMessage(appName, key, value)) } private def sendMessage(message: Any) { if (appCtx.isDefined) { val sc = appCtx.map(_.sparkCtx) val messenger = appCtx.map(_.messenger) if (sc.isDefined) { messenger.map(m =>m.sendMessage(sc.get, message)) } else printlnMessage(message) } else printlnMessage(message) } def printlnMessage(message: Any): Unit = { message match { case err @ ErrorMessage(msg, name, cause, time) => Console.err.println(err.toString) cause.printStackTrace(Console.err) case _ => println(message.toString) } } } --- .../org/apache/spark/deploy/yarn/Client.scala | 144 ++++++++++++++++++ .../spark/deploy/yarn/ResourceUtil.scala | 32 ++++ .../spark/deploy/yarn/SparkJobCommand.scala | 21 +++ .../spark/deploy/yarn/YarnAppInfo.scala | 14 ++ .../spark/deploy/yarn/YarnAppProgress.scala | 15 ++ .../spark/deploy/yarn/YarnAppResource.scala | 3 + .../deploy/yarn/YarnApplicationListener.scala | 20 +++ .../spark/deploy/yarn/YarnResourceUsage.scala | 12 ++ .../yarn/server/ApplicationContext.scala | 89 +++++++++++ .../yarn/server/ChannelMessageLogger.scala | 114 ++++++++++++++ .../yarn/server/CommunicationHelper.scala | 67 ++++++++ .../server/JobProgressRelayListener.scala | 86 +++++++++++ .../yarn/server/MessageRelayActor.scala | 134 ++++++++++++++++ .../yarn/server/RedirectPrintStream.scala | 17 +++ .../deploy/yarn/server/YarnAppState.scala | 24 +++ .../deploy/yarn/server/YarnSparkApp.scala | 90 +++++++++++ 16 files changed, 882 insertions(+) create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceUtil.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkJobCommand.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppInfo.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppProgress.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppResource.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnApplicationListener.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnResourceUsage.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ApplicationContext.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ChannelMessageLogger.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/CommunicationHelper.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/JobProgressRelayListener.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/MessageRelayActor.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/RedirectPrintStream.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnAppState.scala create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnSparkApp.scala diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7e023f2d92578..0afebc0b933f1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer +import java.util.Date import java.security.PrivilegedExceptionAction import java.util.UUID import java.util.zip.{ZipEntry, ZipOutputStream} @@ -80,9 +81,135 @@ private[spark] class Client( private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) + private val listeners = ListBuffer[YarnApplicationListener]() def stop(): Unit = yarnClient.stop() + def killApplication(appId: ApplicationId ) = { + yarnClient.killApplication(appId) + } + + + /* ------------------------------------------------------------------------------------- * + | The following methods have much in common in the stable and alpha versions of Client, | + | but cannot be implemented in the parent trait due to subtle API differences across | + | hadoop versions. | + * ------------------------------------------------------------------------------------- */ + + def addApplicationListener(listener: YarnApplicationListener) { + listeners += listener + } + + private def notifyAppInit(appId: ApplicationId) { + for (l <- listeners) { + //async { + l.onApplicationInit(new java.util.Date().getTime, appId) + //} + } + } + + + def getApplicationInfo(report: ApplicationReport): YarnAppInfo = { + import scala.collection.JavaConverters._ + YarnAppInfo(report.getApplicationId, + report.getUser, + report.getQueue, + report.getName, + report.getHost, + report.getRpcPort, + report.getYarnApplicationState.name(), + report.getDiagnostics, + report.getTrackingUrl, + report.getStartTime) + + } + + + private def notifyAppStart(report: ApplicationReport) { + val appInfo: YarnAppInfo = getApplicationInfo(report) + for (l <- listeners) { + //async { + l.onApplicationStart(appInfo.startTime, appInfo) + //} + } + } + + + private def notifyAppFailed(report: ApplicationReport) { + val appProgress: YarnAppProgress = getAppProgress(report) + for (l <- listeners) { + //async { + l.onApplicationFailed(new Date().getTime, appProgress) + //} + } + } + + private def notifyAppKilled(report: ApplicationReport) { + val appProgress: YarnAppProgress = getAppProgress(report) + for (l <- listeners) { + //async { + l.onApplicationKilled(new Date().getTime, appProgress) + //} + } + } + + + + + private def getResourceUsage(report: ApplicationResourceUsageReport): YarnResourceUsage = { + + def getYarnAppResource(res: Resource) = YarnAppResource(res.getMemory, res.getVirtualCores) + + YarnResourceUsage(report.getNumUsedContainers, + report.getNumReservedContainers, + getYarnAppResource(report.getUsedResources), + getYarnAppResource(report.getReservedResources), + getYarnAppResource(report.getNeededResources)) + } + + + private def getAppProgress(report: ApplicationReport): YarnAppProgress = { + + val appUsageReport = report.getApplicationResourceUsageReport + YarnAppProgress(report.getApplicationId, + report.getTrackingUrl, + getResourceUsage(appUsageReport), + report.getProgress) + } + + + private def notifyAppProgress(report: ApplicationReport) { + val appProgress: YarnAppProgress = getAppProgress(report) + for (l <- listeners) { + //async { + l.onApplicationProgress(new Date().getTime, appProgress) + //} + } + } + + private def notifyAppFinished(report: ApplicationReport) { + val appProgress: YarnAppProgress = getAppProgress(report) + for (l <- listeners) { + //async { + l.onApplicationEnd(new Date().getTime, appProgress) + //} + } + } + + + + + private def createYarnApplication() : YarnClientApplication = { + yarnClient.init(yarnConf) + yarnClient.start() + + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) + + // Get a new application from our RM + yarnClient.createApplication() + } + /** * Submit an application running our ApplicationMaster to the ResourceManager. * @@ -107,6 +234,8 @@ private[spark] class Client( val newAppResponse = newApp.getNewApplicationResponse() appId = newAppResponse.getApplicationId() + notifyAppInit(appId) + // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) @@ -754,6 +883,7 @@ private[spark] class Client( returnOnRunning: Boolean = false, logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) + val initialReport = getApplicationReport(appId) var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) @@ -767,6 +897,20 @@ private[spark] class Client( } val state = report.getYarnApplicationState + + state match { + case YarnApplicationState.RUNNING => + notifyAppProgress(report) + case YarnApplicationState.FINISHED => + notifyAppFinished(report) + case YarnApplicationState.FAILED => + notifyAppFailed(report) + case YarnApplicationState.KILLED => + notifyAppKilled(report) + case _ => + notifyAppProgress(report) + } + if (logApplicationReport) { logInfo(s"Application report for $appId (state: $state)") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceUtil.scala new file mode 100644 index 0000000000000..e42d054fdce35 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceUtil.scala @@ -0,0 +1,32 @@ +package org.apache.spark.deploy.yarn + + +object ResourceUtil { + + def withResource[A: ClosableResource, B](resource: => A)(f: A => B) = { + val r = resource + try { + f(r) + } finally { + implicitly[ClosableResource[A]].close(r) + } + } +} + + +trait ClosableResource[R] { + def close(r: R): Unit +} + +object ClosableResource { + + implicit def genericResourceTrait[A <: { def close(): Unit }] = new ClosableResource[A] { + override def close(r: A) = r.close() + override def toString = "ClosableResource[{def close() : Unit }]" + } + + implicit def jioResourceTrait[A <: java.io.Closeable] = new ClosableResource[A] { + override def close(r: A) = r.close() + override def toString = "ClosableResource[java.io.Closeable]" + } +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkJobCommand.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkJobCommand.scala new file mode 100644 index 0000000000000..daa01d18e0671 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkJobCommand.scala @@ -0,0 +1,21 @@ +package org.apache.spark.deploy.yarn + + +case class JobArg(name: String, value: Any) + +sealed trait SparkJobCommand + +case class StartSparkJob(name: String, args: JobArg*) extends SparkJobCommand +case class StopSparkJob(name: String, args: JobArg*) extends SparkJobCommand + +sealed trait SparkJobStatus { + val name: String + val message: String +} +case class SubmittingSparkJob(name: String, message: String) extends SparkJobStatus +case class SparkJobStarted(name: String, message: String) extends SparkJobStatus +case class SparkJobFinished(name: String, message: String) extends SparkJobStatus +case class SparkJobFailed(name: String, message: String, cause: Throwable) extends SparkJobStatus +case class SparkJobProgress(name: String, message: String) extends SparkJobStatus + + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppInfo.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppInfo.scala new file mode 100644 index 0000000000000..578437915ae27 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppInfo.scala @@ -0,0 +1,14 @@ +package org.apache.spark.deploy.yarn + +import org.apache.hadoop.yarn.api.records.ApplicationId + +case class YarnAppInfo(appId: ApplicationId, + user: String, + queue: String, + name: String, + masterHost: String, + masterRpcPort: Int, + state: String, + diagnostics: String, + trackingUrl: String, + startTime: Long) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppProgress.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppProgress.scala new file mode 100644 index 0000000000000..4d50e7c8b80ac --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppProgress.scala @@ -0,0 +1,15 @@ +package org.apache.spark.deploy.yarn + +import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.spark.deploy.yarn.YarnResourceUsage + +/** + * + * @param appId -- application Id + * @param usage -- Yarn Resource Usage + * @param progress -- + * */ +case class YarnAppProgress(appId: ApplicationId, + trackingUrl: String, + usage: YarnResourceUsage, + progress: Float = 0) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppResource.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppResource.scala new file mode 100644 index 0000000000000..66c026615d2e4 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAppResource.scala @@ -0,0 +1,3 @@ +package org.apache.spark.deploy.yarn + +case class YarnAppResource(memory: Int, virtualCores: Int) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnApplicationListener.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnApplicationListener.scala new file mode 100644 index 0000000000000..bae95b9334325 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnApplicationListener.scala @@ -0,0 +1,20 @@ +package org.apache.spark.deploy.yarn + + +import org.apache.hadoop.yarn.api.records.ApplicationId + +sealed trait YarnApplicationEvent + +case class YarnApplicationStart(time: Long) extends YarnApplicationEvent +case class YarnApplicationProgress(time: Long, progress: YarnAppProgress) extends YarnApplicationEvent +case class YarnApplicationEnd(time: Long) extends YarnApplicationEvent + +trait YarnApplicationListener { + def onApplicationInit(time:Long, appId: ApplicationId) + def onApplicationStart(time:Long, info: YarnAppInfo) + def onApplicationProgress(time:Long, progress: YarnAppProgress) + def onApplicationEnd(time:Long, progress: YarnAppProgress) + def onApplicationFailed(time:Long, progress: YarnAppProgress) + def onApplicationKilled(time:Long, progress: YarnAppProgress) + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnResourceUsage.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnResourceUsage.scala new file mode 100644 index 0000000000000..166584c3a1ca4 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnResourceUsage.scala @@ -0,0 +1,12 @@ +package org.apache.spark.deploy.yarn + +import org.apache.spark.deploy.yarn.YarnAppResource + + +case class YarnResourceUsage(numUsedContainers : Int, + numReservedContainers : Int, + usedResource : YarnAppResource, + reservedResource : YarnAppResource, + neededResource : YarnAppResource) { + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ApplicationContext.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ApplicationContext.scala new file mode 100644 index 0000000000000..77a110eb71efe --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ApplicationContext.scala @@ -0,0 +1,89 @@ +package org.apache.spark.deploy.yarn.server + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.scheduler.SparkListener + +object ApplicationContext { + + val SPARK_APP_NAME = "spark.app.name" + val SPARK_NUMBER_EXECUTORS = "spark.executor.instances" + val SPARK_DRIVER_MEMORY = "spark.driver.memory" + val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" + val SPARK_EXECUTOR_CORES = "spark.executor.cores" + + val SPARK_EXECUTOR_EXTRA_JAVA_OPTS = "spark.executor.extraJavaOpts" + + val YARN_REPORT_INTERVAL = "yarn.report.interval" + val SPARK_YARN_REPORT_INTERVAL = s"spark.$YARN_REPORT_INTERVAL" + + val YARN_APP_HANDSHAKE_TIMEOUT = "app.yarn.handshake.timeout" + val SPARK_YARN_APP_HANDSHAKE_TIMEOUT = s"spark.$YARN_APP_HANDSHAKE_TIMEOUT" + val SPARK_APP_CHANNEL_PROTOCOL = "spark.yarn.app.channel.protocol" + + val SPARK_EVENT_LOG_ENABLED = "spark.eventLog.enabled" + val SPARK_DEPLOY_MODE = "spark.deployMode" + + val APP_JAR = "app.jar" + val SPARK_ADD_JARS = "spark.addJars" + val SPARK_MAIN_CLASS = "spark.mainClass" + val CONF_SPARK_JAR = "spark.yarn.jar" + + val SPARK_JAR = "SPARK_JAR" + val SPARK_YARN_MODE = "SPARK_YARN_MODE" + val SPARK_YARN_APP_ID = "spark.yarn.app.id" + val SPARK_APP_INFO = "spark.app.info" + val SPARK_APP_COMPLETED = "spark.app.completed" + val SPARK_SHOW_CONF = "spark.show.conf" + val SPARK_YARN_APP_DEBUG_ENABLED = "spark.app.yarn.debug.enabled" + val SPARK_YARN_APP_SLEEP_TIME_BEFORE_COMPLETE= "spark.app.yarn.sleep.time.before.job.completed" + +} + +case object Ping +case object Pong + + + +class ApplicationContext(val conf: SparkConf) { + import ApplicationContext._ + + val appName = conf.get(SPARK_APP_NAME, "Spark Application") + val deployMode = conf.get(SPARK_DEPLOY_MODE) + val sparkCtx: SparkContext = new SparkContext(deployMode, appName, conf) + val messenger = createAppChannelMessenger() + val logger = new ChannelMessageLogger(appName, Some(this)) + val stdOut = Console.out + val stdErr = Console.err + + //add spark listener + sparkCtx.addSparkListener(new JobProgressRelayListener(this)) + Console.setOut(new RedirectPrintStream(logger, stdOut)) + Console.setErr(new RedirectPrintStream(logger, stdErr)) + + showConf() + + def showConf(): Unit = { + if (conf.get(SPARK_SHOW_CONF, "true").toBoolean) { + println(conf.toDebugString) + } + } + + def restoreConsoleOut() = { + Console.setOut(stdOut) + Console.setOut(stdErr) + } + + def addSparkListener(listener: SparkListener) { + sparkCtx.addSparkListener(listener) + } + + def stop() { + println("[ApplicationContext] stopping ") + CommunicationHelper.stopRelayMessenger(Some(sparkCtx), messenger) + sparkCtx.stop() + } + + private def createAppChannelMessenger(): ChannelMessenger = + CommunicationHelper.createRelayMessenger(this) + +} \ No newline at end of file diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ChannelMessageLogger.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ChannelMessageLogger.scala new file mode 100644 index 0000000000000..c6dcce0fe8ece --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/ChannelMessageLogger.scala @@ -0,0 +1,114 @@ +package org.apache.spark.deploy.yarn.server + +import java.util.Date + +/** + * Messages are categorized as + * LogMessage -- write to the log on the application side + * UIMessage -- display to the UI + * AppMessage -- update app on the application side + * + * User: chester + * Date: 7/9/14 + * Time: 7:49 AM + */ + +//message are sent over the wire and logged on the agent side. +trait LogMessage { + val time: Long + val name: String + val message: String + override def toString: String = s"[$time-$name]:$message" +} + +case class InfoMessage(message: String, + name: String, + time: Long = new Date().getTime) extends LogMessage +case class WarnMessage(message: String, + name: String, + time: Long = new Date().getTime) extends LogMessage +case class DebugMessage(message: String, + name: String, + time: Long = new Date().getTime) extends LogMessage +case class ErrorMessage(message: String, + name: String, cause: Throwable, + time: Long = new Date().getTime) extends LogMessage + +trait VisualMessage { + val time: Long + val task: String + val message: String +} + +case class ProgressMessage(task: String, + message: String, + progress: Float, + time: Long = new Date().getTime) extends VisualMessage +case class StartTaskMessage(task: String, + message: String, + time: Long = new Date().getTime) extends VisualMessage +case class EndTaskMessage(task: String, + message: String, + time: Long = new Date().getTime) extends VisualMessage +case class DisplayMessage(task: String, + message: String, + time: Long = new Date().getTime) extends VisualMessage + +trait AppMessage { + val name: String + val key: String + val value: Any //message needs to be serializable +} + +case class UpdateMessage(name: String, key: String, value: Any) extends AppMessage + +case class ChannelMessageLogger(appName: String = "", appCtx: Option[ApplicationContext]) { + + def logInfo(message: String) { + sendMessage(InfoMessage(message, appName)) + } + + def logError(message: String, e: Throwable) { + sendMessage(ErrorMessage(message, appName, e)) + } + + def logWarn(message: String) { + sendMessage(WarnMessage(message, appName)) + } + + def logDebug(message: String) { + sendMessage(DebugMessage(message, appName)) + } + + def sendVisualMessage(message: VisualMessage) { + sendMessage(message) + } + + def sendUpdateMessage(key: String, value: Any) { + sendMessage(UpdateMessage(appName, key, value)) + } + + private def sendMessage(message: Any) { + + if (appCtx.isDefined) { + val sc = appCtx.map(_.sparkCtx) + val messenger = appCtx.map(_.messenger) + if (sc.isDefined) { + messenger.map(m =>m.sendMessage(sc.get, message)) + } else + printlnMessage(message) + } else + printlnMessage(message) + } + + def printlnMessage(message: Any): Unit = { + message match { + case err @ ErrorMessage(msg, name, cause, time) => + Console.err.println(err.toString) + cause.printStackTrace(Console.err) + case _ => + println(message.toString) + } + } + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/CommunicationHelper.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/CommunicationHelper.scala new file mode 100644 index 0000000000000..68e6803478afc --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/CommunicationHelper.scala @@ -0,0 +1,67 @@ +package org.apache.spark.deploy.yarn.server + +import akka.actor.ActorRef +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.server.ApplicationContext._ +import org.apache.spark.deploy.yarn.server.ChannelProtocols._ + +import scala.language.postfixOps + +/** + * This class will be run inside the Yarn Cluster + */ + + +object ChannelProtocols { + val AKKA = "akka" + val NETTY = "netty" +} + + +sealed trait ChannelMessenger { + val protocol: String + val messenger: Option[Any] + + def sendMessage(sc: SparkContext, message: Any) +} + +case class ActorMessenger(messenger:Option[ActorRef]) extends ChannelMessenger { + val protocol = ChannelProtocols.AKKA + + def sendMessage(sc: SparkContext, message: Any) = { + implicit val actorSystem = sc.env.actorSystem + messenger.foreach(_ ! message) + } + +} +case class NettyMessenger(messenger:Option[Any]) extends ChannelMessenger { + val protocol = ChannelProtocols.NETTY + def sendMessage(sc: SparkContext, message: Any) = ??? + +} + +object CommunicationHelper { + + private[server] def stopRelayMessenger(sparkCtx: Option[SparkContext], + channelMessenger: ChannelMessenger) { + sparkCtx.map { sc => + channelMessenger.protocol match { + case AKKA => + channelMessenger.messenger.asInstanceOf[Option[ActorRef]].map(sc.env.actorSystem.stop) + case NETTY => + case _ => + } + } + } + + private[server] def createRelayMessenger(appCtx: ApplicationContext): ChannelMessenger = { + val protocol = appCtx.conf.get(SPARK_APP_CHANNEL_PROTOCOL, AKKA) + protocol match { + case AKKA => AkkaChannelUtils.createRelayMessenger(appCtx) + case NETTY => NettyMessenger(None) + case _ => ActorMessenger(None) + } + + } + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/JobProgressRelayListener.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/JobProgressRelayListener.scala new file mode 100644 index 0000000000000..92a9c9a8e2ed9 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/JobProgressRelayListener.scala @@ -0,0 +1,86 @@ +package org.apache.spark.deploy.yarn.server + +import java.util.Date + +import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerUnpersistRDD, _} +import org.apache.spark.ui.jobs.JobProgressListener + +/** + * + * User: chester + * Date: 7/9/14 + * Time: 11:40 AM + */ +class JobProgressRelayListener(appCtx: ApplicationContext) + extends JobProgressListener(appCtx.sparkCtx.getConf) with SparkListener { + + private val logger = new ChannelMessageLogger(appCtx.appName, Some(appCtx)) + private val sc = appCtx.sparkCtx + private implicit val sparkCtx = Some(sc) + private implicit val messenger = appCtx.messenger + + private def makeProgress(started: Int, + completed: Int, + failed: Int, + total: Int): (Double, String) = { + val progress = (completed.toDouble / total) * 100 + val failedText = if (failed > 0) s"$failed failed" else "" + val progressMessage = s"$completed/$total $failedText" + + (progress, progressMessage) + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { + + super.onStageCompleted(stageCompleted) + + val info = stageCompleted.stageInfo + val stageDataOption = stageIdToData.get((info.stageId, info.attemptId)) + + val progressMessage: Option[ProgressMessage] = stageDataOption.map { stageData => + val (progress, message) = makeProgress(stageData.numActiveTasks, stageData.completedIndices.size, + stageData.numFailedTasks, info.numTasks) + val task = s"${info.name} - ${stageData.description.getOrElse(info.stageId.toString)}" + val time = info.completionTime.getOrElse(new Date().getTime) + ProgressMessage(task, message, progress.toFloat, time) + } + + Console.err.println("onStageCompleted task message " + progressMessage) + progressMessage.foreach(logger.sendVisualMessage) + + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { + super.onStageSubmitted(stageSubmitted) + + val info = stageSubmitted.stageInfo + val stageDataOption = stageIdToData.get((info.stageId, info.attemptId)) + val taskMessage: Option[StartTaskMessage] = stageDataOption.map { stageData => + val task = s"${info.name} - ${stageData.description.getOrElse(info.stageId.toString)}" + val time = info.submissionTime.getOrElse(new Date().getTime) + StartTaskMessage(task, "submitted", time) + } + + Console.err.println("onStageSubmitted task message " + taskMessage) + + taskMessage.foreach(logger.sendVisualMessage) + + } + + override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { + super.onUnpersistRDD(unpersistRDD) + Console.err.println(s"RDD [${unpersistRDD.rddId}] is un-persisted") + } + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { + super.onApplicationStart(applicationStart) + val taskMessage = StartTaskMessage(applicationStart.appName, "started", applicationStart.time) + logger.sendVisualMessage(taskMessage) + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + super.onApplicationEnd(applicationEnd) + val taskMessage = EndTaskMessage(appCtx.appName, "end", applicationEnd.time) + logger.sendVisualMessage(taskMessage) + } +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/MessageRelayActor.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/MessageRelayActor.scala new file mode 100644 index 0000000000000..8616ef17fdd81 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/MessageRelayActor.scala @@ -0,0 +1,134 @@ +package org.apache.spark.deploy.yarn.server + +import akka.actor.{Actor, ActorSelection, Props, ActorRef} +import akka.pattern.AskSupport +import akka.util.Timeout +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn._ +import org.apache.spark.deploy.yarn.server.ApplicationContext._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Try + + +/** + * + * MessageRelayActor is a Server side Actor that created by Spark Job + * that relay the message send from spark job to spark Application. + * + * In particular, this server side actor to send message to clientListener. + * + * + * @param clientListener -- Yarn Client Side Actor to receive messages + * @param appName -- spark app name + * @param sc -- sparkContext + */ +class MessageRelayActor(val clientListener: ActorRef, + appName: String, + sc: SparkContext) extends Actor with AskSupport{ + + + //relay the message to clientListener + def stopSparkJob(name: String, args: JobArg*) = { + if (name == appName) { + sc.stop() + } else { + val ignoreMsg: String = s"stopJob command for application $name is ignored, " + + s"as the current application is $appName" + println(ignoreMsg) + + clientListener ! ignoreMsg + } + } + + def receive: Actor.Receive = { + case Pong => + println("clientListener = " + sender) + println("clientListener response Pong") + + case x @ AppInit => clientListener ! x + case x @ AppStart => clientListener ! x + case x @ AppEnd => clientListener ! x + case x @ AppKilled => clientListener ! x + case x @ AppProgress => clientListener ! x + case x @ AppFailed => clientListener ! x + + + //receive message from clientListener + case StartSparkJob(name, args) => + println(" get job start job message") + case StopSparkJob(name) => + stopSparkJob(name) + + case StopSparkJob(name, args) => + stopSparkJob(name, args) + + //catch all messages + case x: Any => + Console.err.println(s"get message: $x") + clientListener ! x + case x @ _ => clientListener ! x + + } + +} + + +object AkkaChannelUtils { + + def resolveClientActor(clientListener: ActorSelection, handshakeTimeout: Timeout): Try[ActorRef] = { + println("perform handshake between client application and spark job ") + val t = Try { + implicit val timeout = handshakeTimeout + val f = clientListener.resolveOne() + Await.result(f, handshakeTimeout.duration) + } + + import scala.util.{ Failure, Success } + t match { + case Success(a) => Console.err.println("actor resolved") + case Failure(ex) => ex.printStackTrace(Console.err) + } + + t + } + + def createRelayMessenger(appCtx: ApplicationContext): ChannelMessenger = { + val timeout = appCtx.conf.get(SPARK_YARN_APP_HANDSHAKE_TIMEOUT, "5").toInt + val handShakeTimeout = new Timeout(timeout seconds) + + def resolved: Option[ActorRef] = { + findClientSideCommunicator(appCtx.sparkCtx).map { clientListener => + resolveClientActor(clientListener, handShakeTimeout).toOption + }.flatten + } + + val messenger :Option[ActorRef] = resolved.map { actor => + appCtx.sparkCtx.env.actorSystem.actorOf(Props(new MessageRelayActor(actor, + appCtx.appName, appCtx.sparkCtx)), "akka-replay-messenger") + } + + if (!messenger.isDefined) { + Console.err.println("===============================================") + Console.err.println(" Unable to setup Akka Relay Messenger ") + Console.err.println("===============================================") + } + + //hand-shake + messenger.foreach(_ ! Ping) + + ActorMessenger(messenger) + } + + def findClientSideCommunicator(sc: SparkContext): Option[ActorSelection] = { + + sc.getConf.getOption("app.spark.yarn.client.listener.uri").map { uri => + Console.err.println(s"**************************************************") + Console.err.println(s" client listener uri = $uri") + Console.err.println(s"**************************************************") + val actorSystem = sc.env.actorSystem + actorSystem.actorSelection(uri) + + } + } +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/RedirectPrintStream.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/RedirectPrintStream.scala new file mode 100644 index 0000000000000..b6dd28667e680 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/RedirectPrintStream.scala @@ -0,0 +1,17 @@ +package org.apache.spark.deploy.yarn.server + +import java.io.{OutputStream, PrintStream} + +class RedirectPrintStream(logger: ChannelMessageLogger, out: OutputStream) extends PrintStream(out) { + + override def println(x: String): Unit = { + super.println(x) + logger.logInfo(x) + } + + override def print(x: String): Unit = { + super.print(x) + logger.logInfo(x) + } + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnAppState.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnAppState.scala new file mode 100644 index 0000000000000..9d919ded19647 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnAppState.scala @@ -0,0 +1,24 @@ +package org.apache.spark.deploy.yarn.server + +import org.apache.hadoop.yarn.api.records.ApplicationId +// this requires SPARK-3913, comment this for now +//import org.apache.spark.deploy.yarn.{YarnAppProgress, YarnAppInfo} + + +sealed trait YarnAppState +case class AppInit(time: Long, appId: ApplicationId) extends YarnAppState +/* + this requires SPARK-3913, comment this for now + +case class AppStart(time: Long, appInfo: YarnAppInfo) extends YarnAppState +case class AppProgress(time: Long, progress: YarnAppProgress) extends YarnAppState +case class AppKilled(time: Long, progress: YarnAppProgress) extends YarnAppState +case class AppEnd(time: Long, progress: YarnAppProgress) extends YarnAppState +case class AppFailed(time: Long, progress: YarnAppProgress) extends YarnAppState +*/ + +case class AppStart(time: Long) extends YarnAppState +case class AppProgress(time: Long) extends YarnAppState +case class AppKilled(time: Long) extends YarnAppState +case class AppEnd(time: Long) extends YarnAppState +case class AppFailed(time: Long) extends YarnAppState diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnSparkApp.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnSparkApp.scala new file mode 100644 index 0000000000000..31ac4dbf6612d --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/server/YarnSparkApp.scala @@ -0,0 +1,90 @@ +package org.apache.spark.deploy.yarn.server + +import java.io.{PrintWriter, StringWriter} +import java.util.concurrent.TimeUnit + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn._ +import org.apache.spark.deploy.yarn.server.ApplicationContext._ + + + +trait YarnSparkApp { + + + def sparkMain(appCtx: ApplicationContext) + + def run(conf: SparkConf) { + + var logger = new ChannelMessageLogger("spark app", None) + + logTime { // log time + var failed = false + var appContext: Option[ApplicationContext] = None + + try { + appContext = Some(new ApplicationContext(conf)) + //update logger update + logger = appContext.get.logger + logger.logInfo(s"starting ${appContext.get.appName}") + sparkMain(appContext.get) + + } catch { + case e: Throwable => + failed = true + val t = wrapThrowable(appContext, e) + printStackTrace(t) + throw t + } finally { + if (!failed) { + logger.sendUpdateMessage(SPARK_APP_COMPLETED, true) + logger.logInfo(s"spark app finished.") + } + waitToFinish(failed) + appContext.foreach(_.stop()) + appContext.foreach(_.restoreConsoleOut()) + } + + def waitToFinish(failed: Boolean) { + val debugEnabled = conf.get(SPARK_YARN_APP_DEBUG_ENABLED, "false").toBoolean + if (debugEnabled || failed) { + val sleepTime = conf.get(SPARK_YARN_APP_SLEEP_TIME_BEFORE_COMPLETE, "0").toInt + val message: String = s" sleeping for $sleepTime sec before stop" + logger.logInfo(message) + appContext.foreach { aCtx => + logger.sendVisualMessage(DisplayMessage(s" Wait -- ", message)) + } + TimeUnit.SECONDS.sleep(sleepTime) + } + } + } + + + def logTime[T] ( f : => T) : T = { + val start = System.currentTimeMillis() + val t = f + val end = System.currentTimeMillis() + val msg = "total running time: " + (end - start) / 1000 + " sec" + logger.logInfo(msg) + t + } + } + + + def wrapThrowable(appContext: Option[ApplicationContext], e: Throwable): RuntimeException = { + val appName = appContext.map(ac => ac.appName).getOrElse("") + val failedMsg = s"Application $appName failed due to " + + s"${if (e.getMessage == null && e.getCause != null) e.getCause.getMessage else e.getMessage}" + new RuntimeException(failedMsg, e) + } + + def printStackTrace(t: Throwable) { + ResourceUtil.withResource(new StringWriter()) { s => + ResourceUtil.withResource(new PrintWriter(s)) { p => + t.printStackTrace(p) + Console.err.println(s.getBuffer.toString) + } + } + } + +}