
[SPARK-23405] Generate additional constraints for Join's children #20670

Closed. Wants to merge 8 commits.

Conversation


@KaiXinXiaoLei KaiXinXiaoLei commented Feb 25, 2018

What changes were proposed in this pull request?

I ran this SQL: select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number. The ls table is small, with a single row; the catalog_sales table is big, with 10 billion rows. The task hangs, and I found many null values of cs_order_number in the catalog_sales table. I think the null values should be removed in the logical plan.

== Optimized Logical Plan ==
Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
:- Project cs_order_number#1
:  +- Filter isnotnull(cs_order_number#1)
:     +- MetastoreRelation 100t, ls
+- Project cs_order_number#22
   +- MetastoreRelation 100t, catalog_sales

Now, use this patch, the plan will be:

== Optimized Logical Plan ==
Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
:- Project cs_order_number#1
:  +- Filter isnotnull(cs_order_number#1)
:     +- MetastoreRelation 100t, ls
+- Project cs_order_number#22
   +- Filter isnotnull(cs_order_number#22)
      +- MetastoreRelation 100t, catalog_sales

How was this patch tested?



@KaiXinXiaoLei KaiXinXiaoLei changed the title add constranits [SPARK-23405] Add constranits Feb 25, 2018

SparkQA commented Feb 25, 2018

Test build #87648 has finished for PR 20670 at commit 705ed46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KaiXinXiaoLei
Author

@srowen @wangyum please help me review, thanks.


KaiXinXiaoLei commented Feb 25, 2018

@SparkQA I think this error is not caused by my patch. Retest this please.


SparkQA commented Feb 25, 2018

Test build #87651 has finished for PR 20670 at commit 705ed46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


srowen commented Feb 25, 2018

This is still lacking detail about 'why'. It's not my area either. I think you should not have reopened this.


KaiXinXiaoLei commented Feb 26, 2018

@srowen Let me re-describe the problem. I have a small table ls with one row, and a big table catalog_sales with one hundred billion rows; in the big table, the cs_order_number field has about one million non-null values.
I join these tables with the query: select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number. While the job runs there is severe data skew, and I found that the null values cause it.
The join condition is ls.cs_order_number = cs.cs_order_number. In the optimized logical plan the left table gets a "Filter isnotnull(cs_order_number#1)", so I think the right table should also get a "Filter isnotnull". Then the right table filters out the null values of the cs_order_number field first, and joins with the left table second, so the data skew is no longer caused by null values.
With this change, my SQL query succeeds.
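A minimal, self-contained Python sketch (illustrative only, not Spark code; the table contents are made up) of why this extra filter is safe and helps: SQL equality is never true when either side is null, so null join keys can never match, and pre-filtering them cannot change a left semi join's result while it shrinks the data that has to be shuffled.

```python
def left_semi_join(left, right, key):
    # Keep each left row that has at least one equi-match on the right.
    # None keys never match, mirroring SQL's NULL equality semantics.
    right_keys = {r[key] for r in right if r[key] is not None}
    return [l for l in left if l[key] is not None and l[key] in right_keys]

left = [{"cs_order_number": 1}]
right = [{"cs_order_number": None}] * 5 + [{"cs_order_number": 1}]

# Pre-filtering nulls on the right shrinks the join input but cannot
# change the result, because a None key is never a match.
right_filtered = [r for r in right if r["cs_order_number"] is not None]

assert left_semi_join(left, right, "cs_order_number") == \
       left_semi_join(left, right_filtered, "cs_order_number")
```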

@srowen srowen left a comment

That description makes more sense, but then that's how this PR/JIRA should have been described from the start. Does the order number column have a non-null constraint? It sounds like that filter can be added to other tables in certain types of joins, like an inner join or left join.

I don't know enough to review this change though.
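A hedged sketch of an answer to the question above, reasoned from equi-join semantics rather than taken from this thread or from Spark's code: a side can be pre-filtered with an IS NOT NULL on its equi-join keys exactly when that side's non-matching rows do not survive into the output. The table below is my own summary, not part of this PR.

```python
# For each join type: can we safely filter null join keys on
# (left side, right side)? Null keys never satisfy equality, so filtering
# is safe whenever non-matching rows from that side are dropped anyway.
NULL_KEY_FILTER_SAFE = {
    "inner":       (True,  True),   # non-matching rows dropped on both sides
    "left_semi":   (True,  True),   # output holds only matched left rows
    "left_anti":   (False, True),   # left null-key rows DO appear in output
    "left_outer":  (False, True),   # left rows preserved even without a match
    "right_outer": (True,  False),  # mirror of left outer
    "full_outer":  (False, False),  # both sides preserved
}
```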

@@ -29,12 +29,26 @@ trait QueryPlanConstraints { self: LogicalPlan =>
   */
  lazy val constraints: ExpressionSet = {
    if (conf.constraintPropagationEnabled) {
      var relevantOutPutSet: AttributeSet = outputSet
      constraints.foreach {
        case eq @ EqualTo(l: Attribute, r: Attribute) =>

eq isn't used

      var relevantOutPutSet: AttributeSet = outputSet
      constraints.foreach {
        case eq @ EqualTo(l: Attribute, r: Attribute) =>
          if (l.references.subsetOf(relevantOutPutSet)

You can avoid computing each subsetOf twice here.

        case eq @ EqualTo(l: Attribute, r: Attribute) =>
          if (l.references.subsetOf(relevantOutPutSet)
              && !r.references.subsetOf(relevantOutPutSet)) {
            relevantOutPutSet = relevantOutPutSet.++(r.references)

Use `++` syntax, rather than writing it as a method invocation.


cloud-fan commented Feb 26, 2018

Good catch! This is a real problem, but the fix looks hacky.

By definition, I think plan.constraints should only include constraints that refer to plan.output, as that's the promise a plan can make to its parent. However, Join is special: Join.condition can refer to both join sides, and when we add constraints to Join.condition we are effectively making a promise to Join's children, not to its parent. My proposal is to update QueryPlanConstraints to separate the constraints:

  lazy val constraints: ExpressionSet = {
    if (conf.constraintPropagationEnabled) {
      allConstraints.filter { c =>
        c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
      }
    } else {
      ExpressionSet(Set.empty)
    }
  }

  lazy val allConstraints = ExpressionSet(validConstraints
          .union(inferAdditionalConstraints(validConstraints))
          .union(constructIsNotNullConstraints(validConstraints)))

Then we can call plan.allConstraints when inferring constraints for Join.
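A toy Python model of the proposed split (the names and data shapes here are illustrative; this is not Catalyst code): allConstraints holds everything inferred, possibly referring to attributes of both join sides, while constraints keeps only the subset restricted to the node's own output, which is the promise made to the parent. The join can still consult the wider set when inferring filters for its children.

```python
def restrict_to_output(all_constraints, output_set):
    # constraints = allConstraints filtered to references within outputSet,
    # mirroring: c.references.nonEmpty && c.references.subsetOf(outputSet)
    return [c for c in all_constraints
            if c["refs"] and c["refs"] <= output_set]

all_constraints = [
    {"expr": "isnotnull(a)", "refs": {"a"}},
    {"expr": "a = b",        "refs": {"a", "b"}},  # spans both join sides
]

# Only isnotnull(a) is promised upward; "a = b" stays in the wider set
# for the join to use when inferring filters for its children.
visible = restrict_to_output(all_constraints, {"a"})
```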


jiangxb1987 commented Feb 26, 2018

I agree with what @cloud-fan proposed: have separate constraints for a plan and for its children. However, that requires a relatively wide change as well as a fine-grained set of test cases, so please don't hesitate to ask for help if you run into any issues working on this.

@jiangxb1987
Contributor

Also, a better title for this PR would be:

Generate additional constraints for Join's children

@KaiXinXiaoLei KaiXinXiaoLei changed the title [SPARK-23405] Add constranits [SPARK-23405] Generate additional constraints for Join's children Feb 27, 2018
@KaiXinXiaoLei
Author

@cloud-fan @srowen @jiangxb1987 I have changed the code and title; please help me review. Thanks.

@jiangxb1987
Contributor

You shall also add test cases.


SparkQA commented Feb 27, 2018

Test build #87691 has finished for PR 20670 at commit f44a92a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -27,16 +27,15 @@ trait QueryPlanConstraints { self: LogicalPlan =>
   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
   * evaluate to `true` for all rows produced.

The comment belongs to constraints not allConstraints

@cloud-fan
Contributor

LGTM except we should add a test


SparkQA commented Feb 27, 2018

Test build #87726 has finished for PR 20670 at commit 1e0f78a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 27, 2018

Test build #87727 has finished for PR 20670 at commit f7d764e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

   * An [[ExpressionSet]] that contains an additional set of constraints about equality
   * constraints and `isNotNull` constraints.
   */
  lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints

This should also be guarded by constraintPropagationEnabled

      ExpressionSet(Set.empty)
    }
  }

  /**
   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
   * evaluate to `true` for all rows produced.
   */
  lazy val constraints: ExpressionSet = {
    if (conf.constraintPropagationEnabled) {

now we don't need this if.

@@ -192,4 +192,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest {

    comparePlans(Optimize.execute(original.analyze), correct.analyze)
  }

test("SPARK-23405:single left-semi join, filter out nulls on either side on equi-join keys") {
@cloud-fan Feb 28, 2018

nit: SPARK-23405: left-semi equal-join should filter out null join keys on both sides

    val x = testRelation.subquery('x)
    val y = testRelation.subquery('y)
    val originalQuery = x.join(y, LeftSemi,
      condition = Some("x.a".attr === "y.a".attr)).analyze

nit: we can create a val condition = Some("x.a".attr === "y.a".attr) to reduce duplicated code

@cloud-fan
Contributor

LGTM except several minor comments

@@ -22,21 +22,30 @@ import org.apache.spark.sql.catalyst.expressions._

trait QueryPlanConstraints { self: LogicalPlan =>

  /**
   * An [[ExpressionSet]] that contains an additional set of constraints about equality

The comment is not accurate; we may have various kinds of constraints.

@jiangxb1987
Contributor

LGTM only nits


SparkQA commented Feb 28, 2018

Test build #87766 has finished for PR 20670 at commit b3f2ade.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    val left = x.where(IsNotNull('a))
    val right = y.where(IsNotNull('a))
    val correctAnswer = left.join(right, LeftSemi, condition)
      .analyze

this doesn't need to be in a new line


SparkQA commented Feb 28, 2018

Test build #87772 has finished for PR 20670 at commit ed5c170.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 1, 2018

Test build #87804 has finished for PR 20670 at commit 023f2f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

   */
  lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints
    .union(inferAdditionalConstraints(validConstraints))
    .union(constructIsNotNullConstraints(validConstraints)))

Nit: indents

   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
   * constraints and `isNotNull` constraints, etc.
   */
  lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints

We still need if (conf.constraintPropagationEnabled)

@KaiXinXiaoLei
Author

@gatorsmile thanks.

@cloud-fan
Contributor

LGTM


SparkQA commented Mar 1, 2018

Test build #87817 has finished for PR 20670 at commit 709ed39.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.


kiszk commented Mar 1, 2018

retest this please


SparkQA commented Mar 1, 2018

Test build #87836 has finished for PR 20670 at commit 709ed39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in cdcccd7 Mar 1, 2018
ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 23, 2018
…'s children

## What changes were proposed in this pull request?

The existing query constraints framework has 2 steps:
1. propagate constraints bottom up.
2. use constraints to infer additional filters for better data pruning.

For step 2, it mostly helps with Join, because we can connect the constraints from children to the join condition and infer powerful filters to prune the data of the join sides. e.g., the left side has constraints `a = 1`, the join condition is `left.a = right.a`, then we can infer `right.a = 1` to the right side and prune the right side a lot.
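The inference in step 2 can be sketched in a few lines of Python (a conceptual illustration with made-up helper names, not Spark's implementation): substitute attributes through the equi-join condition's equalities to push one side's constraints over to the other side.

```python
def infer_for_other_side(constraints, equalities):
    # constraints: (attribute, constant) pairs known on one side, e.g. a = 1
    # equalities:  (left_attr, right_attr) pairs from the join condition
    inferred = []
    for attr, value in constraints:
        for l, r in equalities:
            if attr == l:
                # left.a = 1 and left.a = right.a  =>  right.a = 1
                inferred.append((r, value))
    return inferred

left_constraints = [("left.a", 1)]        # left side guarantees a = 1
condition = [("left.a", "right.a")]       # join on left.a = right.a
assert infer_for_other_side(left_constraints, condition) == [("right.a", 1)]
```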

However, the current logic of inferring filters from constraints for Join is pretty weak: it infers the filters from Join's constraints, but some joins, like left semi/anti, exclude the right side from the output, so the right side's constraints are lost there.

This PR proposes to check the left and right constraints individually, expand the constraints with the join condition, and add filters directly to the children of the join, instead of adding them to the join condition.

This reverts apache#20670, covers apache#20717 and apache#20816

This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors mgaido91 maryannxue KaiXinXiaoLei!

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21083 from cloud-fan/join.