[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403

Closed · wants to merge 27 commits

Changes from 9 commits

Commits (27):
3e532c8  initial commit (imback82, Nov 11, 2020)
f4ee301  Uncache (imback82, Nov 12, 2020)
a0687b3  ResolveCommandsWithIfExists to support uncache table (imback82, Nov 14, 2020)
f36bc59  Fix tests (imback82, Nov 18, 2020)
b3fe647  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 20, 2020)
4b2fba0  introduce CacheTableAsSelect (imback82, Nov 20, 2020)
f232eba  Address PR comments (imback82, Nov 21, 2020)
8c0140c  Rename to command name (imback82, Nov 21, 2020)
9085189  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 21, 2020)
0bdfcee  Address PR comments (imback82, Nov 22, 2020)
fc8a913  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 23, 2020)
7ee6eb0  Address PR comments (imback82, Nov 23, 2020)
a5923ab  Address PR comments (imback82, Nov 24, 2020)
f22159c  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 24, 2020)
c0e4f3e  Address PR comments (imback82, Nov 25, 2020)
47dc974  revert to minimize the churn (imback82, Nov 25, 2020)
5e7227b  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 25, 2020)
20b2474  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 25, 2020)
b33d807  Address comments (imback82, Nov 25, 2020)
4c2d5e2  revert (imback82, Nov 25, 2020)
3c4a0cf  Fix compilation (imback82, Nov 25, 2020)
7f5a0b2  fix tests (imback82, Nov 25, 2020)
d0f49ef  Fix tests (imback82, Nov 25, 2020)
ed1a6db  fix test (imback82, Nov 26, 2020)
4e0e82f  Address comments (imback82, Nov 26, 2020)
911927d  Merge remote-tracking branch 'upstream/master' into cache_table (imback82, Nov 27, 2020)
7e788ce  Address PR comments (imback82, Nov 27, 2020)
@@ -257,7 +257,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once,
-      Seq(ResolveNoopDropTable) ++
+      Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
@@ -17,17 +17,20 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule

 /**
- * A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved.
- * If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
+ * A rule for handling commands when the table or temp view is not resolved.
+ * These commands support a flag, "ifExists", so that they do not fail when a relation is not
+ * resolved. If the "ifExists" flag is set to true, the plan is resolved to [[NoopCommand]],
  * which is a no-op command.
  */
-object ResolveNoopDropTable extends Rule[LogicalPlan] {
+object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopDropTable(u.multipartIdentifier)
+      NoopCommand("DROP TABLE", u.multipartIdentifier)
+    case UncacheTable(u: UnresolvedTableOrView, ifExists) if ifExists =>
+      NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
   }
 }
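For context, a minimal sketch of what the renamed rule does, built by hand from the classes touched in this PR (the assert is illustrative, not a test from the change; it assumes the UnresolvedTableOrView signature as of this commit):

import org.apache.spark.sql.catalyst.analysis.{ResolveCommandsWithIfExists, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.plans.logical.{NoopCommand, UncacheTable}

// An UNCACHE TABLE IF EXISTS whose target never resolved: the child is still
// an UnresolvedTableOrView when the Post-Hoc Resolution batch runs.
val parsed = UncacheTable(UnresolvedTableOrView(Seq("db", "no_such_table")), ifExists = true)

// The rule rewrites it to a NoopCommand instead of letting analysis fail.
val resolved = ResolveCommandsWithIfExists(parsed)
assert(resolved == NoopCommand("UNCACHE TABLE", Seq("db", "no_such_table")))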
@@ -3323,7 +3323,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
   }

   /**
-   * Create a [[CacheTableStatement]].
+   * Create a [[CacheTable]] or [[CacheTableAsSelect]].
    *
    * For example:
    * {{{
@@ -3343,14 +3343,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
         "the table name in CACHE TABLE AS SELECT", ctx)
     }
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTableStatement(tableName, query, ctx.LAZY != null, options)
+    val isLazy = ctx.LAZY != null
+    if (query.isDefined) {
+      CacheTableAsSelect(tableName.head, query.get, isLazy, options)
+    } else {
+      CacheTable(UnresolvedTableOrView(tableName), isLazy, options)
+    }
   }

   /**
-   * Create an [[UncacheTableStatement]] logical plan.
+   * Create an [[UncacheTable]] logical plan.
    */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null)
+    UncacheTable(
+      UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)),
+      ctx.EXISTS != null)
   }

   /**
@@ -329,22 +329,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
  */
 case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

-/**
- * A CACHE TABLE statement, as parsed from SQL
- */
-case class CacheTableStatement(
-    tableName: Seq[String],
-    plan: Option[LogicalPlan],
-    isLazy: Boolean,
-    options: Map[String, String]) extends ParsedStatement
-
-/**
- * An UNCACHE TABLE statement, as parsed from SQL
- */
-case class UncacheTableStatement(
-    tableName: Seq[String],
-    ifExists: Boolean) extends ParsedStatement
-
 /**
  * A TRUNCATE TABLE statement, as parsed from SQL
  */
@@ -417,9 +417,9 @@ case class DropTable(
 }

 /**
- * The logical plan for handling non-existing table for DROP TABLE command.
+ * The logical plan for a no-op command handling a non-existing table.
  */
-case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(commandName: String, multipartIdentifier: Seq[String]) extends Command

 /**
  * The logical plan of the ALTER TABLE command.
@@ -670,3 +670,29 @@ case class LoadData(
 case class ShowCreateTable(child: LogicalPlan, asSerde: Boolean = false) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the CACHE TABLE command.
+ */
+case class CacheTable(
+    child: LogicalPlan,
+    isLazy: Boolean,
+    options: Map[String, String]) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
+/**
+ * The logical plan of the CACHE TABLE ... AS SELECT command.
+ */
+case class CacheTableAsSelect(
+    tempViewName: String,
+    plan: LogicalPlan,
+    isLazy: Boolean,
+    options: Map[String, String]) extends Command
+
+/**
+ * The logical plan of the UNCACHE TABLE command.
+ */
+case class UncacheTable(child: LogicalPlan, ifExists: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}

[Review thread on `child: LogicalPlan` in CacheTable]

Contributor: After more thought, I think CACHE TABLE is not a DDL command that needs to interact with catalogs, and it doesn't need a v2 version. The current problem is that CacheTableCommand only takes a v1 table identifier and can't cache v2 tables with an n-part name. Maybe we can fix CacheTableCommand directly?

Contributor Author (imback82): I updated CacheTableCommand and uncacheTable to support multipart names (and not resolving the identifier). Please check what you think about the new approach. Thanks.

Contributor Author (imback82): Also, since it's not resolving to catalogs, should we move it out of ResolveSessionCatalog?
@@ -1594,15 +1594,26 @@ class DDLParserSuite extends AnalysisTest {
   test("CACHE TABLE") {
     comparePlans(
       parsePlan("CACHE TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), false, Map.empty))
+
+    comparePlans(
+      parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
+      CacheTableAsSelect(
+        "t",
+        Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
+        false,
+        Map.empty))

     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), true, Map.empty))

     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))
+      CacheTable(
+        UnresolvedTableOrView(Seq("a", "b", "c")),
+        true,
+        Map("storageLevel" -> "DISK_ONLY")))

     intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
       "It is not allowed to add catalog/namespace prefix a.b")
@@ -1611,11 +1622,11 @@
   test("UNCACHE TABLE") {
     comparePlans(
       parsePlan("UNCACHE TABLE a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))
+      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = false))

     comparePlans(
       parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
+      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = true))
   }

   test("TRUNCATE table") {
@@ -442,19 +442,15 @@ class ResolveSessionCatalog(
         ShowCreateTableCommand(ident.asTableIdentifier)
       }

-    case CacheTableStatement(tbl, plan, isLazy, options) =>
-      val name = if (plan.isDefined) {
-        // CACHE TABLE ... AS SELECT creates a temp view with the input query.
-        // Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
-        tbl
-      } else {
-        parseTempViewOrV1Table(tbl, "CACHE TABLE")
-      }
-      CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
+    // CACHE TABLE ... AS SELECT creates a temp view with the input query.
[Review thread on the new CacheTableAsSelect case]

Contributor: What's the behavior if the temp view already exists? Overwrite?

Contributor Author (imback82): It would fail with:

org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary view 't' already exists;
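For illustration, a sketch of that failure mode (assumes a live `spark` session; not taken from the PR's tests):

// Caching into a temp view name that already exists fails rather than overwriting.
spark.sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")
spark.sql("CACHE TABLE t AS SELECT * FROM range(10)")
// => org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException:
//    Temporary view 't' already exists;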

+    case CacheTableAsSelect(tempViewName, plan, isLazy, options) =>
+      CacheTableCommand(TableIdentifier(tempViewName), Some(plan), isLazy, options)

-    case UncacheTableStatement(tbl, ifExists) =>
-      val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
-      UncacheTableCommand(name.asTableIdentifier, ifExists)
+    case CacheTable(ResolvedV1TableOrViewIdentifier(ident), isLazy, options) =>
+      CacheTableCommand(ident.asTableIdentifier, None, isLazy, options)
+
+    case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) =>
+      UncacheTableCommand(ident.asTableIdentifier, ifExists)

     case TruncateTableStatement(tbl, partitionSpec) =>
       val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
@@ -570,12 +566,9 @@
           "SHOW VIEWS, only SessionCatalog supports this command.")
       }

-    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
+    case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)

-    case ShowTableProperties(r: ResolvedView, propertyKey) =>
-      ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
-
     case DescribeFunction(ResolvedFunc(identifier), extended) =>
       DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)
@@ -0,0 +1,36 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.util.Locale

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

object CacheTableUtils extends Logging {
  def getStorageLevel(options: Map[String, String]): Option[String] = {
    val storageLevelKey = "storagelevel"
    val storageLevelValue =
      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
    if (withoutStorageLevel.nonEmpty) {
      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
    }
    storageLevelValue
  }
}
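A usage sketch of the helper above (the option values here are made up for illustration):

import org.apache.spark.sql.execution.CacheTableUtils

// The storage-level key is matched case-insensitively and its value upper-cased;
// any other keys only produce a logged warning and are otherwise ignored.
val level = CacheTableUtils.getStorageLevel(
  Map("StorageLevel" -> "disk_only", "unknownOption" -> "x"))
// level == Some("DISK_ONLY"); "unknownOption" is reported as an invalid option.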
@@ -17,13 +17,11 @@

 package org.apache.spark.sql.execution.command

-import java.util.Locale
-
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.CacheTableUtils
 import org.apache.spark.storage.StorageLevel

 case class CacheTableCommand(
[Review thread on CacheTableCommand]

Contributor: The next thing we can do is to refactor it using the v2 framework (not adding a v2 version). The benefits are: 1. moving the logical plan to catalyst; 2. resolving the table in the analyzer. E.g.:

CacheTable(UnresolvedRelation(...), ...)
...
case class CacheTableExec(relation: LogicalPlan) {
  def run() {
     val df = Dataset.ofRows(spark, relation)
     ....
  }
}

Contributor Author (imback82): OK, will do.

Contributor Author (imback82): One issue I am encountering by moving to the v2 framework (for v2 tables) is the following. When CACHE TABLE testcat.tbl is run, tbl is changed from DataSourceV2Relation to DataSourceV2ScanRelation by the V2ScanRelationPushDown rule, now that the plan goes through the analyzer, optimizer, etc. But if I run spark.table("testcat.tbl"), the query execution has tbl as a DataSourceV2Relation, so the cache is not applied.

Contributor: Ah, one solution is to follow InsertIntoStatement and not make the table a child. Then we resolve the UnresolvedRelation inside CacheTable manually in ResolveTempViews and the other resolution rules.
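A sketch of the mismatch being described, assuming a `spark` session and a configured v2 catalog `testcat` (CacheManager matches cache entries by comparing logical plans):

spark.sql("CACHE TABLE testcat.tbl")  // if the cached plan went through the optimizer,
                                      // it is a DataSourceV2ScanRelation
val df = spark.table("testcat.tbl")   // analyzed only: still a DataSourceV2Relation

// Plan comparison fails across the two node types, so the cache entry is missed.
spark.sharedState.cacheManager.lookupCachedData(df)  // => None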

@@ -41,17 +39,10 @@ case class CacheTableCommand(
       Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }

-    val storageLevelKey = "storagelevel"
-    val storageLevelValue =
-      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
-    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
-    if (withoutStorageLevel.nonEmpty) {
-      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
-    }
-
-    if (storageLevelValue.nonEmpty) {
+    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+    if (optStorageLevel.nonEmpty) {
       sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+        tableIdent.quotedString, StorageLevel.fromString(optStorageLevel.get))
     } else {
       sparkSession.catalog.cacheTable(tableIdent.quotedString)
     }
@@ -0,0 +1,79 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.CacheTableUtils
import org.apache.spark.storage.StorageLevel

/**
 * Physical plan node for caching a table.
 */
case class CacheTableExec(
    session: SparkSession,
    catalog: TableCatalog,
    table: Table,
    ident: Identifier,
    isLazy: Boolean,
    options: Map[String, String]) extends V2CommandExec {
  override def run(): Seq[InternalRow] = {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper

    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
    val df = Dataset.ofRows(session, v2Relation)
    val tableName = Some(ident.quoted)
    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
    if (optStorageLevel.nonEmpty) {
      session.sharedState.cacheManager.cacheQuery(
        df, tableName, StorageLevel.fromString(optStorageLevel.get))
[Review thread on the tableName argument]

Member: Should we include the catalog name in tableName? Otherwise different catalogs may have tables with the same name.

Contributor Author (imback82, Nov 23, 2020): tableName is used only for display purposes (e.g., InMemoryTableScanExec). The cachedData is matched by the logical plan, so I think the current approach is OK.

    } else {
      session.sharedState.cacheManager.cacheQuery(df, tableName)
    }

    if (!isLazy) {
      // Performs eager caching.
      df.count()
    }

    Seq.empty
  }

  override def output: Seq[Attribute] = Seq.empty
}

/**
 * Physical plan node for uncaching a table.
 */
case class UncacheTableExec(
    session: SparkSession,
    catalog: TableCatalog,
    table: Table,
    ident: Identifier) extends V2CommandExec {
  override def run(): Seq[InternalRow] = {
    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
    val df = Dataset.ofRows(session, v2Relation)
    // Cascade should be true unless a temporary view is uncached.
    session.sharedState.cacheManager.uncacheQuery(df, cascade = true)
    Seq.empty
  }

  override def output: Seq[Attribute] = Seq.empty
}
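End to end, assuming a registered v2 catalog named `testcat`, commands like these would now plan to the exec nodes above (a sketch, not taken from the PR's test suites):

spark.sql("CACHE TABLE testcat.ns.tbl OPTIONS('storageLevel' 'DISK_ONLY')")  // -> CacheTableExec
spark.sql("UNCACHE TABLE testcat.ns.tbl")                                    // -> UncacheTableExec
spark.sql("UNCACHE TABLE IF EXISTS testcat.ns.missing")                      // -> NoopCommand, a no-op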
@@ -234,7 +234,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
     case DropTable(r: ResolvedTable, ifExists, purge) =>
       DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil

-    case _: NoopDropTable =>
+    case _: NoopCommand =>
       LocalTableScanExec(Nil, Nil) :: Nil

     case AlterTable(catalog, ident, _, changes) =>
@@ -302,6 +302,12 @@
     case ShowCreateTable(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")

+    case CacheTable(r: ResolvedTable, isLazy, options) =>
+      CacheTableExec(session, r.catalog, r.table, r.identifier, isLazy, options) :: Nil
+
+    case UncacheTable(r: ResolvedTable, _) =>
+      UncacheTableExec(session, r.catalog, r.table, r.identifier) :: Nil
+
     case _ => Nil
   }
 }