Skip to content

Commit

Permalink
[FLINK-6120] [heartbeat] Implement heartbeat logic between JobManager…
Browse files Browse the repository at this point in the history
… and ResourceManager

This closes #3645.
  • Loading branch information
zhijiangW authored and tillrohrmann committed Apr 27, 2017
1 parent bb972b8 commit 81114d5
Show file tree
Hide file tree
Showing 11 changed files with 521 additions and 74 deletions.
Expand Up @@ -155,7 +155,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private final MetricGroup jobMetricGroup;

/** The heartbeat manager with task managers */
private final HeartbeatManager<Void, Void> heartbeatManager;
private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;

/** The heartbeat manager with resource manager */
private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

/** The execution context which is used to execute futures */
private final Executor executor;
Expand Down Expand Up @@ -218,12 +221,18 @@ public JobMaster(
this.errorHandler = checkNotNull(errorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);

this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender(
this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);

this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
resourceId,
new ResourceManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);

final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();

Expand Down Expand Up @@ -309,7 +318,8 @@ public void start(final UUID leaderSessionID) throws Exception {
*/
@Override
public void shutDown() throws Exception {
heartbeatManager.stop();
taskManagerHeartbeatManager.stop();
resourceManagerHeartbeatManager.stop();

// make sure there is a graceful exit
getSelf().suspendExecution(new Exception("JobManager is shutting down."));
Expand Down Expand Up @@ -407,7 +417,7 @@ public void suspendExecution(final Throwable cause) {
slotPoolGateway.suspend();

// disconnect from resource manager:
closeResourceManagerConnection();
closeResourceManagerConnection(new Exception("Execution was suspended.", cause));
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -534,7 +544,7 @@ public Acknowledge scheduleOrUpdateConsumers(

@RpcMethod
public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
heartbeatManager.unmonitorTarget(resourceID);
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
slotPoolGateway.releaseTaskManager(resourceID);

Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
Expand Down Expand Up @@ -766,7 +776,7 @@ public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway,
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));

// monitor the task manager as heartbeat target
heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the task manager will not request heartbeat, so this method will never be called currently
Expand All @@ -788,13 +798,24 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
public void disconnectResourceManager(
final UUID jobManagerLeaderId,
final UUID resourceManagerLeaderId,
final Exception cause) {
// TODO: Implement disconnect behaviour
final Exception cause) throws Exception {

validateLeaderSessionId(jobManagerLeaderId);

if (resourceManagerConnection != null
&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
closeResourceManagerConnection(cause);
}
}

@RpcMethod
public void heartbeatFromTaskManager(final ResourceID resourceID) {
heartbeatManager.receiveHeartbeat(resourceID, null);
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
}

@RpcMethod
public void heartbeatFromResourceManager(final ResourceID resourceID) {
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -872,56 +893,79 @@ private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, fina
}
}

private void notifyOfNewResourceManagerLeader(
final String resourceManagerAddress, final UUID resourceManagerLeaderId)
{
validateRunsInMainThread();

private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final UUID resourceManagerLeaderId) {
if (resourceManagerConnection != null) {
if (resourceManagerAddress != null) {
if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
// both address and leader id are not changed, we can keep the old connection
return;
}

closeResourceManagerConnection(new Exception(
"ResourceManager leader changed to new address " + resourceManagerAddress));

log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
} else {
log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
resourceManagerConnection.getTargetAddress());
resourceManagerConnection.getTargetAddress());
}
}

closeResourceManagerConnection();

if (resourceManagerAddress != null) {
log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);

resourceManagerConnection = new ResourceManagerConnection(
log, jobGraph.getJobID(), getAddress(), leaderSessionID,
resourceManagerAddress, resourceManagerLeaderId, executor);
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
leaderSessionID,
resourceManagerAddress,
resourceManagerLeaderId,
executor);

resourceManagerConnection.start();
}
}

private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
validateRunsInMainThread();
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
final UUID resourceManagerLeaderId = success.getResourceManagerLeaderId();

