[SPARK-33608][SQL] Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates #30555

Closed

Conversation

@aokolnychyi (Contributor)

What changes were proposed in this pull request?

This PR adds logic to handle DELETE/UPDATE/MERGE plans in PullupCorrelatedPredicates.

Why are the changes needed?

Right now, PullupCorrelatedPredicates applies only to filters and unary nodes. As a result, correlated predicates in DELETE/UPDATE/MERGE are not rewritten.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The PR adds 3 new test cases.
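For illustration, a minimal sketch of the kind of scenario those tests cover (the relation names and the direct invocation of the rule below are assumptions made for this sketch, not the PR's actual test code):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
import org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LocalRelation}

// DELETE FROM t WHERE id IN (SELECT value FROM deleted_ids WHERE value = dep)
val t = LocalRelation('id.int, 'dep.int)
val deletedIds = LocalRelation('value.int)
val cond = InSubquery(Seq('id), ListQuery(deletedIds.where('value === 'dep).select('value)))
val delete = DeleteFromTable(t, Some(cond)).analyze

// Before this patch the rule left DELETE/UPDATE/MERGE plans untouched; after it,
// the correlated predicate is pulled out of the subquery so later rewrites can run.
val rewritten = PullupCorrelatedPredicates(delete)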

@github-actions bot added the SQL label on Nov 30, 2020
@aokolnychyi (Contributor, Author)

@viirya @sunchao @dbtsai @dongjoon-hyun @cloud-fan, could you take a look whenever you get a minute?

@dongjoon-hyun (Member)

Sure, @aokolnychyi !

@SparkQA commented on Nov 30, 2020

Test build #132006 has started for PR 30555 at commit b92852d.

@dongjoon-hyun (Member) left a comment

+1, LGTM. (Pending CIs).

Thank you, @aokolnychyi .

@viirya (Member) left a comment

Looks good. BTW, these commands are not supported yet, right?

@HyukjinKwon (Member)

cc @maryannxue as well.

@@ -328,6 +328,8 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
     case q: UnaryNode =>
       rewriteSubQueries(q, q.children)
+    case s: SupportsSubquery =>
Member

Add a comment above this line? To be honest, it is hard to tell that this trait means UPDATE/MERGE/DELETE.

Also, I think this change is just one part of the whole set of changes needed to support subqueries in UPDATE/MERGE/DELETE. We need the other changes in the Analyzer and Optimizer rules as well, for example CheckAnalysis.

@aokolnychyi (Contributor, Author)

> Add a comment above this line? To be honest, it is hard to tell that this trait means UPDATE/MERGE/DELETE.

Sure, what kind of comment would make sense? SupportsSubquery seems generic to me and may cover different rules in the future. Here, I match the behavior in the analyzer.

> Also, I think this change is just one part of the whole set of changes needed to support subqueries in UPDATE/MERGE/DELETE. We need the other changes in the Analyzer and Optimizer rules as well, for example CheckAnalysis.

You are right that this is the first step and more changes will potentially be needed. At the same time, I think we've updated the analyzer already. Here is what we have in CheckAnalysis:

// Only certain operators are allowed to host subquery expression containing
// outer references.
plan match {
  case _: Filter | _: Aggregate | _: Project | _: SupportsSubquery => // Ok
  case other => failAnalysis(
    "Correlated scalar sub-queries can only be used in a " +
      s"Filter/Aggregate/Project and a few commands: $plan")
}

@aokolnychyi (Contributor, Author)

If we decide to implement SupportsSubquery in other nodes and remove UnaryNode from here, I think the comment above may be sufficient (with minor tweaks once we remove UnaryNode).

@gatorsmile (Member)

cc @dilipbiswal

@@ -328,6 +328,8 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
     case q: UnaryNode =>
@cloud-fan (Contributor)

shall we make Filter, Aggregate and Project extend SupportsSubquery and only match SupportsSubquery here?
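A rough sketch of the simplification being suggested (assumptions: Project, Filter and Aggregate are changed to extend SupportsSubquery; any special cases in the current rule are left out for brevity; this is not code from this PR):

// Hypothetical simplification of PullupCorrelatedPredicates.apply: with the
// shared trait, commands and these unary operators are covered by one case,
// and the separate UnaryNode case can be dropped.
plan transformUp {
  case s: SupportsSubquery =>
    rewriteSubQueries(s, s.children)
}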

@aokolnychyi (Contributor, Author)

Sounds good to me. We can also simplify the check inside CheckAnalysis in a follow-up PR.

Let me submit a separate PR for this one.

@aokolnychyi (Contributor, Author)

Actually, we can get this one in first. How does it sound, @cloud-fan?

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan closed this in 478fb7f on Dec 1, 2020
@aokolnychyi (Contributor, Author)

Thanks everyone for the review! I've created SPARK-33624 to extend SupportsSubquery in Filter, Aggregate and Project.

@aokolnychyi (Contributor, Author) commented on Dec 1, 2020

Actually, I think there are places where we distinguish Aggregate and SupportsSubquery so we may not unify those. For example, CheckAnalysis.

@cloud-fan (Contributor)

I think we need a util method to match Project/Filter/Aggregate/SupportsSubquery, so that we don't make the same mistake again due to code inconsistency (CheckAnalysis handles SupportsSubquery but this rule did not).
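For illustration, such a helper might look like the following (a hypothetical sketch; the name and where it would live are assumptions, not existing Spark code):

import org.apache.spark.sql.catalyst.plans.logical._

// One shared predicate that CheckAnalysis and the subquery rewrite rules could both
// call, so the set of operators allowed to host correlated subqueries is defined once.
def canHostCorrelatedSubquery(plan: LogicalPlan): Boolean = plan match {
  case _: Project | _: Filter | _: Aggregate | _: SupportsSubquery => true
  case _ => false
}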

@cloud-fan (Contributor) commented on Jan 20, 2021

After a second look, I'm a bit worried about this half-baked solution. Correlated subquery handling is generally split into 3 steps:

  1. CheckAnalysis makes sure correlated subqueries can only exist in SupportsSubquery, Filter, and a few other operators.
  2. PullupCorrelatedPredicates pulls up the outer references in the correlated subquery to the root node. It handles SupportsSubquery and UnaryNode.
  3. RewriteCorrelatedScalarSubquery and RewritePredicateSubquery rewrite correlated subqueries into joins. They only handle Filter, Aggregate and Project.

I have a hard time imagining how we can rewrite UPDATE/DELETE/MERGE commands with correlated subqueries into joins, and I'm starting to doubt whether this is the right direction to go. Before this PR, SupportsSubquery was mostly a marker trait that kept CheckAnalysis out of the way (i.e., kept it from failing UPDATE/DELETE/MERGE commands with correlated subqueries). We assumed users would add catalyst rules and/or provide proper UPDATE/DELETE/MERGE physical plans to support correlated subqueries. Now PullupCorrelatedPredicates can get in the way as well.

@aokolnychyi can you share your plan for supporting UPDATE/DELETE/MERGE commands with correlated subqueries? It's better to get out of this half-baked state ASAP, by either reverting this patch or finishing the feature completely.

@dongjoon-hyun (Member) commented on Jan 21, 2021

I fully understand your concern, @cloud-fan. @aokolnychyi is working on this area actively.

If you have an idea for a better and cleaner solution, could you share it as a working example? We can adjust to it accordingly. As of now, I'm reluctant to discuss reverting this without a feasible alternative.

> start to doubt if this is the right direction to go.

@dongjoon-hyun (Member) commented on Jan 21, 2021

We can switch to your new suggestion (PR) if it's applicable ASAP.
cc @rdblue , @dbtsai , @viirya , @holdenk , @sunchao

@cloud-fan (Contributor)

UPDATE/DELETE/MERGE are just logical plans in Spark; we need third-party libraries or vendors to provide the actual implementations. So it's not about fully supporting this feature in Spark (as Spark can't), but about what Spark can do to make it easier for others to support it.

Before this PR, the UPDATE/DELETE/MERGE implementation (physical plans) was fully responsible for handling correlated subqueries, as correlated subqueries inside UPDATE/DELETE/MERGE were not decorrelated. As an example, in the physical plan's doExecute method, people could put the UPDATE/DELETE/MERGE condition in a filter, build a DataFrame to evaluate the condition, and collect the result.
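For concreteness, a very rough sketch of that pre-PR approach (everything here is illustrative: the helper name, the use of input_file_name to track which file a row came from, and the assumption that the data source rewrites whole files are not taken from any actual implementation):

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.input_file_name

// Evaluate the un-rewritten DELETE condition (subqueries included) with a normal
// DataFrame query and collect the files that contain at least one matching row.
def filesTouchedByDelete(spark: SparkSession, table: String, condition: Column): Array[String] = {
  spark.table(table)
    .filter(condition)
    .select(input_file_name())
    .distinct()
    .collect()
    .map(_.getString(0))
}

This only works if the condition can be analyzed as-is, which is exactly the assumption that, as described next, stops holding once the subquery has been partially rewritten.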

After this PR, correlated subqueries inside UPDATE/DELETE/MERGE are half-decorrelated. I don't know how an UPDATE/DELETE/MERGE implementation can handle that. At least our internal UPDATE/DELETE/MERGE implementation is broken after this commit. If you have a good idea about how to handle half-decorrelated subqueries, let's document it so that others can follow.

@dongjoon-hyun (Member)

If you can, could you elaborate on the details of the conflict or the reason for the breakage, please?

> At least our internal UPDATE/DELETE/MERGE implementation is broken after this commit.

@aokolnychyi (Contributor, Author)

Sorry, I missed this discussion. Let me take a look.

@aokolnychyi (Contributor, Author)

I think Spark is actually capable of rewriting DELETE/UPDATE/MERGE operations and should do that in the future instead of delegating it to data source implementations. In my view, data sources just need to know which records to modify, and Spark should be responsible for executing a distributed query to determine that.

Spark may rewrite the plan differently based on whether a data source supports row-level or file-level changes, but that should be it.

Let me show how a DELETE statement with a subquery can be represented as a filter/project/join.

DELETE FROM t WHERE id IN (SELECT * FROM deleted_id)

This DELETE can be rewritten by Spark as below for data sources that support only file-level updates. It basically queries a set of files, filters out the records to be deleted, writes new files, and replaces the old files with the new ones.

ReplaceData (a node that replaces data)
+- Project [id, dep]
   +- Filter NOT (id IN (list []))   (a filter with our subquery)
      +- RelationV2[id#48, dep#49]

The filter, in turn, could be converted into a join by RewritePredicateSubquery:

ReplaceData (a node that replaces data) 
+- Project [id, dep]
   +- Filter NOT (exists <=> true)
      +- Join ExistenceJoin(exists), (id = value)
         :- RelationV2[id, dep]
         +- LocalRelation [value]

For the last step to happen, we need to handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates.
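For reference, the ReplaceData node in the trees above could be sketched roughly as follows (ReplaceData does not exist in Spark at this point; the shape below is an assumption based on the description, not a real API):

import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Hypothetical: `table` is the target to write back to, and `query` produces the rows
// that should remain after the DELETE (the Project/Filter/Join subtree shown above).
case class ReplaceData(table: NamedRelation, query: LogicalPlan) extends Command {
  override def children: Seq[LogicalPlan] = query :: Nil
  override def output: Seq[Attribute] = Nil
}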

I thought there was enough consensus that Spark should rewrite DELETE/UPDATE/MERGE, but it was not clear how. That's why I went ahead and added SupportsSubquery to PullupCorrelatedPredicates as a first step.

That being said, I do accept the criticism that it is half done, and I would not object if we want to revert the change from 3.1. I'd still be interested to know more about how it breaks other rules, though. Could you provide a bit more detail, @cloud-fan?

A design doc is being prepared but it is not ready yet. I would like to finish with the required distribution and ordering first.

@cloud-fan (Contributor)

I can't refer to code in our private repo, but the problem is that after PullupCorrelatedPredicates, the correlated scalar subquery has more output columns (because the outer references are pulled up), and then we can't use the DELETE/UPDATE/MERGE conditions to build a Dataset, as the Analyzer requires that a scalar subquery have only one output column.

We've reverted this commit internally and are unblocked, but I'm not sure whether there are others implementing DELETE/UPDATE/MERGE like we do who got broken by it. If there are no other complaints, I'm OK with not reverting it.

I agree with the plan rewriting approach, and I'm looking forward to your design doc!

@dongjoon-hyun (Member)

Thank you, @aokolnychyi and @cloud-fan .

@aokolnychyi (Contributor, Author)

Thanks for the context, @cloud-fan! I think we will want this rule eventually, but I would not object to reverting it from 3.1 if the community thinks that's safer.
