@@ -331,6 +331,10 @@ public enum LogKeys implements LogKey {
   LABEL_COLUMN,
   LARGEST_CLUSTER_INDEX,
   LAST_ACCESS_TIME,
+  LAST_ATTEMPT_ACC_INVALIDATE,
+  LAST_ATTEMPT_ACC_SYSTEM_METRIC,
+  LAST_ATTEMPT_ACC_UNEXPECTED_REASON,
+  LAST_ATTEMPT_ACC_USER_METRIC,
   LAST_COMMITTED_CHECKPOINT_ID,
   LAST_COMMIT_BASED_CHECKPOINT_ID,
   LAST_SCAN_TIME,
@@ -95,6 +95,11 @@ class LogEntry(messageWithContext: => MessageWithContext) {
   def message: String = cachedMessageWithContext.message

   def context: java.util.Map[String, String] = cachedMessageWithContext.context
+
+  def +(other: LogEntry): LogEntry = {
+    val combined = cachedMessageWithContext + other.cachedMessageWithContext
+    new LogEntry(combined)
+  }
 }

 /**
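A minimal usage sketch of the new combinator, assuming Spark's structured-logging interpolator and the implicit MessageWithContext-to-LogEntry conversion; the call site and the chosen MDC keys are illustrative, not part of this diff:

    // Illustrative only: the new LogEntry.+ delegates to MessageWithContext.+,
    // concatenating the rendered messages and merging the MDC context maps.
    val head: LogEntry = log"last access at ${MDC(LogKeys.LAST_ACCESS_TIME, accessTime)}"
    val tail: LogEntry = log", last scan at ${MDC(LogKeys.LAST_SCAN_TIME, scanTime)}"
    logInfo(head + tail)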
core/src/main/scala/org/apache/spark/SparkContext.scala (2 additions, 0 deletions)
@@ -3152,6 +3152,8 @@ object SparkContext extends Logging {
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
   private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
   private[spark] val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
+  private[spark] val DATASET_QUERY_EXECUTION_ID_KEY =
+    "spark.sql.dataset.queryExecution.id"

   /**
    * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
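A hedged illustration of how a local-property key like this is typically consumed; the setter call site below is hypothetical, since the diff only defines the constant:

    // Hypothetical call site: tag jobs submitted from this thread with the id
    // of the Dataset query execution driving them (executionId is assumed).
    sc.setLocalProperty(SparkContext.DATASET_QUERY_EXECUTION_ID_KEY, executionId.toString)
    try {
      // run the action; jobs submitted here inherit the local property
    } finally {
      sc.setLocalProperty(SparkContext.DATASET_QUERY_EXECUTION_ID_KEY, null)
    }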
@@ -39,6 +39,13 @@ private[spark] object Tests {
     .booleanConf
     .createOptional

+  val INJECT_SHUFFLE_FETCH_FAILURES =
+    ConfigBuilder("spark.testing.injectShuffleFetchFailures")
+      .doc("Injecting fetch failures for shuffle stages by providing an invalid BlockManager " +
+        "location for the first stage attempt. Testing only flag!")
+      .booleanConf
+      .createWithDefault(false)
+
   val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
     .version("1.2.0")
     .booleanConf
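A sketch of how a suite might exercise the new flag, assuming standard local-cluster test scaffolding; none of the setup below is part of the diff:

    // Assumes the JVM runs with -Dspark.testing=true so that Utils.isTesting holds.
    val conf = new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]")
      .setAppName("inject-fetch-failures")
      .set("spark.testing.injectShuffleFetchFailures", "true")
    val sc = new SparkContext(conf)
    // First attempts of shuffle map stages register an invalid BlockManager
    // location, so reducers hit FetchFailed and the stage is retried.
    val sums = sc.parallelize(1 to 100, 4).map(i => (i % 10, i)).reduceByKey(_ + _).collect()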
core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala (20 additions, 2 deletions)
@@ -130,6 +130,22 @@ private[spark] object RDDOperationScope extends Logging {
       name: String,
       allowNesting: Boolean,
       ignoreParent: Boolean)(body: => T): T = {
+    withScope(sc, name, allowNesting, ignoreParent,
+      nextScopeId().toString)(body)
+  }
+
+  /**
+   * Execute the given body such that all RDDs created in this body
+   * will have the same scope, with an explicit scope ID.
+   *
+   * Note: Return statements are NOT allowed in body.
+   */
+  private[spark] def withScope[T](
+      sc: SparkContext,
+      name: String,
+      allowNesting: Boolean,
+      ignoreParent: Boolean,
+      rddScopeId: String)(body: => T): T = {
     // Save the old scope to restore it later
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
@@ -139,10 +155,12 @@ private[spark] object RDDOperationScope extends Logging {
     try {
       if (ignoreParent) {
         // Ignore all parent settings and scopes and start afresh with our own root scope
-        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
+        sc.setLocalProperty(scopeKey,
+          new RDDOperationScope(name, None, rddScopeId).toJson)
       } else if (sc.getLocalProperty(noOverrideKey) == null) {
         // Otherwise, set the scope only if the higher level caller allows us to do so
-        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
+        sc.setLocalProperty(scopeKey,
+          new RDDOperationScope(name, oldScope, rddScopeId).toJson)
       }
       // Optionally disallow the child body to override our scope
       if (!allowNesting) {
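A hedged sketch of a call site for the new overload; the scope name and the way the ID is obtained are illustrative (nextScopeId() is the same counter the old overload delegates to):

    // Pin a stable scope id and reuse it, so related executions of the body
    // are grouped under one RDD operation scope.
    val scopeId = RDDOperationScope.nextScopeId().toString
    RDDOperationScope.withScope(sc, "datasetAction",
      allowNesting = false, ignoreParent = false, scopeId) {
      sc.parallelize(1 to 10).count()
    }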
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (18 additions, 0 deletions)
@@ -1858,6 +1858,11 @@ private[spark] class DAGScheduler(
             throw SparkCoreErrors.accessNonExistentAccumulatorError(id)
         }
         acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
+        if (acc.isInstanceOf[LastAttemptAccumulator[_, _, _]]) {

Member:
This introduces coupling between DAGScheduler and the SLAM concept. An alternative: add an overridable method to AccumulatorV2 like def mergeWithTaskMetadata(other, rdd, taskInfo, stageId, stageAttemptId, props): Unit = {} that SLAM overrides. Then DAGScheduler calls it unconditionally (no-op for regular accumulators) without the instanceof check.
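A sketch of that alternative, with the signature hedged from the comment above; this is not what the PR implements:

    // Hypothetical no-op template method on the accumulator base class;
    // metadata-aware accumulators would override it, and DAGScheduler would
    // call it unconditionally instead of performing a type check.
    def mergeWithTaskMetadata(
        other: AccumulatorV2[_, _],
        rdd: RDD[_],
        taskInfo: TaskInfo,
        stageId: Int,
        stageAttemptId: Int,
        props: java.util.Properties): Unit = {
      // default: do nothing for ordinary accumulators
    }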

Contributor Author:
LastAttemptAccumulator follows a mixin pattern: the mixin adds the extra functionality to the accumulator. The code doesn't change AccumulatorV2 at all, only adds extra visibility to two fields in SQLMetrics, and the plug-in into existing production code is limited to these few lines in DAGScheduler, plus some testing utils and tiny fixes to RDD scoping in the collector and shuffle.
If we added this method to the base class, it would be a no-op for any metric other than a LastAttemptAccumulator.
This kind of mixin follows the same pattern as most of the DSv2 interfaces: instead of adding empty methods to many interfaces, we have mixins like SupportsDelta and RequiresDistributionAndOrdering, and the places that interact with them plug in the awareness based on a type check.
I think it's a good pattern.
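For reference, a sketch of the mixin shape described here: the trait name and the mergeLastAttempt arguments follow the diff below, while the self-type and the meaning of the type parameters are assumptions:

    // Assumed shape: a mixin stacked onto an AccumulatorV2 subclass.
    // DAGScheduler detects it with a type check and feeds it task metadata.
    trait LastAttemptAccumulator[IN, OUT, META] { self: AccumulatorV2[IN, OUT] =>
      def mergeLastAttempt(
          updates: AccumulatorV2[_, _],
          rdd: RDD[_],
          taskInfo: TaskInfo,
          stageId: Int,
          stageAttemptId: Int,
          properties: java.util.Properties): Unit
    }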

+          acc.asInstanceOf[LastAttemptAccumulator[_, _, _]].mergeLastAttempt(
+            updates, stage.rdd, event.taskInfo,
+            task.stageId, task.stageAttemptId, task.localProperties)
+        }
         // To avoid UI cruft, ignore cases where value wasn't updated
         if (acc.name.isDefined && !updates.isZero) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))

@@ -2333,6 +2338,19 @@ private[spark] class DAGScheduler(
           // The epoch of the task is acceptable (i.e., the task was launched after the most
           // recent failure we're aware of for the executor), so mark the task's output as
           // available.
+          // For testing purposes, inject fetch failures controlled from the driver-side by
+          // supplying an invalid location.
+          if (Utils.isTesting &&

Member:
Injecting invalid BlockManagerId locations directly in the DAGScheduler production path (even guarded by Utils.isTesting) is a pattern that could accumulate tech debt. Similarly, AQETestHelper.shouldForceCancellation() is checked in AdaptiveSparkPlanExec's main loop. Could these be implemented via a test-only pluggable hook (e.g., a MapOutputTrackerMaster wrapper or a test-only DAGSchedulerEventProcessLoop subclass) rather than inline conditionals in production code?
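A sketch of the kind of hook being suggested; every name below is invented for illustration, and nothing like it is added by the PR:

    // Hypothetical test-only interception point: identity in production,
    // swapped for a failure-injecting implementation by a test suite.
    trait MapStatusInterceptor {
      def onRegister(status: MapStatus, stageAttemptId: Int): MapStatus
    }
    object MapStatusInterceptor {
      @volatile var current: MapStatusInterceptor = new MapStatusInterceptor {
        override def onRegister(status: MapStatus, stageAttemptId: Int): MapStatus = status
      }
    }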

Contributor Author:
There are about 100 code paths in Spark that invoke extra checks or trigger extra failures when Utils.isTesting; here it's protected both by isTesting and an internal config. I think introducing extra pluggable hooks and the infrastructure to plug them in would add more tech debt and regression risk, because it would require more reshaping of the existing production code paths.

+            sc.conf.get(config.Tests.INJECT_SHUFFLE_FETCH_FAILURES) &&
+            task.stageAttemptId == 0) {
+            val currentLocation = status.location
+            val invalidLocation = BlockManagerId(
+              execId = BlockManagerId.INVALID_EXECUTOR_ID,
+              host = currentLocation.host,
+              port = currentLocation.port,
+              topologyInfo = currentLocation.topologyInfo)
+            status.updateLocation(invalidLocation)
+          }
           val isChecksumMismatched = mapOutputTracker.registerMapOutput(
             shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
           if (isChecksumMismatched) {