Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22488] [SQL] Fix the view resolution issue in the SparkSession internal table() API #19713

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ test_that("test cache, uncache and clearCache", {
expect_true(dropTempView("table1"))

expect_error(uncacheTable("foo"),
"Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
"Error in uncacheTable : analysis error - Table or view not found: foo")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit:

expect_error(uncacheTable("foo"),
             "Error in uncacheTable : analysis error - Table or view not found: foo")

})

test_that("insertInto() on a registered table", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
Expand Down Expand Up @@ -621,7 +622,7 @@ class SparkSession private(
}

private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
Copy link
Contributor

@cloud-fan cloud-fan Nov 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also called by the public table API, so this bug affects the public interface too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, many public APIs and the DDL command processing are built on this internal API.

Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
Dataset.ofRows(self, UnresolvedRelation(tableIdent))
}

/* ----------------- *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ case class UncacheTableCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = tableIdent.quotedString
try {
if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
sparkSession.catalog.uncacheTable(tableId)
} catch {
case _: NoSuchTableException if ifExists => // don't throw
}
Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,36 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
private var globalTempDB: String = _

test("basic semantic") {
val expectedErrorMsg = "not found"
try {
sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")

// If there is no database in table name, we should try local temp view first, if not found,
// try table/view in current database, which is "default" in this case. So we expect
// an AnalysisException whose message contains "not found" here.
intercept[NoSuchTableException](spark.table("src"))
var e = intercept[AnalysisException](spark.table("src")).getMessage
assert(e.contains(expectedErrorMsg))

// Use qualified name to refer to the global temp view explicitly.
checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))

// Table name without database will never refer to a global temp view.
intercept[NoSuchTableException](sql("DROP VIEW src"))
e = intercept[AnalysisException](sql("DROP VIEW src")).getMessage
assert(e.contains(expectedErrorMsg))

sql(s"DROP VIEW $globalTempDB.src")
// The global temp view should be dropped successfully.
intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
assert(e.contains(expectedErrorMsg))

// We can also use Dataset API to create global temp view
Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))

// Use qualified name to rename a global temp view.
sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
e = intercept[AnalysisException](spark.table(s"$globalTempDB.src")).getMessage
assert(e.contains(expectedErrorMsg))
checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))

// Use qualified name to alter a global temp view.
Expand All @@ -68,7 +73,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext {

// We can also use Catalog API to drop global temp view
spark.catalog.dropGlobalTempView("src2")
intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
e = intercept[AnalysisException](spark.table(s"$globalTempDB.src2")).getMessage
assert(e.contains(expectedErrorMsg))

// We can also use Dataset API to replace global temp view
Seq(2 -> "b").toDF("i", "j").createOrReplaceGlobalTempView("src")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,4 +679,19 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
assert(spark.table("v").schema.head.name == "cBa")
}
}

// Checks that a view created in the `default` database can still be read through
// spark.table("default.v1") after the session's current database has been switched to db2.
test("sparkSession API view resolution with different default database") {
withDatabase("db2") {
withView("v1") {
withTable("t1") {
// Create the table and the view while `default` is the current database; the view's
// query refers to t1 without a database qualifier.
sql("USE default")
sql("CREATE TABLE t1 USING parquet AS SELECT 1 AS c0")
sql("CREATE VIEW v1 AS SELECT * FROM t1")
// Switch the current database so that an unqualified t1 no longer points at default.t1.
sql("CREATE DATABASE IF NOT EXISTS db2")
sql("USE db2")
// The view is addressed by its qualified name and must still return the single row from t1.
checkAnswer(spark.table("default.v1"), Row(1))
// NOTE(review): the current database is still db2 when the withTable/withView/withDatabase
// cleanup runs; presumably those helpers drop by the unqualified names given above —
// confirm that default.t1 and default.v1 are actually removed.
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
spark.range(10).createOrReplaceTempView("tab1")
sql("ALTER TABLE tab1 RENAME TO tab2")
checkAnswer(spark.table("tab2"), spark.range(10).toDF())
intercept[NoSuchTableException] { spark.table("tab1") }
val e = intercept[AnalysisException](spark.table("tab1")).getMessage
assert(e.contains("Table or view not found"))
sql("ALTER VIEW tab2 RENAME TO tab1")
checkAnswer(spark.table("tab1"), spark.range(10).toDF())
intercept[NoSuchTableException] { spark.table("tab2") }
intercept[AnalysisException] { spark.table("tab2") }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
assert(ColumnsRequired.set === requiredColumnNames)

val table = spark.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
val relation = table.queryExecution.analyzed.collectFirst {
case LogicalRelation(r, _, _, _) => r
}.get

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}

test("uncache of nonexistant tables") {
val expectedErrorMsg = "Table or view not found: nonexistantTable"
// make sure table doesn't exist
intercept[NoSuchTableException](spark.table("nonexistantTable"))
intercept[NoSuchTableException] {
var e = intercept[AnalysisException](spark.table("nonexistantTable")).getMessage
assert(e.contains(expectedErrorMsg))
e = intercept[AnalysisException] {
spark.catalog.uncacheTable("nonexistantTable")
}
intercept[NoSuchTableException] {
}.getMessage
assert(e.contains(expectedErrorMsg))
e = intercept[AnalysisException] {
sql("UNCACHE TABLE nonexistantTable")
}
}.getMessage
assert(e.contains(expectedErrorMsg))
sql("UNCACHE TABLE IF EXISTS nonexistantTable")
}

Expand Down