Core: Add total data size to Partitions table #7920
Conversation
cc: @szehon-ho since you have mostly been working on and reviewing this area.
Thanks, yeah, I was chatting earlier with @hsiang-c about this :)
@@ -82,7 +82,12 @@ public class PartitionsTable extends BaseMetadataTable {
              10,
              "last_updated_snapshot_id",
              Types.LongType.get(),
-             "Id of snapshot that last updated this partition"));
+             "Id of snapshot that last updated this partition"),
+         Types.NestedField.required(
And this topic always comes up, but what do you think of the position? @ajantha-bhat @dramaticlly. Maybe it's better after file_count? (So we have 3 columns for data, pos_delete, and eq_delete.)
Yeah, I think what Szehon said makes sense, given that the last 2 columns are optional and the new column is required.
Agreed. I have already kept it beside file_count for partition stats.
Note: here we should not modify the field id while reordering, to maintain compatibility.
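The compatibility point can be illustrated with a minimal, self-contained sketch. The Field record below is a hypothetical stand-in for Iceberg's Types.NestedField, not the real API; only file_count's id 3, the neighboring id 5, and the new column's id 11 come from this discussion.

```java
import java.util.List;

public class FieldOrderSketch {
    // Hypothetical stand-in for Iceberg's Types.NestedField: each schema
    // field carries a permanent id plus a display name.
    record Field(int id, String name) {}

    public static void main(String[] args) {
        // The new column is moved up to sit right after file_count (id 3),
        // but it keeps the id 11 it was originally assigned. Reordering only
        // changes position; renumbering the id would break readers that
        // resolve columns by field id rather than by position.
        List<Field> reordered = List.of(
            new Field(3, "file_count"),
            new Field(11, "total_data_file_size_in_bytes"),
            new Field(5, "position_delete_record_count"));
        System.out.println(reordered.get(1)); // the moved field still has id 11
    }
}
```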
              "total_data_size_in_bytes",
              StreamSupport.stream(
                      table.currentSnapshot().addedDataFiles(table.io()).spliterator(), false)
                  .mapToLong(DataFile::fileSizeInBytes)
                  .sum())
Probably worth extracting a variable instead of inlining the computation.
Also, I saw you added coverage for the unpartitioned table only; shall we also add one for a partitioned table, to make sure its data size in bytes matches for each partition?
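The extract-a-variable suggestion could look roughly like the following. This is a hedged, self-contained sketch: DataFile here is a simplified stand-in for Iceberg's org.apache.iceberg.DataFile interface, and the method name is illustrative.

```java
import java.util.List;

public class ExtractSizeSketch {
    // Simplified stand-in for org.apache.iceberg.DataFile (illustration only).
    interface DataFile {
        long fileSizeInBytes();
    }

    // Extracted helper: instead of inlining a StreamSupport pipeline in the
    // row-building expression, sum the data file sizes in a named method.
    static long totalDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
        long total = 0L;
        for (DataFile file : dataFiles) {
            total += file.fileSizeInBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        List<DataFile> files = List.of(() -> 100L, () -> 250L);
        System.out.println(totalDataFileSizeInBytes(files)); // prints 350
    }
}
```

A named helper also makes it natural to reuse the same computation per partition, rather than only against the current snapshot's added files.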
@dramaticlly Thank you for your feedback.
Yes, we should add tests for partitioned tables. I was able to do it for testPartitionsTable and testPartitionsTableDeleteStats, but not testPartitionsTableLastUpdatedSnapshot.
Will dig into it more today.
@dramaticlly I think I fixed testPartitionsTableLastUpdatedSnapshot, please take a look, thanks!
Looks good, just some style nits.
              11,
              "total_data_file_size_in_bytes",
              Types.LongType.get(),
              "Total bytes of data files in a partition"),
nit: 'total size in bytes'
@@ -2028,4 +2042,10 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
    public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
      return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
    }

    private long getDataFileSizeInBytes(Iterable<DataFile> dataFiles) {
nit: we can remove 'get' (the Iceberg code style guidelines are a bit different: https://iceberg.apache.org/contribute/#method-naming)
@@ -275,6 +283,7 @@ static class Partition {
      private int eqDeleteFileCount;
      private Long lastUpdatedMs;
      private Long lastUpdatedSnapshotId;
      private long dataFileSizeInBytes;
nit: can we move it after dataFileCount? (as it's part of the 'dataFile' group)
Speaking of which, @szehon-ho, do you feel we should do the same in the schema definition and move this new field with id 11 to right after file_count (field id 3)? It seems to fit into the same dataFile group, but there might be some concern about references by position getting messed up?
Oh yeah, I think that was the consensus from the other comment: #7920 (comment). @hsiang-c do you think we can move it?
@szehon-ho Sure thing!
              11,
              "total_data_file_size_in_bytes",
              Types.LongType.get(),
              "Total size in bytes"),
Ah sorry, in my previous comment I only meant changing 'Total bytes' => 'Total size in bytes'; the rest was fine.
So can we revert to the original end of the sentence, where you talked about data files?
'Total size in bytes of data files' (maybe 'in a partition' was redundant there).
docs/flink-queries.md (outdated)
@@ -436,7 +436,7 @@ SELECT * FROM prod.db.table$partitions;
  | {20211002, 10} | 1 | 1 | 0 |

  Note:
- For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.
+ For unpartitioned tables, the partitions table will contain only the record_count, file_count, position_delete_record_count, position_delete_file_count, equality_delete_record_count, equality_delete_file_count, last_updated_ms, last_updated_snapshot_id and total_data_file_size_in_bytes columns.
Should we do this in another PR? I feel we need to edit the table above as well.
Also, I think we can just say 'For unpartitioned tables, the partitions table will not contain the partition and spec_id fields', as the list of fields we do support is becoming too big.
Agreed, we can follow up with a docs PR after this is merged.
@@ -73,6 +73,8 @@ public class PartitionsTable extends BaseMetadataTable {
              "equality_delete_file_count",
              Types.IntegerType.get(),
              "Count of equality delete files"),
+         Types.NestedField.required(
+             11, "total_data_file_size_in_bytes", Types.LongType.get(), "Total size in bytes"),
This is still not changed back? "Total size in bytes of data files". Sorry if it's still pending.
And also, let's move it up to between fields 3 and 5, since it belongs to the data file group.
@@ -1469,12 +1483,20 @@ public void testPartitionsTableLastUpdatedSnapshot() {
          new GenericRecordBuilder(
              AvroSchemaUtil.convert(
                  partitionsTable.schema().findType("partition").asStructType(), "partition"));

      List<DataFile> dataFilesFromFirstCommit = listDataFilesFromCommitId(table, firstCommitId);
Would it work to make a method List<DataFile> dataFiles(table) to get all the data files, so we don't have to add data files from both commits separately?
I did this before here: https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java#L682 (maybe we can do it without column stats here, to be shorter).
If we do this, we can even extract it to TestHelpers in a later PR.
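A rough sketch of such a dataFiles(table) helper, using simplified hypothetical stand-ins for the Table, FileScanTask, and DataFile interfaces; the real Iceberg version goes through a table scan and a CloseableIterable of FileScanTask, as the snippet below this comment shows.

```java
import java.util.ArrayList;
import java.util.List;

public class DataFilesSketch {
    // Hypothetical stand-ins for the Iceberg interfaces (illustration only).
    interface DataFile { String location(); }
    interface FileScanTask { DataFile file(); }
    interface Table { Iterable<FileScanTask> planFiles(); }

    // Collect every data file the table currently scans, regardless of
    // which commit added it, so tests don't need per-commit bookkeeping.
    static List<DataFile> dataFiles(Table table) {
        List<DataFile> files = new ArrayList<>();
        for (FileScanTask task : table.planFiles()) {
            files.add(task.file());
        }
        return files;
    }

    public static void main(String[] args) {
        DataFile first = () -> "data/a.parquet";
        DataFile second = () -> "data/b.parquet";
        Table table = () -> List.of((FileScanTask) () -> first, () -> second);
        System.out.println(dataFiles(table).size()); // prints 2
    }
}
```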
@szehon-ho Thanks for pointing that out! Adopted it.

> If we do this, we can even extract to TestHelpers in a later PR.

+1, let's do the extraction in a later PR.
Force-pushed 0373042 to f29a865.
      return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
    }

    private void assertDataFilePartitions(List<DataFile> dataFiles, int[] expectedPartitionIds) {
Nit: we can put back the size check.
Merged, thanks a lot @hsiang-c for the first contribution, and thanks @ajantha-bhat and @dramaticlly for the additional reviews!
(Feel free to make follow-up PRs to update the docs.)
Closes #7896.
This PR adds total_data_file_size_in_bytes to the Partitions table.