[SPARK-31253][SQL][FOLLOW-UP] Improve the partition data size metrics in CustomShuffleReaderExec #28175
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
metrics("partitionDataSize").set(dataSize)
SQLMetrics.postDriverMetricUpdates(
  sparkContext, executionId,
  metrics.filter(_._1 == "partitionDataSize").values.toSeq)
can we look up the partitionDataSize SQLMetric at the beginning of this method? then here we can simply write Seq(metric).
can you put the before/after screenshots?
Test build #121054 has finished for PR 28175 at commit
Force-pushed from bbd6324 to 4aabfaa (Compare)
private def sendPartitionDataSizeMetrics(
    executionId: String,
    partitionMetrics: SQLMetric): Unit = {
  val mapStats = shuffleStage.get.mapStats.get.bytesByPartitionId
Let's follow the previous code: https://github.com/apache/spark/pull/28175/files#diff-a42cafdbb5870e28c4e03df50ffc44f6L111. If shuffleStage.get.mapStats.isEmpty, we send the metric value as 0 only once.
Test build #121076 has finished for PR 28175 at commit
Test build #121071 has finished for PR 28175 at commit
val mapStats = shuffleStage.get.mapStats
if (mapStats.isEmpty) {
  metrics("partitionDataSize").set(0)
  SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq{partitionMetrics})
Seq{partitionMetrics}? This should be Seq(partitionMetrics).
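For context, Seq{partitionMetrics} does compile: in Scala, a brace block is just another way to pass a single argument, so both spellings build the same one-element sequence and only the parenthesized form is idiomatic. A standalone sketch (names here are made up, not from the PR):

```scala
object SeqBraceDemo {
  def main(args: Array[String]): Unit = {
    val x = 42
    // A brace block passes a single expression to Seq.apply, so both
    // forms build the same one-element sequence; Seq(x) is the
    // conventional spelling and is what the reviewer asks for.
    val withBraces = Seq { x }
    val withParens = Seq(x)
    println(withBraces == withParens)
  }
}
```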
val dataSize = startReducerIndex.until(endReducerIndex).map(
  mapStats.get.bytesByPartitionId(_)).sum
metrics("partitionDataSize").set(dataSize)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq{partitionMetrics})
ditto
partitionMetrics: SQLMetric): Unit = {
  val mapStats = shuffleStage.get.mapStats
  if (mapStats.isEmpty) {
    metrics("partitionDataSize").set(0)
partitionMetrics.set(0)
sum += dataSize
case p: PartialReducerPartitionSpec =>
  metrics("partitionDataSize").set(p.dataSize)
  SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq{partitionMetrics})
ditto
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
  val dataSize = startReducerIndex.until(endReducerIndex).map(
    mapStats.get.bytesByPartitionId(_)).sum
  metrics("partitionDataSize").set(dataSize)
ditto
case p => throw new IllegalStateException("unexpected " + p)
}
// Set sum value to "partitionDataSize" metric.
metrics("partitionDataSize").set(sum)
ditto
metrics.filter(_._1 != "partitionDataSize").values.toSeq)
if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
  sendPartitionDataSizeMetrics(executionId, metrics.get("partitionDataSize").get)
why not do val partitionMetrics = metrics("partitionDataSize") inside the method instead of passing it as a parameter?
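The suggested shape can be sketched with a stub metric class, since the real SQLMetric lives inside Spark; the stub class and the hard-coded value below are illustrative assumptions, not the merged code:

```scala
// Stub standing in for Spark's SQLMetric (illustrative only).
class StubMetric(var value: Long = 0L) {
  def set(v: Long): Unit = value = v
}

object LookupDemo {
  val metrics = Map("partitionDataSize" -> new StubMetric)

  // Look the metric up inside the method, as the reviewer suggests,
  // instead of threading it through as a parameter.
  def sendPartitionDataSizeMetrics(executionId: String): Unit = {
    val partitionMetrics = metrics("partitionDataSize")
    partitionMetrics.set(123L) // a real impl would compute this from map stats
    // ...then post Seq(partitionMetrics) to the driver here...
  }

  def main(args: Array[String]): Unit = {
    sendPartitionDataSizeMetrics("exec-1")
    println(metrics("partitionDataSize").value)
  }
}
```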
Test build #121168 has finished for PR 28175 at commit
Test build #121173 has finished for PR 28175 at commit
@@ -128,6 +104,34 @@ case class CustomShuffleReaderExec private(
  Map("numSkewedPartitions" -> metrics)
}
private def sendPartitionDataSizeMetrics(
    executionId: String): Unit = {
we can merge this to the previous line now.
Test build #121184 has finished for PR 28175 at commit
retest this please
Test build #121196 has finished for PR 28175 at commit
retest this please
Test build #121214 has finished for PR 28175 at commit
After offline discussions with @cloud-fan, we agree that it'd be more efficient to add a new metric type rather than posting metrics for all the partitions. With the new type, we can have max, min, avg, median (or anything you want).
@maryannxue Instead of creating a new metric type, we can add a new method
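The min/median/max summary such a metric would expose can be sketched in plain Scala; the sizes below are hypothetical and this is not Spark's actual aggregation code:

```scala
object SizeStatsDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-partition data sizes in bytes.
    val sizes = Seq(512L, 4096L, 1024L, 2048L)
    val sorted = sizes.sorted
    // Summary statistics of the kind the UI could show in one entry.
    val min = sorted.head
    val max = sorted.last
    val median = sorted(sorted.length / 2)
    println(s"min=$min, med=$median, max=$max")
  }
}
```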
@@ -222,6 +222,15 @@ object SQLMetrics {
  }
}
def postDriverMetricsUpdatedByValue(
    sc: SparkContext, executionId: String,
nit: one parameter per line
Test build #121264 has finished for PR 28175 at commit
Test build #121270 has finished for PR 28175 at commit
val id = partitionMetrics.id
val accumUpdates = sizes.map(value => (id, value))
SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, accumUpdates)
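The pattern quoted above, pairing one accumulator id with many raw values so they can be posted together in a single driver update, can be illustrated standalone (the id and sizes are made up for the sketch):

```scala
object AccumUpdatesDemo {
  def main(args: Array[String]): Unit = {
    val metricId = 7L                   // hypothetical accumulator id
    val sizes = Seq(1024L, 2048L, 512L) // hypothetical per-partition sizes
    // One (id, value) pair per partition, sent together in one call
    // rather than issuing one metric update per partition.
    val accumUpdates = sizes.map(value => (metricId, value))
    println(accumUpdates.mkString(","))
  }
}
```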
Why can't we send all metrics together?
Makes sense, already updated.
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
Test build #121347 has finished for PR 28175 at commit
Test build #121345 has finished for PR 28175 at commit
Test build #121351 has finished for PR 28175 at commit
retest this please
Test build #121362 has finished for PR 28175 at commit
retest this please
Test build #121385 has finished for PR 28175 at commit
thanks, merging to master!
…etrics of AQE shuffle

### What changes were proposed in this pull request?
A followup of #28175:
1. use mutable collection to store the driver metrics
2. don't send size metrics if there is no map stats, as UI will display size as 0 if there is no data
3. calculate partition data size separately, to make the code easier to read.

### Why are the changes needed?
code simplification

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
existing tests

Closes #28240 from cloud-fan/refactor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
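Point 3 of this follow-up, computing the per-partition data sizes separately from posting them, can be sketched with simplified stand-ins for Spark's partition spec classes (these case classes and values are illustrative, not Spark's real types):

```scala
// Simplified stand-ins for Spark's shuffle partition specs.
sealed trait PartitionSpec
case class Coalesced(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec
case class PartialReducer(dataSize: Long) extends PartitionSpec

object PartitionSizeDemo {
  // Pure size computation, kept separate from any metric posting:
  // a coalesced partition sums the byte counts of the reducer range
  // it covers; a partial-reducer partition already carries its size.
  def partitionDataSizes(specs: Seq[PartitionSpec],
                         bytesByPartitionId: Array[Long]): Seq[Long] =
    specs.map {
      case Coalesced(start, end) => start.until(end).map(bytesByPartitionId(_)).sum
      case PartialReducer(size)  => size
    }

  def main(args: Array[String]): Unit = {
    val bytes = Array(100L, 200L, 300L, 400L)
    val specs = Seq(Coalesced(0, 2), PartialReducer(50L), Coalesced(2, 4))
    println(partitionDataSizes(specs, bytes).mkString(","))
  }
}
```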
What changes were proposed in this pull request?
Currently the partition data size metrics occupy three separate entries (min/max/avg) in the Spark UI, which is not user friendly. This PR combines min/max/avg into one entry by calling SQLMetrics.postDriverMetricUpdates multiple times.
Before this PR, the Spark UI is shown as follows:
After this PR, the Spark UI is shown as follows:
Why are the changes needed?
Improving UI
Does this PR introduce any user-facing change?
No
How was this patch tested?
existing unit tests