From b98f52653f5cd63c2940806c3e2cbd6867e65738 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 6 Jan 2017 18:42:58 +0100 Subject: [PATCH 01/17] [FLINK-6008][docs] update some config options to the new, non-deprecated ones --- docs/setup/config.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 6a5c1ff350579..032a5ba9a3ba9 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -496,13 +496,13 @@ Previously this key was named `recovery.mode` and the default value was `standal - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`. -- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`. +- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`. - `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`. - `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`. -- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir`. +- `high-availability.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir` and `high-availability.zookeeper.storageDir`. - `high-availability.zookeeper.client.session-timeout`: (Default `60000`) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named `recovery.zookeeper.client.session-timeout` From 02cfa20f3945111daa96ebe0cb7889292aece1ea Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 20 Dec 2016 16:49:57 +0100 Subject: [PATCH 02/17] [FLINK-6008][docs] minor improvements in the BlobService docs --- .../main/java/org/apache/flink/runtime/blob/BlobService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 419ee8dbf19fd..6b837aa5b0e4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -27,11 +27,12 @@ public interface BlobService { /** - * This method returns the URL of the file associated with the provided blob key. + * Returns the URL of the file associated with the provided blob key. * * @param key blob key associated with the requested file * @return The URL to the file. - * @throws IOException + * @throws java.io.FileNotFoundException when the path does not exist; + * @throws IOException if any other error occurs when retrieving the file */ URL getURL(BlobKey key) throws IOException; From 4d6cf1f9b413c4613cfd8fedf0efe840f545e269 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 9 Mar 2017 18:14:02 +0100 Subject: [PATCH 03/17] [FLINK-6008] use Preconditions.checkArgument in BlobClient --- .../apache/flink/runtime/blob/BlobClient.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index ea90f5441f517..8033bb5a40494 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -64,6 +64,7 @@ import static org.apache.flink.runtime.blob.BlobUtils.readFully; import static org.apache.flink.runtime.blob.BlobUtils.readLength; import static org.apache.flink.runtime.blob.BlobUtils.writeLength; +import static org.apache.flink.util.Preconditions.checkArgument; /** * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET), @@ -594,9 +595,7 @@ private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) t * the BLOB server or if the BLOB server cannot delete the file */ public void delete(BlobKey key) throws IOException { - if (key == null) { - throw new IllegalArgumentException("BLOB key must not be null"); - } + checkArgument(key != null, "BLOB key must not be null."); deleteInternal(null, null, key); } @@ -612,12 +611,9 @@ public void delete(BlobKey key) throws IOException { * thrown if an I/O error occurs while transferring the request to the BLOB server */ public void delete(JobID jobId, String key) throws IOException { - if (jobId == null) { - throw new IllegalArgumentException("JobID must not be null"); - } - if (key == null) { - throw new IllegalArgumentException("Key must not be null"); - } + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + if (key.length() > MAX_KEY_LENGTH) { throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); } @@ -634,9 +630,7 @@ public void delete(JobID jobId, String key) throws IOException { * thrown if an I/O error occurs while transferring the request to the BLOB server */ public void deleteAll(JobID jobId) throws IOException { - if (jobId == null) { - throw new IllegalArgumentException("Argument jobID must not be null"); - } + checkArgument(jobId != null, "Job id must not be null."); deleteInternal(jobId, null, null); } From 1b40ff900330a141aa0d67cc4fb9ab2f32e28e66 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 20 Dec 2016 18:27:13 +0100 Subject: [PATCH 04/17] [FLINK-6008] refactor BlobCache#getURL() for cleaner code --- .../apache/flink/runtime/blob/BlobCache.java | 78 ++++++------------- 1 file changed, 22 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 2587b155d607d..01d3a36f8fae2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; -import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,78 +179,45 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { - - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { - LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); - } - - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; - - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); - IOUtils.closeQuietly(bc); - - if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException(t.getMessage(), t); + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; } + os.write(buf, 0, read); } + + // success, we finished + return localJarFile.toURI().toURL(); } - catch (IOException e) { + catch (Throwable t) { String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + " and store it under " + localJarFile.getAbsolutePath(); if (attempt < numFetchRetries) { - attempt++; if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", e); + LOG.debug(message + " Retrying...", t); } else { LOG.error(message + " Retrying..."); } } else { - LOG.error(message + " No retries left.", e); - throw new IOException(message, e); + LOG.error(message + " No retries left.", t); + throw new IOException(message, t); } + + // retry + ++attempt; + LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } } // end loop over retries } From 9aaf96d37a626c61531daf81ff6f3e03a3b72117 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 21 Dec 2016 16:23:29 +0100 Subject: [PATCH 05/17] [FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService --- .../apache/flink/runtime/blob/BlobCache.java | 15 ++++++++++ .../apache/flink/runtime/blob/BlobServer.java | 28 +++++++++++-------- .../runtime/blob/BlobServerConnection.java | 4 +-- .../flink/runtime/blob/BlobService.java | 11 +++++++- .../apache/flink/runtime/blob/BlobUtils.java | 2 +- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 01d3a36f8fae2..90e1ac72e8616 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -234,6 +235,20 @@ public void delete(BlobKey key) throws IOException{ } } + /** + * Deletes all files associated with the given job id from the BLOB cache. + * + * @param jobId JobID of the file in the blob store + */ + @Override + public void deleteAll(JobID jobId) { + try { + BlobUtils.deleteJobDirectory(storageDir, jobId); + } catch (IOException e) { + LOG.warn("Failed to delete local BLOB storage dir {}.", BlobUtils.getJobDirectory(storageDir, jobId)); + } + } + /** * Deletes the file associated with the given key from the BLOB cache and * BLOB server. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 8a70559b589ec..d9bb96436761a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -213,18 +213,6 @@ File getStorageLocation(JobID jobID, String key) { return BlobUtils.getStorageLocation(storageDir, jobID, key); } - /** - * Method which deletes all files associated with the given jobID. - * - *

This is only called from the {@link BlobServerConnection} - * - * @param jobID all files associated to this jobID will be deleted - * @throws IOException - */ - void deleteJobDirectory(JobID jobID) throws IOException { - BlobUtils.deleteJobDirectory(storageDir, jobID); - } - /** * Returns a temporary file inside the BLOB server's incoming directory. * @@ -400,6 +388,22 @@ public void delete(BlobKey key) throws IOException { blobStore.delete(key); } + /** + * Deletes all files associated with the given job id from the storage. + * + * @param jobId JobID of the files in the blob store + */ + @Override + public void deleteAll(final JobID jobId) { + try { + BlobUtils.deleteJobDirectory(storageDir, jobId); + } catch (IOException e) { + LOG.warn("Failed to delete local BLOB storage dir {}.", BlobUtils.getJobDirectory(storageDir, jobId)); + } + + blobStore.deleteAll(jobId); + } + /** * Returns the port on which the server is listening. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 13a90c626fac6..2b4ccd436cdeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -423,9 +423,7 @@ else if (type == JOB_ID_SCOPE) { readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); JobID jobID = JobID.fromByteArray(jidBytes); - blobServer.deleteJobDirectory(jobID); - - blobStore.deleteAll(jobID); + blobServer.deleteAll(jobID); } else { throw new IOException("Unrecognized addressing type: " + type); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 6b837aa5b0e4c..8147b3747209a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.api.common.JobID; + import java.io.IOException; import java.net.URL; @@ -38,13 +40,20 @@ public interface BlobService { /** - * This method deletes the file associated with the provided blob key. + * Deletes the file associated with the provided blob key. * * @param key associated with the file to be deleted * @throws IOException */ void delete(BlobKey key) throws IOException; + /** + * Deletes all files associated with the given job id. + * + * @param jobId JobID of the files in the blob store + */ + void deleteAll(JobID jobId); + /** * Returns the port of the blob service. * @return the port of the blob service. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index c540f7401579d..b21fe7d2ec9b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -190,7 +190,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) { * the ID of the job to return the storage directory for * @return the storage directory for BLOBs belonging to the job with the given ID */ - private static File getJobDirectory(File storageDir, JobID jobID) { + static File getJobDirectory(File storageDir, JobID jobID) { final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString()); if (!jobDirectory.exists() && !jobDirectory.mkdirs()) { From 8ec1538205269c393b7b4fbdfc5eda5610bc5df1 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 20 Dec 2016 18:52:19 +0100 Subject: [PATCH 06/17] [FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs These blobs are referenced by the job ID and a selected name instead of the hash sum of the blob's contents. Some code was already prepared but lacked the proper additions in further APIs. This commit adds some. --- .../apache/flink/runtime/blob/BlobCache.java | 156 +++++++++++++++--- .../apache/flink/runtime/blob/BlobServer.java | 61 ++++++- .../flink/runtime/blob/BlobService.java | 24 ++- 3 files changed, 211 insertions(+), 30 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 90e1ac72e8616..15f434e9ce3e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -151,8 +151,8 @@ private BlobCache( /** * Returns the URL for the BLOB with the given key. The method will first attempt to serve - * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it - * from this cache's BLOB server. + * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to + * download it from this cache's BLOB server. * * @param requiredBlob The key of the desired BLOB. * @return URL referring to the local storage location of the BLOB. @@ -190,31 +190,13 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { final InputStream is = bc.get(requiredBlob); final OutputStream os = new FileOutputStream(localJarFile) ) { - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } + getURLTransferFile(buf, is, os); // success, we finished return localJarFile.toURI().toURL(); } catch (Throwable t) { - String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); - if (attempt < numFetchRetries) { - if (LOG.isDebugEnabled()) { - LOG.debug(message + " Retrying...", t); - } else { - LOG.error(message + " Retrying..."); - } - } - else { - LOG.error(message + " No retries left.", t); - throw new IOException(message, t); - } + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); // retry ++attempt; @@ -223,15 +205,120 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { } // end loop over retries } + /** + * Returns the URL for the BLOB with the given parameters. The method will first attempt to + * serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try + * to download it from this cache's BLOB server. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + * @return URL referring to the local storage location of the BLOB. + * @throws java.io.FileNotFoundException if the path does not exist; + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. + */ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); + + final File localJarFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // first try the distributed blob store (if available) + try { + blobStore.get(jobId, key, localJarFile); + } catch (Exception e) { + LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e); + } + + if (localJarFile.exists()) { + return localJarFile.toURI().toURL(); + } + + // fallback: download from the BlobServer + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {}/{} from {}", jobId, key, serverAddress); + + // loop over retries + int attempt = 0; + while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(jobId, key); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(String.format("%s/%s", jobId, key), localJarFile, attempt, t); + + // retry + ++attempt; + LOG.info("Downloading {}/{} from {} (retry {})", jobId, key, serverAddress, attempt); + } + } // end loop over retries + } + + private static void getURLTransferFile( + final byte[] buf, final InputStream is, final OutputStream os) throws IOException { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; + } + os.write(buf, 0, read); + } + } + + private final void getURLOnException( + final String requiredBlob, final File localJarFile, final int attempt, + final Throwable t) throws IOException { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug(message + " Retrying...", t); + } else { + LOG.error(message + " Retrying..."); + } + } + else { + LOG.error(message + " No retries left.", t); + throw new IOException(message, t); + } + } + /** * Deletes the file associated with the given key from the BLOB cache. + * * @param key referring to the file to be deleted */ - public void delete(BlobKey key) throws IOException{ + @Override + public void delete(BlobKey key) { final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists() && !localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); + LOG.warn("Failed to delete locally cached BLOB {} at {}" + key, localFile.getAbsolutePath()); + } + } + + /** + * Deletes the file associated with the given job and key from the BLOB cache. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + */ + @Override + public void delete(JobID jobId, String key) { + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists() && !localFile.delete()) { + LOG.warn("Failed to delete locally cached BLOB {}/{} at {}", jobId, key, localFile.getAbsolutePath()); } } @@ -269,6 +356,27 @@ public void deleteGlobal(BlobKey key) throws IOException { } } + /** + * Deletes the file associated with the given job and key from the BLOB + * cache and BLOB server. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + * @throws IOException + * thrown if an I/O error occurs while transferring the request to + * the BLOB server or if the BLOB server cannot delete the file + */ + public void deleteGlobal(JobID jobId, String key) throws IOException { + BlobClient bc = createClient(); + try { + delete(jobId, key); + bc.delete(jobId, key); + } + finally { + bc.close(); + } + } + @Override public int getPort() { return serverAddress.getPort(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index d9bb96436761a..e39f7d52cc26e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -368,15 +368,45 @@ public URL getURL(BlobKey requiredBlob) throws IOException { } } + @Override + public URL getURL(JobID jobId, String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { + return localFile.toURI().toURL(); + } + else { + try { + // Try the blob store + blobStore.get(jobId, key, localFile); + } + catch (Exception e) { + throw new IOException("Failed to copy from blob store.", e); + } + + if (localFile.exists()) { + return localFile.toURI().toURL(); + } + else { + throw new FileNotFoundException("Local file " + localFile + " does not exist " + + "and failed to copy from blob store."); + } + } + } + /** - * This method deletes the file associated to the blob key if it exists in the local storage + * Deletes the file associated to the blob key if it exists in the local storage * of the blob server. * * @param key associated with the file to be deleted - * @throws IOException */ @Override - public void delete(BlobKey key) throws IOException { + public void delete(BlobKey key) { + checkArgument(key != null, "BLOB key must not be null."); + final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists()) { @@ -388,6 +418,29 @@ public void delete(BlobKey key) throws IOException { blobStore.delete(key); } + /** + * Deletes the file associated with the given job and key if it exists in the local + * storage of the blob server. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + */ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { + if (!localFile.delete()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); + } + } + + blobStore.delete(jobId, key); + } + /** * Deletes all files associated with the given job id from the storage. * @@ -395,6 +448,8 @@ public void delete(BlobKey key) throws IOException { */ @Override public void deleteAll(final JobID jobId) { + checkArgument(jobId != null, "Job id must not be null."); + try { BlobUtils.deleteJobDirectory(storageDir, jobId); } catch (IOException e) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index 8147b3747209a..c8d3968fbd823 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -33,19 +33,37 @@ public interface BlobService { * * @param key blob key associated with the requested file * @return The URL to the file. - * @throws java.io.FileNotFoundException when the path does not exist; + * @throws java.io.FileNotFoundException if the path does not exist; * @throws IOException if any other error occurs when retrieving the file */ URL getURL(BlobKey key) throws IOException; + /** + * Returns the URL of the file associated with the provided parameters. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + * @return The URL to the file. + * @throws java.io.FileNotFoundException if the path does not exist; + * @throws IOException if any other error occurs when retrieving the file + */ + URL getURL(JobID jobId, String key) throws IOException; + /** * Deletes the file associated with the provided blob key. * * @param key associated with the file to be deleted - * @throws IOException */ - void delete(BlobKey key) throws IOException; + void delete(BlobKey key); + + /** + * Deletes the file associated with the provided parameters. + * + * @param jobId JobID of the file in the blob store + * @param key String key of the file in the blob store + */ + void delete(JobID jobId, String key); /** * Deletes all files associated with the given job id. From bed7ab26903e7f3f264997cc7778909a8f0439fa Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 21 Dec 2016 17:59:27 +0100 Subject: [PATCH 07/17] [FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task termination --- .../librarycache/BlobLibraryCacheManager.java | 10 ++++++++++ .../librarycache/FallbackLibraryCacheManager.java | 7 +++++++ .../execution/librarycache/LibraryCacheManager.java | 10 +++++++++- .../apache/flink/runtime/taskmanager/TaskManager.scala | 3 +++ .../librarycache/BlobLibraryCacheManagerTest.java | 10 +++++++++- 5 files changed, 38 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index b0d5d834e459e..feb0e5aaf8fa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -150,6 +150,11 @@ public void registerTask(JobID jobId, ExecutionAttemptID task, Collection re void unregisterTask(JobID id, ExecutionAttemptID execution); /** - * Unregisters a job from the library cache manager. + * Unregisters a job from the library cache manager and initiates the cleanup of stored resources. * * @param id job ID */ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 2e8a6fa19132a..64780881052e2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1305,6 +1305,9 @@ class TaskManager( s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " + s"(${task.getExecutionId})") + // delete all NAME_ADDRESSABLE BLOBs + libraryCacheManager.get.getBlobService.deleteAll(task.getJobID) + val accumulators = { val registry = task.getAccumulatorRegistry registry.getSnapshot diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 5d9ade3281e01..aec34f5beed1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -63,6 +63,7 @@ public void testLibraryCacheManagerCleanup() { keys.add(bc.put(buf)); buf[0] += 1; keys.add(bc.put(buf)); + bc.put(jid, "test", buf); long cleanupInterval = 1000l; libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); @@ -84,7 +85,7 @@ public void testLibraryCacheManagerCleanup() { { long deadline = System.currentTimeMillis() + 30000; do { - Thread.sleep(500); + Thread.sleep(100); } while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && System.currentTimeMillis() < deadline); @@ -107,6 +108,13 @@ public void testLibraryCacheManagerCleanup() { assertEquals(2, caughtExceptions); + try { + bc.get(jid, "test"); + fail("name-addressable BLOB should have been deleted"); + } catch (IOException e) { + // expected + } + bc.close(); } catch (Exception e) { From 5f3b8565bbbc0c76ffda09ab0a3c4e375cf728b0 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 9 Mar 2017 19:14:52 +0100 Subject: [PATCH 08/17] [FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access NAME_ADDRESSABLE blobs were not that thouroughly tested before and also the access methods that the BlobService implementations provide. This adds tests covering both. --- .../runtime/blob/BlobCacheSuccessTest.java | 15 +- .../flink/runtime/blob/BlobServerGetTest.java | 132 +++++++++++++++++- 2 files changed, 141 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index db55331b54324..72b9f9bb6365d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.junit.Rule; @@ -97,6 +98,8 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit // Upload BLOBs BlobClient blobClient = null; + JobID jobId = new JobID(); + String name = "random name"; try { blobClient = new BlobClient(serverAddress, config); @@ -104,6 +107,8 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit blobKeys.add(blobClient.put(buf)); buf[0] = 1; // Make sure the BLOB key changes blobKeys.add(blobClient.put(buf)); + + blobClient.put(jobId, name, buf); } finally { if (blobClient != null) { blobClient.close(); @@ -132,6 +137,7 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit for (BlobKey blobKey : blobKeys) { blobCache.getURL(blobKey); } + blobCache.getURL(jobId, name); if (blobServer != null) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. @@ -139,14 +145,15 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit blobServer = null; } - final URL[] urls = new URL[blobKeys.size()]; + final URL[] urls = new URL[blobKeys.size() + 1]; - for(int i = 0; i < blobKeys.size(); i++){ - urls[i] = blobCache.getURL(blobKeys.get(i)); + urls[0] = blobCache.getURL(jobId, name); + for (int i = 1; i <= blobKeys.size(); i++) { + urls[i] = blobCache.getURL(blobKeys.get(i - 1)); } // Verify the result - assertEquals(blobKeys.size(), urls.length); + assertEquals(blobKeys.size() + 1, urls.length); for (final URL url : urls) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 59a62e132918e..272615ac40905 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.junit.Test; @@ -40,7 +41,7 @@ public class BlobServerGetTest { private final Random rnd = new Random(); @Test - public void testGetFailsDuringLookup() { + public void testGetFailsDuringLookupByBlobKey() { BlobServer server = null; BlobClient client = null; @@ -70,6 +71,74 @@ public void testGetFailsDuringLookup() { catch (IOException e) { // expected } + + // try to access the file at the server + try { + server.getURL(key); + fail("This should not succeed."); + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } + + @Test + public void testGetFailsDuringLookupByName() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress, config); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + JobID jobId = new JobID(); + String name = "random name"; + client.put(jobId, name, data); + + // delete all files to make sure that GET requests fail + File blobFile = server.getStorageLocation(jobId, name); + assertTrue(blobFile.delete()); + + // issue a GET request that fails + try { + client.get(jobId, name); + fail("This should not succeed."); + } + catch (IOException e) { + // expected + } + + // try to access the file at the server + try { + server.getURL(jobId, name); + fail("This should not succeed."); + } + catch (IOException e) { + // expected + } } catch (Exception e) { e.printStackTrace(); @@ -90,7 +159,7 @@ public void testGetFailsDuringLookup() { } @Test - public void testGetFailsDuringStreaming() { + public void testGetFailsDuringStreamingByBlobKey() { BlobServer server = null; BlobClient client = null; @@ -147,4 +216,63 @@ public void testGetFailsDuringStreaming() { } } } + + @Test + public void testGetFailsDuringStreamingByName() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress, config); + + byte[] data = new byte[5000000]; + rnd.nextBytes(data); + + JobID jobId = new JobID(); + String name = "random name"; + client.put(jobId, name, data); + + // issue a GET request that succeeds + InputStream is = client.get(jobId, name); + + byte[] receiveBuffer = new byte[50000]; + BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + + // shut down the server + for (BlobServerConnection conn : server.getCurrentActiveConnections()) { + conn.close(); + } + + try { + byte[] remainder = new byte[data.length - 2*receiveBuffer.length]; + BlobUtils.readFully(is, remainder, 0, remainder.length, null); + // we tolerate that this succeeds, as the receiver socket may have buffered + // everything already + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + if (server != null) { + server.shutdown(); + } + } + } } From 3aeedaa5ae6f4000f914705272140986dfa8601e Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 9 Mar 2017 18:15:08 +0100 Subject: [PATCH 09/17] [FLINK-6008] do not fail the BlobServer when delete fails This also enables us to reuse some more code between BlobServerConnection and BlobServer. --- .../runtime/blob/BlobServerConnection.java | 14 +- .../runtime/blob/BlobServerDeleteTest.java | 196 ++++++++++++++++-- 2 files changed, 178 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 2b4ccd436cdeb..7bf7737acd1af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -397,12 +397,7 @@ private void delete(InputStream inputStream, OutputStream outputStream, byte[] b if (type == CONTENT_ADDRESSABLE) { BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = this.blobServer.getStorageLocation(key); - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); + blobServer.delete(key); } else if (type == NAME_ADDRESSABLE) { byte[] jidBytes = new byte[JobID.SIZE]; @@ -411,12 +406,7 @@ else if (type == NAME_ADDRESSABLE) { String key = readKey(buf, inputStream); - File blobFile = this.blobServer.getStorageLocation(jobID, key); - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(jobID, key); + blobServer.delete(jobID, key); } else if (type == JOB_ID_SCOPE) { byte[] jidBytes = new byte[JobID.SIZE]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 025a2ffaa33c2..e7a6b52945775 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.util.Random; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -41,7 +42,7 @@ public class BlobServerDeleteTest { private final Random rnd = new Random(); @Test - public void testDeleteSingle() { + public void testDeleteSingleByBlobKey() { BlobServer server = null; BlobClient client = null; @@ -59,7 +60,13 @@ public void testDeleteSingle() { BlobKey key = client.put(data); assertNotNull(key); - // issue a DELETE request + // second item + data[0] ^= 1; + BlobKey key2 = client.put(data); + assertNotNull(key2); + assertNotEquals(key, key2); + + // issue a DELETE request via the client client.delete(key); client.close(); @@ -79,6 +86,78 @@ public void testDeleteSingle() { catch (IllegalStateException e) { // expected } + + // delete a file directly on the server + server.delete(key2); + try { + server.getURL(key2); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + cleanup(server, client); + } + } + + @Test + public void testDeleteSingleByName() { + BlobServer server = null; + BlobClient client = null; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress, config); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + JobID jobID = new JobID(); + String name1 = "random name"; + String name2 = "any nyme"; + + client.put(jobID, name1, data); + client.put(jobID, name2, data); + + // issue a DELETE request via the client + client.delete(jobID, name1); + client.close(); + + client = new BlobClient(serverAddress, config); + try { + client.get(jobID, name1); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + + try { + client.put(new byte[1]); + fail("client should be closed after erroneous operation"); + } + catch (IllegalStateException e) { + // expected + } + + // delete a file directly on the server + server.delete(jobID, name2); + try { + server.getURL(jobID, name2); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } } catch (Exception e) { e.printStackTrace(); @@ -111,9 +190,15 @@ public void testDeleteAll() { // put content addressable (like libraries) client.put(jobID, name1, data); client.put(jobID, name2, new byte[712]); + // items for a second (different!) job ID + final byte[] jobIdBytes = jobID.getBytes(); + jobIdBytes[0] ^= 1; + JobID jobID2 = JobID.fromByteArray(jobIdBytes); + client.put(jobID2, name1, data); + client.put(jobID2, name2, new byte[712]); - // issue a DELETE ALL request + // issue a DELETE ALL request via the client client.deleteAll(jobID); client.close(); @@ -142,6 +227,23 @@ public void testDeleteAll() { catch (IOException e) { // expected } + + // delete the second set of files directly on the server + server.deleteAll(jobID2); + try { + server.getURL(jobID2, name1); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } + try { + server.getURL(jobID2, name2); + fail("BLOB should have been deleted"); + } + catch (IOException e) { + // expected + } } catch (Exception e) { e.printStackTrace(); @@ -174,13 +276,16 @@ public void testDeleteAlreadyDeletedByBlobKey() { File blobFile = server.getStorageLocation(key); assertTrue(blobFile.delete()); - // issue a DELETE request + // issue a DELETE request via the client try { client.delete(key); } catch (IOException e) { fail("DELETE operation should not fail if file is already deleted"); } + + // issue a DELETE request on the server + server.delete(key); } catch (Exception e) { e.printStackTrace(); @@ -214,13 +319,16 @@ public void testDeleteAlreadyDeletedByName() { File blobFile = server.getStorageLocation(jid, name); assertTrue(blobFile.delete()); - // issue a DELETE request + // issue a DELETE request via the client try { client.delete(jid, name); } catch (IOException e) { fail("DELETE operation should not fail if file is already deleted"); } + + // issue a DELETE request on the server + server.delete(jid, name); } catch (Exception e) { e.printStackTrace(); @@ -232,12 +340,14 @@ public void testDeleteAlreadyDeletedByName() { } @Test - public void testDeleteFails() { + public void testDeleteFailsByBlobKey() { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; BlobClient client = null; + File blobFile = null; + File directory = null; try { Configuration config = new Configuration(); server = new BlobServer(config); @@ -252,30 +362,76 @@ public void testDeleteFails() { BlobKey key = client.put(data); assertNotNull(key); - File blobFile = server.getStorageLocation(key); - File directory = blobFile.getParentFile(); + blobFile = server.getStorageLocation(key); + directory = blobFile.getParentFile(); assertTrue(blobFile.setWritable(false, false)); assertTrue(directory.setWritable(false, false)); - // issue a DELETE request - try { - client.delete(key); - fail("DELETE operation should fail if file cannot be deleted"); - } - catch (IOException e) { - // expected - } - finally { + // issue a DELETE request via the client + client.delete(key); + + // issue a DELETE request on the server + server.delete(key); + + // the file should still be there + server.getURL(key); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (blobFile != null && directory != null) { blobFile.setWritable(true, false); directory.setWritable(true, false); } + cleanup(server, client); } - catch (Exception e) { + } + + @Test + public void testDeleteByNameFails() { + assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. + + BlobServer server = null; + BlobClient client = null; + + File blobFile = null; + File directory = null; + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + client = new BlobClient(serverAddress, config); + + byte[] data = new byte[2000000]; + rnd.nextBytes(data); + + JobID jid = new JobID(); + String name = "------------fdghljEgRJHF+##4U789Q345"; + + client.put(jid, name, data); + + blobFile = server.getStorageLocation(jid, name); + directory = blobFile.getParentFile(); + + assertTrue(blobFile.setWritable(false, false)); + assertTrue(directory.setWritable(false, false)); + + // issue a DELETE request via the client + client.delete(jid, name); + + // issue a DELETE request on the server + server.delete(jid, name); + + // the file should still be there + server.getURL(jid, name); + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - } - finally { + } finally { + blobFile.setWritable(true, false); + directory.setWritable(true, false); cleanup(server, client); } } From 3f9c40e98eba6e3f21e288db898aaf0314fed7e1 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 9 Mar 2017 18:32:14 +0100 Subject: [PATCH 10/17] [FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code --- .../org/apache/flink/runtime/blob/BlobCache.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 15f434e9ce3e3..332cd4e4e911b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -367,14 +367,14 @@ public void deleteGlobal(BlobKey key) throws IOException { * the BLOB server or if the BLOB server cannot delete the file */ public void deleteGlobal(JobID jobId, String key) throws IOException { - BlobClient bc = createClient(); - try { - delete(jobId, key); + // delete locally + delete(jobId, key); + // then delete on the BLOB server + // (don't use the distributed storage directly - this way the blob + // server is aware of the delete operation, too) + try (BlobClient bc = createClient()) { bc.delete(jobId, key); } - finally { - bc.close(); - } } @Override From 8807cd44a20ec53cfdbf906da7d4f1f36fbabc2f Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 17 Mar 2017 16:21:40 +0100 Subject: [PATCH 11/17] [FLINK-6008] fix concurrent job directory creation also add according unit tests --- .../apache/flink/runtime/blob/BlobUtils.java | 2 +- .../flink/runtime/blob/BlobServerPutTest.java | 105 ++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index b21fe7d2ec9b3..dabf7ca17e16f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -193,7 +193,7 @@ static File getStorageLocation(File storageDir, JobID jobID, String key) { static File getJobDirectory(File storageDir, JobID jobID) { final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString()); - if (!jobDirectory.exists() && !jobDirectory.mkdirs()) { + if (!jobDirectory.mkdirs() && !jobDirectory.exists()) { throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index c4d6d1cbf0a94..8a37a07e7a56d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.util.OperatingSystem; import org.junit.Test; @@ -41,6 +42,110 @@ public class BlobServerPutTest { private final Random rnd = new Random(); + + // --- concurrency tests for utility methods which could fail during the put operation --- + + /** + * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)} + */ + public static class ContentAddressableGetStorageLocation extends CheckedThread { + private final BlobServer server; + private final BlobKey key; + + public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) { + this.server = server; + this.key = key; + } + + @Override + public void go() throws Exception { + server.getStorageLocation(key); + } + } + + /** + * Tests concurrent calls to {@link BlobServer#getStorageLocation(BlobKey)}. + */ + @Test + public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception { + BlobServer server = new BlobServer(new Configuration()); + + try { + BlobKey key = new BlobKey(); + CheckedThread[] threads = new CheckedThread[] { + new ContentAddressableGetStorageLocation(server, key), + new ContentAddressableGetStorageLocation(server, key), + new ContentAddressableGetStorageLocation(server, key) + }; + checkedThreadSimpleTest(threads); + } finally { + server.shutdown(); + } + } + + /** + * Helper method to first start all threads and then wait for their completion. + * + * @param threads threads to use + * @throws Exception exceptions that are thrown from the threads + */ + protected void checkedThreadSimpleTest(CheckedThread[] threads) + throws Exception { + + // start all threads + for (CheckedThread t: threads) { + t.start(); + } + + // wait for thread completion and check exceptions + for (CheckedThread t: threads) { + t.sync(); + } + } + + /** + * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, String)} + */ + public static class NameAddressableGetStorageLocation extends CheckedThread { + private final BlobServer server; + private final JobID jid; + private final String name; + + public NameAddressableGetStorageLocation(BlobServer server, JobID jid, String name) { + this.server = server; + this.jid = jid; + this.name = name; + } + + @Override + public void go() throws Exception { + server.getStorageLocation(jid, name); + } + } + + /** + * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, String)}. + */ + @Test + public void testServerNameAddressableGetStorageLocationConcurrent() throws Exception { + BlobServer server = new BlobServer(new Configuration()); + + try { + JobID jid = new JobID(); + String stringKey = "my test key"; + CheckedThread[] threads = new CheckedThread[] { + new NameAddressableGetStorageLocation(server, jid, stringKey), + new NameAddressableGetStorageLocation(server, jid, stringKey), + new NameAddressableGetStorageLocation(server, jid, stringKey) + }; + checkedThreadSimpleTest(threads); + } finally { + server.shutdown(); + } + } + + // -------------------------------------------------------------------------------------------- + @Test public void testPutBufferSuccessful() { BlobServer server = null; From d6f3f5381439130af77150d80cb89f5944a0c282 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 17 Mar 2017 18:27:38 +0100 Subject: [PATCH 12/17] [FLINK-6008] address some of the PR comments by @StephanEwen --- docs/setup/config.md | 4 ++-- .../java/org/apache/flink/runtime/blob/BlobCache.java | 6 +++--- .../java/org/apache/flink/runtime/blob/BlobServer.java | 8 +++----- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/docs/setup/config.md b/docs/setup/config.md index 032a5ba9a3ba9..36e9118cb5923 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -488,6 +488,8 @@ May be set to -1 to disable this feature. - `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed. - `zookeeper`: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value. +- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`. + Previously this key was named `recovery.mode` and the default value was `standalone`. #### ZooKeeper-based HA Mode @@ -496,8 +498,6 @@ Previously this key was named `recovery.mode` and the default value was `standal - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`. -- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`. - - `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`. - `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 332cd4e4e911b..794591640e9ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -302,8 +302,8 @@ private final void getURLOnException( public void delete(BlobKey key) { final File localFile = BlobUtils.getStorageLocation(storageDir, key); - if (localFile.exists() && !localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB {} at {}" + key, localFile.getAbsolutePath()); + if (!localFile.delete() && localFile.exists()) { + LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath()); } } @@ -317,7 +317,7 @@ public void delete(BlobKey key) { public void delete(JobID jobId, String key) { final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); - if (localFile.exists() && !localFile.delete()) { + if (!localFile.delete() && localFile.exists()) { LOG.warn("Failed to delete locally cached BLOB {}/{} at {}", jobId, key, localFile.getAbsolutePath()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index e39f7d52cc26e..d164b2cd4589d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -409,10 +409,8 @@ public void delete(BlobKey key) { final File localFile = BlobUtils.getStorageLocation(storageDir, key); - if (localFile.exists()) { - if (!localFile.delete()) { - LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); - } + if (!localFile.delete() && localFile.exists()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); } blobStore.delete(key); @@ -452,7 +450,7 @@ public void deleteAll(final JobID jobId) { try { BlobUtils.deleteJobDirectory(storageDir, jobId); - } catch (IOException e) { + } catch (Exception e) { LOG.warn("Failed to delete local BLOB storage dir {}.", BlobUtils.getJobDirectory(storageDir, jobId)); } From 4a9a6b2e7272e136143f101c75c1cfaacabc3368 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 18 Apr 2017 16:37:37 +0200 Subject: [PATCH 13/17] [FLINK-6008] some comments about BlobLibraryCacheManager cleanup --- .../librarycache/BlobLibraryCacheManager.java | 22 +++++++++++++++ .../librarycache/LibraryCacheManager.java | 27 +++++++++++++++---- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index feb0e5aaf8fa9..970a5cbf66b08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -48,6 +48,11 @@ * a set of libraries (typically JAR files) which the job requires to run. The library cache manager * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton * programming pattern, so there exists at most one library manager at a time. + *

+ * All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted + * and are removed by a timer-based cleanup task if their reference counter is zero. + * NOTE: this does not apply to files that enter the blob service via + * {@link #getFile(BlobKey)}! */ public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager { @@ -73,6 +78,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC // -------------------------------------------------------------------------------------------- + /** + * Creates the blob library cache manager. + * + * @param blobService blob file retrieval service to use + * @param cleanupInterval cleanup interval in milliseconds + */ public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) { this.blobService = checkNotNull(blobService); @@ -201,6 +212,17 @@ public BlobService getBlobService() { return blobService; } + /** + * Returns a file handle to the file identified by the blob key. + *

+ * NOTE: if not already registered during + * {@link #registerJob(JobID, Collection, Collection)}, files that enter the library cache / + * backing blob store using this method will not be reference-counted and garbage-collected! + * + * @param blobKey identifying the requested file + * @return File handle + * @throws IOException if any error occurs when retrieving the file + */ @Override public File getFile(BlobKey blobKey) throws IOException { return new File(blobService.getURL(blobKey).getFile()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index 1c19fd7a1dcba..bec29c4e312b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -49,7 +49,7 @@ public interface LibraryCacheManager { * * @param blobKey identifying the requested file * @return File handle - * @throws IOException + * @throws IOException if any error occurs when retrieving the file */ File getFile(BlobKey blobKey) throws IOException; @@ -59,7 +59,9 @@ public interface LibraryCacheManager { * @param id job ID * @param requiredJarFiles collection of blob keys identifying the required jar files * @param requiredClasspaths collection of classpaths that are added to the user code class loader - * @throws IOException + * @throws IOException if any error occurs when retrieving the required jar files + * + * @see #unregisterJob(JobID) counterpart of this method */ void registerJob(JobID id, Collection requiredJarFiles, Collection requiredClasspaths) throws IOException; @@ -71,21 +73,36 @@ void registerJob(JobID id, Collection requiredJarFiles, Collection * @param requiredJarFiles collection of blob keys identifying the required jar files * @param requiredClasspaths collection of classpaths that are added to the user code class loader * @throws IOException + * + * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this method */ void registerTask(JobID id, ExecutionAttemptID execution, Collection requiredJarFiles, Collection requiredClasspaths) throws IOException; /** - * Unregisters a job from the library cache manager. + * Unregisters a job task execution from the library cache manager. + *

+ * Note: this is the counterpart of {@link #registerTask(JobID, + * ExecutionAttemptID, Collection, Collection)} and it will not remove any job added via + * {@link #registerJob(JobID, Collection, Collection)}! * * @param id job ID + * + * @see #registerTask(JobID, ExecutionAttemptID, Collection, Collection) counterpart of this method */ void unregisterTask(JobID id, ExecutionAttemptID execution); - + /** - * Unregisters a job from the library cache manager and initiates the cleanup of stored resources. + * Unregisters a job from the library cache manager and initiates the cleanup of stored + * resources. + *

+ * Note: this is the counterpart of {@link #registerJob(JobID, Collection, + * Collection)} and it will not remove any job task execution added via {@link + * #registerTask(JobID, ExecutionAttemptID, Collection, Collection)}! * * @param id job ID + * + * @see #registerJob(JobID, Collection, Collection) counterpart of this method */ void unregisterJob(JobID id); From 734a52a27413f0efc9ee8081d94cbeba77743528 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 19 Apr 2017 15:39:03 +0200 Subject: [PATCH 14/17] [hotfix] minor typos --- .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 2 +- .../scala/org/apache/flink/runtime/jobmanager/JobManager.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index ef934de16eedc..aeab0740fecf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -507,7 +507,7 @@ public void startTaskThread() { } /** - * The core work method that bootstraps the task and executes it code + * The core work method that bootstraps the task and executes its code */ @Override public void run() { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f2ecde526ff69..2aa5b0a1ff5bc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1791,7 +1791,7 @@ class JobManager( libraryCacheManager.unregisterJob(jobID) } catch { case t: Throwable => - log.error(s"Could not properly unregister job $jobID form the library cache.", t) + log.error(s"Could not properly unregister job $jobID from the library cache.", t) } jobManagerMetricGroup.foreach(_.removeJob(jobID)) From d6119c2f917cd209aea9edb1dd7d7076ff656668 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 19 Apr 2017 15:40:25 +0200 Subject: [PATCH 15/17] [FLINK-6008] add retrieval and proper cleanup of name-addressable blobs at the BlobLibraryCacheManager --- .../librarycache/BlobLibraryCacheManager.java | 20 +++++++++++-------- .../FallbackLibraryCacheManager.java | 5 +++++ .../librarycache/LibraryCacheManager.java | 13 ++++++++++++ .../runtime/taskmanager/TaskManager.scala | 3 --- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 970a5cbf66b08..66245c699b30d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -66,10 +66,10 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC private final Object lockObject = new Object(); /** Registered entries per job */ - private final Map cacheEntries = new HashMap(); + private final Map cacheEntries = new HashMap<>(); - /** Map to store the number of reference to a specific file */ - private final Map blobKeyReferenceCounters = new HashMap(); + /** Map to store the number of references to a specific file */ + private final Map blobKeyReferenceCounters = new HashMap<>(); /** The blob service to download libraries */ private final BlobService blobService; @@ -161,11 +161,6 @@ public void registerTask(JobID jobId, ExecutionAttemptID task, Collection delete all NAME_ADDRESSABLE BLOBs + blobService.deleteAll(jobId); } } // else has already been unregistered @@ -228,6 +227,11 @@ public File getFile(BlobKey blobKey) throws IOException { return new File(blobService.getURL(blobKey).getFile()); } + @Override + public File getFile(JobID jobId, String key) throws IOException { + return new File(blobService.getURL(jobId, key).getFile()); + } + public int getBlobServerPort() { return blobService.getPort(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java index 0d791cdfa30e8..a0d8767e8ab1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java @@ -50,6 +50,11 @@ public File getFile(BlobKey blobKey) throws IOException { throw new IOException("There is no file associated to the blob key " + blobKey); } + @Override + public File getFile(final JobID jobId, final String key) throws IOException { + throw new IOException("There is no file associated to the job id " + jobId + " and name " + key); + } + @Override public void registerJob(JobID id, Collection requiredJarFiles, Collection requiredClasspaths) { LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index bec29c4e312b2..f375502eaa91b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -53,6 +53,19 @@ public interface LibraryCacheManager { */ File getFile(BlobKey blobKey) throws IOException; + /** + * Returns a file handle to the file identified by a job id. + *

+ * Note: these files will be cleaned up when the last job (task execution) + * with the given jobId is removed from this cache manager. + * + * @param jobId JobID of the requested file + * @param key String key of the requested file + * @return File handle + * @throws IOException if any error occurs when retrieving the file + */ + File getFile(JobID jobId, String key) throws IOException; + /** * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys. * diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 64780881052e2..2e8a6fa19132a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1305,9 +1305,6 @@ class TaskManager( s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " + s"(${task.getExecutionId})") - // delete all NAME_ADDRESSABLE BLOBs - libraryCacheManager.get.getBlobService.deleteAll(task.getJobID) - val accumulators = { val registry = task.getAccumulatorRegistry registry.getSnapshot From 4fb5052d4c5a7d1ae22cf52a2293a948765e5c67 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 19 Apr 2017 16:10:16 +0200 Subject: [PATCH 16/17] [FLINK-6008] further cleanup tests for BlobLibraryCacheManager --- .../BlobLibraryCacheManagerTest.java | 145 ++++++++++++++++-- 1 file changed, 128 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index aec34f5beed1c..f9faa7e26bc30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -44,8 +44,12 @@ public class BlobLibraryCacheManagerTest { + /** + * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link + * BlobLibraryCacheManager#unregisterJob(JobID)}. + */ @Test - public void testLibraryCacheManagerCleanup() { + public void testLibraryCacheManagerJobCleanup() { JobID jid = new JobID(); List keys = new ArrayList(); @@ -69,14 +73,9 @@ public void testLibraryCacheManagerCleanup() { libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); libraryCacheManager.registerJob(jid, keys, Collections.emptyList()); - List files = new ArrayList(); - - for (BlobKey key : keys) { - files.add(libraryCacheManager.getFile(key)); - } - - assertEquals(2, files.size()); - files.clear(); + assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); + assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); libraryCacheManager.unregisterJob(jid); @@ -88,25 +87,137 @@ public void testLibraryCacheManagerCleanup() { Thread.sleep(100); } while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && - System.currentTimeMillis() < deadline); + System.currentTimeMillis() < deadline); } // this fails if we exited via a timeout assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries()); + assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); + + // the blob cache should no longer contain the files + assertEquals(0, checkFilesExist(keys, libraryCacheManager, false)); - int caughtExceptions = 0; + try { + bc.get(jid, "test"); + fail("name-addressable BLOB should have been deleted"); + } catch (IOException e) { + // expected + } - for (BlobKey key : keys) { - // the blob cache should no longer contain the files + bc.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (server != null) { + server.shutdown(); + } + + if (libraryCacheManager != null) { try { - files.add(libraryCacheManager.getFile(key)); + libraryCacheManager.shutdown(); } - catch (IOException ioe) { - caughtExceptions++; + catch (IOException e) { + e.printStackTrace(); } } + } + } + + /** + * Checks how many of the files given by blob keys are accessible. + * + * @param keys + * blob keys to check + * @param libraryCacheManager + * cache manager to use + * @param doThrow + * whether exceptions should be ignored (false), or throws (true) + * + * @return number of files we were able to retrieve via {@link BlobLibraryCacheManager#getFile(BlobKey)} + */ + private int checkFilesExist( + List keys, BlobLibraryCacheManager libraryCacheManager, boolean doThrow) + throws IOException { + int numFiles = 0; + + for (BlobKey key : keys) { + try { + libraryCacheManager.getFile(key); + ++numFiles; + } catch (IOException e) { + if (doThrow) { + throw e; + } + } + } + + return numFiles; + } + + /** + * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link + * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}. + */ + @Test + public void testLibraryCacheManagerTaskCleanup() { + + JobID jid = new JobID(); + ExecutionAttemptID executionId1 = new ExecutionAttemptID(); + ExecutionAttemptID executionId2 = new ExecutionAttemptID(); + List keys = new ArrayList(); + BlobServer server = null; + BlobLibraryCacheManager libraryCacheManager = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + server = new BlobServer(config); + InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); + BlobClient bc = new BlobClient(blobSocketAddress, config); + + keys.add(bc.put(buf)); + buf[0] += 1; + keys.add(bc.put(buf)); + bc.put(jid, "test", buf); + + long cleanupInterval = 1000l; + libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); + libraryCacheManager.registerTask(jid, executionId1, keys, Collections.emptyList()); + libraryCacheManager.registerTask(jid, executionId2, keys, Collections.emptyList()); + + assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); + assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid)); + + libraryCacheManager.unregisterTask(jid, executionId1); + + assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); + assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); + + libraryCacheManager.unregisterTask(jid, executionId2); + + // because we cannot guarantee that there are not thread races in the build system, we + // loop for a certain while until the references disappear + { + long deadline = System.currentTimeMillis() + 30000; + do { + Thread.sleep(100); + } + while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && + System.currentTimeMillis() < deadline); + } + + // this fails if we exited via a timeout + assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries()); + assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); - assertEquals(2, caughtExceptions); + // the blob cache should no longer contain the files + assertEquals(0, checkFilesExist(keys, libraryCacheManager, false)); try { bc.get(jid, "test"); From e7409848aaab095e1707386ec21f4d03641aa8a7 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 19 Apr 2017 16:16:40 +0200 Subject: [PATCH 17/17] [FLINK-6008] remove the exposal of the undelying blob service in LibraryCacheManager This may actually change in future. --- .../librarycache/BlobLibraryCacheManager.java | 5 ----- .../librarycache/FallbackLibraryCacheManager.java | 9 +-------- .../execution/librarycache/LibraryCacheManager.java | 10 +--------- 3 files changed, 2 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 66245c699b30d..9daeba2050512 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -206,11 +206,6 @@ public ClassLoader getClassLoader(JobID id) { } } - @Override - public BlobService getBlobService() { - return blobService; - } - /** * Returns a file handle to the file identified by the blob key. *

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java index a0d8767e8ab1f..e85271428f799 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java @@ -18,10 +18,9 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +38,6 @@ public ClassLoader getClassLoader(JobID id) { return getClass().getClassLoader(); } - @Override - public BlobService getBlobService() { - LOG.warn("FallbackLibraryCacheManager does not have a backing blob storage."); - return null; - } - @Override public File getFile(BlobKey blobKey) throws IOException { throw new IOException("There is no file associated to the blob key " + blobKey); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index f375502eaa91b..c7285cec7b28f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -18,10 +18,9 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; import java.io.File; import java.io.IOException; @@ -37,13 +36,6 @@ public interface LibraryCacheManager { */ ClassLoader getClassLoader(JobID id); - /** - * Returns the blob storage service that is being used - * - * @return blob storage service - */ - BlobService getBlobService(); - /** * Returns a file handle to the file identified by the blob key. *