diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 8a3f66263e224..08158639b1759 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -31,6 +31,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; import org.apache.flink.runtime.blob.BlobRecoveryITCase; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; @@ -234,7 +236,17 @@ public void testBlobServerRecovery() throws Exception { config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); - BlobRecoveryITCase.testBlobServerRecovery(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } // package visible diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala index e8d6a58601772..78346394ec03a 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.mesos.runtime.clusterframework import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration @@ -38,7 +38,7 @@ class MesosTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricRegistry : MetricRegistry) extends TaskManager( config, @@ -48,7 +48,7 @@ class MesosTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) { override def handleMessage: Receive = { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 03b53ad245792..10d7c6c18f933 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; @@ -148,6 +149,7 @@ public class WebRuntimeMonitor implements WebMonitor { public WebRuntimeMonitor( Configuration config, LeaderRetrievalService leaderRetrievalService, + BlobView blobView, ActorSystem actorSystem) throws IOException, InterruptedException { this.leaderRetrievalService = checkNotNull(leaderRetrievalService); @@ -269,10 +271,26 @@ public WebRuntimeMonitor( GET(router, new JobMetricsHandler(metricFetcher)); GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher)); - GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.LOG, config, enableSSL)); - GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, - TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL)); + GET(router, + new TaskManagerLogHandler( + retriever, + context, + jobManagerAddressPromise.future(), + timeout, + TaskManagerLogHandler.FileMode.LOG, + config, + enableSSL, + blobView)); + GET(router, + new TaskManagerLogHandler( + retriever, + context, + jobManagerAddressPromise.future(), + timeout, + TaskManagerLogHandler.FileMode.STDOUT, + config, + enableSSL, + blobView)); GET(router, new TaskManagerMetricsHandler(metricFetcher)); router diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 37ee8149dcddf..53ee336fe32d7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -50,6 +50,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; @@ -62,6 +63,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { private final Time timeTimeout; + private final BlobView blobView; + public enum FileMode { LOG, STDOUT @@ -128,7 +132,8 @@ public TaskManagerLogHandler( FiniteDuration timeout, FileMode fileMode, Configuration config, - boolean httpsEnabled) { + boolean httpsEnabled, + BlobView blobView) { super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled); this.executor = checkNotNull(executor); @@ -142,6 +147,8 @@ public TaskManagerLogHandler( break; } + this.blobView = Preconditions.checkNotNull(blobView, "blobView"); + timeTimeout = Time.milliseconds(timeout.toMillis()); } @@ -167,7 +174,7 @@ public BlobCache checkedApply(Object result) throws IOException { Option hostOption = jobManager.actor().path().address().host(); String host = hostOption.isDefined() ? hostOption.get() : "localhost"; int port = (int) result; - return new BlobCache(new InetSocketAddress(host, port), config); + return new BlobCache(new InetSocketAddress(host, port), config, blobView); } }, executor); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index a51a2340b985f..cd5a2b7eb5de3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -154,6 +155,7 @@ public void testRedirectToLeader() throws Exception { webMonitor[i] = new WebRuntimeMonitor( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), jobManagerSystem[i]); } @@ -294,9 +296,11 @@ public void testLeaderNotAvailable() throws Exception { actorSystem = AkkaUtils.createDefaultActorSystem(); - LeaderRetrievalService leaderRetrievalService = mock(LeaderRetrievalService.class); webRuntimeMonitor = new WebRuntimeMonitor( - config, leaderRetrievalService, actorSystem); + config, + mock(LeaderRetrievalService.class), + mock(BlobView.class), + actorSystem); webRuntimeMonitor.start("akka://schmakka"); @@ -467,10 +471,12 @@ private WebRuntimeMonitor startWebRuntimeMonitor( config.setInteger(JobManagerOptions.WEB_PORT, 0); config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices(); + WebRuntimeMonitor webMonitor = new WebRuntimeMonitor( config, - flink.highAvailabilityServices().getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), jmActorSystem); webMonitor.start(jobManagerAddress); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java index 4177f4497af97..3d8f1a31264b5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Executors; @@ -53,7 +54,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; @@ -71,7 +71,8 @@ public void testGetPaths() { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), - false); + false, + new VoidBlobStore()); String[] pathsLog = handlerLog.getPaths(); Assert.assertEquals(1, pathsLog.length); Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]); @@ -83,7 +84,8 @@ public void testGetPaths() { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.STDOUT, new Configuration(), - false); + false, + new VoidBlobStore()); String[] pathsOut = handlerOut.getPaths(); Assert.assertEquals(1, pathsOut.length); Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]); @@ -131,7 +133,8 @@ public void testLogFetchingFailure() throws Exception { AkkaUtils.getDefaultClientTimeout(), TaskManagerLogHandler.FileMode.LOG, new Configuration(), - false); + false, + new VoidBlobStore()); final AtomicReference exception = new AtomicReference<>(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 23c7e63b403a5..aa47eaef3a324 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.slf4j.Logger; @@ -58,7 +57,7 @@ public final class BlobCache implements BlobService { private final File storageDir; /** Blob store for distributed file storage, e.g. in HA */ - private final BlobStore blobStore; + private final BlobView blobView; private final AtomicBoolean shutdownRequested = new AtomicBoolean(); @@ -78,55 +77,19 @@ public final class BlobCache implements BlobService { * address of the {@link BlobServer} to use for fetching files from * @param blobClientConfig * global configuration - * - * @throws IOException - * thrown if the (local or distributed) file storage cannot be created or - * is not usable - */ - public BlobCache(InetSocketAddress serverAddress, - Configuration blobClientConfig) throws IOException { - this(serverAddress, blobClientConfig, - BlobUtils.createBlobStoreFromConfig(blobClientConfig)); - } - - /** - * Instantiates a new BLOB cache. - * - * @param serverAddress - * address of the {@link BlobServer} to use for fetching files from - * @param blobClientConfig - * global configuration - * @param haServices - * high availability services able to create a distributed blob store - * - * @throws IOException - * thrown if the (local or distributed) file storage cannot be created or - * is not usable - */ - public BlobCache(InetSocketAddress serverAddress, - Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException { - this(serverAddress, blobClientConfig, haServices.createBlobStore()); - } - - /** - * Instantiates a new BLOB cache. - * - * @param serverAddress - * address of the {@link BlobServer} to use for fetching files from - * @param blobClientConfig - * global configuration - * @param blobStore + * @param blobView * (distributed) blob store file system to retrieve files from first * * @throws IOException * thrown if the (local or distributed) file storage cannot be created or is not usable */ - private BlobCache( - final InetSocketAddress serverAddress, final Configuration blobClientConfig, - final BlobStore blobStore) throws IOException { + public BlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig, + final BlobView blobView) throws IOException { this.serverAddress = checkNotNull(serverAddress); this.blobClientConfig = checkNotNull(blobClientConfig); - this.blobStore = blobStore; + this.blobView = checkNotNull(blobView, "blobStore"); // configure and create the storage directory String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); @@ -168,7 +131,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // first try the distributed blob store (if available) try { - blobStore.get(requiredBlob, localJarFile); + blobView.get(requiredBlob, localJarFile); } catch (Exception e) { LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e); } @@ -293,28 +256,23 @@ public int getPort() { } @Override - public void shutdown() { + public void close() throws IOException { if (shutdownRequested.compareAndSet(false, true)) { LOG.info("Shutting down BlobCache"); // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); - } - catch (IOException e) { - LOG.error("BLOB cache failed to properly clean up its storage directory."); - } - - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); + } finally { + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } catch (Throwable t) { + LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 0e157772defa2..937eab0824269 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -21,9 +21,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.NetUtils; @@ -94,19 +94,14 @@ public class BlobServer extends Thread implements BlobService { /** * Instantiates a new BLOB server and binds it to a free network port. * + * @param config Configuration to be used to instantiate the BlobServer + * @param blobStore BlobStore to store blobs persistently + * * @throws IOException * thrown if the BLOB server cannot bind to a free network port or if the * (local or distributed) file storage cannot be created or is not usable */ - public BlobServer(Configuration config) throws IOException { - this(config, BlobUtils.createBlobStoreFromConfig(config)); - } - - public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException { - this(config, haServices.createBlobStore()); - } - - private BlobServer(Configuration config, BlobStore blobStore) throws IOException { + public BlobServer(Configuration config, BlobStore blobStore) throws IOException { this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); @@ -269,7 +264,12 @@ public void run() { catch (Throwable t) { if (!this.shutdownRequested.get()) { LOG.error("BLOB server stopped working. Shutting down", t); - shutdown(); + + try { + close(); + } catch (Throwable closeThrowable) { + LOG.error("Could not properly close the BlobServer.", closeThrowable); + } } } } @@ -278,13 +278,15 @@ public void run() { * Shuts down the BLOB server. */ @Override - public void shutdown() { + public void close() throws IOException { if (shutdownRequested.compareAndSet(false, true)) { + Exception exception = null; + try { this.serverSocket.close(); } catch (IOException ioe) { - LOG.debug("Error while closing the server socket.", ioe); + exception = ioe; } // wake the thread up, in case it is waiting on some operation @@ -294,13 +296,15 @@ public void shutdown() { join(); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Error while waiting for this thread to die.", ie); } synchronized (activeConnections) { if (!activeConnections.isEmpty()) { for (BlobServerConnection conn : activeConnections) { - LOG.debug("Shutting down connection " + conn.getName()); + LOG.debug("Shutting down connection {}.", conn.getName()); conn.close(); } activeConnections.clear(); @@ -312,7 +316,7 @@ public void shutdown() { FileUtils.deleteDirectory(storageDir); } catch (IOException e) { - LOG.error("BLOB server failed to properly clean up its storage directory."); + exception = ExceptionUtils.firstOrSuppressed(e, exception); } // Remove shutdown hook to prevent resource leaks, unless this is invoked by the @@ -325,13 +329,15 @@ public void shutdown() { // race, JVM is in shutdown already, we can safely ignore this } catch (Throwable t) { - LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook."); + LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t); } } if(LOG.isInfoEnabled()) { LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort()); } + + ExceptionUtils.tryRethrowIOException(exception); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 419ee8dbf19fd..97a2d5166db8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.blob; +import java.io.Closeable; import java.io.IOException; import java.net.URL; /** * A simple store and retrieve binary large objects (BLOBs). */ -public interface BlobService { +public interface BlobService extends Closeable { /** * This method returns the URL of the file associated with the provided blob key. @@ -49,11 +50,6 @@ public interface BlobService { * @return the port of the blob service. */ int getPort(); - - /** - * Shutdown method which is called to terminate the blob service. - */ - void shutdown(); BlobClient createClient() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java index 64dc942dbeadd..4c26a5a215b3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -26,7 +26,7 @@ /** * A blob store. */ -public interface BlobStore { +public interface BlobStore extends BlobView { /** * Copies the local file to the blob store. @@ -49,25 +49,6 @@ public interface BlobStore { */ void put(File localFile, JobID jobId, String key) throws IOException; - /** - * Copies a blob to a local file. - * - * @param blobKey The blob ID - * @param localFile The local file to copy to - * @throws IOException If the copy fails - */ - void get(BlobKey blobKey, File localFile) throws IOException; - - /** - * Copies a blob to a local file. - * - * @param jobId The JobID part of ID for the blob - * @param key The String part of ID for the blob - * @param localFile The local file to copy to - * @throws IOException If the copy fails - */ - void get(JobID jobId, String key, File localFile) throws IOException; - /** * Tries to delete a blob from storage. * @@ -95,9 +76,4 @@ public interface BlobStore { * @param jobId The JobID part of all blobs to delete */ void deleteAll(JobID jobId); - - /** - * Cleans up the store and deletes all blobs. - */ - void cleanUp(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java new file mode 100644 index 0000000000000..83cd9d4eebb60 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import java.io.Closeable; + +/** + * Service interface for the BlobStore which allows to close and clean up its data. + */ +public interface BlobStoreService extends BlobStore, Closeable { + + /** + * Closes and cleans up the store. This entails the deletion of all blobs. + */ + void closeAndCleanupAllData(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 3c14f2fe7b08f..8da362db9b471 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; @@ -41,6 +43,7 @@ import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * Utility class to work with blob data. @@ -78,18 +81,49 @@ public class BlobUtils { * @throws IOException * thrown if the (distributed) file storage cannot be created */ - static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { + public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException { HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); if (highAvailabilityMode == HighAvailabilityMode.NONE) { return new VoidBlobStore(); } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - return ZooKeeperHaServices.createBlobStore(config); + return createFileSystemBlobStore(config); } else { throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); } } + private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException { + String storagePath = configuration.getValue( + HighAvailabilityOptions.HA_STORAGE_PATH); + if (isNullOrWhitespaceOnly(storagePath)) { + throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + + HighAvailabilityOptions.HA_STORAGE_PATH); + } + + final Path path; + try { + path = new Path(storagePath); + } catch (Exception e) { + throw new IOException("Invalid path for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final FileSystem fileSystem; + try { + fileSystem = path.getFileSystem(); + } catch (Exception e) { + throw new IOException("Could not create FileSystem for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final String clusterId = + configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); + storagePath += "/" + clusterId; + + return new FileSystemBlobStore(fileSystem, storagePath); + } + /** * Creates a storage directory for a blob service. * @@ -246,10 +280,10 @@ static Thread addShutdownHook(final BlobService service, final Logger logger) { @Override public void run() { try { - service.shutdown(); + service.close(); } catch (Throwable t) { - logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t); + logger.error("Error during shutdown of blob service via JVM shutdown hook.", t); } } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java new file mode 100644 index 0000000000000..11cf0111d7bbc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.File; +import java.io.IOException; + +/** + * View on blobs stored in a {@link BlobStore}. + */ +public interface BlobView { + + /** + * Copies a blob to a local file. + * + * @param blobKey The blob ID + * @param localFile The local file to copy to + * @throws IOException If the copy fails + */ + void get(BlobKey blobKey, File localFile) throws IOException; + + /** + * Copies a blob to a local file. + * + * @param jobId The JobID part of ID for the blob + * @param key The String part of ID for the blob + * @param localFile The local file to copy to + * @throws IOException If the copy fails + */ + void get(JobID jobId, String key, File localFile) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index 7cfce7a399519..b54756cc9d7b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -41,7 +41,7 @@ * *

This is used in addition to the local blob storage for high availability. */ -public class FileSystemBlobStore implements BlobStore { +public class FileSystemBlobStore implements BlobStoreService { private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); @@ -157,14 +157,19 @@ private void delete(String blobPath) { } @Override - public void cleanUp() { + public void closeAndCleanupAllData() { try { LOG.debug("Cleaning up {}.", basePath); fileSystem.delete(new Path(basePath), true); } catch (Exception e) { - LOG.error("Failed to clean up recovery directory."); + LOG.error("Failed to clean up recovery directory.", e); } } + + @Override + public void close() throws IOException { + // nothing to do for the FileSystemBlobStore + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java index 8606844c08cde..c14d0820d5d59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java @@ -26,7 +26,7 @@ /** * A blob store doing nothing. */ -public class VoidBlobStore implements BlobStore { +public class VoidBlobStore implements BlobStoreService { @Override public void put(File localFile, BlobKey blobKey) throws IOException { @@ -57,6 +57,8 @@ public void deleteAll(JobID jobId) { } @Override - public void cleanUp() { - } + public void closeAndCleanupAllData() {} + + @Override + public void close() throws IOException {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index b570383cfd654..86d927adc640d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -191,7 +191,8 @@ public static JobListeningContext attachToRunningJob( public static ClassLoader retrieveClassLoader( JobID jobID, ActorGateway jobManager, - Configuration config) + Configuration config, + HighAvailabilityServices highAvailabilityServices) throws JobRetrievalException { final Object jmAnswer; @@ -213,7 +214,8 @@ public static ClassLoader retrieveClassLoader( InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); final BlobCache blobClient; try { - blobClient = new BlobCache(serverAddress, config); + // TODO: Fix lifecycle of BlobCache to properly close it upon usage + blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore()); } catch (IOException e) { throw new JobRetrievalException(jobID, "Failed to setup blob cache", e); @@ -229,7 +231,12 @@ public static ClassLoader retrieveClassLoader( try { allURLs[pos++] = blobClient.getURL(blobKey); } catch (Exception e) { - blobClient.shutdown(); + try { + blobClient.close(); + } catch (IOException ioe) { + LOG.warn("Could not properly close the BlobClient.", ioe); + } + throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java index fe8c34cd09b61..bb448be508b08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java @@ -134,7 +134,11 @@ public FiniteDuration getTimeout() { public ClassLoader getClassLoader() throws JobRetrievalException { if (classLoader == null) { // lazily initializes the class loader when it is needed - classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration); + classLoader = JobClient.retrieveClassLoader( + jobID, + getJobManager(), + configuration, + highAvailabilityServices); LOG.info("Reconstructed class loader for Job {}", jobID); } return classLoader; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index ea508d191e7e5..5bdfe1a01d07b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -31,7 +31,6 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.util.NetUtils; @@ -191,13 +190,12 @@ public static WebMonitor startWebMonitorIfConfigured( if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { logger.info("Starting JobManager Web Frontend"); - LeaderRetrievalService leaderRetrievalService = - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); - // start the web frontend. we need to load this dynamically // because it is not in the same project/dependencies WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor( - config, leaderRetrievalService, actorSystem); + config, + highAvailabilityServices, + actorSystem); // start the web monitor if (monitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index b0d5d834e459e..0702a1134819c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -208,7 +208,7 @@ public void shutdown() throws IOException{ LOG.warn("Failed to run clean up task before shutdown", t); } - blobService.shutdown(); + blobService.close(); cleanupTimer.cancel(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index c9e295702f3f5..2ebfd20245662 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -21,6 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; @@ -49,10 +51,13 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices( return new EmbeddedHaServices(executor); case ZOOKEEPER: + BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(config), executor, - config); + config, + blobStoreService); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); @@ -85,10 +90,13 @@ public static HighAvailabilityServices createHighAvailabilityServices( return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl); case ZOOKEEPER: + BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration); + return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(configuration), executor, - configuration); + configuration, + blobStoreService); default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index ac90e3fb4a542..9c3d986929cd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -44,10 +44,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices private final RunningJobsRegistry runningJobsRegistry; + private final VoidBlobStore voidBlobStore; + private boolean shutdown; public AbstractNonHaServices() { this.runningJobsRegistry = new StandaloneRunningJobsRegistry(); + this.voidBlobStore = new VoidBlobStore(); shutdown = false; } @@ -88,7 +91,7 @@ public BlobStore createBlobStore() throws IOException { synchronized (lock) { checkNotShutdown(); - return new VoidBlobStore(); + return voidBlobStore; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 5d895c1e14261..d4748cd3d8a57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -23,11 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; -import org.apache.flink.runtime.blob.FileSystemBlobStore; +import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -36,12 +33,12 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper. @@ -102,11 +99,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { /** The zookeeper based running jobs registry */ private final RunningJobsRegistry runningJobsRegistry; - public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) { + /** Store for arbitrary blobs */ + private final BlobStoreService blobStoreService; + + public ZooKeeperHaServices( + CuratorFramework client, + Executor executor, + Configuration configuration, + BlobStoreService blobStoreService) { this.client = checkNotNull(client); this.executor = checkNotNull(executor); this.configuration = checkNotNull(configuration); this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration); + + this.blobStoreService = checkNotNull(blobStoreService); } // ------------------------------------------------------------------------ @@ -150,61 +156,52 @@ public RunningJobsRegistry getRunningJobsRegistry() { @Override public BlobStore createBlobStore() throws IOException { - return createBlobStore(configuration); + return blobStoreService; } - /** - * Creates the BLOB store in which BLOBs are stored in a highly-available - * fashion. - * - * @param configuration configuration to extract the storage path from - * @return Blob store - * @throws IOException if the blob store could not be created - */ - public static BlobStore createBlobStore( - final Configuration configuration) throws IOException { - String storagePath = configuration.getValue( - HighAvailabilityOptions.HA_STORAGE_PATH); - if (isNullOrWhitespaceOnly(storagePath)) { - throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + - HighAvailabilityOptions.HA_STORAGE_PATH); - } + // ------------------------------------------------------------------------ + // Shutdown + // ------------------------------------------------------------------------ - final Path path; - try { - path = new Path(storagePath); - } catch (Exception e) { - throw new IOException("Invalid path for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } + @Override + public void close() throws Exception { + Throwable exception = null; - final FileSystem fileSystem; try { - fileSystem = path.getFileSystem(); - } catch (Exception e) { - throw new IOException("Could not create FileSystem for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + blobStoreService.close(); + } catch (Throwable t) { + exception = t; } - final String clusterId = - configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); - storagePath += "/" + clusterId; + internalClose(); - return new FileSystemBlobStore(fileSystem, storagePath); + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices."); + } } - // ------------------------------------------------------------------------ - // Shutdown - // ------------------------------------------------------------------------ - @Override - public void close() throws Exception { - client.close(); + public void closeAndCleanupAllData() throws Exception { + Throwable exception = null; + + try { + blobStoreService.closeAndCleanupAllData(); + } catch (Throwable t) { + exception = t; + } + + internalClose(); + + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices."); + } } - @Override - public void closeAndCleanupAllData() throws Exception { - close(); + /** + * Closes components which don't distinguish between close and closeAndCleanupAllData + */ + private void internalClose() { + client.close(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index 8cda0f77d4f0a..ac4d06fc815e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -105,7 +105,7 @@ public static JobManagerServices fromConfiguration( Configuration config, HighAvailabilityServices haServices) throws Exception { - final BlobServer blobServer = new BlobServer(config, haServices); + final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore()); final long cleanupInterval = config.getLong( ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d05d900a9992d..a91906569b610 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -923,7 +923,10 @@ private JobManagerConnection associateWithJobManager( final LibraryCacheManager libraryCacheManager; try { - final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices); + final BlobCache blobCache = new BlobCache( + blobServerAddress, + taskManagerConfiguration.getConfiguration(), + haServices.createBlobStore()); libraryCacheManager = new BlobLibraryCacheManager( blobCache, taskManagerConfiguration.getCleanupInterval()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index dd9527ea328f0..3853b216b15ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -28,10 +28,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -117,12 +119,14 @@ private static File resolveFileLocation(String logFilePath) { * Because failure to start the web runtime monitor is not considered fatal, this method does * not throw any exceptions, but only logs them. * - * @param config The configuration for the runtime monitor. - * @param leaderRetrievalService Leader retrieval service to get the leading JobManager + * @param config The configuration for the runtime monitor. + * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor + * @param actorSystem ActorSystem used to connect to the JobManager + * */ public static WebMonitor startWebRuntimeMonitor( Configuration config, - LeaderRetrievalService leaderRetrievalService, + HighAvailabilityServices highAvailabilityServices, ActorSystem actorSystem) { // try to load and instantiate the class try { @@ -130,9 +134,14 @@ public static WebMonitor startWebRuntimeMonitor( Class clazz = Class.forName(classname).asSubclass(WebMonitor.class); Constructor constructor = clazz.getConstructor(Configuration.class, - LeaderRetrievalService.class, - ActorSystem.class); - return constructor.newInstance(config, leaderRetrievalService, actorSystem); + LeaderRetrievalService.class, + BlobView.class, + ActorSystem.class); + return constructor.newInstance( + config, + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices.createBlobStore(), + actorSystem); } catch (ClassNotFoundException e) { LOG.error("Could not load web runtime monitor. " + "Probably reason: flink-runtime-web is not in the classpath"); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 57a6415c31975..60950946e668e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -36,7 +36,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} -import org.apache.flink.runtime.blob.BlobServer +import org.apache.flink.runtime.blob.{BlobServer, BlobStore} import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore} import org.apache.flink.runtime.client._ @@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} @@ -2274,14 +2274,12 @@ object JobManager { val webMonitor: Option[WebMonitor] = if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { LOG.info("Starting JobManager web frontend") - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) // start the web frontend. we need to load this dynamically // because it is not in the same project/dependencies val webServer = WebMonitorUtils.startWebRuntimeMonitor( configuration, - leaderRetrievalService, + highAvailabilityServices, jobManagerSystem) Option(webServer) @@ -2507,12 +2505,14 @@ object JobManager { * @param configuration The configuration from which to parse the config values. * @param futureExecutor to run JobManager's futures * @param ioExecutor to run blocking io operations + * @param blobStore to store blobs persistently * @return The members for a default JobManager. */ def createJobManagerComponents( configuration: Configuration, futureExecutor: ScheduledExecutorService, - ioExecutor: Executor) : + ioExecutor: Executor, + blobStore: BlobStore) : (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, @@ -2557,7 +2557,7 @@ object JobManager { var libraryCacheManager: BlobLibraryCacheManager = null try { - blobServer = new BlobServer(configuration) + blobServer = new BlobServer(configuration, blobStore) instanceManager = new InstanceManager() scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) @@ -2576,7 +2576,7 @@ object JobManager { instanceManager.shutdown() } if (blobServer != null) { - blobServer.shutdown() + blobServer.close() } throw t @@ -2688,7 +2688,8 @@ object JobManager { metricsRegistry) = createJobManagerComponents( configuration, futureExecutor, - ioExecutor) + ioExecutor, + highAvailabilityServices.createBlobStore()) val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath) @@ -2744,7 +2745,7 @@ object JobManager { ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, - libraryCacheManager: BlobLibraryCacheManager, + libraryCacheManager: LibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, timeout: FiniteDuration, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 46c440464efda..2ace8db4c09d5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -37,7 +37,7 @@ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, High import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode -import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService} +import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} @@ -387,17 +387,13 @@ abstract class FlinkMiniCluster( config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) && config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) { - // TODO: Add support for HA: Make web server work independently from the JM - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) - LOG.info("Starting JobManger web frontend") // start the new web frontend. we need to load this dynamically // because it is not in the same project/dependencies val webServer = Option( WebMonitorUtils.startWebRuntimeMonitor( config, - leaderRetrievalService, + highAvailabilityServices, actorSystem) ) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 8677307461258..a535388e6b923 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -143,7 +143,8 @@ class LocalFlinkMiniCluster( metricsRegistry) = JobManager.createJobManagerComponents( config, futureExecutor, - ioExecutor) + ioExecutor, + highAvailabilityServices.createBlobStore()) if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { metricsRegistry.get.startQueryService(system, null) @@ -249,8 +250,6 @@ class LocalFlinkMiniCluster( taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment, - highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID), metricRegistry) if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { @@ -315,7 +314,6 @@ class LocalFlinkMiniCluster( memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService, metricsRegistry: MetricRegistry): Props = { TaskManager.getTaskManagerProps( @@ -326,7 +324,7 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, networkEnvironment, - leaderRetrievalService, + highAvailabilityServices, metricsRegistry) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a3110a4909c90..7684a6b97a2d7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -125,7 +125,7 @@ class TaskManager( protected val ioManager: IOManager, protected val network: NetworkEnvironment, protected val numberOfSlots: Int, - protected val leaderRetrievalService: LeaderRetrievalService, + protected val highAvailabilityServices: HighAvailabilityServices, protected val metricsRegistry: FlinkMetricRegistry) extends FlinkActor with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging @@ -149,6 +149,10 @@ class TaskManager( /** Handler for distributed files cached by this TaskManager */ protected val fileCache = new FileCache(config.getTmpDirectories()) + protected val leaderRetrievalService: LeaderRetrievalService = highAvailabilityServices. + getJobManagerLeaderRetriever( + HighAvailabilityServices.DEFAULT_JOB_ID) + private var taskManagerMetricGroup : TaskManagerMetricGroup = _ /** Actors which want to be notified once this task manager has been @@ -959,7 +963,10 @@ class TaskManager( log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.") try { - val blobcache = new BlobCache(address, config.getConfiguration()) + val blobcache = new BlobCache( + address, + config.getConfiguration(), + highAvailabilityServices.createBlobStore()) blobService = Option(blobcache) libraryCacheManager = Some( new BlobLibraryCacheManager(blobcache, config.getCleanupInterval())) @@ -1039,12 +1046,24 @@ class TaskManager( // shut down BLOB and library cache libraryCacheManager foreach { - manager => manager.shutdown() + manager => + try { + manager.shutdown() + } catch { + case ioe: IOException => log.error( + "Could not properly shutdown library cache manager.", + ioe) + } } libraryCacheManager = None blobService foreach { - service => service.shutdown() + service => + try { + service.close() + } catch { + case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe) + } } blobService = None @@ -1905,9 +1924,6 @@ object TaskManager { val metricRegistry = taskManagerServices.getMetricRegistry() - val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID) - // create the actor properties (which define the actor constructor parameters) val tmProps = getTaskManagerProps( taskManagerClass, @@ -1917,7 +1933,7 @@ object TaskManager { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), - leaderRetrievalService, + highAvailabilityServices, metricRegistry) metricRegistry.startQueryService(actorSystem, resourceID) @@ -1936,7 +1952,7 @@ object TaskManager { memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricsRegistry: FlinkMetricRegistry ): Props = { Props( @@ -1948,7 +1964,7 @@ object TaskManager { ioManager, networkEnvironment, taskManagerConfig.getNumberSlots(), - leaderRetrievalService, + highAvailabilityServices, metricsRegistry) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 34a8a39ea47a5..1cf77eab26c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -43,10 +43,10 @@ public class BlobCacheRetriesTest { * A test where the connection fails twice and then the get operation succeeds. */ @Test - public void testBlobFetchRetries() { + public void testBlobFetchRetries() throws IOException { final Configuration config = new Configuration(); - testBlobFetchRetries(config); + testBlobFetchRetries(config, new VoidBlobStore()); } /** @@ -54,13 +54,23 @@ public void testBlobFetchRetries() { * (with high availability set). */ @Test - public void testBlobFetchRetriesHa() { + public void testBlobFetchRetriesHa() throws IOException { final Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobFetchRetries(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobFetchRetries(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } /** @@ -71,14 +81,14 @@ public void testBlobFetchRetriesHa() { * configuration to use (the BlobCache will get some additional settings * set compared to this one) */ - private void testBlobFetchRetries(final Configuration config) { + private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException { final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; BlobServer server = null; BlobCache cache = null; try { - server = new TestingFailingBlobServer(config, 2); + server = new TestingFailingBlobServer(config, blobStore, 2); final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -97,13 +107,7 @@ private void testBlobFetchRetries(final Configuration config) { } } - // create a separate config for the cache with no access to - // the (shared) storage path if available so that the cache - // will always bother the BlobServer! - final Configuration cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - cache = new BlobCache(serverAddress, cacheConfig); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // trigger a download - it should fail the first two times, but retry, and succeed eventually URL url = cache.getURL(key); @@ -116,17 +120,12 @@ private void testBlobFetchRetries(final Configuration config) { finally { is.close(); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @@ -135,10 +134,10 @@ private void testBlobFetchRetries(final Configuration config) { * A test where the connection fails too often and eventually fails the GET request. */ @Test - public void testBlobFetchWithTooManyFailures() { + public void testBlobFetchWithTooManyFailures() throws IOException { final Configuration config = new Configuration(); - testBlobFetchWithTooManyFailures(config); + testBlobFetchWithTooManyFailures(config, new VoidBlobStore()); } /** @@ -146,13 +145,23 @@ public void testBlobFetchWithTooManyFailures() { * (with high availability set). */ @Test - public void testBlobFetchWithTooManyFailuresHa() { + public void testBlobFetchWithTooManyFailuresHa() throws IOException { final Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobFetchWithTooManyFailures(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobFetchWithTooManyFailures(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } /** @@ -163,14 +172,14 @@ public void testBlobFetchWithTooManyFailuresHa() { * configuration to use (the BlobCache will get some additional settings * set compared to this one) */ - private void testBlobFetchWithTooManyFailures(final Configuration config) { + private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException { final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; BlobServer server = null; BlobCache cache = null; try { - server = new TestingFailingBlobServer(config, 10); + server = new TestingFailingBlobServer(config, blobStore, 10); final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -189,13 +198,7 @@ private void testBlobFetchWithTooManyFailures(final Configuration config) { } } - // create a separate config for the cache with no access to - // the (shared) storage path if available so that the cache - // will always bother the BlobServer! - final Configuration cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - cache = new BlobCache(serverAddress, cacheConfig); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // trigger a download - it should fail eventually try { @@ -206,16 +209,12 @@ private void testBlobFetchWithTooManyFailures(final Configuration config) { // as we expected } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index db55331b54324..2a65a3b246787 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -25,6 +25,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.net.URL; @@ -49,7 +50,7 @@ public class BlobCacheSuccessTest { * BlobServer. */ @Test - public void testBlobCache() { + public void testBlobCache() throws IOException { Configuration config = new Configuration(); uploadFileGetTest(config, false, false); } @@ -60,7 +61,7 @@ public void testBlobCache() { * BlobServer. */ @Test - public void testBlobCacheHa() { + public void testBlobCacheHa() throws IOException { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, @@ -73,7 +74,7 @@ public void testBlobCacheHa() { * file system and thus needs to download BLOBs from the BlobServer. */ @Test - public void testBlobCacheHaFallback() { + public void testBlobCacheHaFallback() throws IOException { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, @@ -82,17 +83,30 @@ public void testBlobCacheHaFallback() { } private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer, - boolean cacheHasAccessToFs) { + boolean cacheHasAccessToFs) throws IOException { // First create two BLOBs and upload them to BLOB server final byte[] buf = new byte[128]; final List blobKeys = new ArrayList(2); BlobServer blobServer = null; BlobCache blobCache = null; + BlobStoreService blobStoreService = null; try { + final Configuration cacheConfig; + if (cacheHasAccessToFs) { + cacheConfig = config; + } else { + // just in case parameters are still read from the server, + // create a separate configuration object for the cache + cacheConfig = new Configuration(config); + cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + } + + blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig); // Start the BLOB server - blobServer = new BlobServer(config); + blobServer = new BlobServer(config, blobStoreService); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); // Upload BLOBs @@ -112,22 +126,11 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit if (cacheWorksWithoutServer) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); + blobServer.close(); blobServer = null; } - final Configuration cacheConfig; - if (cacheHasAccessToFs) { - cacheConfig = config; - } else { - // just in case parameters are still read from the server, - // create a separate configuration object for the cache - cacheConfig = new Configuration(config); - cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); - } - - blobCache = new BlobCache(serverAddress, cacheConfig); + blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService); for (BlobKey blobKey : blobKeys) { blobCache.getURL(blobKey); @@ -135,7 +138,7 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit if (blobServer != null) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); + blobServer.close(); blobServer = null; } @@ -162,18 +165,17 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit fail(e.getMessage()); } } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (blobServer != null) { - blobServer.shutdown(); + blobServer.close(); } if(blobCache != null){ - blobCache.shutdown(); + blobCache.close(); + } + + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 27603d0586940..f9052e13ee8dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -36,6 +36,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,7 +44,7 @@ /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest { +public class BlobClientSslTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -64,19 +65,14 @@ public class BlobClientSslTest { * Starts the SSL enabled BLOB server. */ @BeforeClass - public static void startSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); + sslClientConfig = new Configuration(); sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -88,20 +84,14 @@ public static void startSSLServer() { * Starts the SSL disabled BLOB server. */ @BeforeClass - public static void startNonSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startNonSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(BlobServerOptions.SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -114,13 +104,13 @@ public static void startNonSSLServer() { * Shuts the BLOB server down. */ @AfterClass - public static void stopServers() { + public static void stopServers() throws IOException { if (BLOB_SSL_SERVER != null) { - BLOB_SSL_SERVER.shutdown(); + BLOB_SSL_SERVER.close(); } if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 8f8f8c5203225..fda4ee9f6ecb1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -57,24 +57,18 @@ public class BlobClientTest { * Starts the BLOB server. */ @BeforeClass - public static void startServer() { - try { - blobServiceConfig = new Configuration(); - BLOB_SERVER = new BlobServer(blobServiceConfig); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startServer() throws IOException { + blobServiceConfig = new Configuration(); + BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore()); } /** * Shuts the BLOB server down. */ @AfterClass - public static void stopServer() { + public static void stopServer() throws IOException { if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index f8d50d5020cf9..4f12ddb36db4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -30,16 +30,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -59,10 +56,20 @@ public void testBlobServerRecovery() throws Exception { config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobServerRecovery(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } - public static void testBlobServerRecovery(final Configuration config) throws IOException { + public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId; Random rand = new Random(); @@ -73,7 +80,7 @@ public static void testBlobServerRecovery(final Configuration config) throws IOE try { for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStore); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); } @@ -166,7 +173,7 @@ public static void testBlobServerRecovery(final Configuration config) throws IOE finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 025a2ffaa33c2..e8e28a1949f60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -44,10 +44,11 @@ public class BlobServerDeleteTest { public void testDeleteSingle() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -93,10 +94,11 @@ public void testDeleteSingle() { public void testDeleteAll() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -156,10 +158,11 @@ public void testDeleteAll() { public void testDeleteAlreadyDeletedByBlobKey() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -195,10 +198,11 @@ public void testDeleteAlreadyDeletedByBlobKey() { public void testDeleteAlreadyDeletedByName() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -237,10 +241,11 @@ public void testDeleteFails() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -289,7 +294,11 @@ private void cleanup(BlobServer server, BlobClient client) { } } if (server != null) { - server.shutdown(); + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 59a62e132918e..6d1dba8db875d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -40,13 +40,13 @@ public class BlobServerGetTest { private final Random rnd = new Random(); @Test - public void testGetFailsDuringLookup() { + public void testGetFailsDuringLookup() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -66,37 +66,27 @@ public void testGetFailsDuringLookup() { try { client.get(key); fail("This should not succeed."); - } - catch (IOException e) { + } catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testGetFailsDuringStreaming() { + public void testGetFailsDuringStreaming() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -129,21 +119,12 @@ public void testGetFailsDuringStreaming() { catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index c4d6d1cbf0a94..441ca7d007b86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -42,13 +42,13 @@ public class BlobServerPutTest { private final Random rnd = new Random(); @Test - public void testPutBufferSuccessful() { + public void testPutBufferSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -95,34 +95,25 @@ public void testPutBufferSuccessful() { BlobUtils.readFully(is3, result3, 0, result3.length, null); is3.close(); assertArrayEquals(data, result3); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutStreamSuccessful() { + public void testPutStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -143,12 +134,7 @@ public void testPutStreamSuccessful() { String stringKey = "my test key"; client.put(jid, stringKey, new ByteArrayInputStream(data)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { try { client.close(); @@ -157,19 +143,19 @@ public void testPutStreamSuccessful() { } } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutChunkedStreamSuccessful() { + public void testPutChunkedStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -190,27 +176,18 @@ public void testPutChunkedStreamSuccessful() { String stringKey = "my test key"; client.put(jid, stringKey, new ChunkedInputStream(data, 17)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutBufferFails() { + public void testPutBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -219,7 +196,7 @@ public void testPutBufferFails() { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -250,31 +227,22 @@ public void testPutBufferFails() { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutNamedBufferFails() { + public void testPutNamedBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -283,7 +251,7 @@ public void testPutNamedBufferFails() { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -317,25 +285,16 @@ public void testPutNamedBufferFails() { catch (IllegalStateException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index c3762aa49f68a..120d86a320fde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger { public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, "0"); - BlobServer srv = new BlobServer(conf); - srv.shutdown(); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); + srv.close(); } /** @@ -63,7 +63,7 @@ public void testPortUnavailable() throws IOException { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); } finally { socket.close(); } @@ -92,9 +92,9 @@ public void testOnePortAvailable() throws IOException { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); Assert.assertEquals(availablePort, srv.getPort()); - srv.shutdown(); + srv.close(); } finally { sockets[0].close(); sockets[1].close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java index 93f9b7331dbe7..91e119b9173fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java @@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer { private int numFailures; - public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException { - super(config); + public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException { + super(config, blobStore); this.numFailures = numFailures; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 5d9ade3281e01..98e6b3ec53efa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.execution.librarycache; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; @@ -45,7 +44,7 @@ public class BlobLibraryCacheManagerTest { @Test - public void testLibraryCacheManagerCleanup() { + public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException { JobID jid = new JobID(); List keys = new ArrayList(); @@ -56,7 +55,7 @@ public void testLibraryCacheManagerCleanup() { try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -108,14 +107,9 @@ public void testLibraryCacheManagerCleanup() { assertEquals(2, caughtExceptions); bc.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (server != null) { - server.shutdown(); + server.close(); } if (libraryCacheManager != null) { @@ -130,7 +124,7 @@ public void testLibraryCacheManagerCleanup() { } @Test - public void testRegisterAndDownload() { + public void testRegisterAndDownload() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -139,9 +133,9 @@ public void testRegisterAndDownload() { try { // create the blob transfer services Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - cache = new BlobCache(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // upload some meaningless data to the server BlobClient uploader = new BlobClient(serverAddress, config); @@ -210,22 +204,17 @@ public void testRegisterAndDownload() { catch (IOException e) { // splendid! } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (cacheDir != null) { if (!cacheDir.setWritable(true, false)) { System.err.println("Could not re-add write permissions to cache directory."); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 54e1a9b1ec89a..16e3a05f2b9e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.TestLogger; @@ -63,6 +65,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2]; BlobCache cache = null; BlobLibraryCacheManager libCache = null; + BlobStoreService blobStoreService = null; Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); @@ -70,8 +73,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000); } @@ -89,7 +94,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // The cache - cache = new BlobCache(serverAddress[0], config); + cache = new BlobCache(serverAddress[0], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Register uploaded libraries @@ -110,10 +115,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // Shutdown cache and start with other server - cache.shutdown(); + cache.close(); libCache.shutdown(); - cache = new BlobCache(serverAddress[1], config); + cache = new BlobCache(serverAddress[1], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 @@ -156,17 +161,21 @@ public void testRecoveryRegisterAndDownload() throws Exception { finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (libCache != null) { libCache.shutdown(); } + + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java index 06ffe3cded04c..d89093d138959 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; -import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; @@ -62,7 +62,10 @@ public void testZooKeeperRegistry() throws Exception { configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); final HighAvailabilityServices zkHaService = new ZooKeeperHaServices( - ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration); + ZooKeeperUtils.startCuratorFramework(configuration), + Executors.directExecutor(), + configuration, + new VoidBlobStore()); final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index b8b5984f0b20b..a63b02d785f19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -71,7 +70,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -102,7 +100,6 @@ import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -190,7 +187,11 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { TestingUtils.defaultExecutor(), instanceManager, scheduler, - new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + new BlobLibraryCacheManager( + new BlobServer( + flinkConfiguration, + testingHighAvailabilityServices.createBlobStore()), + 3600000L), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index d6257ba67189d..70800e50ad9d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -184,7 +185,7 @@ private Props createJobManagerProps(Configuration configuration) throws Exceptio TestingUtils.defaultExecutor(), new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration), 10L), + new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeoutAsFiniteDuration(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0ea47f2d865ca..0282a4fefe1f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -18,17 +18,21 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -64,10 +68,13 @@ public void before() throws Exception { config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, + CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config); + + highAvailabilityServices = new ZooKeeperHaServices( + client, TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + config, + new VoidBlobStore()); } @After diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 58f223113ca5d..d6fc48c5e16a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -97,7 +97,7 @@ public void testMetricRegistryLifeCycle() throws Exception { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices, tmRegistry); final ActorRef taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 2a4c0365d034c..9dcfc70c26bae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -149,9 +149,6 @@ public void testComponentsStartupShutdown() throws Exception { network.start(); - LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID); - MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config); // create the task manager @@ -164,7 +161,7 @@ public void testComponentsStartupShutdown() throws Exception { ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, new MetricRegistry(metricRegistryConfiguration)); taskManager = actorSystem.actorOf(tmProps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 92de31ac531d5..0844aad18a499 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.concurrent.Executors; @@ -57,6 +58,7 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -601,7 +603,7 @@ protected void run() { } @Test - public void testCheckForValidRegistrationSessionIDs() { + public void testCheckForValidRegistrationSessionIDs() throws IOException { new JavaTestKit(actorSystem) {{ ActorGateway taskManagerGateway = null; @@ -612,6 +614,7 @@ public void testCheckForValidRegistrationSessionIDs() { HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class); when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID))) .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID)); + when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore()); try { // we make the test actor (the test kit) the JobManager to intercept diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 7ba163366a631..98f136aca541f 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console # ----------------------------------------------------------------------------- # Console (use 'console') diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 4be3299bf0346..1b9ee48646c73 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor val components = JobManager.createJobManagerComponents( config, executor, - executor) + executor, + highAvailabilityServices.createBlobStore()) // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService. // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus, diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 09dc5ed24f271..1db0a8528ff5a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.runtime.testingUtils import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration @@ -32,15 +32,15 @@ import scala.language.postfixOps /** Subclass of the [[TaskManager]] to support testing messages */ class TestingTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends TaskManager( config, resourceID, @@ -49,19 +49,19 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { def this( - config: TaskManagerConfiguration, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) { + config: TaskManagerConfiguration, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) { this( config, ResourceID.generate(), @@ -70,7 +70,7 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 5f9d1784ae0a1..2983d667f78ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -155,6 +155,7 @@ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) thr Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath()); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, config); diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index 0f82faa40dc99..1df4b8dcc37ff 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -19,6 +19,7 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService @@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike * @param ioManager IOManager responsible for I/O * @param network NetworkEnvironment for this actor * @param numberOfSlots Number of slots for this TaskManager - * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading - * JobManager + * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval + * service for retrieving the leading JobManager */ class TestingYarnTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends YarnTaskManager( config, resourceID, @@ -61,7 +62,7 @@ class TestingYarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java index e9c3904b4220c..f81d0406339c9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.blob.FileSystemBlobStore; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe * HA services clean up */ protected final Path haDataDirectory; + /** Blob store service to be used for the BlobServer and BlobCache */ + protected final BlobStoreService blobStoreService; + /** Flag marking this instance as shut down */ private volatile boolean closed; @@ -153,6 +157,8 @@ protected YarnHighAvailabilityServices( } LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory); + + blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); } // ------------------------------------------------------------------------ @@ -163,7 +169,7 @@ protected YarnHighAvailabilityServices( public BlobStore createBlobStore() throws IOException { enter(); try { - return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); + return blobStoreService; } finally { exit(); } @@ -192,11 +198,23 @@ public void close() throws Exception { } closed = true; + Throwable exception = null; + + try { + blobStoreService.close(); + } catch (Throwable t) { + exception = t; + } + // we do not propagate exceptions here, but only log them try { hadoopFileSystem.close(); } catch (Throwable t) { - LOG.warn("Error closing Hadoop FileSystem", t); + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices."); } } finally { @@ -213,12 +231,18 @@ public void closeAndCleanupAllData() throws Exception { // we remember exceptions only, then continue cleanup, and re-throw at the end Throwable exception = null; + try { + blobStoreService.closeAndCleanupAllData(); + } catch (Throwable t) { + exception = t; + } + // first, we delete all data in Flink's data directory try { flinkFileSystem.delete(haDataDirectory, true); } catch (Throwable t) { - exception = t; + exception = ExceptionUtils.firstOrSuppressed(t, exception); } // now we actually close the services diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index be310854855ad..b7f4c9a382467 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.metrics.MetricRegistry @@ -38,7 +38,7 @@ class YarnTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricRegistry : MetricRegistry) extends TaskManager( config, @@ -48,7 +48,7 @@ class YarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) { override def handleMessage: Receive = {