Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12719][SQL] SQL generation support for generators, including UDTF #11563

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ case class MultiAlias(child: Expression, names: Seq[String])

override def toString: String = s"$child AS $names"

/** Renders this multi-alias as `<child> AS (alias1,alias2,...)` with every alias quoted. */
override def sql: String = {
  val quoted = names.map(quoteIdentifier)
  s"${child.sql} AS (${quoted.mkString(",")})"
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,10 +999,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}

val attributes = clauses.collect {
case Token(a, Nil) => UnresolvedAttribute(a.toLowerCase)
case Token(a, Nil) => UnresolvedAttribute(cleanIdentifier(a.toLowerCase))
}

Generate(generator, join = true, outer = outer, Some(alias.toLowerCase), attributes, child)
Generate(
generator,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Thanks !! In the doc, 4-space indentation is applicable for function declaration. Is it also applicable here ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, you are right. This is part of #11538
@liancheng already reviewed it. Thanks!

join = true,
outer = outer,
Some(cleanIdentifier(alias.toLowerCase)),
attributes,
child)
}

protected def nodeToGenerator(node: ASTNode): Generator = noParseRule("Generator", node)
Expand Down
59 changes: 55 additions & 4 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
Expand All @@ -42,7 +43,7 @@ case class SubqueryHolder(query: String) extends LeafExpression with Unevaluable
}

/**
* A builder class used to convert a resolved logical plan into a SQL query string. Note that this
* A builder class used to convert a resolved logical plan into a SQL query string. Note that not
* all resolved logical plans are convertible. They either don't have corresponding SQL
* representations (e.g. logical plans that operate on local Scala collections), or are simply not
* supported by this builder (yet).
Expand Down Expand Up @@ -94,6 +95,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case Distinct(p: Project) =>
projectToSQL(p, isDistinct = true)

case g : Generate =>
generateToSQL(g)

case p: Project =>
projectToSQL(p, isDistinct = false)

Expand Down Expand Up @@ -205,15 +209,36 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
segments.map(_.trim).filter(_.nonEmpty).mkString(" ")

/**
 * Converts a [[Project]] node into a SELECT statement. Generator expressions appearing in
 * the projection list are folded back into a single `MultiAlias` by `getProjectListExprs`
 * before rendering.
 */
private def projectToSQL(plan: Project, isDistinct: Boolean): String = {
  val (exprs, source) = getProjectListExprs(plan)
  val distinctKeyword = if (isDistinct) "DISTINCT" else ""
  // A OneRowRelation child means the original query had no FROM clause at all.
  val fromKeyword = if (source == OneRowRelation) "" else "FROM"
  build("SELECT", distinctKeyword, exprs.map(_.sql).mkString(", "), fromKeyword, toSQL(source))
}

/**
 * Rewrites a projection list that sits directly on top of an unqualified [[Generate]].
 * Generated columns are collapsed into a single [[MultiAlias]] over the generator so the
 * plan can be printed as `generator AS (col1, col2, ...)`; the Generate node itself is
 * skipped by returning its child as the plan to render. Any other shape is passed through
 * unchanged.
 */
private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = {
  plan match {
    case Project(projectList, g: Generate) if g.qualifier.isEmpty =>
      // Keep only the first generated column; the survivors of this filter that still
      // match a generator output are then rewritten into the MultiAlias form.
      val deduplicated =
        projectList.filterNot(e => g.generatorOutput.tail.exists(_.semanticEquals(e)))
      val rewritten = deduplicated.map { e =>
        if (g.generatorOutput.exists(_.semanticEquals(e))) {
          MultiAlias(g.generator, g.generatorOutput.map(_.name))
        } else {
          e
        }
      }
      (rewritten, g.child)
    case _ =>
      (plan.projectList, plan.child)
  }
}

