Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object CollapseProjectExecTransformer extends Rule[SparkPlan] {
*/
private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
  // Only Alias entries are inspected; any other kind of named expression never
  // contributes a match.
  val aliasedChildren = projectList.collect { case alias: Alias => alias.child }
  // TreeNode.exists tests the node itself before descending into its children, so a
  // CreateNamedStruct that is the direct child of the alias and one buried inside a
  // wrapper expression are both caught by this single check.
  aliasedChildren.exists(_.exists(_.isInstanceOf[CreateNamedStruct]))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package org.apache.spark.sql.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.types._

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

Expand Down Expand Up @@ -116,4 +120,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
}
}
}

testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") {
  withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
    // Plan leaf: an empty local scan exposing a nullable string `name` and a
    // non-nullable int `value`.
    val nameCol = AttributeReference("name", StringType, nullable = true)()
    val valueCol = AttributeReference("value", IntegerType, nullable = false)()
    val scan = LocalTableScanExec(Seq(nameCol, valueCol), Seq.empty)

    // Lower project emits `info`. The struct is not the alias's direct child: it sits
    // in the else-branch of If(IsNull(name), <typed null>, struct).
    val namedStruct = CreateNamedStruct(Seq(Literal("n"), nameCol, Literal("v"), valueCol))
    val typedNull = Literal.create(null, namedStruct.dataType)
    val guardedStruct = If(IsNull(nameCol), typedNull, namedStruct)
    val infoAlias = Alias(guardedStruct, "info")()
    val lowerProject = ProjectExecTransformer.createUnsafe(Seq(infoAlias), scan)

    // Upper project pulls field 0 out of `info` and exposes it as `n`.
    val infoRef = lowerProject.output.find(_.name == "info").get
    val fieldAlias = Alias(GetStructField(infoRef, 0, Some("n")), "n")()
    val upperProject = ProjectExecTransformer.createUnsafe(Seq(fieldAlias), lowerProject)

    // Run the rule directly on the hand-built plan; both project nodes must survive.
    val rewritten = CollapseProjectExecTransformer(upperProject)
    assert(
      rewritten match {
        case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
        case _ => false
      },
      "Expected stacked projects to remain uncollapsed when CreateNamedStruct " +
        "is nested inside wrapper expression"
    )
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package org.apache.spark.sql.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.types._

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

Expand Down Expand Up @@ -116,4 +120,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
}
}
}

testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") {
  withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
    // Plan leaf: an empty local scan exposing a nullable string `name` and a
    // non-nullable int `value`.
    val nameCol = AttributeReference("name", StringType, nullable = true)()
    val valueCol = AttributeReference("value", IntegerType, nullable = false)()
    val scan = LocalTableScanExec(Seq(nameCol, valueCol), Seq.empty)

    // Lower project emits `info`. The struct is not the alias's direct child: it sits
    // in the else-branch of If(IsNull(name), <typed null>, struct).
    val namedStruct = CreateNamedStruct(Seq(Literal("n"), nameCol, Literal("v"), valueCol))
    val typedNull = Literal.create(null, namedStruct.dataType)
    val guardedStruct = If(IsNull(nameCol), typedNull, namedStruct)
    val infoAlias = Alias(guardedStruct, "info")()
    val lowerProject = ProjectExecTransformer.createUnsafe(Seq(infoAlias), scan)

    // Upper project pulls field 0 out of `info` and exposes it as `n`.
    val infoRef = lowerProject.output.find(_.name == "info").get
    val fieldAlias = Alias(GetStructField(infoRef, 0, Some("n")), "n")()
    val upperProject = ProjectExecTransformer.createUnsafe(Seq(fieldAlias), lowerProject)

    // Run the rule directly on the hand-built plan; both project nodes must survive.
    val rewritten = CollapseProjectExecTransformer(upperProject)
    assert(
      rewritten match {
        case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
        case _ => false
      },
      "Expected stacked projects to remain uncollapsed when CreateNamedStruct " +
        "is nested inside wrapper expression"
    )
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan}
import org.apache.spark.sql.types._

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

Expand Down Expand Up @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
}
}
}

testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") {
  withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
    // Plan leaf: an empty local scan exposing a nullable string `name` and a
    // non-nullable int `value`.
    val nameCol = AttributeReference("name", StringType, nullable = true)()
    val valueCol = AttributeReference("value", IntegerType, nullable = false)()
    val scan = LocalTableScanExec(Seq(nameCol, valueCol), Seq.empty)

    // Lower project emits `info`. The struct is not the alias's direct child: it sits
    // in the else-branch of If(IsNull(name), <typed null>, struct).
    val namedStruct = CreateNamedStruct(Seq(Literal("n"), nameCol, Literal("v"), valueCol))
    val typedNull = Literal.create(null, namedStruct.dataType)
    val guardedStruct = If(IsNull(nameCol), typedNull, namedStruct)
    val infoAlias = Alias(guardedStruct, "info")()
    val lowerProject = ProjectExecTransformer.createUnsafe(Seq(infoAlias), scan)

    // Upper project pulls field 0 out of `info` and exposes it as `n`.
    val infoRef = lowerProject.output.find(_.name == "info").get
    val fieldAlias = Alias(GetStructField(infoRef, 0, Some("n")), "n")()
    val upperProject = ProjectExecTransformer.createUnsafe(Seq(fieldAlias), lowerProject)

    // Run the rule directly on the hand-built plan; both project nodes must survive.
    val rewritten = CollapseProjectExecTransformer(upperProject)
    assert(
      rewritten match {
        case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
        case _ => false
      },
      "Expected stacked projects to remain uncollapsed when CreateNamedStruct " +
        "is nested inside wrapper expression"
    )
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan}
import org.apache.spark.sql.types._

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

Expand Down Expand Up @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
}
}
}

testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") {
  withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
    // Plan leaf: an empty local scan exposing a nullable string `name` and a
    // non-nullable int `value`. This variant passes an explicit `None` as the scan's
    // third constructor argument.
    val nameCol = AttributeReference("name", StringType, nullable = true)()
    val valueCol = AttributeReference("value", IntegerType, nullable = false)()
    val scan = LocalTableScanExec(Seq(nameCol, valueCol), Seq.empty, None)

    // Lower project emits `info`. The struct is not the alias's direct child: it sits
    // in the else-branch of If(IsNull(name), <typed null>, struct).
    val namedStruct = CreateNamedStruct(Seq(Literal("n"), nameCol, Literal("v"), valueCol))
    val typedNull = Literal.create(null, namedStruct.dataType)
    val guardedStruct = If(IsNull(nameCol), typedNull, namedStruct)
    val infoAlias = Alias(guardedStruct, "info")()
    val lowerProject = ProjectExecTransformer.createUnsafe(Seq(infoAlias), scan)

    // Upper project pulls field 0 out of `info` and exposes it as `n`.
    val infoRef = lowerProject.output.find(_.name == "info").get
    val fieldAlias = Alias(GetStructField(infoRef, 0, Some("n")), "n")()
    val upperProject = ProjectExecTransformer.createUnsafe(Seq(fieldAlias), lowerProject)

    // Run the rule directly on the hand-built plan; both project nodes must survive.
    val rewritten = CollapseProjectExecTransformer(upperProject)
    assert(
      rewritten match {
        case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
        case _ => false
      },
      "Expected stacked projects to remain uncollapsed when CreateNamedStruct " +
        "is nested inside wrapper expression"
    )
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package org.apache.spark.sql.extension

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.extension.columnar.CollapseProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{LocalTableScanExec, SparkPlan}
import org.apache.spark.sql.types._

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

Expand Down Expand Up @@ -123,4 +126,34 @@ class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
}
}
}

testGluten("Collapse is blocked when CreateNamedStruct is nested inside wrapper expression") {
  withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
    // Plan leaf: an empty local scan exposing a nullable string `name` and a
    // non-nullable int `value`. This variant passes an explicit `None` as the scan's
    // third constructor argument.
    val nameCol = AttributeReference("name", StringType, nullable = true)()
    val valueCol = AttributeReference("value", IntegerType, nullable = false)()
    val scan = LocalTableScanExec(Seq(nameCol, valueCol), Seq.empty, None)

    // Lower project emits `info`. The struct is not the alias's direct child: it sits
    // in the else-branch of If(IsNull(name), <typed null>, struct).
    val namedStruct = CreateNamedStruct(Seq(Literal("n"), nameCol, Literal("v"), valueCol))
    val typedNull = Literal.create(null, namedStruct.dataType)
    val guardedStruct = If(IsNull(nameCol), typedNull, namedStruct)
    val infoAlias = Alias(guardedStruct, "info")()
    val lowerProject = ProjectExecTransformer.createUnsafe(Seq(infoAlias), scan)

    // Upper project pulls field 0 out of `info` and exposes it as `n`.
    val infoRef = lowerProject.output.find(_.name == "info").get
    val fieldAlias = Alias(GetStructField(infoRef, 0, Some("n")), "n")()
    val upperProject = ProjectExecTransformer.createUnsafe(Seq(fieldAlias), lowerProject)

    // Run the rule directly on the hand-built plan; both project nodes must survive.
    val rewritten = CollapseProjectExecTransformer(upperProject)
    assert(
      rewritten match {
        case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
        case _ => false
      },
      "Expected stacked projects to remain uncollapsed when CreateNamedStruct " +
        "is nested inside wrapper expression"
    )
  }
}
}
Loading