Skip to content

Commit

Permalink
[SPARK-39876][FOLLOW-UP][SQL] Add parser and Dataset tests for SQL UNPIVOT
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
Adds more tests for the SQL `UNPIVOT` clause. apache#37407 (comment)

### Why are the changes needed?
Better test coverage.

### Does this PR introduce _any_ user-facing change?
No — only more tests and one small fix. The SQL `UNPIVOT` clause has not been released yet.

### How was this patch tested?
In `UnpivotParserSuite` and `DatasetUnpivotSuite`.

Closes apache#38153 from EnricoMi/branch-sql-unpivot-tests.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
EnricoMi authored and SandishKumarHN committed Dec 12, 2022
1 parent 6c6dd01 commit ca85f86
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
Unpivot(
None,
Some(unpivotColumns.map(Seq(_))),
Some(unpivotAliases),
// None when all elements are None
Some(unpivotAliases).filter(_.exists(_.isDefined)),
variableColumnName,
valueColumnNames,
query
Expand All @@ -1151,7 +1152,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
Unpivot(
None,
Some(unpivotColumns),
Some(unpivotAliases),
// None when all elements are None
Some(unpivotAliases).filter(_.exists(_.isDefined)),
variableColumnName,
valueColumnNames,
query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,10 @@ case class Unpivot(
copy(child = newChild)

def canBeCoercioned: Boolean = values.exists(_.nonEmpty) &&
values.exists(_.forall(_.forall(_.resolved)))
values.exists(_.forall(_.forall(_.resolved))) &&
// when no ids are given, values must be Attributes (column names) to allow detecting ids
// coercion will add aliases, would disallow detecting ids, so defer coercion after id detection
ids.exists(_.forall(_.resolved))

def valuesTypeCoercioned: Boolean = canBeCoercioned &&
// all inner values at position idx must have the same data type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Unpivot}

class UnpivotParserSuite extends AnalysisTest {

  import CatalystSqlParser._
  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._

  /** Parses `sql` and compares the resulting unanalyzed plan against `expected`. */
  private def assertPlan(sql: String, expected: LogicalPlan): Unit =
    comparePlans(parsePlan(sql), expected, checkAnalysis = false)

  /** Asserts that parsing `sql` fails with `errorClass` and the given message fragments. */
  private def assertParseError(sql: String, errorClass: Option[String], messages: String*): Unit =
    interceptParseException(parsePlan)(sql, messages: _*)(errorClass)

  test("unpivot - single value") {
    // Default null handling (EXCLUDE NULLS) adds a not-null filter on the value column.
    val expected =
      Unpivot(
        None,
        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
        None,
        "col",
        Seq("val"),
        table("t"))
        .where(coalesce($"val").isNotNull)
        .select(star())
    assertPlan("SELECT * FROM t UNPIVOT (val FOR col in (a, b))", expected)
  }

  test("unpivot - single value with alias") {
    // The AS keyword before a column alias is optional; both spellings parse identically.
    val expected =
      Unpivot(
        None,
        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
        Some(Seq(Some("A"), None)),
        "col",
        Seq("val"),
        table("t"))
        .where(coalesce($"val").isNotNull)
        .select(star())
    for (sql <- Seq(
        "SELECT * FROM t UNPIVOT (val FOR col in (a A, b))",
        "SELECT * FROM t UNPIVOT (val FOR col in (a AS A, b))")) {
      withClue(sql) {
        assertPlan(sql, expected)
      }
    }
  }

  test("unpivot - multiple values") {
    // Multiple value columns: each IN-list entry is a tuple of columns.
    val expected =
      Unpivot(
        None,
        Some(Seq(
          Seq($"a", $"b").map(UnresolvedAlias(_)),
          Seq($"c", $"d").map(UnresolvedAlias(_))
        )),
        None,
        "col",
        Seq("val1", "val2"),
        table("t"))
        .where(coalesce($"val1", $"val2").isNotNull)
        .select(star())
    assertPlan("SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b), (c, d)))", expected)
  }

  test("unpivot - multiple values with alias") {
    // An alias may be attached to a whole value tuple, with or without AS.
    val expected =
      Unpivot(
        None,
        Some(Seq(
          Seq($"a", $"b").map(UnresolvedAlias(_)),
          Seq($"c", $"d").map(UnresolvedAlias(_))
        )),
        Some(Seq(Some("first"), None)),
        "col",
        Seq("val1", "val2"),
        table("t"))
        .where(coalesce($"val1", $"val2").isNotNull)
        .select(star())
    for (sql <- Seq(
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b) first, (c, d)))",
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b) AS first, (c, d)))")) {
      withClue(sql) {
        assertPlan(sql, expected)
      }
    }
  }

  test("unpivot - multiple values with inner alias") {
    // Aliases are only allowed on the value tuple, not on individual columns inside it.
    for (sql <- Seq(
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a A, b), (c, d)))",
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a AS A, b), (c, d)))")) {
      withClue(sql) {
        assertParseError(sql, Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near ")
      }
    }
  }

  test("unpivot - alias") {
    // A table alias on the UNPIVOT clause wraps the plan in a subquery alias.
    val expected =
      Unpivot(
        None,
        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
        None,
        "col",
        Seq("val"),
        table("t"))
        .where(coalesce($"val").isNotNull)
        .subquery("up")
        .select(star("up"))
    for (sql <- Seq(
        "SELECT up.* FROM t UNPIVOT (val FOR col in (a, b)) up",
        "SELECT up.* FROM t UNPIVOT (val FOR col in (a, b)) AS up")) {
      withClue(sql) {
        assertPlan(sql, expected)
      }
    }
  }

  test("unpivot - no unpivot value names") {
    // An empty value-name tuple is rejected by the parser.
    assertParseError(
      "SELECT * FROM t UNPIVOT (() FOR col in ((a, b), (c, d)))",
      Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near ")
  }

  test("unpivot - no unpivot columns") {
    // An empty IN list (or an empty tuple inside it) is rejected by the parser.
    for (sql <- Seq(
        "SELECT * FROM t UNPIVOT (val FOR col in ())",
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ())",
        "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in (()))")) {
      withClue(sql) {
        assertParseError(sql, Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near ")
      }
    }
  }

  test("unpivot - exclude nulls") {
    // EXCLUDE NULLS is the default, so the plan matches the plain UNPIVOT case above.
    assertPlan(
      "SELECT * FROM t UNPIVOT EXCLUDE NULLS (val FOR col in (a, b))",
      Unpivot(
        None,
        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
        None,
        "col",
        Seq("val"),
        table("t"))
        .where(coalesce($"val").isNotNull)
        .select(star()))
  }

  test("unpivot - include nulls") {
    // INCLUDE NULLS drops the null-excluding filter from the plan.
    assertPlan(
      "SELECT * FROM t UNPIVOT INCLUDE NULLS (val FOR col in (a, b))",
      Unpivot(
        None,
        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
        None,
        "col",
        Seq("val"),
        table("t"))
        .select(star()))
  }

}
124 changes: 124 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,130 @@ class DatasetUnpivotSuite extends QueryTest
"val"),
longStructDataRows)
}

// UNPIVOT interacts with struct fields: fields expanded via `.*` in the FROM
// clause become plain attributes and can be unpivoted; struct-field accesses
// used directly as unpivot columns are rejected during analysis.
test("unpivot sql with struct fields") {
// accessing struct fields in FROM clause works
checkAnswer(
spark.sql("""SELECT * FROM (
| SELECT course, `the.earnings`.* FROM (
| SELECT col1 AS course,
| struct(col2 AS `2012`, col3 AS `2013`, col4 AS `2014`) `the.earnings`
| FROM VALUES ("dotNET", 15000, 48000.0, 22500L)
| )
|)
|UNPIVOT (
| earningsYear FOR year IN (`2012`, `2013`, `2014`)
|);
|""".stripMargin),
Seq(
("dotNET", "2012", 15000),
("dotNET", "2013", 48000),
("dotNET", "2014", 22500)
).toDF("course", "year", "earningsYear")
)

// same approach with the multi-value form: expanded struct fields unpivot fine
checkAnswer(
spark.sql("""SELECT * FROM (
| SELECT course, `the.earnings`.* FROM (
| SELECT col1 AS course,
| named_struct(
| 'earnings2012', col2, 'earnings2013', col4, 'earnings2014', col6,
| 'sales2012', col3, 'sales2013', col5, 'sales2014', col7
| ) AS `the.earnings`
| FROM VALUES ("dotNET", 15000, NULL, 48000.0, 1, 22500L, 1)
| )
|)
|UNPIVOT (
| (earnings, sales) FOR year IN (
| (`earnings2012`, `sales2012`) `2012`,
| (`earnings2013`, `sales2013`) `2013`,
| (`earnings2014`, `sales2014`) `2014`
| )
|);
|""".stripMargin),
Seq(
("dotNET", "2012", 15000, null),
("dotNET", "2013", 48000, Some(1)),
("dotNET", "2014", 22500, Some(1))
).toDF("course", "year", "earnings", "sales")
)

// accessing struct fields as unpivot columns does not work
val e = intercept[AnalysisException] {
spark.sql("""SELECT * FROM (
| SELECT col1 AS course,
| struct(col2 AS `2012`, col3 AS `2013`, col4 AS `2014`) AS `the.earnings`
| FROM VALUES ("dotNET", 15000, 48000, 22500)
|)
|UNPIVOT (
| earningsYear FOR year IN (`the.earnings`.`2012`,
| `the.earnings`.`2013`,
| `the.earnings`.`2014`)
|);
|""".stripMargin)
}
checkError(
exception = e,
errorClass = "UNPIVOT_REQUIRES_ATTRIBUTES",
parameters = Map(
"given" -> "value",
"empty" -> "id",
"expressions" -> (
"\"the.earnings.2012 AS `2012`\", " +
"\"the.earnings.2013 AS `2013`\", " +
"\"the.earnings.2014 AS `2014`\"")
))

// the multi-value form fails the same way when values are struct-field accesses
val e2 = intercept[AnalysisException] {
spark.sql("""SELECT * FROM (
| SELECT col1 AS course,
| named_struct('2012', col2, '2013', col4, '2014', col6) `the.earnings`,
| named_struct('2012', col3, '2013', col5, '2014', col7) `the.sales`
| FROM VALUES ("dotNET", 15000, NULL, 48000, 1, 22500, 1)
|)
|UNPIVOT (
| (earnings, sales) FOR year IN (
| (`the.earnings`.`2012`, `the.sales`.`2012`) `2012`,
| (`the.earnings`.`2013`, `the.sales`.`2013`) `2013`,
| (`the.earnings`.`2014`, `the.sales`.`2014`) `2014`
| )
|);
|""".stripMargin)
}
checkError(
exception = e2,
errorClass = "UNPIVOT_REQUIRES_ATTRIBUTES",
parameters = Map(
"given" -> "value",
"empty" -> "id",
"expressions" -> (
"\"the.earnings.2012 AS `2012`\", " +
"\"the.sales.2012 AS `2012`\", " +
"\"the.earnings.2013 AS `2013`\", " +
"\"the.sales.2013 AS `2013`\", " +
"\"the.earnings.2014 AS `2014`\", " +
"\"the.sales.2014 AS `2014`\"")
))
}

// Two value-column names (val1, val2) are declared, but the IN lists below supply
// tuples of one and of three columns; analysis must report the size mismatch.
test("unpivot sql with unpivot value number mismatch") {
  for (columns <- Seq("col1", "col1, col2, col3")) {
    withClue(columns) {
      val sql =
        s"""SELECT * FROM VALUES (1, 2, 3)
           |UNPIVOT (
           |  (val1, val2) FOR col IN (($columns))
           |);
           |""".stripMargin
      val mismatch = intercept[AnalysisException] {
        spark.sql(sql)
      }
      checkError(
        exception = mismatch,
        errorClass = "UNPIVOT_VALUE_SIZE_MISMATCH",
        parameters = Map("names" -> "2"))
    }
  }
}

}

case class WideData(id: Int, str1: String, str2: String, int1: Option[Int], long1: Option[Long])

0 comments on commit ca85f86

Please sign in to comment.