Skip to content
Closed
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -79,6 +80,18 @@ public List<FileScanTask> tasks() {
// Returns true when this batch ends at the last file index of its snapshot,
// i.e. no further files remain to be consumed from the snapshot (see the
// batch1/batch2 assertions in TestMicroBatchBuilder for the intended contract).
public boolean lastIndexOfSnapshot() {
return lastIndexOfSnapshot;
}

@Override
public String toString() {
  // Human-readable summary of this micro-batch, e.g.
  // MicroBatch{snapshotId=1, startFileIndex=0, endFileIndex=5, ...}.
  MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
  helper.add("snapshotId", snapshotId());
  helper.add("startFileIndex", startFileIndex());
  helper.add("endFileIndex", endFileIndex());
  helper.add("sizeInBytes", sizeInBytes());
  helper.add("tasks", tasks());
  helper.add("lastIndexOfSnapshot", lastIndexOfSnapshot());
  return helper.toString();
}
}

public static MicroBatchBuilder from(Snapshot snapshot, FileIO io) {
Expand Down Expand Up @@ -145,7 +158,7 @@ private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFil
/**
* Method to skip the manifest file in which the index is smaller than startFileIndex. For example, if the
* index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned manifest index list is:
* (m2, 3), (m3, 5).
* (m3, 5).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update to this Javadoc comment is what inspired this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is not correct. If the startFileIndex is 4, then "(m2, 3)" should not be skipped, because the 4th file index falls within manifest m2, which starts at index 3.

*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip the processed manifests.
Expand Down
124 changes: 107 additions & 17 deletions core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ public void setupTableProperties() {
public void testGenerateMicroBatch() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));

MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
Assert.assertEquals(batch.sizeInBytes(), 50);
Assert.assertTrue(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A", "B", "C", "D", "E"), filesToScan(batch.tasks()));
Assert.assertEquals(batch0.snapshotId(), 1L);
Assert.assertEquals(batch0.startFileIndex(), 0);
Assert.assertEquals(batch0.endFileIndex(), 5);
Assert.assertEquals(batch0.sizeInBytes(), 50);
Assert.assertTrue(batch0.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A", "B", "C", "D", "E"), filesToScan(batch0.tasks()));

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
Expand Down Expand Up @@ -89,19 +89,20 @@ public void testGenerateMicroBatch() {
public void testGenerateMicroBatchWithSmallTargetSize() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));

MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
Assert.assertEquals(batch.sizeInBytes(), 10);
Assert.assertFalse(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch.tasks()));
Assert.assertEquals(batch0.snapshotId(), 1L);
Assert.assertEquals(batch0.startFileIndex(), 0);
Assert.assertEquals(batch0.endFileIndex(), 1);
Assert.assertEquals(batch0.sizeInBytes(), 10);
Assert.assertFalse(batch0.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch0.tasks()));

MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 5L, true);
.generate(batch0.endFileIndex(), 5L, true);
Assert.assertEquals(batch1.startFileIndex(), 1);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Expand Down Expand Up @@ -140,15 +141,104 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
Assert.assertTrue(batch5.lastIndexOfSnapshot());
}

private static DataFile file(String name) {
@Test
public void testMicroBatchRespectsRequestedMaximumSize() {
  // Add files A-E, each 10 bytes, and process them in multiple microbatches of
  // varying sizes, emulating perhaps a dynamically growing source (assuming the
  // total batch is being built by one process, as in Flink or on the driver in Spark).
  add(table.newAppend(), files("A", "B", "C", "D", "E"));

  // Request 10 bytes - receive file A only.
  MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
      .specsById(table.specs())
      .generate(0, 10L, true);
  // Expected value first, actual second - matches JUnit's Assert.assertEquals
  // contract and the style already used for batch2 below.
  Assert.assertEquals(1L, batch0.snapshotId());
  Assert.assertEquals(0, batch0.startFileIndex());
  Assert.assertEquals(1, batch0.endFileIndex());
  Assert.assertEquals(10, batch0.sizeInBytes());
  Assert.assertFalse(batch0.lastIndexOfSnapshot());
  filesMatch(Lists.newArrayList("A"), filesToScan(batch0.tasks()));

  // Request 35 bytes - receive B, C and D (30 bytes total); including E would
  // push the batch past the requested limit.
  MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
      .specsById(table.specs())
      .generate(batch0.endFileIndex(), 35L, false);
  Assert.assertEquals(1, batch1.startFileIndex());
  Assert.assertEquals(4, batch1.endFileIndex());
  Assert.assertEquals(30, batch1.sizeInBytes());
  filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch1.tasks()));
  Assert.assertFalse(batch1.lastIndexOfSnapshot());

  // Request 35 bytes - receive file E, which is the end of the input and only 10 bytes.
  MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
      .specsById(table.specs())
      .generate(batch1.endFileIndex(), 35L, false);
  Assert.assertEquals(4, batch2.startFileIndex());
  Assert.assertEquals(5, batch2.endFileIndex());
  Assert.assertEquals(10, batch2.sizeInBytes());
  filesMatch(Lists.newArrayList("E"), filesToScan(batch2.tasks()));
}

@Test
public void testReadingSnapshotIsNotInterruptedByChildSnapshot() {
// Add files A-E, all of 10kb, and process the single generated snapshot
// in multiple microbatches.
add(table.newAppend(), files("A", "B", "C", "D", "E"));
Assert.assertEquals(1L, table.currentSnapshot().snapshotId());

// Request a batch of 40kb - Reads in A, B, C, and D.
MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 40L, false);
Assert.assertEquals(0, batch0.startFileIndex());
Assert.assertEquals(4, batch0.endFileIndex());
Assert.assertEquals(40, batch0.sizeInBytes());
filesMatch(Lists.newArrayList("A", "B", "C", "D"), filesToScan(batch0.tasks()));
Assert.assertFalse(batch0.lastIndexOfSnapshot());

// Concurrent write sometime after the start of the last batch and before the next batch.
final long sizeOfFileF = 25L;
add(table.newAppend(),
Collections.singletonList(fileWithSize("F", sizeOfFileF)));
Assert.assertEquals(2L, table.currentSnapshot().snapshotId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will generate a new snapshot, seems unrelated to the MicroBatch processed before, not sure what's the purpose of this testing.


// Read the last 10kb of Snapshot 1.
// Simulates desired stream behavior for example on Trigger.Once().
MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch0.endFileIndex(), 40L, false);
Assert.assertEquals(4, batch1.startFileIndex());
Assert.assertEquals(5, batch1.endFileIndex());
Assert.assertEquals(10, batch1.sizeInBytes());
filesMatch(Lists.newArrayList("E"), filesToScan(batch1.tasks()));
Assert.assertTrue(batch1.lastIndexOfSnapshot());

// Show that the next batch / snapshot can be read.
MicroBatch batch2 = MicroBatches.from(table.currentSnapshot(), table.io())
.specsById(table.specs())
.generate(0, 40L, true);
Assert.assertEquals(0, batch2.startFileIndex());
Assert.assertEquals(1, batch2.endFileIndex());
Assert.assertEquals(sizeOfFileF, batch2.sizeInBytes());
filesMatch(Lists.newArrayList("F"), filesToScan(batch2.tasks()));
Assert.assertTrue(batch2.lastIndexOfSnapshot());

}

// Builds a test DataFile named "<name>.parquet" with the given size in bytes,
// a single record, and partition data pinned to bucket 0.
private static DataFile fileWithSize(String name, long newFileSizeInBytes) {
  DataFiles.Builder builder = DataFiles.builder(SPEC);
  builder.withPath(name + ".parquet");
  builder.withFileSizeInBytes(newFileSizeInBytes);
  builder.withPartitionPath("data_bucket=0"); // easy way to set partition data for now
  builder.withRecordCount(1);
  return builder.build();
}

// Returns the default test parquet file: 10 bytes, 1 record, in bucket 0.
private static DataFile file(String name) {
  final long defaultSizeInBytes = 10L;
  return fileWithSize(name, defaultSizeInBytes);
}

private static void add(AppendFiles appendFiles, List<DataFile> adds) {
for (DataFile f : adds) {
appendFiles.appendFile(f);
Expand Down