Support merge manifests on writes (MergeAppend) #363
base: main
Conversation
Great start @HonahX! Maybe we want to see if there are any things we can split out, such as the rolling manifest writer.
`pyiceberg/table/__init__.py` (outdated):

```python
# TODO: need to re-consider the name here: manifest containing positional deletes and manifest containing deleted entries
unmerged_deletes_manifests = [manifest for manifest in existing_manifests if manifest.content == ManifestContent.DELETES]

data_manifest_merge_manager = ManifestMergeManager(
```
We're changing the append operation from a fast-append to a regular append once it hits a threshold. I would be more comfortable keeping the compaction separate. That way we know that an append/overwrite is always fast and runs in constant time. For example, if you have a process that appends data, you know how fast it will run (in practice, it is a function of the number of manifests).
Thanks for the explanation! Totally agree! I was thinking it might be a good time to bring `FastAppend` and `MergeAppend` to pyiceberg, making them inherit from a `_SnapshotProducer`.
`pyiceberg/table/__init__.py` (outdated):

```
@@ -944,7 +949,8 @@ def append(self, df: pa.Table) -> None:
    if len(self.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")

    merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)
    # TODO: need to consider how to support both _MergeAppend and _FastAppend
```
Do we really want to support both? This part of the Java code has been a major source of (hard to debug) problems. Splitting out the commit and compaction path completely would simplify that quite a bit.
I think it is a good idea to have a separate API in `UpdateSnapshot` in #446 to compact manifests only. However, I believe retaining `MergeAppend` is also necessary due to the `commit.manifest-merge.enabled` setting. When enabled (which is the default), this setting leads users to expect automatic merging of manifests when they append/overwrite data, rather than having to compact manifests via a separate API. What do you think?
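The behavior described above boils down to a property check at commit time. The property name and default are the ones this PR introduces; the surrounding function is an illustrative sketch, not pyiceberg's code.

```python
MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
MANIFEST_MERGE_ENABLED_DEFAULT = True


def should_merge_manifests(table_properties: dict) -> bool:
    """Merge manifests on append unless the user explicitly opted out.

    Iceberg table properties are stored as strings, so the raw value
    is parsed case-insensitively; a missing key falls back to the default.
    """
    raw = table_properties.get(MANIFEST_MERGE_ENABLED)
    if raw is None:
        return MANIFEST_MERGE_ENABLED_DEFAULT
    return str(raw).strip().lower() == "true"
```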
Hey @HonahX, thanks for working on this, and sorry for the late reply. I wanted to take the time to test this properly. It looks like either the snapshot inheritance is not working properly, or something is off with the writer. I converted the Avro manifest files to JSON using `avro-tools` and noticed the following:
```json
{
  "status": 1,
  "snapshot_id": { "long": 6972473597951752000 },
  "data_sequence_number": { "long": -1 },
  "file_sequence_number": { "long": -1 },
  ...
}
{
  "status": 0,
  "snapshot_id": { "long": 3438738529910612500 },
  "data_sequence_number": { "long": -1 },
  "file_sequence_number": { "long": -1 },
  ...
}
{
  "status": 0,
  "snapshot_id": { "long": 1638533332780464400 },
  "data_sequence_number": { "long": 1 },
  "file_sequence_number": { "long": 1 },
  ...
}
```
It looks like the snapshot inheritance is not working properly when the manifests are rewritten: the first two entries still carry `-1` sequence numbers.
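For context, the Iceberg spec only allows a null (written as `-1`) sequence number to be inherited by entries ADDED in the committing snapshot; entries rewritten as EXISTING must carry an explicit value, which is why the second entry above (status `0` with `-1`) is problematic. A rough sketch of the read-side resolution rule, illustrative rather than pyiceberg's actual code:

```python
from typing import Optional

# Manifest entry status values from the Iceberg spec
EXISTING, ADDED, DELETED = 0, 1, 2


def resolve_sequence_number(status: int, entry_seq: Optional[int], snapshot_seq: int) -> int:
    """Resolve an entry's data sequence number at read time.

    A stored -1 (null) may only be inherited by entries ADDED in the
    committing snapshot; EXISTING/DELETED entries must carry an explicit
    value, since their original sequence number cannot be reconstructed.
    """
    if entry_seq is not None and entry_seq >= 0:
        return entry_seq
    if status == ADDED:
        return snapshot_seq  # inherit from the committing snapshot
    raise ValueError("EXISTING/DELETED entries need an explicit sequence number")
```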
```
@@ -355,6 +355,44 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
    assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]


@pytest.mark.integration
```
Can you parameterize the test for both V1 and V2 tables?
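Parameterizing over the format version is typically a one-line change with pytest. A sketch under stated assumptions: the test name and body are placeholders, not the actual test in this PR.

```python
import pytest


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_append(format_version: int) -> None:
    # A real test would create a table with this format version through the
    # catalog fixture, merge-append twice, and assert on the resulting manifests.
    assert format_version in (1, 2)
```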
We want to assert the manifest entries as well (only for the merge-appended one).
```python
MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100

MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
MANIFEST_MERGE_ENABLED_DEFAULT = True
```
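Since table properties arrive as strings, the integer-valued knobs in this family need a parsing helper with a fallback. A sketch, assuming a plain `dict` of properties; `property_as_int` is illustrative, and the 8 MB target size mirrors Iceberg's documented default for `commit.manifest.target-size-bytes`:

```python
MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge"
MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100

MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024  # 8 MB


def property_as_int(properties: dict, key: str, default: int) -> int:
    """Parse an integer table property, falling back to its default."""
    value = properties.get(key)
    return int(value) if value is not None else default
```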
Can you add these to the docs as well? :)
Thank you very much for adding this @HonahX. Just one small nit, and otherwise it looks good to me!
`pyiceberg/table/__init__.py`:

```diff
@@ -1091,7 +1111,7 @@ def append(self, df: pa.Table) -> None:
     _check_schema(self.schema(), other_schema=df.schema)

     with self.transaction() as txn:
-        with txn.update_snapshot().fast_append() as update_snapshot:
+        with txn.update_snapshot().merge_append() as update_snapshot:
```
Could we update the new `add_files` method to also use `merge_append`? That seems to be the default choice of snapshot producer in Java.
@syun64 Could you elaborate on the motivation to pick merge-append over fast-append? For Java, it is the default for historical reasons, since the fast-append was added later. The fast-append creates more metadata, but also:
- Takes less time to commit, since it doesn't rewrite any existing manifests. This reduces the chance of a conflict.
- Has a commit time that is more predictable and fairly constant in the number of data files written.
- Speeds up deletes when you static-overwrite partitions, as in typical ETL, since a whole manifest produced by a previous fast-append can simply be dropped.

The main downside is that full-table scans have to evaluate more metadata.
That's a good argument @Fokko. Especially in a world where we are potentially moving the work of table scans into the REST catalog, compacting manifests on write isn't important for a function that already prioritizes commit speed over everything else.
I think it makes sense to leave the function using `fast_append` and let users rely on other means of optimizing their table scans.
Add `MergeAppendFiles`. This PR enables the following configurations:

- `commit.manifest-merge.enabled`: Controls whether to automatically merge manifests on writes.
- `commit.manifest.min-count-to-merge`: Minimum number of manifests to accumulate before merging.
- `commit.manifest.target-size-bytes`: Target size when merging manifest files.

Since `commit.manifest-merge.enabled` defaults to `True`, we need to make `MergeAppend` the default way to append data, to align with the property definition and the Java implementation.
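The merge driven by these properties is essentially a bin-packing of manifests up to the target size. A simplified sketch of that grouping, assuming only manifest sizes as input; the real `ManifestMergeManager` also honors the min-count threshold and rewrites the entries of each merged bin:

```python
from typing import List


def pack_manifests(manifest_sizes: List[int], target_size_bytes: int) -> List[List[int]]:
    """Greedily group manifests into bins of roughly target_size_bytes.

    Each inner list is a bin of manifest sizes that would be rewritten
    into a single merged manifest.
    """
    bins: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in manifest_sizes:
        if current and current_size + size > target_size_bytes:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins
```

Bins containing a single manifest can simply be kept as-is, which is how merging stays cheap when most manifests are already near the target size.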