[SPARK-33652][SQL] DSv2: DeleteFrom should refresh cache #30597

Status: Closed · 2 commits
DataSourceV2Relation.scala

@@ -111,16 +111,16 @@ case class DataSourceV2Relation(
  * plan. This ensures that the stats that are used by the optimizer account for the filters and
  * projection that will be pushed down.
  *
- * @param table a DSv2 [[Table]]
+ * @param relation a [[DataSourceV2Relation]]
  * @param scan a DSv2 [[Scan]]
  * @param output the output attributes of this relation
  */
 case class DataSourceV2ScanRelation(
-    table: Table,
+    relation: DataSourceV2Relation,
     scan: Scan,
     output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
 
-  override def name: String = table.name()
+  override def name: String = relation.table.name()
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
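This refactor is what enables the cache fix: `DataSourceV2ScanRelation` now carries the whole `DataSourceV2Relation` rather than just its `Table`. Spark's cache manager keys cached results on logical plans, so code that mutates a table needs the logical relation, not merely the table, to locate the cache entries that depend on it. A minimal sketch of what becomes reachable after the change (uses only accessors visible in this diff):

```scala
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

// After the change, both the table and the logical plan node are one hop away
// from the scan relation; the relation itself doubles as the cache lookup key.
def tableAndCacheKey(s: DataSourceV2ScanRelation): (Table, DataSourceV2Relation) =
  (s.relation.table, s.relation)
```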
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (2 additions, 2 deletions)

@@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf

@@ -3464,7 +3464,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(table: FileTable, _, _) =>
+      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _), _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
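The five underscores are forced by the nested constructor pattern, which must match every field of `DataSourceV2Relation` positionally. For comparison, here is an equivalent match written with a typed pattern on the wrapped relation's table; this is a sketch of an alternative formulation, not what the patch uses:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}

// Equivalent to the nested pattern in the diff: select only FileTable-backed
// v2 scans, but via a type test instead of positional destructuring.
def inputFilesOf(plan: LogicalPlan): Array[String] = plan match {
  case s: DataSourceV2ScanRelation =>
    s.relation.table match {
      case ft: FileTable => ft.fileIndex.inputFiles
      case _ => Array.empty[String]
    }
  case _ => Array.empty[String]
}
```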
DataSourceV2Strategy.scala

@@ -208,7 +208,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
 
     case DeleteFromTable(relation, condition) =>
       relation match {
-        case DataSourceV2ScanRelation(table, _, output) =>
+        case DataSourceV2ScanRelation(r, _, output) =>
+          val table = r.table
           if (condition.exists(SubqueryExpression.hasSubquery)) {
             throw new AnalysisException(
               s"Delete by condition with subquery is not supported: $condition")

@@ -227,7 +228,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
               s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
           }
 
-          DeleteFromTableExec(table.asDeletable, filters) :: Nil
+          DeleteFromTableExec(table.asDeletable, filters, refreshCache(r)) :: Nil
         case _ =>
           throw new AnalysisException("DELETE is only supported with v2 tables.")
       }
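Note that `refreshCache(r)` is evaluated at planning time but does no work then: the helper (presumably defined elsewhere in this file; its definition is not part of the extracted diff) would take the relation in a first parameter list and leave a second, empty one, so supplying only the first list eta-expands into the `() => Unit` that `DeleteFromTableExec` now expects. A sketch of the shape implied by the call site, assuming the `CacheManager.recacheByPlan` API:

```scala
// Placed inside Spark's own sql package tree because cacheManager is private[sql].
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.SparkSession

class CacheRefresh(session: SparkSession) {
  // Two parameter lists: applying only the first returns a () => Unit thunk
  // without touching the cache; the delete exec invokes it after deleteWhere.
  private def refreshCache(r: DataSourceV2Relation)(): Unit =
    session.sharedState.cacheManager.recacheByPlan(session, r)

  def callbackFor(r: DataSourceV2Relation): () => Unit = refreshCache(r)
}
```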
DeleteFromTableExec.scala

@@ -24,10 +24,12 @@ import org.apache.spark.sql.sources.Filter
 
 case class DeleteFromTableExec(
     table: SupportsDelete,
-    condition: Array[Filter]) extends V2CommandExec {
+    condition: Array[Filter],
+    refreshCache: () => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     table.deleteWhere(condition)
+    refreshCache()
     Seq.empty
   }
 
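Two design points are visible here: `DeleteFromTableExec` stays decoupled from the session and cache manager, since it merely invokes whatever thunk it was handed, and the refresh deliberately runs after `deleteWhere`, because refreshing first would re-populate caches with pre-delete data. The same shape in miniature, outside Spark (illustrative names only, not Spark API):

```scala
// Generic version of the pattern: run the mutation, then the caller-supplied
// bookkeeping. Only the ordering is the point.
final case class MutatingCommand(deleteWhere: () => Unit, refreshCache: () => Unit) {
  def run(): Unit = {
    deleteWhere()   // 1. commit the delete against the source
    refreshCache()  // 2. only then recompute dependent caches
  }
}

object Demo extends App {
  MutatingCommand(
    deleteWhere = () => println("rows deleted"),
    refreshCache = () => println("caches refreshed")).run()
}
```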
V2ScanRelationPushDown.scala

@@ -64,7 +64,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
       case _ => scan
     }
 
-    val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)
+    val scanRelation = DataSourceV2ScanRelation(relation, wrappedScan, output)
 
     val projectionOverSchema = ProjectionOverSchema(output.toStructType)
     val projectionFunc = (expr: Expression) => expr transformDown {
DataSourceV2SQLSuite.scala

@@ -1841,6 +1841,22 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33652: DeleteFrom should refresh caches referencing the table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val view = "view"
+    withTable(t) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+        sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+        sql(s"CACHE TABLE view AS SELECT id FROM $t")
+        assert(spark.table(view).count() == 3)
+
+        sql(s"DELETE FROM $t WHERE id = 2")
+        assert(spark.table(view).count() == 1)
+      }
+    }
+  }
+
   test("UPDATE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
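The new test is a regression test for exactly the reported bug: `CACHE TABLE view AS SELECT id FROM $t` materializes three rows, and without the `refreshCache` callback added above, the subsequent `DELETE FROM` would leave that cached result stale, so the second assertion would still observe a count of 3 instead of 1.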