diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 21ae3f7f668a..7e1cb05445e6 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -1485,11 +1486,9 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
 
-    List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
-    assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
-    List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
-    assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+    List<DataFile> dataFiles = dataFiles(table);
+    Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
 
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
@@ -1497,7 +1496,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1513,8 +1512,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("file_count", 2)
             .set(
                 "total_data_file_size_in_bytes",
-                dataFilesFromFirstCommit.get(1).fileSizeInBytes()
-                    + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1552,7 +1550,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -2061,23 +2059,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
     return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
   }
 
-  private List<DataFile> dataFiles(Table table, long commitId) {
-    return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
   }
 
   private void assertDataFilePartitions(
-      List<DataFile> dataFilesFromCommit,
-      int expectedDataFileCount,
-      List<Integer> expectedPartitionIds) {
-    Assert.assertEquals(
-        "Commit should have " + expectedDataFileCount + " data files",
-        expectedDataFileCount,
-        dataFilesFromCommit.size());
-    for (int i = 0; i < expectedDataFileCount; ++i) {
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    for (int i = 0; i < dataFiles.size(); ++i) {
       Assert.assertEquals(
           "Data file should have partition of id " + expectedPartitionIds.get(i),
           expectedPartitionIds.get(i).intValue(),
-          dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
     }
   }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c32eb6192515..9f152ea91b6f 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1490,11 +1491,9 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
 
-    List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
-    assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
-    List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
-    assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+    List<DataFile> dataFiles = dataFiles(table);
+    Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
 
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
@@ -1502,7 +1501,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1518,8 +1517,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("file_count", 2)
             .set(
                 "total_data_file_size_in_bytes",
-                dataFilesFromFirstCommit.get(1).fileSizeInBytes()
-                    + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1557,7 +1555,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -2198,23 +2196,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
     return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
   }
 
-  private List<DataFile> dataFiles(Table table, long commitId) {
-    return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
   }
 
   private void assertDataFilePartitions(
-      List<DataFile> dataFilesFromCommit,
-      int expectedDataFileCount,
-      List<Integer> expectedPartitionIds) {
-    Assert.assertEquals(
-        "Commit should have " + expectedDataFileCount + " data files",
-        expectedDataFileCount,
-        dataFilesFromCommit.size());
-    for (int i = 0; i < expectedDataFileCount; ++i) {
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    for (int i = 0; i < dataFiles.size(); ++i) {
       Assert.assertEquals(
           "Data file should have partition of id " + expectedPartitionIds.get(i),
           expectedPartitionIds.get(i).intValue(),
-          dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
     }
   }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 2d0bd55d2af2..0ac38e2562fa 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1488,11 +1489,9 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .orderBy("partition.id")
             .collectAsList();
 
-    List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
-    assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
-    List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
-    assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+    List<DataFile> dataFiles = dataFiles(table);
+    Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
 
     GenericRecordBuilder builder =
         new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
@@ -1506,7 +1505,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1522,8 +1521,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("file_count", 2)
             .set(
                 "total_data_file_size_in_bytes",
-                dataFilesFromFirstCommit.get(1).fileSizeInBytes()
-                    + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1561,7 +1559,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -2265,23 +2263,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
     return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
   }
 
-  private List<DataFile> dataFiles(Table table, long commitId) {
-    return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
   }
 
   private void assertDataFilePartitions(
-      List<DataFile> dataFilesFromCommit,
-      int expectedDataFileCount,
-      List<Integer> expectedPartitionIds) {
-    Assert.assertEquals(
-        "Commit should have " + expectedDataFileCount + " data files",
-        expectedDataFileCount,
-        dataFilesFromCommit.size());
-    for (int i = 0; i < expectedDataFileCount; ++i) {
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    for (int i = 0; i < dataFiles.size(); ++i) {
       Assert.assertEquals(
           "Data file should have partition of id " + expectedPartitionIds.get(i),
           expectedPartitionIds.get(i).intValue(),
-          dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
     }
   }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index dc494884fb94..039b9a237a35 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -39,6 +39,7 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1492,11 +1493,9 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
 
-    List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
-    assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
-    List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
-    assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+    List<DataFile> dataFiles = dataFiles(table);
+    Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
 
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
@@ -1504,7 +1503,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1520,8 +1519,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("file_count", 2)
             .set(
                 "total_data_file_size_in_bytes",
-                dataFilesFromFirstCommit.get(1).fileSizeInBytes()
-                    + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1559,7 +1557,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
-            .set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -2262,23 +2260,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
     return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
   }
 
-  private List<DataFile> dataFiles(Table table, long commitId) {
-    return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
   }
 
   private void assertDataFilePartitions(
-      List<DataFile> dataFilesFromCommit,
-      int expectedDataFileCount,
-      List<Integer> expectedPartitionIds) {
-    Assert.assertEquals(
-        "Commit should have " + expectedDataFileCount + " data files",
-        expectedDataFileCount,
-        dataFilesFromCommit.size());
-    for (int i = 0; i < expectedDataFileCount; ++i) {
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    for (int i = 0; i < dataFiles.size(); ++i) {
       Assert.assertEquals(
           "Data file should have partition of id " + expectedPartitionIds.get(i),
          expectedPartitionIds.get(i).intValue(),
-          dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
     }
   }
 }