Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.plans.physical

import java.util.Objects

import scala.annotation.tailrec
import scala.collection.mutable

Expand Down Expand Up @@ -360,12 +358,11 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
* preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned
* join compatibility.
*
* 2. '''In KeyGroupedShuffleSpec''': When used within `KeyGroupedShuffleSpec`, the `partitionKeys`
* may not be in sorted order. This occurs because `KeyGroupedShuffleSpec` can project the
* partition keys by join key positions. The `EnsureRequirements` rule ensures that either the
* unordered keys from both sides of a join match exactly, or it builds a common ordered set of
* keys and pushes them down to `GroupPartitionsExec` on both sides to establish a compatible
* ordering.
* 2. '''In KeyedShuffleSpec''': When used within `KeyedShuffleSpec`, the `partitionKeys` may not be
* in sorted order. This occurs because `KeyedShuffleSpec` can project the partition keys by join
* key positions. The `EnsureRequirements` rule ensures that either the unordered keys from both
* sides of a join match exactly, or it builds a common ordered set of keys and pushes them down
* to `GroupPartitionsExec` on both sides to establish a compatible ordering.
*
* == Partition Keys ==
* - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially
Expand Down Expand Up @@ -427,7 +424,7 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
* @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`).
* @param partitionKeys Partition keys wrapped in InternalRowComparableWrapper for efficient
* comparison and grouping. One per partition. When used as outputPartitioning,
* always in sorted order. When used in KeyGroupedShuffleSpec, may be unsorted
* always in sorted order. When used in `KeyedShuffleSpec`, may be unsorted
* after projection. May contain duplicates when ungrouped.
* @param isGrouped Whether partition keys are unique (no duplicates). Computed on first
* creation, then preserved through copy operations to avoid recomputation.
Expand Down Expand Up @@ -509,7 +506,7 @@ case class KeyedPartitioning(
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = {
val result = KeyGroupedShuffleSpec(this, distribution)
val result = KeyedShuffleSpec(this, distribution)
if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
// If allowing join keys to be subset of clustering keys, we should create a new
// `KeyedPartitioning` here that is grouped on the join keys instead, and use that as
Expand All @@ -525,16 +522,6 @@ case class KeyedPartitioning(
result
}
}

override def equals(that: Any): Boolean = that match {
case k: KeyedPartitioning if this.expressions == k.expressions =>
this.partitionKeys == k.partitionKeys

case _ => false
}

override def hashCode(): Int =
Objects.hash(expressions, partitionKeys)
}

object KeyedPartitioning {
Expand Down Expand Up @@ -954,7 +941,7 @@ case class CoalescedHashShuffleSpec(
* @param joinKeyPositions position of join keys among cluster keys.
* This is set if joining on a subset of cluster keys is allowed.
*/
case class KeyGroupedShuffleSpec(
case class KeyedShuffleSpec(
partitioning: KeyedPartitioning,
distribution: ClusteredDistribution,
joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {
Expand Down Expand Up @@ -992,7 +979,7 @@ case class KeyGroupedShuffleSpec(
// 3.3 each pair of partition expressions at the same index must share compatible
// transform functions.
// 4. the partition values from both sides are following the same order.
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution, _) =>
case otherSpec @ KeyedShuffleSpec(otherPartitioning, otherDistribution, _) =>
distribution.clustering.length == otherDistribution.clustering.length &&
numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
partitioning.partitionKeys == otherPartitioning.partitionKeys
Expand All @@ -1003,7 +990,7 @@ case class KeyGroupedShuffleSpec(

// Whether the partition keys (i.e., partition expressions) are compatible between this and the
// `other` spec.
def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
def areKeysCompatible(other: KeyedShuffleSpec): Boolean = {
val expressions = partitioning.expressions
val otherExpressions = other.partitioning.expressions

Expand Down Expand Up @@ -1047,7 +1034,7 @@ case class KeyGroupedShuffleSpec(
*
* @param other other key-grouped shuffle spec
*/
def reducers(other: KeyGroupedShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {
def reducers(other: KeyedShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {
val results = partitioning.expressions.zip(other.partitioning.expressions).map {
case (e1: TransformExpression, e2: TransformExpression) => e1.reducers(e2)
case (_, _) => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2075,19 +2075,19 @@ object SQLConf {
val V2_BUCKETING_PUSH_PART_VALUES_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.pushPartValues.enabled")
.doc(s"Whether to pushdown common partition values when ${V2_BUCKETING_ENABLED.key} is " +
"enabled. When turned on, if both sides of a join are of KeyGroupedPartitioning and if " +
"enabled. When turned on, if both sides of a join are of KeyedPartitioning and if " +
"they share compatible partition keys, even if they don't have the exact same partition " +
"values, Spark will calculate a superset of partition values and pushdown that info to " +
"scan nodes, which will use empty partitions for the missing partition values on either " +
"side. This could help to eliminate unnecessary shuffles")
"group partition nodes, which will use empty partitions for the missing partition values " +
"on either side. This could help to eliminate unnecessary shuffles")
.version("3.4.0")
.booleanConf
.createWithDefault(true)

val V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled")
.doc("During a storage-partitioned join, whether to allow input partitions to be " +
"partially clustered, when both sides of the join are of KeyGroupedPartitioning. At " +
"partially clustered, when both sides of the join are of KeyedPartitioning. At " +
"planning time, Spark will pick the side with less data size based on table " +
"statistics, group and replicate them to match the other side. This is an optimization " +
"on skew join and can help to reduce data skewness when certain partitions are assigned " +
Expand All @@ -2100,7 +2100,7 @@ object SQLConf {
val V2_BUCKETING_SHUFFLE_ENABLED =
buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
.doc("During a storage-partitioned join, whether to allow to shuffle only one side. " +
"When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " +
"When only one side is KeyedPartitioning, if the conditions are met, spark will " +
"only shuffle the other side. This optimization will reduce the amount of data that " +
s"needs to be shuffle. This config requires ${V2_BUCKETING_ENABLED.key} to be enabled")
.version("4.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @param expectedPartitionKeys Optional sequence of expected partition key values and their
* split counts
* @param reducers Optional reducers to apply to partition keys for grouping compatibility
* @param applyPartialClustering Whether to apply partial clustering for skewed data
* @param replicatePartitions Whether to replicate partitions across multiple keys
* @param distributePartitions When true, splits for a key are distributed across the expected
* partitions (padding with empty partitions). When false, all splits
* are replicated to every expected partition for that key.
*/
case class GroupPartitionsExec(
child: SparkPlan,
@transient joinKeyPositions: Option[Seq[Int]] = None,
@transient expectedPartitionKeys: Option[Seq[(InternalRowComparableWrapper, Int)]] = None,
@transient reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
@transient applyPartialClustering: Boolean = false,
@transient replicatePartitions: Boolean = false
@transient distributePartitions: Boolean = false
) extends UnaryExecNode {

override def outputPartitioning: Partitioning = {
Expand Down Expand Up @@ -91,7 +91,7 @@ case class GroupPartitionsExec(
val alignedPartitions = expectedPartitionKeys.get.flatMap { case (key, numSplits) =>
if (numSplits > 1) isGrouped = false
val splits = keyMap.getOrElse(key, Seq.empty)
if (applyPartialClustering && !replicatePartitions) {
if (distributePartitions) {
// Distribute splits across expected partitions, padding with empty sequences
val paddedSplits = splits.map(Seq(_)).padTo(numSplits, Seq.empty)
paddedSplits.map((key, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,40 @@ case class EnsureRequirements(
child
} else {
// Check KeyedPartitioning satisfaction conditions
val groupedSatisfies = grouped.exists(_.satisfies(distribution))
val groupedSatisfies = grouped.find(_.satisfies(distribution))
val nonGroupedSatisfiesAsIs = nonGrouped.exists(_.nonGroupedSatisfies(distribution))
val nonGroupedSatisfiesWhenGrouped = nonGrouped.exists(_.groupedSatisfies(distribution))
val nonGroupedSatisfiesWhenGrouped = nonGrouped.find(_.groupedSatisfies(distribution))

// Check if any KeyedPartitioning satisfies the distribution
if (groupedSatisfies || nonGroupedSatisfiesAsIs || nonGroupedSatisfiesWhenGrouped) {
if (groupedSatisfies.isDefined || nonGroupedSatisfiesAsIs
|| nonGroupedSatisfiesWhenGrouped.isDefined) {
distribution match {
case o: OrderedDistribution =>
// OrderedDistribution requires grouped KeyedPartitioning with sorted keys.
// OrderedDistribution requires grouped KeyedPartitioning with sorted keys
// according to the distribution's ordering.
// Find any KeyedPartitioning that satisfies via groupedSatisfies.
val satisfyingKeyedPartitioning =
(grouped ++ nonGrouped).find(_.groupedSatisfies(distribution)).get
groupedSatisfies.orElse(nonGroupedSatisfiesWhenGrouped).get
val attrs = satisfyingKeyedPartitioning.expressions.flatMap(_.collectLeaves())
.map(_.asInstanceOf[Attribute])
val keyRowOrdering = RowOrdering.create(o.ordering, attrs)
val keyOrdering = keyRowOrdering.on((t: InternalRowComparableWrapper) => t.row)
val sorted = satisfyingKeyedPartitioning.partitionKeys.sorted(keyOrdering)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug is that sorted should be distinct as well (as it was before the refactor), but after the refactor we can do better:

  • We can avoid adding a grouping operator entirely when the non-grouped satisfyingKeyedPartitioning.partitionKeys satisfy the required sort order.
  • Or if it doesn't, then we need to add a GroupPartitionsExec operator, but we can avoid coalescing partitions in the operator by setting applyPartialClustering.

GroupPartitionsExec(child, expectedPartitionKeys = Some(sorted.map((_, 1))))

case _ if groupedSatisfies =>
if (satisfyingKeyedPartitioning.partitionKeys.sliding(2).forall {
case Seq(k1, k2) => keyOrdering.lteq(k1, k2)
}) {
child
} else {
// Use distributePartitions to spread splits across expected partitions
val sortedGroupedKeys = satisfyingKeyedPartitioning.partitionKeys
.groupBy(identity).view.mapValues(_.size)
.toSeq.sortBy(_._1)(keyOrdering)
GroupPartitionsExec(child,
expectedPartitionKeys = Some(sortedGroupedKeys),
distributePartitions = true
)
}

case _ if groupedSatisfies.isDefined =>
// Grouped KeyedPartitioning already satisfies
child

Expand Down Expand Up @@ -238,7 +252,7 @@ case class EnsureRequirements(
// Hence we need to ensure that after this call, the outputPartitioning of the
// partitioned side's BatchScanExec is grouped by join keys to match,
// and we do that by pushing down the join keys
case Some(KeyGroupedShuffleSpec(_, _, Some(joinKeyPositions))) =>
case Some(KeyedShuffleSpec(_, _, Some(joinKeyPositions))) =>
withJoinKeyPositions(child, joinKeyPositions)
case _ => child
}
Expand All @@ -258,7 +272,7 @@ case class EnsureRequirements(
child match {
case ShuffleExchangeExec(_, c, so, ps) =>
ShuffleExchangeExec(newPartitioning, c, so, ps)
case GroupPartitionsExec(c, _, _, _, _, _) => ShuffleExchangeExec(newPartitioning, c)
case GroupPartitionsExec(c, _, _, _, _) => ShuffleExchangeExec(newPartitioning, c)
case _ => ShuffleExchangeExec(newPartitioning, child)
}
}
Expand Down Expand Up @@ -440,7 +454,7 @@ case class EnsureRequirements(
val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p, d) =>
if (!d.isInstanceOf[ClusteredDistribution]) return None
val cd = d.asInstanceOf[ClusteredDistribution]
val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
val specOpt = createKeyedShuffleSpec(p.outputPartitioning, cd)
if (specOpt.isEmpty) return None
specOpt.get
}
Expand All @@ -454,7 +468,7 @@ case class EnsureRequirements(
// partitionings are not modified (projected) in specs and left and right side partitionings are
// compatible with each other.
// Left and right `outputPartitioning` is a `PartitioningCollection` or a `KeyedPartitioning`
// otherwise `createKeyGroupedShuffleSpec()` would have returned `None`.
// otherwise `createKeyedShuffleSpec()` would have returned `None`.
var isCompatible =
left.outputPartitioning.asInstanceOf[Expression].exists(_ == leftPartitioning) &&
right.outputPartitioning.asInstanceOf[Expression].exists(_ == rightPartitioning) &&
Expand Down Expand Up @@ -593,7 +607,7 @@ case class EnsureRequirements(
val originalPartitioning =
partiallyClusteredChild.outputPartitioning.asInstanceOf[Expression]
// `outputPartitioning` is either a `PartitioningCollection` or a `KeyedPartitioning`
// otherwise `createKeyGroupedShuffleSpec()` would have returned `None`.
// otherwise `createKeyedShuffleSpec()` would have returned `None`.
val originalKeyedPartitioning =
originalPartitioning.collectFirst { case k: KeyedPartitioning => k }.get
val projectedOriginalPartitionKeys = partiallyClusteredSpec.joinKeyPositions
Expand All @@ -616,9 +630,9 @@ case class EnsureRequirements(

// Now we need to push-down the common partition information to the `GroupPartitionsExec`s.
newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, mergedPartitionKeys,
leftReducers, applyPartialClustering, replicateLeftSide)
leftReducers, distributePartitions = applyPartialClustering && !replicateLeftSide)
newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, mergedPartitionKeys,
rightReducers, applyPartialClustering, replicateRightSide)
rightReducers, distributePartitions = applyPartialClustering && !replicateRightSide)
}
}

Expand Down Expand Up @@ -673,21 +687,19 @@ case class EnsureRequirements(
joinKeyPositions: Option[Seq[Int]],
mergedPartitionKeys: Seq[(InternalRowComparableWrapper, Int)],
reducers: Option[Seq[Option[Reducer[_, _]]]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean): SparkPlan = {
distributePartitions: Boolean): SparkPlan = {
plan match {
case g: GroupPartitionsExec =>
val newGroupPartitions = g.copy(
joinKeyPositions = joinKeyPositions,
expectedPartitionKeys = Some(mergedPartitionKeys),
reducers = reducers,
applyPartialClustering = applyPartialClustering,
replicatePartitions = replicatePartitions)
distributePartitions = distributePartitions)
newGroupPartitions.copyTagsFrom(g)
newGroupPartitions
case _ =>
GroupPartitionsExec(plan, joinKeyPositions, Some(mergedPartitionKeys), reducers,
applyPartialClustering, replicatePartitions)
distributePartitions)
}
}

Expand All @@ -705,14 +717,14 @@ case class EnsureRequirements(
}

/**
* Tries to create a [[KeyGroupedShuffleSpec]] from the input partitioning and distribution, if
* the partitioning is a [[KeyedPartitioning]] (either directly or indirectly), and
* satisfies the given distribution.
* Tries to create a [[KeyedShuffleSpec]] from the input partitioning and distribution, if the
* partitioning is a [[KeyedPartitioning]] (either directly or indirectly), and satisfies the
* given distribution.
*/
private def createKeyGroupedShuffleSpec(
private def createKeyedShuffleSpec(
partitioning: Partitioning,
distribution: ClusteredDistribution): Option[KeyGroupedShuffleSpec] = {
def tryCreate(partitioning: KeyedPartitioning): Option[KeyGroupedShuffleSpec] = {
distribution: ClusteredDistribution): Option[KeyedShuffleSpec] = {
def tryCreate(partitioning: KeyedPartitioning): Option[KeyedShuffleSpec] = {
val attributes = partitioning.expressions.flatMap(_.collectLeaves())
val clustering = distribution.clustering

Expand All @@ -725,7 +737,7 @@ case class EnsureRequirements(
}

if (satisfies) {
Some(partitioning.createShuffleSpec(distribution).asInstanceOf[KeyGroupedShuffleSpec])
Some(partitioning.createShuffleSpec(distribution).asInstanceOf[KeyedShuffleSpec])
} else {
None
}
Expand All @@ -734,7 +746,7 @@ case class EnsureRequirements(
partitioning match {
case p: KeyedPartitioning => tryCreate(p)
case PartitioningCollection(partitionings) =>
partitionings.collectFirst(Function.unlift(createKeyGroupedShuffleSpec(_, distribution)))
partitionings.collectFirst(Function.unlift(createKeyedShuffleSpec(_, distribution)))
case _ => None
}
}
Expand Down
Loading