[BEAM-6284] Improve error message on waitUntilFinish. #8629
akedin merged 5 commits into apache:master
Conversation
Allow for infinite wait.
I am not familiar with this piece of code. Maybe also ask in dev@ to see who else is able to review this change?

run java postcommit
akedin left a comment
(Still Looking) I think this is a good thing to refactor, a few comments:
```java
if (terminalState != null) {
  return terminalState;
}

State getStateWithRetriesNoThrow(BackOff attempts, Sleeper sleeper) {
```
When reading `..NoThrow` I had to look up what happens instead of throwing. Would something like `..OrUnknown` convey the behavior better?
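For what it's worth, a name like `..OrUnknown` could map to a shape like the following. This is a hypothetical, self-contained sketch, not the actual `DataflowPipelineJob` code: the enum, the method name, and the null-as-failed-fetch convention are all stand-ins.

```java
import java.util.List;

// Hypothetical sketch of the "..OrUnknown" naming idea: retry a state
// lookup and fall back to UNKNOWN when attempts are exhausted, instead of
// throwing. A null entry in `responses` simulates a failed fetch attempt.
class StateFetchSketch {
  enum State { UNKNOWN, RUNNING, DONE }

  static State getStateOrUnknown(List<State> responses) {
    for (State response : responses) {
      if (response != null) {
        return response; // successful fetch: return whatever state we got
      }
      // failed fetch: fall through and retry with the next attempt
    }
    return State.UNKNOWN; // attempts exhausted: report UNKNOWN, don't throw
  }
}
```

The method name then tells the caller exactly what the fallback value is, without reading the body.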
```java
}

BackOff getBackoff(Duration duration, FluentBackoff factory) {
  if (duration.equals(Duration.ZERO) || duration.isLongerThan(Duration.ZERO)) {
```
I think this logic is incorrect and a bit confusing:

- `factory` is always `MESSAGES_BACKOFF_FACTORY`, from what I can see. I suggest just inlining it instead of passing it as a parameter, otherwise it's unclear here what it is. At the call sites it doesn't matter either: the logic there doesn't care how we get the backoff or what factory we use;
- the duration in `withMaxCumulativeBackoff` cannot be zero;
- it's unclear that `duration` is a total cumulative timeout, not some other parameter of the backoff configuration;
- the logic is supposed to read (if I understand it right): "use the default backoff config, plus set the max duration if it's positive".

I suggest rewriting this along the lines of:

```java
FluentBackoff backoffConfig =
    maxDuration.isLongerThan(Duration.ZERO)
        ? MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(maxDuration)
        : MESSAGES_BACKOFF_FACTORY;
return BackOffAdapter.toGcpBackOff(backoffConfig.backoff());
```

Or even:

```java
maxDuration = maxDuration.isLongerThan(Duration.ZERO) ? maxDuration : DEFAULT_MAX_BACKOFF;
return BackOffAdapter.toGcpBackOff(
    MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(maxDuration).backoff());
```
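To make the "positive max duration, else default" selection concrete, here is a minimal, self-contained sketch using plain millisecond values instead of Beam's `FluentBackoff` and Joda-Time `Duration`. The constant and method names here are assumptions for illustration only, not the real Beam configuration.

```java
// Minimal stand-in for the duration-selection logic discussed above.
class BackoffConfigSketch {
  // Assumed default; the real value lives in the Beam backoff factory config.
  static final long DEFAULT_MAX_BACKOFF_MILLIS = 15 * 60 * 1000;

  // "Set the max cumulative backoff if the requested duration is positive,
  // otherwise keep the default."
  static long effectiveMaxBackoffMillis(long requestedMillis) {
    return requestedMillis > 0 ? requestedMillis : DEFAULT_MAX_BACKOFF_MILLIS;
  }
}
```

Expressing it as a single ternary makes the intent readable at a glance, which is the point of both rewrites suggested above.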
```java
  backoff = BackOff.STOP_BACKOFF;
}
// We can stop if the job is done.
if (state.isTerminal()) {
```
I suggest refactoring this a bit further. Right now it's hard to find the body of the `if`:

```java
if (state.isTerminal()) {
  logTerminalState(state);
  return state;
}
```
Yeah, I was thinking of that, but kept it here since it was one of the main parts of this function as I saw it. Will refactor it out.
```java
}

exception = processJobMessages(messageHandler, monitor);

if (exception != null) {
```
Previously we would reset the backoff before `continue`. Is it the right thing to not do this anymore?
The previous code is actually too layered, and that's why I refactored it a lot. `backoff.reset()` is under the `if (!hasError)` condition.

We reset the backoff only in the case where there was no error, i.e. the status is not `UNKNOWN`. That caused us to fail upon reaching the max retry count while receiving the `UNKNOWN` state, not upon reaching the timeout.

My change separates two cases:

a) If we fail to get the status, we do not reset the backoff and will fail with an exception upon running out of attempts to get the job status.
b) If we actually receive the `UNKNOWN` status, we will wait until the timeout.
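The separation of the two cases can be simulated with a small, self-contained sketch. Here nulls stand for failed status fetches; the class, enum, and retry budget are hypothetical illustrations, not the actual Beam polling loop.

```java
import java.util.List;

// Hypothetical simulation of the two cases: a failed status fetch (null)
// consumes a retry attempt, while an explicit UNKNOWN status resets the
// attempt counter so we keep polling until the overall timeout.
class RetrySeparationSketch {
  enum Outcome { GOT_TERMINAL, RETRIES_EXHAUSTED, TIMED_OUT }
  enum State { UNKNOWN, DONE }

  static Outcome poll(List<State> responses, int maxAttempts) {
    int failedAttempts = 0;
    for (State response : responses) {
      if (response == null) {
        // case (a): fetch failed -> do NOT reset, burn a retry attempt
        failedAttempts++;
        if (failedAttempts >= maxAttempts) {
          return Outcome.RETRIES_EXHAUSTED;
        }
      } else if (response == State.DONE) {
        return Outcome.GOT_TERMINAL; // terminal state ends the wait
      } else {
        // case (b): got an explicit UNKNOWN -> reset and keep waiting
        failedAttempts = 0;
      }
    }
    return Outcome.TIMED_OUT; // deadline reached without a terminal state
  }
}
```

With this split, a stream of explicit `UNKNOWN` responses never exhausts the retry budget; only consecutive fetch failures do.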
Let me try to summarize the main flow to see if I understand it correctly:

Previous Flow

- get job state:
  - get non-`UNKNOWN` state -> reset backoff -> continue loop if not terminal;
    - will time out at max duration or get a terminal state; correct behavior;
  - get `IOException`, same as:
  - get `UNKNOWN` state -> continue loop unconditionally;
    - does not reset backoff;
    - can exceed number of allowed attempts fast, not waiting for max allowed duration;

New Flow

- get job state:
  - get non-`UNKNOWN` or `UNKNOWN` state -> reset backoff -> continue loop if not terminal;
    - can only receive `UNKNOWN` explicitly;
    - will time out at max duration or get a terminal state; correct behavior;
  - get `IOException` -> continue loop unconditionally:
    - can exceed number of attempts instead of waiting for max allowed time;
In this case the logic seems right. I would probably try to organize the body of the loop to emphasize the flow though, something along the lines of:

```java
Optional<State> state = tryGetState();

if (!state.isPresent() || !tryProcessJobMessages()) {
  continue;
}

if (state.get().isTerminal()) {
  return state.get();
}

resetAttemptsCount();
```

Hope this makes sense.
You are right, and your code example makes sense. The problem that didn't let me get to that shape is that I want to propagate the exception outside of the loop. Unfortunately, Java cannot pass method arguments by reference, so there's no clean way to return both the state and the exception except by defining an explicit class. And that would be a bit of overkill in this case.
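For completeness, the "explicit class" alternative mentioned above might look like the following tiny holder. This is a hypothetical sketch (a `String` stands in for the job `State`, and all names are invented), shown mainly to illustrate the trade-off being weighed: it works, but it is a whole extra type for a single call site.

```java
// Hypothetical result-holder: lets a helper return either a state or an
// exception, since Java has no out-parameters. Exactly one field is set.
class StateOrException {
  final String state;        // stand-in for the job State; null on failure
  final Exception exception; // set instead of state when the fetch failed

  private StateOrException(String state, Exception exception) {
    this.state = state;
    this.exception = exception;
  }

  static StateOrException ofState(String state) {
    return new StateOrException(state, null);
  }

  static StateOrException ofException(Exception e) {
    return new StateOrException(null, e);
  }

  boolean isFailure() {
    return exception != null;
  }
}
```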
UPD:

run java postcommit

Run Dataflow ValidatesRunner

run java postcommit

Run Dataflow ValidatesRunner

Is there a reason that this PR's commits were not squashed?

I squashed it when merging: ea32ab9

I see. Squash and merge will not update this PR directly.
Allow for infinite wait.
Seems that [BEAM-6284] is relevant to this issue, even though the logs are not available anymore.