[FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner

This commit waits for the ResourceManager to complete its shutdown before shutting
down the ResourceManagerRuntimeServices. The JobManagerServices are now exclusively
passed in to the JobManagerRunner, meaning the runner is no longer responsible for
shutting them down. Additionally, the JobManagerRunner waits until the JobMaster has
been shut down before closing the LeaderElectionService and the JobManagerMetricGroup.
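
A minimal, self-contained sketch of this shutdown ordering. The stub classes below are illustrative stand-ins rather than the real flink-runtime types; only the shutDown()/getTerminationFuture() shape mirrors the RpcEndpoint-style API that the diff relies on.

import java.util.concurrent.CompletableFuture;

// Illustrative stand-in for an RpcEndpoint such as the ResourceManager.
// Assumption: the real shutdown is asynchronous; here it completes immediately for brevity.
class StubEndpoint {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    void shutDown() {
        terminationFuture.complete(null);
    }

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}

// Illustrative stand-in for the ResourceManagerRuntimeServices.
class StubRuntimeServices {
    void shutDown() {
        System.out.println("runtime services released");
    }
}

public class ShutdownOrderingSketch {
    public static void main(String[] args) {
        StubEndpoint resourceManager = new StubEndpoint();
        StubRuntimeServices runtimeServices = new StubRuntimeServices();

        resourceManager.shutDown();

        // The point of the commit: release the shared services only after the
        // endpoint has actually finished terminating.
        resourceManager.getTerminationFuture()
                .thenRun(runtimeServices::shutDown)
                .join();
    }
}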

The JobManagerServices are now managed by the caller of the JobManagerRunner. This
allows them to be reused across multiple JobManagerRunners.
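
As a hedged illustration of that ownership model (hypothetical classes, not Flink API): the owner creates the services once, hands the same instance to every runner, and closes them itself after all runners are done.

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

// The owner (e.g. the Dispatcher in this commit) holds the shared services.
class SharedServices implements Closeable {
    @Override
    public void close() {
        System.out.println("shared services shut down once, by their owner");
    }
}

// Each runner only borrows the services and never closes them.
class Runner {
    private final SharedServices services;

    Runner(SharedServices services) {
        this.services = services;
    }

    void run() {
        System.out.println("running a job against the shared services");
    }
}

public class CallerOwnedServicesSketch {
    public static void main(String[] args) {
        try (SharedServices services = new SharedServices()) {
            List<Runner> runners = Arrays.asList(new Runner(services), new Runner(services));
            runners.forEach(Runner::run);
        } // closed exactly once, after every runner has finished with them
    }
}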

The RpcEndpoint#postStop method is now called by the UntypedActor#postStop method,
which ensures that the RpcEndpoint's postStop is also invoked when only the underlying
RpcService is shut down (without the RpcEndpoint being shut down explicitly).
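
A simplified sketch of that forwarding, not the actual AkkaRpcActor: the Endpoint interface below is a hypothetical stand-in for RpcEndpoint, while UntypedActor is the Akka Java actor base class that Flink's RPC layer was built on at the time.

import akka.actor.UntypedActor;

// Hypothetical stand-in for RpcEndpoint#postStop.
interface Endpoint {
    void postStop() throws Exception;
}

class EndpointActor extends UntypedActor {
    private final Endpoint rpcEndpoint;

    EndpointActor(Endpoint rpcEndpoint) {
        this.rpcEndpoint = rpcEndpoint;
    }

    @Override
    public void onReceive(Object message) {
        // RPC message handling elided in this sketch
    }

    @Override
    public void postStop() throws Exception {
        // Akka calls this whenever the actor stops, including when the whole actor
        // system (i.e. the RpcService) is shut down, so the endpoint's cleanup
        // always runs even without an explicit endpoint shutdown.
        rpcEndpoint.postStop();
    }
}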

This closes #4596.
tillrohrmann committed Sep 5, 2017
1 parent e70de0e commit ff16606
Showing 12 changed files with 176 additions and 250 deletions.
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -69,7 +70,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final RunningJobsRegistry runningJobsRegistry;

private final HighAvailabilityServices highAvailabilityServices;
private final BlobServer blobServer;
private final JobManagerServices jobManagerServices;
private final HeartbeatServices heartbeatServices;
private final MetricRegistry metricRegistry;

@@ -92,7 +93,9 @@ protected Dispatcher(

this.configuration = Preconditions.checkNotNull(configuration);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.blobServer = Preconditions.checkNotNull(blobServer);
this.jobManagerServices = JobManagerServices.fromConfiguration(
configuration,
Preconditions.checkNotNull(blobServer));
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
@@ -111,10 +114,16 @@ protected Dispatcher(

@Override
public void postStop() throws Exception {
Exception exception = null;
Throwable exception = null;

clearState();

try {
jobManagerServices.shutdown();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
submittedJobGraphStore.stop();
} catch (Exception e) {
@@ -184,8 +193,8 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
configuration,
getRpcService(),
highAvailabilityServices,
blobServer,
heartbeatServices,
jobManagerServices,
metricRegistry,
new DispatcherOnCompleteActions(jobGraph.getJobID()),
fatalErrorHandler);
@@ -247,13 +256,23 @@ private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
*
* <p>The state are all currently running jobs.
*/
private void clearState() {
private void clearState() throws Exception {
Exception exception = null;

// stop all currently running JobManager since they run in the same process
for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
jobManagerRunner.shutdown();
try {
jobManagerRunner.shutdown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}

jobManagerRunners.clear();

if (exception != null) {
throw exception;
}
}

/**
@@ -296,8 +315,8 @@ protected abstract JobManagerRunner createJobManagerRunner(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
FatalErrorHandler fatalErrorHandler) throws Exception;
@@ -321,7 +340,11 @@ public void grantLeadership(final UUID newLeaderSessionID) {

// clear the state if we've been the leader before
if (getFencingToken() != null) {
clearState();
try {
clearState();
} catch (Exception e) {
log.warn("Could not properly clear the Dispatcher state while granting leadership.", e);
}
}

setFencingToken(dispatcherId);
@@ -342,7 +365,11 @@ public void revokeLeadership() {
runAsyncWithoutFencing(
() -> {
log.info("Dispatcher {} was revoked leadership.", getAddress());
clearState();
try {
clearState();
} catch (Exception e) {
log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e);
}

setFencingToken(DispatcherId.generate());
});
@@ -26,6 +26,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -64,8 +65,8 @@ protected JobManagerRunner createJobManagerRunner(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -76,8 +77,8 @@ protected JobManagerRunner createJobManagerRunner(
configuration,
rpcService,
highAvailabilityServices,
blobServer,
heartbeatServices,
jobManagerServices,
metricRegistry,
onCompleteActions,
fatalErrorHandler);
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -43,6 +44,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {

private ResourceManager<?> resourceManager;

private JobManagerServices jobManagerServices;

private JobManagerRunner jobManagerRunner;

public JobClusterEntrypoint(Configuration configuration) {
@@ -67,12 +70,14 @@ protected void startClusterComponents(
metricRegistry,
this);

jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);

jobManagerRunner = createJobManagerRunner(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
blobServer,
jobManagerServices,
heartbeatServices,
metricRegistry,
this);
@@ -89,7 +94,7 @@ protected JobManagerRunner createJobManagerRunner(
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobService,
JobManagerServices jobManagerServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -102,8 +107,8 @@ protected JobManagerRunner createJobManagerRunner(
configuration,
rpcService,
highAvailabilityServices,
blobService,
heartbeatServices,
jobManagerServices,
metricRegistry,
new TerminatingOnCompleteActions(jobGraph.getJobID()),
fatalErrorHandler);
@@ -121,6 +126,14 @@ protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
}
}

if (jobManagerServices != null) {
try {
jobManagerServices.shutdown();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}

if (resourceManager != null) {
try {
resourceManager.shutDown();
@@ -22,9 +22,9 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,10 +36,10 @@
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,53 +92,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F

// ------------------------------------------------------------------------

public JobManagerRunner(
final ResourceID resourceId,
final JobGraph jobGraph,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
final BlobServer blobService,
final HeartbeatServices heartbeatServices,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception {
this(
resourceId,
jobGraph,
configuration,
rpcService,
haServices,
blobService,
heartbeatServices,
new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
toNotifyOnComplete,
errorHandler);
}

public JobManagerRunner(
final ResourceID resourceId,
final JobGraph jobGraph,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices haServices,
final BlobServer blobService,
final HeartbeatServices heartbeatServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
final FatalErrorHandler errorHandler) throws Exception {
this(
resourceId,
jobGraph,
configuration,
rpcService,
haServices,
heartbeatServices,
JobManagerServices.fromConfiguration(configuration, blobService),
metricRegistry,
toNotifyOnComplete,
errorHandler);
}

/**
*
* <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly
@@ -217,12 +170,6 @@ public JobManagerRunner(
}
catch (Throwable t) {
// clean up everything
try {
jobManagerServices.shutdown();
} catch (Throwable tt) {
log.error("Error while shutting down JobManager services", tt);
}

if (jobManagerMetrics != null) {
jobManagerMetrics.close();
}
@@ -245,40 +192,37 @@ public void start() throws Exception {
}
}

public void shutdown() {
shutdownInternally();
public void shutdown() throws Exception {
shutdownInternally().get();
}

private void shutdownInternally() {
private CompletableFuture<Void> shutdownInternally() {
synchronized (lock) {
shutdown = true;

if (leaderElectionService != null) {
try {
leaderElectionService.stop();
} catch (Throwable t) {
log.error("Could not properly shutdown the leader election service", t);
}
}

try {
jobManager.shutDown();
} catch (Throwable t) {
log.error("Error shutting down JobManager", t);
}

try {
jobManagerServices.shutdown();
} catch (Throwable t) {
log.error("Error shutting down JobManager services", t);
}

// make all registered metrics go away
try {
jobManagerMetricGroup.close();
} catch (Throwable t) {
log.error("Error while unregistering metrics", t);
}
jobManager.shutDown();

return jobManager.getTerminationFuture()
.thenAccept(
ignored -> {
Throwable exception = null;
try {
leaderElectionService.stop();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

// make all registered metrics go away
try {
jobManagerMetricGroup.close();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

if (exception != null) {
throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception);
}
});
}
}

@@ -353,6 +353,8 @@ private void shutdownInternally() throws Exception {
if (tm != null) {
try {
tm.shutDown();
// wait for the TaskManager to properly terminate
tm.getTerminationFuture().get();
} catch (Throwable t) {
exception = firstOrSuppressed(t, exception);
}
