Skip to content

Commit

Permalink
[FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManage…
Browse files Browse the repository at this point in the history
…r and ResourceManager

This closes #3591.
  • Loading branch information
zhijiangW authored and tillrohrmann committed Mar 23, 2017
1 parent d20fb09 commit fd90672
Show file tree
Hide file tree
Showing 22 changed files with 640 additions and 102 deletions.
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.heartbeat;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * {@link HeartbeatServices} variant for tests that lets the test decide which
 * {@link ScheduledExecutor} the created heartbeat manager senders are given.
 *
 * <p>The executor handed to the constructor replaces whatever scheduled executor the caller of
 * {@link #createHeartbeatManagerSender} passes in.
 */
public class TestingHeartbeatServices extends HeartbeatServices {

	/** Scheduled executor supplied at construction time and handed to every sender created here. */
	private final ScheduledExecutor scheduledExecutorToUse;

	/**
	 * Creates the testing heartbeat services.
	 *
	 * @param heartbeatInterval forwarded unchanged to the {@link HeartbeatServices} constructor
	 * @param heartbeatTimeout forwarded unchanged to the {@link HeartbeatServices} constructor
	 * @param scheduledExecutorToUse executor passed to every created sender; checked with
	 *                               {@link Preconditions#checkNotNull}
	 */
	public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
		super(heartbeatInterval, heartbeatTimeout);

		Preconditions.checkNotNull(scheduledExecutorToUse);
		this.scheduledExecutorToUse = scheduledExecutorToUse;
	}

	/**
	 * Builds a {@link HeartbeatManagerSenderImpl} from this instance's interval and timeout.
	 *
	 * @param resourceId id handed to the created sender
	 * @param heartbeatListener listener handed to the created sender
	 * @param scheduledExecutor not used; the executor given to the constructor is passed instead
	 * @param log logger handed to the created sender
	 * @return the newly created heartbeat manager sender
	 */
	@Override
	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
			ResourceID resourceId,
			HeartbeatListener<I, O> heartbeatListener,
			ScheduledExecutor scheduledExecutor,
			Logger log) {

		// The sender receives Flink's direct executor plus the test-provided scheduled executor.
		final HeartbeatManagerSenderImpl<I, O> sender = new HeartbeatManagerSenderImpl<>(
			heartbeatInterval,
			heartbeatTimeout,
			resourceId,
			heartbeatListener,
			org.apache.flink.runtime.concurrent.Executors.directExecutor(),
			scheduledExecutorToUse,
			log);

		return sender;
	}
}
Expand Up @@ -1043,11 +1043,11 @@ private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Vo


