Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9ed6cd6
Add streaming query Id and batch Id to scheduling logs
BrooksWalls Apr 1, 2026
4a53e4b
Add unit tests covering streaming query and batch Id logs
BrooksWalls Apr 2, 2026
92d2f95
add jira ID to test name in PoolSuite
BrooksWalls Apr 2, 2026
4845215
switch from SchedulableBuilder companion object to IdAware logger
BrooksWalls Apr 3, 2026
53099c9
Move mixin to just streaming tasksets' TaskSetManagers
BrooksWalls Apr 8, 2026
280d11b
clean up white space diff
BrooksWalls Apr 8, 2026
f1497f6
Override all Throwable-accepting log methods in streaming trait
BrooksWalls Apr 8, 2026
f5a1915
Enrich FairSchedulableBuilder logWarning with streaming IDs
BrooksWalls Apr 8, 2026
413c21c
Restore 'tasks' in 'Added task set ... tasks to pool' log message
BrooksWalls Apr 8, 2026
fedfcdc
Add explicit return type to streamingTaskSetManager
BrooksWalls Apr 9, 2026
dc6bd59
wrap streaming message in logEntry to avoid eager execution on disabl…
BrooksWalls Apr 9, 2026
0b0f5f1
remove query Id truncation
BrooksWalls Apr 9, 2026
3951124
address nits in StructuredStreamingIdAwareSchedulerLogging
BrooksWalls Apr 9, 2026
8272bc4
override properties in TaskSchedulerImpl, update test to use TaskSche…
BrooksWalls Apr 9, 2026
6f40341
handle empty strings in streaming properties
BrooksWalls Apr 10, 2026
c54701f
introduce conf for disabling structured streaming logging on the sche…
BrooksWalls Apr 15, 2026
5607bab
add SS logging conf to configs-without-binding-policy-exceptions
BrooksWalls Apr 15, 2026
47368d4
introduce config for controlling truncation of queryId in structured …
BrooksWalls Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2373,6 +2373,22 @@ package object config {
.enumConf(SchedulingMode)
.createWithDefault(SchedulingMode.FIFO)

// Controls whether scheduler log messages are enriched with the structured
// streaming query ID and batch ID taken from the task set's properties.
private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED =
  ConfigBuilder("spark.scheduler.streaming.idAwareLogging.enabled")
    .doc("When true, scheduler log messages for streaming tasks include " +
      "the structured streaming query ID and batch ID.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(true)

// Bounds the length of the query-ID prefix added to enriched scheduler logs.
// -1 disables truncation. Zero and other negative values are rejected up
// front: they would otherwise silently produce an empty query-ID prefix.
private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH =
  ConfigBuilder("spark.scheduler.streaming.idAwareLogging.queryIdLength")
    .doc("Maximum number of characters of the streaming query ID to include " +
      "in scheduler log messages. Set to -1 to include the full query ID.")
    .version("4.2.0")
    .intConf
    .checkValue(v => v == -1 || v > 0,
      "The query ID length must be a positive integer, or -1 to include the full query ID.")
    .createWithDefault(5)

private[spark] val SCHEDULER_REVIVE_INTERVAL =
ConfigBuilder("spark.scheduler.revive.interval")
.version("0.8.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ import scala.xml.{Node, XML}
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE}
import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED, STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -61,6 +60,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
extends SchedulableBuilder with Logging {

// Whether scheduler logs should be enriched with streaming query/batch IDs.
// Read once from SparkConf at construction rather than on every log call.
val streamingIdAwareLoggingEnabled: Boolean =
sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED)
// Maximum number of query-ID characters to keep in log prefixes (-1 = full ID).
val streamingQueryIdLength: Int =
sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH)
val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
Expand Down Expand Up @@ -216,18 +219,33 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logWarning(log"A job was submitted with scheduler pool " +
log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " +
log"configured. This can happen when the file that pools are read from isn't set, or " +
log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " +
log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " +
log"configuration (schedulingMode: " +
log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " +
log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " +
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
logWarning(
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
properties,
log"A job was submitted with scheduler pool " +
log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " +
log"configured. This can happen when the file that pools are read from isn't set, or " +
log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " +
log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " +
log"configuration (schedulingMode: " +
log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " +
log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " +
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}",
streamingIdAwareLoggingEnabled,
streamingQueryIdLength
)
)
}
parentPool.addSchedulable(manager)
logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " +
log"${MDC(LogKeys.POOL_NAME, poolName)}")

logInfo(
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I said above, since this method is called for many different TaskSets we have to use the companion object's method

properties,
log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " +
log"${MDC(LogKeys.POOL_NAME, poolName)}",
streamingIdAwareLoggingEnabled,
streamingQueryIdLength
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import java.util.{HashMap, Locale, Properties}

import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MessageWithContext}

