From 24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Feb 2019 13:16:59 +0100 Subject: [PATCH] [FLINK-9920] Only check part files in BucketingSinkFaultToleranceITCase Before, it could happen that other files are in the same directory. Then the verification logic in the test would pick up those files and the test would fail. Now we ignore all files that don't match our expected pattern. --- .../streaming/connectors/fs/RollingSink.java | 43 ++++++++++-------- .../fs/bucketing/BucketingSink.java | 45 ++++++++++--------- .../BucketingSinkFaultToleranceITCase.java | 8 ++++ 3 files changed, 57 insertions(+), 39 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index 9ec97b7db2cfc..831377edf7526 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -533,26 +533,33 @@ private Method reflectTruncate(FileSystem fs) { } // verify that truncate actually works - Path testPath = new Path(UUID.randomUUID().toString()); - try (FSDataOutputStream outputStream = fs.create(testPath)) { - outputStream.writeUTF("hello"); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); - } - + Path testPath = new Path(basePath, UUID.randomUUID().toString()); try { - m.invoke(fs, testPath, 2); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.debug("Truncate is not supported.", e); - m = null; - } + try (FSDataOutputStream outputStream = fs.create(testPath)) { + outputStream.writeUTF("hello"); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException( + "Could not create file for checking if truncate works. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); + } - try { - fs.delete(testPath, false); - } catch (IOException e) { - LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file.", e); + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } + } finally { + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); + } } } return m; diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 4f85e3cf8d599..b1d1786ec1d4f 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -632,29 +632,32 @@ private Method reflectTruncate(FileSystem fs) { // verify that truncate actually works Path testPath = new Path(basePath, UUID.randomUUID().toString()); - try (FSDataOutputStream outputStream = fs.create(testPath)) { - outputStream.writeUTF("hello"); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works. " + - "You can disable support for truncate() completely via " + - "BucketingSink.setUseTruncate(false).", e); - } - try { - m.invoke(fs, testPath, 2); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.debug("Truncate is not supported.", e); - m = null; - } + try (FSDataOutputStream outputStream = fs.create(testPath)) { + outputStream.writeUTF("hello"); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException( + "Could not create file for checking if truncate works. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); + } - try { - fs.delete(testPath, false); - } catch (IOException e) { - LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file. " + - "You can disable support for truncate() completely via " + - "BucketingSink.setUseTruncate(false).", e); + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } + } finally { + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); + } } } return m; diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java index 49ef5ebab6b17..dcb77bfd800ce 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java @@ -66,6 +66,9 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB static final long NUM_STRINGS = 16_000; + // this is already the default, but we explicitly set it to make the test explicit + static final String PART_PREFIX = "part"; + @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); @@ -115,6 +118,7 @@ public void testProgram(StreamExecutionEnvironment env) { .setBucketer(new BasePathBucketer()) .setBatchSize(10000) .setValidLengthPrefix("") + .setPartPrefix(PART_PREFIX) .setPendingPrefix("") .setPendingSuffix(PENDING_SUFFIX) .setInProgressSuffix(IN_PROGRESS_SUFFIX); @@ -144,6 +148,10 @@ public void postSubmit() throws Exception { while (files.hasNext()) { LocatedFileStatus file = files.next(); + if (!file.getPath().getName().startsWith(PART_PREFIX)) { + // ignore files that don't match with our expected part prefix + continue; + } if (!file.getPath().toString().endsWith(".valid-length")) { int validLength = (int) file.getLen();