From 59681cdef8f11badd335903809fb47a84396a7ee Mon Sep 17 00:00:00 2001
From: pvary
Date: Fri, 24 Nov 2023 17:57:04 +0100
Subject: [PATCH] Flink: Proper backport for #8852 (#9146)

---
 .../flink/source/TestIcebergSourceFailover.java  |  9 +++++----
 .../source/TestStreamingMonitorFunction.java     | 16 ++++++----------
 .../flink/source/TestIcebergSourceFailover.java  |  9 +++++----
 .../source/TestStreamingMonitorFunction.java     | 15 +++++----------
 4 files changed, 21 insertions(+), 28 deletions(-)

diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f709..70e7a79d8373 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.source;
 
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep
 
     // wait longer for continuous source to reduce flakiness
    // because CI servers tend to be overloaded.
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 132bbd7b2a9e..8af1dd883f4c 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -176,11 +174,11 @@ public void testConsumeFromStartTag() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
 
+      // Stop the stream task.
       function.close();
 
@@ -200,8 +198,7 @@ public void testCheckpointRestore() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, func);
 
       awaitExpectedSplits(sourceContext);
@@ -223,8 +220,7 @@
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
       awaitExpectedSplits(sourceContext);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7186db21f709..70e7a79d8373 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.flink.source;
 
-import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
-
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -46,6 +44,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    SimpleDataUtil.assertTableRecords(
+        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 0c3f54cc726a..6d1891baf538 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -176,8 +174,7 @@ public void testConsumeFromStartTag() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, function);
 
       awaitExpectedSplits(sourceContext);
@@ -201,8 +198,7 @@ public void testCheckpointRestore() throws Exception {
       harness.setup();
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
      runSourceFunctionInTask(sourceContext, func);
 
       awaitExpectedSplits(sourceContext);
@@ -224,8 +220,7 @@
       harness.initializeState(state);
       harness.open();
 
-      CountDownLatch latch = new CountDownLatch(1);
-      TestSourceContext sourceContext = new TestSourceContext(latch);
+      TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
       runSourceFunctionInTask(sourceContext, newFunc);
 
       awaitExpectedSplits(sourceContext);
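
Reviewer note (commentary, not part of the patch): the backport makes two mechanical changes. It drops the static import of assertTableRecords in favor of qualified SimpleDataUtil.assertTableRecords(...) calls, and it inlines the single-use CountDownLatch into the TestSourceContext constructor, since the tests in the hunks above wait via awaitExpectedSplits(sourceContext) rather than on the latch variable itself. Below is a minimal sketch of why that inlining is behavior-preserving; LatchBackedContext is a hypothetical stand-in written for illustration, not the real TestSourceContext from TestStreamingMonitorFunction, which may differ.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for the test's TestSourceContext; illustration only.
    class LatchBackedContext {
      private final CountDownLatch latch;

      LatchBackedContext(CountDownLatch latch) {
        this.latch = latch; // the context is the latch's only holder after inlining
      }

      // Simulates the source function handing a split to the context.
      void collect(Object split) {
        latch.countDown();
      }

      // Callers wait through the context, so no outside latch reference is needed.
      boolean awaitSplits(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
      }

      public static void main(String[] args) throws InterruptedException {
        LatchBackedContext ctx = new LatchBackedContext(new CountDownLatch(1));
        ctx.collect("split-0");
        // Prints "released: true": the single-use latch opened after one split.
        System.out.println("released: " + ctx.awaitSplits(1, TimeUnit.SECONDS));
      }
    }

Because nothing outside the context touches the latch, new TestSourceContext(new CountDownLatch(1)) removes an otherwise-unused local without changing what the tests wait for.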