From ca85f86613378e36e62310b5d1e327fcc3de0aa2 Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Thu, 13 Oct 2022 22:07:27 +0800
Subject: [PATCH] [SPARK-39876][FOLLOW-UP][SQL] Add parser and Dataset tests
 for SQL UNPIVOT

### What changes were proposed in this pull request?
Adds more tests for the SQL `UNPIVOT` clause, as requested in
https://github.com/apache/spark/pull/37407#discussion_r988768918.

### Why are the changes needed?
Better test coverage.

### Does this PR introduce _any_ user-facing change?
No, only more tests and a fix for one issue. SQL `UNPIVOT` has not been released yet.

### How was this patch tested?
In `UnpivotParserSuite` and `DatasetUnpivotSuite`.
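For reference, a minimal sketch of the two `UNPIVOT` forms exercised by these
suites, using the same placeholder table `t` and columns `a` to `d` as the
parser tests:

```sql
-- single value column; each unpivot column may carry an optional alias
SELECT * FROM t UNPIVOT (val FOR col IN (a AS A, b));

-- multiple value columns; each column group may carry an optional group alias,
-- but aliases on the inner columns are rejected by the parser
SELECT * FROM t UNPIVOT ((val1, val2) FOR col IN ((a, b) AS first, (c, d)));
```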
Closes #38153 from EnricoMi/branch-sql-unpivot-tests.

Authored-by: Enrico Minack
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/parser/AstBuilder.scala      |   6 +-
 .../plans/logical/basicLogicalOperators.scala |   5 +-
 .../catalyst/parser/UnpivotParserSuite.scala  | 195 ++++++++++++++++++
 .../spark/sql/DatasetUnpivotSuite.scala       | 124 +++++++++++
 4 files changed, 327 insertions(+), 3 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/UnpivotParserSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d17c839be115a..01ba83d3f8448 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1137,7 +1137,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         Unpivot(
           None,
           Some(unpivotColumns.map(Seq(_))),
-          Some(unpivotAliases),
+          // None when all elements are None
+          Some(unpivotAliases).filter(_.exists(_.isDefined)),
           variableColumnName,
           valueColumnNames,
           query
@@ -1151,7 +1152,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         Unpivot(
           None,
           Some(unpivotColumns),
-          Some(unpivotAliases),
+          // None when all elements are None
+          Some(unpivotAliases).filter(_.exists(_.isDefined)),
           variableColumnName,
           valueColumnNames,
           query
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4ba869b73204e..793fecd5a5b8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1471,7 +1471,10 @@ case class Unpivot(
     copy(child = newChild)
 
   def canBeCoercioned: Boolean = values.exists(_.nonEmpty) &&
-    values.exists(_.forall(_.forall(_.resolved)))
+    values.exists(_.forall(_.forall(_.resolved))) &&
+    // when no ids are given, values must be Attributes (column names) to allow detecting ids;
+    // coercion would add aliases and prevent detecting ids, so defer coercion until after id detection
+    ids.exists(_.forall(_.resolved))
 
   def valuesTypeCoercioned: Boolean = canBeCoercioned &&
     // all inner values at position idx must have the same data type
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/UnpivotParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/UnpivotParserSuite.scala
new file mode 100644
index 0000000000000..dd7e4ec4916fc
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/UnpivotParserSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Unpivot}
+
+class UnpivotParserSuite extends AnalysisTest {
+
+  import CatalystSqlParser._
+  import org.apache.spark.sql.catalyst.dsl.expressions._
+  import org.apache.spark.sql.catalyst.dsl.plans._
+
+  private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = {
+    comparePlans(parsePlan(sqlCommand), plan, checkAnalysis = false)
+  }
+
+  private def intercept(sqlCommand: String, errorClass: Option[String], messages: String*): Unit =
+    interceptParseException(parsePlan)(sqlCommand, messages: _*)(errorClass)
+
+  test("unpivot - single value") {
+    assertEqual(
+      "SELECT * FROM t UNPIVOT (val FOR col in (a, b))",
+      Unpivot(
+        None,
+        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
+        None,
+        "col",
+        Seq("val"),
+        table("t"))
+        .where(coalesce($"val").isNotNull)
+        .select(star())
+    )
+  }
+
+  test("unpivot - single value with alias") {
+    Seq(
+      "SELECT * FROM t UNPIVOT (val FOR col in (a A, b))",
+      "SELECT * FROM t UNPIVOT (val FOR col in (a AS A, b))"
+    ).foreach { sql =>
+      withClue(sql) {
+        assertEqual(
+          sql,
+          Unpivot(
+            None,
+            Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
+            Some(Seq(Some("A"), None)),
+            "col",
+            Seq("val"),
+            table("t"))
+            .where(coalesce($"val").isNotNull)
+            .select(star())
+        )
+      }
+    }
+  }
+
+  test("unpivot - multiple values") {
+    assertEqual(
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b), (c, d)))",
+      Unpivot(
+        None,
+        Some(Seq(Seq($"a", $"b").map(UnresolvedAlias(_)), Seq($"c", $"d").map(UnresolvedAlias(_)))),
+        None,
+        "col",
+        Seq("val1", "val2"),
+        table("t"))
+        .where(coalesce($"val1", $"val2").isNotNull)
+        .select(star())
+    )
+  }
+
+  test("unpivot - multiple values with alias") {
+    Seq(
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b) first, (c, d)))",
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a, b) AS first, (c, d)))"
+    ).foreach { sql =>
+      withClue(sql) {
+        assertEqual(
+          sql,
+          Unpivot(
+            None,
+            Some(Seq(
+              Seq($"a", $"b").map(UnresolvedAlias(_)),
+              Seq($"c", $"d").map(UnresolvedAlias(_))
+            )),
+            Some(Seq(Some("first"), None)),
+            "col",
+            Seq("val1", "val2"),
+            table("t"))
+            .where(coalesce($"val1", $"val2").isNotNull)
+            .select(star())
+        )
+      }
+    }
+  }
+
+  test("unpivot - multiple values with inner alias") {
+    Seq(
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a A, b), (c, d)))",
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ((a AS A, b), (c, d)))"
+    ).foreach { sql =>
+      withClue(sql) {
+        intercept(sql, Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near ")
+      }
+    }
+  }
+
+  test("unpivot - alias") {
+    Seq(
+      "SELECT up.* FROM t UNPIVOT (val FOR col in (a, b)) up",
+      "SELECT up.* FROM t UNPIVOT (val FOR col in (a, b)) AS up"
+    ).foreach { sql =>
+      withClue(sql) {
+        assertEqual(
+          sql,
+          Unpivot(
+            None,
+            Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
+            None,
+            "col",
+            Seq("val"),
+            table("t"))
+            .where(coalesce($"val").isNotNull)
+            .subquery("up")
+            .select(star("up"))
+        )
+      }
+    }
+  }
+
+  test("unpivot - no unpivot value names") {
+    intercept(
+      "SELECT * FROM t UNPIVOT (() FOR col in ((a, b), (c, d)))",
+      Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near "
+    )
+  }
+
+  test("unpivot - no unpivot columns") {
+    Seq(
+      "SELECT * FROM t UNPIVOT (val FOR col in ())",
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in ())",
+      "SELECT * FROM t UNPIVOT ((val1, val2) FOR col in (()))"
+    ).foreach { sql =>
+      withClue(sql) {
+        intercept(sql, Some("PARSE_SYNTAX_ERROR"), "Syntax error at or near ")
+      }
+    }
+  }
+
+  test("unpivot - exclude nulls") {
+    assertEqual(
+      "SELECT * FROM t UNPIVOT EXCLUDE NULLS (val FOR col in (a, b))",
+      Unpivot(
+        None,
+        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
+        None,
+        "col",
+        Seq("val"),
+        table("t"))
+        .where(coalesce($"val").isNotNull)
+        .select(star())
+    )
+  }
+
+  test("unpivot - include nulls") {
+    assertEqual(
+      "SELECT * FROM t UNPIVOT INCLUDE NULLS (val FOR col in (a, b))",
+      Unpivot(
+        None,
+        Some(Seq(Seq(UnresolvedAlias($"a")), Seq(UnresolvedAlias($"b")))),
+        None,
+        "col",
+        Seq("val"),
+        table("t"))
+        .select(star())
+    )
+  }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
index eac5a20a7e5d0..f2f31851acbac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
@@ -539,6 +539,130 @@ class DatasetUnpivotSuite extends QueryTest
         "val"),
       longStructDataRows)
   }
+
+  test("unpivot sql with struct fields") {
+    // accessing struct fields in the FROM clause works
+    checkAnswer(
+      spark.sql("""SELECT * FROM (
+        |  SELECT course, `the.earnings`.* FROM (
+        |    SELECT col1 AS course,
+        |           struct(col2 AS `2012`, col3 AS `2013`, col4 AS `2014`) `the.earnings`
+        |    FROM VALUES ("dotNET", 15000, 48000.0, 22500L)
+        |  )
+        |)
+        |UNPIVOT (
+        |  earningsYear FOR year IN (`2012`, `2013`, `2014`)
+        |);
+        |""".stripMargin),
+      Seq(
+        ("dotNET", "2012", 15000),
+        ("dotNET", "2013", 48000),
+        ("dotNET", "2014", 22500)
+      ).toDF("course", "year", "earningsYear")
+    )
+
+    checkAnswer(
+      spark.sql("""SELECT * FROM (
+        |  SELECT course, `the.earnings`.* FROM (
+        |    SELECT col1 AS course,
+        |           named_struct(
+        |             'earnings2012', col2, 'earnings2013', col4, 'earnings2014', col6,
+        |             'sales2012', col3, 'sales2013', col5, 'sales2014', col7
+        |           ) AS `the.earnings`
+        |    FROM VALUES ("dotNET", 15000, NULL, 48000.0, 1, 22500L, 1)
+        |  )
+        |)
+        |UNPIVOT (
+        |  (earnings, sales) FOR year IN (
+        |    (`earnings2012`, `sales2012`) `2012`,
+        |    (`earnings2013`, `sales2013`) `2013`,
+        |    (`earnings2014`, `sales2014`) `2014`
+        |  )
+        |);
+        |""".stripMargin),
+      Seq(
+        ("dotNET", "2012", 15000, null),
+        ("dotNET", "2013", 48000, Some(1)),
+        ("dotNET", "2014", 22500, Some(1))
+      ).toDF("course", "year", "earnings", "sales")
+    )
+
+    // accessing struct fields as unpivot columns does not work
+    val e = intercept[AnalysisException] {
+      spark.sql("""SELECT * FROM (
+        |  SELECT col1 AS course,
+        |         struct(col2 AS `2012`, col3 AS `2013`, col4 AS `2014`) AS `the.earnings`
+        |  FROM VALUES ("dotNET", 15000, 48000, 22500)
+        |)
+        |UNPIVOT (
+        |  earningsYear FOR year IN (`the.earnings`.`2012`,
+        |                            `the.earnings`.`2013`,
+        |                            `the.earnings`.`2014`)
+        |);
+        |""".stripMargin)
+    }
+    checkError(
+      exception = e,
+      errorClass = "UNPIVOT_REQUIRES_ATTRIBUTES",
+      parameters = Map(
+        "given" -> "value",
+        "empty" -> "id",
+        "expressions" -> (
+          "\"the.earnings.2012 AS `2012`\", " +
+          "\"the.earnings.2013 AS `2013`\", " +
+          "\"the.earnings.2014 AS `2014`\"")
+      ))
+
+    val e2 = intercept[AnalysisException] {
+      spark.sql("""SELECT * FROM (
+        |  SELECT col1 AS course,
+        |         named_struct('2012', col2, '2013', col4, '2014', col6) `the.earnings`,
+        |         named_struct('2012', col3, '2013', col5, '2014', col7) `the.sales`
+        |  FROM VALUES ("dotNET", 15000, NULL, 48000, 1, 22500, 1)
+        |)
+        |UNPIVOT (
+        |  (earnings, sales) FOR year IN (
+        |    (`the.earnings`.`2012`, `the.sales`.`2012`) `2012`,
+        |    (`the.earnings`.`2013`, `the.sales`.`2013`) `2013`,
+        |    (`the.earnings`.`2014`, `the.sales`.`2014`) `2014`
+        |  )
+        |);
+        |""".stripMargin)
+    }
+    checkError(
+      exception = e2,
+      errorClass = "UNPIVOT_REQUIRES_ATTRIBUTES",
+      parameters = Map(
+        "given" -> "value",
+        "empty" -> "id",
+        "expressions" -> (
+          "\"the.earnings.2012 AS `2012`\", " +
+          "\"the.sales.2012 AS `2012`\", " +
+          "\"the.earnings.2013 AS `2013`\", " +
+          "\"the.sales.2013 AS `2013`\", " +
+          "\"the.earnings.2014 AS `2014`\", " +
+          "\"the.sales.2014 AS `2014`\"")
+      ))
+  }
+
+  test("unpivot sql with unpivot value number mismatch") {
+    Seq("col1", "col1, col2, col3").foreach { columns =>
+      withClue(columns) {
+        val e = intercept[AnalysisException] {
+          spark.sql(s"""SELECT * FROM VALUES (1, 2, 3)
+            |UNPIVOT (
+            |  (val1, val2) FOR col IN (($columns))
+            |);
+            |""".stripMargin)
+        }
+        checkError(
+          exception = e,
+          errorClass = "UNPIVOT_VALUE_SIZE_MISMATCH",
+          parameters = Map("names" -> "2"))
+      }
+    }
+  }
 }
 
 case class WideData(id: Int, str1: String, str2: String, int1: Option[Int], long1: Option[Long])