Check partition id for all data files
hsiang-c committed Jul 7, 2023
1 parent 32074d6 commit f29a865
Showing 4 changed files with 52 additions and 80 deletions.
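All four test files apply the same change to testPartitionsTableLastUpdatedSnapshot(): instead of collecting the data files added by each commit through table.snapshot(commitId).addedDataFiles(table.io()), the test now plans one full table scan and checks the partition id of every data file it returns. Extracted from the diff, the shared helpers look roughly like this (the class wrapper and import list are reconstructed here for illustration; Lists is Iceberg's relocated Guava class):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;

class PartitionIdCheck {

  // Collect every data file the table currently exposes by planning a
  // full table scan, rather than reading one snapshot's added files.
  static List<DataFile> dataFiles(Table table) {
    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
  }

  // Assert that the i-th scanned data file carries the i-th expected
  // partition id (the first field of the partition tuple in these tests).
  static void assertDataFilePartitions(
      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
    for (int i = 0; i < dataFiles.size(); ++i) {
      Assert.assertEquals(
          "Data file should have partition of id " + expectedPartitionIds.get(i),
          expectedPartitionIds.get(i).intValue(),
          dataFiles.get(i).partition().get(0, Integer.class).intValue());
    }
  }

  // Usage as in the tests: three data files across partition ids 1, 2, 2.
  static void check(Table table) {
    List<DataFile> dataFiles = dataFiles(table);
    Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
  }
}
```

Note that the expected ids Arrays.asList(1, 2, 2) assume the scan yields the three files in a stable order; the up-front size assertion replaces the old expectedDataFileCount parameter, which previously carried the per-commit file count.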
@@ -38,6 +38,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
+ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -1485,19 +1486,17 @@ public void testPartitionsTableLastUpdatedSnapshot() {
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(), "partition"));

- List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
- assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
- List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
- assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+ List<DataFile> dataFiles = dataFiles(table);
+ Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1513,8 +1512,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("file_count", 2)
.set(
"total_data_file_size_in_bytes",
- dataFilesFromFirstCommit.get(1).fileSizeInBytes()
- + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+ dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1552,7 +1550,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -2061,23 +2059,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
}

- private List<DataFile> dataFiles(Table table, long commitId) {
- return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+ private List<DataFile> dataFiles(Table table) {
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
}

private void assertDataFilePartitions(
- List<DataFile> dataFilesFromCommit,
- int expectedDataFileCount,
- List<Integer> expectedPartitionIds) {
- Assert.assertEquals(
- "Commit should have " + expectedDataFileCount + " data files",
- expectedDataFileCount,
- dataFilesFromCommit.size());
- for (int i = 0; i < expectedDataFileCount; ++i) {
+ List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+ for (int i = 0; i < dataFiles.size(); ++i) {
Assert.assertEquals(
"Data file should have partition of id " + expectedPartitionIds.get(i),
expectedPartitionIds.get(i).intValue(),
- dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+ dataFiles.get(i).partition().get(0, Integer.class).intValue());
}
}
}
@@ -37,6 +37,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1490,19 +1491,17 @@ public void testPartitionsTableLastUpdatedSnapshot() {
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(), "partition"));

- List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
- assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
- List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
- assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+ List<DataFile> dataFiles = dataFiles(table);
+ Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1518,8 +1517,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("file_count", 2)
.set(
"total_data_file_size_in_bytes",
- dataFilesFromFirstCommit.get(1).fileSizeInBytes()
- + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+ dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1557,7 +1555,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -2198,23 +2196,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
}

- private List<DataFile> dataFiles(Table table, long commitId) {
- return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+ private List<DataFile> dataFiles(Table table) {
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
}

private void assertDataFilePartitions(
- List<DataFile> dataFilesFromCommit,
- int expectedDataFileCount,
- List<Integer> expectedPartitionIds) {
- Assert.assertEquals(
- "Commit should have " + expectedDataFileCount + " data files",
- expectedDataFileCount,
- dataFilesFromCommit.size());
- for (int i = 0; i < expectedDataFileCount; ++i) {
+ List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+ for (int i = 0; i < dataFiles.size(); ++i) {
Assert.assertEquals(
"Data file should have partition of id " + expectedPartitionIds.get(i),
expectedPartitionIds.get(i).intValue(),
- dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+ dataFiles.get(i).partition().get(0, Integer.class).intValue());
}
}
}
@@ -40,6 +40,7 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1488,11 +1489,9 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.orderBy("partition.id")
.collectAsList();

- List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
- assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
- List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
- assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+ List<DataFile> dataFiles = dataFiles(table);
+ Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

GenericRecordBuilder builder =
new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
@@ -1506,7 +1505,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1522,8 +1521,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("file_count", 2)
.set(
"total_data_file_size_in_bytes",
- dataFilesFromFirstCommit.get(1).fileSizeInBytes()
- + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+ dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1561,7 +1559,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -2265,23 +2263,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
}

- private List<DataFile> dataFiles(Table table, long commitId) {
- return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+ private List<DataFile> dataFiles(Table table) {
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
}

private void assertDataFilePartitions(
- List<DataFile> dataFilesFromCommit,
- int expectedDataFileCount,
- List<Integer> expectedPartitionIds) {
- Assert.assertEquals(
- "Commit should have " + expectedDataFileCount + " data files",
- expectedDataFileCount,
- dataFilesFromCommit.size());
- for (int i = 0; i < expectedDataFileCount; ++i) {
+ List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+ for (int i = 0; i < dataFiles.size(); ++i) {
Assert.assertEquals(
"Data file should have partition of id " + expectedPartitionIds.get(i),
expectedPartitionIds.get(i).intValue(),
- dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+ dataFiles.get(i).partition().get(0, Integer.class).intValue());
}
}
}
@@ -39,6 +39,7 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1492,19 +1493,17 @@ public void testPartitionsTableLastUpdatedSnapshot() {
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(), "partition"));

- List<DataFile> dataFilesFromFirstCommit = dataFiles(table, firstCommitId);
- assertDataFilePartitions(dataFilesFromFirstCommit, 2, Arrays.asList(1, 2));
-
- List<DataFile> dataFilesFromSecondCommit = dataFiles(table, secondCommitId);
- assertDataFilePartitions(dataFilesFromSecondCommit, 1, Arrays.asList(2));
+ List<DataFile> dataFiles = dataFiles(table);
+ Assert.assertEquals("Table should have 3 data files", 3, dataFiles.size());
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));

List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1520,8 +1519,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("file_count", 2)
.set(
"total_data_file_size_in_bytes",
- dataFilesFromFirstCommit.get(1).fileSizeInBytes()
- + dataFilesFromSecondCommit.get(0).fileSizeInBytes())
+ dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1559,7 +1557,7 @@ public void testPartitionsTableLastUpdatedSnapshot() {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
.set("total_data_file_size_in_bytes", dataFilesFromFirstCommit.get(0).fileSizeInBytes())
.set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -2262,23 +2260,18 @@ private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
}

- private List<DataFile> dataFiles(Table table, long commitId) {
- return Lists.newArrayList(table.snapshot(commitId).addedDataFiles(table.io()));
+ private List<DataFile> dataFiles(Table table) {
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
}

private void assertDataFilePartitions(
- List<DataFile> dataFilesFromCommit,
- int expectedDataFileCount,
- List<Integer> expectedPartitionIds) {
- Assert.assertEquals(
- "Commit should have " + expectedDataFileCount + " data files",
- expectedDataFileCount,
- dataFilesFromCommit.size());
- for (int i = 0; i < expectedDataFileCount; ++i) {
+ List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+ for (int i = 0; i < dataFiles.size(); ++i) {
Assert.assertEquals(
"Data file should have partition of id " + expectedPartitionIds.get(i),
expectedPartitionIds.get(i).intValue(),
- dataFilesFromCommit.get(i).partition().get(0, Integer.class).intValue());
+ dataFiles.get(i).partition().get(0, Integer.class).intValue());
}
}
}
