[CORE] Add customMetrics extension point to ShuffleWriterMetrics for backend-specific shuffle stats#12114
Draft
luis4a0 wants to merge 2 commits into
Draft
[CORE] Add customMetrics extension point to ShuffleWriterMetrics for backend-specific shuffle stats#12114luis4a0 wants to merge 2 commits into
luis4a0 wants to merge 2 commits into
Conversation
ShuffleWriterMetrics currently has a hand-rolled list of 9 scalar fields, two of which (`avgDictionaryFields`, `dictionarySize`) are Velox-specific. Adding more backend-specific scalars every time someone needs another counter doesn't scale — other backends (ClickHouse, GPU, RSS) have the same need and the cross-backend coordination cost grows linearly per metric. This PR adds a generic `std::unordered_map<std::string, int64_t> customMetrics` to ShuffleWriterMetrics that any shuffle writer can populate with backend-specific stats. It is plumbed through the existing JNI `stop()` serialization as two parallel arrays (keys + values) into `GlutenSplitResult`, where the JVM side reassembles them lazily into an unmodifiable `Map<String, Long>` on first access. Convention for keys: `<Backend>.<Family>.<Stat>` — e.g. `Velox.InputEncoding.Flat` or `Velox.SplitRV.FixedWidthWallNanos`. Spark-side registration as SQLMetrics happens per-key in the backend's MetricsApi (`VeloxMetricsApi` / `CHMetricsApi`); unknown keys are silently dropped on the Scala side so a backend can ship new metrics ahead of the Spark-side registration without breaking older Spark wrappers. This commit only introduces the plumbing — no backend populates the map yet. The follow-up commit on this PR wires up the Velox hash shuffle writer as the first consumer. Includes `GlutenSplitResultSuite` covering the JVM-side reassembly (empty / null / populated arrays, caching, immutability) so the JNI boundary is fenced by a unit test that doesn't need a full Spark / native round-trip. Generated-by: GitHub Copilot CLI (Claude Opus 4.7 1M context) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
First consumer of the generic customMetrics extension point added in the previous commit. The per-batch input-encoding counters introduced by apache#12107 were only observable via the compile-time `VELOX_SHUFFLE_WRITER_LOG_FLAG` log channel (default OFF). With this change they flow through the existing JNI `stop()` path → `GlutenSplitResult.getCustomMetrics()` → SQLMetrics on the Velox columnar shuffle exchange, so anyone running with Gluten can see them on the Spark UI without rebuilding native. Seven new metrics under the `Velox.InputEncoding.*` namespace (`Flat` / `Dictionary` / `Constant` / `Lazy` / `Complex` / `Other` plus `SkippedNonVeloxBatches`). Each is registered as a counting SQLMetric in `VeloxMetricsApi.genColumnarShuffleExchangeMetrics()` for the Hash and GpuHash shuffle writer types; the Sort and RssSort writers don't touch the input-encoding code path so the metrics simply stay at 0. `VeloxHashShuffleWriter::publishInputEncodingCustomMetrics()` writes into `metrics_.customMetrics` during `stop()`, after the partition writer has populated the rest of `metrics_`. The JVM side (`ColumnarShuffleWriter.scala`) then iterates the map and adds each value into the matching pre-registered SQLMetric, dropping unknown keys silently so a future native-side metric can ship before the Spark-side registration. Includes `VeloxHashShuffleWriterCustomMetricsTest` covering post-`stop()` population, the 7-key contract, and parity with the `inputEncodingCounts()` / `inputEncodingSkippedBatches()` accessors. Generated-by: GitHub Copilot CLI (Claude Opus 4.7 1M context) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes are proposed in this pull request?
ShuffleWriterMetrics(incpp/core/shuffle/Options.h) currently has a hand-rolled list of 9 scalar fields, two of which (avgDictionaryFields,dictionarySize) are Velox-specific. Every time someone wants to expose another backend-specific shuffle stat, the pattern is: new scalar field on the struct → new constructor parameter onGlutenSplitResult-> new getter -> new line inColumnarShuffleWriter.scala. That's four files per metric, and it forces cross-backend coordination (the ClickHouse / GPU / RSS backends would each need similar plumbing for their own stats).This PR adds a generic
std::unordered_map<std::string, int64_t> customMetricsfield toShuffleWriterMetricsthat any shuffle writer can populate with backend-specific stats, plumbed through the existing JNIstop()path as two parallel arrays (keys + values) intoGlutenSplitResult. The JVM side reassembles them lazily into an unmodifiableMap<String, Long>on firstgetCustomMetrics()access, andColumnarShuffleWriter.scalaiterates the map and forwards each entry into a pre-registered SQLMetric (unknown keys silently dropped, so the native side can ship new metrics ahead of Spark-side registration without breaking older wrappers).Convention for keys:
<Backend>.<Family>.<Stat>, e.g.Velox.InputEncoding.FlatorVelox.SplitRV.FixedWidthWallNanos. Spark-side registration happens per-key in the backend's MetricsApi (VeloxMetricsApi/CHMetricsApi/GpuMetricsApi).The PR ships in two commits:
c++struct field + JNI marshalling + Java POJO + JVM-side unit test.VELOX_SHUFFLE_WRITER_LOG_FLAG=1compile-time log) become 7 new SQLMetrics on the Velox columnar shuffle exchange:Velox.InputEncoding.{Flat, Dictionary, Constant, Lazy, Complex, Other, SkippedNonVeloxBatches}. Anyone running with Gluten can now see these on the Spark UI without rebuilding native libraries.The two commits are deliberately separable so reviewers can sign off on the infrastructure independently of the Velox-specific consumer; if a reviewer disagrees with the bucketing scheme of
Velox.InputEncoding.*, that's a discussion for the second commit only.Why now
This is the natural completion of the #12083 -> #12107 -> #12108 (in-flight) arc that started with "the shuffle writer's per-stage timers were silently recording near-zero" and ended with "we have rich per-batch + per-stage data but it's invisible unless you enable a compile-flag log". With #12114 (this PR), all of that data becomes JVM-observable through the channel Spark already uses for shuffle metrics, which means it can drive dashboards / regression detection / Spark UI tooltips without anyone having to grep through log files. Once the infrastructure is in place, #12108's 4 SplitRV sub-stage timers and any future per-stage counter become 4-line additions to
VeloxMetricsApi.Files touched
cpp/core/shuffle/Options.h(+18) — newcustomMetricsfield with key-naming convention doccpp/core/shuffle/ShuffleWriter.{h,cc}(+8) —customMetrics()const accessorcpp/core/jni/JniWrapper.cc(+29) — marshal map as two parallel arrays at the existingstop()sitegluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java(+47) — new constructor params, lazygetCustomMetrics()accessor with cachingcpp/velox/shuffle/VeloxHashShuffleWriter.{h,cc}(+22) — populateVelox.InputEncoding.*keys fromstop()backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala(+20) — register the 7 new SQLMetricsbackends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala(+12) — drainsplitResult.getCustomMetricsinto matchingdep.metricsgluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala(+97) — JVM-side unit tests for the lazy reassembly logiccpp/velox/tests/VeloxHashShuffleWriterCustomMetricsTest.cc(+135) — C++ gtests covering Velox-side populationcpp/velox/tests/CMakeLists.txt(+3) — register the new gtestTotal: ~390 lines added across 4 layers.
How was this patch tested?
Local
ninja gluten velox: cleanmvn -Pbackends-velox,spark-3.5 -pl backends-velox -am compile: BUILD SUCCESSmvn scalatest:test -pl gluten-arrow -DwildcardSuites='org.apache.gluten.vectorized.GlutenSplitResultSuite': 6/6 passed (empty map, null arrays, parallel-array reassembly, identity caching, unmodifiability, null-safety)g++ -fsyntax-onlyagainst the in-tree compile flags). Full execution gated byVELOX_BUILD_TEST_UTILS=OFFon the local Velox ep build for the same reason documented in [VL] Fix SCOPED_TIMER macro destroying timer immediately #12083 / [VL] Add per-batch input-encoding counter to VeloxHashShuffleWriter #12107 / [VL] Subdivide SplitRV CPU/wall timer into 4 sub-stages #12108; Gluten CI will exercise it.dev/git-clang-format --binary clang-format-15 --style=file: clean after one round of auto-fixdev/format-scala-code.sh: clean after one round of auto-fixdev/check.py format main: Ok on all 8 C++ files touchedWhat the tests cover
GlutenSplitResultSuite(Scala / ScalaTest, 6 cases): ConstructsGlutenSplitResultdirectly with various combinations of the newcustomMetricsKeys/customMetricsValuesparameters (empty, null, populated) and asserts the lazy map reassembly behaves correctly. This is the JNI boundary — if the C++ side passes garbage, the POJO's behavior is what surfaces to Spark, so it deserves a dedicated suite.VeloxHashShuffleWriterCustomMetricsTest(C++ / gtest, 2 cases): Drives a realVeloxHashShuffleWriterthroughwrite()+stop()and asserts thatcustomMetrics()contains exactly the 7Velox.InputEncoding.*keys with values matching the in-memoryinputEncodingCounts_/inputEncodingSkippedBatches_accessors. Fences the source-of-truth so a future change can't accidentally introduce a unit / rounding mismatch between the log channel and the JVM metric channel.