diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java index 564ef3a30b349..f1e58d5d17aa0 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java @@ -29,16 +29,17 @@ public class FileSystemTestUtils { /** * Verifies that the given path eventually appears on / disappears from fs within - * deadline nanoseconds. + * consistencyToleranceNS nanoseconds. */ public static void checkPathEventualExistence( FileSystem fs, Path path, boolean expectedExists, - long deadline) throws IOException, InterruptedException { + long consistencyToleranceNS) throws IOException, InterruptedException { boolean dirExists; + long deadline = System.nanoTime() + consistencyToleranceNS; while ((dirExists = fs.exists(path)) != expectedExists && - System.nanoTime() < deadline) { + System.nanoTime() - deadline < 0) { Thread.sleep(10); } assertEquals(expectedExists, dirExists); diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java index 083a8801da112..f59a91bbf8882 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java @@ -49,22 +49,22 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger { protected static FileSystem fs; protected static Path basePath; - protected static long deadline; + protected static long consistencyToleranceNS; public static void checkPathExistence(Path path, boolean expectedExists, - long deadline) throws IOException, InterruptedException { - if (deadline == 0) { + long consistencyToleranceNS) throws IOException, InterruptedException { + if (consistencyToleranceNS == 0) { //strongly consistency assertEquals(expectedExists, fs.exists(path)); } else { //eventually consistency - checkPathEventualExistence(fs, path, expectedExists, deadline); + checkPathEventualExistence(fs, path, expectedExists, consistencyToleranceNS); } } protected void checkEmptyDirectory(Path path) throws IOException, InterruptedException { - checkPathExistence(path, true, deadline); + checkPathExistence(path, true, consistencyToleranceNS); } @Test @@ -80,7 +80,7 @@ public void testSimpleFileWriteAndRead() throws Exception { } // just in case, wait for the path to exist - checkPathExistence(path, true, deadline); + checkPathExistence(path, true, consistencyToleranceNS); try (FSDataInputStream in = fs.open(path); InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8); @@ -93,7 +93,7 @@ public void testSimpleFileWriteAndRead() throws Exception { fs.delete(path, false); } - checkPathExistence(path, false, deadline); + checkPathExistence(path, false, consistencyToleranceNS); } @Test @@ -122,7 +122,7 @@ public void testDirectoryListing() throws Exception { } // just in case, wait for the file to exist (should then also be reflected in the // directory's file list below) - checkPathExistence(file, true, deadline); + checkPathExistence(file, true, consistencyToleranceNS); } FileStatus[] files = fs.listStatus(directory); @@ -138,7 +138,7 @@ public void testDirectoryListing() throws Exception { } finally { // clean up - cleanupDirectoryWithRetry(fs, directory, deadline); + cleanupDirectoryWithRetry(fs, directory, consistencyToleranceNS); } } @@ -146,7 +146,7 @@ public void testDirectoryListing() throws Exception { public static void teardown() throws IOException, InterruptedException { try { if (fs != null) { - cleanupDirectoryWithRetry(fs, basePath, deadline); + cleanupDirectoryWithRetry(fs, basePath, consistencyToleranceNS); } } finally { @@ -154,8 +154,10 @@ public static void teardown() throws IOException, InterruptedException { } } - public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException { - while (fs.exists(path) && System.nanoTime() < deadline) { + public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long consistencyToleranceNS) throws IOException, InterruptedException { + fs.delete(path, true); + long deadline = System.nanoTime() + consistencyToleranceNS; + while (fs.exists(path) && System.nanoTime() - deadline < 0) { fs.delete(path, true); Thread.sleep(50L); } diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java index eaf722aff05bb..2f6178b0c5cbb 100644 --- a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java +++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java @@ -51,7 +51,7 @@ public static void setup() throws IOException { FileSystem.initialize(conf); basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); fs = basePath.getFileSystem(); - deadline = 0; + consistencyToleranceNS = 0; } @Test diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java index fdfed2414b596..23d7123c6ba99 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java @@ -53,7 +53,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID()); fs = basePath.getFileSystem(); - deadline = System.nanoTime() + 90_000_000_000L; + consistencyToleranceNS = 30_000_000_000L; // 30 seconds // check for uniqueness of the test directory // directory must not yet exist diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java index 5929af12f748b..5ab4a334654f3 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java @@ -69,7 +69,7 @@ public static void setup() throws IOException { basePath = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR); fs = basePath.getFileSystem(); - deadline = System.nanoTime() + 90_000_000_000L; + consistencyToleranceNS = 30_000_000_000L; // 30 seconds // check for uniqueness of the test directory // directory must not yet exist