Skip to content

Commit

Permalink
[FLINK-9027] [web] Clean up web UI resources by installing shut down …
Browse files Browse the repository at this point in the history
…hook

The ClusterEntrypoint creates temp directory for the RestServerEndpoint. This
directory contains the web ui files and if not differently configured the web
upload directory. In case of a hard shut down, as it happens with bin/stop-cluster.sh
the ClusterEntrypoint will clean up this directory by installing a shut down hook.

All future directory cleanup tasks should go into this method
ClusterEntrypoin#cleanupDirectories.

This closes #5740.
  • Loading branch information
tillrohrmann committed Mar 23, 2018
1 parent 5f72ca5 commit 4fa4e8c
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 38 deletions.
Expand Up @@ -666,11 +666,6 @@ private class TestRestServerEndpoint extends RestServerEndpoint implements AutoC

@Override
protected void startInternal() throws Exception {}

@Override
public void close() throws Exception {
shutDownAsync().get();
}
}

@FunctionalInterface
Expand Down
Expand Up @@ -69,7 +69,9 @@
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import akka.actor.ActorSystem;
import org.slf4j.Logger;
Expand All @@ -78,10 +80,14 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -159,9 +165,13 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
@GuardedBy("lock")
private JobManagerMetricGroup jobManagerMetricGroup;

private final Thread shutDownHook;

protected ClusterEntrypoint(Configuration configuration) {
this.configuration = Preconditions.checkNotNull(configuration);
this.configuration = generateClusterConfiguration(configuration);
this.terminationFuture = new CompletableFuture<>();

shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);
}

public CompletableFuture<Void> getTerminationFuture() {
Expand Down Expand Up @@ -479,7 +489,7 @@ protected CompletableFuture<Void> stopClusterComponents() {
}

if (webMonitorEndpoint != null) {
terminationFutures.add(webMonitorEndpoint.shutDownAsync());
terminationFutures.add(webMonitorEndpoint.closeAsync());
}

if (dispatcher != null) {
Expand Down Expand Up @@ -523,6 +533,17 @@ public void onFatalError(Throwable exception) {
// Internal methods
// --------------------------------------------------

private Configuration generateClusterConfiguration(Configuration configuration) {
final Configuration resultConfiguration = new Configuration(Preconditions.checkNotNull(configuration));

final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
final Path uniqueWebTmpDir = Paths.get(webTmpDir, "flink-web-" + UUID.randomUUID());

resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.toAbsolutePath().toString());

return resultConfiguration;
}

private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {
if (isShutDown.compareAndSet(false, true)) {
LOG.info("Stopping {}.", getClass().getSimpleName());
Expand All @@ -535,11 +556,22 @@ private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {

serviceShutdownFuture.whenComplete(
(Void ignored2, Throwable serviceThrowable) -> {
Throwable finalException = null;

if (serviceThrowable != null) {
terminationFuture.completeExceptionally(
ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable));
finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable);
} else if (componentThrowable != null) {
terminationFuture.completeExceptionally(componentThrowable);
finalException = componentThrowable;
}

try {
cleanupDirectories();
} catch (IOException e) {
finalException = ExceptionUtils.firstOrSuppressed(e, finalException);
}

if (finalException != null) {
terminationFuture.completeExceptionally(finalException);
} else {
terminationFuture.complete(null);
}
Expand Down Expand Up @@ -576,6 +608,19 @@ private void shutDownAndTerminate(
}
}

/**
* Clean up of temporary directories created by the {@link ClusterEntrypoint}.
*
* @throws IOException if the temporary directories could not be cleaned up
*/
private void cleanupDirectories() throws IOException {
ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);

FileUtils.deleteDirectory(new File(webTmpDir));
}

// --------------------------------------------------
// Abstract methods
// --------------------------------------------------
Expand Down
Expand Up @@ -758,7 +758,7 @@ private CompletableFuture<Void> shutDownDispatcher() {
}

if (dispatcherRestEndpoint != null) {
terminationFutures.add(dispatcherRestEndpoint.shutDownAsync());
terminationFutures.add(dispatcherRestEndpoint.closeAsync());

dispatcherRestEndpoint = null;
}
Expand Down
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -67,7 +67,7 @@
/**
* An abstract class for netty-based REST server endpoints.
*/
public abstract class RestServerEndpoint {
public abstract class RestServerEndpoint implements AutoCloseableAsync {

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -256,7 +256,8 @@ public String getRestBaseUrl() {
}
}

public final CompletableFuture<Void> shutDownAsync() {
@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
log.info("Shutting down rest endpoint.");

Expand Down Expand Up @@ -370,12 +371,7 @@ protected CompletableFuture<Void> shutDownInternal() {
});
});

return FutureUtils.runAfterwards(
channelTerminationFuture,
() -> {
log.info("Cleaning upload directory {}", uploadDir);
FileUtils.cleanDirectory(uploadDir.toFile());
});
return channelTerminationFuture;
}
}