@Override @Override
public void notifyHeartbeatTimeout(ResourceID resourceID) { public void notifyHeartbeatTimeout(ResourceID resourceID) {
log.info("Task manager with id {} timed out.", resourceID); log.info("Task manager with id {} heartbeat timed out.", resourceID);


getSelf().disconnectTaskManager( getSelf().disconnectTaskManager(
resourceID, resourceID,
new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out.")); new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
} }


@Override @Override
Expand Down
Expand Up @@ -241,7 +241,7 @@ public void start() throws Exception {
// bring up the ResourceManager(s) // bring up the ResourceManager(s)
LOG.info("Starting {} ResourceManger(s)", numResourceManagers); LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
resourceManagerRunners = startResourceManagers( resourceManagerRunners = startResourceManagers(
configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices); configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);


// bring up the TaskManager(s) for the mini cluster // bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers); LOG.info("Starting {} TaskManger(s)", numTaskManagers);
Expand Down Expand Up @@ -508,6 +508,7 @@ protected RpcService createRpcService(
protected ResourceManagerRunner[] startResourceManagers( protected ResourceManagerRunner[] startResourceManagers(
Configuration configuration, Configuration configuration,
HighAvailabilityServices haServices, HighAvailabilityServices haServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
int numResourceManagers, int numResourceManagers,
RpcService[] resourceManagerRpcServices) throws Exception { RpcService[] resourceManagerRpcServices) throws Exception {
Expand All @@ -517,9 +518,11 @@ protected ResourceManagerRunner[] startResourceManagers(
for (int i = 0; i < numResourceManagers; i++) { for (int i = 0; i < numResourceManagers; i++) {


resourceManagerRunners[i] = new ResourceManagerRunner( resourceManagerRunners[i] = new ResourceManagerRunner(
ResourceID.generate(),
configuration, configuration,
resourceManagerRpcServices[i], resourceManagerRpcServices[i],
haServices, haServices,
heartbeatServices,
metricRegistry); metricRegistry);


resourceManagerRunners[i].start(); resourceManagerRunners[i].start();
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.registration;

/**
 * Callback interface for classes which want to be notified about the outcome of a registration
 * performed by the {@link RegisteredRpcConnection}.
 *
 * @param <Success> type of the response delivered on a successful registration
 */
public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> {

/**
 * This method is called by the {@link RegisteredRpcConnection} when the registration succeeds.
 *
 * @param success The concrete response information of the successful registration.
 */
void onRegistrationSuccess(Success success);

/**
 * This method is called by the {@link RegisteredRpcConnection} when the registration fails.
 *
 * @param failure The exception which caused the registration to fail.
 */
void onRegistrationFailure(Throwable failure);
}
Expand Up @@ -30,6 +30,10 @@
import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.InstanceID;
Expand Down Expand Up @@ -64,6 +68,7 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;


import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;


Expand All @@ -81,6 +86,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
extends RpcEndpoint<ResourceManagerGateway> extends RpcEndpoint<ResourceManagerGateway>
implements LeaderContender { implements LeaderContender {


/** Unique id of the resource manager */
private final ResourceID resourceId;

/** Configuration of the resource manager */ /** Configuration of the resource manager */
private final ResourceManagerConfiguration resourceManagerConfiguration; private final ResourceManagerConfiguration resourceManagerConfiguration;


Expand All @@ -96,6 +104,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/** High availability services for leader retrieval and election. */ /** High availability services for leader retrieval and election. */
private final HighAvailabilityServices highAvailabilityServices; private final HighAvailabilityServices highAvailabilityServices;


/** The heartbeat manager with task managers. */
private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;

/** The factory to construct the SlotManager. */ /** The factory to construct the SlotManager. */
private final SlotManagerFactory slotManagerFactory; private final SlotManagerFactory slotManagerFactory;


Expand All @@ -118,23 +129,32 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners; private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;


public ResourceManager( public ResourceManager(
ResourceID resourceId,
RpcService rpcService, RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration, ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices, HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManagerFactory slotManagerFactory, SlotManagerFactory slotManagerFactory,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService, JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) { FatalErrorHandler fatalErrorHandler) {


super(rpcService); super(rpcService);


this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.slotManagerFactory = checkNotNull(slotManagerFactory); this.slotManagerFactory = checkNotNull(slotManagerFactory);
this.metricRegistry = checkNotNull(metricRegistry); this.metricRegistry = checkNotNull(metricRegistry);
this.jobLeaderIdService = checkNotNull(jobLeaderIdService); this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.fatalErrorHandler = checkNotNull(fatalErrorHandler);


this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);

this.jobManagerRegistrations = new HashMap<>(4); this.jobManagerRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8); this.taskExecutors = new HashMap<>(8);
this.leaderSessionId = null; this.leaderSessionId = null;
Expand Down Expand Up @@ -178,6 +198,8 @@ public void start() throws Exception {
public void shutDown() throws Exception { public void shutDown() throws Exception {
Exception exception = null; Exception exception = null;


taskManagerHeartbeatManager.stop();

try { try {
super.shutDown(); super.shutDown();
} catch (Exception e) { } catch (Exception e) {
Expand Down Expand Up @@ -326,49 +348,61 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
* *
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param taskExecutorAddress The address of the TaskExecutor that registers * @param taskExecutorAddress The address of the TaskExecutor that registers
* @param resourceID The resource ID of the TaskExecutor that registers * @param taskExecutorResourceId The resource ID of the TaskExecutor that registers
* *
* @return The response by the ResourceManager. * @return The response by the ResourceManager.
*/ */
@RpcMethod @RpcMethod
public Future<RegistrationResponse> registerTaskExecutor( public Future<RegistrationResponse> registerTaskExecutor(
final UUID resourceManagerLeaderId, final UUID resourceManagerLeaderId,
final String taskExecutorAddress, final String taskExecutorAddress,
final ResourceID resourceID, final ResourceID taskExecutorResourceId,
final SlotReport slotReport) { final SlotReport slotReport) {


if (leaderSessionId.equals(resourceManagerLeaderId)) { if (leaderSessionId.equals(resourceManagerLeaderId)) {
Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);


return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@Override @Override
public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) { public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
if (throwable != null) { if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage()); return new RegistrationResponse.Decline(throwable.getMessage());
} else { } else {
WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(resourceID); WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) { if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself // TODO :: suggest old taskExecutor to stop itself
log.info("Replacing old instance of worker for ResourceID {}", resourceID); log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
} }


WorkerType newWorker = workerStarted(resourceID); WorkerType newWorker = workerStarted(taskExecutorResourceId);
WorkerRegistration<WorkerType> registration = WorkerRegistration<WorkerType> registration =
new WorkerRegistration<>(taskExecutorGateway, newWorker); new WorkerRegistration<>(taskExecutorGateway, newWorker);


taskExecutors.put(resourceID, registration); taskExecutors.put(taskExecutorResourceId, registration);
slotManager.registerTaskExecutor(resourceID, registration, slotReport); slotManager.registerTaskExecutor(taskExecutorResourceId, registration, slotReport);

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the task manager will not request heartbeat, so this method will never be called currently
}

@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});


return new TaskExecutorRegistrationSuccess( return new TaskExecutorRegistrationSuccess(
registration.getInstanceID(), registration.getInstanceID(), resourceId,
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds()); resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
} }
} }
}, getMainThreadExecutor()); }, getMainThreadExecutor());
} else { } else {
log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
"not equal the received leader session ID {}", "not equal the received leader session ID {}",
resourceID, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId); taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);


return FlinkCompletableFuture.<RegistrationResponse>completed( return FlinkCompletableFuture.<RegistrationResponse>completed(
new RegistrationResponse.Decline("Discard registration because the leader id " + new RegistrationResponse.Decline("Discard registration because the leader id " +
Expand All @@ -377,6 +411,16 @@ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throw
} }
} }


/**
 * RPC entry point for a heartbeat coming from a task manager. The heartbeat is handed to the
 * heartbeat manager which tracks the task managers.
 *
 * @param resourceID id passed to the heartbeat manager as the origin of the heartbeat
 */
@RpcMethod
public void heartbeatFromTaskManager(final ResourceID resourceID) {
// this heartbeat carries no payload, hence null
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
}

/**
 * RPC entry point for disconnecting a task manager. Delegates to
 * {@link #closeTaskManagerConnection(ResourceID, Exception)}.
 *
 * @param resourceId id of the task manager whose connection is closed
 * @param cause reason for the disconnection, passed on unchanged
 */
@RpcMethod
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
closeTaskManagerConnection(resourceId, cause);
}

/** /**
* Requests a slot from the resource manager. * Requests a slot from the resource manager.
* *
Expand Down Expand Up @@ -716,24 +760,24 @@ public void handleError(final Exception exception) {
* This method should be called by the framework once it detects that a currently registered * This method should be called by the framework once it detects that a currently registered
* task executor has failed. * task executor has failed.
* *
* @param resourceID Id of the worker that has failed. * @param resourceID Id of the TaskManager that has failed.
* @param message An informational message that explains why the worker failed. * @param cause The exception which cause the TaskManager failed.
*/ */
public void notifyWorkerFailed(final ResourceID resourceID, final String message) { public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
runAsync(new Runnable() { taskManagerHeartbeatManager.unmonitorTarget(resourceID);
@Override
public void run() {
WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);


if (workerRegistration != null) { WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
log.info("Task manager {} failed because {}.", resourceID, message);
// TODO :: suggest failed task executor to stop itself if (workerRegistration != null) {
slotManager.notifyTaskManagerFailure(resourceID); log.info("Task manager {} failed because {}.", resourceID, cause);
} else {
log.debug("Could not find a registered task manager with the process id {}.", resourceID); // TODO :: suggest failed task executor to stop itself
} slotManager.notifyTaskManagerFailure(resourceID);
}
}); workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
} else {
log.debug("Could not find a registered task manager with the process id {}.", resourceID);
}
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down Expand Up @@ -827,5 +871,26 @@ public void handleError(Throwable error) {
onFatalErrorAsync(error); onFatalErrorAsync(error);
} }
} }

