Skip to content

Commit

Permalink
[SPARK-37855][SQL] IllegalStateException when transforming an array i…
Browse files Browse the repository at this point in the history
…nside a nested struct

### What changes were proposed in this pull request?

Skip alias the `ExtractValue` whose children contains `NamedLambdaVariable`.

### Why are the changes needed?

Since #32773, the `NamedLambdaVariable` can produce the references, however it cause the rule `NestedColumnAliasing` alias the `ExtractValue` which contains `NamedLambdaVariable`. It fails since we can not match a `NamedLambdaVariable` to an actual attribute.

Talk more:
During `NamedLambdaVariable#replaceWithAliases`, it uses the references of nestedField to match the output attributes of grandchildren. However `NamedLambdaVariable` is created at analyzer as a virtual attribute, and it is not resolved from the output of children. So we can not get any attribute when use the references of `NamedLambdaVariable` to match the grandchildren's output.

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

Add new test

Closes #35170 from ulysses-you/SPARK-37855.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
ulysses-you authored and viirya committed Jan 12, 2022
1 parent 9404954 commit 189b205
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,13 @@ object NestedColumnAliasing {
val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
exprList.foreach { e =>
collectRootReferenceAndExtractValue(e).foreach {
case ev: ExtractValue =>
// we can not alias the attr from lambda variable whose expr id is not available
case ev: ExtractValue if ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
if (ev.references.size == 1) {
nestedFieldReferences.append(ev)
}
case ar: AttributeReference => otherRootReferences.append(ar)
case _ => // ignore
}
}
val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
Expand Down
54 changes: 54 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3033,6 +3033,60 @@ class DataFrameSuite extends QueryTest
}
}
}

test("SPARK-37855: IllegalStateException when transforming an array inside a nested struct") {
def makeInput(): DataFrame = {
val innerElement1 = Row(3, 3.12)
val innerElement2 = Row(4, 2.1)
val innerElement3 = Row(1, 985.2)
val innerElement4 = Row(10, 757548.0)
val innerElement5 = Row(1223, 0.665)

val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
val outerElement2 = Row(2, Row(List(innerElement3)))
val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))

val data = Seq(
Row("row1", List(outerElement1)),
Row("row2", List(outerElement2, outerElement3))
)

val schema = new StructType()
.add("name", StringType)
.add("outer_array", ArrayType(new StructType()
.add("id", IntegerType)
.add("inner_array_struct", new StructType()
.add("inner_array", ArrayType(new StructType()
.add("id", IntegerType)
.add("value", DoubleType)
))
)
))

spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}

val df = makeInput().limit(2)

val res = df.withColumn("extracted", transform(
col("outer_array"),
c1 => {
struct(
c1.getField("id").alias("outer_id"),
transform(
c1.getField("inner_array_struct").getField("inner_array"),
c2 => {
struct(
c2.getField("value").alias("inner_value")
)
}
)
)
}
))

assert(res.collect.length == 2)
}
}

case class GroupByKey(a: Int, b: Int)
Expand Down

0 comments on commit 189b205

Please sign in to comment.