
[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times #21230

Closed (2 commits)

Conversation

cloud-fan (Contributor) commented May 3, 2018

What changes were proposed in this pull request?

In `PushDownOperatorsToDataSource`, we use `transformUp` to match `PhysicalOperation` and apply pushdown. This is problematic if we have multiple `Filter` and `Project` above the data source v2 relation.

e.g. for a query

```
Project
  Filter
    DataSourceV2Relation
```

The pattern match will be triggered twice and we will do operator pushdown twice. This is unnecessary; we can use `mapChildren` to apply pushdown only once.
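
To make the double match concrete, here is a self-contained toy model in plain Scala (illustrative only: `Plan`, `Op`, and this local `transformUp` are simplified stand-ins, not the real Catalyst classes):

```scala
// Toy model: a bottom-up transform fires a PhysicalOperation-style pattern
// once at the Filter level and again at the Project level of the same stack.
object TransformUpDemo extends App {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan

  // Mimics Catalyst's transformUp: rewrite the children first, then the node.
  def transformUp(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = p match {
      case Filter(c)  => Filter(transformUp(c)(rule))
      case Project(c) => Project(transformUp(c)(rule))
      case leaf       => leaf
    }
    rule.applyOrElse(rewritten, identity[Plan])
  }

  // Stand-in for PhysicalOperation: matches a Project/Filter stack on a relation.
  object Op {
    def unapply(p: Plan): Option[Relation] = p match {
      case Filter(c)  => strip(c)
      case Project(c) => strip(c)
      case _          => None
    }
    private def strip(p: Plan): Option[Relation] = p match {
      case Filter(c)   => strip(c)
      case Project(c)  => strip(c)
      case r: Relation => Some(r)
    }
  }

  var pushdowns = 0
  transformUp(Project(Filter(Relation("t")))) {
    case Op(r) => pushdowns += 1; r // "pushdown": collapse the stack onto the relation
  }
  println(s"pushdown applied $pushdowns times") // prints 2
}
```

With `mapChildren`, the rule body runs at most once per direct child of the root, so the whole `Project`/`Filter` stack is collapsed by a single application.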

How was this patch tested?

Existing tests.


SparkQA commented May 3, 2018

Test build #90140 has finished for PR 21230 at commit e224f8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Inline review comment on an early version of the change, which guarded a `transformDown` with a `pushed` flag:

```scala
var pushed = false
plan transformDown {
  // PhysicalOperation guarantees that filters are deterministic; no need to check
  case PhysicalOperation(project, filters, relation: DataSourceV2Relation) if !pushed =>
```

Member commented:

Is it possible that one plan has multiple `PhysicalOperation` matches?

cloud-fan (Contributor, Author) replied May 4, 2018:

`PhysicalOperation` just accumulates the `Project` and `Filter` nodes above a specific node. If we transform down a tree and only transform once, we will never hit `PhysicalOperation` more than once.
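
As a rough sketch of that accumulation (simplified; the real `PhysicalOperation` in `org.apache.spark.sql.catalyst.planning` also rewrites expressions through aliases and checks that collected filters are deterministic):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Simplified PhysicalOperation-style extractor: walk down through adjacent
// Project/Filter nodes, accumulating them until some other node is reached.
// A single match at the top consumes the whole stack, so one top-down
// application can only hit it once per relation.
object SimplePhysicalOperation {
  def unapply(plan: LogicalPlan): Option[(Seq[NamedExpression], Seq[Expression], LogicalPlan)] =
    Some(collect(plan))

  private def collect(plan: LogicalPlan): (Seq[NamedExpression], Seq[Expression], LogicalPlan) =
    plan match {
      case Project(projectList, child) =>
        val (_, filters, leaf) = collect(child)
        (projectList, filters, leaf)
      case Filter(condition, child) =>
        val (projects, filters, leaf) = collect(child)
        (projects, condition +: filters, leaf)
      case other => (Nil, Nil, other)
    }
}
```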

rdblue (Contributor) commented May 7, 2018

So it was the use of `transformUp` that caused this rule to match multiple times, right? In that case, would it make more sense to do what @marmbrus suggested in the immutable plan PR and make this a strategy instead of an optimizer rule?

That approach fits with what I suggested on #21118. We could have the scan node handle the filter and the projection so that it doesn't matter whether the source produces UnsafeRow or InternalRow.
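
For reference, a planner-side version of that idea might look roughly like the sketch below (this is not the code from #21262; `PushDownV2Strategy` and `pushedScan` are made-up names, and the pushdown body is stubbed out):

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object PushDownV2Strategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
      // Pushdown happens exactly once, while converting to a physical plan,
      // so the optimized logical plan is never rewritten by a pushdown rule.
      val (scan, postScanFilters) = pushedScan(relation, project, filters)
      // Filters the source could not handle, plus the projection, are applied
      // on top of the scan.
      val filtered = postScanFilters.reduceLeftOption(And)
        .map(FilterExec(_, scan)).getOrElse(scan)
      ProjectExec(project, filtered) :: Nil
    case _ => Nil
  }

  // Placeholder: would return the pushed-down scan plus the filters left
  // over for Spark to evaluate. See #21262 for a real implementation.
  private def pushedScan(
      relation: DataSourceV2Relation,
      project: Seq[NamedExpression],
      filters: Seq[Expression]): (SparkPlan, Seq[Expression]) = ???
}
```

Since `FilterExec` and `ProjectExec` consume `InternalRow` (of which `UnsafeRow` is a subclass), the leftover work runs the same way regardless of which row representation the source emits.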

rdblue (Contributor) commented May 8, 2018

@cloud-fan, I opened #21262, which is similar to this but does pushdown when converting to a physical plan. You might like it as an alternative because it cleans up `DataSourceV2Relation` quite a bit and adds `output` to the case class arguments like other relations.

The drawback to that approach, which I had forgotten about, is that it breaks `computeStats`, because that runs on the optimized plan (and this affects all the other code paths as well).

Up to you how to continue with this work; I just think we should consider the other approach since it solves a few problems. And `computeStats` is something we should update to work on physical plans anyway, right? Just let me know how you want to move forward. If you want to pull that commit into this PR, I'll close the other one.

cloud-fan (Contributor, Author) commented:

Hi @rdblue, thanks for your new approach! As you said, the major problem is statistics. This is unfortunately a flaw in Spark's CBO design: statistics should belong to the physical node, but they currently belong to the logical node.

For file-based data sources, since they are built-in, we can create rules to update statistics during the logical phase, e.g. `PruneFileSourcePartitions`. But for external sources like Iceberg, we cannot update statistics before planning, so a shuffle join may be wrongly planned when a broadcast join is applicable. In other words, users may need to write custom optimizer rules to make their data sources work well.
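
To illustrate that burden, a user-side rule could be registered roughly like this (entirely hypothetical: `RefreshV2Stats` and `withFreshStats` are made-up names; only the `SparkSessionExtensions` hook is a real API):

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Hypothetical source-specific rule that refreshes a relation's statistics
// after pushdown, so joins get planned (broadcast vs. shuffle) against
// realistic size estimates.
case class RefreshV2Stats() extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case r: DataSourceV2Relation => withFreshStats(r)
  }
  // Stub: a real source would re-estimate sizes from its own metadata here.
  private def withFreshStats(r: DataSourceV2Relation): LogicalPlan = r
}

// Wired up through the real extension point
// (spark.sql.extensions=com.example.MyExtensions).
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit =
    ext.injectOptimizerRule(_ => RefreshV2Stats())
}
```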

That said, I do like your approach if we can fix the statistics problem first. I'm not sure how hard that is or how soon it can be fixed. cc @wzhfy

Until then, I'd like to keep the pushdown logic in the optimizer and leave the hard work to Spark instead of users. What do you think?

rdblue (Contributor) commented May 8, 2018

Sounds good to me. Let's plan on getting this one in to fix the current problem, and commit the other approach when stats are fixed.

cloud-fan (Contributor, Author) commented:

retest this please

SparkQA commented May 10, 2018

Test build #90436 has finished for PR 21230 at commit e224f8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue (Contributor) commented May 10, 2018

+1 (assuming tests pass)

```diff
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule

 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(
-      plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.mapChildren {
```
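
Pieced together from the hunk above, the rule now has roughly this shape (a condensed sketch; `pushDown` is a stand-in for the unchanged pushdown body, which is elided here):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
  // mapChildren rewrites only the direct children of the node, instead of
  // transformUp's full bottom-up traversal, so the PhysicalOperation pattern
  // can no longer fire once per Project/Filter layer of the same stack.
  override def apply(plan: LogicalPlan): LogicalPlan = plan.mapChildren {
    // PhysicalOperation guarantees that filters are deterministic; no need to check
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
      pushDown(project, filters, relation)
    case other => other
  }

  // Elided: the existing pushdown logic from the PR.
  private def pushDown(
      project: Seq[NamedExpression],
      filters: Seq[Expression],
      relation: DataSourceV2Relation): LogicalPlan = ???
}
```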

Member commented on the diff:

Could you update the PR description (`transformDown` -> `mapChildren`), too?

SparkQA commented May 10, 2018

Test build #90467 has finished for PR 21230 at commit f73440c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun (Member) left a review:

+1, LGTM.

SparkQA commented May 11, 2018

Test build #90488 has finished for PR 21230 at commit 953cd7a.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member) commented May 11, 2018

retest this please

SparkQA commented May 11, 2018

Test build #90506 has finished for PR 21230 at commit 953cd7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gatorsmile (Member) commented:

LGTM. Thanks! Merged to master.

gengliangwang (Member) left a review:

LGTM

asfgit closed this in 928845a on May 11, 2018.
robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 24, 2018
[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times


Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21230 from cloud-fan/step2.