Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ object LogKey extends Enumeration {
val CONFIG = Value
val CONFIG2 = Value
val CONFIG3 = Value
val CONFIG4 = Value
val CONFIG5 = Value
val CONTAINER = Value
val CONTAINER_ID = Value
val COUNT = Value
Expand All @@ -63,7 +65,9 @@ object LogKey extends Enumeration {
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXECUTOR_ENV_REGEX = Value
val EXECUTOR_ID = Value
val EXECUTOR_IDS = Value
val EXECUTOR_STATE = Value
val EXIT_CODE = Value
val EXPRESSION_TERMS = Value
Expand All @@ -81,6 +85,7 @@ object LogKey extends Enumeration {
val HOST = Value
val INDEX = Value
val INFERENCE_MODE = Value
val INTERVAL = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val JOIN_CONDITION_SUB_EXPRESSION = Value
Expand Down Expand Up @@ -117,6 +122,9 @@ object LogKey extends Enumeration {
val PATH = Value
val PATHS = Value
val POD_ID = Value
val POD_NAME = Value
val POD_NAMESPACE = Value
val POD_PHASE = Value
val POLICY = Value
val PORT = Value
val PRODUCER_ID = Value
Expand All @@ -132,7 +140,7 @@ object LogKey extends Enumeration {
val REDUCE_ID = Value
val RELATION_NAME = Value
val REMAINING_PARTITIONS = Value
val REMOTE_ADDRESS = Value
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that the LogKey REMOTE_ADDRESS and RPC_ADDRESS have the same meaning, so I removed REMOTE_ADDRESS from LogKey and only kept RPC_ADDRESS

val RESOURCE_NAME = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
val RPC_ADDRESS = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, EXECUTOR_ENV_REGEX}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -214,10 +215,10 @@ private[spark] class KubernetesExecutorConf(
if (executorEnvRegex.pattern.matcher(key).matches()) {
true
} else {
logWarning(s"Invalid key: $key: " +
"a valid environment variable name must consist of alphabetic characters, " +
"digits, '_', '-', or '.', and must not start with a digit." +
s"Regex used for validation is '$executorEnvRegex')")
logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
log"a valid environment variable name must consist of alphabetic characters, " +
log"digits, '_', '-', or '.', and must not start with a digit. " +
log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.POD_ID
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.util.{Clock, SystemClock, Utils}
Expand Down Expand Up @@ -120,8 +121,8 @@ object KubernetesUtils extends Logging {
case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
case _ =>
logWarning(
s"specified container ${name} not found on pod template, " +
s"falling back to taking the first container")
log"specified container ${MDC(POD_ID, name)} not found on pod template, " +
log"falling back to taking the first container")
Option.empty
}
val containers = pod.getSpec.getContainers.asScala.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, CONFIG3, CONFIG4, CONFIG5}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this looks a bit ugly, there seems to be no better way.

import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.launcher.SparkLauncher

Expand Down Expand Up @@ -78,10 +79,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
private def configureForPython(pod: SparkPod, res: String): SparkPod = {
if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
logWarning(
s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
"variables instead.")
log"${MDC(CONFIG, PYSPARK_MAJOR_PYTHON_VERSION.key)} was deprecated in Spark 3.1. " +
log"Please set '${MDC(CONFIG2, PYSPARK_PYTHON.key)}' and " +
log"'${MDC(CONFIG3, PYSPARK_DRIVER_PYTHON.key)}' " +
log"configurations or ${MDC(CONFIG4, ENV_PYSPARK_PYTHON)} and " +
log"${MDC(CONFIG5, ENV_PYSPARK_DRIVER_PYTHON)} environment variables instead.")
}

val pythonEnvs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH, KUBERNETES_NAMESPACE}
import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, PATH, PATHS}
import org.apache.spark.util.ArrayImplicits._

private[spark] object KubernetesClientUtils extends Logging {
Expand Down Expand Up @@ -133,8 +134,8 @@ private[spark] object KubernetesClientUtils extends Logging {
}
} catch {
case e: MalformedInputException =>
logWarning(
s"Unable to read a non UTF-8 encoded file ${file.getAbsolutePath}. Skipping...", e)
logWarning(log"Unable to read a non UTF-8 encoded file " +
log"${MDC(PATH, file.getAbsolutePath)}. Skipping...", e)
} finally {
source.close()
}
Expand All @@ -144,8 +145,9 @@ private[spark] object KubernetesClientUtils extends Logging {
s" ${truncatedMap.keys.mkString(",")}")
}
if (skippedFiles.nonEmpty) {
logWarning(s"Skipped conf file(s) ${skippedFiles.mkString(",")}, due to size constraint." +
s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for more details.")
logWarning(log"Skipped conf file(s) ${MDC(PATHS, skippedFiles.mkString(","))}, due to " +
log"size constraint. Please see, config: " +
log"`${MDC(CONFIG, Config.CONFIG_MAP_MAXSIZE.key)}` for more details.")
}
truncatedMap.toMap
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.{Logging, LogKey, MDC}
import org.apache.spark.internal.LogKey.{COUNT, EXECUTOR_IDS, TIMEOUT}
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
Expand Down Expand Up @@ -210,10 +211,10 @@ class ExecutorPodsAllocator(
}

if (timedOut.nonEmpty) {
logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
" allocation attempt tried to create them. The executors may have been deleted but the" +
" application missed the deletion event.")
logWarning(log"Executors with ids ${MDC(EXECUTOR_IDS, timedOut.mkString(","))} were not " +
log"detected in the Kubernetes cluster after ${MDC(TIMEOUT, podCreationTimeout)} ms " +
log"despite the fact that a previous allocation attempt tried to create them. " +
log"The executors may have been deleted but the application missed the deletion event.")

newlyCreatedExecutors --= timedOut
if (shouldDeleteExecutors) {
Expand Down Expand Up @@ -282,7 +283,7 @@ class ExecutorPodsAllocator(

val newFailedExecutorIds = currentFailedExecutorIds.diff(failedExecutorIds)
if (newFailedExecutorIds.nonEmpty) {
logWarning(s"${newFailedExecutorIds.size} new failed executors.")
logWarning(log"${MDC(COUNT, newFailedExecutorIds.size)} new failed executors.")
newFailedExecutorIds.foreach { _ => failureTracker.registerExecutorFailure() }
}
failedExecutorIds = failedExecutorIds ++ currentFailedExecutorIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import io.fabric8.kubernetes.api.model.ContainerStateTerminated
import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{POD_NAME, POD_NAMESPACE, POD_PHASE}

/**
* An immutable view of the current executor pods that are running in the cluster.
Expand Down Expand Up @@ -114,8 +115,9 @@ object ExecutorPodsSnapshot extends Logging {
case "terminating" =>
PodTerminating(pod)
case _ =>
logWarning(s"Received unknown phase $phase for executor pod with name" +
s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
logWarning(log"Received unknown phase ${MDC(POD_PHASE, phase)} for executor " +
log"pod with name ${MDC(POD_NAME, pod.getMetadata.getName)} in " +
log"namespace ${MDC(POD_NAMESPACE, pod.getMetadata.getNamespace)}")
PodUnknown(pod)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, INTERVAL}
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
import org.apache.spark.status.api.v1
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -60,9 +62,10 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
if (interval <= 0) {
logWarning(s"Disabled due to invalid interval value, '$interval'")
logWarning(log"Disabled due to invalid interval value, " +
log"'${MDC(INTERVAL, interval)}'")
} else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
logWarning(log"Disabled because ${MDC(CONFIG, DECOMMISSION_ENABLED.key)} is false.")
} else {
minTasks = sc.conf.get(MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING)
// Scheduler is not created yet
Expand Down Expand Up @@ -90,7 +93,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
}
case _ =>
logWarning("This plugin expects " +
s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
log"${MDC(CLASS_NAME, classOf[KubernetesClusterSchedulerBackend].getSimpleName)}.")
}
} catch {
case e: Throwable => logError("Error in rolling thread", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{EXIT_CODE, REMOTE_ADDRESS}
import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, RPC_ADDRESS}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
Expand Down Expand Up @@ -597,7 +597,8 @@ private[spark] class ApplicationMaster(
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
logWarning(
log"Reporter thread fails ${MDC(FAILURES, failureCount)} time(s) in a row.", e)
}
}
try {
Expand Down Expand Up @@ -857,7 +858,7 @@ private[spark] class ApplicationMaster(
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
} else {
logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " +
log"Shutting down. ${MDC(REMOTE_ADDRESS, remoteAddress)}")
log"Shutting down. ${MDC(RPC_ADDRESS, remoteAddress)}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unrelated to this PR, but I found that the LogKey REMOTE_ADDRESS and RPC_ADDRESS have the same meaning, so I removed REMOTE_ADDRESS from LogKey and only kept RPC_ADDRESS

finish(FinalApplicationStatus.FAILED, exitCode)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.APP_ID
import org.apache.spark.internal.{Logging, LogKey, MDC}
import org.apache.spark.internal.LogKey.{APP_ID, CONFIG, CONFIG2, PATH}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
Expand Down Expand Up @@ -258,7 +258,7 @@ private[spark] class Client(
}
} catch {
case ioe: IOException =>
logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
logWarning(log"Failed to cleanup staging dir ${MDC(PATH, stagingDirPath)}", ioe)
}
}

Expand Down Expand Up @@ -332,8 +332,8 @@ private[spark] class Client(
appContext.setLogAggregationContext(logAggregationContext)
} catch {
case NonFatal(e) =>
logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
"does not support it", e)
logWarning(log"Ignoring ${MDC(CONFIG, ROLLED_LOG_INCLUDE_PATTERN.key)} " +
log"because the version of YARN does not support it", e)
}
}
appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)
Expand Down Expand Up @@ -558,10 +558,12 @@ private[spark] class Client(
val uriStr = uri.toString()
val fileName = new File(uri.getPath).getName
if (distributedUris.contains(uriStr)) {
logWarning(s"Same path resource $uri added multiple times to distributed cache.")
logWarning(log"Same path resource ${MDC(LogKey.URI, uri)} added multiple times " +
log"to distributed cache.")
false
} else if (distributedNames.contains(fileName)) {
logWarning(s"Same name resource $uri added multiple times to distributed cache")
logWarning(log"Same name resource ${MDC(LogKey.URI, uri)} added multiple times " +
log"to distributed cache")
false
} else {
distributedUris += uriStr
Expand Down Expand Up @@ -698,8 +700,9 @@ private[spark] class Client(

case None =>
// No configuration, so fall back to uploading local jar files.
logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
"to uploading libraries under SPARK_HOME.")
logWarning(
log"Neither ${MDC(CONFIG, SPARK_JARS.key)} nor ${MDC(CONFIG2, SPARK_ARCHIVE.key)} " +
log"is set, falling back to uploading libraries under SPARK_HOME.")
val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
sparkConf.getenv("SPARK_HOME")))
val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
Expand Down Expand Up @@ -878,7 +881,7 @@ private[spark] class Client(
if (dir.isDirectory()) {
val files = dir.listFiles()
if (files == null) {
logWarning("Failed to list files under directory " + dir)
logWarning(log"Failed to list files under directory ${MDC(PATH, dir)}")
} else {
files.foreach { file =>
if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
Expand Down Expand Up @@ -1067,7 +1070,7 @@ private[spark] class Client(
sparkConf))
}
if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
logWarning(log"${MDC(CONFIG, AM_JAVA_OPTIONS.key)} will not take effect in cluster mode")
}
} else {
// Validate and include yarn am specific java options in yarn-client mode.
Expand Down Expand Up @@ -1344,7 +1347,7 @@ private[spark] class Client(
.getOrElse(IMap.empty)
} catch {
case e: Exception =>
logWarning(s"Unable to get driver log links for $appId: $e")
logWarning(log"Unable to get driver log links for ${MDC(APP_ID, appId)}: " +
  log"${MDC(LogKey.ERROR, e.getMessage)}")
// Include the full stack trace only at DEBUG level to reduce verbosity
logDebug(s"Unable to get driver log links for $appId", e)
IMap.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{ERROR, RESOURCE_NAME}
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
Expand Down Expand Up @@ -166,9 +167,9 @@ private object ResourceRequestHelper extends Logging {
case e: ResourceNotFoundException =>
// warn a couple times and then stop so we don't spam the logs
if (numResourceErrors < 2) {
logWarning(s"YARN doesn't know about resource $name, your resource discovery " +
s"has to handle properly discovering and isolating the resource! Error: " +
s"${e.getMessage}")
logWarning(log"YARN doesn't know about resource ${MDC(RESOURCE_NAME, name)}, " +
log"your resource discovery has to handle properly discovering and isolating " +
log"the resource! Error: ${MDC(ERROR, e.getMessage)}")
numResourceErrors += 1
}
}
Expand Down
Loading