[SPARK-28351][SQL] Support DELETE in DataSource V2 #25115

Closed · wants to merge 16 commits
@@ -214,6 +214,7 @@ statement
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
| DELETE FROM multipartIdentifier tableAlias whereClause #deleteFromTable
| unsupportedHiveNativeCommands .*? #failNativeCommand
;
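
For reference, the new #deleteFromTable alternative matches statements with an optional table alias and a required WHERE clause. A minimal sketch, assuming an active SparkSession named spark and the test table created later in this PR:

// Example statement accepted by the new grammar rule (table name taken from the SQL suite tests below)
spark.sql("DELETE FROM testcat.ns1.ns2.tbl tbl WHERE tbl.id = 2")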

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

/**
 * A mix-in interface for {@link Table} delete support. Data sources can implement this
 * interface to provide the ability to delete data that matches filter expressions from a table.
*/
public interface SupportsDelete {
/**
* Delete data from a data source table that matches filter expressions.
* <p>
* Rows are deleted from the data source iff all of the filter expressions match. That is, the
* expressions must be interpreted as a set of filters that are ANDed together.
* <p>
 * Implementations may reject a delete operation if it cannot be performed without significant
 * effort. For example, partitioned data sources may reject deletes that do not filter by
 * partition columns, because such a delete may require rewriting files without the deleted
 * records. To reject a delete, implementations should throw {@link IllegalArgumentException}
 * with a clear error message that identifies which expression was rejected.
*
* @param filters filter expressions, used to select rows to delete when all expressions match
* @throws IllegalArgumentException If the delete is rejected due to required effort
*/
void deleteWhere(Filter[] filters);
}
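
As a hypothetical illustration (not part of this PR), a table that only supports partition-level deletes might implement the interface roughly as follows; the class name and the partition column "p" are invented for the example:

// Hypothetical sketch: a table that only deletes whole partitions and rejects any filter
// that is not an equality on the partition column "p".
import java.util.Collections

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.{SupportsDelete, Table, TableCapability}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

class PartitionOnlyDeleteTable extends Table with SupportsDelete {
  override def name(): String = "demo"
  override def schema(): StructType = new StructType().add("p", IntegerType).add("data", StringType)
  override def capabilities(): java.util.Set[TableCapability] = Collections.emptySet[TableCapability]()

  override def deleteWhere(filters: Array[Filter]): Unit = {
    filters.foreach {
      case EqualTo("p", _) => // supported: this conjunct selects whole partitions
      case other =>
        // Reject with a clear message, as the interface documentation requires.
        throw new IllegalArgumentException(s"Cannot delete by non-partition filter: $other")
    }
    // ... drop the partitions selected by the equality filters ...
  }
}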
@@ -644,6 +644,7 @@ class Analyzer(
*/
object ResolveTables extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
@@ -652,6 +653,17 @@

case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)

case d @ DeleteFromTable(u @ UnresolvedRelation(
CatalogObjectIdentifier(None, ident)), condition) =>
// Fall back to the session catalog for DeleteFromTable if no catalog is specified and no
// default catalog is set.
val catalog = sessionCatalog
.getOrElse(throw new AnalysisException(
s"Cannot delete from ${ident.quoted} because no catalog specified" +
s" and no session catalog provided."))
.asTableCatalog
d.copy(child = loadTable(catalog, ident).map(DataSourceV2Relation.create).getOrElse(u))
}
}
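
To illustrate the fallback above (a sketch with hypothetical identifiers; the outcome depends on which catalogs are configured):

// Hypothetical: the table name has no catalog prefix, so ResolveTables falls back to the
// session catalog; if none is provided, analysis fails with the message constructed above.
spark.sql("DELETE FROM ns1.tbl WHERE id = 2")
// roughly: org.apache.spark.sql.AnalysisException: Cannot delete from ns1.tbl because no
// catalog specified and no session catalog provided.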

@@ -1761,6 +1773,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case d: DeleteFromTable if d.childrenResolved =>
resolveSubQueries(d, d.children)
}
}

@@ -585,7 +585,7 @@ trait CheckAnalysis extends PredicateHelper {
// Only certain operators are allowed to host subquery expression containing
// outer references.
plan match {
case _: Filter | _: Aggregate | _: Project => // Ok
case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
case other => failAnalysis(
"Correlated scalar sub-queries can only be used in a " +
s"Filter/Aggregate/Project: $plan")
@@ -594,9 +594,10 @@

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter => // Ok
case _: Filter | _: DeleteFromTable => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in a Filter: $plan")
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter/DeleteFromTable: $plan")
}
}

@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -338,6 +338,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
}

override def visitDeleteFromTable(
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {

val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
val tableAlias = if (ctx.tableAlias() != null) {
val ident = ctx.tableAlias().strictIdentifier()
if (ident != null) { Some(ident.getText) } else { None }
} else {
None
}

DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
}

/**
* Create a partition specification map.
*/
@@ -567,6 +567,13 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
override val output = DescribeTableSchema.describeTableAttributes()
}

case class DeleteFromTable(
child: LogicalPlan,
condition: Expression) extends Command {

override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* Drop a table.
*/
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class DeleteFromStatement(
tableName: Seq[String],
tableAlias: Option[String],
condition: Expression)
extends ParsedStatement
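
To make the shape of this statement concrete, here is a hand-built instance corresponding to the alias test later in this PR (a sketch; in practice the condition is produced by visitDeleteFromTable above, not constructed by hand):

// Hypothetical, hand-built equivalent of parsing
//   DELETE FROM testcat.ns1.ns2.tbl tbl WHERE tbl.id = 2
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}

val stmt = DeleteFromStatement(
  tableName = Seq("testcat", "ns1", "ns2", "tbl"),
  tableAlias = Some("tbl"),
  condition = EqualTo(UnresolvedAttribute(Seq("tbl", "id")), Literal(2)))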
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}

object DataSourceV2Implicits {
implicit class TableHelper(table: Table) {
@@ -40,6 +40,15 @@ object DataSourceV2Implicits {
}
}

def asDeletable: SupportsDelete = {
table match {
case support: SupportsDelete =>
support
case _ =>
throw new AnalysisException(s"Table does not support deletes: ${table.name}")
}
}

def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)

def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
@@ -531,7 +531,8 @@ class AnalysisErrorSuite extends AnalysisTest {
val plan = Project(
Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
LocalRelation(a))
assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil)
assertAnalysisError(plan, "Predicate sub-queries can only be used" +
" in Filter/DeleteFromTable" :: Nil)
}

test("PredicateSubQuery is used is a nested condition") {
@@ -25,10 +25,10 @@ import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@@ -173,9 +173,13 @@ case class DataSourceResolution(
// only top-level adds are supported using AlterTableAddColumnsCommand
AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

case delete: DeleteFromStatement =>
val relation = UnresolvedRelation(delete.tableName)
val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
DeleteFromTable(aliased, delete.condition)

case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
UnresolvedCatalogRelation(catalogTable)

}

object V1WriteProvider {
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalog.v2.StagingTableCatalog
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -222,6 +222,15 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, condition) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(condition).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec delete failed:" +
s" cannot translate expression to source filter: $f"))
}.toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil

case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.SupportsDelete
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class DeleteFromTableExec(
table: SupportsDelete,
condition: Array[Filter]) extends LeafExecNode {

override protected def doExecute(): RDD[InternalRow] = {
table.deleteWhere(condition)
sparkContext.emptyRDD
}

override def output: Seq[Attribute] = Nil
}
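
DeleteFromTableExec simply forwards the translated filters to the table's deleteWhere. As a rough illustration (hypothetical condition and values; the exact Filter instances depend on DataSourceStrategy.translateFilter), a DELETE whose condition is id = 2 AND p = 3 is split by the strategy above into two conjuncts and reaches the table as:

// Hypothetical illustration: source filters produced for WHERE id = 2 AND p = 3
import org.apache.spark.sql.sources.{EqualTo, Filter}

val filters: Array[Filter] = Array(EqualTo("id", 2), EqualTo("p", 3))
// table.deleteWhere(filters)  // deletes only rows matching BOTH conjuncts, since filters are ANDed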
@@ -18,8 +18,8 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteFromTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.BooleanType

@@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) {
}
}

case DeleteFromTable(_, condition) =>
if (SubqueryExpression.hasSubquery(condition)) {
failAnalysis(s"Delete by condition with subquery is not supported: $condition")
}

case _ => // OK
}
}
@@ -1862,6 +1862,42 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
}
}

test("DeleteFrom: basic") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t WHERE id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}

test("DeleteFrom: alias") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}

test("DeleteFrom: fail if has subquery") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also test correlated subquery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that necessary to test correlated subquery? Because correlated subquery is a subset of subquery and we forbid subquery here, then correlated subquery is also forbidden.
My thought is later I want to add pre-execution subquery for DELETE, but correlated subquery is still forbidden, so we can modify the test cases at that time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds reasonable to me.

}

assert(spark.table(t).count === 3)
assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
}
}

private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)