[SPARK-33765][SQL] Migrate UNCACHE TABLE to use UnresolvedRelation to resolve identifier

### What changes were proposed in this pull request?

This PR proposes to migrate `UNCACHE TABLE` to use `UnresolvedRelation` to resolve the table/view identifier in the Analyzer, as discussed in https://github.com/apache/spark/pull/30403/files#r532360022.
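For context, before this change the parser turned `UNCACHE TABLE` directly into the runnable `UncacheTableCommand`, which looked the table up itself at execution time; after this change the parser emits an `UncacheTable` logical plan over an `UnresolvedRelation`, the Analyzer resolves it (temp view, v2 relation, or v1 table), and execution goes through the new `UncacheTableExec`. A minimal sketch of the user-visible command (the table names `t` and `no_such_table` are made up for illustration; behaviour is unchanged):

```scala
import org.apache.spark.sql.SparkSession

object UncacheTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("uncache-table-sketch")
      .getOrCreate()

    spark.range(10).write.saveAsTable("t")
    spark.sql("CACHE TABLE t")

    // Now parsed into UncacheTable(UnresolvedRelation(Seq("t")), ifExists = false)
    // and resolved by the Analyzer instead of inside UncacheTableCommand.run().
    spark.sql("UNCACHE TABLE t")

    // Still a no-op for a missing table: ResolveCommandsWithIfExists rewrites the
    // plan into NoopCommand("UNCACHE TABLE", ...) when the relation stays unresolved.
    spark.sql("UNCACHE TABLE IF EXISTS no_such_table")

    spark.stop()
  }
}
```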

### Why are the changes needed?

To resolve the table/view identifier in the analyzer, consistent with the other commands that have been migrated to use `UnresolvedRelation`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated existing tests; the `UNCACHE TABLE` parser tests moved from `SparkSqlParserSuite` to `DDLParserSuite`.
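A condensed, illustrative form of the moved parser assertions (the suite name `UncacheTableParserSuite` is made up for this sketch; the expected plan shape is taken from the `DDLParserSuite` hunk below):

```scala
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.UncacheTable

class UncacheTableParserSuite extends AnalysisTest {
  test("UNCACHE TABLE is parsed into the v2 UncacheTable plan") {
    // Without IF EXISTS the flag defaults to false.
    comparePlans(
      parsePlan("UNCACHE TABLE a.b.c"),
      UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = false))

    // IF EXISTS only flips the flag; resolution still happens in the Analyzer.
    comparePlans(
      parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
      UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = true))
  }
}
```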

Closes #30743 from imback82/uncache_v2.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
imback82 authored and cloud-fan committed Dec 16, 2020
1 parent ddff94f commit 62be248
Showing 14 changed files with 92 additions and 50 deletions.
@@ -875,6 +875,10 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupTempView(ident)
.map(view => c.copy(table = view))
.getOrElse(c)
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
lookupTempView(ident)
.map(view => c.copy(table = view, isTempView = true))
.getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1005,6 +1009,11 @@ class Analyzer(override val catalogManager: CatalogManager)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupV2Relation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
@@ -1098,7 +1107,12 @@ class Analyzer(override val catalogManager: CatalogManager)

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
@@ -125,7 +125,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case CacheTable(u: UnresolvedRelation, _, _, _) =>
failAnalysis(s"Table or view not found for `CACHE TABLE`: ${u.multipartIdentifier.quoted}")
failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case UncacheTable(u: UnresolvedRelation, _, _) =>
failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand}
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule

/**
@@ -31,5 +31,7 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
NoopCommand("DROP TABLE", u.multipartIdentifier)
case DropView(u: UnresolvedView, ifExists) if ifExists =>
NoopCommand("DROP VIEW", u.multipartIdentifier)
case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
}
}
@@ -3632,6 +3632,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging {
}
}

/**
* Create an [[UncacheTable]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTable(
UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier)),
ctx.EXISTS != null)
}

/**
* Create a [[TruncateTable]] command.
*
@@ -794,3 +794,11 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
isLazy: Boolean,
options: Map[String, String]) extends Command

/**
* The logical plan of the UNCACHE TABLE command.
*/
case class UncacheTable(
table: LogicalPlan,
ifExists: Boolean,
isTempView: Boolean = false) extends Command
@@ -2032,6 +2032,16 @@ class DDLParserSuite extends AnalysisTest {
"It is not allowed to add catalog/namespace prefix a.b")
}

test("UNCACHE TABLE") {
comparePlans(
parsePlan("UNCACHE TABLE a.b.c"),
UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = false))

comparePlans(
parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = true))
}

test("TRUNCATE table") {
comparePlans(
parsePlan("TRUNCATE TABLE a.b.c"),
@@ -192,15 +192,6 @@ class SparkSqlAstBuilder extends AstBuilder {
unquotedPath
}

/**
* Create an [[UncacheTableCommand]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTableCommand(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.EXISTS != null)
}

/**
* Create a [[ClearCacheCommand]] logical plan.
*/
@@ -17,32 +17,8 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

case class UncacheTableCommand(
multipartIdentifier: Seq[String],
ifExists: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableName = multipartIdentifier.quoted
table(sparkSession, tableName).foreach { table =>
val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier)
sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade)
}
Seq.empty[Row]
}

private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = {
try {
Some(sparkSession.table(name))
} catch {
case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") =>
None
}
}
}

/**
* Clear all cached data from the in-memory cache.
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -283,6 +283,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
c.copy(table = readDataSourceTable(tableMeta, options))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
c.copy(table = DDLUtils.readHiveTable(tableMeta))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
u.copy(table = readDataSourceTable(tableMeta, options))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
u.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta, options, false)
if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta, options)
@@ -87,3 +87,15 @@ case class CacheTableAsSelectExec(
sparkSession.table(tempViewName)
}
}

case class UncacheTableExec(
relation: LogicalPlan,
cascade: Boolean) extends V2CommandExec {
override def run(): Seq[InternalRow] = {
val sparkSession = sqlContext.sparkSession
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, relation, cascade)
Seq.empty
}

override def output: Seq[Attribute] = Seq.empty
}
@@ -364,6 +364,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
case r: CacheTableAsSelect =>
CacheTableAsSelectExec(r.tempViewName, r.plan, r.isLazy, r.options) :: Nil

case r: UncacheTable =>
UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil

case _ => Nil
}
}
@@ -339,16 +339,6 @@ class SparkSqlParserSuite extends AnalysisTest {
"LINES TERMINATED BY only supports newline '\\n' right now")
}

test("UNCACHE TABLE") {
assertEqual(
"UNCACHE TABLE a.b.c",
UncacheTableCommand(Seq("a", "b", "c"), ifExists = false))

assertEqual(
"UNCACHE TABLE IF EXISTS a.b.c",
UncacheTableCommand(Seq("a", "b", "c"), ifExists = true))
}

test("CLEAR CACHE") {
assertEqual("CLEAR CACHE", ClearCacheCommand)
}
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
@@ -231,6 +231,16 @@ case class RelationConversions(
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)

// Cache table
case c @ CacheTable(relation: HiveTableRelation, _, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
c.copy(table = metastoreCatalog.convert(relation))

// Uncache table
case u @ UncacheTable(relation: HiveTableRelation, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
u.copy(table = metastoreCatalog.convert(relation))
}
}
}
@@ -113,7 +113,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
e = intercept[AnalysisException] {
sql("UNCACHE TABLE nonexistentTable")
}.getMessage
assert(e.contains(s"$expectedErrorMsg nonexistentTable"))
assert(e.contains("Table or view not found: nonexistentTable"))
sql("UNCACHE TABLE IF EXISTS nonexistentTable")
}

