[SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

## What changes were proposed in this pull request?

This adds a new configuration, "spark.yarn.un-managed-am" (defaults to false), which enables an unmanaged AM application in YARN client mode: the Application Master service is launched as part of the client process. It reuses the existing Application Master <-> Task Scheduler communication code for container requests, allocations, and launches, and eliminates the following (see the usage sketch after this list):
1. Allocating and launching the Application Master container
2. Remote Node/Process communication between Application Master <-> Task Scheduler
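
For illustration only (not part of this commit): a minimal sketch of how a client-mode application might enable the option, using the configuration key exactly as it is named in this description. The key spelling here is an assumption taken from the text above; the constant added by the patch (`YARN_UNMANAGED_AM`) may register it under a different name, and the app name and workload are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object UnmanagedAmExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unmanaged-am-example")        // placeholder app name
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")  // the unmanaged AM applies to client mode
      .set("spark.yarn.un-managed-am", "true")   // key as written in this description; assumed spelling

    // With the option enabled, the ApplicationMaster service runs inside this
    // client process instead of in a separately allocated YARN container.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.range(100).count()                     // placeholder workload
    spark.stop()
  }
}
```

The same setting could also be passed on the command line, e.g. `spark-submit --deploy-mode client --conf spark.yarn.un-managed-am=true ...`.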

## How was this patch tested?

I verified this manually by running applications in yarn-client mode with "spark.yarn.un-managed-am" enabled, and also ensured that there is no impact on the existing execution flows.

I would like to hear others' feedback/thoughts on this.

Closes #19616 from devaraj-kavali/SPARK-22404.

Authored-by: Devaraj K <devaraj@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Devaraj K authored and Marcelo Vanzin committed Jan 25, 2019
1 parent 773efed commit f06bc0c
Showing 7 changed files with 193 additions and 78 deletions.
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark._
@@ -54,36 +55,27 @@ import org.apache.spark.util._
/**
* Common application master functionality for Spark on Yarn.
*/
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private[spark] class ApplicationMaster(
args: ApplicationMasterArguments,
sparkConf: SparkConf,
yarnConf: YarnConfiguration) extends Logging {

// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.

private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
private val isClusterMode = args.userClass != null

private val sparkConf = new SparkConf()
if (args.propertiesFile != null) {
Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
private val appAttemptId =
if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
} else {
null
}
}

private val isClusterMode = args.userClass != null

private val securityMgr = new SecurityManager(sparkConf)

private var metricsSystem: Option[MetricsSystem] = None

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
@@ -153,24 +145,15 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// Next wait interval before allocator poll.
private var nextAllocationInterval = initialAllocationInterval

private var rpcEnv: RpcEnv = null

// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()

/**
* Load the list of localized files set by the client, used when launching executors. This should
* be called in a context where the needed credentials to access HDFS are available.
*/
private def prepareLocalResources(): Map[String, LocalResource] = {
private def prepareLocalResources(distCacheConf: SparkConf): Map[String, LocalResource] = {
logInfo("Preparing Local resources")
val distCacheConf = new SparkConf(false)
if (args.distCacheConf != null) {
Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
distCacheConf.set(k, v)
}
}

val resources = HashMap[String, LocalResource]()

def setupDistributedCache(
@@ -267,7 +250,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir()
cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
}
}
}
@@ -299,6 +282,60 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
exitCode
}

def runUnmanaged(
clientRpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
stagingDir: Path,
cachedResourcesConf: SparkConf): Unit = {
try {
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()

val driverRef = clientRpcEnv.setupEndpointRef(
RpcAddress(sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port").toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(Utils.localHostName, -1, sparkConf,
sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
reporterThread.join()
} catch {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
if (!unregistered) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(stagingDir)
}
} finally {
try {
metricsSystem.foreach { ms =>
ms.report()
ms.stop()
}
} catch {
case e: Exception =>
logWarning("Exception during stopping of the metric system: ", e)
}
}
}

def stopUnmanaged(stagingDir: Path): Unit = {
if (!finished) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
if (!unregistered) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(stagingDir)
}
}

/**
* Set the default final application status for client mode to UNDEFINED to handle
* if YARN HA restarts the application so that it properly retries. Set the final
@@ -376,17 +413,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
host: String,
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String]): Unit = {
val appId = appAttemptId.getApplicationId().toString()
val attemptId = appAttemptId.getAttemptId().toString()
uiAddress: Option[String],
appAttempt: ApplicationAttemptId): Unit = {
val appId = appAttempt.getApplicationId().toString()
val attemptId = appAttempt.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
// In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
// always contact the driver to get the current set of valid tokens, so that local resources can
// be initialized below.
@@ -400,7 +443,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources()
val localResources = prepareLocalResources(distCacheConf)

// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
@@ -438,7 +481,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

private def runDriver(): Unit = {
addAmIpFilter(None)
addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
userClassThread = startUserApplication()

// This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -449,17 +492,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
val rpcEnv = sc.env.rpcEnv

val userConf = sc.getConf
val host = userConf.get(DRIVER_HOST_ADDRESS)
val port = userConf.get(DRIVER_PORT)
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
createAllocator(driverRef, userConf)
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
@@ -483,20 +526,21 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
private def runExecutorLauncher(): Unit = {
val hostname = Utils.localHostName
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)

// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS), appAttemptId)

// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)
addAmIpFilter(Some(driverRef),
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
createAllocator(driverRef, sparkConf, rpcEnv, appAttemptId, distCacheConf)

// In client mode the actor will stop the reporter thread.
reporterThread.join()
@@ -592,15 +636,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
t
}

private def distCacheConf(): SparkConf = {
val distCacheConf = new SparkConf(false)
if (args.distCacheConf != null) {
Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
distCacheConf.set(k, v)
}
}
distCacheConf
}

/**
* Clean up the staging directory.
*/
private def cleanupStagingDir(): Unit = {
var stagingDirPath: Path = null
private def cleanupStagingDir(stagingDirPath: Path): Unit = {
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
logInfo("Deleting staging directory " + stagingDirPath)
val fs = stagingDirPath.getFileSystem(yarnConf)
fs.delete(stagingDirPath, true)
@@ -612,8 +664,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
private def addAmIpFilter(driver: Option[RpcEndpointRef], proxyBase: String) = {
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
driver match {
@@ -743,9 +794,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
// In cluster mode, do not rely on the disassociated event to exit
// In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
@@ -771,12 +822,28 @@ object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
val sparkConf = new SparkConf()
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}
// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

val ugi = master.sparkConf.get(PRINCIPAL) match {
val ugi = sparkConf.get(PRINCIPAL) match {
case Some(principal) =>
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
val newUGI = UserGroupInformation.getCurrentUser()
// Transfer the original user's tokens to the new user, since it may contain needed tokens
// (such as those user to connect to YARN).
