From e8f6dbb12aa026ffc4c00d2dd9553c3a0c3b10a7 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 11 Nov 2017 18:20:11 +0100
Subject: [PATCH] [SPARK-22488][SQL] Fix the view resolution issue in the SparkSession internal table() API

The current internal `table()` API of `SparkSession` bypasses the Analyzer
and calls the `sessionState.catalog.lookupRelation` API directly. This skips
the view resolution logic in our Analyzer rule `ResolveRelations`. The
internal API is widely used by various DDL commands as well as by public and
internal APIs. As a result, users can hit a confusing error when the current
default database differs from the one in which a view was defined:

```
Table or view not found: t1; line 1 pos 14
org.apache.spark.sql.AnalysisException: Table or view not found: t1; line 1 pos 14
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
```

This PR fixes the issue by constructing an `UnresolvedRelation` instead, so
that the table or view is resolved through the Analyzer rule
`ResolveRelations`.

Added a test case and modified the existing test cases.

Author: gatorsmile

Closes #19713 from gatorsmile/viewResolution.
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |  2 +-
 .../org/apache/spark/sql/SparkSession.scala        |  3 ++-
 .../spark/sql/execution/command/cache.scala        |  4 +---
 .../sql/execution/GlobalTempViewSuite.scala        | 16 +++++++++++-----
 .../spark/sql/execution/SQLViewSuite.scala         | 15 +++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     |  5 +++--
 .../spark/sql/sources/FilteredScanSuite.scala      |  2 +-
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 14 +++++++++-----
 8 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 50c60fe331078..f774554e5b2b1 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -713,7 +713,7 @@ test_that("test cache, uncache and clearCache", {
   expect_true(dropTempView("table1"))
 
   expect_error(uncacheTable("foo"),
-               "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
+               "Error in uncacheTable : analysis error - Table or view not found: foo")
 })
 
 test_that("insertInto() on a registered table", {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index cce8a1ceaf726..96882c62c2d67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
@@ -614,7 +615,7 @@ class SparkSession private(
   }
 
   private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
+    Dataset.ofRows(self, UnresolvedRelation(tableIdent))
   }
 
   /* ----------------- *
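To make the failure mode concrete, here is a minimal repro sketch distilled from the regression test added to `SQLViewSuite` later in this patch. It is illustrative only and not part of the patch itself; it assumes a local Spark 2.x build and creates its own session rather than relying on a test harness.

```
// Repro sketch for SPARK-22488 (assumption: a local Spark 2.x build).
import org.apache.spark.sql.SparkSession

object ViewResolutionRepro extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("SPARK-22488-repro")
    .getOrCreate()

  spark.sql("USE default")
  spark.sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
  spark.sql("CREATE VIEW v1 AS SELECT * FROM t1")

  // v1's body references `t1` without a database qualifier, so resolving it
  // depends on the default database recorded in the view's metadata.
  spark.sql("CREATE DATABASE IF NOT EXISTS db2")
  spark.sql("USE db2")

  // spark.table(...) routes through the internal SparkSession.table() API.
  // Before the fix, this bypassed ResolveRelations and failed with
  // "Table or view not found: t1"; with the fix it returns Row(1).
  spark.table("default.v1").show()

  spark.stop()
}
```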
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 336f14dd97aea..cfcf3ac6f77f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -56,10 +56,8 @@ case class UncacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableId = tableIdent.quotedString
-    try {
+    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
       sparkSession.catalog.uncacheTable(tableId)
-    } catch {
-      case _: NoSuchTableException if ifExists => // don't throw
     }
     Seq.empty[Row]
   }
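For context on the `UncacheTableCommand` rewrite above: instead of swallowing `NoSuchTableException`, the command now checks `tableExists` up front when `IF EXISTS` was specified, and otherwise lets the lookup fail during analysis. A small behavior sketch follows (assuming an active `SparkSession` named `spark`; this is not part of the patch):

```
import org.apache.spark.sql.AnalysisException

// With IF EXISTS, the existence check short-circuits: no lookup, no error.
spark.sql("UNCACHE TABLE IF EXISTS no_such_table")

// Without IF EXISTS, the lookup goes through the Analyzer and surfaces as an
// AnalysisException rather than the catalog's NoSuchTableException.
try {
  spark.sql("UNCACHE TABLE no_such_table")
} catch {
  case e: AnalysisException =>
    assert(e.getMessage.contains("Table or view not found"))
}
```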
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index a3d75b221ec3e..cc943e0356f2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -35,23 +35,27 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
   private var globalTempDB: String = _
 
   test("basic semantic") {
+    val expectedErrorMsg = "not found"
     try {
       sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
 
       // If there is no database in table name, we should try local temp view first, if not found,
       // try table/view in current database, which is "default" in this case. So we expect
       // NoSuchTableException here.
-      intercept[NoSuchTableException](spark.table("src"))
+      var e = intercept[AnalysisException](spark.table("src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // Use qualified name to refer to the global temp view explicitly.
       checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
 
       // Table name without database will never refer to a global temp view.
-      intercept[NoSuchTableException](sql("DROP VIEW src"))
+      e = intercept[AnalysisException](sql("DROP VIEW src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       sql(s"DROP VIEW $globalTempDB.src")
       // The global temp view should be dropped successfully.
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to create global temp view
       Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
@@ -59,7 +63,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
 
       // Use qualified name to rename a global temp view.
       sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
+      assert(e.contains(expectedErrorMsg))
       checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
 
       // Use qualified name to alter a global temp view.
@@ -68,7 +73,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
 
       // We can also use Catalog API to drop global temp view
       spark.catalog.dropGlobalTempView("src2")
-      intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+      e = intercept[AnalysisException](spark.table(s"$globalTempDB.src2")).getMessage
+      assert(e.contains(expectedErrorMsg))
 
       // We can also use Dataset API to replace global temp view
       Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 6761f05bb462a..08a4a21b20f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -679,4 +679,19 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       assert(spark.table("v").schema.head.name == "cBa")
     }
   }
+
+  test("sparkSession API view resolution with different default database") {
+    withDatabase("db2") {
+      withView("v1") {
+        withTable("t1") {
+          sql("USE default")
+          sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
+          sql("CREATE VIEW v1 AS SELECT * FROM t1")
+          sql("CREATE DATABASE IF NOT EXISTS db2")
+          sql("USE db2")
+          checkAnswer(spark.table("default.v1"), Row(1))
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5109c649f4318..a3abb7dfa6879 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -800,10 +800,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     spark.range(10).createOrReplaceTempView("tab1")
     sql("ALTER TABLE tab1 RENAME TO tab2")
     checkAnswer(spark.table("tab2"), spark.range(10).toDF())
-    intercept[NoSuchTableException] { spark.table("tab1") }
+    val e = intercept[AnalysisException](spark.table("tab1")).getMessage
+    assert(e.contains("Table or view not found"))
     sql("ALTER VIEW tab2 RENAME TO tab1")
     checkAnswer(spark.table("tab1"), spark.range(10).toDF())
-    intercept[NoSuchTableException] { spark.table("tab2") }
+    intercept[AnalysisException] { spark.table("tab2") }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 5a0388ec1d1db..c902b0afcce6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -326,7 +326,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper
         assert(ColumnsRequired.set === requiredColumnNames)
 
         val table = spark.table("oneToTenFiltered")
-        val relation = table.queryExecution.logical.collectFirst {
+        val relation = table.queryExecution.analyzed.collectFirst {
           case LogicalRelation(r, _, _) => r
         }.get
 
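The one-line `FilteredScanSuite` change above follows from the main fix: `spark.table()` now returns a plan whose raw logical form is an `UnresolvedRelation`, so a test that wants the resolved `LogicalRelation` must inspect the analyzed plan. A short sketch (assuming an active `spark` and a registered table or view named `t`; the name is illustrative):

```
val df = spark.table("t")
// Raw plan: just an UnresolvedRelation placeholder after this patch.
println(df.queryExecution.logical)
// Analyzed plan: the relation after ResolveRelations has run.
println(df.queryExecution.analyzed)
```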
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index d3cbf898e2439..48ab4eb9a6178 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -102,14 +102,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   }
 
   test("uncache of nonexistant tables") {
+    val expectedErrorMsg = "Table or view not found: nonexistantTable"
     // make sure table doesn't exist
-    intercept[NoSuchTableException](spark.table("nonexistantTable"))
-    intercept[NoSuchTableException] {
+    var e = intercept[AnalysisException](spark.table("nonexistantTable")).getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       spark.catalog.uncacheTable("nonexistantTable")
-    }
-    intercept[NoSuchTableException] {
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+    e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistantTable")
-    }
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
     sql("UNCACHE TABLE IF EXISTS nonexistantTable")
   }
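A closing note on the test updates in this patch: because the failure now surfaces from the Analyzer, the suites intercept `AnalysisException` and match on the message text instead of the narrower `NoSuchTableException`. A minimal ScalaTest-style sketch of that pattern (assuming an active `spark`; not part of the patch):

```
import org.apache.spark.sql.AnalysisException

val msg = intercept[AnalysisException] {
  spark.table("nonexistantTable")
}.getMessage
assert(msg.contains("Table or view not found: nonexistantTable"))
```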