[FLINK-30334][runtime] Fix noMoreSplits event handling for HybridSource #21464
@zhuzhurk Hi, could you help review this? Thanks a lot.
Thanks for opening this PR! @chucheng92
My major concern is that we should avoid changing the public interface SplitEnumeratorContext, which is not necessary. The other changes look good to me.
 * @param subtask The index of the operator's parallel subtask that shall be signaled it will
 *     receive splits later.
 */
default void signalIntermediateNoMoreSplits(int subtask) {}
Let's avoid adding this method to SplitEnumeratorContext, which is a @Public interface.
An intermediate noMoreSplits event is strictly related to HybridSource and is hard for users to understand or handle.
I think we can just add the method directly to SourceCoordinatorContext, then use instanceof and a type cast in HybridSourceSplitEnumerator to invoke it.
@zhuzhurk Yes, SplitEnumeratorContext is a Public API. I added it as a default method to avoid impacting existing implementations. We cannot use realContext instanceof SourceCoordinatorContext directly: HybridSourceSplitEnumerator is in flink-connector-base and SourceCoordinatorContext is in flink-runtime, so how can we reference it? Using reflection or adding a runtime dependency would both be ugly.
I think we can add an interface in flink-core, e.g.
@Internal
interface SupportsIntermediateNoMoreSplits {
    void signalIntermediateNoMoreSplits(int subtask);
}
SourceCoordinatorContext should implement it, and HybridSourceSplitEnumerator should check that the realContext is an instance of SupportsIntermediateNoMoreSplits.
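The suggestion above is a capability-interface pattern: the public context type stays untouched, and the caller probes for an internal interface at runtime. A minimal, self-contained sketch of that idea follows. Every type in it is a simplified stand-in rather than the real Flink class, and `trySignalIntermediate` and `CapableContext` are hypothetical names introduced only for illustration.

```java
/** Sketch of the capability-interface pattern; every type here is a simplified stand-in. */
final class IntermediateNoMoreSplitsSketch {

    /** Stand-in for the public SplitEnumeratorContext, which stays unchanged. */
    interface EnumeratorContext {
        void signalNoMoreSplits(int subtask);
    }

    /** Internal capability interface, following the review suggestion. */
    interface SupportsIntermediateNoMoreSplits {
        void signalIntermediateNoMoreSplits(int subtask);
    }

    /** Hypothetical stand-in for a runtime context that opts in to the capability. */
    static final class CapableContext
            implements EnumeratorContext, SupportsIntermediateNoMoreSplits {
        String lastEvent = "none";

        @Override
        public void signalNoMoreSplits(int subtask) {
            lastEvent = "final:" + subtask;
        }

        @Override
        public void signalIntermediateNoMoreSplits(int subtask) {
            lastEvent = "intermediate:" + subtask;
        }
    }

    /** Returns true if the context implements the capability and was signaled. */
    static boolean trySignalIntermediate(EnumeratorContext context, int subtask) {
        if (context instanceof SupportsIntermediateNoMoreSplits) {
            ((SupportsIntermediateNoMoreSplits) context).signalIntermediateNoMoreSplits(subtask);
            return true;
        }
        // Assumption: the caller decides how to fall back for other contexts.
        return false;
    }
}
```

Because the caller only depends on the two interfaces, it never needs a compile-time reference to the module that holds the concrete context.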
@chucheng92 FYI, the community is preparing to release 1.16.1. I hope we can get this issue fixed in that release.
Yes, done in the way you suggested. It works well. I have updated the PR. PTAL, thanks.
@Override
public void signalIntermediateNoMoreSplits(int subtask) {
    subtaskHasNoMoreSplits[subtask] = false;
This method should be empty. The values should be false initially; they should not be set to false in this method.
@@ -157,7 +158,17 @@ private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
LOG.debug("Restoring splits to subtask={} {}", subtaskId, splits);
context.assignSplits(
        new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
context.signalNoMoreSplits(subtaskId);
if (context instanceof SupportsIntermediateNoMoreSplits) {
It would be better to have a reusable method, e.g.
private static void signalNoMoreSplits(
        SplitEnumeratorContext<HybridSourceSplit> context,
        int subtaskId,
        int sourceIndex,
        int sourceSize);
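One way such a helper could be filled in is sketched below. The decision rule is an assumption, not something the review spells out: the final signal is sent when the current source is the last one, and the intermediate signal otherwise, with a fall back to the final signal for contexts that lack the capability. The context types and `RecordingContext` are simplified, hypothetical stand-ins for the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a reusable signaling helper; all types are simplified stand-ins. */
final class SignalNoMoreSplitsHelperSketch {

    /** Stand-in for SplitEnumeratorContext. */
    interface EnumeratorContext {
        void signalNoMoreSplits(int subtask);
    }

    /** Stand-in for the internal capability interface. */
    interface SupportsIntermediateNoMoreSplits {
        void signalIntermediateNoMoreSplits(int subtask);
    }

    /** Hypothetical context that records which signal it received. */
    static final class RecordingContext
            implements EnumeratorContext, SupportsIntermediateNoMoreSplits {
        final List<String> events = new ArrayList<>();

        @Override
        public void signalNoMoreSplits(int subtask) {
            events.add("final:" + subtask);
        }

        @Override
        public void signalIntermediateNoMoreSplits(int subtask) {
            events.add("intermediate:" + subtask);
        }
    }

    /** Chooses between the final and the intermediate signal (assumed rule). */
    static void signalNoMoreSplits(
            EnumeratorContext context, int subtaskId, int sourceIndex, int sourceSize) {
        // Assumption: only the last child source ends the stream for good.
        boolean lastSource = sourceIndex >= sourceSize - 1;
        if (!lastSource && context instanceof SupportsIntermediateNoMoreSplits) {
            ((SupportsIntermediateNoMoreSplits) context)
                    .signalIntermediateNoMoreSplits(subtaskId);
        } else {
            context.signalNoMoreSplits(subtaskId);
        }
    }
}
```

Keeping the rule in one static method means every call site that previously invoked `context.signalNoMoreSplits(subtaskId)` can switch to the helper without duplicating the instanceof check.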
// It's an intermediate noMoreSplit event, notify subtask to deal with this event.
callInCoordinatorThread(
        () -> {
            subtaskHasNoMoreSplits[subtask] = false;
Why does it need to be set to false?
In my understanding, it should be false initially and set to true on signalNoMoreSplits(int). When failover happens, the value will be reset to false. It seems there is no need for signalIntermediateNoMoreSplits(int) to do the reset.
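The flag lifecycle described in this comment can be sketched as a small state holder. This is an illustration under assumptions, not the SourceCoordinatorContext code: the class and the `onSubtaskReset` method are hypothetical names standing in for whatever the runtime does on failover.

```java
/** Sketch of the per-subtask flag lifecycle; not the real coordinator context. */
final class NoMoreSplitsFlagSketch {

    // Java initializes boolean arrays to false, so no explicit reset is needed up front.
    private final boolean[] subtaskHasNoMoreSplits;

    NoMoreSplitsFlagSketch(int parallelism) {
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    /** The final signal is the only transition to true. */
    void signalNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = true;
    }

    /** The intermediate signal leaves the flag alone, as the review requests. */
    void signalIntermediateNoMoreSplits(int subtask) {
        // Intentionally no state change: the subtask will receive splits later.
    }

    /** Hypothetical failover hook: the flag goes back to false. */
    void onSubtaskReset(int subtask) {
        subtaskHasNoMoreSplits[subtask] = false;
    }

    boolean hasNoMoreSplits(int subtask) {
        return subtaskHasNoMoreSplits[subtask];
    }
}
```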
Yes, there is no need to set it to false. I have deleted it. Thanks.
@zhuzhurk Thanks for reviewing. I have fixed the problems you commented on. PTAL.
Thanks for addressing all the comments! @chucheng92
LGTM.
@flinkbot run azure
// test add splits back, then SUBTASK0 restore splitFromSource0 split
// reset splits assignment & previous subtaskHasNoMoreSplits flag.
context.getSplitsAssignmentSequence().clear();
Whitebox.setInternalState(context, "subtaskHasNoMoreSplits", new boolean[] {false, false});
It would be better to add a method resetNoMoreSplits(int) in MockSplitEnumeratorContext instead of using reflection to do this.
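A test-side reset method of the kind proposed here might look like the following sketch. `MockContextSketch` is a hypothetical, stripped-down stand-in and does not reproduce the real MockSplitEnumeratorContext; only the method name `resetNoMoreSplits(int)` comes from the review comment.

```java
/** Hypothetical, stripped-down mock context with an explicit reset hook for tests. */
final class MockContextSketch {

    private final boolean[] subtaskHasNoMoreSplits;

    MockContextSketch(int parallelism) {
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    void signalNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = true;
    }

    /** Test helper: clears the flag for one subtask without reflection. */
    void resetNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = false;
    }

    boolean hasNoMoreSplits(int subtask) {
        return subtaskHasNoMoreSplits[subtask];
    }
}
```

Compared with replacing the private array through reflection, an explicit method keeps the test independent of the field name and resets only the subtask under test.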
@zhuzhurk thanks for your suggestion.
…hasNoMoreSplits check not consider the hybridsource situation
@flinkbot run azure
Merging.
What is the purpose of the change
The hasNoMoreSplits check in SourceCoordinator#handleRequestSplitEvent does not consider the HybridSource case. As a result, HybridSource does not read data from the next child sources, which leads to data loss and unexpected runtime behavior.
Brief change log
Add an intercept strategy so that the hasNoMoreSplits check works correctly with HybridSourceSplitEnumerator.
Verifying this change
Add 2 new test cases to cover the code path affected by the issue:
HybridSourceSplitEnumeratorTest#testInterceptNoMoreSplitEvent
SourceCoordinatorContextTest#testSignalNoMoreSplits
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation