Skip to content

Commit

Permalink
[FLINK-8900] [yarn] Properly unregister application from Yarn RM
Browse files Browse the repository at this point in the history
This closes #5741.
  • Loading branch information
tillrohrmann committed Mar 23, 2018
1 parent 95cde57 commit 9ee02f6
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 51 deletions.
Expand Up @@ -362,9 +362,9 @@ public CompletableFuture<Void> postStop() {
}

@Override
protected void shutDownApplication(
protected void internalDeregisterApplication(
ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) throws ResourceManagerException {
@Nullable String diagnostics) throws ResourceManagerException {
LOG.info("Shutting down and unregistering as a Mesos framework.");

Exception exception = null;
Expand Down
Expand Up @@ -737,7 +737,7 @@ public void testStopWorker() throws Exception {
public void testShutdownApplication() throws Exception {
new Context() {{
startResourceManager();
resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, "");
resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, "");

// verify that the Mesos framework is shutdown
verify(rmServices.schedulerDriver).stop(false);
Expand Down
Expand Up @@ -197,6 +197,7 @@ protected void startCluster() {
shutDownAndTerminate(
STARTUP_FAILURE_RETURN_CODE,
ApplicationStatus.FAILED,
t.getMessage(),
false);
}
}
Expand Down Expand Up @@ -245,6 +246,7 @@ protected void runCluster(Configuration configuration) throws Exception {
shutDownAndTerminate(
SUCCESS_RETURN_CODE,
ApplicationStatus.SUCCEEDED,
throwable != null ? throwable.getMessage() : null,
true);
});
}
Expand Down Expand Up @@ -544,38 +546,34 @@ private Configuration generateClusterConfiguration(Configuration configuration)
return resultConfiguration;
}

private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {
private CompletableFuture<Void> shutDownAsync(
boolean cleanupHaData,
ApplicationStatus applicationStatus,
@Nullable String diagnostics) {
if (isShutDown.compareAndSet(false, true)) {
LOG.info("Stopping {}.", getClass().getSimpleName());

final CompletableFuture<Void> componentShutdownFuture = stopClusterComponents();

componentShutdownFuture.whenComplete(
(Void ignored1, Throwable componentThrowable) -> {
final CompletableFuture<Void> serviceShutdownFuture = stopClusterServices(cleanupHaData);

serviceShutdownFuture.whenComplete(
(Void ignored2, Throwable serviceThrowable) -> {
Throwable finalException = null;

if (serviceThrowable != null) {
finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable);
} else if (componentThrowable != null) {
finalException = componentThrowable;
}

try {
cleanupDirectories();
} catch (IOException e) {
finalException = ExceptionUtils.firstOrSuppressed(e, finalException);
}

if (finalException != null) {
terminationFuture.completeExceptionally(finalException);
} else {
terminationFuture.complete(null);
}
});
final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);

final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
shutDownApplicationFuture,
this::stopClusterComponents);

final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
componentShutdownFuture,
() -> stopClusterServices(cleanupHaData));

final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(
serviceShutdownFuture,
this::cleanupDirectories);

cleanupDirectoriesFuture.whenComplete(
(Void ignored2, Throwable serviceThrowable) -> {
if (serviceThrowable != null) {
terminationFuture.completeExceptionally(serviceThrowable);
} else {
terminationFuture.complete(null);
}
});
}

