From fe1872b4c8bc29db4a940311f891f39fe0d711ef Mon Sep 17 00:00:00 2001
From: Rodrigo
Date: Mon, 5 Feb 2024 08:22:34 -0800
Subject: [PATCH] Flink: backport #9547 (Adds the ability to read from a branch
 on the Flink Iceberg Source) to 1.17 and 1.16 (#9627)
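With this change, the streaming reader follows the head of the branch configured
on the scan instead of rejecting the 'branch' option: both the legacy
StreamingMonitorFunction and the FLIP-27 ContinuousSplitPlannerImpl now resolve
new snapshots from the branch ref. For illustration (the table and branch names
are made up; the hint syntax matches the tests below), a Flink SQL streaming
read against a branch looks like:

    SELECT * FROM db.tbl
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1') */;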
---
 .../iceberg/flink/source/ScanContext.java        |   5 -
 .../source/StreamingMonitorFunction.java         |   8 +-
 .../ContinuousSplitPlannerImpl.java              |   6 +-
 .../source/TestIcebergSourceContinuous.java      |  87 +++++++++++
 .../flink/source/TestStreamScanSql.java          | 142 ++++++++++++++++--
 .../iceberg/flink/source/ScanContext.java        |   5 -
 .../source/StreamingMonitorFunction.java         |   8 +-
 .../ContinuousSplitPlannerImpl.java              |   6 +-
 .../source/TestIcebergSourceContinuous.java      |  87 +++++++++++
 .../flink/source/TestStreamScanSql.java          | 140 +++++++++++++++--
 10 files changed, 454 insertions(+), 40 deletions(-)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd5901e..cf57a126ae59 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ private void validate() {
           "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
     }
 
-    Preconditions.checkArgument(
-        branch == null,
-        String.format(
-            "Cannot scan table using ref %s configured for streaming reader yet", branch));
-
     Preconditions.checkArgument(
         tag == null,
         String.format("Cannot scan table using ref %s configured for streaming reader", tag));
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613fed..a07613aee59b 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
     Preconditions.checkArgument(
         !(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
         "START_SNAPSHOT_ID and START_TAG cannot both be set.");
-    Preconditions.checkArgument(
-        scanContext.branch() == null,
-        "Cannot scan table using ref %s configured for streaming reader yet.");
     Preconditions.checkNotNull(
         table.currentSnapshot(), "Don't have any available snapshot in table.");
 
@@ -195,7 +192,10 @@ void monitorAndForwardSplits() {
     // Refresh the table to get the latest committed snapshot.
     table.refresh();
 
-    Snapshot snapshot = table.currentSnapshot();
+    Snapshot snapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
 
     if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
       long snapshotId = snapshot.snapshotId();
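The split planner change that follows applies the same resolution rule as the
monitor function above. Pulled out into a hypothetical helper for clarity (the
method name is ours, not part of the patch), the shared pattern is:

    // Follow the branch head when the scan is configured with a branch,
    // otherwise follow the table's current snapshot.
    private Snapshot headSnapshot(Table table, ScanContext scanContext) {
      return scanContext.branch() != null
          ? table.snapshot(scanContext.branch())
          : table.currentSnapshot();
    }

Table#snapshot(String) resolves a ref name to its current snapshot, so a
missing ref yields null and falls into the empty-table handling below.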
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 450b649253a4..e9e3c159b07b 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive(
 
   private ContinuousEnumerationResult discoverIncrementalSplits(
       IcebergEnumeratorPosition lastPosition) {
-    Snapshot currentSnapshot = table.currentSnapshot();
+    Snapshot currentSnapshot =
+        scanContext.branch() != null
+            ? table.snapshot(scanContext.branch())
+            : table.currentSnapshot();
+
     if (currentSnapshot == null) {
       // empty table
       Preconditions.checkArgument(
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 31e9733fcd60..bfd7fa5758e3 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -370,6 +370,92 @@ public void testSpecificSnapshotTimestamp() throws Exception {
     }
   }
 
+  @Test
+  public void testReadingFromBranch() throws Exception {
+    String branch = "b1";
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    List<Record> batchBase =
+        RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batchBase);
+
+    // create branch
+    tableResource
+        .table()
+        .manageSnapshots()
+        .createBranch(branch, tableResource.table().currentSnapshot().snapshotId())
+        .commit();
+
+    // snapshot1 to branch
+    List<Record> batch1 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch1);
+
+    // snapshot2 to branch
+    List<Record> batch2 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch2);
+
+    List<Record> branchExpectedRecords = Lists.newArrayList();
+    branchExpectedRecords.addAll(batchBase);
+    branchExpectedRecords.addAll(batch1);
+    branchExpectedRecords.addAll(batch2);
+    // reading from the branch should return the base snapshot (written before the branch was
+    // created) followed by the two snapshots appended to the branch
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .useBranch(branch)
+            .build();
+
+    try (CloseableIterator<Row> iter =
+        createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 6);
+      TestHelpers.assertRecords(resultMain, branchExpectedRecords, tableResource.table().schema());
+
+      // snapshot3 to branch
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      // snapshot4 to branch
+      List<Record> batch4 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch4);
+
+      List<Row> result4 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result4, batch4, tableResource.table().schema());
+    }
+
+    // read only from the main branch; it should contain only the base snapshot
+    scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build();
+    try (CloseableIterator<Row> iter =
+        createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchBase, tableResource.table().schema());
+
+      List<Record> batchMain2 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batchMain2);
+      resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchMain2, tableResource.table().schema());
+    }
+  }
+
   private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
     // start the source and collect output
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
             .startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
             .startSnapshotId(scanContext.startSnapshotId())
            .monitorInterval(Duration.ofMillis(10L))
+            .branch(scanContext.branch())
             .build(),
         WatermarkStrategy.noWatermarks(),
         "icebergSource",
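For DataStream jobs, the only new wiring is the branch(...) builder call
covered by the test helper above. A condensed, self-contained sketch (env,
tableLoader, and the branch name are assumptions for illustration):

    // Stream RowData from the head of branch b1; env and tableLoader are
    // assumed to be created elsewhere in the job.
    DataStream<RowData> stream =
        env.fromSource(
            IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .streaming(true)
                .monitorInterval(Duration.ofMillis(10L))
                .branch("b1")
                .build(),
            WatermarkStrategy.noWatermarks(),
            "icebergSource",
            TypeInformation.of(RowData.class));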
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 3499e5fdae43..9e043bbbbbd2 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public void clean() {
   }
 
   private void insertRows(String partition, Table table, Row... rows) throws IOException {
+    insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+  }
+
+  private void insertRows(String partition, String branch, Table table, Row... rows)
+      throws IOException {
     GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);
 
     GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ private void insertRows(String partition, Table table, Row... rows) throws IOExc
     }
 
     if (partition != null) {
-      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), branch, records);
     } else {
-      appender.appendToTable(records);
+      appender.appendToTable(branch, records);
     }
   }
 
+  private void insertRowsInBranch(String branch, Table table, Row... rows) throws IOException {
+    insertRows(null, branch, table, rows);
+  }
+
   private void insertRows(Table table, Row... rows) throws IOException {
     insertRows(null, table, rows);
   }
@@ -205,19 +215,130 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch, then insert records on a named branch. Read from the main
+   * branch and assert that only the records from main are returned.
+   */
+  public void testConsumeFilesFromMainBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // rows committed to the 'b1' branch are not visible on main
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch, create a named branch, and insert records on it. Then
+   * select from the named branch and assert that all the records are returned.
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    TableResult result =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1, then insert records on branch b2. Select from each branch and
+   * assert that the correct records are returned.
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce one snapshot on each branch
+    Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+    Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+    Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+    Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+    insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+    insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+    TableResult resultBranch1 =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch1);
+
+    try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+      assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch1, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    TableResult resultBranch2 =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch2);
+    try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+      assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch2, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+    resultBranch2.getJobClient().ifPresent(JobClient::cancel);
   }
 
   @TestTemplate
@@ -296,6 +417,7 @@ public void testConsumeFromStartTag() throws Exception {
       assertRows(ImmutableList.of(row7), iterator);
     }
     result.getJobClient().ifPresent(JobClient::cancel);
+
     Assertions.assertThatThrownBy(
         () ->
             exec(
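The fixtures above rely on existing helpers rather than new API: SnapshotManager
creates the branch and GenericAppenderHelper commits data files to it. A minimal
sketch of that pattern in isolation (the table, records, and directory are
assumed to exist):

    // Create branch b1 at the table's current state, then commit a snapshot to it.
    table.manageSnapshots().createBranch("b1").commit();
    GenericAppenderHelper appender =
        new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryDirectory);
    appender.appendToTable("b1", records); // new snapshot lands on b1, not on main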
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 3dce5dd5901e..cf57a126ae59 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -152,11 +152,6 @@ private void validate() {
           "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
     }
 
-    Preconditions.checkArgument(
-        branch == null,
-        String.format(
-            "Cannot scan table using ref %s configured for streaming reader yet", branch));
-
     Preconditions.checkArgument(
         tag == null,
         String.format("Cannot scan table using ref %s configured for streaming reader", tag));
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index c27e29613fed..a07613aee59b 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
     Preconditions.checkArgument(
         !(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
         "START_SNAPSHOT_ID and START_TAG cannot both be set.");
"START_SNAPSHOT_ID and START_TAG cannot both be set."); - Preconditions.checkArgument( - scanContext.branch() == null, - "Cannot scan table using ref %s configured for streaming reader yet."); Preconditions.checkNotNull( table.currentSnapshot(), "Don't have any available snapshot in table."); @@ -195,7 +192,10 @@ void monitorAndForwardSplits() { // Refresh the table to get the latest committed snapshot. table.refresh(); - Snapshot snapshot = table.currentSnapshot(); + Snapshot snapshot = + scanContext.branch() != null + ? table.snapshot(scanContext.branch()) + : table.currentSnapshot(); if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) { long snapshotId = snapshot.snapshotId(); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java index 450b649253a4..e9e3c159b07b 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive( private ContinuousEnumerationResult discoverIncrementalSplits( IcebergEnumeratorPosition lastPosition) { - Snapshot currentSnapshot = table.currentSnapshot(); + Snapshot currentSnapshot = + scanContext.branch() != null + ? table.snapshot(scanContext.branch()) + : table.currentSnapshot(); + if (currentSnapshot == null) { // empty table Preconditions.checkArgument( diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 31e9733fcd60..bfd7fa5758e3 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -370,6 +370,92 @@ public void testSpecificSnapshotTimestamp() throws Exception { } } + @Test + public void testReadingFromBranch() throws Exception { + String branch = "b1"; + GenericAppenderHelper dataAppender = + new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER); + + List batchBase = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(batchBase); + + // create branch + tableResource + .table() + .manageSnapshots() + .createBranch(branch, tableResource.table().currentSnapshot().snapshotId()) + .commit(); + + // snapshot1 to branch + List batch1 = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch1); + + // snapshot2 to branch + List batch2 = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch2); + + List branchExpectedRecords = Lists.newArrayList(); + branchExpectedRecords.addAll(batchBase); + branchExpectedRecords.addAll(batch1); + branchExpectedRecords.addAll(batch2); + // reads from branch: it should contain the first snapshot (before the branch creation) followed + // by the next 2 snapshots added + ScanContext scanContext = + ScanContext.builder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10L)) + 
+            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .useBranch(branch)
+            .build();
+
+    try (CloseableIterator<Row> iter =
+        createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 6);
+      TestHelpers.assertRecords(resultMain, branchExpectedRecords, tableResource.table().schema());
+
+      // snapshot3 to branch
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      // snapshot4 to branch
+      List<Record> batch4 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch4);
+
+      List<Row> result4 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result4, batch4, tableResource.table().schema());
+    }
+
+    // read only from the main branch; it should contain only the base snapshot
+    scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build();
+    try (CloseableIterator<Row> iter =
+        createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchBase, tableResource.table().schema());
+
+      List<Record> batchMain2 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batchMain2);
+      resultMain = waitForResult(iter, 2);
+      TestHelpers.assertRecords(resultMain, batchMain2, tableResource.table().schema());
+    }
+  }
+
   private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
     // start the source and collect output
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -384,6 +470,7 @@ private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
             .startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
             .startSnapshotId(scanContext.startSnapshotId())
             .monitorInterval(Duration.ofMillis(10L))
+            .branch(scanContext.branch())
             .build(),
         WatermarkStrategy.noWatermarks(),
         "icebergSource",
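The v1.17 test above exercises the same scan-level knob as v1.16:
ScanContext.Builder#useBranch. ScanContext is an internal helper, so the snippet
below is only a sketch of how the tests compose a branch-following streaming
scan (the branch name is illustrative), not a public entry point; SQL jobs use
the 'branch' hint and DataStream jobs the IcebergSource builder:

    // Internal scan definition used by the tests: stream the head of branch b1.
    ScanContext scanContext =
        ScanContext.builder()
            .streaming(true)
            .monitorInterval(Duration.ofMillis(10L))
            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .useBranch("b1")
            .build();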
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 09d5a5481aee..9e043bbbbbd2 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -33,6 +33,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -98,6 +99,11 @@ public void clean() {
   }
 
   private void insertRows(String partition, Table table, Row... rows) throws IOException {
+    insertRows(partition, SnapshotRef.MAIN_BRANCH, table, rows);
+  }
+
+  private void insertRows(String partition, String branch, Table table, Row... rows)
+      throws IOException {
     GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);
 
     GenericRecord gRecord = GenericRecord.create(table.schema());
@@ -111,12 +117,16 @@ private void insertRows(String partition, Table table, Row... rows) throws IOExc
     }
 
     if (partition != null) {
-      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), branch, records);
     } else {
-      appender.appendToTable(records);
+      appender.appendToTable(branch, records);
     }
   }
 
+  private void insertRowsInBranch(String branch, Table table, Row... rows) throws IOException {
+    insertRows(null, branch, table, rows);
+  }
+
   private void insertRows(Table table, Row... rows) throws IOException {
     insertRows(null, table, rows);
   }
@@ -205,20 +215,130 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch, then insert records on a named branch. Read from the main
+   * branch and assert that only the records from main are returned.
+   */
+  public void testConsumeFilesFromMainBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // rows committed to the 'b1' branch are not visible on main
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch, create a named branch, and insert records on it. Then
+   * select from the named branch and assert that all the records are returned.
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    TableResult result =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1, then insert records on branch b2. Select from each branch and
+   * assert that the correct records are returned.
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce one snapshot on each branch
+    Row row1Branch1 = Row.of(1, "b1", "2021-01-01");
+    Row row2Branch1 = Row.of(2, "b1", "2021-01-01");
+
+    Row row1Branch2 = Row.of(2, "b2", "2021-01-01");
+    Row row2Branch2 = Row.of(3, "b3", "2021-01-01");
+
+    insertRowsInBranch(branch1, table, row1Branch1, row2Branch1);
+    insertRowsInBranch(branch2, table, row1Branch2, row2Branch2);
+
+    TableResult resultBranch1 =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch1);
+
+    try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
+      assertRows(ImmutableList.of(row1Branch1, row2Branch1), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch1, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    TableResult resultBranch2 =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branch2);
+    try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
+      assertRows(ImmutableList.of(row1Branch2, row2Branch2), iterator);
+      Row another = Row.of(4, "ccc", "2021-01-01");
+      insertRowsInBranch(branch2, table, another);
+      assertRows(ImmutableList.of(another), iterator);
+    }
+
+    resultBranch1.getJobClient().ifPresent(JobClient::cancel);
+    resultBranch2.getJobClient().ifPresent(JobClient::cancel);
   }
 
   @TestTemplate