Flink: Remove reading of the data files to fix flakiness #9451

Merged · 1 commit · Jan 16, 2024
@@ -18,24 +18,18 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -63,15 +57,12 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -325,11 +316,6 @@ public void testThrottling() throws Exception {
// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that the read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
// 1 or more from the runaway reader should be arrived depending on thread scheduling
waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);

// Get the drift metric, wait for it to be created and reach the expected state
// (100 min - 20 min - 0 min)
// Also this validates that the WatermarkAlignment is working
@@ -346,12 +332,6 @@ public void testThrottling() throws Exception {
// Add some old records with 2 splits, so even if the blocked gets one split, the other reader
// one gets one as well
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
// file_1-recordTs_100)
// We might get 3 records form the non-blocked reader if it gets both new splits
waitForRecords(resultIterator, 3);

// Get the drift metric, wait for it to be created and reach the expected state (100 min - 20
// min - 15 min)
@@ -362,8 +342,6 @@ public void testThrottling() throws Exception {

// Add some new records which should unblock the throttled reader
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

// Wait for the new drift to decrease below the allowed drift to signal the normal state
Awaitility.await()
@@ -397,37 +375,6 @@ protected Record generateRecord(int minutes, String str) {
return record;
}

protected void assertRecords(
Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
Set<RowData> expected =
expectedRecords.stream()
.map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
.collect(Collectors.toSet());
Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
}

protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
assertThat(
CompletableFuture.supplyAsync(
() -> {
int count = 0;
while (count < num && iterator.hasNext()) {
received.add(iterator.next());
count++;
}

if (count < num) {
throw new IllegalStateException(String.format("Fail to get %d records.", num));
}

return true;
}))
.succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);

return received;
}

private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
return reporter.findMetrics(jobID, metricsName).values().stream()
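
With the record-reading helpers removed, the test's remaining validation relies on the watermark-alignment drift metric, polled through the in-memory metric reporter until it reaches the expected value. The following is a minimal sketch of that wait, not code from this PR: findAlignmentDriftMetric(JobID, long) is the helper shown in the diff above, jobId is a hypothetical local variable holding the running job's JobID, the 80-minute drift simply follows the "(100 min - 20 min - 0 min)" arithmetic in the test's comments, and the poll interval and timeout are illustrative.

    // Poll the registered metrics until the source's WATERMARK_ALIGNMENT_DRIFT gauge
    // exists and reports the expected drift (80 min, per the test's
    // "(100 min - 20 min - 0 min)" comment), instead of asserting on how many
    // records the readers have emitted so far.
    Awaitility.await()
        .pollInterval(Duration.ofMillis(10))
        .atMost(Duration.ofSeconds(120))
        .until(() -> findAlignmentDriftMetric(jobId, TimeUnit.MINUTES.toMillis(80)).isPresent());

Because the gauge is only checked for reaching a target value, this style of wait does not depend on how splits are assigned or how far each reader has progressed, which is what made the removed record-count assertions flaky.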