Expand Down
Expand Up @@ -36,7 +36,6 @@
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -172,7 +171,7 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co

final Path uploadDir = Paths.get(
config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
"flink-web-upload-" + UUID.randomUUID());
"flink-web-upload");

final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);

Expand Down
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.Preconditions;

import java.io.File;
import java.util.UUID;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
* Configuration object containing values for the rest handler configuration.
Expand All @@ -37,20 +37,20 @@ public class RestHandlerConfiguration {

private final Time timeout;

private final File tmpDir;
private final Path webUiDir;

public RestHandlerConfiguration(
long refreshInterval,
int maxCheckpointStatisticCacheEntries,
Time timeout,
File tmpDir) {
Path webUiDir) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;

this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;

this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
this.webUiDir = Preconditions.checkNotNull(webUiDir);
}

public long getRefreshInterval() {
Expand All @@ -65,8 +65,8 @@ public Time getTimeout() {
return timeout;
}

public File getTmpDir() {
return tmpDir;
public Path getWebUiDir() {
return webUiDir;
}

public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
Expand All @@ -76,13 +76,13 @@ public static RestHandlerConfiguration fromConfiguration(Configuration configura

final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

final String rootDir = "flink-web-" + UUID.randomUUID();
final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);
final String rootDir = "flink-web-ui";
final Path webUiDir = Paths.get(configuration.getString(WebOptions.TMP_DIR), rootDir);

return new RestHandlerConfiguration(
refreshInterval,
maxCheckpointStatisticCacheEntries,
timeout,
tmpDir);
webUiDir);
}
}
Expand Up @@ -127,6 +127,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -498,7 +499,7 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
executor,
metricFetcher);

final File tmpDir = restConfiguration.getTmpDir();
final Path webUiDir = restConfiguration.getWebUiDir();

Optional<StaticFileServerHandler<T>> optWebContent;

Expand All @@ -507,7 +508,7 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
leaderRetriever,
restAddressFuture,
timeout,
tmpDir);
webUiDir.toFile());
} catch (IOException e) {
log.warn("Could not load web content handler.", e);
optWebContent = Optional.empty();
Expand Down Expand Up @@ -635,15 +636,15 @@ protected CompletableFuture<Void> shutDownInternal() {

final CompletableFuture<Void> shutdownFuture = super.shutDownInternal();

final File tmpDir = restConfiguration.getTmpDir();
final Path webUiDir = restConfiguration.getWebUiDir();

return FutureUtils.runAfterwardsAsync(
shutdownFuture,
() -> {
Exception exception = null;
try {
log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir);
log.info("Removing cache directory {}", webUiDir);
FileUtils.deleteDirectory(webUiDir.toFile());
} catch (Exception e) {
exception = e;
}
Expand Down
Expand Up @@ -154,7 +154,7 @@ public void teardown() throws Exception {
}

if (serverEndpoint != null) {
serverEndpoint.shutDownAsync().get();
serverEndpoint.close();
serverEndpoint = null;
}
}
Expand Down

0 comments on commit 4fa4e8c

Please sign in to comment.