[SPARK-56509][SQL] SparkSQL Last Attempt Metrics #55371

Open

juliuszsompolski wants to merge 16 commits into apache:master from juliuszsompolski:spark-last-attempt-metrics

Conversation

Contributor

@juliuszsompolski commented Apr 16, 2026

What changes were proposed in this pull request?

This PR introduces Last Attempt Accumulators — accumulators that track metric values aggregated across the "last execution" that produced the values, discarding values from earlier attempts that have been recomputed due to stage retries or AQE replanning.

The problem

Regular Spark accumulators sum up values from all task attempts, including retried ones. When a stage retry occurs — due to executor loss, fetch failures, or AQE replanning — the accumulator value includes contributions from both the original execution and the retry. This makes it impossible to determine the "true" metric value of the final successful execution. For example, if a stage processes 1000 rows and is retried once, numOutputRows reports 2000 instead of 1000.

This is a fundamental limitation when accumulators are used for post-execution correctness checking (e.g. verifying that the number of rows written matches expectations), or when accurate execution statistics need to be reported.

The solution

LastAttemptAccumulator is a trait that can be mixed into any AccumulatorV2 subclass. It hooks into DAGScheduler.updateAccumulators to receive per-task stage and attempt metadata alongside the regular accumulator merge. It tracks per-RDD, per-partition partial values tagged with (stageId, stageAttemptId, taskAttemptNumber), and when queried, aggregates only the values from the latest attempt of each partition.
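
A minimal sketch of that bookkeeping rule, using illustrative names (Attempt, mergePartial) rather than the PR's actual internals:

    import scala.collection.mutable

    case class Attempt(stageId: Int, stageAttemptId: Int, taskAttemptNumber: Int)

    object Attempt {
      // A later stage attempt (or task attempt within it) supersedes earlier ones.
      implicit val ordering: Ordering[Attempt] =
        Ordering.by(a => (a.stageId, a.stageAttemptId, a.taskAttemptNumber))
    }

    // For one RDD: partitionId -> (latest attempt seen so far, its partial value)
    val partials = mutable.Map.empty[Int, (Attempt, Long)]

    def mergePartial(partition: Int, attempt: Attempt, value: Long): Unit =
      partials.get(partition) match {
        case Some((prev, _)) if Attempt.ordering.gt(prev, attempt) =>
          () // late-arriving update from an already-superseded attempt: drop it
        case _ =>
          partials(partition) = (attempt, value)
      }

    // The "last attempt" value aggregates only the surviving partials.
    def lastAttemptValue: Long = partials.valuesIterator.map(_._2).sum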

API

The core trait LastAttemptAccumulator[IN, OUT, PARTIAL] provides several query methods to retrieve the last attempt value, depending on what scope is of interest:

  • lastAttemptValueForRDDId(rddId) / lastAttemptValueForRDDIds(rddIds) — Value from specific RDD(s). Useful when the caller knows which RDD(s) the metric was used in.
  • lastAttemptValueForAllRDDs() — Value aggregated from all RDDs that contributed to this accumulator. Useful when only one execution used the accumulator.
  • lastAttemptValueForHighestRDDId() — Value from the RDD with the highest ID, which corresponds to the most recent execution. A simple heuristic that works well for single-use metrics in SQL plans.
  • lastAttemptValueForRDDScopes(rddScopeIds) — Value from RDDs matching specific RDDOperationScope IDs, enabling tracking by SparkPlan node.

All query methods return Option[OUT]. None means the last attempt value cannot be determined (e.g. the accumulator was updated both from tasks and directly on the driver, or an internal consistency check failed).
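
For illustration, a caller might consume this Option contract as follows (metric is assumed to be any accumulator with the LastAttemptAccumulator mixin; method names as described above):

    metric.lastAttemptValueForHighestRDDId() match {
      case Some(rows) =>
        println(s"rows from the final successful execution: $rows")
      case None =>
        // Indeterminate (e.g. mixed driver- and task-side updates);
        // fall back to the regular cumulative accumulator value.
        println(s"cumulative value across all attempts: ${metric.value}")
    }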

The SQL extension SQLLastAttemptAccumulator adds Dataset-aware tracking:

  • lastAttemptValueForDataset(ds) / lastAttemptValueForQueryExecution(qe) — Value from the execution of a specific Dataset or QueryExecution. This works by extracting RDDOperationScope IDs from the physical plan to identify which RDDs belong to which SparkPlan nodes, and is aware of exchanges, subqueries, broadcast joins, and WholeStageCodegen fallbacks.

A ready-to-use concrete implementation SQLLastAttemptMetric (a SQLMetric with SQLLastAttemptAccumulator mixed in) can be created via SQLLastAttemptMetrics.createMetric(sparkContext, name).
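
A hedged end-to-end sketch of that SQL-level API (the wiring of the metric into a physical plan node is elided, and exact package paths and signatures are assumptions based on this description):

    // Create a SQLMetric with last-attempt tracking mixed in.
    val metric = SQLLastAttemptMetrics.createMetric(spark.sparkContext, "numOutputRows")

    // ... metric is attached to a SparkPlan node and the query is executed ...
    val df = spark.range(0, 1000).toDF("id")
    df.collect()

    // Value attributable to this Dataset's last successful execution, even if
    // stages were retried or AQE replanned along the way.
    val rows: Option[Long] = metric.lastAttemptValueForDataset(df)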

Scoping and edge cases handled

  • Stage retries: Tracks (stageId, stageAttemptId, taskAttemptNumber) tuples, keeps only the latest.
  • AQE replanning: New plans create new RDDs with new scopes; old cancelled plan values are naturally excluded when querying by scope.
  • Repeated Dataset execution: Same executedPlan and RDDs are reused; the accumulator detects this and doesn't double-count.
  • Partial execution (take/limit): Only computed partitions are aggregated.
  • Driver-side updates (e.g. ConvertToLocalRelation optimizer folding): Tracked separately, scoped to QueryExecution.id. Bails out if mixed with task-side updates.
  • WholeStageCodegen fallback: Accounts for both the codegen wrapper's and the child's RDD scope, since fallback changes which scope the execution runs in.

Testing infrastructure

This PR also adds two testing-only mechanisms:

  • INJECT_SHUFFLE_FETCH_FAILURES: a config that injects invalid BlockManager locations for the first stage attempt of shuffle map tasks, forcing stage retries.
  • AQETestHelper.withForcedCancellation: triggers forced AQE replanning after the first stage materializes with non-zero metric values, causing the plan to be discarded and re-run.

Together these enable testing Last Attempt Accumulators under both stage retry and AQE replanning conditions.
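
A hypothetical test combining both hooks (the conf key string, helper signatures, and runQueryWithMetric are assumptions for illustration):

    test("last attempt value is correct under retries and replanning") {
      withSparkContextConf("spark.testing.injectShuffleFetchFailures" -> "true") {
        AQETestHelper.withForcedCancellation {
          val (df, metric) = runQueryWithMetric(spark)
          // The raw accumulator is inflated by retried attempts, but the
          // last-attempt view reflects only the final execution.
          assert(metric.lastAttemptValueForDataset(df).contains(1000L))
        }
      }
    }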

Why are the changes needed?

Applications and future work:

  • DML statistics: Reporting correct rows written/updated/deleted after INSERT, UPDATE, DELETE, MERGE operations, even when stage retries occurred.
  • Observability: Any metric that should reflect the actual work done by the final successful execution, not cumulative work across all attempts. We could consider reporting metrics like "num output rows" of various operators in the Spark UI using SLAM.

Does this PR introduce any user-facing change?

This adds internal infrastructure. LastAttemptAccumulator and SQLLastAttemptMetric are internal APIs. No existing accumulator behavior is changed.

There is a slight change in RDDOperationScope names. SparkPlan used to create scopes with an autogenerated name (via RDDOperationScope.nextScopeId). Now they get a scope name assigned as spark_plan_{id}, where id is the SparkPlan.id, which is also a globally unique, monotonically incrementing id. This is visible in the Spark UI, and makes it easier to link the RDD operation with the SparkPlan that triggered it.
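
Sketched from the format above, the new definition on SparkPlan amounts to something like this (the exact body is not shown in this description):

    private[spark] def rddScopeId: String = s"spark_plan_$id"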

How was this patch tested?

  • SQLLastAttemptMetricUnitSuite — Unit tests for serialization, copy, and mergeLastAttempt with various attempt orderings.
  • SQLLastAttemptMetricIntegrationSuite — Integration tests with RDDs and Datasets covering: single/multi stage execution, take/coalesce, driver-side updates, ConvertToLocalRelation optimizer folding, BroadcastNestedLoopJoin double execution, AQE coalesced shuffle partitions, and WholeStageCodegenExec fallback.
  • SQLLastAttemptMetricIntegrationSuiteWithStageRetries — Reruns all integration tests with INJECT_SHUFFLE_FETCH_FAILURES enabled to verify correctness under stage retries.
  • SQLLastAttemptMetricPlanShapesSuite — Tests various physical plan shapes (simple plans, subqueries, shuffle/broadcast/reused exchanges, cached plans, checkpointed plans) across an AQE on/off, stage retry on/off, and AQE replanning on/off matrix.
  • MetricsFailureInjectionSuite — Tests the AQE forced cancellation and shuffle fetch failure injection mechanisms with multi-stage queries, verifying that both regular metrics and SLAM metrics behave correctly under retries and replanning.

Was this patch authored or co-authored using generative AI tooling?

Code artisanally crafted by a human; some refactoring and application of review comments by Claude.
Generated-by: Claude Code (claude-opus-4-6)

Co-authored-by: Isaac
@juliuszsompolski
Contributor Author

cc @cloud-fan

- Fix afterAll() calling super.beforeAll() instead of super.afterAll()
- Fix copy-paste errors in bailout messages (ShuffleExchangeLike, BaseSubqueryExec)
- Fix typos: lastAttempValue, e.d., PARTIAl, highestid, splits it, decomissioned,
  a subclass an, in in, implementation create
- Fix missing ]] in lastAttemptValueForHighestRDDId scaladoc link
- Simplify partialMerge to avoid unnecessary copy+merge
- Remove unused stageId/stageAttemptId params from setMockAttempt
@juliuszsompolski force-pushed the spark-last-attempt-metrics branch from 4515033 to 4dd356a on April 16, 2026 17:38
@HyukjinKwon changed the title [SPARK-56509] SparkSQL Last Attempt Metrics to [SPARK-56509][SQL] SparkSQL Last Attempt Metrics on Apr 16, 2026
Member

@szehon-ho left a comment


Well-designed infrastructure for tracking "last attempt" accumulator values, with strong test coverage. The layered architecture (core trait → SQL trait → concrete metric) is sensible, and the test matrix across AQE/retries/replanning is thorough.

Main concerns: test infrastructure embedded in production code paths, and a few API widening choices. See inline comments.

Comment thread common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java Outdated
throw SparkCoreErrors.accessNonExistentAccumulatorError(id)
}
acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
if (acc.isInstanceOf[LastAttemptAccumulator[_, _, _]]) {
Member


This introduces coupling between DAGScheduler and the SLAM concept. An alternative: add an overridable method to AccumulatorV2 like def mergeWithTaskMetadata(other, rdd, taskInfo, stageId, stageAttemptId, props): Unit = {} that SLAM overrides. Then DAGScheduler calls it unconditionally (no-op for regular accumulators) without the instanceof check.
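
Typed out, the suggested default might look like this (parameter types are assumptions; the comment above only gives the shape):

    // Hypothetical no-op default on AccumulatorV2; SLAM would override it
    // to record the per-task attempt metadata.
    def mergeWithTaskMetadata(
        other: AccumulatorV2[IN, OUT],
        rdd: RDD[_],
        taskInfo: TaskInfo,
        stageId: Int,
        stageAttemptId: Int,
        props: java.util.Properties): Unit = ()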

Contributor Author


LastAttemptAccumulator follows a mixin pattern, where the LastAttemptAccumulator mixin adds the extra functionality to the accumulator. The code doesn't change AccumulatorV2 at all, and only adds extra visibility to two fields in SQLMetrics; the plug-in point into existing production code is limited to these few lines in DAGScheduler, plus some testing utils, plus tiny fixes to RDD scoping in collector and shuffle.
If we added this method to the base class, it would be a noop for any metric other than a LastAttemptAccumulator.
This kind of mixin follows the same pattern as e.g. most of DSv2 interfaces, where also instead of adding empty methods to many interfaces, we have mixins like SupportsDelta, RequiresDistributionAndOrdering etc. etc., and various places that interact with it are plugging in the awareness based on the type check.
I think it's a good pattern.

// available.
// For testing purposes, inject fetch failures controlled from the driver-side by
// supplying an invalid location.
if (Utils.isTesting &&
Member


Injecting invalid BlockManagerId locations directly in the DAGScheduler production path (even guarded by Utils.isTesting) is a pattern that could accumulate tech debt. Similarly, AQETestHelper.shouldForceCancellation() is checked in AdaptiveSparkPlanExec's main loop. Could these be implemented via a test-only pluggable hook (e.g., a MapOutputTrackerMaster wrapper or a test-only DAGSchedulerEventProcessLoop subclass) rather than inline conditionals in production code?

Contributor Author


There are about 100 code paths in Spark that invoke extra checks or trigger extra failures when Utils.isTesting; here it's protected both by isTesting and an internal config. I think introducing extra pluggable hooks and extra infrastructure to plug them in would introduce more tech debt and regression risk by requiring more reshaping of the existing production code paths.

Comment thread core/src/main/scala/org/apache/spark/util/LastAttemptAccumulator.scala Outdated
@juliuszsompolski force-pushed the spark-last-attempt-metrics branch from a0294a7 to 38437af on April 20, 2026 13:43
@juliuszsompolski
Contributor Author

https://github.com/juliuszsompolski/apache-spark/actions/runs/24668193960/job/72131723131
I get the same errors in build/sbt -Pkinesis-asl unidoc out of clean master trying to build locally, so it's not an issue in this PR.

Contributor

@cloud-fan left a comment


Summary

Prior state and problem
Regular Spark accumulators sum values across all task attempts, including those discarded by stage retries or AQE replanning. For metrics meant to reflect the work of the final successful execution (e.g., rows written by a DML), this double-counts — e.g., after one retry of a 1000-row stage, numOutputRows reports 2000. The Task.collectAccumulatorUpdates-level filtering only drops failed attempts; values from successfully computed attempts that are later recomputed (executor loss, fetch failure, AQE replanning) still accumulate.

Design approach
A mixin trait LastAttemptAccumulator[IN, OUT, PARTIAL] that implementations combine with AccumulatorV2. Implementations expose a lightweight PARTIAL intermediate value. DAGScheduler.updateAccumulators is extended to forward a task completion to mergeLastAttempt when the accumulator implements the trait, carrying (rdd, taskInfo, stageId, stageAttemptId, localProperties). The trait stores partial values keyed by rddId and tagged with (stageId, stageAttemptId, taskAttemptNumber) per partition, keeping only the latest attempt per partition.

A SQL extension SQLLastAttemptAccumulator narrows to RDD scopes produced by stage-submitting plan nodes in executedPlan, reachable via a new deterministic SparkPlan.rddScopeId = "spark_plan_<planId>" that is threaded through executeQuery, getByteArrayRdd, and ShuffleExchangeExec.shuffleDependency. Driver-side updates are tracked separately per QueryExecution.id, and the accumulator bails out if both driver-side and task-side updates are seen.

Key design decisions

  • Mixin + instanceof dispatch rather than a no-op base-class method on AccumulatorV2. The thread with @szehon-ho already argued this matches Spark's SupportsX mixin style; reasonable, though it puts a runtime type-check on every task completion.
  • Deterministic rddScopeId on every SparkPlan. RDD scope IDs were previously anonymous nextScopeId() strings; now they are "spark_plan_<planId>". This is a cross-cutting implicit change to the spark.rdd.scope JSON visible via local properties, listeners, and UI tooling, and is not mentioned in the PR description.
  • Semantic split for driver value: lastAttemptValueForAllRDDs / lastAttemptValueForHighestRDDId surface the driver value, but lastAttemptValueForRDDIds / lastAttemptValueForRDDScopes return zero. Intentional (driver updates aren't attributable to a scope) but currently only documented via tests.
  • Driver-side updates scoped by QueryExecution.id via a new spark.sql.dataset.queryExecution.id local property set by QueryExecution.withQueryExecutionId.
  • Test machinery embedded in production code paths: INJECT_SHUFFLE_FETCH_FAILURES in DAGScheduler, AQETestHelper.shouldForceCancellation in AdaptiveSparkPlanExec, _lastByteArrayRddId field on every SparkPlan. Already debated with @szehon-ho for the first two. The _lastByteArrayRddId on SparkPlan is a separate addition not covered in that thread and is flagged inline.

Implementation sketch

  • core/.../LastAttemptAccumulator.scala — base trait, list-backed per-RDD map, per-partition attempt tuples.
  • sql/.../metric/SQLLastAttemptAccumulator.scala — Dataset/QueryExecution-aware lookups; walks executedPlan collecting scope IDs of stage-submitting nodes (broadcast/shuffle/subquery/reused); hard-bails on unknown BroadcastExchangeLike / ShuffleExchangeLike subclasses.
  • sql/.../metric/SQLLastAttemptMetric.scala — concrete SLAM using SQLMetric in SUM mode.
  • Test infrastructure: AQETestHelper, INJECT_SHUFFLE_FETCH_FAILURES config, SQLTestUtilsBase.withSparkContextConf, helpers in SQLMetricsTestUtils.

General notes

Silent scope-ID contract change: Every SparkPlan's RDDs now carry a deterministic scope ID "spark_plan_<id>" instead of an anonymous per-call nextScopeId() string. This is surfaced through spark.rdd.scope local properties, SparkListener events, and the SQL UI. It's a low-risk change in practice, but since it changes what external consumers see, please call it out in the PR description (under "Does this PR introduce any user-facing change?", or at least in the design section) so anyone consuming scope IDs isn't surprised.


/** The RDD id of the last wrapper RDD created by [[getByteArrayRdd]]. Visible for testing. */
@transient @volatile private var _lastByteArrayRddId: Option[Int] = None
private[spark] def lastByteArrayRddId: Option[Int] = _lastByteArrayRddId
Contributor


Adding mutable test-visible state (_lastByteArrayRddId + the accessor at 436) to the production SparkPlan base class is different from the other Utils.isTesting-guarded hooks in this PR — this field is always present on every plan instance regardless of whether tests are running. Can the test that needs this id fetch the RDD via a SparkListener or wrap the execution path in a helper instead of teaching the base class to remember its last wrapper RDD?

* used by [[LastAttemptAccumulator]] to track which RDD belongs
* to which SparkPlan node.
*/
private[spark] def rddScopeId: String =
Contributor


The switch from anonymous nextScopeId() to "spark_plan_<id>" changes what external consumers see in spark.rdd.scope, SparkListener events, and the SQL UI. Consider mentioning this in the PR description as a (minor) behavior change, and noting the invariant that SparkPlan.id is globally monotonic so these IDs remain globally unique.

@transient
private var lastAttemptDirectDriverQueryExecutionValues: mutable.Map[String, DRIVER_ACC] = _

override def initializeLastAttemptAccumulator()(implicit ct: ClassTag[PARTIAL]): Unit = try {
Contributor


This override assigns lastAttemptDirectDriverQueryExecutionValues before calling super.initializeLastAttemptAccumulator. If super's internal asserts fail (e.g., accidental double init), the subclass state has been replaced by a fresh empty map while the base state is unchanged — subsequent reads see an inconsistent snapshot. Call super first, then initialize subclass state. Same pattern at line 145 in resetLastAttemptAccumulator.
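
A sketch of the suggested ordering (member names as quoted above; body abbreviated):

    override def initializeLastAttemptAccumulator()(implicit ct: ClassTag[PARTIAL]): Unit = {
      // Let the base trait's asserts run first, so a failed init leaves
      // subclass state untouched rather than half-replaced.
      super.initializeLastAttemptAccumulator()
      lastAttemptDirectDriverQueryExecutionValues = mutable.Map.empty
    }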

* discarding any values coming from earlier attempts that have been recomputed.
* If the accumulator is used by multiple RDDs, the last attempt value is tracked separately for
* each, and can be retrieved for each or all of them separately, see lastAttemptValueForX methods.
* If the accumulator is used directly on the Spark Driver using the [[AccumulatorV2.set]] or
Contributor


[[AccumulatorV2.set]] does not exist — AccumulatorV2 exposes only add publicly; concrete subclasses have setValue (private) or a set(Long) overload (on SQLMetric). The Scaladoc link is broken.

Suggested change
* If the accumulator is used directly on the Spark Driver using the [[AccumulatorV2.set]] or
* If the accumulator is used directly on the Spark Driver using [[AccumulatorV2.add]]

/**
* Returns the last attempt value of this accumulator, aggregated from a set of RDDs.
*
* If the metric was used directly on the driver, and wa not used in any RDD execution,
Contributor


Typo "wa not" → "was not" (same scaladoc paragraph is repeated at lines 735, 758, 784, 815 — please fix all five).

Suggested change
* If the metric was used directly on the driver, and wa not used in any RDD execution,
* If the metric was used directly on the driver, and was not used in any RDD execution,

* values of every partition of every RDD that used the accumulator, and holding a full
* [[AccumulatorV2]] object for each would have a high overhead. Therefore, an implementation should
* be able to return [[PARTIAL]] value from [[partialMergeVal]] that represents an intermediate
* mergeable value. and a [[partialMerge]] method that can merge that value into the accumulator.
Contributor


Suggested change
* mergeable value. and a [[partialMerge]] method that can merge that value into the accumulator.
* mergeable value, and a [[partialMerge]] method that can merge that value into the accumulator.

*
* Implementations must implement [[partialMergeVal]] and [[partialMerge]] methods operating on
* [[PARTIAL]] type. In regular [[AccumulatorV2]] implementations, the [[AccumulatorV2]] object
* itself hold the intermediate value of the accumulator, and [[AccumulatorV2.merge]] method is used
Contributor


Suggested change
* itself hold the intermediate value of the accumulator, and [[AccumulatorV2.merge]] method is used
* itself holds the intermediate value of the accumulator, and [[AccumulatorV2.merge]] method is used

*
* Cached / Checkpointed plans
* ---------------------------
* If the metric was used inside a cached (df.cache, df.persist) or checkpointed (df.checkpoint,
Contributor


The "Cached / Checkpointed plans" paragraph has an unmatched parenthesis and a sentence that never resolves ("should return the value from when the execution in which the plan was cached/checkpointed and should be used instead"). Suggest rewriting this paragraph — it's one of the more important pieces of user-facing Scaladoc in the SQL trait. Other grammar issues in this file worth fixing at the same time: "allows to track" (lines 32, 114), "it's" as possessive (lines 38, 57), "in that cases" (75), "in a new submitted Stages" (76), "by record in" (104), truncated fragment "it w" inside the kept block comment (412).

def lastAttemptValueForRDDIds(rddIds: Seq[Int]): Option[OUT] = try {
if (lastAttemptAccumulatorInvalid) return None
assertValid()
if (lastAttemptDirectDriverValue.isDefined) {
Contributor


Follow-up to the thread on driver-value handling: in addition to Scaladoc on lastAttemptValueForRDDIds, could the trait-level doc (around line 309) also spell out the global rule? The contract as written in code is "driver value is returned when you query without narrowing scope; narrowing to a specific RDD or scope returns 0 because driver updates aren't attributable". Documenting that once at the trait level makes the per-method behavior predictable without the reader inferring it from four separate method docs.

}
}

object AdaptiveSparkPlanHelper extends AdaptiveSparkPlanHelper
Contributor


Introducing a singleton object AdaptiveSparkPlanHelper extends AdaptiveSparkPlanHelper widens the trait's call surface to anyone holding the import. The only in-PR callers are the new SQLLastAttemptAccumulator and the new tests. If the goal is just to avoid with AdaptiveSparkPlanHelper in the SQL trait's companion object, consider scoping the object to private[sql] / private[spark] so this isn't implicitly a new public helper API.

Juliusz Sompolski added 2 commits April 21, 2026 08:37
- Fix Scaladoc typos and grammar in LastAttemptAccumulator and
  SQLLastAttemptAccumulator.
- Rewrite the "Cached / Checkpointed plans" paragraph and complete the
  truncated fragment in SQLLastAttemptAccumulator.
- Add trait-level contract for driver-value handling; remove incorrect
  per-method mentions on scope-narrowed methods (they return zero when
  a driver value is present).
- Fix QueryExecution.withQueryExecutionId Scaladoc to reference
  SQLLastAttemptAccumulator, the actual consumer.
- Fix missing "if" in AQETestHelper.shouldForceCancellation comment.
- Call super first in SQLLastAttemptAccumulator.initializeLastAttemptAccumulator
  and resetLastAttemptAccumulator.
- Scope AdaptiveSparkPlanHelper singleton object to private[sql].
- Remove _lastByteArrayRddId field/accessor from SparkPlan and the
  override in AdaptiveSparkPlanExec; the redundant test assertions in
  SQLLastAttemptMetricIntegrationSuite are also removed, since per-scope
  maxBy(rddId) already implies the wrapper RDD is the one being tracked.
Resolve conflict with SPARK-55910 (4a471e0) by porting the new
withSparkContextConf helper from SQLTestUtilsBase into QueryTestBase,
which is where the SQLTestUtilsBase helpers were moved.
@juliuszsompolski
Contributor Author

Thank you for your reviews @szehon-ho and @cloud-fan . I addressed or answered all the comments.

@juliuszsompolski
Contributor Author

CI passes except for Run / Documentation generation, which fails the same way for me on vanilla master.

Contributor

@cloud-fan left a comment


Re-review at c0cb791. LGTM, approving — the four items below are all late catches (present at the prior review at 38437af and missed there), so they shouldn't block merge.

Addressed since 38437af: _lastByteArrayRddId removed; PR description now notes the RDD-scope-id change; SQLLastAttemptAccumulator.initializeLastAttemptAccumulator calls super first; @link AccumulatorV2.set Scaladoc fixed; five "wa not" typos fixed; QueryExecution.withQueryExecutionId Scaladoc retargeted to SQLLastAttemptAccumulator; AQETestHelper missing "if"; "itself hold" / "mergeable value. and" Scaladoc; "Cached / Checkpointed plans" file-header rewrite; object AdaptiveSparkPlanHelper scoped private[sql]; trait-level driver-value contract added.

Remaining from prior AI review: none.

New (all late catches): four small text / follow-through items below.

One general note on the PR description: the "Does this PR introduce any user-facing change?" paragraph describes this as a change in RDDOperationScope names / "scope name assigned as spark_plan_{id}", but the display name (RDDOperationScope.name, e.g. HashAggregate) is unchanged — what changed is RDDOperationScope.id. Since the paragraph's purpose is to warn consumers of the spark.rdd.scope JSON, s/name/ID/ would be clearer.

class SQLMetric(
val metricType: String,
initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
val initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
Contributor


Follow-up on #55371 (comment): you mentioned the val initValue widening here "could actually just be reverted, I must have needed it earlier", but the revert is not in the current tree. Reverting val initValue back to initValue (plain constructor param) should still let SQLLastAttemptMetric.copy / newDriverQueryExecutionAcc compile, because Scala auto-synthesizes a private field for primary-constructor params when they're referenced outside the constructor body. Could you either apply the revert, or update the thread explaining that it's needed after all? Non-blocking — late catch on my side from the prior round.

* which was either checkpointed (e.g. df.localCheckpoint(), df.checkpoint()) or cached
* (e.g. df.cache(), df.persist()).
* [[lastAttemptValueForHighestRDDId()]] should return the value from when the execution in
* which the plan was cached/checkpointed.
Contributor


Sentence fragment: "should return the value from when the execution in which the plan was cached/checkpointed." The file-header version at lines 94–97 was rewritten in this round to "should be used instead, which returns the value from the execution in which the plan was cached/checkpointed", but this method @note (and the identical one on lastAttemptValueForDataset at :276–277) wasn't updated in parallel. Late catch from prior round.

Suggested change
* which the plan was cached/checkpointed.
* [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the
* value from the execution in which the plan was cached/checkpointed.

case Some(acc) => return Some(driverAccValue(acc))
case None => // pass
}
// Otherwise, gather the RDD scoped from the plan and find metric updates from these scopes.
Contributor


Typo: scopedscopes (the object being gathered is a Seq[String] of scope IDs).

Suggested change
// Otherwise, gather the RDD scoped from the plan and find metric updates from these scopes.
// Otherwise, gather the RDD scopes from the plan and find metric updates from these scopes.

* fully re-executed.
* - Due to the async nature of cancellation, there can be tasks from previous attempts that
* arrive later than the last attempt. Therefore, we need to track and compare stageId and
* stageAttemptId of every computed RDD partition, in order to discard late comers.
Contributor


Minor: "late comers" is one word — "latecomers".

Suggested change
* stageAttemptId of every computed RDD partition, in order to discard late comers.
* stageAttemptId of every computed RDD partition, in order to discard latecomers.

@juliuszsompolski
Contributor Author

@cloud-fan all applied, thanks for the review.

Juliusz Sompolski added 8 commits April 27, 2026 07:14
…adoc HTML crash

Background
----------
The scaladoc on object InMemoryTableWithV2Filter.evalPredicate listed the
supported V2 predicates as plain text:

    Supports =, <=>, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.

genjavadoc emits this verbatim into the corresponding Java stub doc
comment, which becomes:

    Supports =, <=&gt;, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.

(genjavadoc HTML-escapes the trailing `>` but not the leading `<=`).
javadoc's HTML parser interprets the unescaped `<=` as the start of an HTML
tag, fails to find a closing `>`, and aborts during HTML generation of this
class -- the mode the diagnostic added in f2fe445 (SPARK-56630) is
designed to surface.

Why this stayed hidden
----------------------
javadoc's default `-Xmaxerrs` is 100. Every Spark unidoc run accumulates
a few dozen "benign noise" errors during source loading -- genjavadoc
emits stubs like

    static public abstract R apply(T1 v1, T2 v2, T3 v3, T4 v4);

which trip both `cannot find symbol` (the type variables `R`, `T1`, ... are
not in scope at the static-context emission site) and `illegal combination
of modifiers: abstract and static`. The commit message for f2fe445
documents that these are inert and that javadoc normally finishes anyway.

In practice, however, whenever the per-PR error count crosses the
`-Xmaxerrs` ceiling, javadoc bails during source loading and never reaches
HTML generation. From the operator's side that looks identical to a
genuine crash: a few hundred `[error]` lines on `target/java/...` Java
stubs followed by `JavadocGenerationFailed`. The diagnostic in f2fe445
can only identify a real culprit by reading the last
`Generating .../<Class>.html...` line before `javadoc exited`, which only
exists once HTML generation has begun -- so this `<=>` crash was masked by
the early bailout for as long as the noise budget happened to fit under
100. A PR that adds a handful of new public Scala classes (each one adds
a few more genjavadoc stubs and bumps the error count) flips the build
from "succeeds with 100 warnings" to "fails with 100 errors", and the new
crash class is suddenly the one being rendered when the limit is hit -- in
this case `InMemoryTableWithV2Filter`.

Effect of this change
---------------------
Wrapping each operator name in backticks and escaping `<=>` as `&lt;=&gt;`
produces a clean inline-code rendering and stops javadoc from treating the
punctuation as markup. After this fix, javadoc proceeds past
InMemoryTableWithV2Filter during HTML generation.
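
For reference, the fixed scaladoc line reads along these lines (exact
wording assumed from the description above):

    Supports `=`, `&lt;=&gt;`, `IS_NULL`, `IS_NOT_NULL`, and `ALWAYS_TRUE`.
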
javadoc's default `-Xmaxerrs` and `-Xmaxwarns` are both 100. Spark's
unidoc invocation is driven by genjavadoc-emitted Java stubs in
`<module>/target/java/`, and every run accumulates a few dozen benign
"errors" against those stubs -- e.g.

    error: illegal combination of modifiers: abstract and static
      static public abstract R apply(T1 v1, T2 v2, T3 v3, T4 v4);
    error: cannot find symbol
      static public abstract R apply(T1 v1, T2 v2, T3 v3, T4 v4);
                               ^
      symbol: class T1

These come from the way genjavadoc lifts type-parameterised methods into
static Java emission and were documented in f2fe445 (SPARK-56630) as
"every PR produces this, javadoc always complains, normally still
finishes". The number of such errors scales with the public API surface
that the PR touches.

The problem with leaving `-Xmaxerrs 100` in place is that any PR whose
total noise count happens to exceed 100 causes javadoc to bail during
source loading, before HTML generation begins. The build operator sees
the same end-state as a genuine crash: hundreds of `[error]` lines on
`target/java/...` followed by `JavadocGenerationFailed`. The diagnostic
banner added in f2fe445 cannot help here either -- it identifies the
crash class via the last `Generating .../<Class>.html...` line before the
exit, which only exists once javadoc has reached HTML generation. When
the bailout is at the source-loading phase, the banner correctly reports
"the crash predates HTML output" but cannot point at any specific source.

Concrete example. The unidoc step on PR apache#55371 was failing
deterministically on every CI run with exactly 100 errors. Same noise
categories as a recent passing run on a different fork (LuciferYang's PR
on the same master), but apache#55371 added a few public Scala classes and
that pushed the cumulative count just over the cliff. Symptoms:

    [error] /.../target/java/.../ErrorInfo.java:12:29:
        error: illegal combination of modifiers: abstract and static
    [error] /.../target/java/.../ErrorInfo.java:12:36:
        error: cannot find symbol
    ...
    [warn] javadoc exited with exit code 1
    [error] sbt.inc.Doc$JavadocGenerationFailed
    Unidoc failed -- diagnostic summary
      Javadoc exited but no class HTML generation was in progress;
      the crash predates HTML output -- likely a CLI / classpath /
      setup issue. See the full sbt output above.

In reality there was nothing wrong with the CLI or classpath: the limit
was simply being hit before javadoc could begin emitting HTML. Raising
both limits to 9999 keeps javadoc producing output past the noise wall,
which is what the f2fe445 diagnostic was designed to operate on. With
the higher limits, the same run on apache#55371 produced a useful banner naming
the class javadoc was rendering at the moment of the crash, which led
directly to the `<=>` scaladoc fix in InMemoryTableWithV2Filter.

This change does not silence any real failure: javadoc still exits with a
non-zero code when it encounters a genuine HTML-generation crash. It only
prevents a noise-driven early bailout from masking the real culprit.
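
For illustration, the change amounts to passing larger limits to the unidoc
javadoc invocation, along these lines (the exact sbt setting and its location
in SparkBuild.scala are assumptions):

    javacOptions in (JavaUnidoc, unidoc) ++= Seq(
      "-Xmaxerrs", "9999",
      "-Xmaxwarns", "9999")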