[SPARK-31253][SQL][FOLLOW-UP] Improve the partition data size metrics in CustomShuffleReaderExec #28175

Closed
wants to merge 11 commits
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -95,30 +97,6 @@ case class CustomShuffleReaderExec private(
case _ => None
}

private def partitionDataSizeMetrics = {
val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
val mapStatsOpt = shuffleStage.get.mapStats
val sizes = mapStatsOpt.map { mapStats =>
val mapSizes = mapStats.bytesByPartitionId
partitionSpecs.map {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
startReducerIndex.until(endReducerIndex).map(mapSizes).sum
case p: PartialReducerPartitionSpec => p.dataSize
case p => throw new IllegalStateException("unexpected " + p)
}
}.getOrElse(Seq(0L))

maxSize.set(sizes.max)
minSize.set(sizes.min)
avgSize.set(sizes.sum / sizes.length)
Map(
"maxPartitionDataSize" -> maxSize,
"minPartitionDataSize" -> minSize,
"avgPartitionDataSize" -> avgSize)
}

private def skewedPartitionMetrics = {
val metrics = SQLMetrics.createMetric(sparkContext, "number of skewed partitions")
val numSkewedPartitions = partitionSpecs.collect {
@@ -128,6 +106,33 @@ case class CustomShuffleReaderExec private(
Map("numSkewedPartitions" -> metrics)
}

private def sendPartitionDataSizeMetrics(executionId: String): Unit = {
val mapStats = shuffleStage.get.mapStats
val partitionMetrics = metrics("partitionDataSize")

if (mapStats.isEmpty) {
partitionMetrics.set(0)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq(partitionMetrics))
} else {
val sizes = ArrayBuffer[Long]()
partitionSpecs.foreach {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
sizes += startReducerIndex.until(endReducerIndex).map(
mapStats.get.bytesByPartitionId(_)).sum
case p: PartialReducerPartitionSpec =>
sizes += p.dataSize
case p => throw new IllegalStateException("unexpected " + p)
}

val id = partitionMetrics.id
val accumUpdates = sizes.map(value => (id, value))
SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, accumUpdates)
[Review comment] Contributor: Why can't we send all metrics together?

[Review comment] Contributor (Author): Makes sense. Already updated. (See the illustrative sketch after this method.)


// Set the sum of the per-partition sizes as the value of the "partitionDataSize" metric.
partitionMetrics.set(sizes.sum)
}
}
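Following up on the review exchange above: the per-partition sizes are now posted as individual accumulator updates against a single "partitionDataSize" metric instead of being pre-aggregated into separate min/avg/max metrics on the driver. A minimal sketch of that summarizing idea in plain Scala, assuming the consumer wants a total plus min/median/max per metric (the helper name and formatting below are illustrative, not Spark's actual UI code):

// Illustrative sketch only: with one update per coalesced partition, a single
// size metric can be summarized as a total plus min / median / max.
def summarizePartitionSizes(sizes: Seq[Long]): String = {
  require(sizes.nonEmpty, "expected at least one partition size")
  val sorted = sizes.sorted
  s"total: ${sorted.sum} (min: ${sorted.head}, " +
    s"med: ${sorted(sorted.length / 2)}, max: ${sorted.last})"
}

// summarizePartitionSizes(Seq(128L, 96L, 160L)) == "total: 384 (min: 96, med: 128, max: 160)"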

@transient override lazy val metrics: Map[String, SQLMetric] = {
if (shuffleStage.isDefined) {
val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
@@ -138,7 +143,8 @@
// data size info is available.
Map.empty
} else {
partitionDataSizeMetrics
Map("partitionDataSize" ->
SQLMetrics.createSizeMetric(sparkContext, "partition data size"))
}
} ++ {
if (hasSkewedPartition) {
@@ -155,7 +161,12 @@

private lazy val cachedShuffleRDD: RDD[InternalRow] = {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics.filter(_._1 != "partitionDataSize").values.toSeq)

if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
sendPartitionDataSizeMetrics(executionId)
}
shuffleStage.map { stage =>
new ShuffledRowRDD(
stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
@@ -222,6 +222,16 @@ object SQLMetrics {
}
}

def postDriverMetricsUpdatedByValue(
sc: SparkContext,
executionId: String,
accumUpdates: Seq[(Long, Long)]): Unit = {
if (executionId != null) {
sc.listenerBus.post(
SparkListenerDriverAccumUpdates(executionId.toLong, accumUpdates))
}
}
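A hedged usage sketch of the new helper (the metric, the sizes, and the surrounding sc and executionId values are assumptions for illustration, mirroring the caller in CustomShuffleReaderExec above): each per-partition value is paired with the same accumulator id, so one metric receives several updates in a single post.

// Hypothetical caller-side sketch; sc: SparkContext and executionId: String are assumed in scope.
val partitionDataSize = SQLMetrics.createSizeMetric(sc, "partition data size")
val partitionSizes = Seq(96L, 128L, 160L)  // assumed per-partition byte counts
val accumUpdates = partitionSizes.map(size => (partitionDataSize.id, size))
SQLMetrics.postDriverMetricsUpdatedByValue(sc, executionId, accumUpdates)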

/**
* Updates metrics based on the driver side value. This is useful for certain metrics that
* are only updated on the driver, e.g. subquery execution time, or number of files.
@@ -450,7 +450,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {

var jobs = Map[Int, JobExecutionStatus]()
var stages = Set[Int]()
var driverAccumUpdates = Map[Long, Long]()
var driverAccumUpdates = Seq[(Long, Long)]()

@volatile var metricsValues: Map[Long, String] = null
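The change above from Map[Long, Long] to Seq[(Long, Long)] for driverAccumUpdates reads as a direct consequence of the per-partition updates: several updates now share one accumulator id, and a map keyed by id would silently keep only one of them. A tiny illustration with made-up values:

// Illustrative only: duplicate accumulator ids collapse in a Map but survive in a Seq.
val updates: Seq[(Long, Long)] = Seq((7L, 96L), (7L, 128L), (7L, 160L))
updates.toMap  // Map(7 -> 160): only the last update for id 7 survives
updates        // all three per-partition updates are preserved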

@@ -808,11 +808,9 @@ class AdaptiveQueryExecSuite
assert(!reader.hasSkewedPartition)
assert(reader.hasCoalescedPartition)
assert(reader.metrics.keys.toSeq.sorted == Seq(
"avgPartitionDataSize", "maxPartitionDataSize", "minPartitionDataSize", "numPartitions"))
"numPartitions", "partitionDataSize"))
assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
assert(reader.metrics("avgPartitionDataSize").value > 0)
assert(reader.metrics("maxPartitionDataSize").value > 0)
assert(reader.metrics("minPartitionDataSize").value > 0)
assert(reader.metrics("partitionDataSize").value > 0)

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(