[SPARK-28351][SQL] Support DELETE in DataSource V2 #25115

Closed · wants to merge 16 commits
@@ -214,6 +214,7 @@ statement
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
| DELETE FROM multipartIdentifier tableAlias whereClause #deleteFromTable
| unsupportedHiveNativeCommands .*? #failNativeCommand
;

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

/**
* A mix-in interface for {@link Table} delete support. Data sources can implement this
* interface to provide the ability to delete data that matches filter expressions from a table.
*/
public interface SupportsDelete {
/**
* Delete data from a data source table that matches filter expressions.
* <p>
* Rows are deleted from the data source iff all of the filter expressions match. That is, the
* expressions must be interpreted as a set of filters that are ANDed together.
* <p>
* Implementations may reject a delete operation if the delete isn't possible without significant
* effort. For example, partitioned data sources may reject deletes that do not filter by
* partition columns because the filter may require rewriting files without deleted records.
* To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
* error message that identifies which expression was rejected.
*
* @param filters filter expressions, used to select rows to delete when all expressions match
* @throws IllegalArgumentException If the delete is rejected due to required effort
*/
void deleteWhere(Filter[] filters);
}
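
For illustration, a minimal sketch of a data source mixing in this interface might look like the following. This is not code from this PR: the class name, the single partition column "p", and the in-memory partition map are assumptions made for the example, and a real implementation would also satisfy the rest of the Table contract (name, schema, capabilities).

import scala.collection.mutable

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.SupportsDelete

// Illustrative only: an in-memory table partitioned by a single column "p".
class ExamplePartitionedTable extends SupportsDelete {

  // Partition value -> rows stored in that partition.
  private val partitions = mutable.Map.empty[Any, Seq[Seq[Any]]]
  private val partitionColumn = "p"

  override def deleteWhere(filters: Array[Filter]): Unit = {
    if (filters.isEmpty) {
      // No restriction: remove all data.
      partitions.clear()
    } else {
      // Filters are implicitly ANDed. Only accept deletes expressed entirely
      // on the partition column; anything else would require rewriting data
      // files, so reject it with IllegalArgumentException as documented above.
      val partitionValues = filters.map {
        case EqualTo(attr, value) if attr == partitionColumn => value
        case other =>
          throw new IllegalArgumentException(
            s"Cannot delete by non-partition filter: $other")
      }
      partitionValues.distinct match {
        case Array(single) => partitions.remove(single) // drop the whole partition
        case _ => // contradictory equality predicates match no rows; nothing to do
      }
    }
  }
}

DataSourceV2Strategy (further down in this PR) produces exactly this kind of Filter array by splitting the DELETE condition on AND and translating each predicate before calling deleteWhere.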
@@ -1761,6 +1761,8 @@ class Analyzer(
// Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
case q: UnaryNode if q.childrenResolved =>
resolveSubQueries(q, q.children)
case d: DeleteFromTable if d.childrenResolved =>
resolveSubQueries(d, d.children)
}
}

@@ -585,7 +585,7 @@ trait CheckAnalysis extends PredicateHelper {
// Only certain operators are allowed to host subquery expression containing
// outer references.
plan match {
case _: Filter | _: Aggregate | _: Project => // Ok
case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
case other => failAnalysis(
"Correlated scalar sub-queries can only be used in a " +
s"Filter/Aggregate/Project: $plan")
@@ -594,9 +594,10 @@

case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter => // Ok
case _: Filter | _: DeleteFromTable => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in a Filter: $plan")
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter/DeleteFromTable: $plan")
}
}

@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -338,6 +338,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
}

override def visitDeleteFromTable(
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {

val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
val tableAlias = if (ctx.tableAlias() != null) {
val ident = ctx.tableAlias().strictIdentifier()
if (ident != null) { Some(ident.getText) } else { None }
} else {
None
}

DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
}

/**
* Create a partition specification map.
*/
@@ -567,6 +567,13 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
override val output = DescribeTableSchema.describeTableAttributes()
}

case class DeleteFromTable(
child: LogicalPlan,
condition: Expression) extends Command {

override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* Drop a table.
*/
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class DeleteFromStatement(
tableName: Seq[String],
tableAlias: Option[String],
condition: Expression)
extends ParsedStatement
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}

object DataSourceV2Implicits {
implicit class TableHelper(table: Table) {
@@ -40,6 +40,15 @@
}
}

def asDeletable: SupportsDelete = {
table match {
case support: SupportsDelete =>
support
case _ =>
throw new AnalysisException(s"Table does not support deletes: ${table.name}")
}
}

def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)

def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
@@ -531,7 +531,8 @@ class AnalysisErrorSuite extends AnalysisTest {
val plan = Project(
Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
LocalRelation(a))
assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil)
assertAnalysisError(plan, "Predicate sub-queries can only be used" +
" in Filter/DeleteFromTable" :: Nil)
}

test("PredicateSubQuery is used is a nested condition") {
@@ -25,10 +25,10 @@ import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@@ -173,9 +173,18 @@ case class DataSourceResolution(
// only top-level adds are supported using AlterTableAddColumnsCommand
AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
throw new AnalysisException(
Contributor:

Since this always throws AnalysisException, I think this case should be removed. Instead, the next case should match and the V2SessionCatalog should be used. If the table loaded by the v2 session catalog doesn't support delete, then conversion to physical plan will fail when asDeletable is called.

Then users can still call v2 deletes for formats like parquet that have a v2 implementation that will work.

FYI @brkyvz.

Contributor Author:

Thank you @rdblue. Removed this case, and we fall back to sessionCatalog when resolving tables for DeleteFromTable.

Contributor:

I think it's worse to move this case from here to https://github.com/apache/spark/pull/25115/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R657.

If we can't merge these 2 cases into one here, let's keep it as it was.

Contributor Author (@xianyinxin, Aug 13, 2019):

If I understand correctly, one purpose of removing the first case is that we can execute deletes on the parquet format via this API (if we implement it later), as @rdblue mentioned. The key point here is that we resolve the table using V2SessionCatalog as the fallback catalog. The original resolveTable doesn't provide any fallback-to-sessionCatalog mechanism (if no catalog is found, it falls back to resolveRelation). So maybe we can modify resolveTable and let it treat V2SessionCatalog as a fallback option:

case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
        maybeCatalog.orElse(sessionCatalog) match {
          case Some(catalogPlugin) =>
            loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
          case None =>
            u
        }

Contributor:

I don't think we need to update ResolveTables, though I do see that it would be nice to use ResolveTables as the only rule that resolves UnresolvedRelation for v2 tables.

There is already another rule that loads tables from a catalog, ResolveInsertInto.

I considered updating that rule and moving the table resolution part into ResolveTables as well, but I think it is a little cleaner to resolve the table when converting the statement (in DataSourceResolution), as @cloud-fan is suggesting.

One of the reasons to do this for the insert plans is that those plans don't include the target relation as a child. Instead, those plans have the data to insert as a child node, which means that the unresolved relation won't be visible to the ResolveTables rule.

Taking the same approach in this PR would also make this a little cleaner. If DeleteFrom didn't expose the relation as a child, it could be a UnaryNode and you wouldn't need to update some of the other rules to explicitly include DeleteFrom.
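
Purely as an illustration of one way to read that suggestion (not code from this PR, and the name is hypothetical): if the resolved relation were held as a field rather than exposed as a child, rules that only walk children would not need explicit DeleteFromTable cases.

import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Hypothetical sketch only: the target table is a field, not a child, so it
// never appears in `children` and is invisible to child-walking rules.
case class DeleteFromTableAlt(
    table: NamedRelation,
    condition: Expression) extends Command {
  override def children: Seq[LogicalPlan] = Nil
}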

Contributor Author:

Okay, I rolled back the resolve rules for DeleteFromTable to what they were, as @cloud-fan suggested. For cases like deleting from file formats or V2SessionCatalog support, let's open another PR. Another PR for the resolve rules is also needed, because I found other issues related to that. Does this sound reasonable?

Contributor:

We can remove this case after #25402, which updates ResolveTable to fall back to the v2 session catalog.

Contributor Author:

Saw the code in #25402. I think it's the best choice.

s"Delete from tables is not supported using the legacy / v1 Spark external catalog" +
s" API. Identifier: $table.")

case delete: DeleteFromStatement =>
val relation = UnresolvedRelation(delete.tableName)
val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
DeleteFromTable(aliased, delete.condition)

case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
UnresolvedCatalogRelation(catalogTable)

}

object V1WriteProvider {
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalog.v2.StagingTableCatalog
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -222,6 +222,15 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, condition) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(condition).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec delete failed:" +
s" cannot translate expression to source filter: $f"))
}.toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil

case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.SupportsDelete
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class DeleteFromTableExec(
table: SupportsDelete,
condition: Array[Filter]) extends LeafExecNode {

override protected def doExecute(): RDD[InternalRow] = {
table.deleteWhere(condition)
sparkContext.emptyRDD
}

override def output: Seq[Attribute] = Nil
}
@@ -18,8 +18,8 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteFromTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.BooleanType

@@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) {
}
}

case DeleteFromTable(_, condition) =>
if (SubqueryExpression.hasSubquery(condition)) {
failAnalysis(s"Delete by condition with subquery is not supported: $condition")
}

case _ => // OK
}
}
@@ -1862,6 +1862,42 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
}
}

test("DeleteFrom: basic") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t WHERE id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}

test("DeleteFrom: alias") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}

test("DeleteFrom: fail if has subquery") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
Contributor:

Can we also test a correlated subquery?

Contributor Author:

Is it necessary to test a correlated subquery? Since a correlated subquery is a subset of subqueries and we forbid subqueries here, correlated subqueries are also forbidden.
My thought is that later I want to add pre-execution subqueries for DELETE, while correlated subqueries would still be forbidden, so we can modify the test cases at that time.

Contributor:

This sounds reasonable to me.

}

assert(spark.table(t).count === 3)
assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
}
}
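
Following up on the correlated-subquery question in the review thread above, here is a purely illustrative test sketch that is not part of this PR. It assumes the same suite setup as the tests above, and that SubqueryExpression.hasSubquery(condition) also returns true for a correlated EXISTS, so the same V2WriteSupportCheck error should be raised.

// Illustrative sketch only, not a test added by this PR.
test("DeleteFrom: fail if has correlated subquery (illustrative)") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    val exc = intercept[AnalysisException] {
      // Correlated: the inner query references the outer alias `tbl`.
      sql(s"DELETE FROM $t tbl WHERE EXISTS (SELECT 1 FROM $t other WHERE other.id = tbl.id)")
    }

    assert(spark.table(t).count === 3)
    assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
  }
}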

private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)