Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3e532c8
initial commit
imback82 Nov 11, 2020
f4ee301
Uncache
imback82 Nov 12, 2020
a0687b3
ResolveCommandsWithIfExists to support uncache table
imback82 Nov 14, 2020
f36bc59
Fix tests
imback82 Nov 18, 2020
b3fe647
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 20, 2020
4b2fba0
introduce CacheTableAsSelect
imback82 Nov 20, 2020
f232eba
Address PR comments
imback82 Nov 21, 2020
8c0140c
Rename to command name
imback82 Nov 21, 2020
9085189
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 21, 2020
0bdfcee
Address PR comments
imback82 Nov 22, 2020
fc8a913
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 23, 2020
7ee6eb0
Address PR comments
imback82 Nov 23, 2020
a5923ab
Address PR comments
imback82 Nov 24, 2020
f22159c
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 24, 2020
c0e4f3e
Address PR comments
imback82 Nov 25, 2020
47dc974
revert to minimize the churn
imback82 Nov 25, 2020
5e7227b
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 25, 2020
20b2474
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 25, 2020
b33d807
Address comments
imback82 Nov 25, 2020
4c2d5e2
revert
imback82 Nov 25, 2020
3c4a0cf
Fix compilation
imback82 Nov 25, 2020
7f5a0b2
fix tests
imback82 Nov 25, 2020
d0f49ef
Fix tests
imback82 Nov 25, 2020
ed1a6db
fix test
imback82 Nov 26, 2020
4e0e82f
Address comments
imback82 Nov 26, 2020
911927d
Merge remote-tracking branch 'upstream/master' into cache_table
imback82 Nov 27, 2020
7e788ce
Address PR comments
imback82 Nov 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class Analyzer(
TypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once,
Seq(ResolveNoopDropTable) ++
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule

/**
* A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved.
* If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
* A rule for handling commands when the table or temp view is not resolved.
* These commands support a flag, "ifExists", so that they do not fail when a relation is not
* resolved. If the "ifExists" flag is set to true, the plan is resolved to [[NoopCommand]],
* which is a no-op command.
*/
object ResolveNoopDropTable extends Rule[LogicalPlan] {
object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
NoopDropTable(u.multipartIdentifier)
NoopCommand(u.multipartIdentifier)
case UncacheTable(u: UnresolvedTableOrView, ifExists) if ifExists =>
NoopCommand(u.multipartIdentifier)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3312,7 +3312,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}

/**
* Create a [[CacheTableStatement]].
* Create a [[CacheTable]].
*
* For example:
* {{{
Expand All @@ -3332,14 +3332,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
"the table name in CACHE TABLE AS SELECT", ctx)
}
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
CacheTableStatement(tableName, query, ctx.LAZY != null, options)
CacheTable(UnresolvedTableOrView(tableName), query, ctx.LAZY != null, options)
}

/**
* Create an [[UncacheTableStatement]] logical plan.
* Create an [[UncacheTable]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null)
UncacheTable(
UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)),
ctx.EXISTS != null)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,22 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A CACHE TABLE statement, as parsed from SQL
*/
case class CacheTableStatement(
tableName: Seq[String],
plan: Option[LogicalPlan],
isLazy: Boolean,
options: Map[String, String]) extends ParsedStatement

/**
* An UNCACHE TABLE statement, as parsed from SQL
*/
case class UncacheTableStatement(
tableName: Seq[String],
ifExists: Boolean) extends ParsedStatement

/**
* A TRUNCATE TABLE statement, as parsed from SQL
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,9 @@ case class DropTable(
}

/**
* The logical plan for handling non-existing table for DROP TABLE command.
* The logical plan for no-op command handling non-existing table.
*/
case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
case class NoopCommand(multipartIdentifier: Seq[String]) extends Command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a commandName: String property, which can be DROP TABLE, REFRESH TABLE, etc., so that we can see the original command name in the EXPLAIN result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added commandName. Now EXPLAIN EXTENDED DROP TABLE looks like the following:

|== Parsed Logical Plan ==
'DropTable true, false
+- 'UnresolvedTableOrView [testcat, ns1, ns2, tbl], true

== Analyzed Logical Plan ==
NoopCommand DROP TABLE, [testcat, ns1, ns2, tbl]

== Optimized Logical Plan ==
NoopCommand DROP TABLE, [testcat, ns1, ns2, tbl]

== Physical Plan ==
LocalTableScan <empty>

Btw, do we want to introduce NoopCommandExec for physical plan as well?


/**
* The logical plan of the ALTER TABLE command.
Expand Down Expand Up @@ -629,3 +629,21 @@ case class LoadData(
// Logical plan node for the SHOW CREATE TABLE command (command name as used in the
// DataSourceV2Strategy error message for this node).
// `child` is the relation the command targets and is this node's only child.
// NOTE(review): `asSerde` defaults to false; its exact meaning is not visible here — confirm.
case class ShowCreateTable(child: LogicalPlan, asSerde: Boolean = false) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the CACHE TABLE command.
*/
case class CacheTable(
child: LogicalPlan,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After more thought, I think CACHE TABLE is not a DDL command that needs to interact with catalogs, and it doesn't need a v2 version.

The current problem is that CacheTableCommand only takes v1 table identifier and can't cache v2 tables with n part name. Maybe we can fix CacheTableCommand directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated CacheTableCommand and uncacheTable to support multiparts name (and not resolving the identifier). Please check what you think about the new approach. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, since it's not resolving to catalogs, we should move it out of ResolveSessionCatalog?

plan: Option[LogicalPlan],
isLazy: Boolean,
options: Map[String, String]) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the UNCACHE TABLE command.
*
* @param child the relation to uncache. The parser builds it as an [[UnresolvedTableOrView]]
*              wrapping the multi-part identifier from the statement.
* @param ifExists true when the statement was written with IF EXISTS. If this flag is set and
*                 `child` is still an [[UnresolvedTableOrView]], [[ResolveCommandsWithIfExists]]
*                 replaces the whole plan with a [[NoopCommand]].
*/
case class UncacheTable(child: LogicalPlan, ifExists: Boolean) extends Command {
// The wrapped relation is this node's only child.
override def children: Seq[LogicalPlan] = child :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1595,15 +1595,19 @@ class DDLParserSuite extends AnalysisTest {
test("CACHE TABLE") {
comparePlans(
parsePlan("CACHE TABLE a.b.c"),
CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))
CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, false, Map.empty))

comparePlans(
parsePlan("CACHE LAZY TABLE a.b.c"),
CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))
CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, true, Map.empty))

