diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 28b06f4487840..2967b7754525c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -45,6 +45,8 @@ object LogKey extends Enumeration {
   val CONFIG = Value
   val CONFIG2 = Value
   val CONFIG3 = Value
+  val CONFIG4 = Value
+  val CONFIG5 = Value
   val CONTAINER = Value
   val CONTAINER_ID = Value
   val COUNT = Value
@@ -63,7 +65,9 @@ object LogKey extends Enumeration {
   val ERROR = Value
   val EVENT_LOOP = Value
   val EVENT_QUEUE = Value
+  val EXECUTOR_ENV_REGEX = Value
   val EXECUTOR_ID = Value
+  val EXECUTOR_IDS = Value
   val EXECUTOR_STATE = Value
   val EXIT_CODE = Value
   val EXPRESSION_TERMS = Value
@@ -81,6 +85,7 @@ object LogKey extends Enumeration {
   val HOST = Value
   val INDEX = Value
   val INFERENCE_MODE = Value
+  val INTERVAL = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
   val JOIN_CONDITION_SUB_EXPRESSION = Value
@@ -117,6 +122,9 @@ object LogKey extends Enumeration {
   val PATH = Value
   val PATHS = Value
   val POD_ID = Value
+  val POD_NAME = Value
+  val POD_NAMESPACE = Value
+  val POD_PHASE = Value
   val POLICY = Value
   val PORT = Value
   val PRODUCER_ID = Value
@@ -132,7 +140,7 @@ object LogKey extends Enumeration {
   val REDUCE_ID = Value
   val RELATION_NAME = Value
   val REMAINING_PARTITIONS = Value
-  val REMOTE_ADDRESS = Value
+  val RESOURCE_NAME = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
   val RPC_ADDRESS = Value
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 4ebf31ae44eef..9fdd9518d2d81 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, EXECUTOR_ENV_REGEX}
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.util.Utils
@@ -214,10 +215,10 @@ private[spark] class KubernetesExecutorConf(
     if (executorEnvRegex.pattern.matcher(key).matches()) {
       true
     } else {
-      logWarning(s"Invalid key: $key: " +
-        "a valid environment variable name must consist of alphabetic characters, " +
-        "digits, '_', '-', or '.', and must not start with a digit." +
-        s"Regex used for validation is '$executorEnvRegex')")
+      logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
+        log"a valid environment variable name must consist of alphabetic characters, " +
+        log"digits, '_', '-', or '.', and must not start with a digit. " +
+        log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
       false
     }
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index cbd12282278fc..50ecefdb6a5dc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -32,7 +32,8 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CONTAINER
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -120,8 +121,8 @@ object KubernetesUtils extends Logging {
           case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
           case _ =>
             logWarning(
-              s"specified container ${name} not found on pod template, " +
-              s"falling back to taking the first container")
+              log"specified container ${MDC(CONTAINER, name)} not found on pod template, " +
+              log"falling back to taking the first container")
             Option.empty
         }
     val containers = pod.getSpec.getContainers.asScala.toList
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index c351211dd97d5..de15bf9b24d90 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -24,7 +24,8 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, CONFIG3, CONFIG4, CONFIG5}
 import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 import org.apache.spark.launcher.SparkLauncher
 
@@ -78,10 +79,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   private def configureForPython(pod: SparkPod, res: String): SparkPod = {
     if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
       logWarning(
-        s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
-        s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
-        s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
-        "variables instead.")
+        log"${MDC(CONFIG, PYSPARK_MAJOR_PYTHON_VERSION.key)} was deprecated in Spark 3.1. " +
+        log"Please set '${MDC(CONFIG2, PYSPARK_PYTHON.key)}' and " +
+        log"'${MDC(CONFIG3, PYSPARK_DRIVER_PYTHON.key)}' " +
+        log"configurations or ${MDC(CONFIG4, ENV_PYSPARK_PYTHON)} and " +
+        log"${MDC(CONFIG5, ENV_PYSPARK_DRIVER_PYTHON)} environment variables instead.")
     }
 
     val pythonEnvs = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
index dee7211c386a8..beb7ff6bfe22c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
@@ -31,7 +31,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
 import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH, KUBERNETES_NAMESPACE}
 import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, PATH, PATHS}
 import org.apache.spark.util.ArrayImplicits._
 
 private[spark] object KubernetesClientUtils extends Logging {
@@ -133,8 +134,8 @@
         }
       } catch {
         case e: MalformedInputException =>
-          logWarning(
-            s"Unable to read a non UTF-8 encoded file ${file.getAbsolutePath}. Skipping...", e)
+          logWarning(log"Unable to read a non UTF-8 encoded file " +
+            log"${MDC(PATH, file.getAbsolutePath)}. Skipping...", e)
       } finally {
         source.close()
       }