private def aggregateToSQL(plan: Aggregate): String = {
val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ")
build(
Expand Down Expand Up @@ -297,6 +322,31 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

/**
 * Handles SQL generation for generators specified in a LATERAL VIEW clause. SQL generation
 * for generators in projection lists is handled by `projectToSQL`.
 *
 * Sample plan:
 *   +- Project [mycol2#192]
 *      +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192]
 *         +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191]
 *            +- MetastoreRelation default, src, None
 */
private def generateToSQL(plan: Generate): String = {
  val columnAliases = plan.generatorOutput.map(a => quoteIdentifier(a.name)).mkString(",")
  // Quote the view alias only when one exists. The previous code quoted the empty string
  // for a missing qualifier, which injected a bogus pair of bare backticks into the SQL.
  val generatorAlias = plan.qualifier.map(quoteIdentifier).getOrElse("")
  val outerClause = if (plan.outer) "OUTER" else ""
  build(
    toSQL(plan.child),
    "LATERAL VIEW",
    outerClause,
    plan.generator.sql,
    generatorAlias,
    "AS",
    columnAliases
  )
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Canonicalizer", FixedPoint(100),
Expand Down Expand Up @@ -329,6 +379,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi

case plan @ Project(_,
_: SubqueryAlias
| _: Generate
| _: Filter
| _: Join
| _: MetastoreRelation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,13 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
|USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t
""".stripMargin)
}

test("use backticks in output of Generator") {
  // The assertion here is that parsing succeeds: `parsePlan` throws on invalid syntax,
  // so returning normally proves backticked generator/column aliases are accepted.
  // (The previous version bound the result to an unused local.)
  parser.parsePlan(
    """SELECT `gentab2`.`gencol2`
      |FROM `default`.`src`
      |LATERAL VIEW explode(array(array(1, 2, 3))) `gentab1` AS `gencol1`
      |LATERAL VIEW explode(`gentab1`.`gencol1`) `gentab2` AS `gencol2`
    """.stripMargin)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS parquet_t3")
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t1")

val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") ::
("4", null) ::
("5", """{"f1": "", "f5": null}""") ::
("6", "[invalid JSON string]") ::
Nil

sqlContext.range(10).write.saveAsTable("parquet_t0")
sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
Expand All @@ -45,13 +56,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write
.saveAsTable("parquet_t2")

tuples.toDF("key", "jstring").write.saveAsTable("parquet_t3")
sql("CREATE TABLE t1 as select key, array(value) as value from parquet_t1 limit 20")
}

/** Drops every table created in `beforeAll` so later suites start from a clean metastore. */
override protected def afterAll(): Unit = {
  val tables = Seq("parquet_t0", "parquet_t1", "parquet_t2", "parquet_t3", "t0", "t1")
  tables.foreach(name => sql(s"DROP TABLE IF EXISTS $name"))
}

private def checkHiveQl(hiveQl: String): Unit = {
Expand Down Expand Up @@ -445,4 +461,91 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
"f1", "b[0].f1", "f1", "c[foo]", "d[0]"
)
}

test("SQL generator for explode in projection list") {
  // Each query exercises a different shape of explode() in the SELECT list:
  // basic explode, aliased explode, explode without a FROM clause, and explode
  // mixed with ordinary (non-generated) columns.
  Seq(
    "SELECT explode(array(1,2,3)) FROM src",
    "SELECT explode(array(1,2,3)) as value FROM src",
    "select explode(array(1,2,3)) AS gencol",
    "SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1"
  ).foreach(checkHiveQl)
}

test("SQL generation for json_tuple as generator") {
  // json_tuple is a generator that is not explode-based; make sure it round-trips too.
  val query = "SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3"
  checkHiveQl(query)
}

test("SQL generation for lateral views") {
  // A lateral view combined with the OUTER keyword and a filter.
  val filteredOuterView =
    """SELECT key, value
      |FROM t1 LATERAL VIEW OUTER explode(value) gentab as gencol
      |WHERE key = 1
    """.stripMargin
  checkHiveQl(filteredOuterView)

  // A single lateral view with sorting and a limit.
  val singleView =
    """SELECT *
      |FROM t1 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
      |SORT BY key ASC, gencol ASC LIMIT 1
    """.stripMargin
  checkHiveQl(singleView)

  // Chained lateral views where the second consumes the first one's output.
  val chainedViews =
    """SELECT gentab2.*
      |FROM t1
      |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1
      |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3
    """.stripMargin
  checkHiveQl(chainedViews)

  // A lateral view without explicit generated-column aliases.
  val noColumnAliases =
    """SELECT gentab.*
      |FROM
      |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
    """.stripMargin
  checkHiveQl(noColumnAliases)
}

test("SQL generation for lateral views in subquery") {
  // Subqueries in the FROM clause whose plans contain a Generate node.
  val projectGeneratedColumn =
    """SELECT subq.gencol
      |FROM
      |(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
    """.stripMargin
  checkHiveQl(projectGeneratedColumn)

  // Same shape, but projecting a base column instead of the generated one.
  val projectBaseColumn =
    """SELECT subq.key
      |FROM
      |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
    """.stripMargin
  checkHiveQl(projectBaseColumn)
}

test("SQL generation for UDTF") {
  sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")

  // The function source code can be found at:
  // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
  sql(
    """
      |CREATE TEMPORARY FUNCTION udtf_count2
      |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
    """.stripMargin)

  try {
    checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol")

    checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1")
  } finally {
    // Always drop the temporary function: previously a failing assertion above would
    // skip the DROP and leak the function into other tests sharing this session.
    sql("DROP TEMPORARY FUNCTION udtf_count2")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkAnswer(
sql("SELECT ints FROM nestedArray LATERAL VIEW explode(a.b) a AS ints"),
Row(1) :: Row(2) :: Row(3) :: Nil)

checkAnswer(
sql("SELECT `ints` FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints`"),
Row(1) :: Row(2) :: Row(3) :: Nil)

checkAnswer(
sql("SELECT `a`.`ints` FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints`"),
Row(1) :: Row(2) :: Row(3) :: Nil)
}

test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {
Expand Down