Skip to content

Commit

Permalink
Flink: Proper backport for apache#8852 (apache#9146)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored and devangjhabakh committed Apr 22, 2024
1 parent f129b92 commit 59681cd
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 28 deletions.
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +44,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
Expand Down Expand Up @@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());

assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

@Test
Expand Down Expand Up @@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {

// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);
Expand Down Expand Up @@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);
Expand Down Expand Up @@ -176,11 +174,11 @@ public void testConsumeFromStartTag() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);

// Stop the stream task.
function.close();

Expand All @@ -200,8 +198,7 @@ public void testCheckpointRestore() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, func);

awaitExpectedSplits(sourceContext);
Expand All @@ -223,8 +220,7 @@ public void testCheckpointRestore() throws Exception {
harness.initializeState(state);
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, newFunc);

awaitExpectedSplits(sourceContext);
Expand Down
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +44,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
Expand Down Expand Up @@ -151,7 +150,8 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());

assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

@Test
Expand Down Expand Up @@ -214,7 +214,8 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {

// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -109,8 +109,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);
Expand Down Expand Up @@ -143,8 +142,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);
Expand Down Expand Up @@ -176,8 +174,7 @@ public void testConsumeFromStartTag() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, function);

awaitExpectedSplits(sourceContext);
Expand All @@ -201,8 +198,7 @@ public void testCheckpointRestore() throws Exception {
harness.setup();
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, func);

awaitExpectedSplits(sourceContext);
Expand All @@ -224,8 +220,7 @@ public void testCheckpointRestore() throws Exception {
harness.initializeState(state);
harness.open();

CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
TestSourceContext sourceContext = new TestSourceContext(new CountDownLatch(1));
runSourceFunctionInTask(sourceContext, newFunc);

awaitExpectedSplits(sourceContext);
Expand Down

0 comments on commit 59681cd

Please sign in to comment.