[FLINK-8428] [table] Implement stream-stream non-window left outer join #5327
Hi @twalthr, it would be great if you could take a look at the PR. I'm looking forward to finishing outer joins (left/right/full) before the end of March. Besides, there are a few PRs planned to optimize inner/outer joins. Thanks :)
Thanks for the reminder @hequn8128. I will review it in the next 2 weeks. If not, feel free to ping me again.
Hi @twalthr, looking forward to your review, thanks :-)
Thanks for the PR @hequn8128. Really looking forward to having LEFT JOIN support in datastream soon. I left a few comments and questions.
Best,
Rong
if (lInKeys.isEmpty || rInKeys.isEmpty) {
  None
} else {
  // Output of inner join must have keys if left and right both contain key(s).
remove "inner"?
Yes, thank you
 * @param defaultRow The result row used for output, right side fields will all be null.
 * @param out The collector for returning result values.
 */
def collectWithNullRight(leftRow: Row, defaultRow: Row, out: Collector[Row]): Unit = {
We can probably reuse this function for RIGHT JOIN as well? Maybe rename it to collectWithNull?
I was trying to reduce the number of if/else branches as much as possible, since they are inefficient. What do you think?
I was thinking about something like
def collectWithDefault(mainRow: Row, defaultRow: Row, out: Collector[Row]): Unit = {
  //...
}
This way mainRow will be leftRow for LEFT JOIN and rightRow for RIGHT JOIN. It might be usable in FULL OUTER JOIN as well.
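For illustration only, the shared helper suggested above could look roughly like the sketch below. It does not use Flink's Row or Collector: SimpleRow, the mainOffset parameter, and the callback-style collector are hypothetical stand-ins introduced for this sketch, and the per-field branch is exactly the kind of cost the author was worried about.

```scala
// Stand-in for Flink's Row: a fixed-arity array of nullable fields (assumption for this sketch).
final class SimpleRow(val arity: Int) {
  private val fields = new Array[Any](arity)
  def getField(i: Int): Any = fields(i)
  def setField(i: Int, v: Any): Unit = { fields(i) = v }
  override def toString: String = fields.mkString("(", ",", ")")
}

object CollectWithDefaultSketch {
  /**
   * Copies mainRow into resultRow starting at mainOffset and sets every other field to null.
   * mainOffset = 0 keeps mainRow on the left (LEFT JOIN);
   * mainOffset = resultRow.arity - mainRow.arity keeps it on the right (RIGHT JOIN).
   * The result is handed to `out`, a plain callback standing in for a collector.
   */
  def collectWithDefault(
      mainRow: SimpleRow,
      resultRow: SimpleRow,
      mainOffset: Int,
      out: SimpleRow => Unit): Unit = {
    var i = 0
    while (i < resultRow.arity) {
      val j = i - mainOffset
      val value = if (j >= 0 && j < mainRow.arity) mainRow.getField(j) else null
      resultRow.setField(i, value)
      i += 1
    }
    out(resultRow)
  }
}
```

With a two-field main row and a four-field result row, an offset of 0 pads the right side with nulls and an offset of 2 pads the left side.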
if (rigthKeyNum == 1 && value.change) {
  cRowWrapper.setChange(false)
  collectWithNullRight(otherSideRow, resultRow, cRowWrapper)
  retractFlag = true
Is retractFlag used anywhere?
Thanks, I will remove it :-)
if (!value.change && rigthKeyNum == 0) {
  cRowWrapper.setChange(true)
  collectWithNullRight(otherSideRow, resultRow, cRowWrapper)
  hasReEmittedNullRight = true
Same here, is hasReEmittedNullRight used?
@@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase {
    // Proctime window output uncertain results, so assert has been ignored here.
  }

  @Test
  def testJoin(): Unit = {
Can this be more specific? For example, inner equality join.
while (otherSideIterator.hasNext) {
  val otherSideEntry = otherSideIterator.next()
  val otherSideRow = otherSideEntry.getKey
  val cntAndExpiredTime = otherSideEntry.getValue
cntAndExpiredTime is already defined in the upper layer; maybe change the naming to avoid confusion?
Makes sense.
while (otherSideIterator.hasNext) {
  val otherSideEntry = otherSideIterator.next()
  val otherSideRow = otherSideEntry.getKey
  val cntAndExpiredTime = otherSideEntry.getValue
cntAndExpiredTime is already defined in the upper layer; maybe change the naming to avoid confusion?
}
// update matched cnt only when left row cnt is changed from 0 to 1. Each time encountered a
// new record from right, leftJoinCnt will also be updated.
if (cntAndExpiredTime.f0 == 1) {
Can't this check be triggered if cntAndExpiredTime was updated from 2 ==> 1?
Right, we should also take value.change into consideration.
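A minimal sketch of the combined check discussed in this thread, assuming each message changes the count by exactly one. The object and parameter names are hypothetical; isAccumulate mirrors value.change (true for an accumulate message, false for a retraction).

```scala
object CountTransitionSketch {
  /**
   * newCount is the row count after the current message has been applied.
   * A count of 1 reached by an accumulate message means 0 -> 1;
   * a count of 1 reached by a retraction means 2 -> 1 and must not trigger the update.
   */
  def changedFromZeroToOne(newCount: Long, isAccumulate: Boolean): Boolean =
    newCount == 1L && isAccumulate
}
```

Checking the count alone cannot distinguish the two transitions, which is why the direction of the change has to be part of the condition.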
override def open(parameters: Configuration): Unit = {
  super.open(parameters)

  val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
Probably use [Row, Int], to match the type for count in LeftSideState and RightSideState?
Long is safer. I will change all count types to Long. What do you think?
I think either is fine as long as they are consistent.
@@ -985,4 +1017,232 @@ class JoinHarnessTest extends HarnessTestBase {

    testHarness.close()
  }

  @Test
  def testNonWindowLeftJoinWithOutNonEqualPred() {
WithOut ==> Without
Hi, @walterddr
Thanks very much for your review and suggestions. I will update it soon.
Best, Hequn
Updated the PR according to @walterddr's suggestions.
Hi @twalthr @walterddr
Thanks @hequn8128! We're pretty busy with the Flink 1.5 release right now. Best, Fabian
Thank you very much for this PR @hequn8128 and sorry for the delay! The code is already in very good shape. I only added minor things:
- I would move the enforceKeys change into a separate PR.
- We need more tests at certain places because some code paths are never tested.
- Would be great to add more inline and method comments to maintain the code in the future.
I will run a couple of TPC-H/TPC-DS queries once all feedback has been addressed.
/**
 * Whether the [[DataStreamRel]] produces retraction messages.
 */
def producesRetractions: Boolean = false
Why isn't producesUpdates enough?
A join generates retractions if its type is left/right/full. This differs from an aggregation, which generates retractions if sendsUpdatesAsRetraction(node) && node.producesUpdates is true.
Thanks for the explanation.
@@ -60,6 +60,9 @@ class DataStreamJoin(

  override def needsUpdatesAsRetraction: Boolean = true

  // outer join will generate retractions
  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
Could you clarify this? An inner join is producing retractions.
An inner join doesn't produce retractions; left/right/full joins do. For example, a left join retracts the previous non-matched output when a new matching row arrives from the right side.
Thanks, now I understand the terminology between producing and just forwarding retractions.
    isLeft: Boolean): Unit = {

  val inputRow = value.row
  val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState)
We should not use Scala sugar in runtime classes. This might create a tuple object for every processed element.
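To make the concern concrete, here is a small sketch contrasting a pair-returning helper with a tuple-free one. Both helpers and their parameters are hypothetical stand-ins, not the PR's updateCurrentSide, whose actual return values are not shown in this thread. Whether the JIT removes the allocation in practice depends on escape analysis, so this only shows the shape of the two options.

```scala
object TupleFreeSketch {
  // Pair-returning variant: each call builds a Tuple2, and
  // `val (a, _) = ...` then destructures it through a pattern match.
  def timesAsTuple(now: Long, retentionMs: Long): (Long, Long) =
    (now, now + retentionMs)

  // Tuple-free variant: return a single primitive Long and let the
  // caller keep the other value in a local variable.
  def cleanupTime(now: Long, retentionMs: Long): Long =
    now + retentionMs

  def main(args: Array[String]): Unit = {
    val (curProcessTime, _) = timesAsTuple(1000L, 500L) // goes through a Tuple2
    val now = 1000L                                      // plain local, no wrapper object
    println(curProcessTime == now)
    println(cleanupTime(now, 500L))
  }
}
```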
cRowWrapper.reset()
cRowWrapper.setCollector(out)
cRowWrapper.setChange(value.change)
cRowWrapper.setEmitCnt(0)
Remove this line.
    recordFromLeft: Boolean): Unit = {

  val inputRow = value.row
  val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState)
Same object creation issue as above.
 * Join current row with other side rows. Preserve current row if there are no matched rows
 * from other side.
 */
def preservedJoin(
Explain return type.
 * return 1 if from right.
 */
def getJoinCntIndex(isInputFromLeft: Boolean): Int = {
  if (isInputFromLeft) {
You could return the state directly instead of returning the index.
"DataStreamCalc",
binaryNode(
  "DataStreamJoin",
  "DataStreamScan(true, Acc)",
Can you also add a test for a join that consumes from an aggregation?
testJoin() has covered this case.
    genFunction.code,
    joinType == JoinRelType.LEFT,
    queryConfig)
case JoinRelType.LEFT =>
Is there a reason why we don't support right outer joins here?
I planned to add right join in FLINK-8429. It's OK to add right join in this PR if you prefer.
We can also do it as part of FLINK-8429.
}

@Test
def testDataStreamJoinWithAggregation(): Unit = {
Can you try to use a consistent naming scheme for the test methods you added? Remove DataStream or Table from the names, and mark Inner and Outer joins correctly.
All names have been renamed, both stream and batch tests.
@twalthr Hi, great to see your review and valuable suggestions. I will update my PR late next week (maybe next weekend). Thanks very much.
@twalthr Hi, thanks for your review. I have updated the PR according to your suggestions. Changes mainly include:
Best, Hequn.
Thanks for the update @hequn8128. The changes look good. I tested your implementation on a cluster with TPC-H data. The results were equal to the batch results and the state clean-up worked. I will merge this :-)
Two different CoProcessFunctions are used to implement left join for performance reasons: one for left join with non-equal predicates, the other for left join without non-equal predicates. The main difference between them is that, for left join without non-equal predicates, left rows can always find matching right rows as long as the join keys are the same.
- Left join with non-equal predicates: Use a MapState to keep track of how many rows (joinCnt) from the right table are matched by the current left row. If joinCnt is 0, output the left row with a NULL right side. If joinCnt changes from 0 to 1, retract the previous NULL-right output and output the matched result. If joinCnt changes from 1 to 0 on a right retract input, retract the previous matched result and output the left row with a NULL right side.
- Left join without non-equal predicates: There is no need to maintain joinCnt, because it equals the right state size, so checking the state size is sufficient.
Table Modes: Left join generates retractions, so the DataStreamRel node of a left join works in AccRetract mode. Also, the table mode of the dynamic table produced by a left join is Update Mode, even if the table does not include a key definition.
This closes apache#5327.
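The joinCnt bookkeeping described in the commit message can be sketched without Flink as a small in-memory simulation. Everything here is an assumption made for illustration: rows are plain distinct Ints for a single join key, the state lives in ordinary mutable collections instead of MapState, and the names Out, LeftJoin, onLeft, and onRight are hypothetical. It is not the PR's CoProcessFunction.

```scala
import scala.collection.mutable

object LeftJoinCntSketch {
  // change = true is an accumulate message, false is a retraction.
  // right = None stands for the NULL right side.
  final case class Out(change: Boolean, left: Int, right: Option[Int])

  final class LeftJoin(pred: (Int, Int) => Boolean) {
    // left row -> number of right rows currently matching it (joinCnt)
    private val joinCnt = mutable.LinkedHashMap.empty[Int, Long]
    private val rightRows = mutable.LinkedHashSet.empty[Int]

    def onLeft(l: Int, change: Boolean): Seq[Out] = {
      val matches = rightRows.toSeq.filter(r => pred(l, r))
      if (change) { joinCnt(l) = matches.size.toLong } else { joinCnt.remove(l) }
      // joinCnt == 0: emit (or retract) the left row with a NULL right side.
      if (matches.isEmpty) Seq(Out(change, l, None))
      else matches.map(r => Out(change, l, Some(r)))
    }

    def onRight(r: Int, change: Boolean): Seq[Out] = {
      if (change) { rightRows += r } else { rightRows -= r }
      val out = mutable.ArrayBuffer.empty[Out]
      for (l <- joinCnt.keys.toSeq if pred(l, r)) {
        val before = joinCnt(l)
        if (change) {
          // 0 -> 1: retract the earlier NULL-right row before emitting the match.
          if (before == 0L) out += Out(false, l, None)
          out += Out(true, l, Some(r))
          joinCnt(l) = before + 1
        } else {
          out += Out(false, l, Some(r))
          joinCnt(l) = before - 1
          // 1 -> 0: the left row is unmatched again, so re-emit it with NULL right.
          if (before == 1L) out += Out(true, l, None)
        }
      }
      out.toSeq
    }
  }
}
```

Feeding one left row and then adding and retracting two matching right rows walks through the 0 to 1 and 1 to 0 transitions; intermediate changes (1 to 2, 2 to 1) emit only the matched row itself.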
What is the purpose of the change
Implement stream-stream non-window left outer join for sql/table-api. A simple design doc can be found here
Brief change log
UpsertTableSink. Table mode of dynamic table produced by left join is Update Mode, even if the table does not include a key definition
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation