From c25fdd6d460f40f695b6f0ff5991d1ddbeb42215 Mon Sep 17 00:00:00 2001 From: xingczhang Date: Fri, 18 Nov 2022 16:53:59 +0800 Subject: [PATCH] [CARMEL-6375] Support Potential Skewed Operator Tagging (#1147) --- .../sql/execution/DataSourceScanExec.scala | 17 +++++++++++++--- .../adaptive/CustomShuffleReaderExec.scala | 20 +++++++++++++++++-- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 3d1f6ad4236f0..ca85a44eca19f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -422,8 +422,9 @@ case class FileSourceScanExec( tableParallelInfo.get.partitionNumber.isEmpty)) } + var _isBucketSkew: Option[Boolean] = None lazy val isBucketSkew = { - bucketedScan && { + _isBucketSkew = Some(bucketedScan && { val files = selectedPartitions.flatMap(partition => partition.files) val bucketFilesGroupingSize = files.map(file => (file.getPath.getName, file.getLen)) .groupBy(file => BucketingUtils.getBucketId(file._1)) @@ -432,7 +433,8 @@ case class FileSourceScanExec( bucketFilesGroupingSize.nonEmpty && SkewHandlingUtil.isSkewed(bucketFilesGroupingSize.max, Utils.median(bucketFilesGroupingSize, false), conf) - } + }) + _isBucketSkew } override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { @@ -492,6 +494,11 @@ case class FileSourceScanExec( } else { Nil } + + if (conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) && + !hasPartitionsAvailableAtRunTime) { + logInfo(s"bucketed scan skewed: ${isBucketSkew}") + } (partitioning, sortOrder) } else { (UnknownPartitioning(0), Nil) @@ -536,7 +543,11 @@ case class FileSourceScanExec( val withSelectedBucketsCount = relation.bucketSpec.map { spec => val bucketedKey = "Bucketed" val withBucketedScanStatus = if (bucketedScan) { - metadata + (bucketedKey -> s"true") + if (_isBucketSkew.nonEmpty && _isBucketSkew.get) { + metadata + (bucketedKey -> s"true, potentialBucketSkewed") + } else { + metadata + (bucketedKey -> s"true") + } } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) { metadata + (bucketedKey -> "false (disabled by configuration)") } else if (disableBucketedScan) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index 5c6c7578d2cbd..7c8134bdbc780 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils /** * A wrapper of shuffle query stage, which follows the given partition arrangement. @@ -84,11 +85,19 @@ case class CustomShuffleReaderExec private( } else if (hasCoalescedPartition && hasSkewedPartition) { "coalesced and skewed" } else if (hasCoalescedPartition) { - "coalesced" + if (isPotentialSkewed) { + "coalesced and potentialSkewed" + } else { + "coalesced" + } } else if (hasSkewedPartition) { "skewed" } else { - "" + if (isPotentialSkewed) { + "potentialSkewed" + } else { + "" + } } Iterator(desc) } @@ -102,6 +111,13 @@ case class CustomShuffleReaderExec private( def isLocalReader: Boolean = partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) + lazy val isPotentialSkewed: Boolean = { + !hasSkewedPartition && + partitionDataSizes.nonEmpty && + SkewHandlingUtil.isSkewed(partitionDataSizes.get.max, + Utils.median(partitionDataSizes.get.toArray, false), conf) + } + private def shuffleStage = child match { case stage: ShuffleQueryStageExec => Some(stage) case _ => None