[SPARK-13249][SQL] Add Filter checking nullability of keys for inner join #11235
Test build #51414 has finished for PR 11235 at commit
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueri
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
import org.apache.spark.sql.catalyst.planning._
Nit: Would it be better to write import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins, Unions}?
ISTM we can also add |
Test build #51425 has finished for PR 11235 at commit
Test build #51426 has finished for PR 11235 at commit
…roadcastHashJoin.
045121e to 0c14be5
retest this please.
Test build #51468 has finished for PR 11235 at commit
@maropu yeah, I think so.
…nerjoin Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
Test build #51531 has finished for PR 11235 at commit
ping @davies
…nerjoin Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Test build #51631 has finished for PR 11235 at commit
@@ -57,6 +57,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
RemoveLiteralFromGroupExpressions) ::
Batch("Join", Once,
AddFilterOfNullForInnerJoin) ::
We should make this rule idempotent instead of hacking it to run only once. With this implementation you are losing the benefits of emergent optimizations.
I would construct the filter directly in the left/right child, but only when it's not already present in the constraints of the child. This is the whole reason we added the ability to reason about which constraints are already present on a subtree.
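To make the suggestion concrete, here is a minimal sketch of a rule that stays idempotent by consulting the child's constraints before adding a filter. It is a toy model in plain Scala, not Catalyst code: `Plan`, `Relation`, `Filter`, `InnerJoin`, and `addNullFilters` are hypothetical names invented for illustration, and constraints are modeled as plain strings.

```scala
// Toy plan model (assumption: NOT Spark's LogicalPlan; all names are illustrative).
object IdempotentNullFilterSketch {
  sealed trait Plan { def constraints: Set[String] }

  case class Relation(name: String) extends Plan {
    val constraints: Set[String] = Set.empty
  }

  // A Filter here only ever means "key IS NOT NULL".
  case class Filter(notNullKey: String, child: Plan) extends Plan {
    val constraints: Set[String] = child.constraints + s"isnotnull($notNullKey)"
  }

  case class InnerJoin(leftKey: String, rightKey: String, left: Plan, right: Plan) extends Plan {
    val constraints: Set[String] = left.constraints ++ right.constraints
  }

  // Wrap the child in a filter only when the constraint is not already guaranteed.
  private def withNotNull(key: String, child: Plan): Plan =
    if (child.constraints.contains(s"isnotnull($key)")) child else Filter(key, child)

  // Because of the constraint check, applying this to its own output changes nothing.
  def addNullFilters(plan: Plan): Plan = plan match {
    case InnerJoin(lk, rk, l, r) =>
      InnerJoin(lk, rk, withNotNull(lk, addNullFilters(l)), withNotNull(rk, addNullFilters(r)))
    case Filter(k, c) => Filter(k, addNullFilters(c))
    case r: Relation  => r
  }

  def main(args: Array[String]): Unit = {
    val plan  = InnerJoin("a", "b", Relation("t1"), Relation("t2"))
    val once  = addNullFilters(plan)
    val twice = addNullFilters(once)
    println(once)
    println(once == twice)
  }
}
```

Since a second application finds the constraint already present and leaves the plan unchanged, a rule shaped like this could sit in a fixed-point batch alongside other rules rather than in a run-once batch of its own.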
Test build #51959 has finished for PR 11235 at commit
Test build #51960 has finished for PR 11235 at commit
Test build #52021 has finished for PR 11235 at commit
Test build #52028 has finished for PR 11235 at commit
@marmbrus I've addressed the comments. Please see if this change is appropriate. Thanks!
@liancheng Can you review this too? I think I've addressed previous comments. Thanks!
ping @marmbrus @rxin @davies @liancheng Is this ready to go? Or do you have other comments? Thanks!
@@ -57,6 +57,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
RemoveLiteralFromGroupExpressions) ::
Batch("Join", FixedPoint(100),
We may get more inner joins from OuterJoinElimination; should we move this rule after it?
If we can make this rule idempotent, we don't need to put it in a separate group.
Test build #52617 has finished for PR 11235 at commit
Comments addressed, please check if the change is good to merge now. Thanks!
object AddNullFilterForEquiJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
      val leftConditions = leftKeys.distinct.map { l =>
We should only add the predicate if the key is nullable and there is no IsNotNull constraint on the key.
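The guard described here can be sketched as a single boolean check. This is only an illustration in plain Scala: `Key` is a hypothetical stand-in for a join key attribute (not Spark's `Attribute`), and the child's known non-null keys are passed in as a plain set of names.

```scala
// Illustrative guard only; Key and needsNullFilter are hypothetical names, not Spark APIs.
object NullFilterGuardSketch {
  // Stand-in for a join key: its name and whether the schema allows nulls.
  final case class Key(name: String, nullable: Boolean)

  // A filter is worth adding only if the key can be null AND the child
  // does not already guarantee that the key is non-null.
  def needsNullFilter(key: Key, childNotNullKeys: Set[String]): Boolean =
    key.nullable && !childNotNullKeys.contains(key.name)

  def main(args: Array[String]): Unit = {
    println(needsNullFilter(Key("a", nullable = true), Set.empty))
    println(needsNullFilter(Key("a", nullable = true), Set("a")))
    println(needsNullFilter(Key("a", nullable = false), Set.empty))
  }
}
```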
I did not realize that we had merged https://github.com/apache/spark/pull/11372/files. Do we still need this?
@davies yea, looks like it is doing the same. Let me close this now. Thanks for reviewing this anyway! |
JIRA: https://issues.apache.org/jira/browse/SPARK-13249
In an inner join, rows whose join key is null never match any row on the other side, so we can insert a Filter below the inner join (where it can be pushed down further). Then we don't need to check the nullability of keys while joining.
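The equivalence this relies on can be checked with a small model. The sketch below is plain Scala, not Spark code: rows are `(key, payload)` pairs, a null key is modeled as `None`, and `innerJoin` and `filterNotNull` are hypothetical helpers that mimic SQL equi-join semantics.

```scala
// Toy model of an inner equi-join with SQL null semantics (assumption: not Spark code).
object NullKeyJoinSketch {
  type Row = (Option[Int], String)

  // A None key stands for SQL NULL and never equals anything, not even another NULL.
  def innerJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, String, String)] =
    for {
      (lk, lv) <- left
      (rk, rv) <- right
      l <- lk.toList // empty when the left key is null
      r <- rk.toList // empty when the right key is null
      if l == r
    } yield (l, lv, rv)

  // The filter the rule would insert below the join: key IS NOT NULL.
  def filterNotNull(rows: Seq[Row]): Seq[Row] = rows.filter(_._1.isDefined)

  def main(args: Array[String]): Unit = {
    val left: Seq[Row]  = Seq((Some(1), "a"), (None, "b"), (Some(2), "c"))
    val right: Seq[Row] = Seq((Some(1), "x"), (None, "y"), (Some(3), "z"))

    val plain    = innerJoin(left, right)
    val filtered = innerJoin(filterNotNull(left), filterNotNull(right))
    println(plain)
    println(plain == filtered)
  }
}
```

Dropping the null-keyed rows first leaves the join result unchanged, which is why the filter is safe for inner joins. The same argument would not hold for outer joins, where unmatched rows are preserved.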