Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement returning dictionary arrays from parquet reader #171

Closed
alamb opened this issue Apr 26, 2021 · 14 comments · Fixed by #1180
Closed

Implement returning dictionary arrays from parquet reader #171

alamb opened this issue Apr 26, 2021 · 14 comments · Fixed by #1180
Assignees
Labels
parquet Changes to the parquet crate

Comments

@alamb
Copy link
Contributor

alamb commented Apr 26, 2021

Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-11410

Currently the Rust parquet reader returns a regular array (e.g. string array) even when the column is dictionary encoded in the parquet file.

If the parquet reader had the ability to return dictionary arrays for dictionary encoded columns this would bring many benefits such as:

  • faster reading of dictionary encoded columns from parquet (as no conversion/expansion into a regular array would be necessary)
  • more efficient memory use as the dictionary array would use less memory when loaded in memory
  • faster filtering operations as SIMD can be used to filter over the numeric keys of a dictionary string array instead of comparing string values in a string array

[~nevime] , [~alamb]  let me know what you think

@alamb alamb added the arrow Changes to the arrow crate label Apr 26, 2021
@alamb
Copy link
Contributor Author

alamb commented Apr 26, 2021

Comment from Andrew Lamb(alamb) @ 2021-02-02T11:50:29.862+0000:

[~yordan-pavlov] I think this would be amazing -- and we would definitely use it in IOx. This is the kind of thing that is on our longer term roadmap and I would love to help (e.g. code review, or testing , or documentation, etc).

Let me know! 

@jorgecarleitao jorgecarleitao changed the title [Rust][Parquet] Implement returning dictionary arrays from parquet reader Implement returning dictionary arrays from parquet reader Apr 29, 2021
@jorgecarleitao jorgecarleitao added parquet Changes to the parquet crate and removed arrow Changes to the arrow crate labels Apr 29, 2021
@tustvold
Copy link
Contributor

tustvold commented Dec 6, 2021

Further to this, if the expected output schema is a dictionary array it will cast back and compute a new dictionary! See here.

I'm going to take a stab at implementing this and see where I get to

@tustvold
Copy link
Contributor

tustvold commented Dec 7, 2021

Spent some time digging into this and think I have a plan of action.

I am off for the next day or so but will be looking to make a start on my return

Background

The lowest level parquet API:

  • FileReader provides the ability to obtain metadata and RowGroupReader for each row group
  • RowGroupReader provides the abilty to get PageReader for the pages within a single column chunk within a row group
  • PageIterator provides the ability to get PageReader for a specific column across a selection of row groups

The next level up is ColumnReader implemented by

  • Is an enumeration containing ColumnReaderImpl<T> for each of the physical parquet types
  • Created by RowGroupReader::get_column_reader
  • Provides the ability to slices of data from a single column chunk
  • Stateful read_batch that populates a physical typed slice (i.e. integer, float, byte array)
  • Reads batch_size levels unless exhausted
  • Only reads number of values corresponding to definition levels read
  • Uses Decoder implementations to decode from the raw byte data to this slice
  • On encountering a dictionary page, will register a specialised DictDecoder with this dictionary

The next level is RecordReader<T>

  • Wraps a ColumnReaderImpl<T>
  • Seems to exist to handle repetition levels, in particular to avoid repeated fields across reads, but appears to be incomplete
  • Computes a null bitmask and pads NULLs in the values array
  • Could avoid having to pad array if ColumnReaderImpl used Decoder::get_spaced

Next up we have the ArrayReader implementations:

  • ArrayReader::next_batch returns Result<ArrayRef>
  • Implementations are selected by TypeVisitor on the parquet schema
  • ArrayReader are provided with FilePageIterator
  • StructArrayReader assumes that all ArrayReader::next_batch will return arrays with the same number of rows for all children
  • ArrayReader are agnostic to RowGroup boundaries

Finally we have ArrowReader the only implementation for which is ParquetFileArrowReader:

  • ParquetFileArrowReader is created from an Arc<dyn FileReader>
  • ArrowReader::get_record_reader and ArrowReader::get_record_reader_by_columns return ParquetRecordBatchReader

