[SPARK-35786][SQL] Add a new operator to rebalance the query output if AQE is enabled #32932

Closed · wants to merge 22 commits
2 changes: 2 additions & 0 deletions docs/sql-performance-tuning.md
@@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t

For more details please refer to the documentation of [Partitioning Hints](sql-ref-syntax-qry-select-hints.html#partitioning-hints).

8 changes: 8 additions & 0 deletions docs/sql-ref-syntax-qry-select-hints.md
@@ -51,6 +51,10 @@ specified, multiple nodes are inserted into the logical plan, but the leftmost h

The `REPARTITION_BY_RANGE` hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.

* **REBALANCE**

The `REBALANCE` hint can be used to rebalance the query result's output partitions, so that every partition is of a reasonable size (not too small and not too big). It can take column names as parameters and tries its best to partition the query result by these columns. This is best-effort: if there are skews, Spark will split the skewed partitions to keep them from becoming too big. This hint is useful when you need to write the result of this query to a table, to avoid files that are too small or too big. The hint is ignored if AQE is not enabled.
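
For example, a common use is rebalancing just before a write. A minimal sketch (the tables `t` and `sales_out` are hypothetical; the hint is only effective with `spark.sql.adaptive.enabled=true`):

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Rebalance the result by column c before writing, so the output files
// are neither too small nor too big.
spark.sql("INSERT INTO sales_out SELECT /*+ REBALANCE(c) */ * FROM t")
```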

#### Examples

```sql
@@ -66,6 +70,10 @@ SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;

SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;

SELECT /*+ REBALANCE */ * FROM t;

SELECT /*+ REBALANCE(c) */ * FROM t;

-- multiple partitioning hints
EXPLAIN EXTENDED SELECT /*+ REPARTITION(100), COALESCE(500), REPARTITION_BY_RANGE(3, c) */ * FROM t;
== Parsed Logical Plan ==
@@ -175,7 +175,8 @@ object ResolveHints {
*/
object ResolveCoalesceHints extends Rule[LogicalPlan] {

-  val COALESCE_HINT_NAMES: Set[String] = Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE")
+  val COALESCE_HINT_NAMES: Set[String] =
+    Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE", "REBALANCE")

/**
* This function handles hints for "COALESCE" and "REPARTITION".
@@ -248,6 +249,18 @@
}
}

  private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
    hint.parameters match {
      case partitionExprs @ Seq(_*) =>
        val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
        if (invalidParams.nonEmpty) {
          val hintName = hint.name.toUpperCase(Locale.ROOT)
          throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
        }
        RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
    _.containsPattern(UNRESOLVED_HINT), ruleId) {
    case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
@@ -257,6 +270,8 @@
        createRepartition(shuffle = false, hint)
      case "REPARTITION_BY_RANGE" =>
        createRepartitionByRange(hint)
      case "REBALANCE" if conf.adaptiveExecutionEnabled =>
        createRebalance(hint)
      case _ => hint
    }
  }
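
As a quick sketch of the rule's observable behavior (shorthand plans, mirroring the new ResolveHintsSuite cases below; not literal API calls):

```scala
// With spark.sql.adaptive.enabled = true:
//   UnresolvedHint("REBALANCE", Seq(UnresolvedAttribute("c")), child)
//     ==> RebalancePartitions(Seq(c), child)
//   UnresolvedHint("REBALANCE", Seq.empty, child)
//     ==> RebalancePartitions(Seq.empty, child)
// With AQE disabled, the case above does not match, so the hint is left alone
// and later removed, leaving just `child`.
// A non-column parameter such as Literal(1) fails with invalidHintParameterError.
```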
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
}
}

/**
 * This operator is used to rebalance the output partitions of the given `child`, so that every
 * partition is of a reasonable size (not too small and not too big). It also tries its best to
 * partition the child output by `partitionExpressions`. If there are skews, Spark will split the
 * skewed partitions to keep them from becoming too big. This operator is useful when you need to
 * write the result of `child` to a table, to avoid files that are too small or too big.
 *
 * Note that this operator only makes sense when AQE is enabled.
 */
case class RebalancePartitions(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan) extends UnaryNode {
Comment on lines +1363 to +1365

Reviewer (Member): Make RebalancePartitions extend RepartitionOperation?

Author (Contributor): Since it's a special one, and to be conservative, this PR does not extend RepartitionOperation; see #32932 (comment).

  override def maxRows: Option[Long] = child.maxRows
  override def output: Seq[Attribute] = child.output

  def partitioning: Partitioning = if (partitionExpressions.isEmpty) {
    RoundRobinPartitioning(conf.numShufflePartitions)
  } else {
    HashPartitioning(partitionExpressions, conf.numShufflePartitions)
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): RebalancePartitions =
    copy(child = newChild)
}
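
A minimal sketch of how `partitioning` is chosen; `LocalRelation` and the attribute `c` are illustrative stand-ins, not part of this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RebalancePartitions}
import org.apache.spark.sql.types.IntegerType

val c = AttributeReference("c", IntegerType)()
val child = LocalRelation(c)

// No columns: rows are distributed round-robin over conf.numShufflePartitions.
val byNone = RebalancePartitions(Seq.empty, child)

// Columns given: hash-partitioned by those columns; AQE may later coalesce
// small partitions or split skewed ones.
val byCol = RebalancePartitions(Seq(c), child)
```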

/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference,
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

class ResolveHintsSuite extends AnalysisTest {
@@ -295,4 +296,28 @@
caseSensitive = true)
}
}

test("SPARK-35786: Support optimize repartition by expression in AQE") {
checkAnalysisWithoutViewWrapper(
UnresolvedHint("REBALANCE", Seq(UnresolvedAttribute("a")), table("TaBlE")),
RebalancePartitions(Seq(AttributeReference("a", IntegerType)()), testRelation))

checkAnalysisWithoutViewWrapper(
UnresolvedHint("REBALANCE", Seq.empty, table("TaBlE")),
RebalancePartitions(Seq.empty, testRelation))

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
checkAnalysisWithoutViewWrapper(
UnresolvedHint("REBALANCE", Seq(UnresolvedAttribute("a")), table("TaBlE")),
testRelation)

checkAnalysisWithoutViewWrapper(
UnresolvedHint("REBALANCE", Seq.empty, table("TaBlE")),
testRelation)
}

assertAnalysisError(
UnresolvedHint("REBALANCE", Seq(Literal(1)), table("TaBlE")),
Seq("Hint parameter should include columns"))
}
}
@@ -32,7 +32,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.execution.aggregate.AggUtils
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NONE, REPARTITION_BY_NUM, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeExec}
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
@@ -715,13 +715,20 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
        execution.RangeExec(r) :: Nil
      case r: logical.RepartitionByExpression =>
        val shuffleOrigin = if (r.partitionExpressions.isEmpty && r.optNumPartitions.isEmpty) {
-         REPARTITION_BY_NONE
+         REBALANCE_PARTITIONS_BY_NONE
        } else if (r.optNumPartitions.isEmpty) {
          REPARTITION_BY_COL
        } else {
          REPARTITION_BY_NUM
        }
        exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
      case r: logical.RebalancePartitions =>
        val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
          REBALANCE_PARTITIONS_BY_NONE
        } else {
          REBALANCE_PARTITIONS_BY_COL
        }
        exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
      case r: LogicalRDD =>
        RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
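
A rough way to observe the new planning path from a spark-shell built on this branch (the temp view is illustrative; the hint requires AQE to be enabled):

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.range(100).createOrReplaceTempView("t")

// The physical plan should contain a ShuffleExchangeExec whose origin is
// REBALANCE_PARTITIONS_BY_COL; without the column argument it would be
// REBALANCE_PARTITIONS_BY_NONE.
spark.sql("SELECT /*+ REBALANCE(id) */ * FROM t").explain()
```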
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NONE, ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.internal.SQLConf

/**
@@ -30,7 +30,8 @@ import org.apache.spark.sql.internal.SQLConf
case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule {

override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
-  Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NONE)
+  Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REBALANCE_PARTITIONS_BY_NONE,
+    REBALANCE_PARTITIONS_BY_COL)

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.coalesceShufflePartitionsEnabled) {
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, REPARTITION_BY_NONE, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, REBALANCE_PARTITIONS_BY_NONE, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.internal.SQLConf

@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {

override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
-  Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_NONE)
+  Seq(ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_NONE)

private val ensureRequirements = EnsureRequirements

@@ -144,7 +144,7 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
      s.shuffle.shuffleOrigin match {
        case ENSURE_REQUIREMENTS =>
          s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle)
-       case REPARTITION_BY_NONE =>
+       case REBALANCE_PARTITIONS_BY_NONE =>
          // Use the local shuffle reader only when CoalesceShufflePartitions could not
          // coalesce (i.e. the partition count is unchanged).
          s.mapStats.exists(_.bytesByPartitionId.length == partitionSpecs.size) &&
            partitionSpecs.nonEmpty && supportLocalReader(s.shuffle)
@@ -92,10 +92,17 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
// a certain partition number. Spark can't optimize it.
case object REPARTITION_BY_NUM extends ShuffleOrigin

-// Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// Indicates that the shuffle operator was added by the user-specified rebalance operator.
+// Spark will try to rebalance partitions so that each one is neither too small nor too big.
+// The local shuffle reader will be used if possible to reduce network traffic.
+case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin

// Indicates that the shuffle operator was added by the user-specified rebalance operator with
// columns. Spark will try to rebalance partitions so that each one is neither too small nor
// too big.
// Unlike `REBALANCE_PARTITIONS_BY_NONE`, the local shuffle reader cannot be used here, as the
// output needs to be partitioned by the given columns.
case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin
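
Putting the rule changes above together, a short summary of how AQE treats the two new origins (derived from this diff):

```scala
// REBALANCE_PARTITIONS_BY_NONE: handled by both CoalesceShufflePartitions and
//   OptimizeLocalShuffleReader, since the output has no required partitioning.
// REBALANCE_PARTITIONS_BY_COL: handled by CoalesceShufflePartitions only; the
//   output must stay partitioned by the given columns, so no local reader.
```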

/**
* Performs a shuffle that will result in the desired partitioning.
@@ -1788,17 +1788,20 @@ class AdaptiveQueryExecSuite

test("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
-     val query = "SELECT /*+ REPARTITION */ * FROM testData"
-     val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
-     collect(adaptivePlan) {
-       case r: CustomShuffleReaderExec => r
-     } match {
-       case Seq(customShuffleReader) =>
-         assert(customShuffleReader.partitionSpecs.size === 1)
-         assert(!customShuffleReader.isLocalReader)
-       case _ =>
-         fail("There should be a CustomShuffleReaderExec")
-     }
+     Seq("REPARTITION", "REBALANCE(key)").foreach { repartition =>
+       val query = s"SELECT /*+ $repartition */ * FROM testData"
+       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+       collect(adaptivePlan) {
+         case r: CustomShuffleReaderExec => r
+       } match {
+         case Seq(customShuffleReader) =>
+           assert(customShuffleReader.partitionSpecs.size === 1)
+           assert(!customShuffleReader.isLocalReader)
+         case _ =>
+           fail("There should be a CustomShuffleReaderExec")
+       }
+     }
}
}
