From e7223a0d6fd1750706e7ca2829f701b36bf4ff58 Mon Sep 17 00:00:00 2001 From: ashahid Date: Wed, 11 Dec 2024 12:47:16 -0800 Subject: [PATCH 1/2] SPARK-45658.Fix canonicalization of DynamicPruningSubquery to canonicalize build keys relative to build query output --- .../catalyst/expressions/DynamicPruning.scala | 4 +++- .../expressions/CanonicalizeSuite.scala | 19 ++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala index b65576403e9d8..b3b2288d2299a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.trees.UnaryLike @@ -87,10 +88,11 @@ case class DynamicPruningSubquery( override def toString: String = s"dynamicpruning#${exprId.id} $conditionString" override lazy val canonicalized: DynamicPruning = { + val buildOutput = buildQuery.output copy( pruningKey = pruningKey.canonicalized, buildQuery = buildQuery.canonicalized, - buildKeys = buildKeys.map(_.canonicalized), + buildKeys = buildKeys.map(QueryPlan.normalizeExpressions(_, buildOutput)), exprId = ExprId(0)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala index 7e545d3321054..c362dc6bb93a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -22,7 +22,7 @@ import java.util.TimeZone import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.MULTI_COMMUTATIVE_OP_OPT_THRESHOLD import org.apache.spark.sql.types.{BooleanType, Decimal, DecimalType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampNTZType, TimestampType} @@ -352,6 +352,23 @@ class CanonicalizeSuite extends SparkFunSuite { SQLConf.get.setConfString(MULTI_COMMUTATIVE_OP_OPT_THRESHOLD.key, default.toString) } + test("SPARK-45658: DynamicPruningSubquery canonicalization build keys not canonicalized" + + " relative to build query output") { + val pruneExprId = NamedExpression.newExprId + val pruneKey = AttributeReference("dummy", IntegerType)(pruneExprId) + val testRelation = LocalRelation($"a".int, $"b".int, $"c".int) + + val buildQueryPlan1 = testRelation.where("a".attr > 10).select($"b".attr * Literal(5)).analyze + val buildKeys1 = Seq(buildQueryPlan1.output.head) + val dps1 = DynamicPruningSubquery(pruneKey, buildQueryPlan1, buildKeys1, 0, true) + + val buildQueryPlan2 = testRelation.where("a".attr > 10).select($"b".attr * Literal(5)).analyze + val buildKeys2 = Seq(buildQueryPlan2.output.head) + val dps2 = DynamicPruningSubquery(pruneKey, buildQueryPlan2, buildKeys2, 0, true) + + assert(dps1.canonicalized == dps2.canonicalized) + } + test("canonicalization of With expressions with one common expression") { val expr = Divide(Literal.create(1, IntegerType), AttributeReference("a", IntegerType)()) val common1 = IsNull(With(expr.copy()) { case Seq(expr) => From d6e4f6517fe2c18b004a65a215d3dc1bc3b34b08 Mon Sep 17 00:00:00 2001 From: ashahid Date: Wed, 11 Dec 2024 19:17:19 -0800 Subject: [PATCH 2/2] SPARK-45658.Fix canonicalization of DynamicPruningSubquery to canonicalize build keys relative to build query output. fix compilation error after merge --- .../spark/sql/catalyst/expressions/CanonicalizeSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala index c362dc6bb93a1..6aa5703645076 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -360,11 +360,11 @@ class CanonicalizeSuite extends SparkFunSuite { val buildQueryPlan1 = testRelation.where("a".attr > 10).select($"b".attr * Literal(5)).analyze val buildKeys1 = Seq(buildQueryPlan1.output.head) - val dps1 = DynamicPruningSubquery(pruneKey, buildQueryPlan1, buildKeys1, 0, true) + val dps1 = DynamicPruningSubquery(pruneKey, buildQueryPlan1, buildKeys1, Seq(0), true) val buildQueryPlan2 = testRelation.where("a".attr > 10).select($"b".attr * Literal(5)).analyze val buildKeys2 = Seq(buildQueryPlan2.output.head) - val dps2 = DynamicPruningSubquery(pruneKey, buildQueryPlan2, buildKeys2, 0, true) + val dps2 = DynamicPruningSubquery(pruneKey, buildQueryPlan2, buildKeys2, Seq(0), true) assert(dps1.canonicalized == dps2.canonicalized) }