From 6db7b5b48828b5323707b560a787146eaa820baa Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 17 Aug 2017 12:04:09 +0200 Subject: [PATCH 1/2] [FLINK-7056][tests][hotfix] make sure the client and a created InputStream are closed If not and the server has not yet sent all data packets, it may still occupy the read lock and block any writing operations (also see FLINK-7467). --- .../flink/runtime/blob/BlobClientTest.java | 55 +++++++++---------- .../runtime/blob/BlobServerDeleteTest.java | 10 ++-- .../flink/runtime/blob/BlobServerGetTest.java | 7 ++- .../flink/runtime/blob/BlobServerPutTest.java | 24 ++------ 4 files changed, 43 insertions(+), 53 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index cfec4c5f51494..b28b3b97c418d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -139,30 +139,35 @@ private static BlobKey prepareTestFile(File file) throws IOException { * the specified buffer. * * @param inputStream - * the input stream returned from the GET operation + * the input stream returned from the GET operation (will be closed by this method) * @param buf * the buffer to compare the input stream's data to * @throws IOException * thrown if an I/O error occurs while reading the input stream */ static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException { - byte[] receivedBuffer = new byte[buf.length]; + try { + byte[] receivedBuffer = new byte[buf.length]; - int bytesReceived = 0; + int bytesReceived = 0; - while (true) { + while (true) { - final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived); - if (read < 0) { - throw new EOFException(); - } - bytesReceived += read; + final int read = inputStream + .read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived); + if (read < 0) { + throw new EOFException(); + } + bytesReceived += read; - if (bytesReceived == receivedBuffer.length) { - assertEquals(-1, inputStream.read()); - assertArrayEquals(buf, receivedBuffer); - return; + if (bytesReceived == receivedBuffer.length) { + assertEquals(-1, inputStream.read()); + assertArrayEquals(buf, receivedBuffer); + return; + } } + } finally { + inputStream.close(); } } @@ -171,7 +176,7 @@ static void validateGet(final InputStream inputStream, final byte[] buf) throws * the specified file. * * @param inputStream - * the input stream returned from the GET operation + * the input stream returned from the GET operation (will be closed by this method) * @param file * the file to compare the input stream's data to * @throws IOException @@ -200,6 +205,7 @@ private static void validateGet(final InputStream inputStream, final File file) if (inputStream2 != null) { inputStream2.close(); } + inputStream.close(); } } @@ -231,14 +237,11 @@ public void testContentAddressableBuffer() { assertEquals(origKey, receivedKey); // Retrieve the data - InputStream is = client.get(receivedKey); - validateGet(is, testBuffer); - is = client.get(jobId, receivedKey); - validateGet(is, testBuffer); + validateGet(client.get(receivedKey), testBuffer); + validateGet(client.get(jobId, receivedKey), testBuffer); // Check reaction to invalid keys - try { - client.get(new BlobKey()); + try (InputStream ignored = client.get(new BlobKey())) { fail("Expected IOException did not occur"); } catch (IOException fnfe) { @@ -246,8 +249,7 @@ public void testContentAddressableBuffer() { } // new client needed (closed from failure above) client = new BlobClient(serverAddress, getBlobClientConfig()); - try { - client.get(jobId, new BlobKey()); + try (InputStream ignored = client.get(jobId, new BlobKey())) { fail("Expected IOException did not occur"); } catch (IOException fnfe) { @@ -308,10 +310,8 @@ public void testContentAddressableStream() { is = null; // Retrieve the data - is = client.get(receivedKey); - validateGet(is, testFile); - is = client.get(jobId, receivedKey); - validateGet(is, testFile); + validateGet(client.get(receivedKey), testFile); + validateGet(client.get(jobId, receivedKey), testFile); } catch (Exception e) { e.printStackTrace(); @@ -362,8 +362,7 @@ private static void uploadJarFile( assertEquals(1, blobKeys.size()); try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { - InputStream is = blobClient.get(blobKeys.get(0)); - validateGet(is, testFile); + validateGet(blobClient.get(blobKeys.get(0)), testFile); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index d91aae420f852..fd2bc04f0a260 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -31,6 +31,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -99,8 +100,7 @@ public void testDeleteSingleByBlobKey() { client.close(); client = new BlobClient(serverAddress, config); - try { - client.get(key1); + try (InputStream ignored = client.get(key1)) { fail("BLOB should have been deleted"); } catch (IOException e) { @@ -111,12 +111,14 @@ public void testDeleteSingleByBlobKey() { client = new BlobClient(serverAddress, config); try { - client.get(jobId, key1); + // NOTE: the server will stall in its send operation until either the data is fully + // read or the socket is closed, e.g. via a client.close() call + BlobClientTest.validateGet(client.get(jobId, key1), data); } catch (IOException e) { - // expected fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key"); } + client.close(); // delete a file directly on the server server.delete(key2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 5ad8d95928a1c..5ecb812290937 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.apache.flink.runtime.blob.BlobClientTest.validateGet; import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -122,7 +123,7 @@ private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2) assertNotNull(key); assertEquals(key, key2); // request for jobId2 should succeed - getFileHelper(client, jobId2, key); + validateGet(getFileHelper(client, jobId2, key), data); // request for jobId1 should still fail client = verifyDeleted(client, jobId1, key, serverAddress, config); @@ -160,8 +161,7 @@ private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2) private static BlobClient verifyDeleted( BlobClient client, JobID jobId, BlobKey key, InetSocketAddress serverAddress, Configuration config) throws IOException { - try { - getFileHelper(client, jobId, key); + try (InputStream ignored = getFileHelper(client, jobId, key)) { fail("This should not succeed."); } catch (IOException e) { // expected @@ -227,6 +227,7 @@ private void testGetFailsDuringStreaming(final JobID jobId) throws IOException { catch (IOException e) { // expected } + is.close(); } finally { if (client != null) { client.close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index f55adb7a34c3c..6fcd9e4728b41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.Executors; import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -226,9 +225,9 @@ private void testPutBufferSuccessfulGet(final JobID jobId1, final JobID jobId2) * @param jobId * job ID or null if job-unrelated * @param key1 - * first key + * first key for 44 bytes starting at byte 10 of data in the BLOB * @param key2 - * second key + * second key for the complete data in the BLOB * @param data * expected data * @param serverAddress @@ -241,12 +240,9 @@ private static void testPutBufferSuccessfulGet( InetSocketAddress serverAddress, Configuration config) throws IOException { BlobClient client = new BlobClient(serverAddress, config); - InputStream is1 = null; - InputStream is2 = null; - try { - // one get request on the same client - is1 = getFileHelper(client, jobId, key2); + // one get request on the same client + try (InputStream is1 = getFileHelper(client, jobId, key2)) { byte[] result1 = new byte[44]; BlobUtils.readFully(is1, result1, 0, result1.length, null); is1.close(); @@ -255,20 +251,12 @@ private static void testPutBufferSuccessfulGet( assertEquals(data[j], result1[i]); } - // close the client and create a new one for the remaining requests + // close the client and create a new one for the remaining request client.close(); client = new BlobClient(serverAddress, config); - is2 = getFileHelper(client, jobId, key1); - BlobClientTest.validateGet(is2, data); - is2.close(); + BlobClientTest.validateGet(getFileHelper(client, jobId, key1), data); } finally { - if (is1 != null) { - is1.close(); - } - if (is2 != null) { - is1.close(); - } client.close(); } } From 56d4c3a8a53f3fa2459fd1b5fe6e310ec78c9d36 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 17 Aug 2017 12:24:07 +0200 Subject: [PATCH 2/2] [FLINK-7056] address PR comments --- .../apache/flink/runtime/blob/BlobClientTest.java | 14 +++++++------- .../flink/runtime/blob/BlobServerDeleteTest.java | 3 ++- .../flink/runtime/blob/BlobServerGetTest.java | 4 ++-- .../flink/runtime/blob/BlobServerPutTest.java | 3 ++- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index b28b3b97c418d..d511e86cfeacf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -145,7 +145,7 @@ private static BlobKey prepareTestFile(File file) throws IOException { * @throws IOException * thrown if an I/O error occurs while reading the input stream */ - static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException { + static void validateGetAndClose(final InputStream inputStream, final byte[] buf) throws IOException { try { byte[] receivedBuffer = new byte[buf.length]; @@ -182,7 +182,7 @@ static void validateGet(final InputStream inputStream, final byte[] buf) throws * @throws IOException * thrown if an I/O error occurs while reading the input stream or the file */ - private static void validateGet(final InputStream inputStream, final File file) throws IOException { + private static void validateGetAndClose(final InputStream inputStream, final File file) throws IOException { InputStream inputStream2 = null; try { @@ -237,8 +237,8 @@ public void testContentAddressableBuffer() { assertEquals(origKey, receivedKey); // Retrieve the data - validateGet(client.get(receivedKey), testBuffer); - validateGet(client.get(jobId, receivedKey), testBuffer); + validateGetAndClose(client.get(receivedKey), testBuffer); + validateGetAndClose(client.get(jobId, receivedKey), testBuffer); // Check reaction to invalid keys try (InputStream ignored = client.get(new BlobKey())) { @@ -310,8 +310,8 @@ public void testContentAddressableStream() { is = null; // Retrieve the data - validateGet(client.get(receivedKey), testFile); - validateGet(client.get(jobId, receivedKey), testFile); + validateGetAndClose(client.get(receivedKey), testFile); + validateGetAndClose(client.get(jobId, receivedKey), testFile); } catch (Exception e) { e.printStackTrace(); @@ -362,7 +362,7 @@ private static void uploadJarFile( assertEquals(1, blobKeys.size()); try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { - validateGet(blobClient.get(blobKeys.get(0)), testFile); + validateGetAndClose(blobClient.get(blobKeys.get(0)), testFile); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index fd2bc04f0a260..413e2e9f2ed78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -113,7 +114,7 @@ public void testDeleteSingleByBlobKey() { try { // NOTE: the server will stall in its send operation until either the data is fully // read or the socket is closed, e.g. via a client.close() call - BlobClientTest.validateGet(client.get(jobId, key1), data); + validateGetAndClose(client.get(jobId, key1), data); } catch (IOException e) { fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 5ecb812290937..7ccf075937085 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -49,7 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.flink.runtime.blob.BlobClientTest.validateGet; +import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -123,7 +123,7 @@ private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2) assertNotNull(key); assertEquals(key, key2); // request for jobId2 should succeed - validateGet(getFileHelper(client, jobId2, key), data); + validateGetAndClose(getFileHelper(client, jobId2, key), data); // request for jobId1 should still fail client = verifyDeleted(client, jobId1, key, serverAddress, config); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index 6fcd9e4728b41..2b8e2d27c0443 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -255,7 +256,7 @@ private static void testPutBufferSuccessfulGet( client.close(); client = new BlobClient(serverAddress, config); - BlobClientTest.validateGet(getFileHelper(client, jobId, key1), data); + validateGetAndClose(getFileHelper(client, jobId, key1), data); } finally { client.close(); }