-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-5256] Extend DataSetSingleRowJoin to support Left and Right joins #3673
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks for working on this @DmytroShkvyra. I'll be on vacation next week but will review your PR when I am back. Thanks, Fabian |
447ae9b
to
2cb9c45
Compare
Hi @fhueske could you review this variant. I had to removed tests with non-equality predicates (https://issues.apache.org/jira/browse/FLINK-5520) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @DmytroShkvyra, sorry for the delayed review.
We are currently working on getting the last features for the upcoming 1.3 release in and there are some open ends that I had to prioritize.
The approach of the PR looks good in general but needs a few changes. I left a couple of comments.
Thank you, Fabian
} else { | ||
false | ||
join.getJoinType match { | ||
case JoinRelType.INNER | JoinRelType.LEFT | JoinRelType.RIGHT => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can only support this for outer joins if the single-row input is on the inner side of the join.
If the single row is on the outer side, we would need check that it does not join with any of the inner side before we send out a result with null
fields. Since the single row is broadcasted / replicated, it is not possible to check this.
Hence, the condition should be:
case JoinRelType.INNER if isSingleRow(join.getLeft) || isSingleRow(join.getRight) => true
case JoinRelType.LEFT if isSingleRow(join.getRight) => true
case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
case _ => false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
.map(field => getRowType.getFieldNames.indexOf(field.getName)) | ||
.map(i => s"${conversion.resultTerm}.setField($i,null);") | ||
|
||
if (joinType == JoinRelType.LEFT && leftIsSingle) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we only need to to handle the cases JoinRelType.LEFT && rightIsSingle
and JoinRelType.RIGHT && leftIsSingle
.
dataSetLeftNode, | ||
dataSetRightNode, | ||
leftIsSingle, | ||
rightIsSingle, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rightIsSingle
is !leftIsSingle
, so we can omit this parameter, IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
broadcastSet match { | ||
case Some(singleInput) => function.join(multiInput, singleInput, out) | ||
case None => | ||
case None => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we add a parameter `outerJoin: Boolean, we can do
case None if outerJoin => function.join(multiInput, null, out)
if we generate the function such that it checks if the single input is null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cant pass null value to join function, but adding of outerJoin
parameter is useful. It will fix cross join testcase.
Thanks!!!
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not? We configure the CodeGenerator
to expect null
as input (see DataSetSingleRowJoin
line 134).
The generated code will make sure we do not run into a NPE and the generated condition will be false
if one of the input is null
.
So, we can simply do case None if outerJoin => function.join(multiInput, null.asInstanceOf[IN2], out)
here and case None if outerJoin => function.join(null.asInstanceOf[IN1], multiInput, out)
in MapJoinRightRunner
.
I check the generated code and this seems to be fine. Also the tests that you provided pass with that change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fhueske, If I use your code in MapJoinLeftRunner
I get following spooky error:
scala.MatchError: None (of class scala.None$)
at org.apache.flink.table.runtime.MapJoinLeftRunner.flatMap(MapJoinLeftRunner.scala:34)
at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:73)
I have googled this issue and find out that https://groups.google.com/forum/#!topic/akka-user/jvbbilXDKdA it could be related to akka.
So, I left code in MapJoinLeftRunner
as was
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A MatchError
is thrown when no suitable pattern is found (Akka uses a lot of pattern matching, that's probably why you found that error, but it's actual a Scala thing).
This means that there is the case of an empty singleElement
in case of an inner join. The problem can be easily solved by not replacing case None =>
with case None if outerJoin =>
but just inserting case None if outerJoin =>
in between. Pattern matching works from top to bottom and takes the first match (following matches are not executed).
broadcastSet match { | ||
case Some(singleInput) => function.join(singleInput, multiInput, out) | ||
case None => | ||
if (isRowClass(multiInput) && returnType.getTypeClass.equals(classOf[Row])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment on MapJoinLeftRunner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See reply on MapJoinLeftRunner
Done
resultType: TypeInformation[_]) | ||
resultType: TypeInformation[_]) { | ||
|
||
var leftNullTerm: String = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need these attributes.
We can check with ${codeGenerator.input1Term} == null
whether an input record is null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fhueske It doesn't work because ${codeGenerator.input1Term}
is Row
@Override public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c) throws Exception { org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1; org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
But I need know which field in join expression is null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
s""" | ||
|${condition.code} | ||
|${conversion.code} | ||
|if(!${condition.resultTerm}){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add || ${codeGenerator.input1Term} == null
to check if the first input is null for the JoinRelType.RIGHT && leftIsSingle
case and || ${codeGenerator.input2Term} == null
for the JoinRelType.LEFT && rightIsSingle
case
genFunction.code, | ||
genFunction.returnType, | ||
broadcastInputSetName) | ||
if (joinType == JoinRelType.RIGHT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not invert the runners for JoinRelType.RIGHT
the join input sides should be independent of the join type.
tEnv.registerTable("B", ds1) | ||
|
||
val result = tEnv.sql(sqlQuery) | ||
val expected = Seq( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The result should be Seq("3,3", "3,3", "3,3")
.
The outer side may only produce a single row with null
if no other row matched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
val env = ExecutionEnvironment.getExecutionEnvironment | ||
val tEnv = TableEnvironment.getTableEnvironment(env, config) | ||
val sqlQuery = | ||
"SELECT a, cnt " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please format all test queries in a consistent style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
2d88e18
to
bdfac39
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @DmytroShkvyra!
This looks pretty good. I made some minor comments inline.
Aside of those minor issues I noticed that the tests only cover equality joins. In fact, the main purpose of the SingleRowJoin is to support arbitrary join conditions that cannot be addressed by the default join which only handles equality joins.
When I tried to change some of the test cases to use inequality predicates (e.g., a < cnt
), I notice that these joins are "blocked" by the FlinkLogicalJoinConverter
rule.
We should
- change that rule to also convert joins that can be translated by the SingleRowJoin (LEFT / RIGHT joins with a single row on the inner side).
- adapt all test cases to use inequality predicates to enforce SingleRowJoins. Some of the current tests will simply use the default join, which is not what we want to test here.
What do you think?
Fabian
broadcastSet match { | ||
case Some(singleInput) => function.join(multiInput, singleInput, out) | ||
case None => | ||
case None => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not? We configure the CodeGenerator
to expect null
as input (see DataSetSingleRowJoin
line 134).
The generated code will make sure we do not run into a NPE and the generated condition will be false
if one of the input is null
.
So, we can simply do case None if outerJoin => function.join(multiInput, null.asInstanceOf[IN2], out)
here and case None if outerJoin => function.join(null.asInstanceOf[IN1], multiInput, out)
in MapJoinRightRunner
.
I check the generated code and this seems to be fine. Also the tests that you provided pass with that change.
genFunction.returnType, | ||
broadcastInputSetName) | ||
} | ||
if (!leftIsSingle) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
un-indent by 2 spaces
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
"ON a1 = cnt" | ||
|
||
//val queryRightJoin = | ||
// "SELECT a2 FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON a1 < cnt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
util.addTable[(Long, Int)]("A", 'a1, 'a2) | ||
util.addTable[(Int, Int)]("B", 'b1, 'b2) | ||
|
||
val queryRightJoin = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The plan for this query does not include a single row join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fhueske Why??? We have single row at right side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the optimizer decides to execute the join as a regular join (see DataSetJoin
in the plan). If we change the predicate to a non-equality predicate (a1 < cnt
) the regular join cannot be used, and we can force a single row join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fhueske see term("joinType", "RightOuterJoin")
in plan.
I was really confused about "Disable outer joins with non-equality predicates" in https://issues.apache.org/jira/browse/FLINK-5520. Everything works fine until this update.
Do you want extend scope of this PR for support non-equality predicates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generate join is a RightOuterJoin
but not a SingleRowJoin
, which this test should verify.
We had to disable outer joins with predicates that include non-equi conditions in FLINK-5520 because they were not properly implemented. That implementation was based on splitting the join predicate into equi-conditions which were evaluated by the join and non-equi-conditions which were evaluated in a subsequent filter step. However, this split did not work correctly, because it would generate too many null
rows if records passed the equi-join predicate in the join but not the non-equi predicate in the filter (since each filter call did only see a single row and would not know if all other rows had been filtered as well).
In our case the situation is different. We are translating the join into a NestedLoopJoin (where one side is at most one record), which can evaluate the full predicate including the non-equi conditions inside the join and know if we need to emit a
null` result because there is only a single row that either matches the predicate or not.
I think we should reopen FLINK-5520. It's not my fault. I have removed wrong test as you wished. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @DmytroShkvyra.
I left a couple of comments.
Please let me know if you have questions.
Thank you,
Fabian
case None => | ||
case None if outerJoin => function. | ||
join(multiInput, null.asInstanceOf[IN2], out) | ||
case None => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The case None =>
will only be reached for inner joins where the single row input is empty. Since an inner join does not return anything, this case can be empty as before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition, the condition will never evaluate to true
because the case of outerJoin == true
is caught by the case
above.
case None if outerJoin => function. | ||
join(null.asInstanceOf[IN1], multiInput, out) | ||
case None => | ||
if (outerJoin && isRowClass(multiInput) && returnType.getTypeClass.equals(classOf[Row])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
} | ||
} | ||
|
||
private def isRowClass(obj: Any) = obj match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be removed
} | ||
} | ||
|
||
private def isRowClass(obj: Any) = obj match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed
) | ||
|
||
util.verifySql(query, expected) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add the following additional unit tests here:
- left join with inequality join predicate
- right join with equality join predicate
- right join with inequality join predicate
You will find that the left/right joins with inequality predicates cannot be translated because of the too strict condition in FlinkLogicalJoinConverter
. This condition needs to be adapted to accept single row outer joins with inequality predicates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
@Test | ||
def testLeftNullLeftJoin (): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Several of these tests are executed with regular joins because of the restriction that for an outer single row join, the single row input must be on the inner side. Some of these tests have the single row input on the outer side which will translate them into regular joins (which is only possible because the join conditions are equality predicates). These tests can be removed, because they test the existing join implementation.
In order to enforce tests of the new outer single row joins, the join condition must be an inequality predicate. It would be good to adapt the tests queries accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check the following cases:
- LEFT JOIN, single row on the right
- LEFT JOIN, empty input on the right
- RIGHT JOIN, single row on the left
- RIGHT JOIN; empty input on the left
Hi @fhueske thanks for help. I have fixed your comments. |
Hi @fhueske I have fixed all your notes and build almost green. :(
I think it not related to |
Thanks for the update @DmytroShkvyra! There are a few instable tests in the build, so failure is likely to be unrelated. |
Thanks a lot Fabian! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @DmytroShkvyra,
this looks really good!
If found one thing that needs to be fixed, but I'll do that myself before merging.
Thanks for working on this.
Fabian
hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join) | ||
(hasEqualityPredicates(join, joinInfo) | ||
|| isSingleRowInnerJoin(join) | ||
|| isOuterJoinWithSingleRowAtOuterSide(join, joinInfo)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need here the same condition as in DataSetSingleRowJoinRule.matches()
:
join.getJoinType match {
case JoinRelType.INNER if isSingleRow(join.getRight) || isSingleRow(join.getLeft) => true
case JoinRelType.LEFT if isSingleRow(join.getRight) => true
case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
case _ => false
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fhueske Thanks for your guidances!
…Right joins. This closes #3673.
…Right joins. This closes apache#3673.
…Right joins. This closes apache#3673.
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
mvn clean verify
has been executed successfully locally or a Travis build has passed