[FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover #22806
Conversation
Force-pushed cf9faa1 to c38eed0
```java
final OperatorCoordinator.SubtaskGateway gateway =
        subtaskGateways.getOnlyGatewayAndNotCheckReady(subtaskId);
if (gateway != null) {
    gateway.sendEvent(event);
}
```
I think there might be a race condition. The coordinator might run this check successfully and start sending an event while, simultaneously, the receiver starts restarting. Shouldn't we have a try/catch here and actually check whether the exception is "retry-able", covering things like "subtask has failed", "subtask not ready", or "task is initialising"? 🤔
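The suggested try/catch could look roughly like the sketch below. This is purely hypothetical, not Flink's actual API: the class name, the helper, and the matching on message strings ("subtask has failed", etc., taken from the comment above) are all illustrative, and real code would more likely classify by exception type.

```java
// Hypothetical sketch of the "retry-able exception" check proposed above.
// Not Flink code: names and message-based matching are illustrative only.
public class RetryableSendSketch {

    // Decide whether a failed sendEvent() should be retried instead of
    // failing the whole job. A real implementation would inspect exception
    // types, not messages.
    static boolean isRetryable(Throwable t) {
        String msg = t.getMessage() == null ? "" : t.getMessage();
        return msg.contains("subtask has failed")
                || msg.contains("subtask not ready")
                || msg.contains("task is initialising");
    }

    public static void main(String[] args) {
        // A "not ready" failure would be retried on the next announcement...
        System.out.println(isRetryable(new RuntimeException("subtask not ready")));
        // ...while a serialization failure would not.
        System.out.println(isRetryable(new RuntimeException("serialize event failed")));
    }
}
```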
There may be no race condition, because all operations that get/update SourceCoordinatorContext::subtaskGateways execute in the CoordinatorExecutor (a single-threaded executor)~
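The single-threaded-executor argument can be illustrated with a toy example. This uses a plain JDK executor, not Flink's CoordinatorExecutor, and the counter is a stand-in for the shared gateway state:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy illustration (not Flink code): when every access to shared state is
// funneled through one single-threaded executor, the operations are
// serialized and cannot race, so no locking is needed.
public class SingleThreadSketch {
    private static int counter = 0; // touched only from the executor thread

    static int runIncrements(int n) throws InterruptedException {
        ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            // Unsynchronized increment, yet safe: all tasks run on one thread.
            coordinatorExecutor.execute(() -> counter++);
        }
        coordinatorExecutor.shutdown();
        coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runIncrements(1000)); // prints 1000, increments never interleave
    }
}
```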
Hi @LoveHeat , as I understand it, the operations on subtaskGateways are not subject to a race condition. Piotr means that when you intend to call gateway.sendEvent(event), the gateway itself has no race, but the corresponding subtask may be restarting, causing gateway.sendEvent(event) to fail, right?

If so, it's probably fine too. Following the path SourceCoordinatorContext#sendEventToSourceOperator -> SubtaskGatewayImpl#sendEvent: if sendEvent fails, it won't throw the exception directly; it calls subtaskAccess.triggerTaskFailover instead.

BTW, I'm not sure whether SubtaskGatewayImpl#sendEvent should throw FlinkRuntimeException. From the current code, the coordinatorExecutor cannot tolerate any exception, and most calls to SubtaskGatewayImpl#sendEvent happen inside the coordinatorExecutor thread. So the coordinatorExecutor will fail after any FlinkRuntimeException is thrown, whereas the expected behavior should be that only the task fails.
But the corresponding subtask may be restarting, causing gateway.sendEvent(event) to fail, right?
Yes, that's my concern.
Re your 2nd and 3rd paragraphs:

If sendEvent fails, it will throw an exception that bubbles up to the catch-all safety net in schedulePeriodTask, which will cause the whole job to fail over. I guess that is currently mostly fine, unless the source is used in a batch job with multiple failover regions. In that case the expected behaviour is that only the subtasks belonging to the failover region containing the source subtasks restart, not the whole job. That's my main worry with this race condition, and why I suggested:

Shouldn't we have a try/catch here and actually check whether the exception is "retry-able", covering things like "subtask has failed", "subtask not ready", or "task is initialising"? 🤔
But gateway.sendEvent() can only fail in three ways:

- Task is not ready: I think this exception will never be thrown, because we already check task readiness before calling sendEvent().
- Serializing the event failed: I think this cannot be retried; we should just fail the job (though I don't know whether failing the whole job is OK for a batch job?).
- Otherwise, as quoted above: if sendEvent fails, it won't throw the exception directly; it calls subtaskAccess.triggerTaskFailover.

Other exceptions (like OOM or JVM errors) also cannot be retried.
(Correct me if I misunderstand your opinion 😃)
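The third case above (trigger a task failover rather than propagate the exception) can be sketched as follows. The names triggerTaskFailover and the future-based sendEvent shape are borrowed from the conversation, not the verified Flink API; the boolean flag simply records that a failover was requested.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Flink code) of the "fail the task, don't throw"
// behavior discussed above: a failed send triggers a task failover instead
// of throwing into the coordinator thread and failing the whole job.
public class SendEventSketch {
    static boolean failoverTriggered = false;

    static void triggerTaskFailover(Throwable cause) {
        // In the real system this would restart only the affected subtask.
        failoverTriggered = true;
    }

    static CompletableFuture<Void> sendEvent(boolean rpcSucceeds) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (rpcSucceeds) {
            result.complete(null);
        } else {
            // Swallow the failure here and request a task-level failover
            // rather than letting an exception escape to the executor.
            triggerTaskFailover(new RuntimeException("subtask restarting"));
            result.complete(null);
        }
        return result;
    }

    public static void main(String[] args) {
        sendEvent(false);
        System.out.println(failoverTriggered); // prints true
    }
}
```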
Ahh, I think you are right. Thanks for pointing this out. In that case this code should be fine.
thanks for the fix!
LGTM assuming green build
There seems to be a related failure?
just fyi: probably need to rebase to latest commit
Force-pushed c38eed0 to 5c2518a
Hi @LoveHeat , thanks for the fix. Could you help backport this PR to release-1.16 and release-1.17? It's better to wait until all CIs pass before merging.
1.16: #22953
The CI failed. Hi @snuyanzin , could you help take a look? I see you changed a little ArchTag at FLINK-32379 recently. Follow-up in this JIRA: https://issues.apache.org/jira/browse/FLINK-32539
I confirm that it is related to FLINK-32539 (the root cause is FLINK-27415).
Thanks @snuyanzin for the feedback~
Hi @LoveHeat , FLINK-32539 is merged, could you rebase on master again? 😂
…CombinedWatermark to cover the case task failover
Force-pushed 5c2518a to a82fc51
done~ @1996fanrui @pnowojski
Thanks to @LoveHeat for the fix, and thanks @pnowojski for the review. CI passed, merging~
What is the purpose of the change
See FLINK-32362.