Skip to content

Commit

Permalink
Flink: backport apache#9547 to 1.17 and 1.16, adding the ability to …
Browse files Browse the repository at this point in the history
…read from a branch on the Flink Iceberg Source (apache#9627)
  • Loading branch information
rodmeneses authored and devangjhabakh committed Apr 22, 2024
1 parent 5f87ee8 commit fe1872b
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ private void validate() {
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}

Preconditions.checkArgument(
branch == null,
String.format(
"Cannot scan table using ref %s configured for streaming reader yet", branch));

Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for streaming reader", tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
Preconditions.checkArgument(
scanContext.branch() == null,
"Cannot scan table using ref %s configured for streaming reader yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in table.");

Expand Down Expand Up @@ -195,7 +192,10 @@ void monitorAndForwardSplits() {
// Refresh the table to get the latest committed snapshot.
table.refresh();

Snapshot snapshot = table.currentSnapshot();
Snapshot snapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive(

private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
Snapshot currentSnapshot = table.currentSnapshot();
Snapshot currentSnapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();

if (currentSnapshot == null) {
// empty table
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,92 @@ public void testSpecificSnapshotTimestamp() throws Exception {
}
}

@Test
public void testReadingFromBranch() throws Exception {
  String branchName = "b1";
  GenericAppenderHelper appender =
      new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);

  // Seed the table with a single snapshot on main before the branch exists.
  List<Record> baseBatch =
      RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
  appender.appendToTable(baseBatch);

  // Branch off the current (only) snapshot.
  tableResource
      .table()
      .manageSnapshots()
      .createBranch(branchName, tableResource.table().currentSnapshot().snapshotId())
      .commit();

  // Commit two more snapshots to the branch only.
  List<Record> branchBatch1 =
      RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
  appender.appendToTable(branchName, branchBatch1);

  List<Record> branchBatch2 =
      RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
  appender.appendToTable(branchName, branchBatch2);

  // A branch read should surface the pre-branch snapshot followed by both branch snapshots.
  List<Record> expectedFromBranch = Lists.newArrayList();
  expectedFromBranch.addAll(baseBatch);
  expectedFromBranch.addAll(branchBatch1);
  expectedFromBranch.addAll(branchBatch2);

  ScanContext branchScan =
      ScanContext.builder()
          .streaming(true)
          .monitorInterval(Duration.ofMillis(10L))
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .useBranch(branchName)
          .build();

  try (CloseableIterator<Row> iter =
      createStream(branchScan).executeAndCollect(getClass().getSimpleName())) {
    List<Row> initialRows = waitForResult(iter, 6);
    TestHelpers.assertRecords(initialRows, expectedFromBranch, tableResource.table().schema());

    // Further branch commits keep flowing through the already-open stream.
    List<Record> branchBatch3 =
        RandomGenericData.generate(
            tableResource.table().schema(), 2, randomSeed.incrementAndGet());
    appender.appendToTable(branchName, branchBatch3);

    List<Row> rows3 = waitForResult(iter, 2);
    TestHelpers.assertRecords(rows3, branchBatch3, tableResource.table().schema());

    List<Record> branchBatch4 =
        RandomGenericData.generate(
            tableResource.table().schema(), 2, randomSeed.incrementAndGet());
    appender.appendToTable(branchName, branchBatch4);

    List<Row> rows4 = waitForResult(iter, 2);
    TestHelpers.assertRecords(rows4, branchBatch4, tableResource.table().schema());
  }

  // Without useBranch the stream follows main, which still holds only the base snapshot.
  ScanContext mainScan =
      ScanContext.builder()
          .streaming(true)
          .monitorInterval(Duration.ofMillis(10L))
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .build();
  try (CloseableIterator<Row> iter =
      createStream(mainScan).executeAndCollect(getClass().getSimpleName())) {
    List<Row> mainRows = waitForResult(iter, 2);
    TestHelpers.assertRecords(mainRows, baseBatch, tableResource.table().schema());

    // New commits to main are picked up as well.
    List<Record> mainBatch =
        RandomGenericData.generate(
            tableResource.table().schema(), 2, randomSeed.incrementAndGet());
    appender.appendToTable(mainBatch);
    mainRows = waitForResult(iter, 2);
    TestHelpers.assertRecords(mainRows, mainBatch, tableResource.table().schema());
  }
}

private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
// start the source and collect output
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -384,6 +470,7 @@ private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
.startSnapshotId(scanContext.startSnapshotId())
.monitorInterval(Duration.ofMillis(10L))
.branch(scanContext.branch())
.build(),
WatermarkStrategy.noWatermarks(),
"icebergSource",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -98,6 +99,11 @@ public void clean() {
}

// Appends the given rows to the partition on the main branch by delegating to the
// branch-aware overload with SnapshotRef.MAIN_BRANCH.
private void insertRows(String partition, Table table, Row... rows) throws IOException {
insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
}

private void insertRows(String partition, String branch, Table table, Row... rows)
throws IOException {
GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);

GenericRecord gRecord = GenericRecord.create(table.schema());
Expand All @@ -111,12 +117,16 @@ private void insertRows(String partition, Table table, Row... rows) throws IOExc
}

if (partition != null) {
appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
appender.appendToTable(TestHelpers.Row.of(partition, 0), branch, records);
} else {
appender.appendToTable(records);
appender.appendToTable(branch, records);
}
}

// Appends unpartitioned rows to the given branch (partition is null).
private void insertRowsInBranch(String branch, Table table, Row... rows) throws IOException {
insertRows(null, branch, table, rows);
}

