
Discussion: Next steps / requirements to support append files #329

Open · Tracked by #346
marvinlanhenke opened this issue Apr 7, 2024 · 10 comments

@marvinlanhenke (Contributor)

...out of curiosity, I took a closer look at the pyiceberg impl and how the Table.append() works.

Now, I would like to pick your brains in order to understand and track the next steps we have to take to support append as well (since we should be getting close to having write support). The goal here is to extract and create actionable issues.

Here is what I understand from the python impl so far (high-level):

  1. we call append() on the Table class with our DataFrame: pa.Table and the snapshot_properties: Dict[str, str]
  2. we create a Transaction that basically does two things:
    2.1. It creates a _MergingSnapshotProducer which is (on a high level) responsible for writing a new ManifestList and creating a new Snapshot (returned as an AddSnapshotUpdate)
    2.2. It calls update_table on the respective Catalog, which creates a new metadata.json and returns the new metadata as well as the new metadata_location

pyiceberg-link

Here is what I think we need to implement (rough sketch; a code sketch follows the list):

  1. impl fn append(...) on struct Table:
    This should probably accept a RecordBatch as a param, create a new Transaction, and delegate further action to the transaction.
  2. impl fn append(...) on struct Transaction:
    Receives the RecordBatch and snapshot_properties. Performs validation checks. Converts the RecordBatch to a collection of DataFiles and creates a _MergingSnapshotProducer with that collection.
  3. impl _MergingSnapshotProducer:
    :: write manifests (added, deleted, existing)
    :: get next_sequence_number from TableMetadata
    :: update snapshot summaries
    :: generate manifest_list_path
    :: write manifest_list
    :: create a new Snapshot
    :: return TableUpdate: AddSnapshot
  4. impl update_table on the concrete Catalog implementations
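
A minimal Rust sketch of how these pieces could fit together. All type and method names below are placeholders I made up for illustration, not existing iceberg-rust (or pyiceberg) API:

```rust
// Placeholder sketch only -- names and signatures are assumptions.
use std::collections::HashMap;

struct RecordBatch; // stand-in for arrow_array::RecordBatch
struct DataFile;    // Iceberg data file metadata
struct Table;
struct Transaction;
struct MergingSnapshotProducer;
struct TableUpdate; // e.g. an AddSnapshot update
type Result<T> = std::result::Result<T, String>;

impl Table {
    /// 1. Table::append creates a Transaction and delegates to it.
    fn append(&self, batch: RecordBatch, props: HashMap<String, String>) -> Result<()> {
        let tx = Transaction::new(self);
        tx.append(batch, props)?;
        tx.commit() // 4. eventually calls update_table on the Catalog
    }
}

impl Transaction {
    fn new(_table: &Table) -> Self {
        Transaction
    }

    /// 2. Validate, convert the RecordBatch into DataFiles (writer API),
    ///    and hand them to a MergingSnapshotProducer.
    fn append(&self, _batch: RecordBatch, _props: HashMap<String, String>) -> Result<()> {
        let data_files: Vec<DataFile> = Vec::new(); // RecordBatch -> DataFile
        let _update: TableUpdate = MergingSnapshotProducer::new(data_files).produce()?;
        Ok(())
    }

    /// 4. Commit the staged TableUpdates / TableRequirements via the Catalog.
    fn commit(&self) -> Result<()> {
        Ok(())
    }
}

impl MergingSnapshotProducer {
    fn new(_files: Vec<DataFile>) -> Self {
        MergingSnapshotProducer
    }

    /// 3. Write manifests and the manifest list, bump the sequence number,
    ///    update the snapshot summary, and return an AddSnapshot update.
    fn produce(&self) -> Result<TableUpdate> {
        Ok(TableUpdate)
    }
}

fn main() {}
```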

What could be possible issues here?
I think we need to start with the _MergingSnapshotProducer (possibly split into multiple parts) and work our way up the list?
Once we have the _MergingSnapshotProducer, we can implement the append function on Transaction, which basically orchestrates the whole process?

marvinlanhenke changed the title from "Discussion: Next Steps / Requirement to support append files" to "Discussion: Next steps / requirements to support append files" on Apr 7, 2024
@sdd (Contributor) commented Apr 7, 2024

Thanks for spending the time thinking about this and putting your thoughts into words. I need to spend some time re-reading the associated parts of the spec and looking through the Java and possibly Python implementations before being able to comment. I should get a chance tomorrow.

@ZENOTME (Contributor) commented Apr 7, 2024

I'm not sure whether my understanding is correct:
The target of table.append() is to insert a batch of data into the table. It seems like a high-level API which will use two lower-level APIs:

  1. a writer API for converting a RecordBatch to a DataFile
  2. a transaction API for committing the DataFile (updating the table metadata)

To separate these two interfaces, I think we shouldn't delegate the conversion between RecordBatch and DataFile to the transaction.
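
For illustration, the split could be expressed as two independent interfaces, roughly like this. All names here are hypothetical (the actual writer API design lives in #34):

```rust
// Hypothetical sketch of the two-layer split; not actual crate API.
struct RecordBatch; // stand-in for arrow_array::RecordBatch
struct DataFile;    // data file metadata produced by the writer
type Result<T> = std::result::Result<T, String>;

/// Writer API: turns Arrow batches into Iceberg DataFiles.
/// It knows nothing about transactions or catalogs.
trait FileWriter {
    fn write(&mut self, batch: RecordBatch) -> Result<()>;
    fn close(self) -> Result<Vec<DataFile>>;
}

/// Transaction API: commits already-written DataFiles by producing new
/// manifests, a manifest list, and a snapshot. It never sees a RecordBatch.
trait AppendAction {
    fn add_data_files(&mut self, files: Vec<DataFile>) -> Result<()>;
    fn commit(self) -> Result<()>;
}

fn main() {}
```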

@marvinlanhenke (Contributor, Author)

I'm not sure whether my understanding is correct: The target of table.append() is to insert a batch of data into the table. It seems like a high-level API which will use two lower-level APIs:

  1. a writer API for converting a RecordBatch to a DataFile
  2. a transaction API for committing the DataFile (updating the table metadata)

To separate these two interfaces, I think we shouldn't delegate the conversion between RecordBatch and DataFile to the transaction.

I think your understanding is correct - and I agree: if the writer API already does the conversion from RecordBatch to DataFile, the Transaction shouldn't be concerned with it, since it is a higher-level API. However, having the Transaction call the writer that writes the actual DataFile seems reasonable.

So the Transaction append (if I understand the py impl correctly) does all of those things:

  • calling the writer to write the DataFile
  • create an instance of MergingSnapshotProducer -> responsible for writing the manifest, manifest_list, snapshot_update
  • commit -> update_table() on the Catalog with TableUpdate & TableRequirements

@ZENOTME
Where would the writer API (which I only know from the design spec in #34) fit best here? Should a Transaction create a new writer every time a new transaction is created? Or should the Table itself hold a ref to a writer?

@viirya (Member) commented Apr 7, 2024

calling the writer to write the DataFile
create an instance of MergingSnapshotProducer -> responsible for writing the manifest, manifest_list, snapshot_update
commit -> update_table() on the Catalog with TableUpdate & TableRequirements

If any error happens while generating metadata-related info like manifests etc., since the writer already wrote the DataFiles, should we go and delete the written DataFiles?

I think your understanding is correct - and I agree: if the writer API already does the conversion from RecordBatch to DataFile, the Transaction shouldn't be concerned with it, since it is a higher-level API. However, having the Transaction call the writer that writes the actual DataFile seems reasonable.

I think this is also what the python implementation does. In Transaction.append, it calls _dataframe_to_data_files to generate DataFiles based on the pa.Table.

we create a Transaction that basically does two things:
2.1. It creates a _MergingSnapshotProducer which is (on a high level) responsible for writing a new ManifestList and creating a new Snapshot (returned as an AddSnapshotUpdate)

Yeah, specifically, it is a FastAppendFiles for appending files, although the manifest commit logic is actually implemented in _MergingSnapshotProducer.

@sdd (Contributor) commented Apr 9, 2024

This should probably accept a RecordBatch as a param, create a new Transaction, and delegate further action to the transaction.

Is there a reason why append wouldn't take a RecordBatchStream? It would permit us to make appends that are larger than would fit into memory, if the underlying IO method (e.g. multipart upload) supported it. I for one would find this useful.
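
A streaming variant of the signature might look roughly like this. This is a sketch only, using the futures crate's Stream trait; the method name and stream item type are assumptions:

```rust
// Sketch of a streaming append signature; names are placeholders.
use futures::Stream;

struct RecordBatch; // stand-in for arrow_array::RecordBatch
struct Error;

/// Accepting a stream instead of a single in-memory batch would let the
/// writer flush data files incrementally (e.g. via multipart upload),
/// so an append can be larger than available memory.
async fn append<S>(batches: S) -> Result<(), Error>
where
    S: Stream<Item = Result<RecordBatch, Error>> + Send,
{
    // drive the stream here, writing data files as batches arrive
    let _ = batches;
    Ok(())
}

fn main() {}
```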

If any error happens while generating metadata-related info like manifests etc., since the writer already wrote the DataFiles, should we go and delete the written DataFiles?

I think this becomes the responsibility of the https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files maintenance task, rather than the writer. If we decide that the writer should attempt to do this, it should be optional, since it would slow down writes in cases where there is a lot of write contention.

@Fokko (Contributor) commented Apr 16, 2024

@marvinlanhenke Sorry for being late to the party here. Appending a file is rather straightforward, but all the conditions must be met. This is the high-level way of appending a file:

  • Write a Parquet file with the field IDs populated.
  • Collect the metrics to populate the statistics in the manifest file. We do this in PyIceberg here.
  • Write the snapshot following the concept of a fast-append. A normal append will append the new files to an existing manifest, and a fast-append will write a new manifest file with the new entries. This is much easier to implement, since you don't have to worry about sequence-number inheritance and such.
  • Rewrite the manifest-list to add the newly created manifest.
  • Generate a snapshot summary.
  • Update the metadata. When you are using a traditional catalog like Glue and Hive, this can be a bit of work. If you use the Iceberg REST catalog, this is much easier since it is the responsibility of the REST catalog.
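
As a rough sketch of the fast-append path described above (every helper name below is a placeholder, not a real API):

```rust
// Placeholder sketch of the fast-append commit steps; no real API names.
struct DataFile;
struct ManifestFile;
struct Snapshot;
type Result<T> = std::result::Result<T, String>;

fn fast_append(new_files: Vec<DataFile>) -> Result<Snapshot> {
    // 1. Write ONE new manifest containing only the new entries.
    //    (A normal append would merge into an existing manifest instead,
    //    which means dealing with sequence-number inheritance.)
    let new_manifest = write_manifest(&new_files)?;

    // 2. Rewrite the manifest list: all previous manifests plus the new one.
    let mut manifests = existing_manifests()?;
    manifests.push(new_manifest);
    let manifest_list_path = write_manifest_list(&manifests)?;

    // 3. Build the snapshot with its summary (added-data-files,
    //    added-records, ...) and hand it to the catalog's commit path.
    build_snapshot(manifest_list_path)
}

// Stubs so the sketch stands alone.
fn write_manifest(_: &[DataFile]) -> Result<ManifestFile> { Ok(ManifestFile) }
fn existing_manifests() -> Result<Vec<ManifestFile>> { Ok(Vec::new()) }
fn write_manifest_list(_: &[ManifestFile]) -> Result<String> { Ok(String::new()) }
fn build_snapshot(_: String) -> Result<Snapshot> { Ok(Snapshot) }

fn main() {}
```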

calling the writer to write the DataFile

I think this is also what the python implementation does. In Transaction.append, it calls _dataframe_to_data_files to generate DataFiles based on the pa.Table.

In PyIceberg we have _dataframe_to_data_files that writes out the Arrow table to one or more Parquet files. Then we collect all the statistics and return a DataFile that can be appended to the table. I hope in the future that we can push this down to iceberg-rust :)

If any error happens while generating metadata-related info like manifests etc., since the writer already wrote the DataFiles, should we go and delete the written DataFiles?

Iceberg Java does this on a best-effort basis. If it fails, it tries to clean up, but it is always possible that this won't happen (looking at you, OOMs). This is where the maintenance tasks kick in, as @sdd already pointed out.

Talking about prioritization: things can happen in parallel. For example, something simpler like updating table properties will make sure that the commit path is in place. The snapshot summary generation can be its own PR, and the same goes for collecting the column metrics.

@liurenjie1024 (Collaborator)

I think that to implement appending data files, there are two main tasks:

  1. Implement the transaction API to append data files.
  2. Implement a file writer to write record batches to Parquet files and generate data file structs.

Currently there is no design or plan for 1, and @ZENOTME is working on 2.

@liurenjie1024 (Collaborator)

I've compiled a doc for discussing roadmaps and features for iceberg-rust; you're welcome to share your thoughts and feel free to add whatever is on your mind. cc @viirya @marvinlanhenke

@viirya (Member) commented Apr 25, 2024

Thanks @liurenjie1024. The roadmaps doc looks good to me. I added a few items under DataFusion integration. Feel free to modify it. Thanks.
