From 66c968c8e88e5695e63bb3b878c7a3b5139cddee Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Thu, 3 Dec 2020 17:11:08 -0800
Subject: [PATCH 1/2] initial commit

---
 .../datasources/v2/DataSourceV2Relation.scala    |  6 +++---
 .../scala/org/apache/spark/sql/Dataset.scala     |  4 ++--
 .../datasources/v2/DataSourceV2Strategy.scala    |  5 +++--
 .../datasources/v2/DeleteFromTableExec.scala     |  4 +++-
 .../datasources/v2/V2ScanRelationPushDown.scala  |  2 +-
 .../sql/connector/DataSourceV2SQLSuite.scala     | 16 ++++++++++++++++
 6 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4debdd380e6b4..2e198ef0b282e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -111,16 +111,16 @@ case class DataSourceV2Relation(
  * plan. This ensures that the stats that are used by the optimizer account for the filters and
  * projection that will be pushed down.
  *
- * @param table a DSv2 [[Table]]
+ * @param v2Relation a [[DataSourceV2Relation]]
  * @param scan a DSv2 [[Scan]]
  * @param output the output attributes of this relation
  */
 case class DataSourceV2ScanRelation(
-    table: Table,
+    v2Relation: DataSourceV2Relation,
     scan: Scan,
     output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
 
-  override def name: String = table.name()
+  override def name: String = v2Relation.table.name()
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0716043bcf660..05d6647afd958 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -3464,7 +3464,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(table: FileTable, _, _) =>
+      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _), _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
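
The Dataset.scala hunk above is the knock-on effect of the signature change: DataSourceV2ScanRelation no longer carries a bare Table, so any match that used to destructure the table must nest one level deeper. A minimal sketch of the new matching shape (illustrative, not part of the patch; it assumes the five constructor fields DataSourceV2Relation had at the time — table, output, catalog, identifier, options):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}

    // Collect input files from every file-based v2 scan in a plan.
    // The FileTable now sits inside the wrapped DataSourceV2Relation.
    def v2InputFiles(plan: LogicalPlan): Seq[String] = plan.collect {
      case DataSourceV2ScanRelation(DataSourceV2Relation(ft: FileTable, _, _, _, _), _, _) =>
        ft.fileIndex.inputFiles.toSeq
    }.flatten
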
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 938ba77fede47..5289d359f7809 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -208,7 +208,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case DeleteFromTable(relation, condition) =>
       relation match {
-        case DataSourceV2ScanRelation(table, _, output) =>
+        case DataSourceV2ScanRelation(r, _, output) =>
+          val table = r.table
           if (condition.exists(SubqueryExpression.hasSubquery)) {
             throw new AnalysisException(
               s"Delete by condition with subquery is not supported: $condition")
@@ -227,7 +228,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
               s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
           }
 
-          DeleteFromTableExec(table.asDeletable, filters) :: Nil
+          DeleteFromTableExec(table.asDeletable, filters, refreshCache(r)) :: Nil
 
       case _ =>
         throw new AnalysisException("DELETE is only supported with v2 tables.")

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
index afebbfd01db22..f0a45c249dc10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
@@ -24,10 +24,12 @@ import org.apache.spark.sql.sources.Filter
 
 case class DeleteFromTableExec(
     table: SupportsDelete,
-    condition: Array[Filter]) extends V2CommandExec {
+    condition: Array[Filter],
+    refreshCache: () => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     table.deleteWhere(condition)
+    refreshCache()
     Seq.empty
   }
 

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index b168e848f0b6f..d2180566790ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -64,7 +64,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
       case _ => scan
     }
 
-    val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)
+    val scanRelation = DataSourceV2ScanRelation(relation, wrappedScan, output)
 
     val projectionOverSchema = ProjectionOverSchema(output.toStructType)
     val projectionFunc = (expr: Expression) => expr transformDown {

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6ef4fd1372a78..55b6f9b2b18dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1841,6 +1841,22 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("DeleteFrom: should refresh caches referencing the table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val view = "view"
+    withTable(t) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+        sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+        sql(s"CACHE TABLE view AS SELECT id FROM $t")
+        assert(spark.table(view).count() == 3)
+
+        sql(s"DELETE FROM $t WHERE id = 2")
+        assert(spark.table(view).count() == 1)
+      }
+    }
+  }
+
   test("UPDATE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
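
A note on the plumbing in patch 1: DeleteFromTableExec takes the refresh action as a `() => Unit` rather than touching the cache manager itself, which keeps the physical node decoupled from the session. The `refreshCache(r)` passed by DataSourceV2Strategy eta-expands a curried helper into that thunk. A hypothetical reconstruction of that helper, which this patch references but does not add (assumed to exist already in DataSourceV2Strategy and to delegate to the cache manager):

    // Hypothetical reconstruction, not part of this patch. The empty second
    // parameter list is what lets `refreshCache(r)` eta-expand into the
    // `() => Unit` consumed by DeleteFromTableExec.
    private def refreshCache(r: DataSourceV2Relation)(): Unit = {
      session.sharedState.cacheManager.recacheByPlan(session, r)
    }

recacheByPlan re-executes and re-caches every cached plan that references the relation, which is exactly the behavior the new test asserts.
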
} + } + test("UPDATE TABLE") { val t = "testcat.ns1.ns2.tbl" withTable(t) { From eb5646fd21aeb4e1e65aa8307eb398a2f5fba0e7 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 4 Dec 2020 16:58:43 -0800 Subject: [PATCH 2/2] address comments --- .../sql/execution/datasources/v2/DataSourceV2Relation.scala | 6 +++--- .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 2e198ef0b282e..513fce0aba10c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -111,16 +111,16 @@ case class DataSourceV2Relation( * plan. This ensures that the stats that are used by the optimizer account for the filters and * projection that will be pushed down. * - * @param v2Relation a [[DataSourceV2Relation]] + * @param relation a [[DataSourceV2Relation]] * @param scan a DSv2 [[Scan]] * @param output the output attributes of this relation */ case class DataSourceV2ScanRelation( - v2Relation: DataSourceV2Relation, + relation: DataSourceV2Relation, scan: Scan, output: Seq[AttributeReference]) extends LeafNode with NamedRelation { - override def name: String = v2Relation.table.name() + override def name: String = relation.table.name() override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 55b6f9b2b18dc..6838a7644a29f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1841,7 +1841,7 @@ class DataSourceV2SQLSuite } } - test("DeleteFrom: should refresh caches referencing the table") { + test("SPARK-33652: DeleteFrom should refresh caches referencing the table") { val t = "testcat.ns1.ns2.tbl" val view = "view" withTable(t) {