From db740325d9e5b16150767eae3b52622adcaa8331 Mon Sep 17 00:00:00 2001
From: xy_xin
Date: Thu, 1 Aug 2019 16:13:57 +0800
Subject: [PATCH] Fail if delete contains subquery

---
 .../sql/catalyst/parser/AstBuilder.scala      |  1 -
 .../plans/logical/basicLogicalOperators.scala |  4 +--
 .../datasources/DataSourceResolution.scala    |  9 +++---
 .../datasources/v2/DataSourceV2Strategy.scala |  9 +++---
 .../datasources/v2/V2WriteSupportCheck.scala  |  7 ++++-
 .../sql/sources/v2/DataSourceV2SQLSuite.scala | 29 +++++++++++++++++--
 .../sources/v2/TestInMemoryTableCatalog.scala |  5 ++--
 7 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 193b86f4196a0..be6f7e1500082 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.parser
 
 import java.util.Locale
-
 import javax.xml.bind.DatatypeConverter
 
 import scala.collection.JavaConverters._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 05491ddf94647..54c30c6c924a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -543,10 +543,10 @@ object OverwritePartitionsDynamic {
 
 case class DeleteFromTable(
     child: LogicalPlan,
-    condition: Expression)
+    condition: Filter)
   extends Command {
 
-  override def children: Seq[LogicalPlan] = child :: Nil
+  override def children: Seq[LogicalPlan] = child :: condition :: Nil
 
   override def output: Seq[Attribute] = Seq.empty
 }
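The pair of hunks above is the structural heart of the patch: `condition` is
narrowed from a bare Expression to a logical Filter node, and that node is
registered as a child of the command. Children are what the analyzer walks, so
the WHERE clause is now resolved and type-checked against the table's output
instead of riding along unanalyzed. A minimal sketch of the resulting plan
shape, using the Catalyst test DSL with a LocalRelation standing in for the
real table (both are illustrative assumptions, not part of the patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LocalRelation}

    // Stand-in table with the same schema as the test table used below.
    val table = LocalRelation('id.long, 'data.string, 'p.int)

    // DELETE FROM tbl WHERE id = 2 now carries its condition as a Filter
    // child, so 'id is resolved against the table's output during analysis.
    val delete = DeleteFromTable(table, Filter('id === 2L, table))
    assert(delete.children == Seq(table, Filter('id === 2L, table)))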
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 712947da5fc82..2a61a29a0557c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.util.Locale
 
 import scala.collection.mutable
+
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
@@ -315,9 +316,9 @@
       delete: DeleteFromStatement): DeleteFromTable = {
     val relation = CatalogV2Util.loadTable(catalog, identifier)
       .map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(delete.tableName))
-    DeleteFromTable(
-      delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation),
-      delete.condition)
+    val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
+    val filter = Filter(delete.condition, aliased)
+    DeleteFromTable(aliased, filter)
   }
 
   private def convertTableProperties(
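Note how the rewritten conversion applies the optional alias once and shares
it: the aliased plan is both the DELETE target and the Filter's child, so a
qualified column such as tbl.id in the condition resolves against the alias
(the new "DeleteFrom: alias" test below exercises exactly this). A rough
sketch under the same stand-in assumptions as above:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LocalRelation, SubqueryAlias}

    val relation = LocalRelation('id.long, 'data.string, 'p.int) // stand-in for the loaded v2 table
    val tableAlias: Option[String] = Some("tbl")                 // delete.tableAlias in the rule

    // The alias is applied once and shared by both children of DeleteFromTable.
    val aliased = tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
    val delete = DeleteFromTable(aliased, Filter(UnresolvedAttribute("tbl.id") === 2L, aliased))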
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 40f3d5909f48f..ba118de0c23d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -222,11 +222,12 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
       OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil
 
-    case DeleteFromTable(r: DataSourceV2Relation, deleteExpr) =>
+    case DeleteFromTable(r: DataSourceV2Relation, filter) =>
       // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).map {
-        filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
-          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
+      val filters = splitConjunctivePredicates(filter.condition).map {
+        f => DataSourceStrategy.translateFilter(f).getOrElse(
+          throw new AnalysisException(s"Exec delete failed:" +
+            s" cannot translate expression to source filter: $f"))
       }.toArray
       DeleteFromTableExec(r.table.asDeletable, r.options, filters) :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
index 388f2f3a2c9fd..3405db3f2b707 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.BooleanType
@@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) {
       }
     }
 
+    case DeleteFromTable(_, filter) =>
+      if (SubqueryExpression.hasSubquery(filter.condition)) {
+        failAnalysis(s"Delete by condition with subquery is not supported: $filter")
+      }
+
     case _ => // OK
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 5ab5e3fde6a23..2aa7c34bf57b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -1647,14 +1647,39 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
     }
   }
 
-  test("DeleteFromTable: basic") {
+  test("DeleteFrom: basic") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
       sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
-      sql(s"DELETE FROM $t WHERE id=2")
+      sql(s"DELETE FROM $t WHERE id = 2")
       checkAnswer(spark.table(t), Seq(
         Row(3, "c", 3)))
     }
   }
+
+  test("DeleteFrom: alias") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
+      checkAnswer(spark.table(t), Seq(
+        Row(3, "c", 3)))
+    }
+  }
+
+  test("DeleteFrom: fail if has subquery") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      val exc = intercept[AnalysisException] {
+        sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
+      }
+
+      assert(spark.table(t).count === 3)
+      assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
+    }
+  }
 }
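The subquery test can assert that all three rows survive because
V2WriteSupportCheck runs during analysis: the plan is rejected before physical
planning, so no delete is ever attempted. A small sketch of the predicate the
new check applies, with hypothetical expressions built through the Catalyst
test DSL (not taken from the patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery, SubqueryExpression}
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val other = LocalRelation('id.long)
    val plain = 'id === 2L                                       // WHERE id = 2
    val inSubquery = InSubquery(Seq('id.attr), ListQuery(other)) // WHERE id IN (SELECT id ...)

    // hasSubquery walks the whole expression tree, so IN, EXISTS and scalar
    // subqueries are caught no matter how deeply they are nested.
    SubqueryExpression.hasSubquery(plain)      // false: planning proceeds
    SubqueryExpression.hasSubquery(inSubquery) // true: fails analysis, data untouched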
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 16d79658936f9..6f631796cf001 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform}
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not}
-import org.apache.spark.sql.sources.{And, EqualTo, Filter}
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.types.{BooleanType, StructType}
@@ -273,6 +272,8 @@ class InMemoryTable(
         case _ =>
           throw new IllegalArgumentException(s"Unknown filter attribute: $attr")
       }
+    case IsNotNull(_) =>
+      true
     case f =>
       throw new IllegalArgumentException(s"Unsupported filter type: $f")
   }
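The new IsNotNull arm looks unrelated but is a plausible side effect of the
main change (the patch does not spell this out): once the DELETE condition
lives in a real Filter node, Catalyst can infer a not-null constraint from a
null-intolerant predicate such as id = 2, and the inferred IsNotNull is pushed
to the source together with the equality. Rows matching the equality trivially
satisfy it, so the test table may treat it as a match-all. A sketch of the
filter array the table now has to accept (illustrative values, not from the
patch):

    import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

    // What DeleteFromTableExec would hand to deleteWhere for
    // DELETE FROM tbl WHERE id = 2 once an inferred not-null predicate
    // accompanies the equality.
    val pushed: Array[Filter] = Array(IsNotNull("id"), EqualTo("id", 2L))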