Skip to content

Commit

Permalink
[FLINK-4606] Integrate the new ResourceManager with the existed Flink…
Browse files Browse the repository at this point in the history
…ResourceManager
  • Loading branch information
beyond1920 authored and tillrohrmann committed Oct 6, 2016
1 parent a1e45ae commit 406dccb
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.RpcGateway;

/**
* RPC gateway through which a listener receives {@link InfoMessage}s from the {@link ResourceManager}.
*/
public interface InfoMessageListenerRpcGateway extends RpcGateway {

/**
* Delivers an info message from the resource manager to this listener.
*
* @param infoMessage the info message being delivered
*/
void notifyInfoMessage(InfoMessage infoMessage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
Expand All @@ -42,8 +45,6 @@
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

import java.util.HashMap;
Expand All @@ -66,15 +67,16 @@
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {

private final Logger LOG = LoggerFactory.getLogger(getClass());
/** The exit code with which the process is stopped in case of a fatal error */
protected static final int EXIT_CODE_FATAL_ERROR = -13;

private final Map<JobID, JobMasterGateway> jobMasterGateways;

private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;

private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
private final Map<ResourceID, WorkerType> taskExecutorGateways;

private final HighAvailabilityServices highAvailabilityServices;

Expand All @@ -84,16 +86,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme

private UUID leaderSessionID;

public ResourceManager(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
SlotManager slotManager) {
private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;

public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
this.slotManager = slotManager;
this.slotManager = checkNotNull(slotManager);
this.jobMasterLeaderRetrievalListeners = new HashSet<>();
this.taskExecutorGateways = new HashMap<>();
infoMessageListeners = new HashMap<>();
}

@Override
Expand All @@ -103,6 +105,8 @@ public void start() {
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
leaderElectionService.start(this);
// framework specific initialization
initialize();
} catch (Throwable e) {
log.error("A fatal error happened when starting the ResourceManager", e);
throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
Expand Down Expand Up @@ -166,12 +170,12 @@ public JobMasterGateway call() throws Exception {
jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
} catch (Exception e) {
LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
}

if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
throw new Exception("JobManager is not leading");
}

Expand All @@ -190,7 +194,7 @@ public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable t
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
}
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
Expand Down Expand Up @@ -237,13 +241,24 @@ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throw
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
InstanceID id = new InstanceID();
TaskExecutorRegistration oldTaskExecutor =
taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
if (oldTaskExecutor != null) {
log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
WorkerType startedWorker = taskExecutorGateways.get(resourceID);
if(startedWorker != null) {
String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
if (taskExecutorAddress.equals(oldWorkerAddress)) {
log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
} else {
log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
resourceID, oldWorkerAddress, taskExecutorAddress);
// TODO :: suggest old taskExecutor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
startedWorker = workerStarted(resourceID, taskExecutorGateway);
taskExecutorGateways.put(resourceID, startedWorker);
}
} else {
startedWorker = workerStarted(resourceID, taskExecutorGateway);
taskExecutorGateways.put(resourceID, startedWorker);
}
return new TaskExecutorRegistrationSuccess(id, 5000);
return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
}
}
}, getMainThreadExecutor());
Expand All @@ -263,14 +278,12 @@ public SlotRequestReply requestSlot(SlotRequest slotRequest) {
if (jobMasterGateway != null) {
return slotManager.requestSlot(slotRequest);
} else {
LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
return new SlotRequestRejected(slotRequest.getAllocationId());
}
}




// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
shutDown();
}

/**
 * Registers an info message listener with this resource manager.
 *
 * <p>If a listener is already registered under the given address, the request is logged
 * as a duplicate and ignored. Otherwise a connection to the address is opened
 * asynchronously and the listener is stored only once that connection succeeds. A failed
 * connection is logged together with its cause and the listener is not registered.
 *
 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
 */
@RpcMethod
public void registerInfoMessageListener(final String infoMessageListenerAddress) {
	if (infoMessageListeners.containsKey(infoMessageListenerAddress)) {
		log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
		return;
	}

	Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture =
		getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);

	// Both callbacks run on the main thread executor, so the listener map is only touched from there.
	infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
		@Override
		public void accept(InfoMessageListenerRpcGateway gateway) {
			log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
			infoMessageListeners.put(infoMessageListenerAddress, gateway);
		}
	}, getMainThreadExecutor());

	infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
		@Override
		public Void apply(Throwable failure) {
			// The trailing Throwable is logged with its stack trace instead of being dropped.
			log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress, failure);
			return null;
		}
	}, getMainThreadExecutor());
}

/**
 * Removes the info message listener stored under the given address, if there is one.
 *
 * @param infoMessageListenerAddress address under which the listener was registered
 */
@RpcMethod
public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
	this.infoMessageListeners.remove(infoMessageListenerAddress);
}

/**
 * Shuts down the cluster by logging the request and delegating to the framework specific
 * {@link #shutDownApplication(ApplicationStatus, String)}.
 *
 * @param finalStatus the application status handed on to the framework specific shutdown
 * @param optionalDiagnostics diagnostics message handed on to the framework specific shutdown
 */
