From 8c5ed020c0a1a65b3fb1d826338a58aa3888f27c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 9 May 2017 10:26:37 +0200 Subject: [PATCH] [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices The HighAvailabilityService creates a single BlobStoreService instance which is shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle is exclusively managed by the HighAvailabilityServices. This means that the BlobStore's content is only cleaned up if the HighAvailabilityService's HA data is cleaned up. Having this single point of control, makes it easier to decide when to discard HA data (e.g. in case of a successful job execution) and when to retain the data (e.g. for recovery). Close and cleanup all data of BlobStore in HighAvailabilityServices Use HighAvailabilityServices to create BlobStore Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods --- .../org/apache/flink/hdfstests/HDFSTest.java | 14 ++- .../clusterframework/MesosTaskManager.scala | 6 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 26 +++++- .../handlers/TaskManagerLogHandler.java | 11 ++- .../webmonitor/WebRuntimeMonitorITCase.java | 14 ++- .../handlers/TaskManagerLogHandlerTest.java | 11 ++- .../apache/flink/runtime/blob/BlobCache.java | 80 ++++------------ .../apache/flink/runtime/blob/BlobServer.java | 38 ++++---- .../flink/runtime/blob/BlobService.java | 8 +- .../apache/flink/runtime/blob/BlobStore.java | 26 +----- .../flink/runtime/blob/BlobStoreService.java | 32 +++++++ .../apache/flink/runtime/blob/BlobUtils.java | 44 ++++++++- .../apache/flink/runtime/blob/BlobView.java | 49 ++++++++++ .../runtime/blob/FileSystemBlobStore.java | 11 ++- .../flink/runtime/blob/VoidBlobStore.java | 8 +- .../flink/runtime/client/JobClient.java | 13 ++- .../runtime/client/JobListeningContext.java | 6 +- .../clusterframework/BootstrapTools.java | 8 +- .../librarycache/BlobLibraryCacheManager.java | 2 +- .../HighAvailabilityServicesUtils.java | 12 ++- .../nonha/AbstractNonHaServices.java | 5 +- .../zookeeper/ZooKeeperHaServices.java | 93 +++++++++---------- .../runtime/jobmaster/JobManagerServices.java | 2 +- .../runtime/taskexecutor/TaskExecutor.java | 5 +- .../runtime/webmonitor/WebMonitorUtils.java | 21 +++-- .../flink/runtime/jobmanager/JobManager.scala | 21 +++-- .../minicluster/FlinkMiniCluster.scala | 8 +- .../minicluster/LocalFlinkMiniCluster.scala | 8 +- .../runtime/taskmanager/TaskManager.scala | 36 +++++-- .../runtime/blob/BlobCacheRetriesTest.java | 79 ++++++++-------- .../runtime/blob/BlobCacheSuccessTest.java | 56 +++++------ .../flink/runtime/blob/BlobClientSslTest.java | 52 +++++------ .../flink/runtime/blob/BlobClientTest.java | 16 +--- .../runtime/blob/BlobRecoveryITCase.java | 21 +++-- .../runtime/blob/BlobServerDeleteTest.java | 21 +++-- .../flink/runtime/blob/BlobServerGetTest.java | 41 +++----- .../flink/runtime/blob/BlobServerPutTest.java | 89 +++++------------- .../runtime/blob/BlobServerRangeTest.java | 10 +- .../blob/TestingFailingBlobServer.java | 4 +- .../BlobLibraryCacheManagerTest.java | 33 +++---- .../BlobLibraryCacheRecoveryITCase.java | 21 +++-- .../zookeeper/ZooKeeperRegistryTest.java | 7 +- .../jobmanager/JobManagerHARecoveryTest.java | 9 +- .../JobManagerLeaderElectionTest.java | 3 +- .../ZooKeeperLeaderRetrievalTest.java | 13 ++- .../metrics/TaskManagerMetricsTest.java | 2 +- ...kManagerComponentsStartupShutdownTest.java | 5 +- .../TaskManagerRegistrationTest.java | 5 +- .../src/test/resources/log4j-test.properties | 2 +- .../JobManagerRegistrationTest.scala | 3 +- .../testingUtils/TestingTaskManager.scala | 40 ++++---- ...erHAProcessFailureBatchRecoveryITCase.java | 1 + .../flink/yarn/TestingYarnTaskManager.scala | 25 ++--- .../YarnHighAvailabilityServices.java | 30 +++++- .../apache/flink/yarn/YarnTaskManager.scala | 6 +- 55 files changed, 667 insertions(+), 545 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 8a3f66263e224..08158639b1759 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -31,6 +31,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; import org.apache.flink.runtime.blob.BlobRecoveryITCase; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; @@ -234,7 +236,17 @@ public void testBlobServerRecovery() throws Exception { config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); - BlobRecoveryITCase.testBlobServerRecovery(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } // package visible diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala index e8d6a58601772..78346394ec03a 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.mesos.runtime.clusterframework import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration @@ -38,7 +38,7 @@ class MesosTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricRegistry : MetricRegistry) extends TaskManager( config, @@ -48,7 +48,7 @@ class MesosTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) { override def handleMessage: Receive = { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 03b53ad245792..10d7c6c18f933 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; @@ -148,6 +149,7 @@ public class WebRuntimeMonitor implements WebMonitor { public WebRuntimeMonitor( Configuration config, LeaderRetrievalService leaderRetrievalService, + BlobView blobView, ActorSystem actorSystem) throws IOException, InterruptedException { this.leaderRetrievalService = checkNotNull(leaderRetrievalService); @@ -269,10 +271,26 @@ public WebRuntimeMonitor( GET(router, new JobMetricsHandler(metricFetcher)); GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.LOG, config, enableSSL)); - GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)); + GET(router, + new TaskManagerLogHandler( + retriever, + context, + jobManagerAddressPromise.future(), + timeout, + TaskManagerLogHandler.FileMode.LOG, + config, + enableSSL, + blobView)); + GET(router, + new TaskManagerLogHandler( + retriever, + context, + jobManagerAddressPromise.future(), + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config, + enableSSL, + blobView)); GET(router, new TaskManagerMetricsHandler(metricFetcher)); router diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 37ee8149dcddf..53ee336fe32d7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -50,6 +50,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; @@ -62,6 +63,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private final Time timeTimeout; + private final BlobView blobView; + public enum FileMode { LOG, STDOUT @@ -128,7 +132,8 @@ public TaskManagerLogHandler( FiniteDuration timeout, FileMode fileMode, Configuration config, - boolean httpsEnabled) { + boolean httpsEnabled, + BlobView blobView) { super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled); this.executor = checkNotNull(executor); @@ -142,6 +147,8 @@ public TaskManagerLogHandler( break; } + this.blobView = Preconditions.checkNotNull(blobView, "blobView"); + timeTimeout = Time.milliseconds(timeout.toMillis()); } @@ -167,7 +174,7 @@ public BlobCache checkedApply(Object result) throws IOException { Option hostOption = jobManager.actor().path().address().host(); String host = hostOption.isDefined() ? hostOption.get() : "localhost"; int port = (int) result; - return new BlobCache(new InetSocketAddress(host, port), config); + return new BlobCache(new InetSocketAddress(host, port), config, blobView); } }, executor); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index a51a2340b985f..cd5a2b7eb5de3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -154,6 +155,7 @@ public void testRedirectToLeader() throws Exception { webMonitor[i] = new WebRuntimeMonitor( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), jobManagerSystem[i]); } @@ -294,9 +296,11 @@ public void testLeaderNotAvailable() throws Exception { actorSystem = AkkaUtils.createDefaultActorSystem(); - LeaderRetrievalService leaderRetrievalService = mock(LeaderRetrievalService.class); webRuntimeMonitor = new WebRuntimeMonitor( - config, leaderRetrievalService, actorSystem); + config, + mock(LeaderRetrievalService.class), + mock(BlobView.class), + actorSystem); webRuntimeMonitor.start("akka://schmakka"); @@ -467,10 +471,12 @@ private WebRuntimeMonitor startWebRuntimeMonitor( config.setInteger(JobManagerOptions.WEB_PORT, 0); config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices(); + WebRuntimeMonitor webMonitor = new WebRuntimeMonitor( config, - flink.highAvailabilityServices().getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), jmActorSystem); webMonitor.start(jobManagerAddress); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java index 4177f4497af97..3d8f1a31264b5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Executors; @@ -53,7 +54,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; @@ -71,7 +71,8 @@ public void testGetPaths() { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), - false); + false, + new VoidBlobStore()); String[] pathsLog = handlerLog.getPaths(); Assert.assertEquals(1, pathsLog.length); Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]); @@ -83,7 +84,8 @@ public void testGetPaths() { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.STDOUT, new Configuration(), - false); + false, + new VoidBlobStore()); String[] pathsOut = handlerOut.getPaths(); Assert.assertEquals(1, pathsOut.length); Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]); @@ -131,7 +133,8 @@ public void testLogFetchingFailure() throws Exception { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), - false); + false, + new VoidBlobStore()); final AtomicReference exception = new AtomicReference<>(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 23c7e63b403a5..aa47eaef3a324 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.slf4j.Logger; @@ -58,7 +57,7 @@ public final class BlobCache implements BlobService { private final File storageDir; /** Blob store for distributed file storage, e.g. in HA */ - private final BlobStore blobStore; + private final BlobView blobView; private final AtomicBoolean shutdownRequested = new AtomicBoolean(); @@ -78,55 +77,19 @@ public final class BlobCache implements BlobService { * address of the {@link BlobServer} to use for fetching files from * @param blobClientConfig * global configuration - * - * @throws IOException - * thrown if the (local or distributed) file storage cannot be created or - * is not usable - */ - public BlobCache(InetSocketAddress serverAddress, - Configuration blobClientConfig) throws IOException { - this(serverAddress, blobClientConfig, - BlobUtils.createBlobStoreFromConfig(blobClientConfig)); - } - - /** - * Instantiates a new BLOB cache. - * - * @param serverAddress - * address of the {@link BlobServer} to use for fetching files from - * @param blobClientConfig - * global configuration - * @param haServices - * high availability services able to create a distributed blob store - * - * @throws IOException - * thrown if the (local or distributed) file storage cannot be created or - * is not usable - */ - public BlobCache(InetSocketAddress serverAddress, - Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException { - this(serverAddress, blobClientConfig, haServices.createBlobStore()); - } - - /** - * Instantiates a new BLOB cache. - * - * @param serverAddress - * address of the {@link BlobServer} to use for fetching files from - * @param blobClientConfig - * global configuration - * @param blobStore + * @param blobView * (distributed) blob store file system to retrieve files from first * * @throws IOException * thrown if the (local or distributed) file storage cannot be created or is not usable */ - private BlobCache( - final InetSocketAddress serverAddress, final Configuration blobClientConfig, - final BlobStore blobStore) throws IOException { + public BlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig, + final BlobView blobView) throws IOException { this.serverAddress = checkNotNull(serverAddress); this.blobClientConfig = checkNotNull(blobClientConfig); - this.blobStore = blobStore; + this.blobView = checkNotNull(blobView, "blobStore"); // configure and create the storage directory String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); @@ -168,7 +131,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // first try the distributed blob store (if available) try { - blobStore.get(requiredBlob, localJarFile); + blobView.get(requiredBlob, localJarFile); } catch (Exception e) { LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e); } @@ -293,28 +256,23 @@ public int getPort() { } @Override - public void shutdown() { + public void close() throws IOException { if (shutdownRequested.compareAndSet(false, true)) { LOG.info("Shutting down BlobCache"); // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); - } - catch (IOException e) { - LOG.error("BLOB cache failed to properly clean up its storage directory."); - } - - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); + } finally { + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } catch (Throwable t) { + LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 0e157772defa2..937eab0824269 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -21,9 +21,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.NetUtils; @@ -94,19 +94,14 @@ public class BlobServer extends Thread implements BlobService { /** * Instantiates a new BLOB server and binds it to a free network port. * + * @param config Configuration to be used to instantiate the BlobServer + * @param blobStore BlobStore to store blobs persistently + * * @throws IOException * thrown if the BLOB server cannot bind to a free network port or if the * (local or distributed) file storage cannot be created or is not usable */ - public BlobServer(Configuration config) throws IOException { - this(config, BlobUtils.createBlobStoreFromConfig(config)); - } - - public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException { - this(config, haServices.createBlobStore()); - } - - private BlobServer(Configuration config, BlobStore blobStore) throws IOException { + public BlobServer(Configuration config, BlobStore blobStore) throws IOException { this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); @@ -269,7 +264,12 @@ public void run() { catch (Throwable t) { if (!this.shutdownRequested.get()) { LOG.error("BLOB server stopped working. Shutting down", t); - shutdown(); + + try { + close(); + } catch (Throwable closeThrowable) { + LOG.error("Could not properly close the BlobServer.", closeThrowable); + } } } } @@ -278,13 +278,15 @@ public void run() { * Shuts down the BLOB server. */ @Override - public void shutdown() { + public void close() throws IOException { if (shutdownRequested.compareAndSet(false, true)) { + Exception exception = null; + try { this.serverSocket.close(); } catch (IOException ioe) { - LOG.debug("Error while closing the server socket.", ioe); + exception = ioe; } // wake the thread up, in case it is waiting on some operation @@ -294,13 +296,15 @@ public void shutdown() { join(); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Error while waiting for this thread to die.", ie); } synchronized (activeConnections) { if (!activeConnections.isEmpty()) { for (BlobServerConnection conn : activeConnections) { - LOG.debug("Shutting down connection " + conn.getName()); + LOG.debug("Shutting down connection {}.", conn.getName()); conn.close(); } activeConnections.clear(); @@ -312,7 +316,7 @@ public void shutdown() { FileUtils.deleteDirectory(storageDir); } catch (IOException e) { - LOG.error("BLOB server failed to properly clean up its storage directory."); + exception = ExceptionUtils.firstOrSuppressed(e, exception); } // Remove shutdown hook to prevent resource leaks, unless this is invoked by the @@ -325,13 +329,15 @@ public void shutdown() { // race, JVM is in shutdown already, we can safely ignore this } catch (Throwable t) { - LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook."); + LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t); } } if(LOG.isInfoEnabled()) { LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort()); } + + ExceptionUtils.tryRethrowIOException(exception); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 419ee8dbf19fd..97a2d5166db8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.blob; +import java.io.Closeable; import java.io.IOException; import java.net.URL; /** * A simple store and retrieve binary large objects (BLOBs). */ -public interface BlobService { +public interface BlobService extends Closeable { /** * This method returns the URL of the file associated with the provided blob key. @@ -49,11 +50,6 @@ public interface BlobService { * @return the port of the blob service. */ int getPort(); - - /** - * Shutdown method which is called to terminate the blob service. - */ - void shutdown(); BlobClient createClient() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java index 64dc942dbeadd..4c26a5a215b3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -26,7 +26,7 @@ /** * A blob store. */ -public interface BlobStore { +public interface BlobStore extends BlobView { /** * Copies the local file to the blob store. @@ -49,25 +49,6 @@ public interface BlobStore { */ void put(File localFile, JobID jobId, String key) throws IOException; - /** - * Copies a blob to a local file. - * - * @param blobKey The blob ID - * @param localFile The local file to copy to - * @throws IOException If the copy fails - */ - void get(BlobKey blobKey, File localFile) throws IOException; - - /** - * Copies a blob to a local file. - * - * @param jobId The JobID part of ID for the blob - * @param key The String part of ID for the blob - * @param localFile The local file to copy to - * @throws IOException If the copy fails - */ - void get(JobID jobId, String key, File localFile) throws IOException; - /** * Tries to delete a blob from storage. * @@ -95,9 +76,4 @@ public interface BlobStore { * @param jobId The JobID part of all blobs to delete */ void deleteAll(JobID jobId); - - /** - * Cleans up the store and deletes all blobs. - */ - void cleanUp(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java new file mode 100644 index 0000000000000..83cd9d4eebb60 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import java.io.Closeable; + +/** + * Service interface for the BlobStore which allows to close and clean up its data. + */ +public interface BlobStoreService extends BlobStore, Closeable { + + /** + * Closes and cleans up the store. This entails the deletion of all blobs. + */ + void closeAndCleanupAllData(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 3c14f2fe7b08f..8da362db9b471 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; @@ -41,6 +43,7 @@ import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * Utility class to work with blob data. @@ -78,18 +81,49 @@ public class BlobUtils { * @throws IOException * thrown if the (distributed) file storage cannot be created */ - static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { + public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException { HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); if (highAvailabilityMode == HighAvailabilityMode.NONE) { return new VoidBlobStore(); } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - return ZooKeeperHaServices.createBlobStore(config); + return createFileSystemBlobStore(config); } else { throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); } } + private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException { + String storagePath = configuration.getValue( + HighAvailabilityOptions.HA_STORAGE_PATH); + if (isNullOrWhitespaceOnly(storagePath)) { + throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + + HighAvailabilityOptions.HA_STORAGE_PATH); + } + + final Path path; + try { + path = new Path(storagePath); + } catch (Exception e) { + throw new IOException("Invalid path for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final FileSystem fileSystem; + try { + fileSystem = path.getFileSystem(); + } catch (Exception e) { + throw new IOException("Could not create FileSystem for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final String clusterId = + configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); + storagePath += "/" + clusterId; + + return new FileSystemBlobStore(fileSystem, storagePath); + } + /** * Creates a storage directory for a blob service. * @@ -246,10 +280,10 @@ static Thread addShutdownHook(final BlobService service, final Logger logger) { @Override public void run() { try { - service.shutdown(); + service.close(); } catch (Throwable t) { - logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t); + logger.error("Error during shutdown of blob service via JVM shutdown hook.", t); } } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java new file mode 100644 index 0000000000000..11cf0111d7bbc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.File; +import java.io.IOException; + +/** + * View on blobs stored in a {@link BlobStore}. + */ +public interface BlobView { + + /** + * Copies a blob to a local file. + * + * @param blobKey The blob ID + * @param localFile The local file to copy to + * @throws IOException If the copy fails + */ + void get(BlobKey blobKey, File localFile) throws IOException; + + /** + * Copies a blob to a local file. + * + * @param jobId The JobID part of ID for the blob + * @param key The String part of ID for the blob + * @param localFile The local file to copy to + * @throws IOException If the copy fails + */ + void get(JobID jobId, String key, File localFile) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index 7cfce7a399519..b54756cc9d7b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -41,7 +41,7 @@ * *

This is used in addition to the local blob storage for high availability. */ -public class FileSystemBlobStore implements BlobStore { +public class FileSystemBlobStore implements BlobStoreService { private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); @@ -157,14 +157,19 @@ private void delete(String blobPath) { } @Override - public void cleanUp() { + public void closeAndCleanupAllData() { try { LOG.debug("Cleaning up {}.", basePath); fileSystem.delete(new Path(basePath), true); } catch (Exception e) { - LOG.error("Failed to clean up recovery directory."); + LOG.error("Failed to clean up recovery directory.", e); } } + + @Override + public void close() throws IOException { + // nothing to do for the FileSystemBlobStore + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java index 8606844c08cde..c14d0820d5d59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java @@ -26,7 +26,7 @@ /** * A blob store doing nothing. */ -public class VoidBlobStore implements BlobStore { +public class VoidBlobStore implements BlobStoreService { @Override public void put(File localFile, BlobKey blobKey) throws IOException { @@ -57,6 +57,8 @@ public void deleteAll(JobID jobId) { } @Override - public void cleanUp() { - } + public void closeAndCleanupAllData() {} + + @Override + public void close() throws IOException {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index b570383cfd654..86d927adc640d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -191,7 +191,8 @@ public static JobListeningContext attachToRunningJob( public static ClassLoader retrieveClassLoader( JobID jobID, ActorGateway jobManager, - Configuration config) + Configuration config, + HighAvailabilityServices highAvailabilityServices) throws JobRetrievalException { final Object jmAnswer; @@ -213,7 +214,8 @@ public static ClassLoader retrieveClassLoader( InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); final BlobCache blobClient; try { - blobClient = new BlobCache(serverAddress, config); + // TODO: Fix lifecycle of BlobCache to properly close it upon usage + blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); } catch (IOException e) { throw new JobRetrievalException(jobID, "Failed to setup blob cache", e); @@ -229,7 +231,12 @@ public static ClassLoader retrieveClassLoader( try { allURLs[pos++] = blobClient.getURL(blobKey); } catch (Exception e) { - blobClient.shutdown(); + try { + blobClient.close(); + } catch (IOException ioe) { + LOG.warn("Could not properly close the BlobClient.", ioe); + } + throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java index fe8c34cd09b61..bb448be508b08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java @@ -134,7 +134,11 @@ public FiniteDuration getTimeout() { public ClassLoader getClassLoader() throws JobRetrievalException { if (classLoader == null) { // lazily initializes the class loader when it is needed - classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration); + classLoader = JobClient.retrieveClassLoader( + jobID, + getJobManager(), + configuration, + highAvailabilityServices); LOG.info("Reconstructed class loader for Job {}", jobID); } return classLoader; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index ea508d191e7e5..5bdfe1a01d07b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -31,7 +31,6 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.util.NetUtils; @@ -191,13 +190,12 @@ public static WebMonitor startWebMonitorIfConfigured( if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { logger.info("Starting JobManager Web Frontend"); - LeaderRetrievalService leaderRetrievalService = - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); - // start the web frontend. we need to load this dynamically // because it is not in the same project/dependencies WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor( - config, leaderRetrievalService, actorSystem); + config, + highAvailabilityServices, + actorSystem); // start the web monitor if (monitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index b0d5d834e459e..0702a1134819c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -208,7 +208,7 @@ public void shutdown() throws IOException{ LOG.warn("Failed to run clean up task before shutdown", t); } - blobService.shutdown(); + blobService.close(); cleanupTimer.cancel(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index c9e295702f3f5..2ebfd20245662 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -21,6 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; @@ -49,10 +51,13 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices( return new EmbeddedHaServices(executor); case ZOOKEEPER: + BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(config), executor, - config); + config, + blobStoreService); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); @@ -85,10 +90,13 @@ public static HighAvailabilityServices createHighAvailabilityServices( return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl); case ZOOKEEPER: + BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration); + return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(configuration), executor, - configuration); + configuration, + blobStoreService); default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index ac90e3fb4a542..9c3d986929cd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -44,10 +44,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices private final RunningJobsRegistry runningJobsRegistry; + private final VoidBlobStore voidBlobStore; + private boolean shutdown; public AbstractNonHaServices() { this.runningJobsRegistry = new StandaloneRunningJobsRegistry(); + this.voidBlobStore = new VoidBlobStore(); shutdown = false; } @@ -88,7 +91,7 @@ public BlobStore createBlobStore() throws IOException { synchronized (lock) { checkNotShutdown(); - return new VoidBlobStore(); + return voidBlobStore; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 5d895c1e14261..d4748cd3d8a57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -23,11 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; -import org.apache.flink.runtime.blob.FileSystemBlobStore; +import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -36,12 +33,12 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper. @@ -102,11 +99,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { /** The zookeeper based running jobs registry */ private final RunningJobsRegistry runningJobsRegistry; - public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) { + /** Store for arbitrary blobs */ + private final BlobStoreService blobStoreService; + + public ZooKeeperHaServices( + CuratorFramework client, + Executor executor, + Configuration configuration, + BlobStoreService blobStoreService) { this.client = checkNotNull(client); this.executor = checkNotNull(executor); this.configuration = checkNotNull(configuration); this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration); + + this.blobStoreService = checkNotNull(blobStoreService); } // ------------------------------------------------------------------------ @@ -150,61 +156,52 @@ public RunningJobsRegistry getRunningJobsRegistry() { @Override public BlobStore createBlobStore() throws IOException { - return createBlobStore(configuration); + return blobStoreService; } - /** - * Creates the BLOB store in which BLOBs are stored in a highly-available - * fashion. - * - * @param configuration configuration to extract the storage path from - * @return Blob store - * @throws IOException if the blob store could not be created - */ - public static BlobStore createBlobStore( - final Configuration configuration) throws IOException { - String storagePath = configuration.getValue( - HighAvailabilityOptions.HA_STORAGE_PATH); - if (isNullOrWhitespaceOnly(storagePath)) { - throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + - HighAvailabilityOptions.HA_STORAGE_PATH); - } + // ------------------------------------------------------------------------ + // Shutdown + // ------------------------------------------------------------------------ - final Path path; - try { - path = new Path(storagePath); - } catch (Exception e) { - throw new IOException("Invalid path for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } + @Override + public void close() throws Exception { + Throwable exception = null; - final FileSystem fileSystem; try { - fileSystem = path.getFileSystem(); - } catch (Exception e) { - throw new IOException("Could not create FileSystem for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + blobStoreService.close(); + } catch (Throwable t) { + exception = t; } - final String clusterId = - configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); - storagePath += "/" + clusterId; + internalClose(); - return new FileSystemBlobStore(fileSystem, storagePath); + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices."); + } } - // ------------------------------------------------------------------------ - // Shutdown - // ------------------------------------------------------------------------ - @Override - public void close() throws Exception { - client.close(); + public void closeAndCleanupAllData() throws Exception { + Throwable exception = null; + + try { + blobStoreService.closeAndCleanupAllData(); + } catch (Throwable t) { + exception = t; + } + + internalClose(); + + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices."); + } } - @Override - public void closeAndCleanupAllData() throws Exception { - close(); + /** + * Closes components which don't distinguish between close and closeAndCleanupAllData + */ + private void internalClose() { + client.close(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index 8cda0f77d4f0a..ac4d06fc815e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -105,7 +105,7 @@ public static JobManagerServices fromConfiguration( Configuration config, HighAvailabilityServices haServices) throws Exception { - final BlobServer blobServer = new BlobServer(config, haServices); + final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore()); final long cleanupInterval = config.getLong( ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d05d900a9992d..a91906569b610 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -923,7 +923,10 @@ private JobManagerConnection associateWithJobManager( final LibraryCacheManager libraryCacheManager; try { - final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices); + final BlobCache blobCache = new BlobCache( + blobServerAddress, + taskManagerConfiguration.getConfiguration(), + haServices.createBlobStore()); libraryCacheManager = new BlobLibraryCacheManager( blobCache, taskManagerConfiguration.getCleanupInterval()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index dd9527ea328f0..3853b216b15ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -28,10 +28,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -117,12 +119,14 @@ private static File resolveFileLocation(String logFilePath) { * Because failure to start the web runtime monitor is not considered fatal, this method does * not throw any exceptions, but only logs them. * - * @param config The configuration for the runtime monitor. - * @param leaderRetrievalService Leader retrieval service to get the leading JobManager + * @param config The configuration for the runtime monitor. + * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor + * @param actorSystem ActorSystem used to connect to the JobManager + * */ public static WebMonitor startWebRuntimeMonitor( Configuration config, - LeaderRetrievalService leaderRetrievalService, + HighAvailabilityServices highAvailabilityServices, ActorSystem actorSystem) { // try to load and instantiate the class try { @@ -130,9 +134,14 @@ public static WebMonitor startWebRuntimeMonitor( Class clazz = Class.forName(classname).asSubclass(WebMonitor.class); Constructor constructor = clazz.getConstructor(Configuration.class, - LeaderRetrievalService.class, - ActorSystem.class); - return constructor.newInstance(config, leaderRetrievalService, actorSystem); + LeaderRetrievalService.class, + BlobView.class, + ActorSystem.class); + return constructor.newInstance( + config, + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), + actorSystem); } catch (ClassNotFoundException e) { LOG.error("Could not load web runtime monitor. " + "Probably reason: flink-runtime-web is not in the classpath"); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 57a6415c31975..60950946e668e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -36,7 +36,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} -import org.apache.flink.runtime.blob.BlobServer +import org.apache.flink.runtime.blob.{BlobServer, BlobStore} import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore} import org.apache.flink.runtime.client._ @@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} @@ -2274,14 +2274,12 @@ object JobManager { val webMonitor: Option[WebMonitor] = if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { LOG.info("Starting JobManager web frontend") - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) // start the web frontend. we need to load this dynamically // because it is not in the same project/dependencies val webServer = WebMonitorUtils.startWebRuntimeMonitor( configuration, - leaderRetrievalService, + highAvailabilityServices, jobManagerSystem) Option(webServer) @@ -2507,12 +2505,14 @@ object JobManager { * @param configuration The configuration from which to parse the config values. * @param futureExecutor to run JobManager's futures * @param ioExecutor to run blocking io operations + * @param blobStore to store blobs persistently * @return The members for a default JobManager. */ def createJobManagerComponents( configuration: Configuration, futureExecutor: ScheduledExecutorService, - ioExecutor: Executor) : + ioExecutor: Executor, + blobStore: BlobStore) : (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, @@ -2557,7 +2557,7 @@ object JobManager { var libraryCacheManager: BlobLibraryCacheManager = null try { - blobServer = new BlobServer(configuration) + blobServer = new BlobServer(configuration, blobStore) instanceManager = new InstanceManager() scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) @@ -2576,7 +2576,7 @@ object JobManager { instanceManager.shutdown() } if (blobServer != null) { - blobServer.shutdown() + blobServer.close() } throw t @@ -2688,7 +2688,8 @@ object JobManager { metricsRegistry) = createJobManagerComponents( configuration, futureExecutor, - ioExecutor) + ioExecutor, + highAvailabilityServices.createBlobStore()) val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath) @@ -2744,7 +2745,7 @@ object JobManager { ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, - libraryCacheManager: BlobLibraryCacheManager, + libraryCacheManager: LibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, timeout: FiniteDuration, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 46c440464efda..2ace8db4c09d5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -37,7 +37,7 @@ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, High import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode -import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService} +import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} @@ -387,17 +387,13 @@ abstract class FlinkMiniCluster( config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) && config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { - // TODO: Add support for HA: Make web server work independently from the JM - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) - LOG.info("Starting JobManger web frontend") // start the new web frontend. we need to load this dynamically // because it is not in the same project/dependencies val webServer = Option( WebMonitorUtils.startWebRuntimeMonitor( config, - leaderRetrievalService, + highAvailabilityServices, actorSystem) ) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 8677307461258..a535388e6b923 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -143,7 +143,8 @@ class LocalFlinkMiniCluster( metricsRegistry) = JobManager.createJobManagerComponents( config, futureExecutor, - ioExecutor) + ioExecutor, + highAvailabilityServices.createBlobStore()) if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { metricsRegistry.get.startQueryService(system, null) @@ -249,8 +250,6 @@ class LocalFlinkMiniCluster( taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment, - highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID), metricRegistry) if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { @@ -315,7 +314,6 @@ class LocalFlinkMiniCluster( memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService, metricsRegistry: MetricRegistry): Props = { TaskManager.getTaskManagerProps( @@ -326,7 +324,7 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, networkEnvironment, - leaderRetrievalService, + highAvailabilityServices, metricsRegistry) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a3110a4909c90..7684a6b97a2d7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -125,7 +125,7 @@ class TaskManager( protected val ioManager: IOManager, protected val network: NetworkEnvironment, protected val numberOfSlots: Int, - protected val leaderRetrievalService: LeaderRetrievalService, + protected val highAvailabilityServices: HighAvailabilityServices, protected val metricsRegistry: FlinkMetricRegistry) extends FlinkActor with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging @@ -149,6 +149,10 @@ class TaskManager( /** Handler for distributed files cached by this TaskManager */ protected val fileCache = new FileCache(config.getTmpDirectories()) + protected val leaderRetrievalService: LeaderRetrievalService = highAvailabilityServices. + getJobManagerLeaderRetriever( + HighAvailabilityServices.DEFAULT_JOB_ID) + private var taskManagerMetricGroup : TaskManagerMetricGroup = _ /** Actors which want to be notified once this task manager has been @@ -959,7 +963,10 @@ class TaskManager( log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.") try { - val blobcache = new BlobCache(address, config.getConfiguration()) + val blobcache = new BlobCache( + address, + config.getConfiguration(), + highAvailabilityServices.createBlobStore()) blobService = Option(blobcache) libraryCacheManager = Some( new BlobLibraryCacheManager(blobcache, config.getCleanupInterval())) @@ -1039,12 +1046,24 @@ class TaskManager( // shut down BLOB and library cache libraryCacheManager foreach { - manager => manager.shutdown() + manager => + try { + manager.shutdown() + } catch { + case ioe: IOException => log.error( + "Could not properly shutdown library cache manager.", + ioe) + } } libraryCacheManager = None blobService foreach { - service => service.shutdown() + service => + try { + service.close() + } catch { + case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe) + } } blobService = None @@ -1905,9 +1924,6 @@ object TaskManager { val metricRegistry = taskManagerServices.getMetricRegistry() - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) - // create the actor properties (which define the actor constructor parameters) val tmProps = getTaskManagerProps( taskManagerClass, @@ -1917,7 +1933,7 @@ object TaskManager { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), - leaderRetrievalService, + highAvailabilityServices, metricRegistry) metricRegistry.startQueryService(actorSystem, resourceID) @@ -1936,7 +1952,7 @@ object TaskManager { memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricsRegistry: FlinkMetricRegistry ): Props = { Props( @@ -1948,7 +1964,7 @@ object TaskManager { ioManager, networkEnvironment, taskManagerConfig.getNumberSlots(), - leaderRetrievalService, + highAvailabilityServices, metricsRegistry) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 34a8a39ea47a5..1cf77eab26c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -43,10 +43,10 @@ public class BlobCacheRetriesTest { * A test where the connection fails twice and then the get operation succeeds. */ @Test - public void testBlobFetchRetries() { + public void testBlobFetchRetries() throws IOException { final Configuration config = new Configuration(); - testBlobFetchRetries(config); + testBlobFetchRetries(config, new VoidBlobStore()); } /** @@ -54,13 +54,23 @@ public void testBlobFetchRetries() { * (with high availability set). */ @Test - public void testBlobFetchRetriesHa() { + public void testBlobFetchRetriesHa() throws IOException { final Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobFetchRetries(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobFetchRetries(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } /** @@ -71,14 +81,14 @@ public void testBlobFetchRetriesHa() { * configuration to use (the BlobCache will get some additional settings * set compared to this one) */ - private void testBlobFetchRetries(final Configuration config) { + private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException { final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; BlobServer server = null; BlobCache cache = null; try { - server = new TestingFailingBlobServer(config, 2); + server = new TestingFailingBlobServer(config, blobStore, 2); final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -97,13 +107,7 @@ private void testBlobFetchRetries(final Configuration config) { } } - // create a separate config for the cache with no access to - // the (shared) storage path if available so that the cache - // will always bother the BlobServer! - final Configuration cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - cache = new BlobCache(serverAddress, cacheConfig); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // trigger a download - it should fail the first two times, but retry, and succeed eventually URL url = cache.getURL(key); @@ -116,17 +120,12 @@ private void testBlobFetchRetries(final Configuration config) { finally { is.close(); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @@ -135,10 +134,10 @@ private void testBlobFetchRetries(final Configuration config) { * A test where the connection fails too often and eventually fails the GET request. */ @Test - public void testBlobFetchWithTooManyFailures() { + public void testBlobFetchWithTooManyFailures() throws IOException { final Configuration config = new Configuration(); - testBlobFetchWithTooManyFailures(config); + testBlobFetchWithTooManyFailures(config, new VoidBlobStore()); } /** @@ -146,13 +145,23 @@ public void testBlobFetchWithTooManyFailures() { * (with high availability set). */ @Test - public void testBlobFetchWithTooManyFailuresHa() { + public void testBlobFetchWithTooManyFailuresHa() throws IOException { final Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobFetchWithTooManyFailures(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobFetchWithTooManyFailures(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } /** @@ -163,14 +172,14 @@ public void testBlobFetchWithTooManyFailuresHa() { * configuration to use (the BlobCache will get some additional settings * set compared to this one) */ - private void testBlobFetchWithTooManyFailures(final Configuration config) { + private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException { final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; BlobServer server = null; BlobCache cache = null; try { - server = new TestingFailingBlobServer(config, 10); + server = new TestingFailingBlobServer(config, blobStore, 10); final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -189,13 +198,7 @@ private void testBlobFetchWithTooManyFailures(final Configuration config) { } } - // create a separate config for the cache with no access to - // the (shared) storage path if available so that the cache - // will always bother the BlobServer! - final Configuration cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - cache = new BlobCache(serverAddress, cacheConfig); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // trigger a download - it should fail eventually try { @@ -206,16 +209,12 @@ private void testBlobFetchWithTooManyFailures(final Configuration config) { // as we expected } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index db55331b54324..2a65a3b246787 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -25,6 +25,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.net.URL; @@ -49,7 +50,7 @@ public class BlobCacheSuccessTest { * BlobServer. */ @Test - public void testBlobCache() { + public void testBlobCache() throws IOException { Configuration config = new Configuration(); uploadFileGetTest(config, false, false); } @@ -60,7 +61,7 @@ public void testBlobCache() { * BlobServer. */ @Test - public void testBlobCacheHa() { + public void testBlobCacheHa() throws IOException { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, @@ -73,7 +74,7 @@ public void testBlobCacheHa() { * file system and thus needs to download BLOBs from the BlobServer. */ @Test - public void testBlobCacheHaFallback() { + public void testBlobCacheHaFallback() throws IOException { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, @@ -82,17 +83,30 @@ public void testBlobCacheHaFallback() { } private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer, - boolean cacheHasAccessToFs) { + boolean cacheHasAccessToFs) throws IOException { // First create two BLOBs and upload them to BLOB server final byte[] buf = new byte[128]; final List blobKeys = new ArrayList(2); BlobServer blobServer = null; BlobCache blobCache = null; + BlobStoreService blobStoreService = null; try { + final Configuration cacheConfig; + if (cacheHasAccessToFs) { + cacheConfig = config; + } else { + // just in case parameters are still read from the server, + // create a separate configuration object for the cache + cacheConfig = new Configuration(config); + cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + } + + blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig); // Start the BLOB server - blobServer = new BlobServer(config); + blobServer = new BlobServer(config, blobStoreService); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); // Upload BLOBs @@ -112,22 +126,11 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit if (cacheWorksWithoutServer) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); + blobServer.close(); blobServer = null; } - final Configuration cacheConfig; - if (cacheHasAccessToFs) { - cacheConfig = config; - } else { - // just in case parameters are still read from the server, - // create a separate configuration object for the cache - cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - } - - blobCache = new BlobCache(serverAddress, cacheConfig); + blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService); for (BlobKey blobKey : blobKeys) { blobCache.getURL(blobKey); @@ -135,7 +138,7 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit if (blobServer != null) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); + blobServer.close(); blobServer = null; } @@ -162,18 +165,17 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit fail(e.getMessage()); } } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (blobServer != null) { - blobServer.shutdown(); + blobServer.close(); } if(blobCache != null){ - blobCache.shutdown(); + blobCache.close(); + } + + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 27603d0586940..f9052e13ee8dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -36,6 +36,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,7 +44,7 @@ /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest { +public class BlobClientSslTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -64,19 +65,14 @@ public class BlobClientSslTest { * Starts the SSL enabled BLOB server. */ @BeforeClass - public static void startSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); + sslClientConfig = new Configuration(); sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -88,20 +84,14 @@ public static void startSSLServer() { * Starts the SSL disabled BLOB server. */ @BeforeClass - public static void startNonSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startNonSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(BlobServerOptions.SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -114,13 +104,13 @@ public static void startNonSSLServer() { * Shuts the BLOB server down. */ @AfterClass - public static void stopServers() { + public static void stopServers() throws IOException { if (BLOB_SSL_SERVER != null) { - BLOB_SSL_SERVER.shutdown(); + BLOB_SSL_SERVER.close(); } if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 8f8f8c5203225..fda4ee9f6ecb1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -57,24 +57,18 @@ public class BlobClientTest { * Starts the BLOB server. */ @BeforeClass - public static void startServer() { - try { - blobServiceConfig = new Configuration(); - BLOB_SERVER = new BlobServer(blobServiceConfig); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startServer() throws IOException { + blobServiceConfig = new Configuration(); + BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore()); } /** * Shuts the BLOB server down. */ @AfterClass - public static void stopServer() { + public static void stopServer() throws IOException { if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index f8d50d5020cf9..4f12ddb36db4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -30,16 +30,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -59,10 +56,20 @@ public void testBlobServerRecovery() throws Exception { config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobServerRecovery(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } - public static void testBlobServerRecovery(final Configuration config) throws IOException { + public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId; Random rand = new Random(); @@ -73,7 +80,7 @@ public static void testBlobServerRecovery(final Configuration config) throws IOE try { for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStore); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); } @@ -166,7 +173,7 @@ public static void testBlobServerRecovery(final Configuration config) throws IOE finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 025a2ffaa33c2..e8e28a1949f60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -44,10 +44,11 @@ public class BlobServerDeleteTest { public void testDeleteSingle() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -93,10 +94,11 @@ public void testDeleteSingle() { public void testDeleteAll() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -156,10 +158,11 @@ public void testDeleteAll() { public void testDeleteAlreadyDeletedByBlobKey() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -195,10 +198,11 @@ public void testDeleteAlreadyDeletedByBlobKey() { public void testDeleteAlreadyDeletedByName() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -237,10 +241,11 @@ public void testDeleteFails() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -289,7 +294,11 @@ private void cleanup(BlobServer server, BlobClient client) { } } if (server != null) { - server.shutdown(); + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 59a62e132918e..6d1dba8db875d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -40,13 +40,13 @@ public class BlobServerGetTest { private final Random rnd = new Random(); @Test - public void testGetFailsDuringLookup() { + public void testGetFailsDuringLookup() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -66,37 +66,27 @@ public void testGetFailsDuringLookup() { try { client.get(key); fail("This should not succeed."); - } - catch (IOException e) { + } catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testGetFailsDuringStreaming() { + public void testGetFailsDuringStreaming() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -129,21 +119,12 @@ public void testGetFailsDuringStreaming() { catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index c4d6d1cbf0a94..441ca7d007b86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -42,13 +42,13 @@ public class BlobServerPutTest { private final Random rnd = new Random(); @Test - public void testPutBufferSuccessful() { + public void testPutBufferSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -95,34 +95,25 @@ public void testPutBufferSuccessful() { BlobUtils.readFully(is3, result3, 0, result3.length, null); is3.close(); assertArrayEquals(data, result3); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutStreamSuccessful() { + public void testPutStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -143,12 +134,7 @@ public void testPutStreamSuccessful() { String stringKey = "my test key"; client.put(jid, stringKey, new ByteArrayInputStream(data)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { try { client.close(); @@ -157,19 +143,19 @@ public void testPutStreamSuccessful() { } } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutChunkedStreamSuccessful() { + public void testPutChunkedStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -190,27 +176,18 @@ public void testPutChunkedStreamSuccessful() { String stringKey = "my test key"; client.put(jid, stringKey, new ChunkedInputStream(data, 17)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutBufferFails() { + public void testPutBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -219,7 +196,7 @@ public void testPutBufferFails() { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -250,31 +227,22 @@ public void testPutBufferFails() { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutNamedBufferFails() { + public void testPutNamedBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -283,7 +251,7 @@ public void testPutNamedBufferFails() { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -317,25 +285,16 @@ public void testPutNamedBufferFails() { catch (IllegalStateException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index c3762aa49f68a..120d86a320fde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger { public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, "0"); - BlobServer srv = new BlobServer(conf); - srv.shutdown(); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); + srv.close(); } /** @@ -63,7 +63,7 @@ public void testPortUnavailable() throws IOException { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); } finally { socket.close(); } @@ -92,9 +92,9 @@ public void testOnePortAvailable() throws IOException { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); Assert.assertEquals(availablePort, srv.getPort()); - srv.shutdown(); + srv.close(); } finally { sockets[0].close(); sockets[1].close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java index 93f9b7331dbe7..91e119b9173fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java @@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer { private int numFailures; - public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException { - super(config); + public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException { + super(config, blobStore); this.numFailures = numFailures; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 5d9ade3281e01..98e6b3ec53efa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.execution.librarycache; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; @@ -45,7 +44,7 @@ public class BlobLibraryCacheManagerTest { @Test - public void testLibraryCacheManagerCleanup() { + public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException { JobID jid = new JobID(); List keys = new ArrayList(); @@ -56,7 +55,7 @@ public void testLibraryCacheManagerCleanup() { try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -108,14 +107,9 @@ public void testLibraryCacheManagerCleanup() { assertEquals(2, caughtExceptions); bc.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (server != null) { - server.shutdown(); + server.close(); } if (libraryCacheManager != null) { @@ -130,7 +124,7 @@ public void testLibraryCacheManagerCleanup() { } @Test - public void testRegisterAndDownload() { + public void testRegisterAndDownload() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -139,9 +133,9 @@ public void testRegisterAndDownload() { try { // create the blob transfer services Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - cache = new BlobCache(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // upload some meaningless data to the server BlobClient uploader = new BlobClient(serverAddress, config); @@ -210,22 +204,17 @@ public void testRegisterAndDownload() { catch (IOException e) { // splendid! } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (cacheDir != null) { if (!cacheDir.setWritable(true, false)) { System.err.println("Could not re-add write permissions to cache directory."); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 54e1a9b1ec89a..16e3a05f2b9e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.TestLogger; @@ -63,6 +65,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2]; BlobCache cache = null; BlobLibraryCacheManager libCache = null; + BlobStoreService blobStoreService = null; Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); @@ -70,8 +73,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000); } @@ -89,7 +94,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // The cache - cache = new BlobCache(serverAddress[0], config); + cache = new BlobCache(serverAddress[0], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Register uploaded libraries @@ -110,10 +115,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // Shutdown cache and start with other server - cache.shutdown(); + cache.close(); libCache.shutdown(); - cache = new BlobCache(serverAddress[1], config); + cache = new BlobCache(serverAddress[1], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 @@ -156,17 +161,21 @@ public void testRecoveryRegisterAndDownload() throws Exception { finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (libCache != null) { libCache.shutdown(); } + + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java index 06ffe3cded04c..d89093d138959 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; -import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; @@ -62,7 +62,10 @@ public void testZooKeeperRegistry() throws Exception { configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); final HighAvailabilityServices zkHaService = new ZooKeeperHaServices( - ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration); + ZooKeeperUtils.startCuratorFramework(configuration), + Executors.directExecutor(), + configuration, + new VoidBlobStore()); final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index b8b5984f0b20b..a63b02d785f19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -71,7 +70,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -102,7 +100,6 @@ import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -190,7 +187,11 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { TestingUtils.defaultExecutor(), instanceManager, scheduler, - new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + new BlobLibraryCacheManager( + new BlobServer( + flinkConfiguration, + testingHighAvailabilityServices.createBlobStore()), + 3600000L), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index d6257ba67189d..70800e50ad9d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -184,7 +185,7 @@ private Props createJobManagerProps(Configuration configuration) throws Exceptio TestingUtils.defaultExecutor(), new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration), 10L), + new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeoutAsFiniteDuration(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0ea47f2d865ca..0282a4fefe1f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -18,17 +18,21 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -64,10 +68,13 @@ public void before() throws Exception { config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, + CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config); + + highAvailabilityServices = new ZooKeeperHaServices( + client, TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + config, + new VoidBlobStore()); } @After diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 58f223113ca5d..d6fc48c5e16a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -97,7 +97,7 @@ public void testMetricRegistryLifeCycle() throws Exception { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices, tmRegistry); final ActorRef taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 2a4c0365d034c..9dcfc70c26bae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -149,9 +149,6 @@ public void testComponentsStartupShutdown() throws Exception { network.start(); - LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID); - MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config); // create the task manager @@ -164,7 +161,7 @@ public void testComponentsStartupShutdown() throws Exception { ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, new MetricRegistry(metricRegistryConfiguration)); taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 92de31ac531d5..0844aad18a499 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.concurrent.Executors; @@ -57,6 +58,7 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -601,7 +603,7 @@ protected void run() { } @Test - public void testCheckForValidRegistrationSessionIDs() { + public void testCheckForValidRegistrationSessionIDs() throws IOException { new JavaTestKit(actorSystem) {{ ActorGateway taskManagerGateway = null; @@ -612,6 +614,7 @@ public void testCheckForValidRegistrationSessionIDs() { HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class); when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID))) .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID)); + when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore()); try { // we make the test actor (the test kit) the JobManager to intercept diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 7ba163366a631..98f136aca541f 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console # ----------------------------------------------------------------------------- # Console (use 'console') diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 4be3299bf0346..1b9ee48646c73 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor val components = JobManager.createJobManagerComponents( config, executor, - executor) + executor, + highAvailabilityServices.createBlobStore()) // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService. // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus, diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 09dc5ed24f271..1db0a8528ff5a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.runtime.testingUtils import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration @@ -32,15 +32,15 @@ import scala.language.postfixOps /** Subclass of the [[TaskManager]] to support testing messages */ class TestingTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends TaskManager( config, resourceID, @@ -49,19 +49,19 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { def this( - config: TaskManagerConfiguration, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) { + config: TaskManagerConfiguration, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) { this( config, ResourceID.generate(), @@ -70,7 +70,7 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 5f9d1784ae0a1..2983d667f78ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -155,6 +155,7 @@ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) thr Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath()); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, config); diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index 0f82faa40dc99..1df4b8dcc37ff 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -19,6 +19,7 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService @@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike * @param ioManager IOManager responsible for I/O * @param network NetworkEnvironment for this actor * @param numberOfSlots Number of slots for this TaskManager - * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading - * JobManager + * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval + * service for retrieving the leading JobManager */ class TestingYarnTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends YarnTaskManager( config, resourceID, @@ -61,7 +62,7 @@ class TestingYarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java index e9c3904b4220c..f81d0406339c9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.blob.FileSystemBlobStore; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe * HA services clean up */ protected final Path haDataDirectory; + /** Blob store service to be used for the BlobServer and BlobCache */ + protected final BlobStoreService blobStoreService; + /** Flag marking this instance as shut down */ private volatile boolean closed; @@ -153,6 +157,8 @@ protected YarnHighAvailabilityServices( } LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory); + + blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); } // ------------------------------------------------------------------------ @@ -163,7 +169,7 @@ protected YarnHighAvailabilityServices( public BlobStore createBlobStore() throws IOException { enter(); try { - return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); + return blobStoreService; } finally { exit(); } @@ -192,11 +198,23 @@ public void close() throws Exception { } closed = true; + Throwable exception = null; + + try { + blobStoreService.close(); + } catch (Throwable t) { + exception = t; + } + // we do not propagate exceptions here, but only log them try { hadoopFileSystem.close(); } catch (Throwable t) { - LOG.warn("Error closing Hadoop FileSystem", t); + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices."); } } finally { @@ -213,12 +231,18 @@ public void closeAndCleanupAllData() throws Exception { // we remember exceptions only, then continue cleanup, and re-throw at the end Throwable exception = null; + try { + blobStoreService.closeAndCleanupAllData(); + } catch (Throwable t) { + exception = t; + } + // first, we delete all data in Flink's data directory try { flinkFileSystem.delete(haDataDirectory, true); } catch (Throwable t) { - exception = t; + exception = ExceptionUtils.firstOrSuppressed(t, exception); } // now we actually close the services diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index be310854855ad..b7f4c9a382467 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.metrics.MetricRegistry @@ -38,7 +38,7 @@ class YarnTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricRegistry : MetricRegistry) extends TaskManager( config, @@ -48,7 +48,7 @@ class YarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) { override def handleMessage: Receive = {