
[SPARK-25482][SQL] Avoid pushdown of subqueries to data source filters #22518

Closed
wants to merge 9 commits

Conversation

@mgaido91 (Contributor) commented Sep 21, 2018

What changes were proposed in this pull request?

An expression with a subquery can be pushed down as a data source filter. Even though the filter is not actively used, this still causes a re-execution of the subquery, because the ReuseSubquery optimization rule is ineffective in this case.

The PR avoids this problem by forbidding the push down of filters containing a subquery.

How was this patch tested?

added UT
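The fix boils down to excluding from pushdown any predicate that contains a subquery. A toy model of that guard in plain Scala — the ADT and names here are invented for illustration; Spark's real check is `SubqueryExpression.hasSubquery` over Catalyst expressions:

```scala
// Invented stand-ins for Catalyst expression nodes (not Spark's API).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class ScalarSubquery(sql: String) extends Expr

// Analogue of SubqueryExpression.hasSubquery: true if any node is a subquery.
def hasSubquery(e: Expr): Boolean = e match {
  case ScalarSubquery(_) => true
  case GreaterThan(l, r) => hasSubquery(l) || hasSubquery(r)
  case _                 => false
}

// Only predicates free of subqueries are offered to the data source.
def pushdownCandidates(filters: Seq[Expr]): Seq[Expr] =
  filters.filterNot(hasSubquery)

val filters = Seq(
  GreaterThan(Attr("b"), Lit(0)),
  GreaterThan(Attr("b"), ScalarSubquery("select max(a) from t1")))
val pushed = pushdownCandidates(filters)
// Only `b > 0` survives; the subquery predicate stays in the Spark-side Filter.
```

The predicate with the subquery is still evaluated by Spark's own Filter operator, so results don't change; only the duplication of the subquery into the data source scan is avoided.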

@SparkQA commented Sep 21, 2018

Test build #96428 has finished for PR 22518 at commit 7c75067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 21, 2018

Test build #96427 has finished for PR 22518 at commit 36fa664.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2", "t3") {
```
Contributor:

There is no need for "t3".

```diff
@@ -166,7 +168,7 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
       val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
       val sameResult = sameSchema.find(_.sameResult(sub.plan))
       if (sameResult.isDefined) {
-        sub.withNewPlan(sameResult.get)
+        sub.withNewPlan(sameResult.get).withNewExprId()
```
Contributor:

Can we avoid double copy()? Or is it cleaner this way?

Contributor Author:

I think it is cleaner this way. I don't expect this to happen very often (how many subqueries can you have in a plan?), so I don't think it is an issue. But if there are cleaner options/solutions, I am open to suggestions, thanks.

@SparkQA commented Sep 22, 2018

Test build #96472 has finished for PR 22518 at commit ec72458.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

This can happen for instance when a filter containing a scalar subquery is pushed to a DataSource

hmm, how can this happen? I don't think a data source can handle a filter containing a subquery...

@mgaido91 (Contributor, Author):

hmm, how can this happen?

you can check the UT, which reproduces the issue. The scalar subquery is pushed down as part of the GreaterThan filter.

@gengliangwang (Member):

@mgaido91 Sorry, but could you add a more detailed explanation to the PR description?
With the code changes, a predicate containing a subquery can be pushed down into the data source. Is this the main point of the PR? And why does creating a new expr ID fix it?

@mgaido91 (Contributor, Author):

@gengliangwang no, let me quote and explain the PR description. I am not sure how to improve it, but if you have suggestions I am happy to. The main point of the PR is to address an issue which arises when:

an ExecSubqueryExpression is copied

Now the point is: can this condition happen? The answer is yes, and one situation in which it happens (as reported in the JIRA) is

when a filter containing a scalar subquery is pushed to a DataSource.

So in the plan we have two ExecSubqueryExpressions, each with its own copy of the same SubqueryExec. The problem which arises in this condition is that:

ReuseSubquery becomes useless, as replacing the SubqueryExec is ignored since the new plan is equal to the previous one.

So this results in the subquery being executed twice (the two SubqueryExec instances are distinct, even though they are the same).
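As a toy illustration of the double execution — and of why reuse keyed on semantic equality fixes it — here is a plain-Scala sketch (names invented; in Spark the dedup is `ReuseSubquery` over `SubqueryExec` plans):

```scala
// Count how often the "subquery" body actually runs.
var executions = 0
def runSubquery(sql: String): Int = { executions += 1; 42 }

case class Subquery(sql: String)
val original   = Subquery("select max(a) from t1")
val pushedCopy = original.copy() // equal by ==, but a distinct instance

// Without reuse, each copy triggers its own execution:
Seq(original, pushedCopy).foreach(s => runSubquery(s.sql))
val withoutReuse = executions

// Reuse keyed on (semantic) equality executes the body only once:
executions = 0
val cache = scala.collection.mutable.Map.empty[Subquery, Int]
val results = Seq(original, pushedCopy).map(s => cache.getOrElseUpdate(s, runSubquery(s.sql)))
val withReuse = executions
```

Both callers still get a result; the second one simply reads the cached value instead of re-running the work.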

@gengliangwang (Member):

@mgaido91 I see, thanks for the explanation!

@SparkQA commented Nov 10, 2018

Test build #98681 has finished for PR 22518 at commit 144091f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
withTempView("t1", "t2") {
  sql("create temporary view t1(a int) using parquet")
  sql("create temporary view t2(b int) using parquet")
  val plan = sql("select * from t2 where b > (select max(a) from t1)")
```
Contributor:

sorry, it has been a long time and I don't quite remember the context.

What was the problem we were trying to fix? This test looks unrelated to subquery reuse.

Contributor Author:

Sure, can you please check the PR description? I think the context is explained quite well there.

Anyway, as a quick summary: in this case b > (select max(a) from t1) is pushed down as a data source filter. So we have two instances of b > (select max(a) from t1), and the result is not reused. It is not reused because the copied plan satisfies ==, so even when ReuseSubquery replaces it, the change is ignored.

Contributor:

Do we only have a problem when there is a subquery in a data source filter? If that's the case, I would suggest not pushing down subquery filters into the data source.

Contributor Author:

we have this problem whenever we copy the same subquery. I can't think of any case other than filter push-down, but I may be wrong.

Forbidding the push-down of these filters may cause a perf regression; I am not sure it is the right solution.

Contributor:

is there any data source that can support a subquery filter? For data source v1/v2, the public Filter API does not support subqueries. File sources don't support subquery filters either.

Contributor Author:

it also means the data source scan must wait until the subquery is finished

The subquery should be executed anyway sooner or later, right? So I don't see the problem here: am I missing something?

Ok, I'll follow your suggestion: forbid it here and create a new ticket about pushing such filters down to data sources. Thanks.

Contributor:

The subquery should be executed anyway sooner or later, right?

Yes, but we could execute the scan and the subquery at the same time (2 Spark jobs running together), instead of executing them serially.

Contributor Author:

we could execute scan and subquery at the same time (2 spark jobs running together)

is this really possible? My understanding is that subqueries are executed before the plan they belong to (in SparkPlan.executeQuery), so while a subquery is running, the rest of the query is not.
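That ordering can be sketched with plain Scala futures — subqueries may run concurrently with each other, but the main plan only starts after all of them complete. The names below are invented, not Spark's API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def runSubquery(id: Int): Int = id * 10 // stand-in for one subquery job

// Subqueries are submitted together, so they can run in parallel...
val subqueryFutures = (1 to 3).map(id => Future(runSubquery(id)))

// ...but the plan body blocks until every subquery has finished.
val subqueryResults = subqueryFutures.map(f => Await.result(f, 10.seconds))
val planOutput = s"scan filtered with ${subqueryResults.mkString(", ")}"
```

So parallelism exists among the subqueries themselves, but not between the subqueries and the scan they feed.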

Contributor:

ah sorry, I misread the code. Unless the subquery is rewritten into a join, we must wait for all subqueries to finish before executing the plan.

We could rewrite scalar subqueries in data source filters into literals, to make them work with the filter pushdown API.
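This literal rewrite is the route the later SPARK-43402 follow-up eventually took. A toy sketch over an invented expression ADT — not Spark's actual classes:

```scala
// Invented expression nodes for illustration (not Spark's API).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Literal(value: Int) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class ScalarSubquery(sql: String, result: Option[Int]) extends Expr

// Once a scalar subquery has executed, its node can be replaced by a literal,
// so the predicate fits the plain (subquery-free) Filter pushdown API.
def rewriteToLiteral(e: Expr): Expr = e match {
  case ScalarSubquery(_, Some(v)) => Literal(v)
  case GreaterThan(l, r)          => GreaterThan(rewriteToLiteral(l), rewriteToLiteral(r))
  case other                      => other
}

val predicate = GreaterThan(Attr("b"), ScalarSubquery("select max(a) from t1", Some(7)))
val pushable  = rewriteToLiteral(predicate)
// pushable is GreaterThan(Attr("b"), Literal(7)): expressible as a plain filter
```

Since the subquery result is known before the scan starts anyway, substituting it as a literal loses nothing and lets the data source prune on it.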

Contributor Author:

yes, exactly, that's what I meant. Shall I revert the PR to the previous approach, and do the rewriting to literals in the scope of the new JIRA? Thanks.

@mgaido91 mgaido91 changed the title [SPARK-25482][SQL] ReuseSubquery can be useless when the subqueries have the same id [SPARK-25482][SQL] Avoid pushdown of subqueries to data source filters Nov 12, 2018
@SparkQA commented Nov 12, 2018

Test build #98734 has finished for PR 22518 at commit ef0a953.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -155,15 +155,14 @@ object FileSourceStrategy extends Strategy with Logging {
       case a: AttributeReference =>
         a.withName(l.output.find(_.semanticEquals(a)).get.name)
     }
   }
 }.filterNot(SubqueryExpression.hasSubquery)
```
Contributor:

shall we do the filter before the map?

```diff
@@ -47,7 +47,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
       case a: AttributeReference =>
         a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
     }
   }
 }.filterNot(SubqueryExpression.hasSubquery)
```
Contributor:

ditto

```diff
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
```
Contributor:

let's update the test

@cloud-fan (Contributor):

I'd like to merge this simple PR first, to address the performance problem (unnecessary subquery execution).

Let's create a new ticket for pushing subquery filters down to data sources, and get more people to join the discussion.

@cloud-fan (Contributor):

BTW can you include a simple benchmark to show this problem? e.g. just run a query in spark-shell, and post the result before and after this PR.

@mgaido91 (Contributor, Author):

@cloud-fan this is the benchmark:

```scala
(1 to 1000000).toSeq.toDF("a").write.save("/tmp/t1")
spark.read.load("/tmp/t1").createTempView("t1")
(1 to 2000).toSeq.toDF("b").write.save("/tmp/t2")
spark.read.load("/tmp/t2").createTempView("t2")
val plan = sql("select * from t2 where b > (select avg(a + 1) from t1)")
val t0 = System.nanoTime()
plan.show
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
```

the result is:

```
Before PR: Elapsed time: 862499689ns
After  PR: Elapsed time: 914728641ns
```

The difference is very small because all the subqueries run in parallel. The execution time would be affected much more with several subqueries: our thread pool has 16 threads, so a query like that but with 9 subquery filters (18 subquery executions before this PR, since each one runs twice) would see a big performance gain after this PR.

```diff
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, NamedExpression, PlanExpression}
```
Contributor:

unnecessary change

Contributor Author:

thanks sorry, I missed it.

```diff
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Forbid pushdown to dattasources of filters containing subqueries") {
```
Contributor:

dattasources typo

@SparkQA commented Nov 13, 2018

Test build #98775 has finished for PR 22518 at commit da3843e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 13, 2018

Test build #98777 has finished for PR 22518 at commit b414572.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 13, 2018

Test build #98778 has finished for PR 22518 at commit 56ed812.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 13, 2018

Test build #98779 has finished for PR 22518 at commit 52ae956.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@asfgit asfgit closed this in 4b95562 Nov 13, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

An expression with a subquery can be pushed down as a data source filter. Even though the filter is not actively used, this still causes a re-execution of the subquery, because the `ReuseSubquery` optimization rule is ineffective in this case.

The PR avoids this problem by forbidding the push down of filters containing a subquery.
## How was this patch tested?

added UT

Closes apache#22518 from mgaido91/SPARK-25482.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you added a commit that referenced this pull request Jul 31, 2023
…with scalar subquery

### What changes were proposed in this pull request?

Scalar subqueries can be pushed down as data filters at runtime, since we always execute subqueries first. Ideally, we can rewrite `ScalarSubquery` to `Literal` before pushing down the filter. The main reason we did not support that before is that `ReuseSubquery` was ineffective, see #22518. It is not an issue now.

For example:
```sql
SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)
```

### Why are the changes needed?

Improve performance when data filters contain scalar subqueries.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes #41088 from ulysses-you/SPARK-43402.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Xiduo You <ulyssesyou@apache.org>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024