/**
 * Listener for the heartbeats exchanged with the task managers. The heartbeats carry no
 * payload, hence both type parameters are {@link Void}.
 */
private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

	/**
	 * Closes the connection to a task manager whose heartbeat timed out.
	 *
	 * @param resourceID id of the task manager whose heartbeat timed out
	 */
	@Override
	public void notifyHeartbeatTimeout(final ResourceID resourceID) {
		// closeTaskManagerConnection modifies taskExecutors (a plain HashMap) and the slot
		// manager. This callback is triggered by the heartbeat manager, which is not guaranteed
		// to run in the endpoint's main thread, so the work is handed over via runAsync.
		runAsync(new Runnable() {
			@Override
			public void run() {
				log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);

				closeTaskManagerConnection(resourceID, new TimeoutException(
					"Task manager with id " + resourceID + " heartbeat timed out."));
			}
		});
	}

	/**
	 * Ignores the report, as task manager heartbeats carry no payload.
	 *
	 * @param resourceID id of the heartbeat origin (unused)
	 * @param payload always without content (unused)
	 */
	@Override
	public void reportPayload(ResourceID resourceID, Void payload) {
		// nothing to do since there is no payload
	}

	/**
	 * Returns the payload to attach to outgoing heartbeats.
	 *
	 * @return an already completed future holding {@code null}
	 */
	@Override
	public Future<Void> retrievePayload() {
		return FlinkCompletableFuture.completed(null);
	}
}
} }


Expand Up @@ -130,4 +130,20 @@ void notifySlotAvailable(
* @return The future to the number of registered TaskManagers. * @return The future to the number of registered TaskManagers.
*/ */
Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId); Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);

/**
 * Sends a heartbeat from the task manager to the resource manager.
 *
 * @param resourceID unique id of the task manager
 */
void heartbeatFromTaskManager(final ResourceID resourceID);

/**
 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
 * {@link ResourceManager}.
 *
 * @param resourceID id identifying the TaskManager to disconnect
 * @param cause reason for the disconnection of the TaskManager
 */
void disconnectTaskManager(ResourceID resourceID, Exception cause);
} }

0 comments on commit fd90672

Please sign in to comment.