
[FLINK-5830] [Distributed Coordination] [FLIP-6] Handle OutOfMemory error during process async message in akka rpc actor #3360

Closed (wanted to merge 2 commits)

Conversation

@zhijiangW (Contributor) commented Feb 20, 2017

If an OOM error is caught while processing async messages in AkkaRpcActor, the behavior is ambiguous and RPC messages may be lost. If the lost message is the final-state notification from the TaskExecutor, the JobMaster can no longer receive the final state while failing the job, and the job may get stuck before reaching a final state.

The solution is to catch this specific error in AkkaRpcActor and rethrow it, which shuts down the ActorSystem and exits the TaskExecutor process. The JobMaster can then detect the failure and restart the job if necessary.

BTW, mvn clean verify succeeds on my machine.

@StephanEwen (Contributor) commented Feb 20, 2017

I would suggest that we adopt the following pattern for all the places like the one in this pull request where we catch Throwables:

try {
    ...
} catch (Throwable t) {
    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

     // the other handling logic...
}

This requires adding the function rethrowIfFatalErrorOrOOM(Throwable) to ExceptionUtils, similar to the method here: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L109
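A minimal sketch of what such a utility could look like. The class name ExceptionUtilsSketch and the exact set of "fatal" error types are assumptions modeled on the linked ExceptionUtils, not the code that was eventually merged:

```java
// Hedged sketch of the proposed utility; the precise fatal-error
// classification here is an assumption, not Flink's final choice.
public final class ExceptionUtilsSketch {

    /**
     * Rethrows the given Throwable if it is an OutOfMemoryError or
     * another fatal JVM error; returns normally otherwise, so the
     * caller's catch block can continue with its regular handling.
     */
    public static void rethrowIfFatalErrorOrOOM(Throwable t) {
        // OutOfMemoryError extends VirtualMachineError, so this check
        // covers OOM, StackOverflowError, InternalError, etc.
        if (t instanceof VirtualMachineError || t instanceof ThreadDeath) {
            throw (Error) t;
        }
    }

    private ExceptionUtilsSketch() {}
}
```

With this shape, the pattern above stays a one-liner at the top of every catch (Throwable t) block: fatal errors escape immediately, everything else falls through to the existing handling logic.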

It would be even nicer if we could do something like what Scala supports, but I don't think there is a better way to do this in Java than the one suggested above:

try {
    ...
} catch {
    case NonFatal(t) => // does not include OOM and internal errors
}

@zhijiangW (Contributor, Author)

@StephanEwen, thank you for the quick review!

Adding a uniform way to the utils is a good idea, so we can use it anywhere.

I will fix it as you suggest later today.

淘江 added 2 commits February 21, 2017 14:36
@zhijiangW (Contributor, Author)

@StephanEwen, I have submitted the modifications.

@tillrohrmann (Contributor)

I think adding this safety net makes sense and protects against a corrupted state.

However, isn't the root cause of the described problem that the JobMaster-TaskExecutor communication does not tolerate a lost message? Maybe we should introduce an acknowledgement message which signals correct reception of the status message. If the response times out, we can either retry sending it or fail.

@zhijiangW (Contributor, Author) commented Feb 22, 2017

Hi @tillrohrmann, thank you for the review and the constructive suggestions!

Let me first explain the root cause of this issue:

From the JobMaster side, it sends the cancel RPC message and gets an acknowledgement from the TaskExecutor; the execution state then transitions to CANCELING.

From the TaskExecutor side, it notifies the JobMaster of the final state before the task exits. notifyFinalState can be divided into two steps:

  • The akka actor executes the RunAsync message, which is a Tell action, and this triggers unregisterTaskAndNotifyFinalState.
  • During unregisterTaskAndNotifyFinalState, the updateTaskExecutionState RPC message is triggered; this is an Ask action, so the mechanism can avoid lost messages.

The problem is that an OOM may occur before updateTaskExecutionState is triggered. This error is caught by AkkaRpcActor, which does nothing with it, so the subsequent processing is interrupted and updateTaskExecutionState is never executed.
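The failure mode described above can be illustrated outside of Flink in a few lines. All names here are stand-ins, not actual Flink code; the point is only that a catch (Throwable) block which swallows the error silently drops the follow-up notification:

```java
// Minimal illustration of the failure mode: a handler that catches
// Throwable and merely continues swallows the OutOfMemoryError, so the
// stand-in for updateTaskExecutionState never runs.
import java.util.concurrent.atomic.AtomicBoolean;

public class SwallowedErrorDemo {

    /** Simulates processing one RunAsync message the old way; returns
     *  whether the final-state notification was ever sent. */
    static boolean processWithSwallowingHandler() {
        AtomicBoolean finalStateSent = new AtomicBoolean(false);

        Runnable unregisterTaskAndNotifyFinalState = () -> {
            if (true) {                        // pretend an allocation fails here
                throw new OutOfMemoryError("simulated");
            }
            finalStateSent.set(true);          // never reached
        };

        try {
            unregisterTaskAndNotifyFinalState.run();
        } catch (Throwable t) {
            // old behavior: swallow and continue -- nothing is rethrown
        }
        return finalStateSent.get();
    }

    public static void main(String[] args) {
        System.out.println("final state sent: " + processWithSwallowingHandler());
    }
}
```

The notification is lost without any visible failure, which is exactly why the sender side cannot detect it with a plain Tell.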

For this key interaction between TaskExecutor and JobMaster we should not tolerate lost messages, and I agree with your suggestion above. There are two ideas for this improvement:

  • Enhance the robustness of notifyFinalState. The current rethrow of the OOM is an easy option, but it causes the TaskExecutor to exit; there should be other ways to reduce that cost.
  • After the JobMaster receives the cancel acknowledgement, it can start a timeout to check the execution's final state. If the execution has not entered a final state within the timeout, the JobMaster can send a message to the TaskExecutor again to confirm the status.

I prefer the first way, because it only touches one side and avoids complex interaction between TaskExecutor and JobMaster.

Looking forward to your further suggestions or ideas.

@StephanEwen (Contributor)

Looking at this from another angle: if any scheduled Runnable ever lets an exception bubble out, can we still assume that the JobManager is in a sane state? Or should we actually make every uncaught exception in the RPC executors a fatal error and send a notifyFatalError to the RpcEndpoint?

@tillrohrmann (Contributor)

Thanks for the clarification @zhijiangW. I now understand the problem: via RpcEndpoint.runAsync we effectively introduce another message which might get "lost" (e.g. due to an OOM error).

I agree with Stephan that it's hard to reason about the consistency of the AkkaRpcActor's internal state once we see an exception. The conservative approach would probably be to let it terminate or to call notifyFatalError to handle it.

Related to this is also how we handle exceptions in AkkaRpcActor.handleRpcInvocation. There we catch all exceptions and simply send them to the caller. I think this method should only send non-fatal exceptions back and terminate otherwise.

To follow a similar pattern for handleRunAsync, we could think about RpcEndpoint.runAsync returning a Future which is completed either exceptionally with non-fatal exceptions or normally once the Runnable has been executed. And in case we see a fatal exception, we terminate or call notifyFatalError. What do you think?
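A minimal sketch of this suggestion, with hypothetical names (runAsyncWithAck is not the actual Flink API, and the real version would complete the future from the actor thread rather than synchronously): the caller gets a future that acknowledges execution, non-fatal errors complete it exceptionally, and fatal errors escalate instead of being reported through the future.

```java
// Hedged sketch: acknowledge execution of a Runnable via a future,
// escalating fatal errors instead of completing the future with them.
import java.util.concurrent.CompletableFuture;

public class AckedRunAsyncSketch {

    static CompletableFuture<Void> runAsyncWithAck(Runnable runnable) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        try {
            runnable.run();
            ack.complete(null);               // Runnable executed: acknowledge
        } catch (Throwable t) {
            if (t instanceof VirtualMachineError) {
                // fatal (includes OutOfMemoryError): escalate, i.e. the
                // place to terminate or call notifyFatalError
                throw (Error) t;
            }
            ack.completeExceptionally(t);     // non-fatal: report to the caller
        }
        return ack;
    }
}
```

A caller that attaches a timeout to this future can distinguish "executed", "failed with a reportable exception", and "never acknowledged", which is what the plain Tell-based runAsync cannot express.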

@zhijiangW (Contributor, Author) commented Feb 23, 2017

@StephanEwen, if the exception bubbles out and causes the TaskExecutor to exit as a result, I think the JobMaster can be assumed to reach a sane state eventually, based on its detection of the TaskExecutor failure.

The current solution only covers the OOM error; maybe it can be extended to any exception, because it is difficult to confirm the consistency of the internal state, and the conservative approach is to let it terminate, as @tillrohrmann said.

If I understand @tillrohrmann's suggestion correctly, RpcEndpoint.runAsync would be modified to return a Future, similar to RpcEndpoint.callAsync but still using a Tell action in akka. This Future would be set as a field in RunAsync so it can be accessed when the message is handled in AkkaRpcActor. The Future can help determine whether the message was executed successfully or lost, strengthening the Tell mechanism. If a Future with a Tell action is better than the current RpcEndpoint.callAsync, which uses an Ask action, I will try to implement that. Another option is to simply tolerate lost messages in the current RpcEndpoint.runAsync and use it only in scenarios where efficiency matters and safety does not; important interactions should resort to RpcEndpoint.callAsync. What do you think?

@zhijiangW zhijiangW changed the title [FLINK-5830][Distributed Coordination] Handle OutOfMemory error during process async message in akka rpc actor [FLINK-5830] [Distributed Coordination] [FLIP-6] Handle OutOfMemory error during process async message in akka rpc actor Mar 3, 2017
@StephanEwen (Contributor)

Merging this...

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Mar 9, 2017
…errors during process async message in akka rpc actor

This closes apache#3360
@asfgit asfgit closed this in 527eabd Mar 9, 2017
p16i pushed a commit to p16i/flink that referenced this pull request Apr 16, 2017
…errors during process async message in akka rpc actor

This closes apache#3360
@zhijiangW zhijiangW deleted the FLINK-5830 branch August 6, 2017 13:57