Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4606] [cluster management] Integrate the new ResourceManager with the existed FlinkResourceManager #2540

Closed
wants to merge 43 commits into from

Conversation

beyond1920
Copy link
Contributor

This pr aims to integrate the new ResourceManager with the existed FlinkResourceManager, the main difference including:

  1. Move the useful rpc communication in existed FlinkResourceManager to new ResourceManager, e.g : register infoMessageListener, unregister infoMessageListener, shutDownCluster
  2. Make ResourceManager to be an abstract class, extract framework specific behavior
  3. Implement standalone resourceManager based on the new base ResourceManager class.
  4. Modify testcases which are effected by abstract resourceManager class.

tillrohrmann and others added 30 commits September 21, 2016 11:39
This PR introduces a generic AkkaRpcActor which receives rpc calls as a
RpcInvocation message. The RpcInvocation message is generated by the
AkkaInvocationHandler which gets them from automatically generated Java Proxies.

Add documentation for proxy based akka rpc service

Log unknown message type in AkkaRpcActor but do not fail actor

Use ReflectionUtil to extract RpcGateway type from RpcEndpoint

This closes apache#2357.
This PR introduces an eager serialization for remote rpc invocation messages.
That way it is possible to check whether the message is serializable and
whether it exceeds the maximum allowed akka frame size. If either of these
constraints is violated, a proper exception is thrown instead of simply
swallowing the exception as Akka does it.

Address PR comments

This closes apache#2365.
…ourceProfile

[FLINK-4373] [cluster management] address comments

This closes apache#2370.
…arted

This PR allows the AkkaRpcActor to stash messages until the corresponding RcpEndpoint
has been started. When receiving a Processing.START message, the AkkaRpcActor
unstashes all messages and starts processing rpcs. When receiving a Processing.STOP
message, it will stop processing messages and stash incoming messages again.

Add test case for message stashing

This closes apache#2358.
The RpcGateway.getAddress method allows to retrieve the fully qualified address of the
associated RpcEndpoint.

This closes apache#2392.
…asters

Adapt related components to the changes in HighAvailabilityServices

Add comments for getJobMasterElectionService in HighAvailabilityServices

This closes apache#2377.
…itance

This commit extends the RpcCompletenessTest such that it can now check for inherited
remote procedure calls. All methods defined at the RpcGateway are considered native.
This means that they need no RpcEndpoint counterpart because they are implemented by
the RpcGateway implementation.

This closes apache#2401.

update comments

remove native method annotation

add line break
The recovery mode is not used any more by the latest CheckpointCoordinator.

All difference in recovery logic between high-availability and non-high-availability
is encapsulated in the HighAvailabilityServices.
- add serial rpc service
- add a special rpcService implementation which directly executes the asynchronous calls serially one by one, it is just for testcase
- Change ResourceManagerLeaderContender code and TestingSerialRpcService code
- override shutdown logic to stop leadershipService
- use a mocked RpcService rather than TestingSerialRpcService for resourceManager HA test

This closes apache#2427
…r out of the rpc package

The TaskExecutor, the JobMaster and the ResourceManager were still contained in the rpc
package. With this commit, they will be moved out of this package. Now they are contained
in dedicated packages on the o.a.f.runtime level.

This closes apache#2438.
… as protected

Give main thread execution context into the TaskExecutorToResourceManagerConnection
…not reachable

This PR introduces a RpcConnectionException which is thrown if the rpc endpoint
is not reachable when calling RpcService.connect.

This closes apache#2405.
- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderRetrievalListener to SlotManager to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot

This closes apache#2463
Flink's future abstraction whose API is similar to Java 8's CompletableFuture.
That's in order to ease a future transition to this class once we ditch Java 7.
The current set of operations comprises:

- isDone to check the completion of the future
- get/getNow to obtain the future's value
- cancel to cancel the future (best effort basis)
- thenApplyAsync to transform the future's value into another value
- thenAcceptAsync to register a callback for a successful completion of the future
- exceptionallyAsync to register a callback for an exception completion of the future
- thenComposeAsync to transform the future's value and flatten the returned future
- handleAsync to register a callback which is called either with the regular result
or the exceptional result

