
[SPARK-28351][SQL] Support DELETE in DataSource V2 #25115

Closed
wants to merge 16 commits

Conversation

xianyinxin
Contributor

What changes were proposed in this pull request?

This PR adds DELETE support for V2 data sources. As a first step, it only supports delete by source filters:

void delete(Filter[] filters);

which cannot deal with complicated cases like subqueries.

Since it is awkward to embed the implementation of DELETE in the current V2 APIs, this PR adds a new data source mix-in called SupportsMaintenance, similar to SupportsRead and SupportsWrite. A data source that can be maintained means we can perform DELETE/UPDATE/MERGE/OPTIMIZE on it, as long as it implements the necessary mix-ins.
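
For illustration, a minimal sketch of how such a mix-in chain could look (a simplified, assumed shape; see the diff for the actual signatures):

    import org.apache.spark.sql.sources.Filter

    // A table that can be "maintained" exposes a builder for maintenance operations.
    trait SupportsMaintenance {
      def newMaintainerBuilder(): MaintenanceBuilder
    }

    trait MaintenanceBuilder {
      def build(): Maintainer
    }

    trait Maintainer {
      // First step: delete every row that matches all of the pushed-down source filters.
      def delete(filters: Array[Filter]): Unit
    }

For example, DELETE FROM t WHERE p = 3 would reach the source roughly as delete(Array(EqualTo("p", 3))).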

How was this patch tested?

A new test case.


@xianyinxin
Contributor Author

cc @cloud-fan @rdblue

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

@xianyinxin thanks for working on it.

A few general questions:

  1. Do we need individual interfaces for UPDATE/DELETE/..., or a single interface?
  2. Is the builder pattern applicable here? To me it's overkill for simple stuff like DELETE. We may need it for MERGE in the future.

also cc @brkyvz @jose-torres

@HeartSaVioR
Contributor

HeartSaVioR commented Jul 11, 2019

I have no idea what "maintenance" means here. Could you elaborate a bit? UPDATE and DELETE are just DML.
(UPSERT would be needed for streaming queries to restore UPDATE mode in Structured Streaming, so we may add it eventually; it's then unclear to me whether we would add SupportsUpsert directly, or under maintenance.)

Sorry for the dumb question if it's obvious to everyone else.

@jose-torres
Contributor

I have to agree with the maintenance thing. I get that it's de-acronymizing DML (although I think technically the M is supposed to be "manipulation"), but it's really confusing to draw a distinction between writes and other types of DML. If DELETE can't be one of the string-based capabilities, I'm not sure SupportsWrite makes sense as an interface.

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107538 has finished for PR 25115 at commit 2d60f57.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Jul 11, 2019

@xianyinxin, thanks for working on this. Is there a design doc to go with the interfaces you're proposing?

@xianyinxin
Contributor Author

@xianyinxin thanks for working on it.

A few general questions:

  1. Do we need individual interfaces for UPDATE/DELETE/..., or a single interface?
  2. Is the builder pattern applicable here? To me it's overkill for simple stuff like DELETE. We may need it for MERGE in the future.

also cc @brkyvz @jose-torres

Thanks @cloud-fan.

  1. UPDATE and DELETE are similar; putting the two in a single interface seems OK to me.
  2. Yes, the builder pattern is intended for complicated cases like MERGE. Shall we simplify the builder for UPDATE/DELETE now, or keep it so that we can avoid changing the interface structure if we want to support MERGE in the future? Since SupportsMaintenance#newMaintainerBuilder returns a MaintenanceBuilder, removing the builder means it would return a Maintainer directly. If we need a builder some day, we would then have to modify the interface. My preference is to keep it; what's your opinion?

@xianyinxin
Contributor Author

I have no idea what "maintenance" means here. Could you elaborate a bit? UPDATE and DELETE are just DML.
(UPSERT would be needed for streaming queries to restore UPDATE mode in Structured Streaming, so we may add it eventually; it's then unclear to me whether we would add SupportsUpsert directly, or under maintenance.)

Sorry for the dumb question if it's obvious to everyone else.

Thank you for the comments @HeartSaVioR. Maybe "maintenance" is not a good word here. The reason I propose a maintenance interface is that it's hard to embed UPDATE/DELETE, UPSERT, or MERGE into the current SupportsWrite framework: SupportsWrite is designed for insert/overwrite/append of data backed by Spark's distributed RDD execution framework, i.e., by submitting a Spark job. That pattern is fixed, explicit, and suitable for insert/overwrite/append. However, UPDATE/DELETE and UPSERT/MERGE are different:

  1. For a simple case like DELETE by filters in this PR, just passing the filters to the data source is more suitable; a Spark job is not needed.
  2. For complicated cases like UPSERT or MERGE, one Spark job is not enough. That kind of work needs to be split into multiple steps, and ensuring the atomicity of the whole operation goes beyond what the current commit protocol for insert/overwrite/append provides.

As for why we implement DELETE/UPDATE rather than just UPSERT: we want to introduce Kudu as a data source, so that Spark SQL can handle workloads that update/delete data at high frequency. Other data sources like Delta also support DELETE/UPDATE, and we need a SQL entry point for that.
I'm not sure if I answered your question @HeartSaVioR.

@xianyinxin
Contributor Author

xianyinxin commented Jul 12, 2019

I have to agree with the maintenance thing. I get that it's de-acronymizing DML (although I think technically the M is supposed to be "manipulation"), but it's really confusing to draw a distinction between writes and other types of DML. If DELETE can't be one of the string-based capabilities, I'm not sure SupportsWrite makes sense as an interface.

Thank you for the comments @jose-torres. I'm not sure I get you; please correct me if I'm wrong. "Maintenance" is not the M in DML, even though both the maintenance operations and writes are DML. For why I separate "maintenance" from SupportsWrite, please see my comments above.
Maybe we can merge SupportsWrite and SupportsMaintenance, and add a new MaintenanceBuilder (or maybe a better word) to SupportsWrite? cc @cloud-fan

@xianyinxin
Contributor Author

@xianyinxin, thanks for working on this. Is there a design doc to go with the interfaces you're proposing?

Thank you for the comments @rdblue. Sorry, I don't have a design doc; for the complicated cases like MERGE we haven't worked out the workflow clearly yet. I can prepare one, but it would come with a lot of uncertainty. BTW, do you have any ideas or suggestions on this?

@SparkQA

SparkQA commented Jul 15, 2019

Test build #107680 has finished for PR 25115 at commit bc9daf9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Jul 17, 2019

@xianyinxin, I think we should consider what kind of delete support you're proposing to add, and whether we need to add a new builder pattern.

I don't think that we need one for DELETE FROM. Above, you commented:

For a simple case like DELETE by filters in this PR, just passing the filters to the data source is more suitable; a Spark job is not needed.

I think we may need a builder for more complex row-level deletes, but if the intent here is to pass filters to a data source and delete if those filters are supported, then we can add a more direct trait to the table, SupportsDelete. I have an open PR that takes this approach: #21308.

Alternatively, we could support deletes using SupportsOverwrite, which allows passing delete filters. An overwrite with no appended data is the same as a delete. The drawback to this is that the source would use SupportsOverwrite but may only support delete. We could handle this by using separate table capabilities.
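
(Conceptually, the equivalence looks like this; the sketch uses simplified, assumed signatures rather than the exact DSv2 interfaces:)

    import org.apache.spark.sql.sources.Filter

    // SupportsOverwrite-style builder: "replace all rows matching these
    // filters with the data that is written afterwards".
    trait SupportsOverwriteSketch {
      def overwrite(filters: Array[Filter]): SupportsOverwriteSketch
    }

    // DELETE FROM t WHERE p = 3  ~  overwrite(Array(EqualTo("p", 3))) followed by a
    // write that appends no rows, i.e. "replace the matching rows with nothing".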

If either of those approaches would work, then we don't need to add a new builder or make decisions that would affect the future design of MERGE INTO or UPSERT. For row-level operations like those, we need to have a clear design doc. But if the need here is to be able to pass a set of delete filters, then that is a much smaller change and we can move forward with a simple trait.

What do you think? Would you like to discuss this in the next DSv2 sync in a week? I can add this to the topics.

@xianyinxin
Contributor Author

Thank you @rdblue , pls see the inline comments.

I think we may need a builder for more complex row-level deletes, but if the intent here is to pass filters to a data source and delete if those filters are supported, then we can add a more direct trait to the table, SupportsDelete. I have an open PR that takes this approach: #21308.

My thought is to provide DELETE support in DSv2, but a general solution may be a little complicated. We considered delete-by-filter and also delete-by-row; both have pros and cons. Delete-by-filter is simple and more efficient, while delete-by-row is more powerful but needs careful design on the V2 API (Spark) side. So I think we may provide a hybrid solution that contains both deleteByFilter and deleteByRow. This PR is an initial step of that plan.

Alternatively, we could support deletes using SupportsOverwrite, which allows passing delete filters. An overwrite with no appended data is the same as a delete. The drawback to this is that the source would use SupportsOverwrite but may only support delete. We could handle this by using separate table capabilities.

As you pointed out, and as mentioned above, if we want to provide general DELETE support, or to consider MERGE INTO or UPSERT in the future, delete via SupportsOverwrite is not feasible, so we can rule out this option.

We could handle this by using separate table capabilities.

Agree. I had an offline discussion with @cloud-fan; we discussed SupportsMaintenance, which makes people uncomfortable. Table capabilities may be a solution.

If either of those approaches would work, then we don't need to add a new builder or make decisions that would affect the future design of MERGE INTO or UPSERT. For row-level operations like those, we need to have a clear design doc. But if the need here is to be able to pass a set of delete filters, then that is a much smaller change and we can move forward with a simple trait.

What do you think about the hybrid solution? Is that reasonable?

What do you think? Would you like to discuss this in the next DSv2 sync in a week? I can add this to the topics.

Thank you very much, Ryan. I'd like to attend the sync next week; please add me to the mail thread and add this topic.

@rdblue
Contributor

rdblue commented Jul 24, 2019

We considered delete-by-filter and also delete-by-row; both have pros and cons.
if we want to provide general DELETE support, or to consider MERGE INTO or UPSERT in the future, delete via SupportsOverwrite is not feasible

Delete by expression is a much simpler case than row-level deletes, upserts, and merge into. Since the goal of this PR is to implement delete by expression, I suggest focusing on that so we can get it in.

If you want to build the general solution for merge into, upsert, and row-level delete, that's a much longer design process. Since this doesn't require that process, let's separate the two. To do that, I think we should add SupportsDelete for filter-based deletes, or re-use SupportsOverwrite.

Table capabilities may be a solution.

My proposal was to use SupportsOverwrite to pass the filter and capabilities to prevent using that interface for overwrite if it isn't supported. I don't think that is the same thing as what you're talking about.

What do you think about the hybrid solution? Is that reasonable?

I see no reason for a hybrid solution. Filter deletes are a simpler case and can be supported separately. When filters match expectations (e.g., partition filters for Hive, any filter for JDBC) then the source can use them. Otherwise filters can be rejected and Spark can fall back to row-level deletes, if those are supported. I don't see a reason to block filter-based deletes because those are not going to be the same thing as row-level deletes.

@cloud-fan
Contributor

I vote for SupportsDelete with a simple method deleteWhere. We can add the builder API later, when we support row-level delete and MERGE.
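
(For illustration, that shape would be roughly the following; a sketch with an assumed package and signature, not necessarily what was merged:)

    import org.apache.spark.sql.sources.Filter

    // Minimal filter-based delete mix-in with a single method.
    trait SupportsDelete {
      // Delete every row matching all of the given filters; implementations
      // may reject filters they cannot handle.
      def deleteWhere(filters: Array[Filter]): Unit
    }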

There is a similar PR opened a long time ago: #21308 . Maybe we can borrow the doc/comments from it? cc @xianyinxin

@xianyinxin
Contributor Author

xianyinxin commented Jul 29, 2019

Hi @cloud-fan @rdblue, I refactored the code according to your suggestions. Now SupportsDelete is a simple and straightforward DSv2 interface, which can also be extended in the future to a builder mode.

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108322 has finished for PR 25115 at commit 620e6f5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

protected def findReferences(value: Any): Array[String] = value match {
  case f: Filter => f.references
  case _ => Array.empty
}

protected def quoteIdentifier(name: String): String = {
Contributor

Why not use CatalogV2Implicits to get the quoted method?

Contributor Author

This code is borrowed from org.apache.spark.sql.catalyst.util.quoteIdentifier, which is a package util, while CatalogV2Implicits.quoted is not a public util function. We'd better unify the two, I think.
However, this code was introduced for the needs of the delete test case. The test code is now updated according to your suggestion below, which leaves this function (sources.filter.sql) unused, so I have removed it in the latest code. If we need this function in the future (e.g., for translating filters to SQL strings in JDBC), we can submit a new PR.

}
val filterStr =
  filters.map {
    filter => filter.sql
Contributor

I think it is over-complicated to add a conversion from Filter to a SQL string just so this can parse that filter back into an Expression. I'd prefer a conversion back from Filter to Expression, but I don't think either one is needed.

The overwrite support can run equality filters, which is enough for matching partition keys. I recommend using that and supporting only partition-level deletes in test tables. That way, the table also rejects some delete expressions that are not on partition columns and we can add tests that validate Spark's behavior for those cases.
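
(A sketch of the kind of check such a test table could perform; the class and names below are made up for illustration:)

    import org.apache.spark.sql.sources.{EqualTo, Filter}

    // Hypothetical test-table helper: accept only equality filters on
    // partition columns and reject everything else.
    class PartitionOnlyDeletes(partitionColumns: Set[String]) {
      def deleteWhere(filters: Array[Filter]): Unit = {
        val unsupported = filters.filterNot {
          case EqualTo(attr, _) => partitionColumns.contains(attr)
          case _                => false
        }
        if (unsupported.nonEmpty) {
          throw new IllegalArgumentException(
            s"Cannot delete by non-partition filters: ${unsupported.mkString(", ")}")
        }
        // ...drop the matching partitions from the table's in-memory data...
      }
    }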

Contributor

+1. Ideally the real implementation should build its own filter evaluator, instead of using Spark Expression. See ParquetFilters as an example.

We don't need a complete implementation in the test. The idea of only supporting equality filters and partition keys sounds pretty good.

Contributor Author

Thanks @rdblue @cloud-fan . I've updated the code according to your suggestions.

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108329 has finished for PR 25115 at commit b9d8bb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2019

Test build #108512 has finished for PR 25115 at commit db74032.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    condition: Expression) extends Command {

  override def children: Seq[LogicalPlan] = child :: Nil
  override def output: Seq[Attribute] = Seq.empty
Contributor

is this a noop override?

Contributor Author

Removed the noop override.


override def output: Seq[Attribute] = Seq.empty

override def children: Seq[LogicalPlan] = Seq.empty
Contributor

ditto, noop override

Contributor Author

Done

    condition: Expression)
  extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty
Contributor

ditto, noop override

Contributor Author

Done.

@@ -309,6 +322,15 @@ case class DataSourceResolution(
orCreate = replace.orCreate)
}

private def convertDeleteFrom(
Contributor

I think we can inline it. It's short and used only once.

Contributor Author

Done.

sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
Contributor

can we also test correlated subquery?

Contributor Author

Is it necessary to test a correlated subquery? A correlated subquery is a subset of subqueries, and since we forbid subqueries here, correlated subqueries are also forbidden.
My thought is that later I want to add pre-execution subqueries for DELETE while correlated subqueries stay forbidden, so we can modify the test cases at that time.

Contributor

This sounds reasonable to me.

@SparkQA

SparkQA commented Aug 9, 2019

Test build #108872 has finished for PR 25115 at commit e68fba2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -173,6 +173,19 @@ case class DataSourceResolution(
// only top-level adds are supported using AlterTableAddColumnsCommand
AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
throw new AnalysisException(
Contributor

Since this always throws AnalysisException, I think this case should be removed. Instead, the next case should match and the V2SessionCatalog should be used. If the table loaded by the v2 session catalog doesn't support delete, then conversion to physical plan will fail when asDeletable is called.

Then users can still call v2 deletes for formats like parquet that have a v2 implementation that will work.

FYI @brkyvz.

Contributor Author

Thank you @rdblue. Removed this case, and we fall back to the session catalog in ResolveTables for DeleteFromTable.

Contributor

I think it's worse to move this case from here to https://github.com/apache/spark/pull/25115/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R657.

If we can't merge these two cases into one here, let's keep it as it was.

Contributor Author

xianyinxin commented Aug 13, 2019

If I understand correctly, one purpose of removing the first case is that we can execute delete on the parquet format via this API (if we implement it later), as @rdblue mentioned. The key point here is that we resolve the table using V2SessionCatalog as the fallback catalog. The original resolveTable doesn't provide any fallback-to-session-catalog mechanism (if no catalog is found, it falls back to resolveRelation). So maybe we can modify resolveTable and let it treat V2SessionCatalog as a fallback option:

    // try the identifier's catalog first, then fall back to the v2 session catalog
    case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
      maybeCatalog.orElse(sessionCatalog) match {
        case Some(catalogPlugin) =>
          loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
        case None =>
          u
      }

Contributor

I don't think we need to update ResolveTables, though I do see that it would be nice to use ResolveTables as the only rule that resolves UnresolvedRelation for v2 tables.

There is already another rule that loads tables from a catalog, ResolveInsertInto.

I considered updating that rule and moving the table resolution part into ResolveTables as well, but I think it is a little cleaner to resolve the table when converting the statement (in DataSourceResolution), as @cloud-fan is suggesting.

One of the reasons to do this for the insert plans is that those plans don't include the target relation as a child. Instead, those plans have the data to insert as a child node, which means that the unresolved relation won't be visible to the ResolveTables rule.

Taking the same approach in this PR would also make this a little cleaner. If DeleteFrom didn't expose the relation as a child, it could be a UnaryNode and you wouldn't need to update some of the other rules to explicitly include DeleteFrom.

Contributor Author

Okay, I rolled back the resolve rules for DeleteFromTable to what they were, as @cloud-fan suggested. For cases like deleting from file formats or V2SessionCatalog support, let's open another PR. Another PR for the resolve rules is also needed, because I found other issues related to that. Does this sound reasonable?

Contributor

We can remove this case after #25402, which updates ResolveTable to fallback to v2 session catalog.

Contributor Author

I saw the code in #25402. I think it's the best choice.

    identifier: Identifier,
    delete: DeleteFromStatement): DeleteFromTable = {
  val relation = UnresolvedRelation(delete.tableName)
  val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
Contributor

Nit: one-line map expressions should use (...) instead of {...}, like this:

delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)

Contributor Author

Fixed.

@rdblue
Contributor

rdblue commented Aug 12, 2019

This looks really close to being ready to me. Thanks for fixing the Filter problem!

@SparkQA

SparkQA commented Aug 13, 2019

Test build #109021 has finished for PR 25115 at commit 792c36b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

LGTM except #25115 (comment)

@SparkQA

SparkQA commented Aug 13, 2019

Test build #109038 has finished for PR 25115 at commit 792c36b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 14, 2019

Test build #109072 has finished for PR 25115 at commit bbf5156.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xianyinxin
Contributor Author

It seems the failing PySpark test has nothing to do with this PR.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Aug 14, 2019

Test build #109089 has finished for PR 25115 at commit bbf5156.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xianyinxin
Contributor Author

retest this please

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Aug 14, 2019

Test build #109105 has finished for PR 25115 at commit bbf5156.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM, merging to master!

@cloud-fan closed this in 2eeb25e on Aug 14, 2019
@xianyinxin
Contributor Author

Thank you @cloud-fan @rdblue for reviewing.
