diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 4f394be76f4f..56749b552aa5 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -27,6 +27,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.configuration.Configuration; @@ -373,11 +374,15 @@ protected TableLoader tableLoader() { protected static String closeJobClient(JobClient jobClient, File savepointDir) { if (jobClient != null) { if (savepointDir != null) { - // Stop with savepoint - jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL); - // Wait until the savepoint is created and the job has been stopped - Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1); - return savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath(); + // Stop with a savepoint; get() blocks until it is fully written and returns its path, so + // that a job restoring from it does not race the savepoint completion + try { + return jobClient + .stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } else { jobClient.cancel(); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java index 3dca6c421c76..be559e841275 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java @@ -229,7 +229,8 @@ void testStateRestore( streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); CompletableFuture jobIDCompletableFuture = clusterClient.submitJob(streamGraph); try { - assertThat(resultWithSavepoint.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT); + // Restoring from a savepoint on a busy cluster may take longer than the default 5s poll + assertThat(resultWithSavepoint.poll(Duration.ofSeconds(30L))).isEqualTo(EMPTY_EVENT); } finally { clusterClient.cancel(jobIDCompletableFuture.get()); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 4f394be76f4f..56749b552aa5 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -27,6 +27,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.configuration.Configuration; @@ -373,11 +374,15 @@ protected TableLoader tableLoader() { protected static String closeJobClient(JobClient jobClient, File savepointDir) { if (jobClient != null) { if (savepointDir != null) { - // Stop with savepoint - jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL); - // Wait until the savepoint is created and the job has been stopped - Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1); - return savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath(); + // Stop with a savepoint; get() blocks until it is fully written and returns its path, so + // that a job restoring from it does not race the savepoint completion + try { + return jobClient + .stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } else { jobClient.cancel(); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java index 3dca6c421c76..be559e841275 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java @@ -229,7 +229,8 @@ void testStateRestore( streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); CompletableFuture jobIDCompletableFuture = clusterClient.submitJob(streamGraph); try { - assertThat(resultWithSavepoint.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT); + // Restoring from a savepoint on a busy cluster may take longer than the default 5s poll + assertThat(resultWithSavepoint.poll(Duration.ofSeconds(30L))).isEqualTo(EMPTY_EVENT); } finally { clusterClient.cancel(jobIDCompletableFuture.get()); }