
[SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns #20816

Closed · wants to merge 5 commits

Conversation

maryannxue (Contributor)

What changes were proposed in this pull request?

Added a TransitPredicateInOuterJoin optimization rule that propagates constraints from the preserved side of an outer join to the null-supplying side. The constraints of the join operator itself remain unchanged.

How was this patch tested?

Added 3 tests in InferFiltersFromConstraintsSuite.

* join condition and will be pushed over to the null-supplying side. For example,
* if the preserved side has constraints of the form 'a > 5' and the join condition
* is 'a = b', in which 'b' is an attribute from the null-supplying side, a [[Filter]]
* operator of 'b > 5' will be applied to the null-supplying side.
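The rewrite described in the comment above can be sketched as a toy model in plain Python (hypothetical names, not Catalyst code): preserved-side constraints are rewritten through the equi-join condition onto the null-supplying side.

```python
# Toy model of the rule: constraints are (attribute, operator, value)
# triples; the join condition is a list of equality pairs
# (preserved_attr, null_supplying_attr).

def infer_null_supplying_filters(preserved_constraints, equi_join_pairs):
    # Map each preserved-side attribute to its null-supplying counterpart,
    # then rewrite every constraint whose attribute appears in the mapping.
    mapping = dict(equi_join_pairs)
    return [(mapping[attr], op, value)
            for attr, op, value in preserved_constraints
            if attr in mapping]

# Preserved side has 'a > 5'; join condition is 'a = b'.
print(infer_null_supplying_filters([("a", ">", 5)], [("a", "b")]))
# [('b', '>', 5)]
```

The result corresponds to the Filter of 'b > 5' applied to the null-supplying side in the example above.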
Member

We already support this via InferFiltersFromConstraints.

Contributor Author

To infer filters from constraints for joins, we need the constraints from the preserved side and the join condition. This is already supported for inner joins, but not for left or right outer joins, since the constraints for left/right outer joins are returned as left.constraints and right.constraints respectively. We cannot include the join condition in the constraints for outer joins, because it does not hold in the join output (left.joinkey does not necessarily equal right.joinkey, since either side can contain null values).
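The point about null values can be seen with a toy left outer join in Python (a sketch with hypothetical names, not Spark code): unmatched left rows are padded with None on the right, so the join condition is false for those output rows.

```python
# Toy left outer join illustrating why the join condition cannot be
# treated as a constraint on the output: unmatched left rows get a None
# value for the right-side key, so 'k == r_k' is false for those rows.

def left_outer_join(left, right, key):
    out = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if matches:
            out.extend({**l, "r_" + key: r[key]} for r in matches)
        else:
            out.append({**l, "r_" + key: None})  # null-supplied row
    return out

rows = left_outer_join([{"k": 1}, {"k": 2}], [{"k": 1}], "k")
print(rows)  # [{'k': 1, 'r_k': 1}, {'k': 2, 'r_k': None}]
```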

Member

I am fine with the logic, but we do not need a new rule.

Contributor Author

Makes sense. I'll try moving this logic into InferFiltersFromConstraints.

val correctAnswer = left.join(right, LeftOuter, condition).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Member

All the test cases can pass without this new rule.

Contributor Author

Please verify again. The first two could not pass without this rule; the last one could, since it's a counter case.

Member

Yeah. Could you change the InferFiltersFromConstraints directly?

@SparkQA

SparkQA commented Mar 13, 2018

Test build #88215 has finished for PR 20816 at commit ac17976.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue maryannxue changed the title SPARK-21479 Outer join filter pushdown in null supplying table when condition is on one of the joined columns [SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns Mar 13, 2018
@SparkQA

SparkQA commented Mar 14, 2018

Test build #88216 has finished for PR 20816 at commit 1c5dedb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue
Contributor Author

How does it look now, @gatorsmile? Please let me know if there's anything I can improve.

Join(newLeft, newRight, LeftOuter, j.condition)
}
case _ => j
}
Member

Could you simplify the code to something like this?

val newConditionOpt = ... 

val newRight = joinType match {
  ...
}

val newLeft = joinType match {
  ...
}

if (newConditionOpt.isDefined || newLeft.isDefined || newRight.isDefined) {
  Join(newLeft.getOrElse(left), newRight.getOrElse(right), joinType,
    newConditionOpt.orElse(conditionOpt))
} else {
  join
}
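The suggested restructuring can be mimicked in Python, with None standing in for Scala's Option.empty (a sketch of the control flow only; rebuild_join and the tuple shape are hypothetical):

```python
# Each candidate piece is computed independently; the Join node is
# rebuilt only if at least one piece actually changed.

def rebuild_join(join, new_condition, new_left, new_right):
    # A None argument means "that piece is unchanged".
    if new_condition is None and new_left is None and new_right is None:
        return join  # nothing inferred; return the original node
    left, right, join_type, condition = join
    return (new_left if new_left is not None else left,
            new_right if new_right is not None else right,
            join_type,
            new_condition if new_condition is not None else condition)

original = ("t1", "t2", "LeftOuter", "a = b")
print(rebuild_join(original, None, None, None) is original)  # True
print(rebuild_join(original, "a = b AND b > 5", None, None))
# ('t1', 't2', 'LeftOuter', 'a = b AND b > 5')
```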

@SparkQA

SparkQA commented Mar 15, 2018

Test build #88242 has finished for PR 20816 at commit b10879f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  splitConjunctivePredicates(newConditionOpt.get).toSet)
val inferredConstraints = ExpressionSet(
  QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
val leftConditions = inferredConstraints
Member

Can we call this getRelevantConstraints?

Contributor Author

I was also thinking of giving it another name, since it's public now. How about getInferredConstraints? I'm good either way, though.

Member

We also need to union constructIsNotNullConstraints. Thus, let us use getRelevantConstraints.

Contributor Author (@maryannxue, Mar 19, 2018)

I think the constructIsNotNullConstraints logic does not deal with the "transitive" constraints, so we do not need to include it here. Instead, the "isNotNull" deduction for the inferred filters on the null-supplying side is guaranteed by two things here:

  1. when getting constraints from the preserved side, constructIsNotNullConstraints has already been called, and its results are carried over by inferAdditionalConstraints to the null-supplying side;
  2. the Filter matching part of InferFiltersFromConstraints.

That said, I'm good with the name getRelevantConstraints too.
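The is-not-null deduction discussed in point 1 can be sketched as a simplified model in Python (hypothetical names, not the Catalyst implementation): a null-intolerant comparison such as 'a > 5' can only evaluate to true when 'a' is not null, so IsNotNull is derived for every attribute such a predicate references.

```python
# Simplified model of constructIsNotNullConstraints: every attribute
# referenced by a null-intolerant predicate must be non-null for the
# predicate to hold, so an IsNotNull constraint is derived for it.

NULL_INTOLERANT_OPS = {"=", "<", ">", "<=", ">="}

def derive_is_not_null(constraints):
    # constraints: set of (attribute, operator, value) triples.
    return {attr for attr, op, _ in constraints if op in NULL_INTOLERANT_OPS}

print(derive_is_not_null({("a", ">", 5), ("b", "=", 3)}))
```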

Member

I see. I am proposing to add a new helper function, getRelevantConstraints, in the trait QueryPlanConstraints. I would suggest ignoring what we are doing for this specific case; that function could be called in other cases in the future.

Contributor Author

I had the same inclination to wrap the evaluation of allConstraints in a helper function, until I realized that constructIsNotNullConstraints depends on the LogicalPlan node in addition to the input constraints. constructIsNotNullConstraints has two parts: one deduces is-not-null from null-intolerant expressions, and the other deduces is-not-null from attribute nullability. We could reorganize these pieces into new methods, but I feel we should wait until we find an actual usage, so as to figure out what the helper function needs to include.

Member

This is my proposal:

val leftConstraints = left.getRelevantConstraints(
  left.constraints
    .union(right.constraints)
    .union(splitConjunctivePredicates(conditionOpt.get).toSet))
val newFilters = reduceConjunctivePredicates(leftConstraints.toSeq)
  .filterNot(left.constraints.contains)

Member

Yeah, let us use ExpressionSet.

Member (@gatorsmile, Mar 19, 2018)

Let us leave allConstraints untouched; we should avoid extra code changes in this PR. We also need to consider conf.constraintPropagationEnabled for the extra constraints introduced by this PR.

Contributor Author

I made another commit yesterday. How's it looking now?

  QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
val leftConditions = inferredConstraints
  .filter(_.deterministic)
  .filter(_.references.subsetOf(left.outputSet))
Member

Also filter out constraints where constraint.references.isEmpty in getRelevantConstraints.

@SparkQA

SparkQA commented Mar 20, 2018

Test build #88405 has finished for PR 20816 at commit 7fe9329.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue
Contributor Author

Ping @gatorsmile

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Mar 26, 2018

Test build #88601 has finished for PR 20816 at commit 7fe9329.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case _ => None
}

if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
Member

Do you have a test case in which newConditionOpt ne conditionOpt and newConditionOpt.isDefined are not true at the same time?

Contributor Author

      val newConditionOpt = conditionOpt match {
        case Some(condition) =>
          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
        case None =>
          additionalConstraints.reduceOption(And)
      }

So here, if conditionOpt matches None and additionalConstraints is empty at the same time, newConditionOpt and conditionOpt will both be an empty Option, but the reference comparison ne will return false.
Since this is part of the original InferFiltersFromConstraints logic, and I only modified it to make newConditionOpt usable by the rest of the function (the newly added logic), I assume it is already covered by the existing tests.
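The observation about ne on two empty options has a direct Python analog (a sketch with a hypothetical new_condition helper, with None standing for an absent condition): None is a singleton, so two "absent" results always compare identical, and an identity check alone cannot flag them as changed.

```python
# Python's None, like Scala's None, is a singleton: two "absent"
# conditions are always the same object, so 'is not' (ne) returns False.

def new_condition(condition, additional_constraints):
    # Mimics the snippet above with strings standing in for predicates.
    if condition is None:
        return " AND ".join(additional_constraints) or None
    extra = [c for c in additional_constraints if c != condition]
    return " AND ".join(extra + [condition]) if extra else condition

old = None
new = new_condition(old, [])       # no condition, nothing to add
print(new is not old)              # False: both are the singleton None
print(new_condition("a = b", ["b > 5"]))  # b > 5 AND a = b
```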

Member (@gatorsmile, Mar 30, 2018)

If we change this to if (newConditionOpt.isDefined || newLeftOpt.isDefined || newRightOpt.isDefined), will all the tests still pass?

Contributor Author

I guess they do. It's just that when conditionOpt is non-empty and additionalConstraints is empty, there will be unnecessary operations.

@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
val newConditionOpt = conditionOpt match {
case Some(condition) =>
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
Member

Can we keep this line unchanged and just use conditionOpt in lines 681, 685, 693, and 697?

Contributor Author (@maryannxue, Mar 30, 2018)

So what would be the benefit of keeping that unchanged? To me, it would make the code confusing. And in theory, the two parts of this optimization rule (1. inferring newConditionOpt; 2. inferring newLeftOpt or newRightOpt) would be unsynchronized, leaving part 2 always one iteration behind part 1.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Apr 6, 2018

Test build #88994 has finished for PR 20816 at commit 7fe9329.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue
Contributor Author

@gatorsmile Do I need to sync this branch and let the tests run again?

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Apr 8, 2018

Test build #89022 has finished for PR 20816 at commit 7fe9329.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2018

Test build #89420 has finished for PR 20816 at commit 194e6e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

gatorsmile commented Apr 17, 2018

cc @cloud-fan Please take a final look?

@cloud-fan
Contributor

since this does fix the problem in a reasonable way, I'm merging it and will clean it up in #21083

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 1e3b876 Apr 18, 2018
@maryannxue
Contributor Author

Thank you, @cloud-fan and @gatorsmile !

ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 23, 2018
…'s children

## What changes were proposed in this pull request?

The existing query constraints framework has 2 steps:
1. propagate constraints bottom up.
2. use constraints to infer additional filters for better data pruning.

For step 2, it mostly helps with Join, because we can connect the constraints from children to the join condition and infer powerful filters to prune the data of the join sides. e.g., the left side has constraints `a = 1`, the join condition is `left.a = right.a`, then we can infer `right.a = 1` to the right side and prune the right side a lot.

However, the current logic of inferring filters from constraints for Join is pretty weak: it infers the filters from the Join's own constraints. Some joins, like left semi/anti, exclude output from the right side, so the right-side constraints are lost.

This PR proposes to check the left and right constraints individually, expand the constraints with the join condition, and add filters directly to the children of the join, instead of adding them to the join condition.

This reverts apache#20670 , covers apache#20717 and apache#20816

This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors mgaido91 maryannxue KaiXinXiaoLei !

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21083 from cloud-fan/join.