diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala index 0b76d9cf139a..b15d32a6e12a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala @@ -60,7 +60,7 @@ object CollapseProjectExecTransformer extends Rule[SparkPlan] { */ private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = { projectList.exists { - case _ @Alias(_: CreateNamedStruct, _) => true + case a: Alias => a.child.exists(_.isInstanceOf[CreateNamedStruct]) case _ => false } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala index 87a4ce14d4a2..755cc26cb72a 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -18,9 +18,13 @@ package org.apache.spark.sql.extension import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer import org.apache.spark.sql.GlutenSQLTestsTrait import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.types._ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { @@ -116,4 +120,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { } } } + + testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") { + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + val nameAttr = AttributeReference("name", StringType, nullable = true)() + val valueAttr = AttributeReference("value", IntegerType, nullable = false)() + val leaf = LocalTableScanExec(Seq(nameAttr, valueAttr), Seq.empty) + + // Inner project: Alias(If(IsNull(name), null, CreateNamedStruct(...)), "info") + val cns = CreateNamedStruct(Seq(Literal("n"), nameAttr, Literal("v"), valueAttr)) + val wrappedCns = If(IsNull(nameAttr), Literal.create(null, cns.dataType), cns) + val innerAlias = Alias(wrappedCns, "info")() + val innerProject = ProjectExecTransformer.createUnsafe(Seq(innerAlias), leaf) + + // Outer project: GetStructField(info, 0) AS n + val infoAttr = innerProject.output.find(_.name == "info").get + val outerExpr = Alias(GetStructField(infoAttr, 0, Some("n")), "n")() + val outerProject = ProjectExecTransformer.createUnsafe(Seq(outerExpr), innerProject) + + // Apply collapse rule - guard should block collapse + val result = CollapseProjectExecTransformer.apply(outerProject) + assert( + result match { + case ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + }, + "Expected stacked projects to remain uncollapsed when CreateNamedStruct " + + "is nested inside wrapper expression" + ) + } + } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala index 87a4ce14d4a2..755cc26cb72a 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -18,9 +18,13 @@ package org.apache.spark.sql.extension import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer import org.apache.spark.sql.GlutenSQLTestsTrait import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.types._ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { @@ -116,4 +120,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { } } } + + testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") { + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + val nameAttr = AttributeReference("name", StringType, nullable = true)() + val valueAttr = AttributeReference("value", IntegerType, nullable = false)() + val leaf = LocalTableScanExec(Seq(nameAttr, valueAttr), Seq.empty) + + // Inner project: Alias(If(IsNull(name), null, CreateNamedStruct(...)), "info") + val cns = CreateNamedStruct(Seq(Literal("n"), nameAttr, Literal("v"), valueAttr)) + val wrappedCns = If(IsNull(nameAttr), Literal.create(null, cns.dataType), cns) + val innerAlias = Alias(wrappedCns, "info")() + val innerProject = ProjectExecTransformer.createUnsafe(Seq(innerAlias), leaf) + + // Outer project: GetStructField(info, 0) AS n + val infoAttr = innerProject.output.find(_.name == "info").get + val outerExpr = Alias(GetStructField(infoAttr, 0, Some("n")), "n")() + val outerProject = ProjectExecTransformer.createUnsafe(Seq(outerExpr), innerProject) + + // Apply collapse rule - guard should block collapse + val result = CollapseProjectExecTransformer.apply(outerProject) + assert( + result match { + case ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + }, + "Expected stacked projects to remain uncollapsed when CreateNamedStruct " + + "is nested inside wrapper expression" + ) + } + } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala index 49e4187babf4..cafef174a65b 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer import org.apache.spark.sql.GlutenSQLTestsTrait import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan} +import org.apache.spark.sql.types._ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { } } } + + testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") { + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + val nameAttr = AttributeReference("name", StringType, nullable = true)() + val valueAttr = AttributeReference("value", IntegerType, nullable = false)() + val leaf = LocalTableScanExec(Seq(nameAttr, valueAttr), Seq.empty) + + // Inner project: Alias(If(IsNull(name), null, CreateNamedStruct(...)), "info") + val cns = CreateNamedStruct(Seq(Literal("n"), nameAttr, Literal("v"), valueAttr)) + val wrappedCns = If(IsNull(nameAttr), Literal.create(null, cns.dataType), cns) + val innerAlias = Alias(wrappedCns, "info")() + val innerProject = ProjectExecTransformer.createUnsafe(Seq(innerAlias), leaf) + + // Outer project: GetStructField(info, 0) AS n + val infoAttr = innerProject.output.find(_.name == "info").get + val outerExpr = Alias(GetStructField(infoAttr, 0, Some("n")), "n")() + val outerProject = ProjectExecTransformer.createUnsafe(Seq(outerExpr), innerProject) + + // Apply collapse rule - guard should block collapse + val result = CollapseProjectExecTransformer.apply(outerProject) + assert( + result match { + case ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + }, + "Expected stacked projects to remain uncollapsed when CreateNamedStruct " + + "is nested inside wrapper expression" + ) + } + } } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala index 49e4187babf4..cf04f0e7b545 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer import org.apache.spark.sql.GlutenSQLTestsTrait import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan} +import org.apache.spark.sql.types._ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { } } } + + testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") { + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + val nameAttr = AttributeReference("name", StringType, nullable = true)() + val valueAttr = AttributeReference("value", IntegerType, nullable = false)() + val leaf = LocalTableScanExec(Seq(nameAttr, valueAttr), Seq.empty, None) + + // Inner project: Alias(If(IsNull(name), null, CreateNamedStruct(...)), "info") + val cns = CreateNamedStruct(Seq(Literal("n"), nameAttr, Literal("v"), valueAttr)) + val wrappedCns = If(IsNull(nameAttr), Literal.create(null, cns.dataType), cns) + val innerAlias = Alias(wrappedCns, "info")() + val innerProject = ProjectExecTransformer.createUnsafe(Seq(innerAlias), leaf) + + // Outer project: GetStructField(info, 0) AS n + val infoAttr = innerProject.output.find(_.name == "info").get + val outerExpr = Alias(GetStructField(infoAttr, 0, Some("n")), "n")() + val outerProject = ProjectExecTransformer.createUnsafe(Seq(outerExpr), innerProject) + + // Apply collapse rule - guard should block collapse + val result = CollapseProjectExecTransformer.apply(outerProject) + assert( + result match { + case ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + }, + "Expected stacked projects to remain uncollapsed when CreateNamedStruct " + + "is nested inside wrapper expression" + ) + } + } } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala index 49e4187babf4..cf04f0e7b545 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer import org.apache.spark.sql.GlutenSQLTestsTrait import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan} +import org.apache.spark.sql.types._ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { } } } + + testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") { + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + val nameAttr = AttributeReference("name", StringType, nullable = true)() + val valueAttr = AttributeReference("value", IntegerType, nullable = false)() + val leaf = LocalTableScanExec(Seq(nameAttr, valueAttr), Seq.empty, None) + + // Inner project: Alias(If(IsNull(name), null, CreateNamedStruct(...)), "info") + val cns = CreateNamedStruct(Seq(Literal("n"), nameAttr, Literal("v"), valueAttr)) + val wrappedCns = If(IsNull(nameAttr), Literal.create(null, cns.dataType), cns) + val innerAlias = Alias(wrappedCns, "info")() + val innerProject = ProjectExecTransformer.createUnsafe(Seq(innerAlias), leaf) + + // Outer project: GetStructField(info, 0) AS n + val infoAttr = innerProject.output.find(_.name == "info").get + val outerExpr = Alias(GetStructField(infoAttr, 0, Some("n")), "n")() + val outerProject = ProjectExecTransformer.createUnsafe(Seq(outerExpr), innerProject) + + // Apply collapse rule - guard should block collapse + val result = CollapseProjectExecTransformer.apply(outerProject) + assert( + result match { + case ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + }, + "Expected stacked projects to remain uncollapsed when CreateNamedStruct " + + "is nested inside wrapper expression" + ) + } + } }