ARROW-9658: [Python] Python bindings for dataset writing #7921
Conversation
This is looking great, thanks for doing this!
python/pyarrow/_dataset.pyx (outdated)
# data is list of batches
for batch in data:
    c_batches.push_back((<RecordBatch> batch).sp_batch)

c_fragment = shared_ptr[CFragment](
    new CInMemoryFragment(c_batches, _true.unwrap()))
c_fragment_vector.push_back(c_fragment)

with nogil:
    check_status(
        CFileSystemDataset.Write(
            c_schema,
            c_format,
            c_filesystem,
            c_base_dir,
            c_partitioning,
            c_context,
            MakeVectorIterator(c_fragment_vector)))
Currently we only parallelize over written fragments, so since this only creates a single written fragment, use_threads is effectively ignored. I'd recommend creating a fragment wrapping each batch; note that this will result in one or more files per input record batch, however.
Suggested change:

# data is list of batches
for batch in data:
    c_batches.push_back((<RecordBatch> batch).sp_batch)
    c_fragment = shared_ptr[CFragment](
        new CInMemoryFragment(c_batches, _true.unwrap()))
    c_batches.clear()
    c_fragment_vector.push_back(c_fragment)

with nogil:
    check_status(
        CFileSystemDataset.Write(
            c_schema,
            c_format,
            c_filesystem,
            c_base_dir,
            c_partitioning,
            c_context,
            MakeVectorIterator(move(c_fragment_vector))))
Yes, this is one of the aspects I wanted to ask about: indeed, right now, by creating a single Fragment, everything gets written to a single file. If you have a big table, you might want to write a file per batch. But on the other hand, that might also result in a lot of small files (certainly if you also split on some partition columns). E.g. in Parquet, you typically have files with multiple row groups (which are somewhat comparable to multiple record batches).
So we should maybe have some configurability?
@bkietz So I updated this to enable the use case of writing multiple fragments, but by explicit choice of the user: passing a single Table/RecordBatch writes a single fragment (even if the Table has multiple batches), while passing a list of Tables/RecordBatches writes them as multiple fragments.
So if the user has a Table with multiple batches and wants to write this as multiple files instead of a single file (in case there is no partitioning), they can do write_dataset(table.to_batches())
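For illustration, a minimal sketch of how this could look from the user's side (the exact write_dataset signature, the "ipc" format string, and the output paths are assumptions based on this PR, not a definitive API):

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Sketch only: assumes the write_dataset() signature introduced in this PR.
table = pa.table({"year": [2020, 2020, 2021], "value": [1.0, 2.0, 3.0]})
part = ds.partitioning(pa.schema([("year", pa.int64())]))

# A single Table is written as a single fragment.
ds.write_dataset(table, "out_single", format="ipc", partitioning=part)

# A list of RecordBatches is written as one fragment per batch,
# so each batch can end up in its own file.
ds.write_dataset(table.to_batches(), "out_per_batch", format="ipc", partitioning=part)
```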
Great, this provides a minimal handle on write parallelism; this will be necessary until the C++ implementation can be made more robustly parallel than "one thread per written fragment".
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Going to merge this, so I can add Parquet support in a follow-up PR.
Do you think it is possible to add support for repartitioning datasets? I am facing some issues with many small files, just due to the frequency at which I need to download data, which is compounded by the partitions. I asked this on Jira as well, but:
I would then want to read the data and repartition the files using a callback function, so the dozens of files in partition ("date", "==", "2020-09-15") would become 2020-09-15.parquet, consolidated as a single file to keep things tidy. I know I can do this with Spark, but it would be nice to have a native pyarrow method.
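For what it's worth, a rough sketch of how such a consolidation could be expressed with the dataset API once Parquet writing is supported (the data_root layout, the date partition column, and the output path here are hypothetical):

```python
import os

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Hypothetical layout: many small Parquet files under data_root/date=YYYY-MM-DD/
dataset = ds.dataset("data_root", format="parquet", partitioning="hive")

# Read back all files for one date and rewrite them as a single consolidated file.
table = dataset.to_table(filter=ds.field("date") == "2020-09-15")
os.makedirs("consolidated", exist_ok=True)
pq.write_table(table, "consolidated/2020-09-15.parquet")
```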
Closes apache#7921 from jorisvandenbossche/ARROW-9658-dataset-writing Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com> Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>