Flink: Remove reading of the data files to fix flakiness (#9451)
Co-authored-by: Peter Vary <peter_vary4@apple.com>
pvary and Peter Vary committed Jan 16, 2024
1 parent 7dd01a3 commit 13d2160
Showing 3 changed files with 0 additions and 159 deletions.
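
Each of the three diffs below (the shown change is identical in all three files) drops a waitForRecords/assertRecords helper pair that read records back from the streaming result iterator and asserted on their count. A condensed sketch of that removed pattern, with the source of the flakiness called out in comments; the wrapper class name is only for illustration, the imports and timeout constant match the removed code, and the iterator type is assumed to be Flink's org.apache.flink.util.CloseableIterator:

import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class RemovedRecordWait {
  // Collect `num` records from the result iterator on a background thread, failing if the
  // AssertJ timeout expires first. How many records each reader actually emits depends on
  // split assignment and thread scheduling, so the expected counts in the test were only
  // lower bounds; that scheduling dependence is the flakiness this commit removes.
  static Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
    Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
    assertThat(
            CompletableFuture.supplyAsync(
                () -> {
                  int count = 0;
                  while (count < num && iterator.hasNext()) {
                    received.add(iterator.next());
                    count++;
                  }
                  if (count < num) {
                    throw new IllegalStateException(String.format("Fail to get %d records.", num));
                  }
                  return true;
                }))
        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
    return received;
  }
}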
@@ -18,24 +18,18 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -63,15 +57,12 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -325,11 +316,6 @@ public void testThrottling() throws Exception {
// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
// 1 or more from the runaway reader should be arrived depending on thread scheduling
waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);

// Get the drift metric, wait for it to be created and reach the expected state
// (100 min - 20 min - 0 min)
// Also this validates that the WatermarkAlignment is working
@@ -346,12 +332,6 @@ public void testThrottling() throws Exception {
// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
// file_1-recordTs_100)
// We might get 3 records form the non-blocked reader if it gets both new splits
waitForRecords(resultIterator, 3);

// Get the drift metric, wait for it to be created and reach the expected state (100 min - 20
// min - 15 min)
@@ -362,8 +342,6 @@ public void testThrottling() throws Exception {

// Add some new records which should unblock the throttled reader
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

// Wait for the new drift to decrease below the allowed drift to signal the normal state
Awaitility.await()
@@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) {
return record;
}

protected void assertRecords(
Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
Set<RowData> expected =
expectedRecords.stream()
.map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
.collect(Collectors.toSet());
Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
}

protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
assertThat(
CompletableFuture.supplyAsync(
() -> {
int count = 0;
while (count < num && iterator.hasNext()) {
received.add(iterator.next());
count++;
}

if (count < num) {
throw new IllegalStateException(String.format("Fail to get %d records.", num));
}

return true;
}))
.succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);

return received;
}

private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
return reporter.findMetrics(jobID, metricsName).values().stream()
@@ -18,24 +18,18 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -63,15 +57,12 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -325,11 +316,6 @@ public void testThrottling() throws Exception {
// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
// 1 or more from the runaway reader should be arrived depending on thread scheduling
waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);

// Get the drift metric, wait for it to be created and reach the expected state
// (100 min - 20 min - 0 min)
// Also this validates that the WatermarkAlignment is working
@@ -346,12 +332,6 @@ public void testThrottling() throws Exception {
// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
// file_1-recordTs_100)
// We might get 3 records form the non-blocked reader if it gets both new splits
waitForRecords(resultIterator, 3);

// Get the drift metric, wait for it to be created and reach the expected state (100 min - 20
// min - 15 min)
@@ -362,8 +342,6 @@ public void testThrottling() throws Exception {

// Add some new records which should unblock the throttled reader
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

// Wait for the new drift to decrease below the allowed drift to signal the normal state
Awaitility.await()
@@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) {
return record;
}

protected void assertRecords(
Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
Set<RowData> expected =
expectedRecords.stream()
.map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
.collect(Collectors.toSet());
Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
}

protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
assertThat(
CompletableFuture.supplyAsync(
() -> {
int count = 0;
while (count < num && iterator.hasNext()) {
received.add(iterator.next());
count++;
}

if (count < num) {
throw new IllegalStateException(String.format("Fail to get %d records.", num));
}

return true;
}))
.succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);

return received;
}

private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
return reporter.findMetrics(jobID, metricsName).values().stream()
@@ -18,24 +18,18 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -63,15 +57,12 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -325,11 +316,6 @@ public void testThrottling() throws Exception {
// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
// 1 or more from the runaway reader should be arrived depending on thread scheduling
waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);

// Get the drift metric, wait for it to be created and reach the expected state
// (100 min - 20 min - 0 min)
// Also this validates that the WatermarkAlignment is working
@@ -346,12 +332,6 @@ public void testThrottling() throws Exception {
// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
// file_1-recordTs_100)
// We might get 3 records form the non-blocked reader if it gets both new splits
waitForRecords(resultIterator, 3);

// Get the drift metric, wait for it to be created and reach the expected state (100 min - 20
// min - 15 min)
@@ -362,8 +342,6 @@ public void testThrottling() throws Exception {

// Add some new records which should unblock the throttled reader
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

// Wait for the new drift to decrease below the allowed drift to signal the normal state
Awaitility.await()
@@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) {
return record;
}

protected void assertRecords(
Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
Set<RowData> expected =
expectedRecords.stream()
.map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
.collect(Collectors.toSet());
Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
}

protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
assertThat(
CompletableFuture.supplyAsync(
() -> {
int count = 0;
while (count < num && iterator.hasNext()) {
received.add(iterator.next());
count++;
}

if (count < num) {
throw new IllegalStateException(String.format("Fail to get %d records.", num));
}

return true;
}))
.succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);

return received;
}

private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
return reporter.findMetrics(jobID, metricsName).values().stream()
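With the record reads gone, the throttling checks that remain in these tests rely on the watermark-alignment drift gauge and Awaitility, as in the Awaitility.await() and findAlignmentDriftMetric(jobID, ...) calls kept above. A minimal sketch of that remaining wait, written as a fragment inside the test method; the alias, poll interval, timeout, and expectedDriftMillis value are illustrative assumptions, not settings taken from the diff:

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;

// Poll the source's watermark-alignment drift gauge until it reports the expected value,
// instead of counting records read back from the data files.
long expectedDriftMillis = TimeUnit.MINUTES.toMillis(80); // illustrative value only
Awaitility.await("alignment drift reaches the expected value")
    .pollInterval(Duration.ofMillis(10))
    .atMost(30, TimeUnit.SECONDS)
    .until(() -> findAlignmentDriftMetric(jobID, expectedDriftMillis).isPresent());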