comparePlans(
parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))
CacheTable(
UnresolvedTableOrView(Seq("a", "b", "c")),
None,
true,
Map("storageLevel" -> "DISK_ONLY")))

intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
"It is not allowed to add catalog/namespace prefix a.b")
Expand All @@ -1612,11 +1616,11 @@ class DDLParserSuite extends AnalysisTest {
test("UNCACHE TABLE") {
comparePlans(
parsePlan("UNCACHE TABLE a.b.c"),
UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))
UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = false))

comparePlans(
parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = true))
}

test("TRUNCATE table") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,16 @@ class ResolveSessionCatalog(
ShowCreateTableCommand(ident.asTableIdentifier)
}

case CacheTableStatement(tbl, plan, isLazy, options) =>
val name = if (plan.isDefined) {
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
// Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
tbl
} else {
parseTempViewOrV1Table(tbl, "CACHE TABLE")
}
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the behavior of it if the temp view already exists? overwrite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would fail with:

org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary view 't' already exists;

// Thus, use the identifier in UnresolvedTableOrView directly.
case CacheTable(u: UnresolvedTableOrView, plan, isLazy, options) if plan.isDefined =>
CacheTableCommand(u.multipartIdentifier.asTableIdentifier, plan, isLazy, options)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Please let me know what you think about having UnresolvedTableOrView here to eagerly use the identifier if plan is defined. Another approach is to have a separate rule to handle CacheTable(u: UnresolvedTableOrView, ...).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like CTAS and the table should be just Seq[String] not LogicalPlan.

