Skip to content

Commit

Permalink
[SPARK-45592][SPARK-45282][SQL][3.4] Correctness issue in AQE with In…
Browse files Browse the repository at this point in the history
…MemoryTableScanExec

### What changes were proposed in this pull request?

This backports #43435 SPARK-45592  to the 3.4 branch. This is because it was already reported there as SPARK-45282 but it required enabling some extra configuration to hit the bug.

### Why are the changes needed?

Fix correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, fixing correctness issue.

### How was this patch tested?

New tests based on the reproduction example in SPARK-45282

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43729 from eejbyfeldt/SPARK-45282.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
eejbyfeldt authored and dongjoon-hyun committed Nov 12, 2023
1 parent 92bea64 commit eace7d3
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)

}

case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

/**
* Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
* fewer number of partitions.
*/
case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
extends Expression with Partitioning with Unevaluable {

override def children: Seq[Expression] = from.expressions
override def nullable: Boolean = from.nullable
override def dataType: DataType = from.dataType

override def satisfies0(required: Distribution): Boolean = from.satisfies0(required)

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
copy(from = from.copy(expressions = newChildren))

override val numPartitions: Int = partitions.length

override def toString: String = from.toString
override def sql: String = from.sql
}

/**
Expand Down Expand Up @@ -661,6 +690,26 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}

case class CoalescedHashShuffleSpec(
from: ShuffleSpec,
partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec =>
numPartitions == 1
case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
partitions == otherPartitions && from.isCompatibleWith(otherParent)
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
false
}

override def canCreatePartitioning: Boolean = false

override def numPartitions: Int = partitions.length
}

case class KeyGroupedShuffleSpec(
partitioning: KeyGroupedPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType

Expand Down Expand Up @@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite {
false)
}

test("HashPartitioning is the output partitioning") {
// HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
// the required clustering expressions.
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"b", $"c")),
false)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"d", $"e")),
false)

// When ClusteredDistribution.requireAllClusterKeys is set to true,
// HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
// exactly same as the required clustering expressions.
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
true)

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

checkSatisfied(
HashPartitioning(Seq($"b", $"a", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

// HashPartitioning cannot satisfy OrderedDistribution
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
private def testHashPartitioningLike(
partitioningName: String,
create: (Seq[Expression], Int) => Partitioning): Unit = {

test(s"$partitioningName is the output partitioning") {
// HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
// the required clustering expressions.
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
create(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"b", $"c")),
false)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"d", $"e")),
false)

// When ClusteredDistribution.requireAllClusterKeys is set to true,
// HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
// exactly same as the required clustering expressions.
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
true)

checkSatisfied(
create(Seq($"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

checkSatisfied(
create(Seq($"b", $"a", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
false)

// HashPartitioning cannot satisfy OrderedDistribution
checkSatisfied(
create(Seq($"a", $"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)

checkSatisfied(
create(Seq($"a", $"b", $"c"), 1),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false) // TODO: this can be relaxed.

checkSatisfied(
create(Seq($"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
}
}

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 1),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false) // TODO: this can be relaxed.
testHashPartitioningLike("HashPartitioning",
(expressions, numPartitions) => HashPartitioning(expressions, numPartitions))

checkSatisfied(
HashPartitioning(Seq($"b", $"c"), 10),
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
false)
}
testHashPartitioningLike("CoalescedHashPartitioning", (expressions, numPartitions) =>
CoalescedHashPartitioning(
HashPartitioning(expressions, numPartitions), Seq(CoalescedBoundary(0, numPartitions))))

test("RangePartitioning is the output partitioning") {
// RangePartitioning can satisfy OrderedDistribution iff its ordering is a prefix
Expand Down

0 comments on commit eace7d3

Please sign in to comment.