ParquetRecordBatchReader provides the abilty to stream RecordBatch from a parquet file

  • Wraps a StructArrayReader created with the root schema of the parquet file
  • Converts the ArrayReader to Iterator<Item=ArrowResult<RecordBatch>>
  • ArrayReader::next_batch returning an empty RecordBatch is used as the termination condition

There is an additional ArrowArrayReader, ArrayConverter, ValueDecoder implementation added in #384:

  • This was originally intended to replace PrimitiveArrayReader and ComplexObjectArrayReader but this appears to have stalled
  • Only used for StringArray as far as I can tell
  • Since then MapArrayReader has been added (Minimal MapArray support #491)

Given this I think it is acceptable to add a new ArrayReader implementation, and avoid this complexity

Problems

  • RLE dictionaries are per-column chunk, a demarcation the ArrayReader API does not currently respect
  • A ColumnChunk may contain mixed encodings
  • There isn't an efficient way to determine if all pages in a ColumnChunk have a given encoding
  • ColumnReaderImpl hides encoding as an implementation detail

Proposal

Add two settings to ParquetRecordBatchReader (or higher) with defaults of false:

  • delimit_row_groups - don't yield RecordBatches spanning row group boundaries
  • preserve_dictionaries - preserve dictionary encoding for dictionary columns

When reading if delimit_row_groups is set, each ArrayReader will be given a PageIterator for a single column chunk (i.e. a PageReader)

Within build_for_primitive_type_inner if the arrow type is a dictionary of a primitive type, preserve_dictionaries is enabled and the column chunk has a dictionary page:

  • If delimit_row_groups is not set return an error
  • Return a DictionaryArrayReader as the ArrayReader

This DictionaryArrayReader will return an error if it encounters a non-dictionary encoded page

Otherwise it will produce DictionaryArray preserving the RLE dictionary. It will likely need to duplicate some logic from ColumnReaderImpl to achieve this

I think this will avoid making any breaking changes to clients, whilst allowing them to opt in to better behaviour when they know that their parquet files are such that this optimisation can be performed.

@alamb
Copy link
Contributor Author

alamb commented Dec 7, 2021

FYI @yordan-pavlov

@alamb
Copy link
Contributor Author

alamb commented Dec 7, 2021

In general sounds very cool @tustvold

preserve_dictionaries - preserve dictionary encoding for dictionary columns

What is the reason for defaulting this one to false? For any string dictionary column this is likely to almost always be what is wanted.

This DictionaryArrayReader will return an error if it encounters a non-dictionary encoded page

I can't remember if a single column can have pages encoded using dictionary and non dictionary encoding; I think it is possible but I don't know if any real files have such a thing

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented Dec 7, 2021

@tustvold thank you for looking into this, and for the excellent summary of the parquet reader stack - this should probably go in documentation somewhere as it takes a while to figure out.

The main reason for the stalling of work on the ArrowArrayReader is that a big change happened in my personal life - I became a father, and as much as I would like to spend more time on this project I have much less free time now. I hope that in a few months, I will have some more free time and will be able to contribute again. The other reason is that although I was able to make the new ArrowArrayReader a several times faster for string arrays, and this appears to bring some nice performance improvements in total execution time (the old ArrayReader is slow for string arrays), I was struggling to make it faster in all cases for primitive arrays. I had some ideas (e.g. make the column chunk context a self referential struct so that a dictionary could be built more efficiently from the page buffer by avoiding unnecessary memory copies) but the baby came before I could finish that.

Here are my thoughts on preserving dictionary arrays:

  • performance as a result of dictionary array preservation depends very much on upstream processing (e.g. can filter methods be implemented that can benefit from a dictionary array by e.g. making better use of SIMD, how much of the larger query can be processed before unpacking the dictionary) - I tried to do some synthetic performance tests to measure the impact of unpacking the dictionary at different stages of query processing (including filter operators that can make use of the dictionary), but couldn't see (as far as I can remember) the performance improvements I was expecting; may be my setup was flawed, results might be different with actual code
  • I wonder if any (or both) of the proposed two new config values delimit_row_groups and preserve_dictionaries can be enabled / disabled automatically (e.g. based on query, data source, etc.) so that most of the time they don't need to be changed; my thinking is the default configuration / implementation should work best in most cases and settings should only have to be changed very rarely, under very specific circumstances and by someone who knows very well what they are doing

@yordan-pavlov
Copy link
Contributor

Looks like there already is a proposed implementation of comparison operations for Dictionary Arrays here #984

@alamb
Copy link
Contributor Author

alamb commented Dec 8, 2021

Looks like there already is a proposed implementation of comparison operations for Dictionary Arrays here #984

@matthewmturner and I are working on it :) We still have a ways to go but I think we are making progress !

@tustvold
Copy link
Contributor

tustvold commented Dec 9, 2021

I became a father

Congratulations 🎉

can't remember if a single column can have pages encoded using dictionary and non dictionary encoding

Unfortunately they definitely can, it is in fact explicitly highlighted in the arrow C++ blog post on the same topic here. IIRC it occurs when the dictionary grows too large for a single page.

What is the reason for defaulting this one to false? For any string dictionary column this is likely to almost always be what is wanted.

I wonder if any (or both) of the proposed two new config values

The problem I was trying to avoid is ArrayReader doesn't document its termination condition, and so some codepaths may be making the assumption that if the length of the returned Array is less than the batch size, the reader is exhausted. If I changed the ArrayReader to delimit row groups by default, this would then break such code.

However, having looked again I'm confident that I can avoid needing to expose delimit_row_groups, as ParquetRecordBatchReader is a proper Iterator and so doesn't have the same problem. I'm not sure yet if it will be simpler to have a default off delimit_row_groups on the ArrayReader impls or just have ParquetRecordBatchReader only give them one row group at a time, but it should be possible to avoid exposing this to users in the higher-level APIs 😄

My reasoning for having preserve_dictionaries off by default was that theoretically if you had a column chunk with mixed encodings it would have worked before, and now might not. I guess I should just implement the handling of these PLAIN pages and avoid it needing to be an opt-in config... 🤔

performance as a result of dictionary array preservation depends very much on upstream processing

Indeed, if the upstream is just going to fully materialize the dictionary in order to do anything with it, there is limited benefit to using dictionaries at all 😆

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented Dec 12, 2021

In the case of partial dictionary encoding, I wonder if the returned record batches could be split so that a single record batch only contains column data based on a single encoding (only dictionary or plain encoded values for each column)? Wouldn't this enable that preserve_dictionaries be true by default and at the same time still provide the benefits of dictionary preservation even when some pages are plain-encoded? It's just that the reader must be aware that for the same column, some of the returned record batches might contain a dictionary array and others might contain a plain array (and of course that the returned batches might be of different length than requested and the requested batch size should be treated more as a max length).

@tustvold
Copy link
Contributor

tustvold commented Dec 12, 2021

It's just that the reader must be aware that for the same column, some of the returned record batches might contain a dictionary array and others might contain a plain array

I think it would be pretty confusing, and break quite a lot of code if ParquetRecordBatchReader returned an iterator of RecordBatch with varying schema?

I wonder if the returned record batches could be split so that a single record batch only contains column data based on a single encoding (only dictionary or plain encoded values for each column)

might be of different length than requested and the requested batch size should be treated more as a max length).

The encoding is per-page, and there is no relationship between what rows belong to what pages across column chunks. To get around this, the current logic requires that all ArrayReader return the batch_size number of rows, unless the PageIterator has been exhausted. This ensures that all children of a StructArrrayReader, MapArrayReader, etc... produce ArrayRef with the same number of rows.

This is why I originally proposed a delimit_row_groups option, as a workaround to still ensure that all ArrayReader return ArrayRef with the same number of rows, but not producing ArrayRef spanning row groups and therefore dictionaries.

However, my current plan I think sidesteps the need for this:

  • Compute new dictionaries for the returned DictionaryArray if
    • It spans a RowGroup and therefore a dictionary boundary
    • One or more of the pages uses plain encoding
  • Modify ParquetRecordBatchReader to not request RecordBatch spanning row groups, likely by reducing the batch_size passed to ArrayReader::next_batch

This avoids the need for config options, or changes to APIs with ambiguous termination criteria, whilst ensuring that most workloads will only compute dictionaries for sections of the parquet file where the dictionary encoding was incomplete. This should make it strictly better than current master, which always computes the dictionaries again having first fully materialized the values.

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented Dec 12, 2021

@tustvold this latest approach you describe will probably work in many cases, but usually there is a reason for having partial dictionary encoding in parquet files - my understanding is that the reason usually is that the dictionary grew too big. And to have to reconstruct a big dictionary from plain-encoded parquet data sounds expensive and I suspect this will result in suboptimal performance and increased memory use.

If the possibility to have a mix of dictionary-encoded and plain-encoded pages is just how parquet works, then is this something that has to be abstracted / hidden?

Furthermore, if we take the DataFusion Parquet reader as an example, here https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/physical_plan/file_format/parquet.rs#L436 we can see that it doesn't care for the number of rows in the record batches as long as the record batch iterator doesn't return None.

Finally, how would the user know that they have to make changes to batch_size depending on the use of dictionary encoding in their parquet files (so that record batches do not span row groups)?

@yordan-pavlov
Copy link
Contributor

@tustvold I should have read the blog post you linked earlier (https://arrow.apache.org/blog/2019/09/05/faster-strings-cpp-parquet/) before commenting; it appears that the C++ implementation of the arrow parquet reader converts plain-encoded fallback pages into a dictionary similar to the latest approach you described:

When decoding a ColumnChunk, we first append the dictionary values and indices into an Arrow DictionaryBuilder, and when we encounter the “fall back” portion we use a hash table to convert those values to dictionary-encoded form

@tustvold
Copy link
Contributor

tustvold commented Dec 12, 2021

my understanding is that the reason usually is that the dictionary grew too big

This is my understanding also, with the current defaults "too big" would be a dictionary larger than 1MB

And to have to reconstruct a big dictionary from plain-encoded parquet data sounds expensive

I don't disagree with this, this would represent a somewhat degenerate edge case. As currently articulated, we will only attempt to preserve the dictionary if the arrow schema encoded within the parquet file is for a dictionary type. Specifically this means the data that was written was already contained in arrow dictionary arrays.

In order to yield RecordBatch with the same schema as was written we would therefore need to construct one or more dictionaries anyway. This ticket, or at least my interpretation of it, is an optimisation to avoid needing to compute new dictionaries where the parquet dictionary can be reused, and if not, to avoid fully hydrating the values first as the logic currently does.

is this something that has to be abstracted / hidden?

In my opinion, yes. If I write a dictionary array, I expect to get one back. If for some crazy reason I write a column with a dictionary that exceeds the capabilities of the parquet format to store natively, I would expect that to be abstracted away. I do not think there being a performance and compression penalty for over-sized dictionaries is unreasonable?

As an aside the maximum dictionary size is configurable, although I'm not really sure what the implications of increasing it are

we can see that it doesn't care for the number of rows in the record batches as long as the record batch iterator doesn't return None.

Indeed, ArrowReader, which ParquetRecordBatchReader implements, does not have the issues that ArrayReader does, and this is why I intend to put the delimiting logic here. By contrast, ArrayReader has an ambiguous termination condition as it isn't an Iterator, and needs to yield predictable row counts so that readers can be composed together for nested types.

how would the user know that they have to make changes to batch_size depending on the use of dictionary encoding in their parquet files (so that record batches do not span row groups)

My proposal is for this to be an internal optimisation within ParquetRecordBatchReader and not something exposed to the users. They would just use ArrowReader as normal, and receive an iterator of RecordBatch. It would just happen that this would internally will drive the various ArrayReader in such a way to avoid spanning row groups.

FWIW if ArrayReader and its implementations weren't public, as I personally think they should be (#1032), this would be purely an implementation detail.

tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 17, 2022
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 17, 2022
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 17, 2022
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 18, 2022
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 18, 2022
tustvold added a commit to tustvold/arrow-rs that referenced this issue Jan 18, 2022
alamb pushed a commit that referenced this issue Jan 24, 2022
… 60x perf improvement (#171) (#1180)

* Preserve dictionary encoding from parquet (#171)

* Use OffsetBuffer::into_array for dictionary

* Fix and test handling of empty dictionaries

Don't panic if missing dictionary page

* Use ArrayRef instead of Arc<ArrayData>

* Update doc comments

* Add integration test

Tweak RecordReader buffering logic

* Add benchmark

* Set write batch size in parquet fuzz tests

Fix bug in column writer with small page sizes

* Fix test_dictionary_preservation

* Add batch_size comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
4 participants