-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24907] Support side out late data for interval join #18118
[FLINK-24907] Support side out late data for interval join #18118
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit d8e09a1 (Wed Dec 15 11:06:23 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@flinkbot re-run azure |
@gyfora Is this a PR that you could review? I recall that this was mentioned in the Flink Slack channel? |
Thanks, I or someone from my team will review this ASAP . |
@gaborgsomogyi will help reviewing this feature. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The functionality looks good in general, I have only a minor comment to refactor the tests.
@@ -155,6 +157,134 @@ public void testJoinsCorrectlyWithMultipleKeys() throws Exception { | |||
"(key2,5):(key2,5)"); | |||
} | |||
|
|||
@Test | |||
public void testIntervalJoinSideOutputLeftLateData() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 2 tests are 99% the same. Can we do something to merge then into a single function call w/ parameters?
For example the content of run
can be a function pointer to the extracted function or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your tips! I have merged the 2 tests into one test method by sideout the late data of left and right side at the same time .
Should I continue to extracted the run
content ? Cause I am a little confuse about how to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to have a private function which is called from 2 different tests in order not to mix-up left and right functionality but I'm personally fine w/ the actual code stand. It's a little bit harder to understand what's going on inside this single test but not horror.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The technique is create a function like
private void testIntervalJoinSideOutput(Consumer<SourceContext<Tuple2<String, Integer>>> streamOneRun) {
...
DataStream<Tuple2<String, Integer>> streamOne =
env.addSource(
new SourceFunction<Tuple2<String, Integer>>() {
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) {
streamOneRun.accept(ctx);
}
@Override
public void cancel() {
// do nothing
}
});
...
}
and call it like this:
@Test
public void testIntervalJoinSideOutputLeftLateData() throws Exception {
testIntervalJoinSideOutput(ctx -> {
ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
ctx.emitWatermark(new Watermark(3));
ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L); // late data
});
...
expectInAnyOrder("(key,1)");
}
Be aware that this may or may not compile, just wanted to show the way :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I understand what you mean, thanks very much!
I have extracted the run
content as a Consumer
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks.
@flinkbot run azure |
Test failed w/ unrelated error. |
@flinkbot re-run azure |
.sideOutputLeftLateData(late) | ||
.process(new CombineToStringJoinFunction()); | ||
|
||
process.getSideOutput(late) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be complete a last beautification can be added, namely we can create a function something like and call it from the 2 places:
private void addSinkToSideOutput(...) {
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it has been optimized.
When the last minor is resolved + unit tests passed then it's good to go. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@flinkbot run azure |
Seems like there is a permanent issue w/ jenkins:
|
@gaborgsomogyi The PR needs to be rebated since it doesn't contains the latest necessary changes to run the CI |
@MartijnVisser expected something like this. @chenyuzhi459 could you do a rebase to the latest master please? |
d023ec5
to
af9887e
Compare
Ok, I have rebased it. |
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
Jenkins passed, good to go from my perspective. |
What is the purpose of the change
This pull requst makes interval-join support side out late data.
Brief change log
Add
OutputTag
for left and right stream inorg.apache.flink.streaming.api.operators.co.IntervalJoinOperator
, and emit late data whenisLate(ourTimestamp) = true
Verifying this change
IntervalJoinITCase.testIntervalJoinSideOutputLeftLateData
IntervalJoinITCase.testIntervalJoinSideOutputRightLateData
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes/ no)Documentation