/**
* A logging trait for scheduler components where log messages should include
* structured streaming identifiers (query ID and batch ID).
*
* Streaming execution sets these identifiers via
* [[org.apache.spark.SparkContext#setLocalProperty]], which is thread-local.
* Scheduler code typically runs on a different thread (e.g. the
* task-scheduler-event-loop-worker), so `getLocalProperty` would not have
* the streaming context. This trait instead reads the identifiers from the
* task's [[java.util.Properties]], which are propagated with the
* [[org.apache.spark.scheduler.TaskSet]] across thread boundaries.
*
* Mix this trait into any scheduler component that has access to task
* properties and needs streaming-aware log output.
*/
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a trait here so all logs published from TaskSetManager will include the query and batch Id when present

// Task properties of the owning TaskSet. The mixing class must supply these:
// streaming sets the query/batch IDs as thread-local properties, which are not
// visible on scheduler threads, so they are read from the TaskSet's Properties
// (propagated across the thread boundary) when building log messages.
protected def properties: Properties
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can't rely on thread local properties, we need to gather the query and batch Id from the taskSet's properties, this must be set by class which mixes in the trait

// Configuration supplied by the mixing class (typically read from SparkConf):
// whether enrichment is enabled, and how many query-ID characters to keep
// (-1 keeps the full ID).
protected def streamingIdAwareLoggingEnabled: Boolean
protected def streamingQueryIdLength: Int

// Every override below follows the same pattern: route the message through the
// companion object's constructStreamingLogEntry — which lazily prefixes the
// streaming query/batch IDs read from `properties` — then delegate to the
// corresponding super method in Logging. Both the String and LogEntry
// overloads, with and without a Throwable, are overridden at every level so
// that no call site in the mixing class bypasses the enrichment.
override protected def logInfo(msg: => String): Unit =
super.logInfo(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength))

override protected def logInfo(entry: LogEntry): Unit = {
super.logInfo(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength))
}

override protected def logInfo(msg: => String, t: Throwable): Unit =
super.logInfo(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)

override protected def logInfo(entry: LogEntry, t: Throwable): Unit = {
super.logInfo(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)
}

override protected def logWarning(msg: => String): Unit =
super.logWarning(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength))

override protected def logWarning(entry: LogEntry): Unit = {
super.logWarning(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength))
}

override protected def logWarning(msg: => String, t: Throwable): Unit =
super.logWarning(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)

override protected def logWarning(entry: LogEntry, t: Throwable): Unit = {
super.logWarning(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)
}

override protected def logDebug(msg: => String): Unit =
super.logDebug(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength))

override protected def logDebug(entry: LogEntry): Unit = {
super.logDebug(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength))
}

override protected def logDebug(msg: => String, t: Throwable): Unit =
super.logDebug(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)

override protected def logDebug(entry: LogEntry, t: Throwable): Unit = {
super.logDebug(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)
}

override protected def logError(msg: => String): Unit =
super.logError(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength))

override protected def logError(entry: LogEntry): Unit = {
super.logError(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength))
}

override protected def logError(msg: => String, t: Throwable): Unit =
super.logError(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)

override protected def logError(entry: LogEntry, t: Throwable): Unit = {
super.logError(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)
}

override protected def logTrace(msg: => String): Unit =
super.logTrace(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength))

override protected def logTrace(entry: LogEntry): Unit = {
super.logTrace(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength))
}

override protected def logTrace(msg: => String, t: Throwable): Unit =
super.logTrace(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)

override protected def logTrace(entry: LogEntry, t: Throwable): Unit = {
super.logTrace(
StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t)
}
}

/**
* Helpers for constructing log entries enriched with structured streaming
* identifiers extracted from task properties.
*/
private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses a companion object here so that the methods can also be called from SchedulableBuilder, which cannot bind a single Properties object at construction time because it is shared across many task sets.

// Keys under which streaming execution publishes its identifiers into the
// task-local Properties.
// NOTE(review): the prefixes deliberately differ ("sql.streaming" vs
// "streaming.sql") — confirm these match the exact keys set by the streaming
// engine rather than "normalizing" them, or lookups will silently return null.
val QUERY_ID_KEY = "sql.streaming.queryId"
val BATCH_ID_KEY = "streaming.sql.batchId"

private[scheduler] def constructStreamingLogEntry(
properties: Properties,
entry: LogEntry,
enabled: Boolean,
queryIdLength: Int): LogEntry = {
if (!enabled || properties == null) {
return entry
}
// wrap in log entry to defer until log is evaluated
new LogEntry({
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wrap this in a LogEntry since Claude pointed out that we were forcing eager evaluation of the provided logEntry even if the logging level was disabled in the environment. By wrapping everything in a LogEntry, now the logic is only called when the logging level is enabled for the environment. This is important for things like debugging and trace logs

val (queryId: Option[String], batchId: Option[String]) =
getStreamingProperties(properties, queryIdLength)

formatMessage(
queryId,
batchId,
entry
)
})
}

/**
 * Builds a [[LogEntry]] for a plain string message, prefixing it with the
 * streaming query/batch IDs found in `properties` when enrichment is enabled.
 *
 * The enriched body is wrapped in a by-name [[LogEntry]] so that the property
 * lookups and message formatting only run if the target log level is actually
 * enabled (important for debug/trace paths).
 *
 * @param properties task properties carrying the streaming IDs; may be null
 * @param msg the log message, evaluated lazily
 * @param enabled whether streaming-ID enrichment is turned on
 * @param queryIdLength max characters of the query ID to keep; -1 keeps it all
 */
private[scheduler] def constructStreamingLogEntry(
    properties: Properties,
    msg: => String,
    enabled: Boolean,
    queryIdLength: Int): LogEntry = {
  if (!enabled || properties == null) {
    // Enrichment disabled or no task properties available: pass the message
    // through unchanged with an empty structured-logging context.
    new LogEntry(MessageWithContext(msg, java.util.Collections.emptyMap()))
  } else {
    // Defer ID extraction and formatting until the log level check passes.
    new LogEntry({
      val (queryId, batchId) = getStreamingProperties(properties, queryIdLength)
      MessageWithContext(
        formatMessage(queryId, batchId, msg),
        constructStreamingContext(queryId, batchId))
    })
  }
}

private def constructStreamingContext(
queryId: Option[String],
batchId: Option[String]): HashMap[String, String] = {
val streamingContext = new HashMap[String, String]()
// MDC places the log key in the context as all lowercase, so we do the same here
queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _))
batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _))
Comment on lines +235 to +237
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the lowercase is necessary here or not, but wanted to match the behavior of the log interpolator

streamingContext
}

/**
 * Prefixes a plain string log message with "[queryId = ...]" and
 * "[batchId = ...]" markers for whichever streaming IDs are present.
 * Absent IDs contribute no prefix; the message itself is never altered.
 */
private def formatMessage(
    queryId: Option[String],
    batchId: Option[String],
    msg: => String): String = {
  // Evaluate the by-name message exactly once.
  val base = msg
  val withBatch = batchId.fold(base)(bid => s"[batchId = $bid] $base")
  queryId.fold(withBatch)(qId => s"[queryId = $qId] $withBatch")
}

private def formatMessage(
queryId: Option[String],
batchId: Option[String],
msg: => LogEntry): MessageWithContext = {
val msgWithBatchId: MessageWithContext = batchId.map(
bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg)
).getOrElse(toMessageWithContext(msg))
queryId.map(
qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up removing the truncation of the query ID here. The reason is that, before, when we added the hashmap to the context containing the full query ID, any log renderer that used the log context would place the full query ID into the log message anyway. Ultimately I think it's probably better to have the full query ID in the log context, and subsequently in the log, than to have the truncated version. Open to reverting this change if others feel differently.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the code is simpler without the truncation

Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this impact the log message? I'm not very sure we really want to make log message to contain almost 40 chars of query ID every time. This would be more painful when we have to look at mixed log (we can't ensure every single log message to onboard this). So it's OK if this isn't reflected to log message (especially prefix) but if this will go to the prefix of log message, I'm not sure I prefer it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed an update where we default to truncating the query ID to 5 characters; a new config can be used to control how much we truncate, or to disable truncation entirely.

).getOrElse(msgWithBatchId)
}

// Re-packages a LogEntry's message and MDC context as a standalone
// MessageWithContext so it can be concatenated with log-interpolated fragments.
private def toMessageWithContext(entry: LogEntry): MessageWithContext =
  MessageWithContext(message = entry.message, context = entry.context)

/**
 * Extracts the streaming query ID and batch ID from the task's properties.
 *
 * Missing or empty property values become None. The query ID is truncated to
 * `queryIdLength` characters; any negative length (including the documented
 * -1) disables truncation. Emptiness is checked after truncation, so a length
 * of 0 yields None instead of a blank "[queryId = ] " log prefix.
 *
 * @return (queryId, batchId) as Options, truncation already applied
 */
private def getStreamingProperties(
    properties: Properties,
    queryIdLength: Int): (Option[String], Option[String]) = {
  val queryId = Option(properties.getProperty(QUERY_ID_KEY))
    // Negative length means "keep the full ID"; previously only -1 did, and
    // other negatives produced take(n < 0) == "" — an empty, misleading prefix.
    .map(id => if (queryIdLength < 0) id else id.take(queryIdLength))
    .filter(_.nonEmpty)
  val batchId = Option(properties.getProperty(BATCH_ID_KEY)).filter(_.nonEmpty)
  (queryId, batchId)
}
}
Loading