[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403

Closed
wants to merge 27 commits

Commits (27):
3e532c8  initial commit (imback82, Nov 11, 2020)
f4ee301  Uncache (imback82, Nov 12, 2020)
a0687b3  ResolveCommandsWithIfExists to support uncache table (imback82, Nov 14, 2020)
f36bc59  Fix tests (imback82, Nov 18, 2020)
b3fe647  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 20, 2020)
4b2fba0  introduce CacheTableAsSelect (imback82, Nov 20, 2020)
f232eba  Address PR comments (imback82, Nov 21, 2020)
8c0140c  Rename to command name (imback82, Nov 21, 2020)
9085189  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 21, 2020)
0bdfcee  Address PR comments (imback82, Nov 22, 2020)
fc8a913  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 23, 2020)
7ee6eb0  Address PR comments (imback82, Nov 23, 2020)
a5923ab  Address PR comments (imback82, Nov 24, 2020)
f22159c  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 24, 2020)
c0e4f3e  Address PR comments (imback82, Nov 25, 2020)
47dc974  revert to minimize the churn (imback82, Nov 25, 2020)
5e7227b  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 25, 2020)
20b2474  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 25, 2020)
b33d807  Address comments (imback82, Nov 25, 2020)
4c2d5e2  revert (imback82, Nov 25, 2020)
3c4a0cf  Fix compilation (imback82, Nov 25, 2020)
7f5a0b2  fix tests (imback82, Nov 25, 2020)
d0f49ef  Fix tests (imback82, Nov 25, 2020)
ed1a6db  fix test (imback82, Nov 26, 2020)
4e0e82f  Address comments (imback82, Nov 26, 2020)
911927d  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 27, 2020)
7e788ce  Address PR comments (imback82, Nov 27, 2020)
@@ -3586,37 +3586,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
ctx.SERDE != null)
}

/**
* Create a [[CacheTableStatement]].
*
* For example:
* {{{
* CACHE [LAZY] TABLE multi_part_name
* [OPTIONS tablePropertyList] [[AS] query]
* }}}
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val query = Option(ctx.query).map(plan)
val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
if (query.isDefined && tableName.length > 1) {
val catalogAndNamespace = tableName.init
throw new ParseException("It is not allowed to add catalog/namespace " +
s"prefix ${catalogAndNamespace.quoted} to " +
"the table name in CACHE TABLE AS SELECT", ctx)
}
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
CacheTableStatement(tableName, query, ctx.LAZY != null, options)
}

/**
* Create an [[UncacheTableStatement]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null)
}

/**
* Create a [[TruncateTable]] command.
*
@@ -410,22 +410,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A CACHE TABLE statement, as parsed from SQL
*/
case class CacheTableStatement(
tableName: Seq[String],
plan: Option[LogicalPlan],
isLazy: Boolean,
options: Map[String, String]) extends ParsedStatement

/**
* An UNCACHE TABLE statement, as parsed from SQL
*/
case class UncacheTableStatement(
tableName: Seq[String],
ifExists: Boolean) extends ParsedStatement

/**
* A TRUNCATE TABLE statement, as parsed from SQL
*/
@@ -1918,33 +1918,6 @@ class DDLParserSuite extends AnalysisTest {
asSerde = true))
}

test("CACHE TABLE") {
comparePlans(
parsePlan("CACHE TABLE a.b.c"),
CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))

comparePlans(
parsePlan("CACHE LAZY TABLE a.b.c"),
CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))

comparePlans(
parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))

intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
"It is not allowed to add catalog/namespace prefix a.b")
}

test("UNCACHE TABLE") {
comparePlans(
parsePlan("UNCACHE TABLE a.b.c"),
UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))

comparePlans(
parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
}

test("TRUNCATE table") {
comparePlans(
parsePlan("TRUNCATE TABLE a.b.c"),
@@ -446,20 +446,6 @@ class ResolveSessionCatalog(
ShowCreateTableCommand(ident.asTableIdentifier)
}

case CacheTableStatement(tbl, plan, isLazy, options) =>
val name = if (plan.isDefined) {
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
// Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
tbl
} else {
parseTempViewOrV1Table(tbl, "CACHE TABLE")
}
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)

case UncacheTableStatement(tbl, ifExists) =>
val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
UncacheTableCommand(name.asTableIdentifier, ifExists)

case TruncateTable(ResolvedV1TableIdentifier(ident), partitionSpec) =>
TruncateTableCommand(
ident.asTableIdentifier,
@@ -561,12 +547,9 @@
"SHOW VIEWS, only SessionCatalog supports this command.")
}

case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)

case ShowTableProperties(r: ResolvedView, propertyKey) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)

case DescribeFunction(ResolvedFunc(identifier), extended) =>
DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)

@@ -192,6 +192,40 @@ class SparkSqlAstBuilder extends AstBuilder {
unquotedPath
}

/**
* Create a [[CacheTableCommand]].
*
* For example:
* {{{
* CACHE [LAZY] TABLE multi_part_name
* [OPTIONS tablePropertyList] [[AS] query]
* }}}
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val query = Option(ctx.query).map(plan)
val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
if (query.isDefined && tableName.length > 1) {
val catalogAndNamespace = tableName.init
throw new ParseException("It is not allowed to add catalog/namespace " +
s"prefix ${catalogAndNamespace.quoted} to " +
"the table name in CACHE TABLE AS SELECT", ctx)
}
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
CacheTableCommand(tableName, query, ctx.LAZY != null, options)
}
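// For illustration only (not part of this diff; table names are examples):
//   spark.sql("CACHE TABLE t AS SELECT 1")         // ok: single-part name
//   spark.sql("CACHE TABLE cat.ns.t AS SELECT 1")  // ParseException: the prefix
//     "cat.ns" is not allowed in CACHE TABLE AS SELECT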


/**
* Create an [[UncacheTableCommand]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTableCommand(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.EXISTS != null)
}
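// For illustration only (mirrors the parser tests that moved out of
// DDLParserSuite; names are examples):
//   UNCACHE TABLE a.b.c           => UncacheTableCommand(Seq("a", "b", "c"), ifExists = false)
//   UNCACHE TABLE IF EXISTS a.b.c => UncacheTableCommand(Seq("a", "b", "c"), ifExists = true)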

/**
* Create a [[ClearCacheCommand]] logical plan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command

import java.util.Locale

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.storage.StorageLevel

case class CacheTableCommand(
Contributor:
the next thing we can do is to refactor it using the v2 framework (not adding a v2 version). The benefits are: 1. moving the logical plan to catalyst; 2. resolving the table in the analyzer. For example:

CacheTable(UnresolvedRelation(...), ...)
...
case class CacheTableExec(relation: LogicalPlan) {
  def run() {
     val df  = Dataset.ofRows(spark, relation)
     ....
  }
}

Contributor Author:
OK, will do.

Contributor Author:
One issue I am encountering by moving to the v2 framework (for v2 tables) is the following.

When CACHE TABLE testcat.tbl is run, tbl is rewritten from DataSourceV2Relation to DataSourceV2ScanRelation by the V2ScanRelationPushDown rule, now that the plan goes through the analyzer, optimizer, etc. But if I then run spark.table("testcat.tbl"), its query execution still has tbl as a DataSourceV2Relation, so the cache is not applied.
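
Concretely, the miss can be sketched as follows (illustrative only, not code from this PR; it assumes the v2 catalog testcat from the comment above):

spark.sql("CACHE TABLE testcat.tbl")
// The cached entry is keyed on the executed plan, by then a
// DataSourceV2ScanRelation; a fresh read is still a DataSourceV2Relation.
val cached = spark.sharedState.cacheManager
  .lookupCachedData(spark.table("testcat.tbl"))
assert(cached.isEmpty)  // structural mismatch, so the cache is not applied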

Contributor:
ah, one solution is to follow InsertIntoStatement and not make the table a child. Then we resolve the UnresolvedRelation inside CacheTable manually, in ResolveTempViews and the other resolution rules.
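
A minimal sketch of that shape (class and field names are illustrative, not this PR's code): the target relation is held as a plain field rather than a child, mirroring InsertIntoStatement, so optimizer rules such as V2ScanRelationPushDown never rewrite it before the command runs.

import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

case class CacheTable(
    table: LogicalPlan,  // not a child; resolved manually, e.g. in ResolveTempViews
    multipartIdentifier: Seq[String],
    isLazy: Boolean,
    options: Map[String, String]) extends Command {
  // Keeping children empty is what shields `table` from the optimizer.
  override def children: Seq[LogicalPlan] = Nil
}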

tableIdent: TableIdentifier,
multipartIdentifier: Seq[String],
plan: Option[LogicalPlan],
isLazy: Boolean,
options: Map[String, String]) extends RunnableCommand {
require(plan.isEmpty || tableIdent.database.isEmpty,
"Database name is not allowed in CACHE TABLE AS SELECT")
require(plan.isEmpty || multipartIdentifier.length == 1,
"Namespace name is not allowed in CACHE TABLE AS SELECT")

override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableName = multipartIdentifier.quoted
plan.foreach { logicalPlan =>
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableName)
}

val storageLevelKey = "storagelevel"
Expand All @@ -49,34 +50,46 @@ case class CacheTableCommand(
logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
}

val table = sparkSession.table(tableName)
if (storageLevelValue.nonEmpty) {
sparkSession.catalog.cacheTable(
tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
sparkSession.sharedState.cacheManager.cacheQuery(
table,
Some(tableName),
StorageLevel.fromString(storageLevelValue.get))
} else {
sparkSession.catalog.cacheTable(tableIdent.quotedString)
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName))
}

if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
table.count()
}

Seq.empty[Row]
}
}
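
// Illustration of the options handling above (table name is an example):
//   CACHE LAZY TABLE t OPTIONS ('storageLevel' 'DISK_ONLY')
// reads "storagelevel" case-insensitively, warns about any other option keys,
// and skips the eager count() because LAZY was specified.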


case class UncacheTableCommand(
tableIdent: TableIdentifier,
multipartIdentifier: Seq[String],
ifExists: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = tableIdent.quotedString
if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
sparkSession.catalog.uncacheTable(tableId)
val tableName = multipartIdentifier.quoted
table(sparkSession, tableName).foreach { table =>
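// Cascade the invalidation to dependent caches only for persistent relations;
// a temp view's cached data can be dropped on its own.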
val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier)
sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade)
}
Seq.empty[Row]
}

private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = {
try {
Some(sparkSession.table(name))
} catch {
case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") =>
None
}
}
}
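
// Illustrative behavior (table names are examples):
//   UNCACHE TABLE missing_tbl           => AnalysisException: table not found
//   UNCACHE TABLE IF EXISTS missing_tbl => no-op; table() above returns None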

/**
@@ -25,6 +25,7 @@ import org.apache.spark.executor.DataReadMethod._
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -140,6 +141,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}

test("cache table as select - existing temp view") {
withTempView("tempView") {
sql("CREATE TEMPORARY VIEW tempView as SELECT 1")
val e = intercept[TempTableAlreadyExistsException] {
sql("CACHE TABLE tempView AS SELECT 1")
}
assert(e.getMessage.contains("Temporary view 'tempView' already exists"))
}
}

test("uncaching temp table") {
withTempView("tempTable1", "tempTable2") {
testData.select("key").createOrReplaceTempView("tempTable1")
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
@@ -2018,28 +2019,29 @@ class DataSourceV2SQLSuite
}
}

test("CACHE TABLE") {
test("CACHE/UNCACHE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
def isCached(table: String): Boolean = {
spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation]
}

testV1CommandSupportingTempView("CACHE TABLE", t)
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
sql(s"CACHE TABLE $t")
assert(isCached(t))

val e = intercept[AnalysisException] {
sql(s"CACHE LAZY TABLE $t")
}
assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables"))
sql(s"UNCACHE TABLE $t")
assert(!isCached(t))
}
}

test("UNCACHE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

testV1CommandSupportingTempView("UNCACHE TABLE", t)
testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t")
// Test a scenario where a table does not exist.
val e = intercept[AnalysisException] {
sql(s"UNCACHE TABLE $t")
}
assert(e.message.contains("Table or view not found: testcat.ns1.ns2.tbl"))

// If "IF EXISTS" is set, UNCACHE TABLE will not throw an exception.
sql(s"UNCACHE TABLE IF EXISTS $t")
}

test("SHOW COLUMNS") {
@@ -2555,11 +2557,15 @@
}
}

private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
private def testNotSupportedV2Command(
Contributor:
unnecessary change. This is minor and let's fix it in your next PR.

Contributor Author:
Ok, will fix.

sqlCommand: String,
sqlParams: String,
sqlCommandInMessage: Option[String] = None): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
val cmdStr = sqlCommandInMessage.getOrElse(sqlCommand)
assert(e.message.contains(s"$cmdStr is not supported for v2 tables"))
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {