From 464f2c834688507c67acb3ad584827132ebe444e Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 22 Nov 2016 12:49:03 +0100 Subject: [PATCH 01/15] [hotfix] remove unused package-private BlobUtils#copyFromRecoveryPath This was actually the same implementation as FileSystemBlobStore#get(java.lang.String, java.io.File) and either of the two could have been removed but the implementation makes most sense at the concrete file system abstraction layer, i.e. in FileSystemBlobStore. --- .../apache/flink/runtime/blob/BlobUtils.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 136df09ba6565..8fb09f464ad1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -21,20 +21,15 @@ import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.IOUtils; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import java.io.EOFException; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.URI; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -366,32 +361,6 @@ static String getRecoveryPath(String basePath, JobID jobId) { return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString()); } - /** - * Copies the file from the recovery path to the local file. - */ - static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception { - if (recoveryPath == null) { - throw new IllegalStateException("Failed to determine recovery path."); - } - - if (!localBlobFile.createNewFile()) { - throw new IllegalStateException("Failed to create new local file to copy to"); - } - - URI uri = new URI(recoveryPath); - Path path = new Path(recoveryPath); - - if (FileSystem.get(uri).exists(path)) { - try (InputStream is = FileSystem.get(uri).open(path)) { - FileOutputStream fos = new FileOutputStream(localBlobFile); - IOUtils.copyBytes(is, fos); // closes the streams - } - } - else { - throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery."); - } - } - /** * Private constructor to prevent instantiation. 
*/ From 2ebffd4c2d499b61f164b4d54dc86c9d44b9c0ea Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 23 Nov 2016 16:11:35 +0100 Subject: [PATCH 02/15] [hotfix] do not create intermediate strings inside String.format in BlobUtils --- .../java/org/apache/flink/runtime/blob/BlobUtils.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 8fb09f464ad1f..ddb07eb0962f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -336,7 +336,7 @@ static void closeSilently(Socket socket, Logger LOG) { */ static String getRecoveryPath(String basePath, BlobKey blobKey) { // format: $base/cache/blob_$key - return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString()); + return String.format("%s/cache/%s%s", basePath, BLOB_FILE_PREFIX, blobKey.toString()); } /** @@ -348,8 +348,8 @@ static String getRecoveryPath(String basePath, BlobKey blobKey) { */ static String getRecoveryPath(String basePath, JobID jobId, String key) { // format: $base/job_$id/blob_$key - return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(), - BLOB_FILE_PREFIX + encodeKey(key)); + return String.format("%s/%s%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString(), + BLOB_FILE_PREFIX, encodeKey(key)); } /** @@ -358,7 +358,7 @@ static String getRecoveryPath(String basePath, JobID jobId, String key) { *
The returned path can be used with the state backend for recovery purposes. */ static String getRecoveryPath(String basePath, JobID jobId) { - return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString()); + return String.format("%s/%s%s", basePath, JOB_DIR_PREFIX, jobId.toString()); } /** From 36ab6121e336f63138e442ea48a751ede7fb04c3 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 24 Nov 2016 17:11:19 +0100 Subject: [PATCH 03/15] [hotfix] properly shut down the BlobServer in BlobServerRangeTest --- .../java/org/apache/flink/runtime/blob/BlobServerRangeTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index 36ae8ccb5d628..ea0eb94721aae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -40,6 +40,7 @@ public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0"); BlobServer srv = new BlobServer(conf); + srv.shutdown(); } /** From c8c12c67ae875ca5c96db78375bef880cf2a3c59 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jan 2017 18:06:01 +0100 Subject: [PATCH 04/15] [hotfix] use JUnit's TemporaryFolder in BlobRecoveryITCase, too This makes cleaning up simpler. --- .../runtime/blob/BlobRecoveryITCase.java | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 3fe207e514413..60cb81fb9867f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -18,15 +18,14 @@ package org.apache.flink.runtime.blob; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.junit.After; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.InputStream; @@ -38,22 +37,8 @@ public class BlobRecoveryITCase { - private File recoveryDir; - - @Before - public void setUp() throws Exception { - recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir"); - if (!recoveryDir.exists() && !recoveryDir.mkdirs()) { - throw new IllegalStateException("Failed to create temp directory for test"); - } - } - - @After - public void cleanUp() throws Exception { - if (recoveryDir != null) { - FileUtils.deleteDirectory(recoveryDir); - } - } + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); /** * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any @@ -160,7 +145,7 @@ public void testBlobServerRecovery() throws Exception { } // Verify everything is clean - File[] recoveryFiles = recoveryDir.listFiles(); + File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, 
recoveryFiles.length); } } From a078cb0c26071fe70e3668d23d0c8bef8550892f Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jan 2017 18:27:00 +0100 Subject: [PATCH 05/15] [hotfix] add a missing "'" to the BlobStore class --- .../src/main/java/org/apache/flink/runtime/blob/BlobServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 33f9db78d18b4..57fffedaa3272 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -479,7 +479,7 @@ private static BlobStore createBlobStoreFromConfig(Configuration config) throws return new FileSystemBlobStore(fileSystem, storagePath); } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "."); + throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); } } } From 7d832919040059961940fc96d0cdb285bc9f77d3 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jan 2017 18:18:10 +0100 Subject: [PATCH 06/15] [FLINK-5129] unify duplicate code between the BlobServer and ZookeeperHaServices (this was introduced by c64860677f) --- .../apache/flink/runtime/blob/BlobServer.java | 31 ++----------------- .../highavailability/ZookeeperHaServices.java | 16 +++++++++- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 57fffedaa3272..0ab4fc4f370f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -22,11 +22,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.ZookeeperHaServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.NetUtils; @@ -49,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * This class implements the BLOB server. 
The BLOB server is responsible for listening for incoming requests and @@ -453,31 +450,9 @@ private static BlobStore createBlobStoreFromConfig(Configuration config) throws HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); if (highAvailabilityMode == HighAvailabilityMode.NONE) { - return new VoidBlobStore(); + return new VoidBlobStore(); } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); - if (isNullOrWhitespaceOnly(storagePath)) { - throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + - HighAvailabilityOptions.HA_STORAGE_PATH); - } - - final Path path; - try { - path = new Path(storagePath); - } catch (Exception e) { - throw new IOException("Invalid path for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } - - final FileSystem fileSystem; - try { - fileSystem = path.getFileSystem(); - } catch (Exception e) { - throw new IOException("Could not create FileSystem for highly available storage (" + - HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); - } - - return new FileSystemBlobStore(fileSystem, storagePath); + return ZookeeperHaServices.createBlobStore(config); } else { throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index 25d21efc26b47..b483d12ae14bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -154,7 +154,21 @@ public RunningJobsRegistry getRunningJobsRegistry() { @Override public BlobStore createBlobStore() throws IOException { - final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); + return createBlobStore(configuration); + } + + /** + * Creates the BLOB store in which BLOBs are stored in a highly-available + * fashion. + * + * @param configuration configuration to extract the storage path from + * @return Blob store + * @throws IOException if the blob store could not be created + */ + public static BlobStore createBlobStore( + final Configuration configuration) throws IOException { + final String storagePath = configuration.getValue( + HighAvailabilityOptions.HA_STORAGE_PATH); if (isNullOrWhitespaceOnly(storagePath)) { throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + HighAvailabilityOptions.HA_STORAGE_PATH); From a643f0b989c640a81b112ad14ae27a2a2b1ab257 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 5 Jan 2017 18:07:13 +0100 Subject: [PATCH 07/15] [FLINK-5129] BlobServer: include the cluster id in the HA storage path for blobs This applies to the ZookeeperHaServices implementation. 
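For illustration (sketch only; the configuration values shown here are placeholders and not part of this patch), the HA storage path for BLOBs is now derived roughly as follows:

    // sketch: how the per-cluster HA storage path for BLOBs is composed after this change
    String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); // e.g. "hdfs:///flink/recovery"
    String clusterId = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);     // e.g. "cluster_1"
    storagePath += "/" + clusterId;
    // each cluster thus gets its own sub-directory for HA BLOBs below the configured
    // high-availability storage path (the test changes below verify exactly this layout)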
--- .../highavailability/ZookeeperHaServices.java | 6 +++++- .../flink/runtime/blob/BlobRecoveryITCase.java | 16 +++++++++------- .../BlobLibraryCacheRecoveryITCase.java | 16 +++++++++------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index b483d12ae14bc..ed0ad171a6ddc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -167,7 +167,7 @@ public BlobStore createBlobStore() throws IOException { */ public static BlobStore createBlobStore( final Configuration configuration) throws IOException { - final String storagePath = configuration.getValue( + String storagePath = configuration.getValue( HighAvailabilityOptions.HA_STORAGE_PATH); if (isNullOrWhitespaceOnly(storagePath)) { throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + @@ -190,6 +190,10 @@ public static BlobStore createBlobStore( HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); } + final String clusterId = + configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); + storagePath += "/" + clusterId; + return new FileSystemBlobStore(fileSystem, storagePath); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 60cb81fb9867f..cc729f19fe5f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -52,12 +52,12 @@ public void testBlobServerRecovery() throws Exception { InetSocketAddress[] serverAddress = new InetSocketAddress[2]; BlobClient client = null; - try { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); + try { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); @@ -144,8 +144,10 @@ public void testBlobServerRecovery() throws Exception { } } - // Verify everything is clean - File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); + // Verify everything is clean below recoveryDir/ + final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); + File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); + File[] recoveryFiles = haBlobStoreDir.listFiles(); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 8fabdf690f783..227cc7a9dfb38 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -62,12 +62,12 @@ public void testRecoveryRegisterAndDownload() throws Exception { BlobCache cache = null; BlobLibraryCacheManager libCache = null; - try { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + try { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); @@ -160,8 +160,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { } } - // Verify everything is clean - File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); + // Verify everything is clean below recoveryDir/ + final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); + File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); + File[] recoveryFiles = haBlobStoreDir.listFiles(); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } } From 19879a01b99c4772a09627eb5f380f794f6c1e27 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 30 Nov 2016 14:52:12 +0100 Subject: [PATCH 08/15] [hotfix] add some more documentation in BlobStore-related classes --- .../apache/flink/runtime/blob/BlobCache.java | 17 +++++++++++++++-- .../apache/flink/runtime/blob/BlobClient.java | 3 ++- .../apache/flink/runtime/blob/BlobServer.java | 8 +++++--- .../runtime/blob/BlobServerConnection.java | 8 ++++++++ .../apache/flink/runtime/blob/BlobStore.java | 12 +++++++++--- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 3f9365258dc53..97fbf547cab58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -46,6 +46,7 @@ public final class BlobCache implements BlobService { private final InetSocketAddress serverAddress; + /** Root directory for local file storage */ private final File storageDir; private final AtomicBoolean shutdownRequested = new AtomicBoolean(); @@ -59,7 +60,14 @@ public final class BlobCache implements BlobService { /** Configuration for the blob client like ssl parameters required to connect to the blob server */ private final Configuration blobClientConfig; - + /** + * Instantiates a new BLOB cache. 
+ * + * @param serverAddress + * address of the {@link BlobServer} to use for fetching files from + * @param blobClientConfig + * global configuration + */ public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) { if (serverAddress == null || blobClientConfig == null) { throw new NullPointerException(); @@ -201,8 +209,13 @@ public void delete(BlobKey key) throws IOException{ } /** - * Deletes the file associated with the given key from the BLOB cache and BLOB server. + * Deletes the file associated with the given key from the BLOB cache and + * BLOB server. + * * @param key referring to the file to be deleted + * @throws IOException + * thrown if an I/O error occurs while transferring the request to + * the BLOB server or if the BLOB server cannot delete the file */ public void deleteGlobal(BlobKey key) throws IOException { BlobClient bc = createClient(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 27489679e23b0..ea90f5441f517 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -590,7 +590,8 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t * @param key * the key to identify the BLOB * @throws IOException - * thrown if an I/O error occurs while transferring the request to the BLOB server + * thrown if an I/O error occurs while transferring the request to + * the BLOB server or if the BLOB server cannot delete the file */ public void delete(BlobKey key) throws IOException { if (key == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 0ab4fc4f370f7..1e536abd69382 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -73,10 +73,10 @@ public class BlobServer extends Thread implements BlobService { /** Indicates whether a shutdown of server component has been requested. */ private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** Is the root directory for file storage */ + /** Root directory for local file storage */ private final File storageDir; - /** Blob store for HA */ + /** Blob store for distributed file storage, e.g. in HA */ private final BlobStore blobStore; /** Set of currently running threads */ @@ -95,7 +95,9 @@ public class BlobServer extends Thread implements BlobService { * Instantiates a new BLOB server and binds it to a free network port. 
* * @throws IOException - * thrown if the BLOB server cannot bind to a free network port + * thrown if the BLOB server cannot bind to a free network port or if, in + * HA mode, the (distributed) file storage cannot be created or is not + * usable */ public BlobServer(Configuration config) throws IOException { this(config, createBlobStoreFromConfig(config)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 321fc67f9225b..13a90c626fac6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -168,6 +168,14 @@ public void close() { * thrown if an I/O error occurs while reading/writing data from/to the respective streams */ private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + /** + * Retrieve the file from the (distributed?) BLOB store and store it + * locally, then send it to the service which requested it. + * + * Instead, we could send it from the distributed store directly but + * chances are high that if there is one request, there will be more + * so a local cache makes more sense. + */ File blobFile; try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java index 70503387baeaf..9d728ede40fd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -68,14 +68,18 @@ public interface BlobStore { void get(JobID jobId, String key, File localFile) throws Exception; /** - * Deletes a blob. + * Tries to delete a blob from storage. + * + *
NOTE: This also tries to delete any created directories if empty.
* * @param blobKey The blob ID */ void delete(BlobKey blobKey); /** - * Deletes a blob. + * Tries to delete a blob from storage. + * + *
NOTE: This also tries to delete any created directories if empty.
* * @param jobId The JobID part of ID for the blob * @param key The String part of ID for the blob @@ -83,7 +87,9 @@ public interface BlobStore { void delete(JobID jobId, String key); /** - * Deletes blobs. + * Tries to delete all blobs for the given job from storage. + * + *
NOTE: This also tries to delete any created directories if empty.
* * @param jobId The JobID part of all blobs to delete */ From 80c17ef83104d1186c06d8f5d4cde11e4b05f2b8 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 11:55:23 +0100 Subject: [PATCH 09/15] [hotfix] minor code beautifications when checking parameters + also check the blobService parameter in BlobLibraryCacheManager --- .../org/apache/flink/runtime/blob/BlobCache.java | 16 ++++++---------- .../apache/flink/runtime/blob/BlobServer.java | 9 +++------ .../librarycache/BlobLibraryCacheManager.java | 13 +++++++------ 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 97fbf547cab58..9978e2c7f1954 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -34,6 +34,9 @@ import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the @@ -69,13 +72,8 @@ public final class BlobCache implements BlobService { * global configuration */ public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) { - if (serverAddress == null || blobClientConfig == null) { - throw new NullPointerException(); - } - - this.serverAddress = serverAddress; - - this.blobClientConfig = blobClientConfig; + this.serverAddress = checkNotNull(serverAddress); + this.blobClientConfig = checkNotNull(blobClientConfig); // configure and create the storage directory String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); @@ -108,9 +106,7 @@ public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
*/ public URL getURL(final BlobKey requiredBlob) throws IOException { - if (requiredBlob == null) { - throw new IllegalArgumentException("BLOB key cannot be null."); - } + checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 1e536abd69382..2b6ccb85c8a27 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -108,11 +109,9 @@ public BlobServer(Configuration config, HighAvailabilityServices haServices) thr } private BlobServer(Configuration config, BlobStore blobStore) throws IOException { - checkNotNull(config); + this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); - this.blobServiceConfiguration = config; - // configure and create the storage directory String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); @@ -356,9 +355,7 @@ public BlobClient createClient() throws IOException { */ @Override public URL getURL(BlobKey requiredBlob) throws IOException { - if (requiredBlob == null) { - throw new IllegalArgumentException("Required BLOB cannot be null."); - } + checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index c94768d7244d2..b0d5d834e459e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -37,11 +37,12 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * For each job graph that is submitted to the system the library cache manager maintains * a set of libraries (typically JAR files) which the job requires to run. 
The library cache manager @@ -73,7 +74,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC // -------------------------------------------------------------------------------------------- public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) { - this.blobService = blobService; + this.blobService = checkNotNull(blobService); // Initializing the clean up task this.cleanupTimer = new Timer(true); @@ -91,8 +92,8 @@ public void registerJob(JobID id, Collection requiredJarFiles, Collecti @Override public void registerTask(JobID jobId, ExecutionAttemptID task, Collection requiredJarFiles, Collection requiredClasspaths) throws IOException { - Preconditions.checkNotNull(jobId, "The JobId must not be null."); - Preconditions.checkNotNull(task, "The task execution id must not be null."); + checkNotNull(jobId, "The JobId must not be null."); + checkNotNull(task, "The task execution id must not be null."); if (requiredJarFiles == null) { requiredJarFiles = Collections.emptySet(); @@ -153,8 +154,8 @@ public void unregisterJob(JobID id) { @Override public void unregisterTask(JobID jobId, ExecutionAttemptID task) { - Preconditions.checkNotNull(jobId, "The JobId must not be null."); - Preconditions.checkNotNull(task, "The task execution id must not be null."); + checkNotNull(jobId, "The JobId must not be null."); + checkNotNull(task, "The task execution id must not be null."); synchronized (lockObject) { LibraryCacheEntry entry = cacheEntries.get(jobId); From c8e2815787338f52e5ad369bcaedb1798284dd29 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 14:59:51 +0100 Subject: [PATCH 10/15] [hotfix] simplify code in BlobCache#deleteGlobal() Also, re-order the code so that a local delete is always tried before creating a connection to the BlobServer. If that fails, the local file is deleted at least. 
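From a caller's point of view (hedged sketch only; serverAddress, config and key are placeholder variables and error handling is omitted), the resulting behaviour is:

    // sketch: deleting a BLOB both locally and on the BlobServer
    BlobCache cache = new BlobCache(serverAddress, config);
    cache.deleteGlobal(key); // deletes the local copy first, then asks the BlobServer to delete its copy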
--- .../java/org/apache/flink/runtime/blob/BlobCache.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 9978e2c7f1954..1a5c971e9dde4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -214,14 +214,12 @@ public void delete(BlobKey key) throws IOException{ * the BLOB server or if the BLOB server cannot delete the file */ public void deleteGlobal(BlobKey key) throws IOException { - BlobClient bc = createClient(); - try { - delete(key); + // delete locally + delete(key); + // then delete on the BLOB server + try (BlobClient bc = createClient()) { bc.delete(key); } - finally { - bc.close(); - } } @Override From ff920e48bd69acef280bdef2a12e5f5f9cca3a88 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 14:21:42 +0100 Subject: [PATCH 11/15] [FLINK-5129] let BlobUtils#initStorageDirectory() throw a proper IOException --- .../handlers/TaskManagerLogHandler.java | 2 +- .../apache/flink/runtime/blob/BlobCache.java | 7 ++++++- .../apache/flink/runtime/blob/BlobServer.java | 5 ++--- .../apache/flink/runtime/blob/BlobUtils.java | 9 +++++++-- .../flink/runtime/client/JobClient.java | 8 +++++++- .../runtime/taskexecutor/TaskExecutor.java | 19 ++++++++++++++----- .../flink/runtime/blob/BlobUtilsTest.java | 6 ++++-- 7 files changed, 41 insertions(+), 15 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 78c44550a6cf0..6583d3b92657d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -150,7 +150,7 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou scala.concurrent.Future portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); scala.concurrent.Future cacheFuture = portFuture.map(new Mapper() { @Override - public BlobCache apply(Object result) { + public BlobCache checkedApply(Object result) throws IOException { Option hostOption = jobManager.actor().path().address().host(); String host = hostOption.isDefined() ? 
hostOption.get() : "localhost"; int port = (int) result; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 1a5c971e9dde4..25293a5b0842b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -70,8 +70,13 @@ public final class BlobCache implements BlobService { * address of the {@link BlobServer} to use for fetching files from * @param blobClientConfig * global configuration + * + * @throws IOException + * thrown if the local file storage cannot be created or + * is not usable */ - public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) { + public BlobCache(InetSocketAddress serverAddress, + Configuration blobClientConfig) throws IOException { this.serverAddress = checkNotNull(serverAddress); this.blobClientConfig = checkNotNull(blobClientConfig); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 2b6ccb85c8a27..d90679fb10f79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -96,9 +96,8 @@ public class BlobServer extends Thread implements BlobService { * Instantiates a new BLOB server and binds it to a free network port. * * @throws IOException - * thrown if the BLOB server cannot bind to a free network port or if, in - * HA mode, the (distributed) file storage cannot be created or is not - * usable + * thrown if the BLOB server cannot bind to a free network port or if the + * (local or distributed) file storage cannot be created or is not usable */ public BlobServer(Configuration config) throws IOException { this(config, createBlobStoreFromConfig(config)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index ddb07eb0962f3..d9963a1ecc45a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -66,8 +66,13 @@ public class BlobUtils { * Creates a storage directory for a blob service. 
* * @return the storage directory used by a BLOB service + * + * @throws IOException + * thrown if the (local or distributed) file storage cannot be created or + * is not usable */ - static File initStorageDirectory(String storageDirectory) { + static File initStorageDirectory(String storageDirectory) throws + IOException { File baseDir; if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) { baseDir = new File(System.getProperty("java.io.tmpdir")); @@ -91,7 +96,7 @@ static File initStorageDirectory(String storageDirectory) { } // max attempts exceeded to find a storage directory - throw new RuntimeException("Could not create storage directory for BLOB store in '" + baseDir + "'."); + throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'."); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 9f0c5739029ac..76d6d86e5ded8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -209,7 +209,13 @@ public static ClassLoader retrieveClassLoader( Option jmHost = jobManager.actor().path().address().host(); String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost"; InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); - final BlobCache blobClient = new BlobCache(serverAddress, config); + final BlobCache blobClient; + try { + blobClient = new BlobCache(serverAddress, config); + } catch (IOException e) { + throw new JobRetrievalException(jobID, + "Failed to setup blob cache", e); + } final Collection requiredJarFiles = props.requiredJarFiles(); final Collection requiredClasspaths = props.requiredClasspaths(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f11cb98a4d108..ff7c12a86401c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -794,11 +794,20 @@ private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterG InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); - BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); - - LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( - blobCache, - taskManagerConfiguration.getCleanupInterval()); + final LibraryCacheManager libraryCacheManager; + try { + final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); + libraryCacheManager = new BlobLibraryCacheManager( + blobCache, + taskManagerConfiguration.getCleanupInterval()); + } catch (IOException e) { + // Can't pass the IOException up - we need a RuntimeException anyway + // two levels up where this is run asynchronously. Also, we don't + // know whether this is caught in the thread running this method. 
+ final String message = "Could not create BLOB cache or library cache."; + log.error(message, e); + throw new RuntimeException(message, e); + } ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( jobManagerLeaderId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index 63ec33856ffbd..081e28cd9de13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import java.io.File; +import java.io.IOException; public class BlobUtilsTest { @@ -55,8 +56,9 @@ public void after() { assertTrue(blobUtilsTestDirectory.delete()); } - @Test(expected = Exception.class) - public void testExceptionOnCreateStorageDirectoryFailure() { + @Test(expected = IOException.class) + public void testExceptionOnCreateStorageDirectoryFailure() throws + IOException { // Should throw an Exception BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath()); } From 38626a705fd0725a8e54f2ee1c3d0ec410184b8a Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 15:06:30 +0100 Subject: [PATCH 12/15] [FLINK-5129] make the BlobCache also use a distributed file system in HA mode If available (in HA mode), download the jar files from the distributed file system directly instead of querying the BlobServer. This way the load is more distributed among the nodes of the file system (depending on its implementation of course) compared to putting all the burden on a single BlobServer. --- .../apache/flink/runtime/blob/BlobCache.java | 167 ++++++++++-------- .../apache/flink/runtime/blob/BlobServer.java | 15 +- .../apache/flink/runtime/blob/BlobUtils.java | 27 +++ 3 files changed, 122 insertions(+), 87 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 25293a5b0842b..bf79bc55300ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -38,9 +38,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the - * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the - * local cache does not contain the desired BLOB, the BLOB cache will try to download it from the BLOB server. + * The BLOB cache implements a local cache for content-addressable BLOBs. + * + *
When requesting BLOBs through the {@link BlobCache#getURL} methods, the + * BLOB cache will first attempt to serve the file from its local cache. Only if + * the local cache does not contain the desired BLOB, the BLOB cache will try to + * download it from a distributed file system (if available) or the BLOB + * server.
*/ public final class BlobCache implements BlobService { @@ -52,6 +56,9 @@ public final class BlobCache implements BlobService { /** Root directory for local file storage */ private final File storageDir; + /** Blob store for distributed file storage, e.g. in HA */ + private final BlobStore blobStore; + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); /** Shutdown hook thread to ensure deletion of the storage directory. */ @@ -72,13 +79,14 @@ public final class BlobCache implements BlobService { * global configuration * * @throws IOException - * thrown if the local file storage cannot be created or + * thrown if the (local or distributed) file storage cannot be created or * is not usable */ public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig) throws IOException { this.serverAddress = checkNotNull(serverAddress); this.blobClientConfig = checkNotNull(blobClientConfig); + this.blobStore = BlobUtils.createBlobStoreFromConfig(blobClientConfig); // configure and create the storage directory String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); @@ -115,86 +123,97 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); - if (!localJarFile.exists()) { + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } - final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + // first try the distributed blob store (if available) + try { + blobStore.get(requiredBlob, localJarFile); + } catch (Exception e) { + throw new IOException("Failed to copy from blob store.", e); + } - // loop over retries - int attempt = 0; - while (true) { + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { - LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); - } + // fallback: download from the BlobServer + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; - - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } + // loop over retries + int attempt = 0; + while (true) { - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - break; - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. 
Otherwise that exception - // it would be replaced by any exception thrown in the finally block - closeSilently(os); - closeSilently(is); - closeSilently(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); + if (attempt == 0) { + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); + } else { + LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); + } + + try { + BlobClient bc = null; + InputStream is = null; + OutputStream os = null; + + try { + bc = new BlobClient(serverAddress, blobClientConfig); + is = bc.get(requiredBlob); + os = new FileOutputStream(localJarFile); + + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; } + os.write(buf, 0, read); } + + // we do explicitly not use a finally block, because we want the closing + // in the regular case to throw exceptions and cause the writing to fail. + // But, the closing on exception should not throw further exceptions and + // let us keep the root exception + os.close(); + os = null; + is.close(); + is = null; + bc.close(); + bc = null; + + // success, we finished + return localJarFile.toURI().toURL(); } - catch (IOException e) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); - if (attempt < numFetchRetries) { - attempt++; - if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); - } else { - LOG.error(message + " Retrying..."); - } + catch (Throwable t) { + // we use "catch (Throwable)" to keep the root exception. Otherwise that exception + // it would be replaced by any exception thrown in the finally block + closeSilently(os); + closeSilently(is); + closeSilently(bc); + + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException(t.getMessage(), t); } - else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); + } + } + catch (IOException e) { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + attempt++; + if (LOG.isDebugEnabled()) { + LOG.debug(message + " Retrying...", e); + } else { + LOG.error(message + " Retrying..."); } } - } // end loop over retries - } - - return localJarFile.toURI().toURL(); + else { + LOG.error(message + " No retries left.", e); + throw new IOException(message, e); + } + } + } // end loop over retries } /** @@ -222,6 +241,8 @@ public void deleteGlobal(BlobKey key) throws IOException { // delete locally delete(key); // then delete on the BLOB server + // (don't use the distributed storage directly - this way the blob + // server is aware of the delete operation, too) try (BlobClient bc = createClient()) { bc.delete(key); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index d90679fb10f79..d75b54583bcbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -22,9 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.ZookeeperHaServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.NetUtils; @@ -100,7 +98,7 @@ public class BlobServer extends Thread implements BlobService { * (local or distributed) file storage cannot be created or is not usable */ public BlobServer(Configuration config) throws IOException { - this(config, createBlobStoreFromConfig(config)); + this(config, BlobUtils.createBlobStoreFromConfig(config)); } public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException { @@ -444,15 +442,4 @@ List getCurrentActiveConnections() { } } - private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { - HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); - - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - return new VoidBlobStore(); - } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - return ZookeeperHaServices.createBlobStore(config); - } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index d9963a1ecc45a..81596df43644d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -21,6 +21,10 @@ import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.highavailability.ZookeeperHaServices; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; @@ -62,6 +66,29 @@ public class BlobUtils { */ static final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + /** + * Creates a BlobStore based on the parameters set in the configuration. + * + * @param config + * configuration to use + * + * @return a (distributed) blob store for high availability + * + * @throws IOException + * thrown if the (distributed) file storage cannot be created + */ + static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { + HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); + + if (highAvailabilityMode == HighAvailabilityMode.NONE) { + return new VoidBlobStore(); + } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { + return ZookeeperHaServices.createBlobStore(config); + } else { + throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); + } + } + /** * Creates a storage directory for a blob service. 
* From 1e86c5c92f9ac35c26c1e707d2d840c4edbeefb1 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 29 Nov 2016 18:11:06 +0100 Subject: [PATCH 13/15] [hotfix] re-use some code in BlobServerDeleteTest --- .../runtime/blob/BlobServerDeleteTest.java | 66 +++++-------------- 1 file changed, 17 insertions(+), 49 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 53e1d73f5ebaa..025a2ffaa33c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -85,16 +85,7 @@ public void testDeleteSingle() { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -157,16 +148,7 @@ public void testDeleteAll() { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -205,16 +187,7 @@ public void testDeleteAlreadyDeletedByBlobKey() { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -254,16 +227,7 @@ public void testDeleteAlreadyDeletedByName() { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); - } + cleanup(server, client); } } @@ -312,16 +276,20 @@ public void testDeleteFails() { fail(e.getMessage()); } finally { - if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - if (server != null) { - server.shutdown(); + cleanup(server, client); + } + } + + private void cleanup(BlobServer server, BlobClient client) { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); } } + if (server != null) { + server.shutdown(); + } } } From 68d2959b60f6b583cb48de8ed5aee3e18b163082 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 30 Nov 2016 14:35:38 +0100 Subject: [PATCH 14/15] [hotfix] improve some failure messages in the BlobService's HA unit tests --- .../org/apache/flink/runtime/blob/BlobRecoveryITCase.java | 5 ++++- .../librarycache/BlobLibraryCacheRecoveryITCase.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index cc729f19fe5f1..3ef65fb212b41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -34,6 +34,7 @@ import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class BlobRecoveryITCase { @@ -148,6 +149,8 @@ public void testBlobServerRecovery() throws Exception { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); 
File[] recoveryFiles = haBlobStoreDir.listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); + assertNotNull("HA storage directory does not exist", recoveryFiles); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), + 0, recoveryFiles.length); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 227cc7a9dfb38..ae11abd1081d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -43,6 +43,7 @@ import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class BlobLibraryCacheRecoveryITCase { @@ -164,6 +165,8 @@ public void testRecoveryRegisterAndDownload() throws Exception { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); File[] recoveryFiles = haBlobStoreDir.listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); + assertNotNull("HA storage directory does not exist", recoveryFiles); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), + 0, recoveryFiles.length); } } From 7cfbeb7707329cad57604a58f44254d4f8b6c9b3 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 17:21:05 +0100 Subject: [PATCH 15/15] [FLINK-5129] add unit tests for the BlobCache accessing the distributed FS directly --- .../runtime/blob/BlobCacheSuccessTest.java | 76 +++++++++++++++++-- 1 file changed, 68 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 7ba5a8a5b5334..42bfa2b166f74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,6 +18,12 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + import java.io.File; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -25,9 +31,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.junit.Test; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -38,9 +41,48 @@ */ public class BlobCacheSuccessTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * BlobCache with no HA. BLOBs need to be downloaded form a working + * BlobServer. + */ @Test public void testBlobCache() { + Configuration config = new Configuration(); + uploadFileGetTest(config, false, false); + } + + /** + * BlobCache is configured in HA mode and the cache can download files from + * the file system directly and does not need to download BLOBs from the + * BlobServer. 
+ */ + @Test + public void testBlobCacheHa() { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + uploadFileGetTest(config, true, true); + } + /** + * BlobCache is configured in HA mode but the cache itself cannot access the + * file system and thus needs to download BLOBs from the BlobServer. + */ + @Test + public void testBlobCacheHaFallback() { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath()); + uploadFileGetTest(config, false, false); + } + + private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer, + boolean cacheHasAccessToFs) { // First create two BLOBs and upload them to BLOB server final byte[] buf = new byte[128]; final List blobKeys = new ArrayList(2); @@ -50,7 +92,6 @@ public void testBlobCache() { try { // Start the BLOB server - Configuration config = new Configuration(); blobServer = new BlobServer(config); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); @@ -69,15 +110,34 @@ public void testBlobCache() { } } - blobCache = new BlobCache(serverAddress, new Configuration()); + if (cacheWorksWithoutServer) { + // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. + blobServer.shutdown(); + blobServer = null; + } + + final Configuration cacheConfig; + if (cacheHasAccessToFs) { + cacheConfig = config; + } else { + // just in case parameters are still read from the server, + // create a separate configuration object for the cache + cacheConfig = new Configuration(config); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.getRoot().getPath() + "/does-not-exist"); + } + + blobCache = new BlobCache(serverAddress, cacheConfig); for (BlobKey blobKey : blobKeys) { blobCache.getURL(blobKey); } - // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. - blobServer.shutdown(); - blobServer = null; + if (blobServer != null) { + // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. + blobServer.shutdown(); + blobServer = null; + } final URL[] urls = new URL[blobKeys.size()];