From dbd180ce4423d08963af50ad7c8d4a4628d58824 Mon Sep 17 00:00:00 2001 From: Xintong Song Date: Thu, 21 May 2020 16:36:15 +0800 Subject: [PATCH 1/3] [hotfix][filesystems][test] Fix improper usage of System.nanoTime(). Per the JavaDoc of System.nanoTime(), we should use `t1 - t0 < 0` rather than `t1 < t0` because of the possibility of numerical overflow. --- .../test/java/org/apache/flink/core/fs/FileSystemTestUtils.java | 2 +- .../flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java index 564ef3a30b349..23bde19abfc1e 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java @@ -38,7 +38,7 @@ public static void checkPathEventualExistence( long deadline) throws IOException, InterruptedException { boolean dirExists; while ((dirExists = fs.exists(path)) != expectedExists && - System.nanoTime() < deadline) { + System.nanoTime() - deadline < 0) { Thread.sleep(10); } assertEquals(expectedExists, dirExists); diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java index 083a8801da112..a7b29a7f28d6b 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java @@ -155,7 +155,7 @@ public static void teardown() throws IOException, InterruptedException { } public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException { - while (fs.exists(path) && System.nanoTime() < deadline) { + while (fs.exists(path) && System.nanoTime() - deadline < 0) { fs.delete(path, true); Thread.sleep(50L); } From da01a91ea8eccf9bdc1dee6e32b1f5a9b06b0cc4 Mon Sep 17 00:00:00 2001 From: Xintong Song Date: Thu, 21 May 2020 16:44:18 +0800 Subject: [PATCH 2/3] [FLINK-17721][filesystems][test] Use independent timeout for each file status checking. --- .../flink/core/fs/FileSystemTestUtils.java | 5 ++-- .../hdfs/AbstractHadoopFileSystemITTest.java | 24 ++++++++++--------- .../osshadoop/HadoopOSSFileSystemITCase.java | 2 +- .../fs/s3hadoop/HadoopS3FileSystemITCase.java | 2 +- .../fs/s3presto/PrestoS3FileSystemITCase.java | 2 +- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java index 23bde19abfc1e..f1e58d5d17aa0 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java @@ -29,14 +29,15 @@ public class FileSystemTestUtils { /** * Verifies that the given path eventually appears on / disappears from fs within - * deadline nanoseconds. + * consistencyToleranceNS nanoseconds. */ public static void checkPathEventualExistence( FileSystem fs, Path path, boolean expectedExists, - long deadline) throws IOException, InterruptedException { + long consistencyToleranceNS) throws IOException, InterruptedException { boolean dirExists; + long deadline = System.nanoTime() + consistencyToleranceNS; while ((dirExists = fs.exists(path)) != expectedExists && System.nanoTime() - deadline < 0) { Thread.sleep(10); diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java index a7b29a7f28d6b..f59a91bbf8882 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java @@ -49,22 +49,22 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger { protected static FileSystem fs; protected static Path basePath; - protected static long deadline; + protected static long consistencyToleranceNS; public static void checkPathExistence(Path path, boolean expectedExists, - long deadline) throws IOException, InterruptedException { - if (deadline == 0) { + long consistencyToleranceNS) throws IOException, InterruptedException { + if (consistencyToleranceNS == 0) { //strongly consistency assertEquals(expectedExists, fs.exists(path)); } else { //eventually consistency - checkPathEventualExistence(fs, path, expectedExists, deadline); + checkPathEventualExistence(fs, path, expectedExists, consistencyToleranceNS); } } protected void checkEmptyDirectory(Path path) throws IOException, InterruptedException { - checkPathExistence(path, true, deadline); + checkPathExistence(path, true, consistencyToleranceNS); } @Test @@ -80,7 +80,7 @@ public void testSimpleFileWriteAndRead() throws Exception { } // just in case, wait for the path to exist - checkPathExistence(path, true, deadline); + checkPathExistence(path, true, consistencyToleranceNS); try (FSDataInputStream in = fs.open(path); InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8); @@ -93,7 +93,7 @@ public void testSimpleFileWriteAndRead() throws Exception { fs.delete(path, false); } - checkPathExistence(path, false, deadline); + checkPathExistence(path, false, consistencyToleranceNS); } @Test @@ -122,7 +122,7 @@ public void testDirectoryListing() throws Exception { } // just in case, wait for the file to exist (should then also be reflected in the // directory's file list below) - checkPathExistence(file, true, deadline); + checkPathExistence(file, true, consistencyToleranceNS); } FileStatus[] files = fs.listStatus(directory); @@ -138,7 +138,7 @@ public void testDirectoryListing() throws Exception { } finally { // clean up - cleanupDirectoryWithRetry(fs, directory, deadline); + cleanupDirectoryWithRetry(fs, directory, consistencyToleranceNS); } } @@ -146,7 +146,7 @@ public void testDirectoryListing() throws Exception { public static void teardown() throws IOException, InterruptedException { try { if (fs != null) { - cleanupDirectoryWithRetry(fs, basePath, deadline); + cleanupDirectoryWithRetry(fs, basePath, consistencyToleranceNS); } } finally { @@ -154,7 +154,9 @@ public static void teardown() throws IOException, InterruptedException { } } - public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException { + public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long consistencyToleranceNS) throws IOException, InterruptedException { + fs.delete(path, true); + long deadline = System.nanoTime() + consistencyToleranceNS; while (fs.exists(path) && System.nanoTime() - deadline < 0) { fs.delete(path, true); Thread.sleep(50L); diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java index eaf722aff05bb..2f6178b0c5cbb 100644 --- a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java +++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java @@ -51,7 +51,7 @@ public static void setup() throws IOException { FileSystem.initialize(conf); basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); fs = basePath.getFileSystem(); - deadline = 0; + consistencyToleranceNS = 0; } @Test diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java index fdfed2414b596..4d8109a2fd540 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java @@ -53,7 +53,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID()); fs = basePath.getFileSystem(); - deadline = System.nanoTime() + 90_000_000_000L; + consistencyToleranceNS = 5_000_000_000L; // 5 seconds // check for uniqueness of the test directory // directory must not yet exist diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java index 5929af12f748b..05442ae8fa57c 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java @@ -69,7 +69,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR); fs = basePath.getFileSystem(); - deadline = System.nanoTime() + 90_000_000_000L; + consistencyToleranceNS = 5_000_000_000L; // 5 seconds // check for uniqueness of the test directory // directory must not yet exist From dd21b020a56c9e5c237897bdde65311822ee9157 Mon Sep 17 00:00:00 2001 From: Xintong Song Date: Mon, 25 May 2020 20:10:14 +0800 Subject: [PATCH 3/3] fixup! [FLINK-17721][filesystems][test] Use independent timeout for each file status checking. --- .../org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java | 2 +- .../org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java index 4d8109a2fd540..23d7123c6ba99 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java @@ -53,7 +53,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID()); fs = basePath.getFileSystem(); - consistencyToleranceNS = 5_000_000_000L; // 5 seconds + consistencyToleranceNS = 30_000_000_000L; // 30 seconds // check for uniqueness of the test directory // directory must not yet exist diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java index 05442ae8fa57c..5ab4a334654f3 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java @@ -69,7 +69,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR); fs = basePath.getFileSystem(); - consistencyToleranceNS = 5_000_000_000L; // 5 seconds + consistencyToleranceNS = 30_000_000_000L; // 30 seconds // check for uniqueness of the test directory // directory must not yet exist