Additionally, Flink offers a CompletableFuture which can be completed with a regular
value or an exception:

- complete/completeExceptionally

Complete FlinkCompletableFuture exceptionally with a CanellationException upon cancel

This closes apache#2472.
… submission & setting up the ExecutionGraph

This closes apache#2480
Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! The CI reports a checkstyle error.

@@ -66,15 +67,16 @@
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
Copy link
Contributor

@mxm mxm Sep 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should be

 ResourceManager<WorkerType extends TaskExecutorRegistration>
        extends RpcEndpoint<ResourceManagerGateway>...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RpcCompletnessTest might have to be adapted for this to work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxm , I adopt ResourceManager<WorkerType extends TaskExecutorRegistration> extends RpcEndpoint<ResourceManagerGateway> at first time, but it would fail because of an Exception when I wanna to start a subClass of this ResourceManager. For example, public class StandaloneResourceManager extends ResourceManager<TaskExecutorRegistration>, when I start this ResourceManager, it would call AkkaRpcService.#startServer, an exception would be thrown here because selfGatewayType was mistake for TaskExecutorRegistration class. So I change it to ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see. I'll modify RpcEndpoint and RpcCompletnessTest for this to work.

@mxm
Copy link
Contributor

mxm commented Sep 26, 2016

Thank you for your changes. I'm trying to incorporate them in flip-6 now.

@beyond1920
Copy link
Contributor Author

@mxm , thanks for your review, I modified the pr based on your advices:

  1. fIx checkstyle error, AkkaRpcActorTest testcase and RpcCompletenessTest testcase. Sorry for those mistakes, I would take care of it next time.
  2. About resourceManager, I adopt ResourceManager extends RpcEndpoint at first time, but it would fail because of an Exception when I wanna to start a subClass of this ResourceManager. For example, public class StandaloneResourceManager extends ResourceManager, when I start this ResourceManager, it would call AkkaRpcService.#startServer, an exception would be thrown here because selfGatewayType was mistake for TaskExecutorRegistration class. So I change it to ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint

* @param resourceID The worker resource id
* @param taskExecutorGateway the task executor gateway
*/
protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is missing all the other abstract methods of the old ResourceManager. We will need requestNewWorkers, releasePendingWorker, releaseStartedWorker, and reacceptRegisteredWorkers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxm, I omit these method before because of following reasons:

  1. Maybe we need requestNewWorkers method. But numWorkers parameter is not enough to allocate a certain number of new workers, the expected ResourceProfile of each worker also needed to pass in.
  2. Maybe we need releaseStartedWorker method. I omit the method because it is used to release started taskExecutors when ResourceManager receives RemoveResource request, but I could not find any places where sends this request. So I omitted this method before.
  3. We don't need reacceptRegisteredWorkers method. Because the method was used to consolidate the taskExecutor view between resourceManager and jobManager when resourceManager reconnects to jobManager after resourceManager restart. But in the new cluster management mode, JobManager doesn't kept the view of live taskExecutors. ResourceManager is responsible for receiving taskExecutors' registration and maintain the taskExecutor view. So we don't need this method.
  4. We don't need releasePendingWorker method. Because the method was only used to release pending requests when resourceManager consolidates the taskExecutor view with jobManager after resourceManager restart. As we said before, this logic is not needed in new cluster management mode.

@mxm
Copy link
Contributor

mxm commented Sep 28, 2016

This has been merged. Thank you. Could you close the PR?

@beyond1920 beyond1920 closed this Sep 29, 2016
asfgit pushed a commit that referenced this pull request Oct 2, 2016
asfgit pushed a commit that referenced this pull request Oct 6, 2016
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Oct 14, 2016
asfgit pushed a commit that referenced this pull request Oct 21, 2016
tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Oct 31, 2016
tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Oct 31, 2016
asfgit pushed a commit that referenced this pull request Nov 1, 2016
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Dec 23, 2016
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Dec 23, 2016
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Dec 23, 2016
liuyuzhong7 pushed a commit to liuyuzhong7/flink that referenced this pull request Jan 4, 2017
liuyuzhong7 pushed a commit to liuyuzhong7/flink that referenced this pull request Jan 17, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants