HIVE-28047: Iceberg: Major QB Compaction with a single commit #5052
base: master
List<DataFile> existingDataFiles = Lists.newArrayList();
List<DeleteFile> existingDeleteFiles = Lists.newArrayList();
IcebergTableUtil.getFiles(table, existingDataFiles, existingDeleteFiles);
Can we reuse FilesForCommit results to get the dataFiles and deleteFiles?
I don't think so, because FilesForCommit contains only the compacted, new files. It doesn't contain the existing data and delete files.
Since the Major QB Compaction is essentially an IOW (insert overwrite) operation, why don't we follow the regular IOW flow? Why do we need the extra operation to find the existing data and delete files? Just reuse the ReplacePartitions API:
Lines 556 to 561 in e08a600
ReplacePartitions overwrite = transaction.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
if (StringUtils.isNotEmpty(branchName)) {
  overwrite.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
overwrite.commit();
Because IOW isn't supported on tables that have undergone schema evolution, as per https://issues.apache.org/jira/browse/HIVE-26133, but we want to support compaction even for such tables.
Got it, thx.
It seems there is a tradeoff in this change: (two commits) vs. (a single commit that loops over all existing data and delete files and stores them in memory).
I know two commits generate two snapshots and may expose inconsistent data to users reading the table in between.
But with this change, if the original Iceberg table has very many small data and delete files that we need to loop over and store in a List, could that put memory pressure on HS2?
I am not sure which way is the best one. ;(
Can other folks give some other thoughts? @deniskuzZ @ayushtkn @SourabhBadhya
@difin, do we know whether the DeleteFiles API does the same full table listing when the Expressions.alwaysTrue() filter is provided? If not, maybe we could extend the OverwriteFiles API to support delete filters?
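For context on the suggestion above, a minimal sketch of what an alwaysTrue() overwrite looks like against the Iceberg core OverwriteFiles API. The method name and the `table`/`compactedFiles` parameters are illustrative, and this requires the `org.apache.iceberg:iceberg-core` dependency; whether the filter evaluation avoids a full file listing internally is exactly the open question in this thread.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class OverwriteSketch {
  // Sketch: delete every row matched by alwaysTrue() and add the compacted
  // files, all in a single atomic commit (one new snapshot).
  static void overwriteAll(Table table, Iterable<DataFile> compactedFiles) {
    OverwriteFiles overwrite = table.newOverwrite()
        .overwriteByRowFilter(Expressions.alwaysTrue());
    compactedFiles.forEach(overwrite::addFile);
    overwrite.commit();
  }
}
```

Note this only removes data files by row filter; handling positional/equality delete files is the part the commenter suggests the API might need to be extended for.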
…ve/IcebergTableUtil.java Code cleaning in getting Iceberg tables' data and delete files Co-authored-by: Butao Zhang <butaozhang1@163.com>
…ve/HiveIcebergOutputCommitter.java Code cleaning, removed unneeded call to stream() Co-authored-by: Butao Zhang <butaozhang1@163.com>
In my opinion this approach looks more expensive; maybe we should reach out to the Iceberg community with a proposal to extend the current API with atomic IOW semantics?
…d transaction API usage from IOW commit method.
I agree it is a slightly more expensive approach, but it doesn't have the correctness issue of the existing approach, and it is used by other engines like Amoro and Trino; Impala is also in the process of changing to this approach.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
@@ -328,4 +336,26 @@ public static PartitionData toPartitionData(StructLike key, Types.StructType key
    }
    return data;
  }

  public static Pair<List<DataFile>, List<DeleteFile>> getDataAndDeleteFiles(Table table) {
you should be doing:

long startingSnapshotId = table.currentSnapshot().snapshotId();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
    planFileGroups(startingSnapshotId);
CloseableIterable<FileScanTask> fileScanTasks =
    table
        .newScan()
        .useSnapshot(startingSnapshotId)
        .filter(filter)
        .ignoreResiduals()
        .planFiles();

see RewriteDataFilesSparkAction
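To illustrate the suggestion, here is a hedged sketch of how a snapshot-pinned scan could collect the existing data and delete files (the role of `getDataAndDeleteFiles` in this PR). It uses the Iceberg core scan API (`newScan().useSnapshot(...).planFiles()`, `FileScanTask.file()`, `FileScanTask.deletes()`), so it needs the Iceberg jars; the method shape is illustrative, not the PR's exact code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class ScanSketch {
  // Sketch: pin the scan to the snapshot current at compaction start, so a
  // concurrent commit cannot change the file set while we iterate.
  static void collectFiles(Table table,
                           List<DataFile> dataFiles,
                           List<DeleteFile> deleteFiles) {
    long startingSnapshotId = table.currentSnapshot().snapshotId();
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .useSnapshot(startingSnapshotId)
        .ignoreResiduals()
        .planFiles()) {
      for (FileScanTask task : tasks) {
        dataFiles.add(task.file());
        // A delete file can apply to several data files, so deduplication
        // by path may be needed before committing.
        deleteFiles.addAll(task.deletes());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

This materializes all file metadata in memory, which is the HS2 memory-pressure concern raised earlier in the thread.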
What changes were proposed in this pull request?
Improvements to Hive Iceberg QB Major Compaction so that compaction is performed in one commit instead of the two commits used until now.
Why are the changes needed?
The existing implementation of compaction creates 2 commits and therefore 2 snapshots: the first snapshot has all the files deleted, and the second has the compacted files. If a user queries the table at the snapshot id of the first snapshot, the result is invalid because the table contains no data in that snapshot. This PR is proposed to avoid that problem.
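For readers unfamiliar with the single-commit option space: one way Iceberg expresses "swap old files for compacted files atomically" is the RewriteFiles API, which produces exactly one new snapshot. This is a hedged sketch against the Iceberg core API (it is not stated here that the PR uses RewriteFiles specifically; the method and parameters are illustrative).

```java
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;

class RewriteSketch {
  // Sketch: atomically replace the pre-compaction files with the compacted
  // ones. Readers either see the old snapshot or the new one, never an
  // intermediate "empty table" state.
  static void commitCompaction(Table table,
                               Set<DataFile> oldFiles,
                               Set<DataFile> compactedFiles) {
    RewriteFiles rewrite = table.newRewrite();
    rewrite.rewriteFiles(oldFiles, compactedFiles);
    rewrite.commit();
  }
}
```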
Does this PR introduce any user-facing change?
No
Is the change a dependency upgrade?
No
How was this patch tested?
Hive contains 4 query tests for testing Hive Iceberg QB Major Compaction. The outputs of these q-tests were updated as part of this PR.