@@ -144,8 +145,9 @@
         s" ${truncatedMap.keys.mkString(",")}")
     }
     if (skippedFiles.nonEmpty) {
-      logWarning(s"Skipped conf file(s) ${skippedFiles.mkString(",")}, due to size constraint." +
-        s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for more details.")
+      logWarning(log"Skipped conf file(s) ${MDC(PATHS, skippedFiles.mkString(","))}, due to " +
+        log"size constraint. Please see, config: " +
+        log"`${MDC(CONFIG, Config.CONFIG_MAP_MAXSIZE.key)}` for more details.")
     }
     truncatedMap.toMap
   } else {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index f4d80c24d01ff..a48e1fba99546 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.{Logging, LogKey, MDC}
+import org.apache.spark.internal.LogKey.{COUNT, EXECUTOR_IDS, TIMEOUT}
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
@@ -210,10 +211,10 @@ class ExecutorPodsAllocator(
     }
 
     if (timedOut.nonEmpty) {
-      logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
-        s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
-        " allocation attempt tried to create them. The executors may have been deleted but the" +
-        " application missed the deletion event.")
+      logWarning(log"Executors with ids ${MDC(EXECUTOR_IDS, timedOut.mkString(","))} were not " +
+        log"detected in the Kubernetes cluster after ${MDC(TIMEOUT, podCreationTimeout)} ms " +
+        log"despite the fact that a previous allocation attempt tried to create them. " +
+        log"The executors may have been deleted but the application missed the deletion event.")
 
       newlyCreatedExecutors --= timedOut
       if (shouldDeleteExecutors) {
@@ -282,7 +283,7 @@ class ExecutorPodsAllocator(
 
    val newFailedExecutorIds = currentFailedExecutorIds.diff(failedExecutorIds)
    if (newFailedExecutorIds.nonEmpty) {
-     logWarning(s"${newFailedExecutorIds.size} new failed executors.")
+     logWarning(log"${MDC(COUNT, newFailedExecutorIds.size)} new failed executors.")
      newFailedExecutorIds.foreach { _ => failureTracker.registerExecutorFailure() }
    }
    failedExecutorIds = failedExecutorIds ++ currentFailedExecutorIds
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index 1d987cc7569c1..99cef671d2e41 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -24,7 +24,8 @@ import io.fabric8.kubernetes.api.model.ContainerStateTerminated
 import io.fabric8.kubernetes.api.model.Pod
 
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{POD_NAME, POD_NAMESPACE, POD_PHASE}
 
 /**
  * An immutable view of the current executor pods that are running in the cluster.
@@ -114,8 +115,9 @@ object ExecutorPodsSnapshot extends Logging {
      case "terminating" =>
        PodTerminating(pod)
      case _ =>
-       logWarning(s"Received unknown phase $phase for executor pod with name" +
-         s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
+       logWarning(log"Received unknown phase ${MDC(POD_PHASE, phase)} for executor " +
+         log"pod with name ${MDC(POD_NAME, pod.getMetadata.getName)} in " +
+         log"namespace ${MDC(POD_NAMESPACE, pod.getMetadata.getNamespace)}")
        PodUnknown(pod)
    }
  }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index a2c4b9be0c56f..1c0de8e2afded 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -26,9 +26,10 @@
 import org.apache.spark.SparkContext
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
 import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, INTERVAL}
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.status.api.v1
 import org.apache.spark.util.ThreadUtils
@@ -60,9 +61,9 @@
   override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
     val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
     if (interval <= 0) {
-      logWarning(s"Disabled due to invalid interval value, '$interval'")
+      logWarning(log"Disabled due to invalid interval value, '${MDC(INTERVAL, interval)}'")
     } else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
-      logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
+      logWarning(log"Disabled because ${MDC(CONFIG, DECOMMISSION_ENABLED.key)} is false.")
     } else {
       minTasks = sc.conf.get(MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING)
       // Scheduler is not created yet
@@ -90,7 +91,7 @@
             }
-          case _ => logWarning("This plugin expects " +
-            s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
+          case _ => logWarning(log"This plugin expects " +
+            log"${MDC(CLASS_NAME, classOf[KubernetesClusterSchedulerBackend].getSimpleName)}.")
         }
       } catch {
         case e: Throwable => logError("Error in rolling thread", e)
       }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7864760777fcc..bf7f91ea7ce31 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{EXIT_CODE, REMOTE_ADDRESS}
+import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, RPC_ADDRESS}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
@@ -597,7 +597,8 @@ private[spark] class ApplicationMaster(
             ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
               s"$failureCount time(s) from Reporter thread.")
         } else {
-          logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
+          logWarning(
+            log"Reporter thread fails ${MDC(FAILURES, failureCount)} time(s) in a row.", e)
         }
       }
       try {
@@ -857,7 +858,7 @@ private[spark] class ApplicationMaster(
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       } else {
         logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " +
-          log"Shutting down. ${MDC(REMOTE_ADDRESS, remoteAddress)}")
+          log"Shutting down. ${MDC(RPC_ADDRESS, remoteAddress)}")
         finish(FinalApplicationStatus.FAILED, exitCode)
       }
     } else {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index dfd52a0fdb24e..bed7c859003a0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -55,8 +55,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.APP_ID
+import org.apache.spark.internal.{Logging, LogKey, MDC}
+import org.apache.spark.internal.LogKey.{APP_ID, CONFIG, CONFIG2, PATH}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -258,7 +258,7 @@ private[spark] class Client(
      }
    } catch {
      case ioe: IOException =>
-       logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+       logWarning(log"Failed to cleanup staging dir ${MDC(PATH, stagingDirPath)}", ioe)
    }
  }
 
@@ -332,8 +332,8 @@ private[spark] class Client(
        appContext.setLogAggregationContext(logAggregationContext)
      } catch {
        case NonFatal(e) =>
-         logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
-           "does not support it", e)
+         logWarning(log"Ignoring ${MDC(CONFIG, ROLLED_LOG_INCLUDE_PATTERN.key)} " +
+           log"because the version of YARN does not support it", e)
      }
    }
    appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)
@@ -558,10 +558,12 @@ private[spark] class Client(
    val uriStr = uri.toString()
    val fileName = new File(uri.getPath).getName
    if (distributedUris.contains(uriStr)) {
-     logWarning(s"Same path resource $uri added multiple times to distributed cache.")
+     logWarning(log"Same path resource ${MDC(LogKey.URI, uri)} added multiple times " +
+       log"to distributed cache.")
      false
    } else if (distributedNames.contains(fileName)) {
-     logWarning(s"Same name resource $uri added multiple times to distributed cache")
+     logWarning(log"Same name resource ${MDC(LogKey.URI, uri)} added multiple times " +
+       log"to distributed cache")
      false
    } else {
      distributedUris += uriStr
@@ -698,8 +700,9 @@ private[spark] class Client(
 
      case None =>
        // No configuration, so fall back to uploading local jar files.
-       logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
-         "to uploading libraries under SPARK_HOME.")
+       logWarning(
+         log"Neither ${MDC(CONFIG, SPARK_JARS.key)} nor ${MDC(CONFIG2, SPARK_ARCHIVE.key)} " +
+         log"is set, falling back to uploading libraries under SPARK_HOME.")
        val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
          sparkConf.getenv("SPARK_HOME")))
        val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
@@ -878,7 +881,7 @@ private[spark] class Client(
    if (dir.isDirectory()) {
      val files = dir.listFiles()
      if (files == null) {
-       logWarning("Failed to list files under directory " + dir)
+       logWarning(log"Failed to list files under directory ${MDC(PATH, dir)}")
      } else {
        files.foreach { file =>
          if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
@@ -1067,7 +1070,7 @@ private[spark] class Client(
            sparkConf))
      }
      if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
-       logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
+       logWarning(log"${MDC(CONFIG, AM_JAVA_OPTIONS.key)} will not take effect in cluster mode")
      }
    } else {
      // Validate and include yarn am specific java options in yarn-client mode.
@@ -1344,7 +1347,8 @@ private[spark] class Client(
        .getOrElse(IMap.empty)
    } catch {
      case e: Exception =>
-       logWarning(s"Unable to get driver log links for $appId: $e")
+       logWarning(log"Unable to get driver log links for ${MDC(APP_ID, appId)}: " +
+         log"${MDC(LogKey.ERROR, e)}")
        // Include the full stack trace only at DEBUG level to reduce verbosity
        logDebug(s"Unable to get driver log links for $appId", e)
        IMap.empty
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index f9aa11c4d48d6..755a69520ce41 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, RESOURCE_NAME}
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
@@ -166,9 +167,9 @@ private object ResourceRequestHelper extends Logging {
      case e: ResourceNotFoundException =>
        // warn a couple times and then stop so we don't spam the logs
        if (numResourceErrors < 2) {
-         logWarning(s"YARN doesn't know about resource $name, your resource discovery " +
-           s"has to handle properly discovering and isolating the resource! Error: " +
-           s"${e.getMessage}")
+         logWarning(log"YARN doesn't know about resource ${MDC(RESOURCE_NAME, name)}, " +
+           log"your resource discovery has to handle properly discovering and isolating " +
+           log"the resource! Error: ${MDC(ERROR, e.getMessage)}")
          numResourceErrors += 1
        }
      }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1660db8903c73..efe766be8356d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.{CONTAINER_ID, EXECUTOR_ID}
+import org.apache.spark.internal.LogKey.{APP_STATE, CONFIG, CONFIG2, CONFIG3, CONTAINER_ID, ERROR, EXECUTOR_ID, HOST, REASON}
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -194,8 +194,8 @@ private[yarn] class YarnAllocator(
      sparkConf.get(SHUFFLE_SERVICE_ENABLED)) match {
      case (true, false) => true
      case (true, true) =>
-       logWarning(s"Yarn Executor Decommissioning is supported only " +
-         s"when ${SHUFFLE_SERVICE_ENABLED.key} is set to false. See: SPARK-39018.")
+       logWarning(log"Yarn Executor Decommissioning is supported only " +
+         log"when ${MDC(CONFIG, SHUFFLE_SERVICE_ENABLED.key)} is set to false. See: SPARK-39018.")
        false
      case (false, _) => false
    }
@@ -421,7 +421,7 @@ private[yarn] class YarnAllocator(
        val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId)
        internalReleaseContainer(container)
        getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
-     case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
+     case _ => logWarning(log"Attempted to kill unknown executor ${MDC(EXECUTOR_ID, executorId)}!")
    }
  }
@@ -848,7 +848,8 @@ private[yarn] class YarnAllocator(
    containerIdToExecutorIdAndResourceProfileId.get(containerId) match {
      case Some((executorId, _)) =>
        getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
-     case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
+     case None => logWarning(log"Cannot find executorId for container: " +
+       log"${MDC(CONTAINER_ID, containerId)}")
    }
 
    logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
@@ -859,31 +860,36 @@ private[yarn] class YarnAllocator(
      val exitStatus = completedContainer.getExitStatus
      val (exitCausedByApp, containerExitReason) = exitStatus match {
        case _ if shutdown =>
-         (false, s"Executor for container $containerId exited after Application shutdown.")
+         (false, log"Executor for container ${MDC(CONTAINER_ID, containerId)} exited after " +
+           log"Application shutdown.")
        case ContainerExitStatus.SUCCESS =>
-         (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
-           "preemption) and not because of an error in the running job.")
+         (false, log"Executor for container ${MDC(CONTAINER_ID, containerId)} exited because " +
+           log"of a YARN event (e.g., preemption) and not because of an error in the running " +
+           log"job.")
        case ContainerExitStatus.PREEMPTED =>
          // Preemption is not the fault of the running tasks, since YARN preempts containers
          // merely to do resource sharing, and tasks that fail due to preempted executors could
          // just as easily finish on any other executor. See SPARK-8167.
-         (false, s"Container ${containerId}${onHostStr} was preempted.")
+         (false, log"Container ${MDC(CONTAINER_ID, containerId)}${MDC(HOST, onHostStr)} " +
+           log"was preempted.")
        // Should probably still count memory exceeded exit codes towards task failures
        case ContainerExitStatus.KILLED_EXCEEDED_VMEM =>
          val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
          val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
            .map(_.concat(".")).getOrElse("")
-         val message = "Container killed by YARN for exceeding virtual memory limits. " +
-           s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} or boosting " +
-           s"${YarnConfiguration.NM_VMEM_PMEM_RATIO} or disabling " +
-           s"${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714."
+         val message = log"Container killed by YARN for exceeding virtual memory limits. " +
+           log"${MDC(ERROR, diag)} Consider boosting " +
+           log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)} or boosting " +
+           log"${MDC(CONFIG2, YarnConfiguration.NM_VMEM_PMEM_RATIO)} or disabling " +
+           log"${MDC(CONFIG3, YarnConfiguration.NM_VMEM_CHECK_ENABLED)} because of YARN-4714."
          (true, message)
        case ContainerExitStatus.KILLED_EXCEEDED_PMEM =>
          val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
          val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
            .map(_.concat(".")).getOrElse("")
-         val message = "Container killed by YARN for exceeding physical memory limits. " +
-           s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
+         val message = log"Container killed by YARN for exceeding physical memory limits. " +
+           log"${MDC(ERROR, diag)} Consider boosting " +
+           log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)}."
          (true, message)
        case other_exit_status =>
          val exitStatus = completedContainer.getExitStatus
@@ -894,17 +900,17 @@ private[yarn] class YarnAllocator(
          // SPARK-26269: follow YARN's behaviour, see details in
          // org.apache.hadoop.yarn.util.Apps#shouldCountTowardsNodeBlacklisting
          if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
-           (false, s"Container marked as failed: $containerId$onHostStr. " +
-             s"Exit status: $exitStatus. " +
-             s"Possible causes: $sparkExitCodeReason " +
-             s"Diagnostics: ${completedContainer.getDiagnostics}.")
+           (false, log"Container marked as failed: ${MDC(CONTAINER_ID, containerId)}" +
+             log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, exitStatus)}. " +
+             log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " +
+             log"Diagnostics: ${MDC(ERROR, completedContainer.getDiagnostics)}.")
          } else {
            // completed container from a bad node
            allocatorNodeHealthTracker.handleResourceAllocationFailure(hostOpt)
-           (true, s"Container from a bad node: $containerId$onHostStr. " +
-             s"Exit status: $exitStatus. " +
-             s"Possible causes: $sparkExitCodeReason " +
-             s"Diagnostics: ${completedContainer.getDiagnostics}.")
+           (true, log"Container from a bad node: ${MDC(CONTAINER_ID, containerId)}" +
+             log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, exitStatus)}. " +
+             log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " +
+             log"Diagnostics: ${MDC(ERROR, completedContainer.getDiagnostics)}.")
          }
      }
      if (exitCausedByApp) {
@@ -912,7 +918,7 @@ private[yarn] class YarnAllocator(
      } else {
        logInfo(containerExitReason)
      }
-     ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
+     ExecutorExited(exitStatus, exitCausedByApp, containerExitReason.message)
    } else {
      // If we have already released this container, then it must mean
      // that the driver has explicitly requested it to be killed
@@ -974,7 +980,8 @@ private[yarn] class YarnAllocator(
        // the pre-stored lost reason
        context.reply(releasedExecutorLossReasons.remove(eid).get)
      } else {
-       logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+       logWarning(log"Tried to get the loss reason for non-existent executor " +
+         log"${MDC(EXECUTOR_ID, eid)}")
        context.sendFailure(
          new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
      }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a8e655aed1bf8..ccc0bc9f715e4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -127,8 +127,8 @@ private[spark] class YarnClientSchedulerBackend(
      state match {
        case FinalApplicationStatus.FAILED | FinalApplicationStatus.KILLED
          if conf.get(AM_CLIENT_MODE_EXIT_ON_ERROR) =>
-         logWarning(s"ApplicationMaster finished with status ${state}, " +
-           s"SparkContext should exit with code 1.")
+         logWarning(log"ApplicationMaster finished with status ${MDC(APP_STATE, state)}, " +
+           log"SparkContext should exit with code 1.")
          System.exit(1)
        case _ =>
      }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1e3bec628d903..8404785d8e0b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.LogKey.{EXECUTOR_ID, REASON, RPC_ADDRESS}
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
@@ -304,9 +305,10 @@ private[spark] abstract class YarnSchedulerBackend(
          .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
          .recover {
            case NonFatal(e) =>
-             logWarning(s"Attempted to get executor loss reason" +
-               s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
-               s" but got no response. Marking as agent lost.", e)
+             logWarning(log"Attempted to get executor loss reason for executor id " +
+               log"${MDC(EXECUTOR_ID, executorId)} at RPC address " +
+               log"${MDC(RPC_ADDRESS, executorRpcAddress)}, but got no response. " +
+               log"Marking as agent lost.", e)
              RemoveExecutor(executorId, ExecutorProcessLost())
          }(ThreadUtils.sameThread)
      case None =>
@@ -343,7 +345,8 @@ private[spark] abstract class YarnSchedulerBackend(
 
      case r @ RemoveExecutor(executorId, reason) =>
        if (!stopped.get) {
-         logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
+         logWarning(log"Requesting driver to remove executor " +
+           log"${MDC(EXECUTOR_ID, executorId)} for reason ${MDC(REASON, reason)}")
          driverEndpoint.send(r)
        }
 
@@ -392,7 +395,7 @@ private[spark] abstract class YarnSchedulerBackend(
 
    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      if (amEndpoint.exists(_.address == remoteAddress)) {
-       logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+       logWarning(log"ApplicationMaster has disassociated: ${MDC(RPC_ADDRESS, remoteAddress)}")
        amEndpoint = None
      }
    }