Skip to content

Commit

Permalink
[FLINK-7074] [tm] Add entry point for the TaskManagerRunner
Browse files Browse the repository at this point in the history
The entry point can be used by the standalone mode to run a TaskManager. Moreover, the
YarnTaskExecutorRunner now reuses some of the start up logic of the TaskManagerRunner.

This closes #4252.
  • Loading branch information
tillrohrmann committed Jul 5, 2017
1 parent 1ba1260 commit 40dce29
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 196 deletions.
Expand Up @@ -39,8 +39,10 @@
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;

Expand Down Expand Up @@ -90,7 +92,7 @@ public class MiniCluster {
private ResourceManagerRunner[] resourceManagerRunners;

@GuardedBy("lock")
private TaskManagerRunner[] taskManagerRunners;
private TaskExecutor[] taskManagers;

@GuardedBy("lock")
private MiniClusterJobDispatcher jobDispatcher;
Expand Down Expand Up @@ -253,7 +255,7 @@ public void start() throws Exception {

// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
taskManagerRunners = startTaskManagers(
taskManagers = startTaskManagers(
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);

// bring up the dispatcher that launches JobManagers when jobs submitted
Expand Down Expand Up @@ -338,17 +340,17 @@ private void shutdownInternally() throws Exception {
resourceManagerRunners = null;
}

if (taskManagerRunners != null) {
for (TaskManagerRunner tm : taskManagerRunners) {
if (taskManagers != null) {
for (TaskExecutor tm : taskManagers) {
if (tm != null) {
try {
tm.shutDown(null);
tm.shutDown();
} catch (Throwable t) {
exception = firstOrSuppressed(t, exception);
}
}
}
taskManagerRunners = null;
taskManagers = null;
}

// shut down the RpcServices
Expand Down Expand Up @@ -402,7 +404,7 @@ public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
final ResourceManagerGateway resourceManager =
commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();

final int numTaskManagersToWaitFor = taskManagerRunners.length;
final int numTaskManagersToWaitFor = taskManagers.length;

// poll and wait until enough TaskManagers are available
while (true) {
Expand Down Expand Up @@ -540,30 +542,31 @@ protected ResourceManagerRunner[] startResourceManagers(
return resourceManagerRunners;
}

protected TaskManagerRunner[] startTaskManagers(
protected TaskExecutor[] startTaskManagers(
Configuration configuration,
HighAvailabilityServices haServices,
MetricRegistry metricRegistry,
int numTaskManagers,
RpcService[] taskManagerRpcServices) throws Exception {

final TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numTaskManagers];
final TaskExecutor[] taskExecutors = new TaskExecutor[numTaskManagers];
final boolean localCommunication = numTaskManagers == 1;

for (int i = 0; i < numTaskManagers; i++) {
taskManagerRunners[i] = new TaskManagerRunner(
taskExecutors[i] = TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServices[i],
haServices,
heartbeatServices,
metricRegistry,
localCommunication);
localCommunication,
new TerminatingFatalErrorHandler(i));

taskManagerRunners[i].start();
taskExecutors[i].start();
}

return taskManagerRunners;
return taskExecutors;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -614,4 +617,28 @@ private static MiniClusterConfiguration createConfig(Configuration cfg, boolean

return config;
}

private class TerminatingFatalErrorHandler implements FatalErrorHandler {

private final int index;

private TerminatingFatalErrorHandler(int index) {
this.index = index;
}

@Override
public void onFatalError(Throwable exception) {
LOG.error("TaskManager #{} failed.", index, exception);

try {
synchronized (lock) {
if (taskManagers[index] != null) {
taskManagers[index].shutDown();
}
}
} catch (Exception e) {
LOG.error("TaskManager #{} could not be properly terminated.", index, e);
}
}
}
}

0 comments on commit 40dce29

Please sign in to comment.