@RpcMethod
public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
	log.info(
		"shut down cluster because application is in {}, diagnostics {}",
		finalStatus,
		optionalDiagnostics);
	this.shutDownApplication(finalStatus, optionalDiagnostics);
}

/**
 * This method should be called by the framework once it detects that a currently registered task executor has failed.
 *
 * <p>The handling is scheduled via {@code runAsync}. If the worker is still registered it is
 * removed, the failure reason is logged, and the slot manager is notified. A notification
 * for a worker that is not registered is only logged.
 *
 * @param resourceID Id of the worker that has failed.
 * @param message An informational message that explains why the worker failed.
 */
public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
	runAsync(new Runnable() {
		@Override
		public void run() {
			WorkerType worker = taskExecutorGateways.remove(resourceID);
			if (worker != null) {
				// Surface the failure reason; previously the message was silently discarded.
				log.info("Task executor {} failed: {}", resourceID, message);
				// TODO :: suggest failed task executor to stop itself
				slotManager.notifyTaskManagerFailure(resourceID);
			} else {
				log.debug("Ignoring failure notification for unknown task executor {}: {}", resourceID, message);
			}
		}
	});
}

/**
 * Reports how many task executors are currently held in the registration map.
 *
 * <p>NOTE(review): the map is read directly here rather than through {@code runAsync};
 * confirm that callers invoke this from the main thread.
 *
 * @return the number of currently started TaskManagers
 */
public int getNumberOfStartedTaskManagers() {
	final int startedTaskManagers = taskExecutorGateways.size();
	return startedTaskManagers;
}

/**
 * Reports a fatal error to the resource manager. The framework specific
 * {@link #fatalError(String, Throwable)} handler is scheduled via {@code runAsync}.
 *
 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
 * such a way that a high-availability setting would restart this or fail over
 * to another master.
 *
 * @param message description of the fatal error
 * @param error the throwable associated with the fatal error
 */
public void onFatalError(final String message, final Throwable error) {
	final Runnable reportFatalError = new Runnable() {
		@Override
		public void run() {
			fatalError(message, error);
		}
	};
	runAsync(reportFatalError);
}

// ------------------------------------------------------------------------
// Framework specific behavior
// ------------------------------------------------------------------------

/**
* Initializes the framework specific components.
*
* <p>Invoked from {@code start()} after the leader election service has been started.
*
* @throws Exception Exceptions during initialization cause the resource manager to fail;
* {@code start()} logs them and rethrows them wrapped in a {@link RuntimeException}.
*/
protected abstract void initialize() throws Exception;

/**
* Callback invoked when a task executor registers for the first time, or registers again
* from a different address than the one previously recorded for its resource id.
*
* @param resourceID The worker resource id
* @param taskExecutorGateway the task executor gateway
* @return the worker that is stored for the resource id and whose instance id is
* returned in the registration response
*/
protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);

/**
* Callback invoked when the resource manager faces a fatal error. It is scheduled by
* {@link #onFatalError(String, Throwable)}.
*
* @param message description of the fatal error
* @param error the throwable associated with the fatal error
*/
protected abstract void fatalError(String message, Throwable error);

/**
* The framework specific code for shutting down the application. This should report the
* application's final status and shut down the resource manager cleanly.
*
* <p>This method also needs to make sure all pending containers that are not registered
* yet are returned.
*
* <p>Invoked from {@link #shutDownCluster(ApplicationStatus, String)}.
*
* @param finalStatus The application status to report.
* @param optionalDiagnostics An optional diagnostics message.
*/
protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);

// ------------------------------------------------------------------------
// Info messaging
// ------------------------------------------------------------------------

/**
 * Wraps the given text in an {@link InfoMessage} and delivers it to every registered
 * info message listener. The work is scheduled via {@code runAsync}.
 *
 * @param message text of the info message to send
 */
public void sendInfoMessage(final String message) {
	final Runnable broadcast = new Runnable() {
		@Override
		public void run() {
			// Created inside the task, exactly as before, so it is built when the task runs.
			final InfoMessage notification = new InfoMessage(message);
			for (InfoMessageListenerRpcGateway listener : infoMessageListeners.values()) {
				listener.notifyInfoMessage(notification);
			}
		}
	};
	runAsync(broadcast);
}

private static class JobMasterLeaderListener implements LeaderRetrievalListener {

private final JobID jobID;
Expand All @@ -343,5 +508,6 @@ public void handleError(final Exception exception) {
// TODO
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;
Expand Down Expand Up @@ -75,4 +76,26 @@ Future<RegistrationResponse> registerTaskExecutor(
String taskExecutorAddress,
ResourceID resourceID,
@RpcTimeout Time timeout);

/**
* Registers an info message listener with this resource manager.
*
* @param infoMessageListenerAddress address of the info message listener to register with this resource manager
*/
void registerInfoMessageListener(String infoMessageListenerAddress);

/**
* Unregisters an info message listener from this resource manager.
*
* @param infoMessageListenerAddress address of the info message listener to unregister from this resource manager
*/
void unRegisterInfoMessageListener(String infoMessageListenerAddress);

/**
* Shuts down the cluster.
*
* @param finalStatus the final application status
* @param optionalDiagnostics an optional diagnostics message
*/
void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
}

0 comments on commit 406dccb

Please sign in to comment.