Expand All @@ -585,6 +583,7 @@ private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {
private void shutDownAndTerminate(
int returnCode,
ApplicationStatus applicationStatus,
@Nullable String diagnostics,
boolean cleanupHaData) {

if (isTerminating.compareAndSet(false, true)) {
Expand All @@ -593,7 +592,10 @@ private void shutDownAndTerminate(
returnCode,
applicationStatus);

shutDownAsync(cleanupHaData).whenComplete(
shutDownAsync(
cleanupHaData,
applicationStatus,
diagnostics).whenComplete(
(Void ignored, Throwable t) -> {
if (t != null) {
LOG.info("Could not properly shut down cluster entrypoint.", t);
Expand All @@ -608,6 +610,25 @@ private void shutDownAndTerminate(
}
}

/**
* Deregister the Flink application from the resource management system by signalling
* the {@link ResourceManager}.
*
* @param applicationStatus to terminate the application with
* @param diagnostics additional information about the shut down, can be {@code null}
* @return Future which is completed once the shut down
*/
private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
synchronized (lock) {
if (resourceManager != null) {
final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
} else {
return CompletableFuture.completedFuture(null);
}
}
}

/**
* Clean up of temporary directories created by the {@link ClusterEntrypoint}.
*
Expand Down
Expand Up @@ -486,19 +486,21 @@ public void unRegisterInfoMessageListener(final String address) {
* Cleanup application and shut down cluster.
*
* @param finalStatus of the Flink application
* @param optionalDiagnostics diagnostics message for the Flink application or {@code null}
* @param diagnostics diagnostics message for the Flink application or {@code null}
*/
@Override
public void shutDownCluster(
public CompletableFuture<Acknowledge> deregisterApplication(
final ApplicationStatus finalStatus,
@Nullable final String optionalDiagnostics) {
log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
@Nullable final String diagnostics) {
log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);

try {
shutDownApplication(finalStatus, optionalDiagnostics);
internalDeregisterApplication(finalStatus, diagnostics);
} catch (ResourceManagerException e) {
log.warn("Could not properly shutdown the application.", e);
}

return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
Expand Down Expand Up @@ -946,7 +948,7 @@ public void handleError(final Exception exception) {
protected abstract void initialize() throws ResourceManagerException;

/**
* The framework specific code for shutting down the application. This should report the
* The framework specific code to deregister the application. This should report the
* application's final status and shut down the resource manager cleanly.
*
* <p>This method also needs to make sure all pending containers that are not registered
Expand All @@ -956,7 +958,7 @@ public void handleError(final Exception exception) {
* @param optionalDiagnostics A diagnostics message or {@code null}.
* @throws ResourceManagerException if the application could not be shut down.
*/
protected abstract void shutDownApplication(
protected abstract void internalDeregisterApplication(
ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) throws ResourceManagerException;

Expand Down
Expand Up @@ -41,6 +41,8 @@
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -133,11 +135,12 @@ void notifySlotAvailable(
void unRegisterInfoMessageListener(String infoMessageListenerAddress);

/**
* shutdown cluster
* @param finalStatus
* @param optionalDiagnostics
* Deregister Flink from the underlying resource management system.
*
* @param finalStatus final status with which to deregister the Flink application
* @param diagnostics additional information for the resource management system, can be {@code null}
*/
void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
CompletableFuture<Acknowledge> deregisterApplication(final ApplicationStatus finalStatus, @Nullable final String diagnostics);

/**
* Gets the currently registered number of TaskManagers.
Expand Down
Expand Up @@ -72,7 +72,7 @@ protected void initialize() throws ResourceManagerException {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {
}

@Override
Expand Down
Expand Up @@ -68,7 +68,7 @@ protected void initialize() throws ResourceManagerException {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException {
protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException {
// noop
}

Expand Down
Expand Up @@ -214,8 +214,8 @@ public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {
}

@Override
public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {

public CompletableFuture<Acknowledge> deregisterApplication(ApplicationStatus finalStatus, String diagnostics) {
return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
Expand Down
Expand Up @@ -266,16 +266,16 @@ public CompletableFuture<Void> postStop() {
}

@Override
protected void shutDownApplication(
protected void internalDeregisterApplication(
ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) {
@Nullable String diagnostics) {

// first, de-register from YARN
FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);

try {
resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, "");
} catch (Throwable t) {
log.error("Could not unregister the application master.", t);
}
Expand Down

0 comments on commit 9ee02f6

Please sign in to comment.