Commit db74032

Fail if delete contains subquery

xy_xin committed Aug 1, 2019
1 parent c264a79 commit db74032
Showing 7 changed files with 48 additions and 16 deletions.
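
In short, the change makes the parsed DELETE condition a real Filter node over the (optionally aliased) target relation, has DeleteFromTable carry that Filter as a child so the analyzer resolves it, and rejects conditions containing subqueries during analysis. A sketch of the user-visible effect, mirroring the tests added below (the catalog and table names are the test suite's own):

    // Still supported: a plain predicate that translates to data source filters.
    spark.sql("DELETE FROM testcat.ns1.ns2.tbl WHERE id = 2")

    // Now rejected at analysis time, before any rows are touched:
    //   org.apache.spark.sql.AnalysisException:
    //   Delete by condition with subquery is not supported: ...
    spark.sql("DELETE FROM testcat.ns1.ns2.tbl WHERE id IN (SELECT id FROM testcat.ns1.ns2.tbl)")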
----------------------------------------
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.parser

 import java.util.Locale
-
 import javax.xml.bind.DatatypeConverter

 import scala.collection.JavaConverters._
----------------------------------------
@@ -543,10 +543,10 @@ object OverwritePartitionsDynamic {

 case class DeleteFromTable(
     child: LogicalPlan,
-    condition: Expression)
+    condition: Filter)
   extends Command {

-  override def children: Seq[LogicalPlan] = child :: Nil
+  override def children: Seq[LogicalPlan] = child :: condition :: Nil
   override def output: Seq[Attribute] = Seq.empty
 }

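With condition typed as Filter, the predicate travels as a logical plan rather than a bare Expression, and listing it in children means analyzer rules visit and resolve it like any other subtree. A minimal sketch of the plan shape the resolver builds from this (classes are this branch's; relation and cond are placeholder inputs):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, SubqueryAlias}

    // The WHERE clause becomes a Filter over the same (possibly aliased) relation
    // that the command deletes from.
    def sketch(relation: LogicalPlan, cond: Expression): DeleteFromTable = {
      val aliased = SubqueryAlias("tbl", relation) // only when the user wrote an alias
      DeleteFromTable(aliased, Filter(cond, aliased))
    }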
----------------------------------------
@@ -20,14 +20,15 @@ package org.apache.spark.sql.execution.datasources
 import java.util.Locale

 import scala.collection.mutable
+
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
 import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
@@ -315,9 +316,9 @@ case class DataSourceResolution(
       delete: DeleteFromStatement): DeleteFromTable = {
     val relation = CatalogV2Util.loadTable(catalog, identifier)
       .map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(delete.tableName))
-    DeleteFromTable(
-      delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation),
-      delete.condition)
+    val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
+    val filter = Filter(delete.condition, aliased)
+    DeleteFromTable(aliased, filter)
   }

   private def convertTableProperties(
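Building the Filter over the aliased relation (rather than handing the raw condition to the command) is what lets alias-qualified predicates resolve: in the alias test added below, tbl.id in DELETE FROM ... tbl WHERE tbl.id = 2 resolves against the same SubqueryAlias that the command's child uses.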
----------------------------------------
Expand Up @@ -222,11 +222,12 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, deleteExpr) =>
case DeleteFromTable(r: DataSourceV2Relation, filter) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
val filters = splitConjunctivePredicates(filter.condition).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec delete failed:" +
s" cannot translate expression to source filter: $f"))
}.toArray
DeleteFromTableExec(r.table.asDeletable, r.options, filters) :: Nil

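Besides reading the condition out of the Filter, this hunk fixes a bug in the old code: the lambda translated the whole deleteExpr once per conjunct instead of translating each conjunct f. For reference, splitConjunctivePredicates comes from PredicateHelper; a sketch of its behavior, restated here for illustration:

    import org.apache.spark.sql.catalyst.expressions.{And, Expression}

    // `a AND (b AND c)` flattens to Seq(a, b, c); any non-AND expression is one conjunct.
    def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
      case And(cond1, cond2) =>
        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
      case other => other :: Nil
    }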
----------------------------------------
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.BooleanType
@@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) {
       }
     }

+    case DeleteFromTable(_, filter) =>
+      if (SubqueryExpression.hasSubquery(filter.condition)) {
+        failAnalysis(s"Delete by condition with subquery is not supported: $filter")
+      }
+
     case _ => // OK
   }
 }
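Because V2WriteSupportCheck runs during analysis, a DELETE whose condition embeds a subquery fails before planning, leaving the table untouched (the new test asserts exactly that). SubqueryExpression.hasSubquery is, roughly, a tree search; a paraphrase, not the verbatim method:

    import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

    // True if any node of the expression tree is a subquery (scalar, IN, EXISTS, ...).
    def hasSubquery(e: Expression): Boolean =
      e.find(_.isInstanceOf[SubqueryExpression]).isDefined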
----------------------------------------
@@ -1647,14 +1647,39 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
     }
   }

-  test("DeleteFromTable: basic") {
+  test("DeleteFrom: basic") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
       sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
-      sql(s"DELETE FROM $t WHERE id=2")
+      sql(s"DELETE FROM $t WHERE id = 2")
       checkAnswer(spark.table(t), Seq(
         Row(3, "c", 3)))
     }
   }
+
+  test("DeleteFrom: alias") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
+      checkAnswer(spark.table(t), Seq(
+        Row(3, "c", 3)))
+    }
+  }
+
+  test("DeleteFrom: fail if has subquery") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      val exc = intercept[AnalysisException] {
+        sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
+      }
+
+      assert(spark.table(t).count === 3)
+      assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
+    }
+  }
 }
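Note that the subquery test checks both the error message and that spark.table(t).count is still 3: since the check fires at analysis time, a rejected DELETE must not have removed any rows.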
----------------------------------------
@@ -22,15 +22,14 @@ import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform}
 import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not}
-import org.apache.spark.sql.sources.{And, EqualTo, Filter}
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.types.{BooleanType, StructType}
@@ -273,6 +272,8 @@ class InMemoryTable(
         case _ =>
           throw new IllegalArgumentException(s"Unknown filter attribute: $attr")
       }
+    case IsNotNull(_) =>
+      true
     case f =>
       throw new IllegalArgumentException(s"Unsupported filter type: $f")
   }
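The new IsNotNull case makes the in-memory test table treat is-not-null filters as always matching, which is safe here because the test data's keys are non-null; presumably such filters can now reach the table alongside the translated equality predicates once the DELETE condition flows through a full Filter node.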