Skip to content

Commit

Permalink
[CARMEL-6375] Support Potential Skewed Operator Tagging (#1147)
Browse files Browse the repository at this point in the history
  • Loading branch information
xingchaozh authored and GitHub Enterprise committed Nov 18, 2022
1 parent 3f2a8a9 commit c25fdd6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,9 @@ case class FileSourceScanExec(
tableParallelInfo.get.partitionNumber.isEmpty))
}

var _isBucketSkew: Option[Boolean] = None
lazy val isBucketSkew = {
bucketedScan && {
_isBucketSkew = Some(bucketedScan && {
val files = selectedPartitions.flatMap(partition => partition.files)
val bucketFilesGroupingSize = files.map(file => (file.getPath.getName, file.getLen))
.groupBy(file => BucketingUtils.getBucketId(file._1))
Expand All @@ -432,7 +433,8 @@ case class FileSourceScanExec(
bucketFilesGroupingSize.nonEmpty &&
SkewHandlingUtil.isSkewed(bucketFilesGroupingSize.max,
Utils.median(bucketFilesGroupingSize, false), conf)
}
})
_isBucketSkew
}

override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
Expand Down Expand Up @@ -492,6 +494,11 @@ case class FileSourceScanExec(
} else {
Nil
}

if (conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
!hasPartitionsAvailableAtRunTime) {
logInfo(s"bucketed scan skewed: ${isBucketSkew}")
}
(partitioning, sortOrder)
} else {
(UnknownPartitioning(0), Nil)
Expand Down Expand Up @@ -536,7 +543,11 @@ case class FileSourceScanExec(
val withSelectedBucketsCount = relation.bucketSpec.map { spec =>
val bucketedKey = "Bucketed"
val withBucketedScanStatus = if (bucketedScan) {
metadata + (bucketedKey -> s"true")
if (_isBucketSkew.nonEmpty && _isBucketSkew.get) {
metadata + (bucketedKey -> s"true, potentialBucketSkewed")
} else {
metadata + (bucketedKey -> s"true")
}
} else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
metadata + (bucketedKey -> "false (disabled by configuration)")
} else if (disableBucketedScan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils

/**
* A wrapper of shuffle query stage, which follows the given partition arrangement.
Expand Down Expand Up @@ -84,11 +85,19 @@ case class CustomShuffleReaderExec private(
} else if (hasCoalescedPartition && hasSkewedPartition) {
"coalesced and skewed"
} else if (hasCoalescedPartition) {
"coalesced"
if (isPotentialSkewed) {
"coalesced and potentialSkewed"
} else {
"coalesced"
}
} else if (hasSkewedPartition) {
"skewed"
} else {
""
if (isPotentialSkewed) {
"potentialSkewed"
} else {
""
}
}
Iterator(desc)
}
Expand All @@ -102,6 +111,13 @@ case class CustomShuffleReaderExec private(
/** True when at least one of the partition specs is a [[PartialMapperPartitionSpec]]. */
def isLocalReader: Boolean = partitionSpecs.exists {
  case _: PartialMapperPartitionSpec => true
  case _ => false
}

/**
 * Whether this reader looks skewed although no partition was actually handled as skewed.
 *
 * True only when `hasSkewedPartition` is false, partition data sizes are present and
 * non-empty, and `SkewHandlingUtil.isSkewed` flags the largest size against the median
 * of all sizes.
 *
 * NOTE(review): assumes `partitionDataSizes` is an `Option` wrapping a collection of
 * sizes, as the original `.get.max` usage implies -- confirm against its declaration.
 */
lazy val isPotentialSkewed: Boolean = {
  !hasSkewedPartition &&
    partitionDataSizes.exists { sizes =>
      // `max` throws on an empty collection, so guard the sizes themselves rather than
      // only the wrapper around them. This value is read while building the node's
      // description string, where an exception would be particularly unwelcome.
      sizes.nonEmpty &&
        SkewHandlingUtil.isSkewed(sizes.max, Utils.median(sizes.toArray, false), conf)
    }
}

private def shuffleStage = child match {
case stage: ShuffleQueryStageExec => Some(stage)
case _ => None
Expand Down

0 comments on commit c25fdd6

Please sign in to comment.