// Appends unpartitioned rows to the main branch.
private void insertRows(Table table, Row... rows) throws IOException {
insertRows(null, table, rows);
}
Expand Down Expand Up @@ -205,19 +215,130 @@ public void testConsumeFromBeginning() throws Exception {
}

@TestTemplate
public void testConsumeFilesWithBranch() throws Exception {
/**
* Insert records on the main branch. Then, insert in a named branch. Reads from the main branch
* and assert that the only records from main are returned
*/
public void testConsumeFilesFromMainBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

// Produce two snapshots on main branch
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");

insertRows(table, row1, row2);
Assertions.assertThatThrownBy(
() ->
exec(
"SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
TABLE))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
String branchName = "b1";
table.manageSnapshots().createBranch(branchName).commit();

// insert on the 'b1' branch
Row row3 = Row.of(3, "ccc", "2021-01-01");
Row row4 = Row.of(4, "ddd", "2021-01-01");

insertRowsInBranch(branchName, table, row3, row4);

// read from main
TableResult result =
exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);

try (CloseableIterator<Row> iterator = result.collect()) {
// the start snapshot(row2) is exclusive.
assertRows(ImmutableList.of(row1, row2), iterator);

Row row5 = Row.of(5, "eee", "2021-01-01");
Row row6 = Row.of(6, "fff", "2021-01-01");
insertRows(table, row5, row6);
assertRows(ImmutableList.of(row5, row6), iterator);

Row row7 = Row.of(7, "ggg", "2021-01-01");
insertRows(table, row7);
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
}

/**
 * Inserts records on the main branch, creates a named branch, then inserts records on the named
 * branch. A streaming read against the named branch must return all of the records.
 */
@TestTemplate
public void testConsumeFilesFromBranch() throws Exception {
  sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
  Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

  // Produce one snapshot on the main branch before branching.
  Row row1 = Row.of(1, "aaa", "2021-01-01");
  Row row2 = Row.of(2, "bbb", "2021-01-01");

  insertRows(table, row1, row2);
  String branchName = "b1";
  table.manageSnapshots().createBranch(branchName).commit();

  // Stream from the named branch via the 'branch' hint option.
  TableResult result =
      exec(
          "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
          TABLE, branchName);

  try (CloseableIterator<Row> iterator = result.collect()) {
    // The branch read includes the pre-branch main rows.
    assertRows(ImmutableList.of(row1, row2), iterator);

    // insert on the 'b1' branch; the open branch stream must pick these up
    Row row3 = Row.of(3, "ccc", "2021-01-01");
    Row row4 = Row.of(4, "ddd", "2021-01-01");
    insertRowsInBranch(branchName, table, row3, row4);
    assertRows(ImmutableList.of(row3, row4), iterator);
  }
  result.getJobClient().ifPresent(JobClient::cancel);
}

/**
 * Inserts records on branch b1, then inserts records on branch b2. Streaming reads against each
 * branch must return only that branch's records.
 */
@TestTemplate
public void testConsumeFilesFromTwoBranches() throws Exception {
  sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
  Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

  String branch1 = "b1";
  String branch2 = "b2";
  table.manageSnapshots().createBranch(branch1).commit();
  table.manageSnapshots().createBranch(branch2).commit();

  // Commit disjoint row sets to each branch; main receives no data in this test.
  Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
  Row row2Branch1 = Row.of(2, "b1", "2021-01-01");

  Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
  Row row2Branch2 = Row.of(3, "b3", "2021-01-01");

  insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
  insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);

  // Stream from b1: only b1's rows are visible, and later b1 commits flow through.
  TableResult resultBranch1 =
      exec(
          "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
          TABLE, branch1);

  try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
    assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
    Row another = Row.of(4, "ccc", "2021-01-01");
    insertRowsInBranch(branch1, table, another);
    assertRows(ImmutableList.of(another), iterator);
  }

  // Stream from b2: only b2's rows are visible, and later b2 commits flow through.
  TableResult resultBranch2 =
      exec(
          "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
          TABLE, branch2);
  try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
    assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
    Row another = Row.of(4, "ccc", "2021-01-01");
    insertRowsInBranch(branch2, table, another);
    assertRows(ImmutableList.of(another), iterator);
  }

  resultBranch1.getJobClient().ifPresent(JobClient::cancel);
  resultBranch2.getJobClient().ifPresent(JobClient::cancel);
}

@TestTemplate
Expand Down Expand Up @@ -296,6 +417,7 @@ public void testConsumeFromStartTag() throws Exception {
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);

Assertions.assertThatThrownBy(
() ->
exec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ private void validate() {
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}

Preconditions.checkArgument(
branch == null,
String.format(
"Cannot scan table using ref %s configured for streaming reader yet", branch));

Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for streaming reader", tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
Preconditions.checkArgument(
scanContext.branch() == null,
"Cannot scan table using ref %s configured for streaming reader yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in table.");

Expand Down Expand Up @@ -195,7 +192,10 @@ void monitorAndForwardSplits() {
// Refresh the table to get the latest committed snapshot.
table.refresh();

Snapshot snapshot = table.currentSnapshot();
Snapshot snapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive(

private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
Snapshot currentSnapshot = table.currentSnapshot();
Snapshot currentSnapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();

if (currentSnapshot == null) {
// empty table
Preconditions.checkArgument(
Expand Down
Loading

0 comments on commit fe1872b

Please sign in to comment.