[SPARK-22488][SQL] Fix the view resolution issue in the SparkSession internal table() API

The current internal `table()` API of `SparkSession` bypasses the Analyzer and directly calls the `sessionState.catalog.lookupRelation` API. This skips the view resolution logic in our Analyzer rule `ResolveRelations`. The internal API is widely used by various DDL commands as well as by other public and internal APIs.

Users might get a strange view resolution error when the current default database is different from the one the view was created in:
```
Table or view not found: t1; line 1 pos 14
org.apache.spark.sql.AnalysisException: Table or view not found: t1; line 1 pos 14
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
```
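
For example, the following sequence (mirroring the regression test added to `SQLViewSuite` below) reproduces the failure: the view `v1` lives in `default` and stores the unqualified table name `t1`, so reading it while `db2` is the current database used to fail with the error above.

```scala
// Repro sketch, taken from the new test case added below.
sql("USE default")
sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
sql("CREATE VIEW v1 AS SELECT * FROM t1")  // v1 stores the unqualified name `t1`
sql("CREATE DATABASE IF NOT EXISTS db2")
sql("USE db2")                             // switch the current database
spark.table("default.v1")                  // before the fix: Table or view not found: t1
```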

This PR fixes the issue by making the internal `table()` API go through the Analyzer, so that `ResolveRelations` resolves the table or view.
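
The one-line change works because `Dataset.ofRows` eagerly analyzes the plan it is given. A simplified sketch of what `Dataset.ofRows` does in this era of Spark (not part of this diff, shown for context):

```scala
// Sketch: Dataset.ofRows analyzes the plan eagerly, so handing it an
// UnresolvedRelation forces ResolveRelations (including view expansion) to run.
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()  // runs the Analyzer; failures surface as AnalysisException
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
```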

Added a test case and modified the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19713 from gatorsmile/viewResolution.
gatorsmile committed Nov 11, 2017
1 parent 4ef0bef commit e8f6dbb
Showing 8 changed files with 43 additions and 18 deletions.
R/pkg/tests/fulltests/test_sparkSQL.R (1 addition & 1 deletion)

```diff
@@ -713,7 +713,7 @@ test_that("test cache, uncache and clearCache", {
   expect_true(dropTempView("table1"))
 
   expect_error(uncacheTable("foo"),
-               "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
+               "Error in uncacheTable : analysis error - Table or view not found: foo")
 })
 
 test_that("insertInto() on a registered table", {
```
```diff
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
@@ -614,7 +615,7 @@ class SparkSession private(
   }
 
   private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
+    Dataset.ofRows(self, UnresolvedRelation(tableIdent))
   }
 
   /* ----------------- *
```
```diff
@@ -56,10 +56,8 @@ case class UncacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableId = tableIdent.quotedString
-    try {
+    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
       sparkSession.catalog.uncacheTable(tableId)
-    } catch {
-      case _: NoSuchTableException if ifExists => // don't throw
     }
     Seq.empty[Row]
   }
```
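
Because the name lookup now goes through the Analyzer, a missing table surfaces as an `AnalysisException` ("Table or view not found") rather than a `NoSuchTableException`, so the command checks existence up front to honor `IF EXISTS`. The expected behavior, as exercised by the `CachedTableSuite` changes below:

```scala
// Behavior sketch, matching the updated Hive CachedTableSuite test below.
sql("UNCACHE TABLE IF EXISTS nonexistantTable")  // no-op: IF EXISTS swallows the missing table
sql("UNCACHE TABLE nonexistantTable")            // throws AnalysisException: Table or view not found
```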
```diff
@@ -35,31 +35,36 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
   private var globalTempDB: String = _
 
   test("basic semantic") {
+    val expectedErrorMsg = "not found"
     try {
       sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
 
       // If there is no database in table name, we should try local temp view first, if not found,
       // try table/view in current database, which is "default" in this case. So we expect
       // NoSuchTableException here.
-      intercept[NoSuchTableException](spark.table("src"))
+      var e = intercept[AnalysisException](spark.table("src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // Use qualified name to refer to the global temp view explicitly.
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
 
       // Table name without database will never refer to a global temp view.
-      intercept[NoSuchTableException](sql("DROP VIEW src"))
+      e = intercept[AnalysisException](sql("DROP VIEW src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       sql(s"DROP VIEW $globalTempDB.src")
       // The global temp view should be dropped successfully.
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to create global temp view
       Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
 
       // Use qualified name to rename a global temp view.
       sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
       checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
 
       // Use qualified name to alter a global temp view.
@@ -68,7 +73,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
 
       // We can also use Catalog API to drop global temp view
       spark.catalog.dropGlobalTempView("src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src2")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to replace global temp view
       Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src")
```
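
For context, global temp views live in a dedicated database (the suite reads the configured name into `globalTempDB`), and an unqualified name never resolves to one, which is what the assertions above verify. A minimal sketch, assuming the default database name:

```scala
// Assumes spark.sql.globalTempDatabase is left at its default, "global_temp".
Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
spark.table("global_temp.src")  // resolves via the qualified name
spark.table("src")              // AnalysisException: unqualified names skip global temp views
```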
```diff
@@ -679,4 +679,19 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       assert(spark.table("v").schema.head.name == "cBa")
     }
   }
+
+  test("sparkSession API view resolution with different default database") {
+    withDatabase("db2") {
+      withView("v1") {
+        withTable("t1") {
+          sql("USE default")
+          sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
+          sql("CREATE VIEW v1 AS SELECT * FROM t1")
+          sql("CREATE DATABASE IF NOT EXISTS db2")
+          sql("USE db2")
+          checkAnswer(spark.table("default.v1"), Row(1))
+        }
+      }
+    }
+  }
 }
```
```diff
@@ -800,10 +800,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       spark.range(10).createOrReplaceTempView("tab1")
       sql("ALTER TABLE tab1 RENAME TO tab2")
       checkAnswer(spark.table("tab2"), spark.range(10).toDF())
-      intercept[NoSuchTableException] { spark.table("tab1") }
+      val e = intercept[AnalysisException](spark.table("tab1")).getMessage
+      assert(e.contains("Table or view not found"))
       sql("ALTER VIEW tab2 RENAME TO tab1")
       checkAnswer(spark.table("tab1"), spark.range(10).toDF())
-      intercept[NoSuchTableException] { spark.table("tab2") }
+      intercept[AnalysisException] { spark.table("tab2") }
     }
   }
```
```diff
@@ -326,7 +326,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper {
       assert(ColumnsRequired.set === requiredColumnNames)
 
       val table = spark.table("oneToTenFiltered")
-      val relation = table.queryExecution.logical.collectFirst {
+      val relation = table.queryExecution.analyzed.collectFirst {
         case LogicalRelation(r, _, _) => r
       }.get
```
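
This test change follows directly from the fix: `spark.table()` now returns a Dataset whose pre-analysis `logical` plan is just an `UnresolvedRelation`, so the `LogicalRelation` can only be collected from the analyzed plan. A sketch of the distinction:

```scala
// After the fix (sketch):
val df = spark.table("oneToTenFiltered")
df.queryExecution.logical   // UnresolvedRelation: nothing for collectFirst to match
df.queryExecution.analyzed  // post-ResolveRelations plan containing the LogicalRelation
```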
```diff
@@ -102,14 +102,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("uncache of nonexistant tables") {
+    val expectedErrorMsg = "Table or view not found: nonexistantTable"
     // make sure table doesn't exist
-    intercept[NoSuchTableException](spark.table("nonexistantTable"))
-    intercept[NoSuchTableException] {
+    var e = intercept[AnalysisException](spark.table("nonexistantTable")).getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       spark.catalog.uncacheTable("nonexistantTable")
-    }
-    intercept[NoSuchTableException] {
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistantTable")
-    }
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
     sql("UNCACHE TABLE IF EXISTS nonexistantTable")
   }
 
```
