[SPARK-56647][SQL] Optimize storage of SparkSQL Last Attempt Metrics#55585
[SPARK-56647][SQL] Optimize storage of SparkSQL Last Attempt Metrics#55585juliuszsompolski wants to merge 8 commits intoapache:masterfrom
Conversation
|
cc @cloud-fan - this implements the optimization we discussed offline. |
…pt Metrics Replace the three N-sized int arrays in LastAttemptRDDVals (stageIds, stageAttemptIds, taskAttemptNumbers) with a compact representation that exploits the fact that, in the typical no-retry case, every computed partition shares the same (stageId, stageAttemptId, taskAttemptNumber). The new layout per RDD: - commonStageId / commonStageAttemptId / commonTaskAttemptNumber: the attempt of the first update; partitions matching it carry no extra per-partition state. - computedBitmap: one bit per partition, replacing the EMPTY_ID sentinel in the per-partition int arrays. - overridePartIds / overrideStageIds / overrideStageAttemptIds / overrideTaskAttemptNumbers + overrideBitmap: parallel int arrays for the partitions whose attempt differs from the common, plus a per partition bitmap that lets partialValueAt/findOverrideIdx skip the linear scan in O(1) for partitions without an override. All five are null until the first override is recorded, so RDDs with no retries pay zero allocations beyond the bitmap and three ints of common state. partialValueAt and isEmptyAt preserve their existing semantics, so the DAGScheduler/SLAM merge path is unchanged. toString reconstructs the full per-partition stageIds/stageAttemptIds/taskAttemptNumbers view for debug logs, and additionally prints the common attempt, computed bitmap, and override entries so the internal storage is observable. The single-writer/multi-reader concurrency contract is preserved: writes to the override arrays and bitmap happen before the volatile write to overrideSize, so a reader observing a non-zero size sees consistent contents up to that index; a reader racing the very first allocation sees overrideBitmap == null or has no bit set for the in-flight partition, both of which isOverridden reports correctly as not overridden. Adds a unit test that asserts the override count stays at 0 in the no-retry case, grows to 1 on a partition retry, stays at 1 when the same partition is re-retried in place, and grows again only when a different partition is retried (including a per-task retry with a different taskAttemptNumber). Co-authored-by: Isaac
…override lookups Add an optional HashMap[Int, Int] (partitionId -> override-array index) that accelerates findOverrideIdx for RDDs whose override count grows beyond a small threshold. The map is null while the override count stays at or below LastAttemptRDDVals.OverrideIdxMapThreshold (32); linear scans over a handful of cache-friendly ints are faster than a hashmap probe with Integer boxing, and avoiding the allocation keeps small-retry RDDs cheap. Once the override count crosses the threshold the map is built once over the existing entries and then kept in sync as new overrides are appended. In-place updates of an existing override don't move the entry's index, so they don't need a map update. findOverrideIdx still short-circuits via the override bitmap for partitions without an override, then probes the map when present, and otherwise falls back to a linear scan. To tolerate a transient race against an in-flight setOverride (the HashMap itself is not thread-safe), the map result is validated against the override arrays and any exception from a partially-updated map is caught, in which case findOverrideIdx falls through to the linear scan. Adds a unit test "compact storage: override index map built lazily once threshold is crossed" that reads OverrideIdxMapThreshold via reflection and asserts the map stays null at the threshold, is built on the next override (containing all entries), extends with subsequent new overrides, and stays unchanged on in-place updates. Co-authored-by: Isaac
…o use a HashMap from the first override Drop the per-partition override bitmap and the threshold-based map build. Instead, allocate a HashMap[Int, Int] (partitionId -> override-array index) the first time an override is recorded, so it exists from the get-go for any RDD that has any retries. The HashMap answers both "is this partition overridden?" and "what is its index?" in O(1), without the previous bitmap optimization. This trades a slightly slower "is overridden" check (boxed-Int HashMap probe vs. one-cycle bit lookup) for a smaller, simpler implementation: no threshold tuning, no two-mode lookup path, no separate override bitmap. RDDs with no retries are unchanged - they still pay zero per-partition allocations. findOverrideIdx tolerates a transient race against an in-flight setOverride by validating the lookup result against the override arrays and catching exceptions from a partially-updated HashMap, returning -1 in that case (the caller treats it as "no override" and uses the common attempt - eventually consistent). The unit test is rewritten to assert the map is null while there are no overrides, is allocated and populated on the first override, extends with each new partition retry, and leaves the index unchanged for in-place updates of an existing override. Co-authored-by: Isaac
4dde384 to
d6620d0
Compare
…th three lazy-allocated per-component arrays Drop the parallel int arrays + HashMap[Int, Int] override storage in favor of three independently lazy-allocated Array[Int] of size numPartitions: overrideStageIds, overrideStageAttemptIds, overrideTaskAttemptNumbers. Each is null until some partition's value for that component diverges from the common attempt; once allocated, EMPTY_ID at index i means "use the common value" and any other value is the per-partition override. This shape mirrors the original three int arrays, but allocated only for the components that actually changed: - No retries: all three fields stay null (zero per-partition allocation). - Pure stage-attempt retry (new stageAttemptId, stageId same, taskAttemptNumber resets to 0): only overrideStageAttemptIds is allocated. - Mid-stage executor-loss retry (some tasks restart with a higher taskAttemptNumber): only overrideTaskAttemptNumbers is allocated. - Whole-stage cross-Stage retry: overrideStageIds is allocated too. - Worst case (all three differ): same total footprint as before. The fields are @volatile and the array is fully populated (Array.fill of EMPTY_ID, then the diverging entry) before the field is assigned, so a reader either sees null (use common) or a fully initialized array. In-place element writes for subsequent overrides are plain ints, eventually consistent, matching the loose semantics of the original per-partition int arrays. When a later update brings a previously- overridden component back to the common value, the entry is cleared back to EMPTY_ID so reads correctly fall through. Drops the override HashMap, findOverrideIdx and its race-handling/validation/linear-scan fallback path; partialValueAt and update use a small lookupComponent / setOverrideComponent helper that operates on a single component at a time. The unit test is rewritten to assert each override array stays null until its component actually diverges, gets allocated only on divergence, and clears its entry back to EMPTY_ID when a later update brings that component back to the common value. Co-authored-by: Isaac
d6620d0 to
9e8ef60
Compare
…ear back to EMPTY_ID
Once a per-component override array is allocated, writing the new value
into the slot is correct regardless of whether it matches the common
attempt - lookupComponent treats the array entry as authoritative and
only falls back to common when the slot still holds EMPTY_ID (i.e. the
partition was computed before this component's override array was
allocated, so its slot was never written by setOverrideComponent).
Drop the special case that cleared an existing slot back to EMPTY_ID
when the new value matched the common; just write the value through.
The entry will hold the common value, which lookupComponent returns
correctly. This removes a branch from setOverrideComponent and makes
the array semantics simpler ("the slot's value, or fall back to common
when never written").
Update the unit test that asserted partition 0's stageAttemptId entry
got cleared to EMPTY_ID after a re-update with the common value; now
it asserts the entry holds the new value (which happens to equal the
common).
Co-authored-by: Isaac
cloud-fan
left a comment
There was a problem hiding this comment.
Summary
This is an internal storage optimization for LastAttemptRDDVals: replaces three per-partition Array[Int] (stageId / stageAttemptId / taskAttemptNumber) with one per-RDD common attempt + three lazily-allocated override arrays + a computed-bitmap. The previous design allocated 12·N bytes per RDD even with no retries; the new design pays near-constant overhead in the common case and grows independently per-component only on actual divergence. Public surface and observable behavior are unchanged.
The design holds up well: no layer concerns (private internal class), no API surface change, the merge logic in mergeLastAttempt reads through partialValueAt reconstruction identically to before, and the single-writer/multi-reader concurrency contract is preserved via @volatile overrides and the pre-fill-then-publish allocation pattern in setOverrideComponent. The new reflective test covers lazy allocation across all retry shapes and the resulting aggregated values check out.
Two PR-description / code mismatches worth fixing before merge:
-
"How was this patch tested?" says: "When a later update brings a previously-overridden component back to the common value, that array's entry is cleared back to EMPTY_ID." But the latest commit (
Simplify setOverrideComponent: don't clear back to EMPTY_ID) removed that behavior, and the corresponding test asserts the opposite (saIds4(0) === 10, not -1). Update the description to match the simplified semantics. -
toStringdescription says it "additionally prints the common attempt and the computed bitmap." The actualtoStringonly prints reconstructed per-partition arrays — no common, no bitmap. Either drop the claim or extendtoStringto actually print them (which would genuinely help debugging since the new internal layout is no longer visible from the existing fields).
Minor: toString now allocates three fresh Array[Int](numPartitions) per call where the prior version called mkString directly on existing arrays. Trivial in absolute terms but it's a regression in the debug path and could be done with a single pass into a StringBuilder. Not blocking.
A couple of small Scaladoc polish items inline.
| * after an in-place update. Once the array exists, the value is always written, even when it | ||
| * matches the common - lookupComponent returns it correctly either way. |
There was a problem hiding this comment.
Small wording: "the common" reads as an incomplete noun phrase. Suggest "the common value" — same issue at line 194 in the override-arrays comment block ("diverges from the common"); line 196 already uses the full phrase, so this also makes the doc internally consistent.
| * after an in-place update. Once the array exists, the value is always written, even when it | |
| * matches the common - lookupComponent returns it correctly either way. | |
| * after an in-place update. Once the array exists, the value is always written, even when it | |
| * matches the common value - lookupComponent returns it correctly either way. |
| /** Reads one component's value at `partitionId`, falling back to `common` when the override | ||
| * array is null or the entry is still EMPTY_ID (slot never written - typically a partition | ||
| * that was computed before this component's override array was allocated). */ |
There was a problem hiding this comment.
The parenthetical mixes subjects ("slot never written … typically a partition…") and only covers one of the two cases that leave a slot at EMPTY_ID — the other being a partition that was not yet computed when this override array was first allocated for a different partition. Suggest:
| /** Reads one component's value at `partitionId`, falling back to `common` when the override | |
| * array is null or the entry is still EMPTY_ID (slot never written - typically a partition | |
| * that was computed before this component's override array was allocated). */ | |
| /** Reads one component's value at `partitionId`, falling back to `common` when the override | |
| * array is null or the entry is still EMPTY_ID (the slot was either not yet written, or was | |
| * initialized to EMPTY_ID and never overwritten because the partition's value matched the | |
| * common when the array was first allocated for a different partition). */ |
Per review comments: - "diverges from the common" -> "diverges from the common value" in the override-arrays comment block, matching the noun phrase used a few lines below. - "matches the common" -> "matches the common value" in setOverrideComponent's docstring, same reason. - lookupComponent's docstring now spells out both ways a slot can be EMPTY_ID after the array is allocated: not-yet-written, or initialized-and-never-overwritten because the partition's value matched the common when the array was first allocated for a different partition. Co-authored-by: Isaac
… StringBuilder in a single pass Per review nit: the previous toString allocated three fresh Array[Int](numPartitions) and then called mkString on each, which allocated an extra string per array. Replace with four StringBuilders populated in a single pass over the partitions: each iteration calls partialValueAt once, appends the partial value to one builder and the three attempt components to the others. Avoids the intermediate int-array allocations and the per-array mkString string allocations in the debug path. Co-authored-by: Isaac
|
@cloud-fan done. |
What changes were proposed in this pull request?
This PR optimizes the storage of per-partition tracking state in
LastAttemptRDDVals, which backsSQLLastAttemptMetric(SLAM) introduced in SPARK-56509.In the original design, every
LastAttemptRDDValsallocated threeArray[Int]of sizenumPartitionsto track(stageId, stageAttemptId, taskAttemptNumber)for each partition. In a typical execution without retries every entry of those arrays held the same value, and the three arrays together took ~12·N bytes per RDD that uses a SLAM accumulator.The three int arrays are replaced by a common attempt plus three lazily-allocated per-component override arrays:
commonStageId,commonStageAttemptId,commonTaskAttemptNumber: 3 ints set on the firstupdate()from the first task that completes. Partitions whose attempt matches the common values carry no per-partition stage/attempt state at all.computedBitmap(Array[Long]): one bit per partition, packed into longs. Tracks which partitions have been computed; replaces the previousEMPTY_IDsentinel that was stored in all three int arrays.overrideStageIds/overrideStageAttemptIds/overrideTaskAttemptNumbers(Array[Int], sizednumPartitions): each is null until some partition's value for that component diverges from the common. Once allocated, an entry equal toEMPTY_ID(-1) means "fall back to the common value", any other value is the per-partition override. The three arrays are independent — a retry that only changestaskAttemptNumberallocates onlyoverrideTaskAttemptNumbers, leaving the other two null.Once an override array exists, every subsequent update for that partition writes its value into the slot — even when the value happens to equal the common;
lookupComponentreturns it correctly either way. So a slot that still holdsEMPTY_IDafter the array is allocated only happens when the slot was never written (the partition's value matched the common when the array was first allocated for a different partition's divergence).partialValueAt,isEmptyAt, and the merge logic inmergeLastAttemptpreserve their existing semantics, soDAGScheduler.updateAccumulatorsand the SLAM merge path are unchanged.toStringreconstructs the original per-partitionstageIds/stageAttemptIds/taskAttemptNumbersview for debug logs by walkingpartialValueAtfor every partition; the compact internal representation isn't separately printed because the reconstructed view shows the same effective values.The single-writer/multi-reader concurrency contract is preserved. Each override array reference is
@volatile. On the first divergence for a component the writer allocates the array, fills it withEMPTY_IDviaArray.fill, writes the override entry, and only then assigns the field — so a concurrent reader either sees null (and falls back to the common value) or sees a fully initialized array. Subsequent in-place element writes are plain ints, eventually consistent, matching the loose semantics of the original per-partition int arrays.Memory comparison for a 200-partition RDD. The
partitionPartialValsarray of size N (e.g.Array[Long](200)≈ 1.6 KB forSQLLastAttemptMetric) is required to hold the per-partition partial values and is unchanged; the numbers below are the additional attempt-tracking overhead per RDD on top of that array:Array[Int](200)+ headers)null)Array[Int](200)foroverrideStageAttemptIds)Array[Int](200)foroverrideTaskAttemptNumbers)Array[Int](200))The worst case matches the original design's footprint; the typical no-retry case improves by ~50x and partial retries by ~3x, with the size paid only when the corresponding component actually changes.
Why are the changes needed?
Follow-up to the SLAM PR (SPARK-56509 / #55371), addressing a memory-efficiency concern raised in review: per-partition storage of
(stageId, stageAttemptId, taskAttemptNumber)is redundant in the common case, and even under retries the components diverge independently — usually only one of the three actually changes.If SLAM accumulators see broader use — for example as the basis for accurate Spark UI metrics under retries — many
SQLMetricinstances throughout a query plan would each carry aLastAttemptRDDValsper RDD that updated them. Reducing the per-RDD overhead from 3·N ints to a handful of ints + a small bitmap (with each retry component paid for only on actual divergence) is a meaningful win at scale.Does this PR introduce any user-facing change?
No. This is an internal storage optimization for
LastAttemptRDDVals. The publicLastAttemptAccumulator/SQLLastAttemptMetricAPIs, the values they return, and their behavior under stage retries are unchanged.How was this patch tested?
All existing SLAM test suites pass unchanged, exercising the same code paths as before:
SQLLastAttemptMetricUnitSuiteSQLLastAttemptMetricIntegrationSuiteSQLLastAttemptMetricIntegrationSuiteWithStageRetriesSQLLastAttemptMetricPlanShapesSuiteMetricsFailureInjectionSuite175 tests, all green.
New unit test
SQLLastAttemptMetricUnitSuite."compact storage: per-component override arrays allocated only when component diverges"reflects on the internal fields to verify:overrideStageAttemptIdsis allocated; the other two stay null.taskAttemptNumber, same stage): onlyoverrideTaskAttemptNumbersis allocated.overrideStageIdsis allocated alongside the others.lookupComponentreturns it correctly either way.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)