// verify the response with current connection
if (resourceManagerConnection != null
&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
{
log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
success.getResourceManagerLeaderId());
&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {

slotPoolGateway.connectToResourceManager(
success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerLeaderId);

final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();

slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, resourceManagerGateway);

resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
resourceManagerGateway.heartbeatFromJobManager(resourceID);
}

@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
// request heartbeat will never be called on the job manager side
}
});
}
}

private void closeResourceManagerConnection() {
validateRunsInMainThread();

private void closeResourceManagerConnection(Exception cause) {
if (resourceManagerConnection != null) {
log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);

resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());

ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);

resourceManagerConnection.close();
resourceManagerConnection = null;
}
Expand Down Expand Up @@ -964,13 +1008,18 @@ private class ResourceManagerConnection
{
private final JobID jobID;

private final ResourceID jobManagerResourceID;

private final String jobManagerRpcAddress;

private final UUID jobManagerLeaderID;

private ResourceID resourceManagerResourceID;

ResourceManagerConnection(
final Logger log,
final JobID jobID,
final ResourceID jobManagerResourceID,
final String jobManagerRpcAddress,
final UUID jobManagerLeaderID,
final String resourceManagerAddress,
Expand All @@ -979,6 +1028,7 @@ private class ResourceManagerConnection
{
super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
this.jobID = checkNotNull(jobID);
this.jobManagerResourceID = checkNotNull(jobManagerResourceID);
this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
}
Expand All @@ -998,6 +1048,7 @@ protected Future<RegistrationResponse> invokeRegistration(
return gateway.registerJobManager(
leaderId,
jobManagerLeaderID,
jobManagerResourceID,
jobManagerRpcAddress,
jobID,
timeout);
Expand All @@ -1010,7 +1061,8 @@ protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success)
runAsync(new Runnable() {
@Override
public void run() {
onResourceManagerRegistrationSuccess(success);
resourceManagerResourceID = success.getResourceManagerResourceId();
establishResourceManagerConnection(success);
}
});
}
Expand All @@ -1019,6 +1071,14 @@ public void run() {
protected void onRegistrationFailure(final Throwable failure) {
handleFatalError(failure);
}

public ResourceID getResourceManagerResourceID() {
return resourceManagerResourceID;
}

public JobID getJobID() {
return jobID;
}
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1063,4 +1123,31 @@ public Future<Void> retrievePayload() {
return FlinkCompletableFuture.completed(null);
}
}

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

closeResourceManagerConnection(
new TimeoutException(
"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
}
});
}

@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// nothing to do since the payload is of type Void
}

@Override
public Future<Void> retrievePayload() {
return FlinkCompletableFuture.completed(null);
}
}
}
Expand Up @@ -226,4 +226,11 @@ Future<RegistrationResponse> registerTaskManager(
* @param resourceID unique id of the task manager
*/
void heartbeatFromTaskManager(final ResourceID resourceID);

/**
* Heartbeat request from the resource manager
*
* @param resourceID unique id of the resource manager
*/
void heartbeatFromResourceManager(final ResourceID resourceID);
}
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.registration.RegistrationResponse;

import java.util.UUID;
Expand All @@ -35,9 +36,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {

private final UUID resourceManagerLeaderId;

public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) {
private final ResourceID resourceManagerResourceId;

public JobMasterRegistrationSuccess(
final long heartbeatInterval,
final UUID resourceManagerLeaderId,
final ResourceID resourceManagerResourceId) {
this.heartbeatInterval = heartbeatInterval;
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId);
}

/**
Expand All @@ -53,11 +60,16 @@ public UUID getResourceManagerLeaderId() {
return resourceManagerLeaderId;
}

public ResourceID getResourceManagerResourceId() {
return resourceManagerResourceId;
}

@Override
public String toString() {
return "JobMasterRegistrationSuccess{" +
"heartbeatInterval=" + heartbeatInterval +
", resourceManagerLeaderId=" + resourceManagerLeaderId +
", resourceManagerResourceId=" + resourceManagerResourceId +
'}';
}
}

0 comments on commit 81114d5

Please sign in to comment.