
ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFileFormat #6670

Closed
wants to merge 12 commits

Conversation

bkietz
Member

@bkietz bkietz commented Mar 19, 2020

Provides ParquetFileFragment, which may view a subset of row groups within a Parquet file. The indices of the viewed row groups are available through the row_groups() property, which is exposed to Python. Construction of subset-viewing ParquetFileFragments is not yet exposed to Python.
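
For illustration, inspecting the new property from Python might look roughly like this (a sketch; the ds.dataset call and the printed values are assumptions, only the row_groups property itself is introduced by this PR):

import pyarrow.dataset as ds

dataset = ds.dataset("path/to/parquet_dir", format="parquet")
for fragment in dataset.get_fragments():
    # row_groups holds the indices of the row groups this fragment views;
    # a freshly discovered fragment views the whole file
    print(fragment.path, fragment.row_groups)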


@bkietz bkietz force-pushed the 8061-Ability-to-specify-granul branch from 1b9fca0 to 3515126 Compare March 19, 2020 20:30
@bkietz bkietz marked this pull request as ready for review March 19, 2020 20:30
@bkietz bkietz force-pushed the 8061-Ability-to-specify-granul branch from 9f64e84 to 4f658b2 Compare March 19, 2020 21:18
@fsaintjacques fsaintjacques self-requested a review March 23, 2020 16:34
@bkietz bkietz force-pushed the 8061-Ability-to-specify-granul branch from 69c243e to 3757e2f Compare March 23, 2020 20:57
@jorisvandenbossche
Member

I was just trying this out, and the row_groups property seems to work correctly for a dataset that I read with dataset(..).
But to have this usable from Python, we still need a way to control this behaviour (right now I cannot create a Fragment directly):

  • Do we want a discovery option indicating to use row groups instead of files as the fragment granularity? Or should this instead be part of ParquetFileFormat::ReaderOptions rather than FileSystemFactoryOptions? (but the reader options are generally just for reading, not for discovery, so that might be a bit weird) Or do we need a ParquetFactoryOptions ... ?

  • Do we want to be able to construct a ParquetFileFragment in the Python API, and then optionally provide row_groups indices? But to make this useful, we probably also need to be able to create a dataset from a fragment (or be able to scan a fragment from the Python API).

@bkietz
Member Author

bkietz commented Mar 24, 2020

@jorisvandenbossche my intention was that fragments would exclusively be discovered containing all row groups for a file. Fragments with a refined row group selection could then be created from these whole-file Fragments as desired:

def single_row_group_fragments(parquet_dataset, filter, columns):
    for fragment in parquet_dataset.get_fragments(filter=filter, columns=columns):
        for row_group in range(fragment.metadata.num_row_groups):
            yield parquet_dataset.format.make_fragment(fragment.path, row_groups=[row_group])

Fragments can be scanned in C++ so I can expose that to Python as well:

assert first_row_group_fragment.row_groups == {0}
first_row_group_fragment.scan(memory_pool)
# NB: filter, columns already specified in get_fragments. See ARROW-8065

@jorisvandenbossche
Member

One potential advantage of having this at the dataset level is that dataset.get_fragments(filter=...) could yield only those fragments / row groups that respect the filter.

Now, an API where we can construct / scan fragments would certainly already be useful as well.

@jorisvandenbossche
Member

Another potential problem: assuming discovery happens only for full files, an application like dask that then re-creates fragments per row group needs to know the number of row groups per file.
Leaving it to the user to create those fragments means they will need to open the metadata of each Parquet file, which might interfere with how we are going to handle things like _metadata (although it depends on how we solve that issue). E.g., it might mean that we should also expose this metadata at the dataset level / have a mapping of this metadata to the fragments (if we want to avoid reading the metadata again).

(just thinking out loud)
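
For concreteness, the per-file footer read described above would look roughly like this on the user side (a sketch using pyarrow.parquet directly; the helper name is hypothetical and not part of this PR):

import pyarrow.parquet as pq

def row_group_counts(paths):
    # every call opens and parses a Parquet footer, which is exactly the
    # repeated metadata access we would like to avoid (or serve from _metadata)
    return {path: pq.ParquetFile(path).metadata.num_row_groups for path in paths}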

@bkietz
Member Author

bkietz commented Mar 24, 2020

I see. In that case, in C++ I'll add ParquetFileFormat::GetRowGroupFragments(dataset, max_row_groups_per_fragment, ...) and expose this in Python as a special case of FileSystemDataset.get_fragments():

for fragment in parquet_dataset.get_fragments(filter, columns,
                                              max_row_groups_per_fragment=1):
    yield from fragment.scan(memory_pool)

WRT using _metadata: that seems like a separate problem. Once we have agreed on a mechanism for sharing metadata between files, we'll use that to reimplement ParquetFileFragment.metadata.

@fsaintjacques
Contributor

We have the same problem with the number of rows. We need to expose a lazy accessor or something like this. This is related to the _metadata file, where we load this information once (along with the partitions + statistics).

I'd say leave it out for now?

Contributor

@fsaintjacques fsaintjacques left a comment


+1 for me on the C++ side, I'll let @jorisvandenbossche decide on the python part.

@bkietz bkietz force-pushed the 8061-Ability-to-specify-granul branch from 2bf51db to 5d890dc Compare March 25, 2020 00:46
@jorisvandenbossche
Member

This example segfaults for me:

import pyarrow as pa
import pyarrow.parquet as pq  

table = pa.table({'a': ['a', 'a', 'b', 'b'], 'b': [1, 2, 3, 4]}) 
pq.write_to_dataset(table, "test_parquet_dataset", partition_cols=["a"]) 

import pyarrow.dataset as ds  
dataset = ds.dataset("test_parquet_dataset/", format="parquet", partitioning="hive")
fragments = list(dataset.get_fragments()) 
f = fragments[0] 
parquet_format = f.format 
parquet_format.make_fragment(f.path, f.filesystem, partition_expression=f.partition_expression)

@jorisvandenbossche
Member

Another question: shouldn't there be a schema involved in the creation of a Fragment?
How can it otherwise reflect a possibly unified schema of a dataset?

@bkietz
Member Author

bkietz commented Mar 25, 2020

@jorisvandenbossche I've added an optional schema argument to make_fragment (the default is to inspect the fragment and infer a schema). I've also added a test similar to your segfaulting case, which passes here.
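
For example, the new argument could be used to reuse the dataset's unified schema instead of re-inspecting the file (a sketch based on your snippet above; the keyword name is an assumption):

parquet_format.make_fragment(
    f.path, f.filesystem, schema=dataset.schema,
    partition_expression=f.partition_expression)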

@jorisvandenbossche
Member

Thanks!

So now the snippet above doesn't segfault, but creating a fragment with a row group specified still does:

parquet_format.make_fragment(
    f.path, f.filesystem, partition_expression=f.partition_expression, row_groups={1})

Member

@jorisvandenbossche jorisvandenbossche left a comment


Will do some more thorough testing tomorrow (I can also push some more Python docstrings / tests then).

@@ -400,6 +438,34 @@ cdef class Fragment:
"""
return Expression.wrap(self.fragment.partition_expression())

def scan(self, MemoryPool memory_pool=None):
Member


Should we be able to pass columns/filter arguments here? (similar to Dataset.scan and Scanner)

Member Author

@bkietz bkietz Mar 25, 2020


As above, these options are already specified in Dataset.get_fragments(). I could add optional parameters which would refine the filter/projection further but this would involve reconstructing the fragment.

Member


Ah, yes, they are specified there indeed, if creating from a Dataset. But when re-creating a new fragment from its parts, it's still useful to specify filter/columns.

Member Author

@bkietz bkietz Mar 25, 2020


Fragment creation can be accomplished with FileFormat.make_fragment, which does take filter/columns. Please compare this method to Scanner.scan(self) rather than Dataset.scan(self, filter, columns, ...). I'm opposed to putting any more parameters here, as they're already specified elsewhere.
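
To illustrate the split I mean (a sketch; the exact keyword names are taken from this discussion and may not match the final signature):

# scan options are bound when the fragment is created ...
fragment = parquet_format.make_fragment(
    f.path, f.filesystem, columns=["b"], filter=ds.field("b") > 2)

# ... so scanning the fragment itself takes no further options,
# just like Scanner.scan()
for task in fragment.scan():
    ...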

Contributor


I don't think it's desirable for the fragment to store the filter; is that a temporary solution until ARROW-8065?

Member


Yes, I think most of the discussion / disagreement should indeed be brought back to that issue.

I find it strange to tie the filter/columns to fragment creation, as those are conceptually scanning options to me. And if we decide to remove ScanOptions from fragment creation, that would indeed entail this?

@jorisvandenbossche
Member

I am trying this out with my dask POC, and running into the following issue: I would basically need a Fragment.to_table() method. There is a scan method, but with that I need to loop through the tasks, execute them, and combine them into a table in Python, which adds unnecessary overhead (and constantly re-acquires the GIL, which will make it inefficient for dask).
Or alternatively, if I could turn the single fragment into a dataset or scanner, I could also use the to_table method from there.
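
For reference, the loop I mean looks roughly like this (a sketch, assuming scan() yields tasks whose execute() produces record batches):

import pyarrow as pa

batches = []
for task in fragment.scan():
    # each iteration re-enters Python and re-acquires the GIL
    batches.extend(task.execute())
table = pa.Table.from_batches(batches)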

I am also still getting segfaults in some cases, see #6670 (comment)

@bkietz
Member Author

bkietz commented Mar 26, 2020

@jorisvandenbossche I'll extract CScanner::ToTable() so that it can be used directly on a CFragment, then add a wrapping Fragment.to_table method.
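
The resulting Python usage would then be a single call (a sketch of the intended API):

table = fragment.to_table()  # materializes the fragment without a task loop in Python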

@bkietz bkietz force-pushed the 8061-Ability-to-specify-granul branch from f032598 to 07664d7 Compare March 26, 2020 16:54
@jorisvandenbossche
Member

jorisvandenbossche commented Mar 30, 2020

One thing I already encountered is that fragment.to_table() doesn't work for fragments of partitioned datasets, because of some schema mismatch (with vs without partition key columns).
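
A sketch of the failure mode, reusing the partitioned dataset from the earlier snippet (hypothetical repro, not verbatim from my run):

dataset = ds.dataset("test_parquet_dataset/", format="parquet", partitioning="hive")
fragment = next(dataset.get_fragments())
# fails: the fragment's physical schema lacks the partition column "a",
# while the dataset schema includes it
fragment.to_table()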
