From 1f86efd013a81e84ba1556de0a04e4ac70229f79 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 22 Jun 2017 17:31:17 +0200 Subject: [PATCH 1/6] [FLINK-7053][blob] remove code duplication in BlobClientSslTest This lets BlobClientSslTest extend BlobClientTest as most of its implementation came from there and was simply copied. --- .../flink/runtime/blob/BlobClientSslTest.java | 248 +++++------------- .../flink/runtime/blob/BlobClientTest.java | 64 +++-- 2 files changed, 102 insertions(+), 210 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index a5189ead7268d..db363e7505aa7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -18,48 +18,33 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.security.MessageDigest; -import java.util.Collections; -import java.util.List; - import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; + +import static org.junit.Assert.fail; + /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest extends TestLogger { - - /** The buffer size used during the tests in bytes. */ - private static final int TEST_BUFFER_SIZE = 17 * 1000; +public class BlobClientSslTest extends BlobClientTest { /** The instance of the SSL BLOB server used during the tests. */ private static BlobServer BLOB_SSL_SERVER; - /** The SSL blob service client configuration */ - private static Configuration sslClientConfig; + /** Instance of a non-SSL BLOB server with SSL-enabled security options. */ + private static BlobServer BLOB_NON_SSL_SERVER; - /** The instance of the non-SSL BLOB server used during the tests. */ - private static BlobServer BLOB_SERVER; + /** The SSL blob service client configuration. */ + private static Configuration sslClientConfig; - /** The non-ssl blob service client configuration */ - private static Configuration clientConfig; + /** The non-SSL blob service client configuration with SSL-enabled security options. */ + private static Configuration nonSslClientConfig; /** * Starts the SSL enabled BLOB server. @@ -79,9 +64,6 @@ public static void startSSLServer() throws IOException { sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } - /** - * Starts the SSL disabled BLOB server. - */ @BeforeClass public static void startNonSSLServer() throws IOException { Configuration config = new Configuration(); @@ -90,13 +72,13 @@ public static void startNonSSLServer() throws IOException { config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); + BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); - clientConfig = new Configuration(); - clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + nonSslClientConfig = new Configuration(); + nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); + nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); + nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } /** @@ -107,195 +89,89 @@ public static void stopServers() throws IOException { if (BLOB_SSL_SERVER != null) { BLOB_SSL_SERVER.close(); } + } - if (BLOB_SERVER != null) { - BLOB_SERVER.close(); - } + protected Configuration getBlobClientConfig() { + return sslClientConfig; + } + + protected BlobServer getBlobServer() { + return BLOB_SSL_SERVER; } /** - * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and - * computes the file's BLOB key. - * - * @param file - * the file to prepare for the unit tests - * @return the BLOB key of the prepared file - * @throws IOException - * thrown if an I/O error occurs while writing to the test file + * Verify ssl client to ssl server upload */ - private static BlobKey prepareTestFile(File file) throws IOException { - - MessageDigest md = BlobUtils.createMessageDigest(); - - final byte[] buf = new byte[TEST_BUFFER_SIZE]; - for (int i = 0; i < buf.length; ++i) { - buf[i] = (byte) (i % 128); - } - - FileOutputStream fos = null; - try { - fos = new FileOutputStream(file); - - for (int i = 0; i < 20; ++i) { - fos.write(buf); - md.update(buf); - } - - } finally { - if (fos != null) { - fos.close(); - } - } - - return new BlobKey(md.digest()); + @Test + public void testUploadJarFilesHelper() throws Exception { + uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); } /** - * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of - * the specified file. - * - * @param inputStream - * the input stream returned from the GET operation - * @param file - * the file to compare the input stream's data to - * @throws IOException - * thrown if an I/O error occurs while reading the input stream or the file + * Verify ssl client to non-ssl server failure */ - private static void validateGet(final InputStream inputStream, final File file) throws IOException { - - InputStream inputStream2 = null; - try { - - inputStream2 = new FileInputStream(file); - - while (true) { - - final int r1 = inputStream.read(); - final int r2 = inputStream2.read(); - - assertEquals(r2, r1); - - if (r1 < 0) { - break; - } - } - - } finally { - if (inputStream2 != null) { - inputStream2.close(); - } - } - + @Test(expected = IOException.class) + public void testSSLClientFailure() throws Exception { + // SSL client connected to non-ssl server + uploadJarFile(BLOB_SERVER, sslClientConfig); } /** - * Tests the PUT/GET operations for content-addressable streams. + * Verify ssl client to non-ssl server failure */ - @Test - public void testContentAddressableStream() { - - BlobClient client = null; - InputStream is = null; - - try { - File testFile = File.createTempFile("testfile", ".dat"); - testFile.deleteOnExit(); - - BlobKey origKey = prepareTestFile(testFile); - - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort()); - client = new BlobClient(serverAddress, sslClientConfig); - - // Store the data - is = new FileInputStream(testFile); - BlobKey receivedKey = client.put(is); - assertEquals(origKey, receivedKey); - - is.close(); - is = null; - - // Retrieve the data - is = client.get(receivedKey); - validateGet(is, testFile); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (is != null) { - try { - is.close(); - } catch (Throwable t) {} - } - if (client != null) { - try { - client.close(); - } catch (Throwable t) {} - } - } + @Test(expected = IOException.class) + public void testSSLClientFailure2() throws Exception { + // SSL client connected to non-ssl server + uploadJarFile(BLOB_NON_SSL_SERVER, sslClientConfig); } /** - * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + * Verify non-ssl client to ssl server failure */ - private void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { - final File testFile = File.createTempFile("testfile", ".dat"); - testFile.deleteOnExit(); - prepareTestFile(testFile); - - InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); - - List blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, - Collections.singletonList(new Path(testFile.toURI()))); - - assertEquals(1, blobKeys.size()); + @Test(expected = IOException.class) + public void testSSLServerFailure() throws Exception { + // Non-SSL client connected to ssl server + uploadJarFile(BLOB_SSL_SERVER, clientConfig); + } - try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { - InputStream is = blobClient.get(blobKeys.get(0)); - validateGet(is, testFile); - } + /** + * Verify non-ssl client to ssl server failure + */ + @Test(expected = IOException.class) + public void testSSLServerFailure2() throws Exception { + // Non-SSL client connected to ssl server + uploadJarFile(BLOB_SSL_SERVER, nonSslClientConfig); } /** - * Verify ssl client to ssl server upload + * Verify non-ssl connection sanity */ @Test - public void testUploadJarFilesHelper() throws Exception { - uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); + public void testNonSSLConnection() throws Exception { + uploadJarFile(BLOB_SERVER, clientConfig); } /** - * Verify ssl client to non-ssl server failure + * Verify non-ssl connection sanity */ @Test - public void testSSLClientFailure() throws Exception { - try { - uploadJarFile(BLOB_SERVER, sslClientConfig); - fail("SSL client connected to non-ssl server"); - } catch (Exception e) { - // Exception expected - } + public void testNonSSLConnection2() throws Exception { + uploadJarFile(BLOB_SERVER, nonSslClientConfig); } /** - * Verify non-ssl client to ssl server failure + * Verify non-ssl connection sanity */ @Test - public void testSSLServerFailure() throws Exception { - try { - uploadJarFile(BLOB_SSL_SERVER, clientConfig); - fail("Non-SSL client connected to ssl server"); - } catch (Exception e) { - // Exception expected - } + public void testNonSSLConnection3() throws Exception { + uploadJarFile(BLOB_NON_SSL_SERVER, clientConfig); } /** * Verify non-ssl connection sanity */ @Test - public void testNonSSLConnection() throws Exception { - uploadJarFile(BLOB_SERVER, clientConfig); + public void testNonSSLConnection4() throws Exception { + uploadJarFile(BLOB_NON_SSL_SERVER, nonSslClientConfig); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 0a8f738009186..5a772e775d1e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; import java.io.EOFException; import java.io.File; @@ -32,12 +35,8 @@ import java.util.Collections; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * This class contains unit tests for the {@link BlobClient}. @@ -47,19 +46,21 @@ public class BlobClientTest { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; - /** The instance of the BLOB server used during the tests. */ - private static BlobServer BLOB_SERVER; + /** The instance of the (non-ssl) BLOB server used during the tests. */ + static BlobServer BLOB_SERVER; - /** The blob service client and server configuration */ - private static Configuration blobServiceConfig; + /** The blob service (non-ssl) client configuration */ + static Configuration clientConfig; /** * Starts the BLOB server. */ @BeforeClass public static void startServer() throws IOException { - blobServiceConfig = new Configuration(); - BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore()); + Configuration config = new Configuration(); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); + + clientConfig = new Configuration(); } /** @@ -204,8 +205,8 @@ public void testContentAddressableBuffer() { md.update(testBuffer); BlobKey origKey = new BlobKey(md.digest()); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress, blobServiceConfig); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + client = new BlobClient(serverAddress, getBlobClientConfig()); // Store the data BlobKey receivedKey = client.put(testBuffer); @@ -232,11 +233,19 @@ public void testContentAddressableBuffer() { if (client != null) { try { client.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } } } + protected Configuration getBlobClientConfig() { + return clientConfig; + } + + protected BlobServer getBlobServer() { + return BLOB_SERVER; + } + /** * Tests the PUT/GET operations for content-addressable streams. */ @@ -252,8 +261,8 @@ public void testContentAddressableStream() { BlobKey origKey = prepareTestFile(testFile); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress, blobServiceConfig); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + client = new BlobClient(serverAddress, getBlobClientConfig()); // Store the data is = new FileInputStream(testFile); @@ -275,12 +284,12 @@ public void testContentAddressableStream() { if (is != null) { try { is.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } if (client != null) { try { client.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } } } @@ -290,18 +299,25 @@ public void testContentAddressableStream() { */ @Test public void testUploadJarFilesHelper() throws Exception { + uploadJarFile(getBlobServer(), getBlobClientConfig()); + } + + /** + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + */ + static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { final File testFile = File.createTempFile("testfile", ".dat"); testFile.deleteOnExit(); prepareTestFile(testFile); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); - List blobKeys = BlobClient.uploadJarFiles(serverAddress, blobServiceConfig, + List blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, Collections.singletonList(new Path(testFile.toURI()))); assertEquals(1, blobKeys.size()); - try (BlobClient blobClient = new BlobClient(serverAddress, blobServiceConfig)) { + try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { InputStream is = blobClient.get(blobKeys.get(0)); validateGet(is, testFile); } From ff083e850edb8f5f383f54f19519116e10308d61 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 23 Jun 2017 11:40:34 +0200 Subject: [PATCH 2/6] [FLINK-7053][blob] verify some of the buffers returned by GET --- .../apache/flink/runtime/blob/BlobClientTest.java | 7 +++++-- .../flink/runtime/blob/BlobServerGetTest.java | 13 +++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 5a772e775d1e0..7485893e4b251 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -135,19 +136,21 @@ private static BlobKey prepareTestFile(File file) throws IOException { * thrown if an I/O error occurs while reading the input stream */ private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException { + byte[] receivedBuffer = new byte[buf.length]; int bytesReceived = 0; while (true) { - final int read = inputStream.read(buf, bytesReceived, buf.length - bytesReceived); + final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived); if (read < 0) { throw new EOFException(); } bytesReceived += read; - if (bytesReceived == buf.length) { + if (bytesReceived == receivedBuffer.length) { assertEquals(-1, inputStream.read()); + assertArrayEquals(buf, receivedBuffer); return; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 3209648d11ae9..44f39910f3502 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -131,9 +131,10 @@ public void testGetFailsDuringStreaming() throws IOException { // issue a GET request that succeeds InputStream is = client.get(key); - byte[] receiveBuffer = new byte[50000]; - BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); - BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + byte[] receiveBuffer = new byte[data.length]; + int firstChunkLen = 50000; + BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null); + BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null); // shut down the server for (BlobServerConnection conn : server.getCurrentActiveConnections()) { @@ -141,10 +142,10 @@ public void testGetFailsDuringStreaming() throws IOException { } try { - byte[] remainder = new byte[data.length - 2*receiveBuffer.length]; - BlobUtils.readFully(is, remainder, 0, remainder.length, null); + BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null); // we tolerate that this succeeds, as the receiver socket may have buffered - // everything already + // everything already, but in this case, also verify the contents + assertArrayEquals(data, receiveBuffer); } catch (IOException e) { // expected From e138569019f5c75b70a21085e4829cb6cd1e93bc Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 23 Jun 2017 12:04:10 +0200 Subject: [PATCH 3/6] [FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests This replaces the use of some temporary directory where it is not guaranteed that it will be deleted after the test. --- .../org/apache/flink/hdfstests/HDFSTest.java | 8 +++++ .../runtime/blob/BlobCacheRetriesTest.java | 11 ++++++- .../runtime/blob/BlobCacheSuccessTest.java | 28 ++++++++++------ .../flink/runtime/blob/BlobClientSslTest.java | 11 +++++-- .../flink/runtime/blob/BlobClientTest.java | 9 ++++++ .../runtime/blob/BlobRecoveryITCase.java | 4 ++- .../runtime/blob/BlobServerDeleteTest.java | 24 +++++++++++--- .../flink/runtime/blob/BlobServerGetTest.java | 14 +++++--- .../flink/runtime/blob/BlobServerPutTest.java | 32 +++++++++++++++---- .../runtime/blob/BlobServerRangeTest.java | 10 ++++++ .../flink/runtime/blob/BlobUtilsTest.java | 19 ++++++----- .../BlobLibraryCacheManagerTest.java | 15 +++++++++ .../BlobLibraryCacheRecoveryITCase.java | 9 ++++-- 13 files changed, 151 insertions(+), 43 deletions(-) diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index c490c9fb4816d..f70845bf14c34 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -48,7 +49,9 @@ import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -70,6 +73,9 @@ public class HDFSTest { private org.apache.hadoop.fs.Path hdPath; protected org.apache.hadoop.fs.FileSystem hdfs; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @BeforeClass public static void verifyOS() { Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); @@ -242,6 +248,8 @@ public void testBlobServerRecovery() throws Exception { config = new org.apache.flink.configuration.Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 1cf77eab26c24..366b592a4b322 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.junit.Rule; @@ -45,6 +46,8 @@ public class BlobCacheRetriesTest { @Test public void testBlobFetchRetries() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); testBlobFetchRetries(config, new VoidBlobStore()); } @@ -56,9 +59,11 @@ public void testBlobFetchRetries() throws IOException { @Test public void testBlobFetchRetriesHa() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); BlobStoreService blobStoreService = null; @@ -136,6 +141,8 @@ private void testBlobFetchRetries(final Configuration config, final BlobStore bl @Test public void testBlobFetchWithTooManyFailures() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); testBlobFetchWithTooManyFailures(config, new VoidBlobStore()); } @@ -147,6 +154,8 @@ public void testBlobFetchWithTooManyFailures() throws IOException { @Test public void testBlobFetchWithTooManyFailuresHa() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 2a65a3b246787..19b129d902319 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.junit.Rule; @@ -52,6 +53,9 @@ public class BlobCacheSuccessTest { @Test public void testBlobCache() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + uploadFileGetTest(config, false, false); } @@ -63,9 +67,11 @@ public void testBlobCache() throws IOException { @Test public void testBlobCacheHa() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); uploadFileGetTest(config, true, true); } @@ -76,9 +82,11 @@ public void testBlobCacheHa() throws IOException { @Test public void testBlobCacheHaFallback() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); uploadFileGetTest(config, false, false); } @@ -92,15 +100,15 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit BlobCache blobCache = null; BlobStoreService blobStoreService = null; try { - final Configuration cacheConfig; - if (cacheHasAccessToFs) { - cacheConfig = config; - } else { - // just in case parameters are still read from the server, - // create a separate configuration object for the cache - cacheConfig = new Configuration(config); + final Configuration cacheConfig = new Configuration(config); + cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + if (!cacheHasAccessToFs) { + // make sure the cache cannot access the HA store directly + cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); + temporaryFolder.newFolder().getPath() + "/does-not-exist"); } blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index db363e7505aa7..19dc61af7379d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -23,12 +23,12 @@ import org.apache.flink.configuration.SecurityOptions; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; -import static org.junit.Assert.fail; - /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ @@ -46,12 +46,17 @@ public class BlobClientSslTest extends BlobClientTest { /** The non-SSL blob service client configuration with SSL-enabled security options. */ private static Configuration nonSslClientConfig; + @ClassRule + public static TemporaryFolder temporarySslFolder = new TemporaryFolder(); + /** * Starts the SSL enabled BLOB server. */ @BeforeClass public static void startSSLServer() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); @@ -67,6 +72,8 @@ public static void startSSLServer() throws IOException { @BeforeClass public static void startNonSSLServer() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 7485893e4b251..2932f41daea2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.EOFException; import java.io.File; @@ -53,12 +56,18 @@ public class BlobClientTest { /** The blob service (non-ssl) client configuration */ static Configuration clientConfig; + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Starts the BLOB server. */ @BeforeClass public static void startServer() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index c2a3a7a600bb5..3c7711d86bc68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -54,7 +55,8 @@ public void testBlobServerRecovery() throws Exception { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath()); BlobStoreService blobStoreService = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 7100e792d66e7..c2f2cd26fe2f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -18,13 +18,16 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -52,6 +55,9 @@ public class BlobServerDeleteTest extends TestLogger { private final Random rnd = new Random(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void testDeleteSingleByBlobKey() { BlobServer server = null; @@ -59,7 +65,9 @@ public void testDeleteSingleByBlobKey() { BlobStore blobStore = new VoidBlobStore(); try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -125,7 +133,9 @@ public void testDeleteAlreadyDeletedByBlobKey() { BlobStore blobStore = new VoidBlobStore(); try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -172,7 +182,9 @@ public void testDeleteByBlobKeyFails() { File blobFile = null; File directory = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -223,7 +235,9 @@ public void testDeleteByBlobKeyFails() { */ @Test public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + final BlobStore blobStore = mock(BlobStore.class); final int concurrentDeleteOperations = 3; @@ -233,7 +247,7 @@ public void testConcurrentDeleteOperations() throws IOException, ExecutionExcept final byte[] data = {1, 2, 3}; - try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + try (final BlobServer blobServer = new BlobServer(config, blobStore)) { final BlobKey blobKey; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 44f39910f3502..8e88bf7e93d99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -75,7 +75,9 @@ public void testGetFailsDuringLookup() throws IOException { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -115,7 +117,9 @@ public void testGetFailsDuringStreaming() throws IOException { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -167,9 +171,9 @@ public void testGetFailsDuringStreaming() throws IOException { */ @Test public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); - configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); final BlobStore blobStore = mock(BlobStore.class); @@ -199,7 +203,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations); - try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + try (final BlobServer blobServer = new BlobServer(config, blobStore)) { for (int i = 0; i < numberConcurrentGetOperations; i++) { Future getOperation = FlinkCompletableFuture.supplyAsync(new Callable() { @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index 8b8ddf98047a9..0fe2de46ffe06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.concurrent.Future; @@ -26,7 +27,9 @@ import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; import java.io.File; @@ -63,6 +66,8 @@ public class BlobServerPutTest extends TestLogger { private final Random rnd = new Random(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); // --- concurrency tests for utility methods which could fail during the put operation --- @@ -89,7 +94,10 @@ public void go() throws Exception { */ @Test public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception { - BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore()); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + BlobServer server = new BlobServer(config, new VoidBlobStore()); try { BlobKey key = new BlobKey(); @@ -132,7 +140,9 @@ public void testPutBufferSuccessful() throws IOException { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -186,7 +196,9 @@ public void testPutStreamSuccessful() throws IOException { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -220,7 +232,9 @@ public void testPutChunkedStreamSuccessful() throws IOException { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -253,7 +267,9 @@ public void testPutBufferFails() throws IOException { File tempFileDir = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir @@ -306,7 +322,9 @@ public void testPutBufferFails() throws IOException { */ @Test public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobStore blobStore = mock(BlobStore.class); int concurrentPutOperations = 2; int dataSize = 1024; @@ -319,7 +337,7 @@ public void testConcurrentPutOperations() throws IOException, ExecutionException ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations); try ( - final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + final BlobServer blobServer = new BlobServer(config, blobStore)) { for (int i = 0; i < concurrentPutOperations; i++) { Future putFuture = FlinkCompletableFuture.supplyAsync(new Callable() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index 120d86a320fde..834d3674e3f54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -23,7 +23,9 @@ import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.net.ServerSocket; @@ -32,6 +34,10 @@ * Tests to ensure that the BlobServer properly starts on a specified range of available ports. */ public class BlobServerRangeTest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Start blob server on 0 = pick an ephemeral port */ @@ -39,6 +45,8 @@ public class BlobServerRangeTest extends TestLogger { public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, "0"); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); srv.close(); } @@ -60,6 +68,7 @@ public void testPortUnavailable() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort())); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); // this thing is going to throw an exception try { @@ -89,6 +98,7 @@ public void testOnePortAvailable() throws IOException { int availablePort = NetUtils.getAvailablePort(); Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); // this thing is going to throw an exception try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index 081e28cd9de13..df9b254a5845b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -27,7 +27,9 @@ import org.apache.flink.util.OperatingSystem; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -36,26 +38,23 @@ public class BlobUtilsTest { private final static String CANNOT_CREATE_THIS = "cannot-create-this"; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private File blobUtilsTestDirectory; @Before - public void before() { - // Prepare test directory - blobUtilsTestDirectory = Files.createTempDir(); - + public void before() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. + // Prepare test directory + blobUtilsTestDirectory = temporaryFolder.newFolder(); + assertTrue(blobUtilsTestDirectory.setExecutable(true, false)); assertTrue(blobUtilsTestDirectory.setReadable(true, false)); assertTrue(blobUtilsTestDirectory.setWritable(false, false)); } - @After - public void after() { - // Cleanup test directory - assertTrue(blobUtilsTestDirectory.delete()); - } - @Test(expected = IOException.class) public void testExceptionOnCreateStorageDirectoryFailure() throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index a72729417ecf3..9d2bd558a8bbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; @@ -27,7 +28,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; @@ -43,6 +46,9 @@ public class BlobLibraryCacheManagerTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link * BlobLibraryCacheManager#unregisterJob(JobID)}. @@ -59,6 +65,9 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE try { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -179,6 +188,9 @@ public void testLibraryCacheManagerTaskCleanup() throws IOException, Interrupted try { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -249,6 +261,9 @@ public void testRegisterAndDownload() throws IOException { try { // create the blob transfer services Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); cache = new BlobCache(serverAddress, config, new VoidBlobStore()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 16e3a05f2b9e8..02f121baedc39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -70,7 +71,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.newFolder().getAbsolutePath()); try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); @@ -153,7 +157,8 @@ public void testRecoveryRegisterAndDownload() throws Exception { // Verify everything is clean below recoveryDir/ final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); - File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); + String haBlobStorePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH); + File haBlobStoreDir = new File(haBlobStorePath, clusterId); File[] recoveryFiles = haBlobStoreDir.listFiles(); assertNotNull("HA storage directory does not exist", recoveryFiles); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); From 70f06d9b38c5d8afb50b6c18b7b3a4e07e2bb3da Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 21 Jun 2017 14:45:31 +0200 Subject: [PATCH 4/6] [FLINK-7054][blob] remove LibraryCacheManager#getFile() This was only used in tests where it is avoidable but if used anywhere else, it may have caused cleanup issues. --- .../librarycache/BlobLibraryCacheManager.java | 36 +++++-------------- .../FallbackLibraryCacheManager.java | 9 +---- .../librarycache/LibraryCacheManager.java | 9 ----- .../BlobLibraryCacheManagerTest.java | 23 ++++++------ .../BlobLibraryCacheRecoveryITCase.java | 6 ++-- 5 files changed, 24 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 0c4cb85921797..0387725db0a0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -18,7 +18,14 @@ package org.apache.flink.runtime.execution.librarycache; -import java.io.File; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.BlobService; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.URL; import java.util.Arrays; @@ -32,15 +39,6 @@ import java.util.Timer; import java.util.TimerTask; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.blob.BlobService; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.util.ExceptionUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -51,8 +49,6 @@ *

* All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted * and are removed by a timer-based cleanup task if their reference counter is zero. - * NOTE: this does not apply to files that enter the blob service via - * {@link #getFile(BlobKey)}! */ public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager { @@ -202,22 +198,6 @@ public ClassLoader getClassLoader(JobID id) { } } - /** - * Returns a file handle to the file identified by the blob key. - *

- * NOTE: if not already registered during - * {@link #registerJob(JobID, Collection, Collection)}, files that enter the library cache / - * backing blob store using this method will not be reference-counted and garbage-collected! - * - * @param blobKey identifying the requested file - * @return File handle - * @throws IOException if any error occurs when retrieving the file - */ - @Override - public File getFile(BlobKey blobKey) throws IOException { - return new File(blobService.getURL(blobKey).getFile()); - } - public int getBlobServerPort() { return blobService.getPort(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java index 1ef6e319e1ba8..8e14e5867263a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java @@ -18,14 +18,12 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.net.URL; import java.util.Collection; @@ -38,11 +36,6 @@ public ClassLoader getClassLoader(JobID id) { return getClass().getClassLoader(); } - @Override - public File getFile(BlobKey blobKey) throws IOException { - throw new IOException("There is no file associated to the blob key " + blobKey); - } - @Override public void registerJob(JobID id, Collection requiredJarFiles, Collection requiredClasspaths) { LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index bf052718bc8a5..b01829e288aa3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -36,15 +36,6 @@ public interface LibraryCacheManager { */ ClassLoader getClassLoader(JobID id); - /** - * Returns a file handle to the file identified by the blob key. - * - * @param blobKey identifying the requested file - * @return File handle - * @throws IOException if any error occurs when retrieving the file - */ - File getFile(BlobKey blobKey) throws IOException; - /** * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 9d2bd558a8bbe..606d8c9a0491e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; @@ -82,7 +83,7 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); libraryCacheManager.registerJob(jid, keys, Collections.emptyList()); - assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, checkFilesExist(keys, server, true)); assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); @@ -104,7 +105,7 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); // the blob cache should no longer contain the files - assertEquals(0, checkFilesExist(keys, libraryCacheManager, false)); + assertEquals(0, checkFilesExist(keys, server, false)); try { server.getURL(keys.get(0)); @@ -144,21 +145,21 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE * * @param keys * blob keys to check - * @param libraryCacheManager - * cache manager to use + * @param blobService + * BLOB store to use * @param doThrow * whether exceptions should be ignored (false), or throws (true) * - * @return number of files we were able to retrieve via {@link BlobLibraryCacheManager#getFile(BlobKey)} + * @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)} */ - private int checkFilesExist( - List keys, BlobLibraryCacheManager libraryCacheManager, boolean doThrow) + private static int checkFilesExist( + List keys, BlobService blobService, boolean doThrow) throws IOException { int numFiles = 0; for (BlobKey key : keys) { try { - libraryCacheManager.getFile(key); + blobService.getURL(key); ++numFiles; } catch (IOException e) { if (doThrow) { @@ -204,13 +205,13 @@ public void testLibraryCacheManagerTaskCleanup() throws IOException, Interrupted libraryCacheManager.registerTask(jid, executionId1, keys, Collections.emptyList()); libraryCacheManager.registerTask(jid, executionId2, keys, Collections.emptyList()); - assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, checkFilesExist(keys, server, true)); assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid)); libraryCacheManager.unregisterTask(jid, executionId1); - assertEquals(2, checkFilesExist(keys, libraryCacheManager, true)); + assertEquals(2, checkFilesExist(keys, server, true)); assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); @@ -232,7 +233,7 @@ public void testLibraryCacheManagerTaskCleanup() throws IOException, Interrupted assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); // the blob cache should no longer contain the files - assertEquals(0, checkFilesExist(keys, libraryCacheManager, false)); + assertEquals(0, checkFilesExist(keys, server, false)); bc.close(); } finally { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 02f121baedc39..e5efd19ba59f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -107,7 +107,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { libServer[0].registerTask(jobId, executionId, keys, Collections.emptyList()); // Verify key 1 - File f = libCache.getFile(keys.get(0)); + File f = new File(cache.getURL(keys.get(0)).toURI()); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -126,7 +126,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 - f = libCache.getFile(keys.get(0)); + f = new File(cache.getURL(keys.get(0)).toURI()); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -138,7 +138,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // Verify key 2 - f = libCache.getFile(keys.get(1)); + f = new File(cache.getURL(keys.get(1)).toURI()); assertEquals(256, f.length()); try (FileInputStream fis = new FileInputStream(f)) { From 5463f65aeb3ed519b51ccc78abfcc003bf02b3f8 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Sun, 16 Jul 2017 23:13:56 +0200 Subject: [PATCH 5/6] [FLINK-7054][hotfix] fix a checkstyle error --- .../runtime/execution/librarycache/LibraryCacheManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index b01829e288aa3..5f9f4433a6af4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; -import java.io.File; import java.io.IOException; import java.net.URL; import java.util.Collection; From 12052c0b8c8b869f9b6bd6f9d53c9ed6e362c631 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 21 Jun 2017 16:14:15 +0200 Subject: [PATCH 6/6] [FLINK-7055][blob] refactor getURL() to the more generic getFile() The fact that we always returned URL objects is a relic of the BlobServer's only use for URLClassLoader. Since we'd like to extend its use, returning File objects instead is more generic. --- .../handlers/TaskManagerLogHandler.java | 2 +- .../apache/flink/runtime/blob/BlobCache.java | 17 ++++++------ .../apache/flink/runtime/blob/BlobServer.java | 18 ++++++------- .../flink/runtime/blob/BlobService.java | 8 +++--- .../flink/runtime/client/JobClient.java | 2 +- .../librarycache/BlobLibraryCacheManager.java | 2 +- .../runtime/blob/BlobCacheRetriesTest.java | 4 +-- .../runtime/blob/BlobCacheSuccessTest.java | 27 ++++++------------- .../runtime/blob/BlobServerDeleteTest.java | 4 +-- .../BlobLibraryCacheManagerTest.java | 8 +++--- .../BlobLibraryCacheRecoveryITCase.java | 6 ++--- 11 files changed, 44 insertions(+), 54 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 1084623950748..675dcd6cfc1cf 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -238,7 +238,7 @@ public Future apply(Tuple2 value) { lastSubmittedFile.put(taskManagerID, blobKey); } try { - return FlinkCompletableFuture.completed(blobCache.getURL(blobKey).getFile()); + return FlinkCompletableFuture.completed(blobCache.getFile(blobKey).getAbsolutePath()); } catch (IOException e) { return FlinkCompletableFuture.completedExceptionally( new Exception("Could not retrieve blob for " + blobKey + '.', e)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 32bd8fd83ed32..3e195374a71d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -30,7 +30,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkArgument; @@ -39,7 +38,7 @@ /** * The BLOB cache implements a local cache for content-addressable BLOBs. * - *

When requesting BLOBs through the {@link BlobCache#getURL} methods, the + *

When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the * BLOB cache will first attempt to serve the file from its local cache. Only if * the local cache does not contain the desired BLOB, the BLOB cache will try to * download it from a distributed file system (if available) or the BLOB @@ -111,21 +110,22 @@ public BlobCache( } /** - * Returns the URL for the BLOB with the given key. The method will first attempt to serve + * Returns local copy of the file for the BLOB with the given key. The method will first attempt to serve * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it * from this cache's BLOB server. * * @param requiredBlob The key of the desired BLOB. - * @return URL referring to the local storage location of the BLOB. + * @return file referring to the local storage location of the BLOB. * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ - public URL getURL(final BlobKey requiredBlob) throws IOException { + @Override + public File getFile(final BlobKey requiredBlob) throws IOException { checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); if (localJarFile.exists()) { - return localJarFile.toURI().toURL(); + return localJarFile; } // first try the distributed blob store (if available) @@ -136,7 +136,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { } if (localJarFile.exists()) { - return localJarFile.toURI().toURL(); + return localJarFile; } // fallback: download from the BlobServer @@ -160,7 +160,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { } // success, we finished - return localJarFile.toURI().toURL(); + return localJarFile; } catch (Throwable t) { String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + @@ -188,6 +188,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { * Deletes the file associated with the given key from the BLOB cache. * @param key referring to the file to be deleted */ + @Override public void delete(BlobKey key) throws IOException{ final File localFile = BlobUtils.getStorageLocation(storageDir, key); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index ecb452701dcda..add9f7f7cdd4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.URL; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -334,22 +333,23 @@ public BlobClient createClient() throws IOException { } /** - * Method which retrieves the URL of a file associated with a blob key. The blob server looks - * the blob key up in its local storage. If the file exists, then the URL is returned. If the - * file does not exist, then a FileNotFoundException is thrown. + * Method which retrieves the local path of a file associated with a blob key. The blob server + * looks the blob key up in its local storage. If the file exists, it is returned. If the + * file does not exist, it is retrieved from the HA blob store (if available) or a + * FileNotFoundException is thrown. * * @param requiredBlob blob key associated with the requested file - * @return URL of the file - * @throws IOException + * @return file referring to the local storage location of the BLOB. + * @throws IOException Thrown if the file retrieval failed. */ @Override - public URL getURL(BlobKey requiredBlob) throws IOException { + public File getFile(BlobKey requiredBlob) throws IOException { checkArgument(requiredBlob != null, "BLOB key cannot be null."); final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); if (localFile.exists()) { - return localFile.toURI().toURL(); + return localFile; } else { try { @@ -361,7 +361,7 @@ public URL getURL(BlobKey requiredBlob) throws IOException { } if (localFile.exists()) { - return localFile.toURI().toURL(); + return localFile; } else { throw new FileNotFoundException("Local file " + localFile + " does not exist " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index c1447c849eeb3..1e56f26cabf3e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.blob; import java.io.Closeable; +import java.io.File; import java.io.IOException; -import java.net.URL; /** * A simple store and retrieve binary large objects (BLOBs). @@ -28,14 +28,14 @@ public interface BlobService extends Closeable { /** - * Returns the URL of the file associated with the provided blob key. + * Returns the path to a local copy of the file associated with the provided blob key. * * @param key blob key associated with the requested file - * @return The URL to the file. + * @return The path to the file. * @throws java.io.FileNotFoundException when the path does not exist; * @throws IOException if any other error occurs when retrieving the file */ - URL getURL(BlobKey key) throws IOException; + File getFile(BlobKey key) throws IOException; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 6a495646d9dde..2ec7eaa5fd279 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -229,7 +229,7 @@ public static ClassLoader retrieveClassLoader( int pos = 0; for (BlobKey blobKey : props.requiredJarFiles()) { try { - allURLs[pos++] = blobClient.getURL(blobKey); + allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL(); } catch (Exception e) { try { blobClient.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 0387725db0a0b..9aff6f94c7b67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -254,7 +254,7 @@ private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException // it is important that we fetch the URL before increasing the counter. // in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter try { - URL url = blobService.getURL(key); + URL url = blobService.getFile(key).toURI().toURL(); Integer references = blobKeyReferenceCounters.get(key); int newReferences = references == null ? 1 : references + 1; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 366b592a4b322..fe763fa20dad7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -115,7 +115,7 @@ private void testBlobFetchRetries(final Configuration config, final BlobStore bl cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // trigger a download - it should fail the first two times, but retry, and succeed eventually - URL url = cache.getURL(key); + URL url = cache.getFile(key).toURI().toURL(); InputStream is = url.openStream(); try { byte[] received = new byte[data.length]; @@ -211,7 +211,7 @@ private void testBlobFetchWithTooManyFailures(final Configuration config, final // trigger a download - it should fail eventually try { - cache.getURL(key); + cache.getFile(key); fail("This should fail"); } catch (IOException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 19b129d902319..171143826b2cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -28,15 +28,12 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.URL; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * This class contains unit tests for the {@link BlobCache}. @@ -141,7 +138,7 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService); for (BlobKey blobKey : blobKeys) { - blobCache.getURL(blobKey); + blobCache.getFile(blobKey); } if (blobServer != null) { @@ -150,28 +147,20 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit blobServer = null; } - final URL[] urls = new URL[blobKeys.size()]; + final File[] files = new File[blobKeys.size()]; for(int i = 0; i < blobKeys.size(); i++){ - urls[i] = blobCache.getURL(blobKeys.get(i)); + files[i] = blobCache.getFile(blobKeys.get(i)); } // Verify the result - assertEquals(blobKeys.size(), urls.length); + assertEquals(blobKeys.size(), files.length); - for (final URL url : urls) { + for (final File file : files) { + assertNotNull(file); - assertNotNull(url); - - try { - final File cachedFile = new File(url.toURI()); - - assertTrue(cachedFile.exists()); - assertEquals(buf.length, cachedFile.length()); - - } catch (URISyntaxException e) { - fail(e.getMessage()); - } + assertTrue(file.exists()); + assertEquals(buf.length, file.length()); } } finally { if (blobServer != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index c2f2cd26fe2f2..6d9afd62d2279 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -110,7 +110,7 @@ public void testDeleteSingleByBlobKey() { // delete a file directly on the server server.delete(key2); try { - server.getURL(key2); + server.getFile(key2); fail("BLOB should have been deleted"); } catch (IOException e) { @@ -210,7 +210,7 @@ public void testDeleteByBlobKeyFails() { server.delete(key); // the file should still be there - server.getURL(key); + server.getFile(key); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 606d8c9a0491e..476fdcbf3c458 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -108,13 +108,13 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE assertEquals(0, checkFilesExist(keys, server, false)); try { - server.getURL(keys.get(0)); + server.getFile(keys.get(0)); fail("name-addressable BLOB should have been deleted"); } catch (IOException e) { // expected } try { - server.getURL(keys.get(1)); + server.getFile(keys.get(1)); fail("name-addressable BLOB should have been deleted"); } catch (IOException e) { // expected @@ -150,7 +150,7 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE * @param doThrow * whether exceptions should be ignored (false), or throws (true) * - * @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)} + * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)} */ private static int checkFilesExist( List keys, BlobService blobService, boolean doThrow) @@ -159,7 +159,7 @@ private static int checkFilesExist( for (BlobKey key : keys) { try { - blobService.getURL(key); + blobService.getFile(key); ++numFiles; } catch (IOException e) { if (doThrow) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index e5efd19ba59f7..b19835bc8c4a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -107,7 +107,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { libServer[0].registerTask(jobId, executionId, keys, Collections.emptyList()); // Verify key 1 - File f = new File(cache.getURL(keys.get(0)).toURI()); + File f = cache.getFile(keys.get(0)); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -126,7 +126,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 - f = new File(cache.getURL(keys.get(0)).toURI()); + f = cache.getFile(keys.get(0)); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -138,7 +138,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { } // Verify key 2 - f = new File(cache.getURL(keys.get(1)).toURI()); + f = cache.getFile(keys.get(1)); assertEquals(256, f.length()); try (FileInputStream fis = new FileInputStream(f)) {