[SPARK-14865][SQL] Better error handling for view creation.
## What changes were proposed in this pull request?
This patch improves error handling in view creation. CreateViewCommand itself now analyzes the view's SQL query first, and throws an AnalysisException if the query cannot be analyzed.

In addition, I added the following two conservative guards to make Spark bugs easier to identify:

1. If there is a bug and the generated view SQL cannot be analyzed, throw an exception at runtime. Note that this is not an AnalysisException, because the failure is not caused by the user and more likely indicates a bug in Spark.
2. When SQLBuilder receives an unresolved plan, it now also includes the plan in the error message.

I also took the chance to simplify the internal implementation of CreateViewCommand, and *removed* a fallback path that would previously have masked the underlying exception.
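
To illustrate the user-facing behavior, here is a hedged sketch (the view and table names are invented, and `sqlContext` stands for any active SQLContext):

```scala
// Before this patch, a bad view query could fail later, or its real error
// could be masked by the fallback path; now CREATE VIEW fails eagerly.
try {
  sqlContext.sql("CREATE VIEW v AS SELECT * FROM no_such_table")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"Rejected at analysis time: ${e.getMessage}")
}
```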

## How was this patch tested?
1. Added a unit test for the user-facing error handling.
2. Manually introduced some bugs in Spark to test the internal defensive error handling.
3. Also added a test case for nested views (not super relevant).

Author: Reynold Xin <rxin@databricks.com>

Closes #12633 from rxin/SPARK-14865.
rxin authored and yhuai committed Apr 23, 2016
1 parent 890abd1 commit e3c1366
Showing 3 changed files with 100 additions and 71 deletions.
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
  * supported by this builder (yet).
  */
 class SQLBuilder(logicalPlan: LogicalPlan) extends Logging {
-  require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
+  require(logicalPlan.resolved,
+    "SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan)
 
   def this(df: Dataset[_]) = this(df.queryExecution.analyzed)
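One detail worth noting about the `require` change above: Scala's two-argument `require` takes its message by name (`message: => Any`), so the potentially large plan string is only built when the check fails. A minimal standalone sketch of that pattern (not Spark code):

```scala
// The message argument is evaluated lazily; buildMessage() only runs on failure.
def buildMessage(): String = {
  println("rendering plan...") // stands in for stringifying a large plan
  "details of the unresolved plan"
}

def assertResolved(resolved: Boolean): Unit =
  require(resolved, "only resolved plans are supported. Current plan:\n" + buildMessage())

assertResolved(resolved = true)    // passes; the message is never rendered
// assertResolved(resolved = false) // would render the message and throw
//                                  // IllegalArgumentException
```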
@@ -63,9 +63,12 @@ case class CreateViewCommand(
   }
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val analzyedPlan = sqlContext.executePlan(child).analyzed
+    // If the plan cannot be analyzed, throw an exception and don't proceed.
+    val qe = sqlContext.executePlan(child)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
 
-    require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+    require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
     val sessionState = sqlContext.sessionState
 
     if (sessionState.catalog.tableExists(tableIdentifier)) {
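The `run` change above is a fail-fast guard: force analysis before doing anything else, so the user's error surfaces immediately as an `AnalysisException`. A rough standalone sketch of the pattern (the class and field names here are illustrative, not Spark's actual internals):

```scala
// Illustrative only: a toy "query execution" whose analysis is lazy, plus
// an assertAnalyzed-style guard that forces analysis eagerly.
class ToyAnalysisException(msg: String) extends Exception(msg)

class ToyQueryExecution(sql: String, knownTables: Set[String]) {
  // Lazily "analyze": succeed only if every referenced table is known.
  lazy val analyzed: String =
    if (knownTables.exists(t => sql.contains(t))) sql
    else throw new ToyAnalysisException(s"unresolved relation in: $sql")

  // Force analysis now so callers fail before mutating any catalog state.
  def assertAnalyzed(): Unit = { val _ = analyzed }
}

val qe = new ToyQueryExecution("SELECT * FROM jt", knownTables = Set("jt"))
qe.assertAnalyzed() // throws early when the query references unknown tables
```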
@@ -74,7 +77,7 @@
         // already exists.
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+        sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -85,68 +88,74 @@
     } else {
       // Create the view if it doesn't exist.
       sessionState.catalog.createTable(
-        prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+        prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
     }
 
     Seq.empty[Row]
   }
 
-  private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
-    val expandedText = if (sqlContext.conf.canonicalView) {
-      try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
-        case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
-      }
-    } else {
-      wrapViewTextWithSelect(analzyedPlan)
-    }
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. This method canonicalizes
+   * the SQL based on the analyzed plan, and also creates the proper schema for the view.
+   */
+  private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+    val viewSQL: String =
+      if (sqlContext.conf.canonicalView) {
+        val logicalPlan =
+          if (tableDesc.schema.isEmpty) {
+            analyzedPlan
+          } else {
+            val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+              case (attr, col) => Alias(attr, col.name)()
+            }
+            sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+          }
+        new SQLBuilder(logicalPlan).toSQL
+      } else {
+        // When user specified column names for view, we should create a project to do the renaming.
+        // When no column name specified, we still need to create a project to declare the columns
+        // we need, to make us more robust to top level `*`s.
+        val viewOutput = {
+          val columnNames = analyzedPlan.output.map(f => quote(f.name))
+          if (tableDesc.schema.isEmpty) {
+            columnNames.mkString(", ")
+          } else {
+            columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+              case (name, alias) => s"$name AS $alias"
+            }.mkString(", ")
+          }
+        }
+
+        val viewText = tableDesc.viewText.get
+        val viewName = quote(tableDesc.identifier.table)
+        s"SELECT $viewOutput FROM ($viewText) $viewName"
+      }
+
+    // Validate the view SQL - make sure we can parse it and analyze it.
+    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
+    try {
+      sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+    } catch {
+      case NonFatal(e) =>
+        throw new RuntimeException(
+          "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
+    }
 
-    val viewSchema = {
+    val viewSchema: Seq[CatalogColumn] = {
       if (tableDesc.schema.isEmpty) {
-        analzyedPlan.output.map { a =>
+        analyzedPlan.output.map { a =>
           CatalogColumn(a.name, a.dataType.simpleString)
         }
       } else {
-        analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+        analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
           CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
         }
       }
     }
 
-    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
-  }
-
-  private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
-    val viewOutput = {
-      val columnNames = analzyedPlan.output.map(f => quote(f.name))
-      if (tableDesc.schema.isEmpty) {
-        columnNames.mkString(", ")
-      } else {
-        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-          case (name, alias) => s"$name AS $alias"
-        }.mkString(", ")
-      }
-    }
-
-    val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.identifier.table)
-    s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
-
-  private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
-    val logicalPlan = if (tableDesc.schema.isEmpty) {
-      analzyedPlan
-    } else {
-      val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
-        case (attr, col) => Alias(attr, col.name)()
-      }
-      sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
-    }
-    new SQLBuilder(logicalPlan).toSQL
+    tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
   }
 
-  // escape backtick with double-backtick in column name and wrap it with backtick.
+  /** Escape backtick with double-backtick in column name and wrap it with backtick. */
   private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
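
To make the non-canonical branch of `prepareTable` concrete, here is a hedged re-derivation of the string it builds, with invented view/table names (`v1`, `t1`); only `quote` is copied from the code above:

```scala
// Re-implementation of the wrapping logic, for illustration only.
def quote(name: String) = s"`${name.replaceAll("`", "``")}`"

// Suppose: CREATE VIEW v1 (a, b1) AS SELECT a, b FROM t1
val viewText   = "SELECT a, b FROM t1"   // the user's original query text
val viewName   = quote("v1")             // `v1`
val viewOutput = Seq(("a", "a"), ("b", "b1"))
  .map { case (name, alias) => s"${quote(name)} AS ${quote(alias)}" }
  .mkString(", ")

// Prints: SELECT `a` AS `a`, `b` AS `b1` FROM (SELECT a, b FROM t1) `v1`
println(s"SELECT $viewOutput FROM ($viewText) $viewName")

// quote escapes embedded backticks by doubling them:
println(quote("a`b")) // prints: `a``b`
```

The stored text is then parsed and analyzed once more as a sanity check; per the diff above, a failure there is reported as a `RuntimeException` rather than an `AnalysisException`, since it points at Spark's SQL generation rather than at the user's query.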
@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils
 class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext.implicits._
 
+  override def beforeAll(): Unit = {
+    // Create a simple table with two columns: id and id1
+    sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt")
+  }
+
+  override def afterAll(): Unit = {
+    sqlContext.sql(s"DROP TABLE IF EXISTS jt")
+  }
+
+  test("nested views") {
+    withView("jtv1", "jtv2") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
+      sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+      checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+    }
+  }
+
+  test("error handling: fail if the view sql itself is invalid") {
+    // A table that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect()
+    }
+
+    // A column that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect()
+    }
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt") {
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt")
-        sql(
-          """CREATE VIEW IF NOT EXISTS
-            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-            |TBLPROPERTIES ('a' = 'b')
-            |AS SELECT * FROM jt""".stripMargin)
-        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-        sql("DROP VIEW testView")
-      }
+      sql(
+        """CREATE VIEW IF NOT EXISTS
+          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+      sql("DROP VIEW testView")
     }
   }
 
   test("correctly handle CREATE VIEW IF NOT EXISTS") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+      withTable("jt2") {
         sql("CREATE VIEW testView AS SELECT id FROM jt")
 
         val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -66,8 +90,7 @@
     test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
-          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        withTable("jt2") {
           sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
           checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
@@ -90,9 +113,8 @@
     test(s"$prefix correctly handle ALTER VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
+        withTable("jt2") {
           withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
             sql("CREATE VIEW testView AS SELECT id FROM jt")
 
             val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -109,12 +131,9 @@
       // json table is not hive-compatible, make sure the new flag fix it.
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt") {
-          withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-          }
+        withView("testView") {
+          sql("CREATE VIEW testView AS SELECT id FROM jt")
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
         }
       }
     }