How about we have both CacheTable(table: LogicalPlan, ...) and CacheTableAsSelect(tempViewName: String, ...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea


case UncacheTableStatement(tbl, ifExists) =>
val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
UncacheTableCommand(name.asTableIdentifier, ifExists)
case CacheTable(ResolvedV1TableOrViewIdentifier(ident), plan, isLazy, options) =>
CacheTableCommand(ident.asTableIdentifier, plan, isLazy, options)

case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) =>
UncacheTableCommand(ident.asTableIdentifier, ifExists)

case TruncateTableStatement(tbl, partitionSpec) =>
val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
Expand Down Expand Up @@ -570,12 +567,9 @@ class ResolveSessionCatalog(
"SHOW VIEWS, only SessionCatalog supports this command.")
}

case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) =>
imback82 marked this conversation as resolved.
Show resolved Hide resolved
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)

case ShowTableProperties(r: ResolvedView, propertyKey) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)

case DescribeFunction(ResolvedFunc(identifier), extended) =>
DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DropTable(r: ResolvedTable, ifExists, purge) =>
DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil

case _: NoopDropTable =>
case _: NoopCommand =>
LocalTableScanExec(Nil, Nil) :: Nil

case AlterTable(catalog, ident, _, changes) =>
Expand Down Expand Up @@ -289,6 +289,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case ShowCreateTable(_: ResolvedTable, _) =>
throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")

case CacheTable(_: ResolvedTable, _, _, _) =>
throw new AnalysisException("CACHE TABLE is not supported for v2 tables.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need new v2 APIs to support it. This command touches CacheManager which is Spark internal.

Copy link
Contributor Author

@imback82 imback82 Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK. An existing bug, I guess? (it only supported temp view / v1 tables).

Does it make sense to match case CacheTable(_: ResolvedTable, _, _, _) in ResolveSessionCatalog (seems weird) or should we match it in DataSourceV2Strategy with a new CacheTableExec similar to CacheTableCommand?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a CacheTableExec as a v2 version of CacheTableCommand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added v2 version.


case UncacheTable(_: ResolvedTable, _) =>
throw new AnalysisException("UNCACHE TABLE is not supported for v2 tables.")

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1963,13 +1963,8 @@ class DataSourceV2SQLSuite
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

testV1CommandSupportingTempView("CACHE TABLE", t)

val e = intercept[AnalysisException] {
sql(s"CACHE LAZY TABLE $t")
}
assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables"))
testNotSupportedV2Command("CACHE TABLE", t)
testNotSupportedV2Command("CACHE LAZY TABLE", t, sqlCommandInMessage = Some("CACHE TABLE"))
}
}

Expand All @@ -1978,8 +1973,8 @@ class DataSourceV2SQLSuite
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

testV1CommandSupportingTempView("UNCACHE TABLE", t)
testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t")
testNotSupportedV2Command("UNCACHE TABLE", t)
testNotSupportedV2Command("UNCACHE TABLE", s"IF EXISTS $t")
}
}

Expand Down Expand Up @@ -2486,11 +2481,15 @@ class DataSourceV2SQLSuite
}
}

private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
private def testNotSupportedV2Command(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change. This is minor and let's fix it in your next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will fix.

sqlCommand: String,
sqlParams: String,
sqlCommandInMessage: Option[String] = None): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
val cmdStr = sqlCommandInMessage.getOrElse(sqlCommand)
assert(e.message.contains(s"$cmdStr is not supported for v2 tables"))
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
e = intercept[AnalysisException] {
sql("UNCACHE TABLE nonexistentTable")
}.getMessage
assert(e.contains(s"$expectedErrorMsg default.nonexistentTable"))
assert(e.contains(s"$expectedErrorMsg nonexistentTable"))
imback82 marked this conversation as resolved.
Show resolved Hide resolved
sql("UNCACHE TABLE IF EXISTS nonexistentTable")
}

Expand Down