diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java index 7186db21f709..70e7a79d8373 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.flink.source; -import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords; - import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -46,6 +44,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; @@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio RecordCounterToFail::continueProcessing, miniClusterResource.getMiniCluster()); - assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + SimpleDataUtil.assertTableRecords( + sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } @Test @@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep // wait longer for continuous source to reduce flakiness // because CI servers tend to be overloaded. - assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + SimpleDataUtil.assertTableRecords( + sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } // ------------------------------------------------------------------------ diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 132bbd7b2a9e..8af1dd883f4c 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); @@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); @@ -176,11 +174,11 @@ public void testConsumeFromStartTag() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); + // Stop the stream task. function.close(); @@ -200,8 +198,7 @@ public void testCheckpointRestore() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, func); awaitExpectedSplits(sourceContext); @@ -223,8 +220,7 @@ public void testCheckpointRestore() throws Exception { harness.initializeState(state); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, newFunc); awaitExpectedSplits(sourceContext); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java index 7186db21f709..70e7a79d8373 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.flink.source; -import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords; - import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -46,6 +44,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; @@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio RecordCounterToFail::continueProcessing, miniClusterResource.getMiniCluster()); - assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + SimpleDataUtil.assertTableRecords( + sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } @Test @@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep // wait longer for continuous source to reduce flakiness // because CI servers tend to be overloaded. - assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); + SimpleDataUtil.assertTableRecords( + sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120)); } // ------------------------------------------------------------------------ diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 0c3f54cc726a..6d1891baf538 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); @@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); @@ -176,8 +174,7 @@ public void testConsumeFromStartTag() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, function); awaitExpectedSplits(sourceContext); @@ -201,8 +198,7 @@ public void testCheckpointRestore() throws Exception { harness.setup(); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, func); awaitExpectedSplits(sourceContext); @@ -224,8 +220,7 @@ public void testCheckpointRestore() throws Exception { harness.initializeState(state); harness.open(); - CountDownLatch latch = new CountDownLatch(1); - TestSourceContext sourceContext = new TestSourceContext(latch); + TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1)); runSourceFunctionInTask(sourceContext, newFunc); awaitExpectedSplits(sourceContext);