From e88d2db952f30f2a1ed62a7c435e5dfecfe77ebb Mon Sep 17 00:00:00 2001 From: yhuang-db Date: Mon, 3 Nov 2025 15:01:01 -0800 Subject: [PATCH 1/4] normalize keyGroupedPartitioning and ordering, and move equal function --- .../datasources/v2/DataSourceV2Relation.scala | 8 ++++++- .../sql/connector/DataSourceV2Suite.scala | 24 +++++++++---------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 9b8d48c3f3a8..789f1b0ca000 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -182,7 +182,13 @@ case class DataSourceV2ScanRelation( relation = this.relation.copy( output = this.relation.output.map(QueryPlan.normalizeExpressions(_, this.relation.output)) ), - output = this.output.map(QueryPlan.normalizeExpressions(_, this.output)) + output = this.output.map(QueryPlan.normalizeExpressions(_, this.output)), + keyGroupedPartitioning = keyGroupedPartitioning.map( + _.map(QueryPlan.normalizeExpressions(_, output)) + ), + ordering = ordering.map( + _.map(o => o.copy(child = QueryPlan.normalizeExpressions(o.child, output))) + ) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 01fa2b13b86f..8715cfa18bdf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1093,6 +1093,18 @@ abstract class SimpleScanBuilder extends ScanBuilder override def readSchema(): StructType = TestingV2Source.schema override def createReaderFactory(): PartitionReaderFactory = SimpleReaderFactory + + override def equals(obj: Any): Boolean = { + obj match { + case s: Scan => + this.readSchema() == s.readSchema() + case _ => false + } + } + + override def hashCode(): Int = { + this.readSchema().hashCode() + } } trait TestingV2Source extends TableProvider { @@ -1157,18 +1169,6 @@ class SimpleDataSourceV2 extends TestingV2Source { override def planInputPartitions(): Array[InputPartition] = { Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) } - - override def equals(obj: Any): Boolean = { - obj match { - case s: Scan => - this.readSchema() == s.readSchema() - case _ => false - } - } - - override def hashCode(): Int = { - this.readSchema().hashCode() - } } override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { From d8fcaa06a91f15bfc18f32c6160e32ba9038fb0f Mon Sep 17 00:00:00 2001 From: yhuang-db Date: Tue, 4 Nov 2025 12:06:04 -0800 Subject: [PATCH 2/4] same tests for OrderAndPartitionAwareDataSource --- .../sql/connector/DataSourceV2Suite.scala | 100 +++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 8715cfa18bdf..1dc234a8c785 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -21,6 +21,8 @@ import java.io.File import java.util import java.util.OptionalLong +import scala.jdk.CollectionConverters._ + import test.org.apache.spark.sql.connector._ import org.apache.spark.SparkUnsupportedOperationException @@ -37,7 +39,7 @@ import org.apache.spark.sql.connector.read.Scan.ColumnarSupportMode import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation, V2ScanPartitioningAndOrdering} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector @@ -1008,6 +1010,46 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS "Canonicalized DataSourceV2ScanRelation instances should be equal") } + test("SPARK-54163: scan canonicalization with partitioning and ordering aware data source") { + val options = new CaseInsensitiveStringMap(Map( + "partitionKeys" -> "i", + "orderKeys" -> "i,j" + ).asJava) + val table = new OrderAndPartitionAwareDataSource().getTable(options) + + def createDsv2ScanRelation(): DataSourceV2ScanRelation = { + val relation = DataSourceV2Relation.create(table, None, None, options) + val scan = relation.table.asReadable.newScanBuilder(relation.options).build() + val scanRelation = DataSourceV2ScanRelation(relation, scan, relation.output) + V2ScanPartitioningAndOrdering.apply(scanRelation).asInstanceOf[DataSourceV2ScanRelation] + } + + // Create two DataSourceV2ScanRelation instances, representing the scan of the same table + val scanRelation1 = createDsv2ScanRelation() + val scanRelation2 = createDsv2ScanRelation() + + // assert scanRelations have partitioning and ordering + assert(scanRelation1.keyGroupedPartitioning.isDefined && + scanRelation1.keyGroupedPartitioning.get.nonEmpty, + "DataSourceV2ScanRelation should have key grouped partitioning") + assert(scanRelation1.ordering.isDefined && + scanRelation1.ordering.get.nonEmpty, + "DataSourceV2ScanRelation should have ordering") + + // the two instances should not be the same, as they should have different attribute IDs + assert(scanRelation1 != scanRelation2, + "Two created DataSourceV2ScanRelation instances should not be the same") + assert(scanRelation1.output.map(_.exprId).toSet != scanRelation2.output.map(_.exprId).toSet, + "Output attributes should have different expression IDs before canonicalization") + assert(scanRelation1.relation.output.map(_.exprId).toSet != + scanRelation2.relation.output.map(_.exprId).toSet, + "Relation output attributes should have different expression IDs before canonicalization") + + // After canonicalization, the two instances should be equal + assert(scanRelation1.canonicalized == scanRelation2.canonicalized, + "Canonicalized DataSourceV2ScanRelation instances should be equal") + } + test("SPARK-53809: check mergeScalarSubqueries is effective for DataSourceV2ScanRelation") { val df = spark.read.format(classOf[SimpleDataSourceV2].getName).load() df.createOrReplaceTempView("df") @@ -1052,6 +1094,62 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS // Verify the query produces correct results checkAnswer(query, Row(9, 0)) } + + test("SPARK-54163: check mergeScalarSubqueries is effective") { + withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "true") { + val options = Map( + "partitionKeys" -> "i", + "orderKeys" -> "i,j" + ) + + // Create the OrderAndPartitionAwareDataSource DataFrame + val df = spark.read + .format(classOf[OrderAndPartitionAwareDataSource].getName) + .options(options) + .load() + df.createOrReplaceTempView("df") + + val query = sql("select (select max(i) from df) as max_i, (select min(i) from df) as min_i") + val optimizedPlan = query.queryExecution.optimizedPlan + + // check optimizedPlan merged scalar subqueries `select max(i), min(i) from df` + val sub1 = optimizedPlan.asInstanceOf[Project].projectList.head.collect { + case s: ScalarSubquery => s + } + val sub2 = optimizedPlan.asInstanceOf[Project].projectList(1).collect { + case s: ScalarSubquery => s + } + + // Both subqueries should reference the same merged plan `select max(i), min(i) from df` + assert(sub1.nonEmpty && sub2.nonEmpty, "Both scalar subqueries should exist") + assert(sub1.head.plan == sub2.head.plan, + "Both subqueries should reference the same merged plan") + + // Extract the aggregate from the merged plan sub1 + val agg = sub1.head.plan.collect { + case a: Aggregate => a + }.head + + // Check that the aggregate contains both max(i) and min(i) + val aggFunctionSet = agg.aggregateExpressions.flatMap { expr => + expr.collect { + case ae: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression => + ae.aggregateFunction + } + }.toSet + + assert(aggFunctionSet.size == 2, "Aggregate should contain exactly two aggregate functions") + assert(aggFunctionSet + .exists(_.isInstanceOf[org.apache.spark.sql.catalyst.expressions.aggregate.Max]), + "Aggregate should contain max(i)") + assert(aggFunctionSet + .exists(_.isInstanceOf[org.apache.spark.sql.catalyst.expressions.aggregate.Min]), + "Aggregate should contain min(i)") + + // Verify the query produces correct results + checkAnswer(query, Row(4, 1)) + } + } } case class RangeInputPartition(start: Int, end: Int) extends InputPartition From e9511ec4dc4ca75973a4cf7108127e0e6be60194 Mon Sep 17 00:00:00 2001 From: yhuang-db Date: Mon, 17 Nov 2025 16:00:31 -0800 Subject: [PATCH 3/4] same tests for OrderAndPartitionAwareDataSource --- .../spark/sql/connector/DataSourceV2Suite.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 1dc234a8c785..025625b3646d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1010,7 +1010,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS "Canonicalized DataSourceV2ScanRelation instances should be equal") } - test("SPARK-54163: scan canonicalization with partitioning and ordering aware data source") { + test("SPARK-54163: scan canonicalization for partitioning and ordering aware data source") { val options = new CaseInsensitiveStringMap(Map( "partitionKeys" -> "i", "orderKeys" -> "i,j" @@ -1044,6 +1044,12 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS assert(scanRelation1.relation.output.map(_.exprId).toSet != scanRelation2.relation.output.map(_.exprId).toSet, "Relation output attributes should have different expression IDs before canonicalization") + assert(scanRelation1.keyGroupedPartitioning.get.flatMap(_.references.map(_.exprId)).toSet != + scanRelation2.keyGroupedPartitioning.get.flatMap(_.references.map(_.exprId)).toSet, + "Partitioning columns should have different expression IDs before canonicalization") + assert(scanRelation1.ordering.get.flatMap(_.references.map(_.exprId)).toSet != + scanRelation2.ordering.get.flatMap(_.references.map(_.exprId)).toSet, + "Ordering columns should have different expression IDs before canonicalization") // After canonicalization, the two instances should be equal assert(scanRelation1.canonicalized == scanRelation2.canonicalized, @@ -1095,7 +1101,9 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS checkAnswer(query, Row(9, 0)) } - test("SPARK-54163: check mergeScalarSubqueries is effective") { + test( + "SPARK-54163: check mergeScalarSubqueries is effective for OrderAndPartitionAwareDataSource" + ) { withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "true") { val options = Map( "partitionKeys" -> "i", From 6ae9cd0307f0831f863e5dc9984c3fe31a33bf89 Mon Sep 17 00:00:00 2001 From: yhuang-db Date: Mon, 17 Nov 2025 17:22:25 -0800 Subject: [PATCH 4/4] comment --- .../org/apache/spark/sql/connector/DataSourceV2Suite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 025625b3646d..a09b7e0827c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1021,6 +1021,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS val relation = DataSourceV2Relation.create(table, None, None, options) val scan = relation.table.asReadable.newScanBuilder(relation.options).build() val scanRelation = DataSourceV2ScanRelation(relation, scan, relation.output) + // Attach partitioning and ordering information to DataSourceV2ScanRelation V2ScanPartitioningAndOrdering.apply(scanRelation).asInstanceOf[DataSourceV2ScanRelation] } @@ -1032,8 +1033,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS assert(scanRelation1.keyGroupedPartitioning.isDefined && scanRelation1.keyGroupedPartitioning.get.nonEmpty, "DataSourceV2ScanRelation should have key grouped partitioning") - assert(scanRelation1.ordering.isDefined && - scanRelation1.ordering.get.nonEmpty, + assert(scanRelation1.ordering.isDefined && scanRelation1.ordering.get.nonEmpty, "DataSourceV2ScanRelation should have ordering") // the two instances should not be the same, as they should have different attribute IDs