
[SPARK-34859][SQL] parquet vectorized reader - support column index with rowIndexes #31998

Conversation

@lxian (Contributor) commented Mar 29, 2021

What changes were proposed in this pull request?

Use row indexes to read column values properly in the Parquet vectorized reader. It works as follows (see the sketch after this list):

  1. Save `rowIndexes` to `VectorizedColumnReader`.
  2. In the `readBatch` loop, write a batch of values into a temporary vector.
  3. Check the indexes of the values in the temporary vector against `rowIndexes`, and write the matched values to the result vector.
  4. If the result vector is not full, go back to step 2.
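
For illustration, a minimal sketch of this loop under simplified assumptions (plain `long[]` buffers stand in for Spark's `WritableColumnVector`, and the `Decoder` interface is a hypothetical stand-in for the underlying Parquet value decoder; this is not the PR's actual code):

```java
import java.util.PrimitiveIterator;

// Hypothetical, simplified sketch -- not the actual Spark classes.
class RowIndexFilteringSketch {
  // Stand-in for the Parquet value decoder: fills `temp` and returns the
  // number of values decoded, or 0 when the page is exhausted.
  interface Decoder { int decodeInto(long[] temp); }

  static int readBatch(Decoder decoder, long pageFirstRowIndex,
                       PrimitiveIterator.OfLong rowIndexes, long[] result) {
    long[] temp = new long[result.length];
    long nextRow = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
    long valuesSeen = 0;  // values consumed from the page so far
    int out = 0;          // next free slot in the result vector
    int n;
    // Step 2: decode a batch of values into the temp vector.
    while (out < result.length && (n = decoder.decodeInto(temp)) > 0) {
      // Step 3: copy only the values whose row index survived filtering.
      for (int i = 0; i < n && out < result.length; i++) {
        long rowIndex = pageFirstRowIndex + valuesSeen + i;
        while (nextRow < rowIndex && rowIndexes.hasNext()) nextRow = rowIndexes.nextLong();
        if (rowIndex == nextRow) result[out++] = temp[i];
      }
      valuesSeen += n;
      // Step 4: if the result vector is not full, loop back to step 2.
    }
    return out;  // number of matched values written
  }
}
```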

Why are the changes needed?

When the column index is used, pages among different columns may not be aligned, and row indexes are needed to synchronize rows across columns. For example (hypothetical page layout): if column A stores rows 0-199 in two pages of 100 rows each, while column B stores rows 0-59 in its first page and rows 60-199 in its second, a predicate on A that keeps only its second page means B must skip all of its first page and part of its second; the filtered row indexes tell B's reader exactly which rows to keep.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Still a work in progress. The tests below are needed:

  1. A test for the correctness of filtered results with the column index. It needs to be exercised on unaligned pages.
  2. A benchmark on the column index.

@lxian (Contributor, Author) commented Mar 29, 2021

I did a simple benchmark on it, and the result looks good:

```
Running benchmark: simple filters
  Running case: Parquet Vectorized 
  Stopped after 5 iterations, 6065 ms
  Running case: Parquet Vectorized (columnIndex)
  Stopped after 27 iterations, 2048 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Mac OS X 10.14.4
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
simple filters:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                 1169           1213          55         13.5          74.3       1.0X
Parquet Vectorized (columnIndex)                     61             76          11        258.4           3.9      19.2X

Running benchmark: range filters
  Running case: Parquet Vectorized 
  Stopped after 5 iterations, 6338 ms
  Running case: Parquet Vectorized (columnIndex)
  Stopped after 6 iterations, 2128 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Mac OS X 10.14.4
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
range filters:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                 1222           1268          79         12.9          77.7       1.0X
Parquet Vectorized (columnIndex)                    346            355           8         45.4          22.0       3.5X
```

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions bot added the SQL label Mar 29, 2021
@HyukjinKwon (Member) left a comment

I think it wouldn't affect the performance significantly when the dataset is actually large, with many files, in production. I doubt it's worthwhile to manage a separate index.

It would be great if we ran TPC-DS or a proper benchmark and saw whether there is a significant performance improvement.

@lxian (Contributor, Author) commented Mar 31, 2021

> I think it wouldn't affect the performance significantly when the dataset is actually large, with many files, in production. I doubt it's worthwhile to manage a separate index.
>
> It would be great if we ran TPC-DS or a proper benchmark and saw whether there is a significant performance improvement.

I think the performance will depend on the selectivity of the query's filter. The index is only applied if some pages can be skipped within a row group. There is a benchmark by @wangyum in #31393.

I will run a TPC-DS benchmark later to see if there are improvements.

@HyukjinKwon (Member)

Most row groups will be filtered before we actually read the batch, right? Does the row index work for row-group filtering too?

@lxian (Contributor, Author) commented Apr 4, 2021

The row indexes are not for filtering pages or row groups. They are used together with the Parquet column index to ensure that the filtered pages in a row group stay synchronized across different columns.

@HyukjinKwon (Member)

My point is that the row index will only be effective within the underlying row group. Most row groups will be filtered out in production. That's why I said the actual impact won't be significant.

@lxian force-pushed the SPARK-34859-vecorized-parquet-reader-with-columnindex branch from f7ad62b to ddc233a on April 8, 2021
@lxian changed the title from [WIP][SPARK-34859][SQL] parquet vectorized reader - support column index with rowIndexes to [SPARK-34859][SQL] parquet vectorized reader - support column index with rowIndexes on Apr 19, 2021
@lxian (Contributor, Author) commented Apr 19, 2021

I've run a TPC-DS benchmark at scale factor 10 (results: https://gist.github.com/lxian/bba60a0460a74d3427994ce0d60d4c79), and the impact of the column index looks subtle.

@sunchao (Member) commented May 26, 2021

@lxian does it mean that, without this PR, the vectorized Parquet reader may return incorrect results?

@lxian (Contributor, Author) commented May 26, 2021

> @lxian does it mean that, without this PR, the vectorized Parquet reader may return incorrect results?

Yes, the result may be incorrect in cases where data pages among columns are unaligned (for example, with columns of different data types).

@sunchao (Member) commented May 28, 2021

@lxian In the current approach we'd have to copy values from one vector to another. I think a better and more efficient approach may be to feed the row indexes to VectorizedRleValuesReader#readXXX and skip rows if they are not in the range, so basically we increment both rowId and the row index in parallel (see the sketch below).
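
For illustration, a rough sketch of the parallel-increment idea (the `RowRange` type and the `readValue`/`skipValue` helpers are hypothetical stand-ins, not the actual `VectorizedRleValuesReader` API):

```java
import java.util.Iterator;

// Hypothetical sketch: rowId walks the output vector while rowIndex walks the
// row positions within the row group; values outside the surviving row ranges
// are decoded and discarded instead of being copied between vectors.
abstract class ParallelIncrementSketch {
  static final class RowRange {
    final long start, end;  // inclusive bounds of a surviving row range
    RowRange(long start, long end) { this.start = start; this.end = end; }
  }

  abstract void readValue(int rowId);  // decode the next value into the vector at rowId
  abstract void skipValue();           // decode and discard the next value

  void readBatch(int total, long firstRowIndex, Iterator<RowRange> ranges) {
    if (!ranges.hasNext()) return;
    RowRange current = ranges.next();
    long rowIndex = firstRowIndex;  // row position within the row group
    int rowId = 0;                  // next slot in the output vector
    while (rowId < total) {
      if (rowIndex > current.end) {
        if (!ranges.hasNext()) break;  // no surviving rows remain
        current = ranges.next();       // advance to the next row range
      } else if (rowIndex < current.start) {
        skipValue();                   // before the range: skip
        rowIndex++;
      } else {
        readValue(rowId++);            // inside the range: read
        rowIndex++;
      }
    }
  }
}
```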

@lxian (Contributor, Author) commented May 28, 2021

> @lxian In the current approach we'd have to copy values from one vector to another. I think a better and more efficient approach may be to feed the row indexes to VectorizedRleValuesReader#readXXX and skip rows if they are not in the range, so basically we increment both rowId and the row index in parallel.

In VectorizedRleValuesReader the values are read in batches or in a simple loop. I am wondering whether putting the row-index filtering there will slow it down.

@sunchao (Member) commented May 28, 2021

@lxian I'm thinking that the extra cost is just incrementing two indexes at the same time, so it should be fairly cheap. You can also refer to how SynchronizingColumnReader does that.

Porting that logic to Spark is a bit tricky though, especially when it comes to handling the RLE-encoded definition levels. Let me try experimenting with this idea on my side too.

@lxian (Contributor, Author) commented May 31, 2021

@sunchao you are right. It's really tricky and would require a lot of changes as well. VectorizedValuesReader currently reads and puts all values into the column vector for a single data page. Interfaces would need to change to allow VectorizedValuesReader to read only a portion of a data page.

@sunchao (Member) commented Jun 2, 2021

@lxian Yes. I opened #32753 to demonstrate the idea. It's about 1K LOC, but mostly because the same code has to be duplicated in several places. That duplication is an existing issue in the Parquet vectorized code path, but I think it's possible to eliminate it.

dongjoon-hyun pushed a commit that referenced this pull request Jun 30, 2021
…reader

### What changes were proposed in this pull request?

Make the current vectorized Parquet reader work with the column index introduced in Parquet 1.11. In particular, this PR makes the following changes:
1. In `ParquetReadState`, track the row ranges returned via `PageReadStore.getRowIndexes` as well as the first row index of each page via `DataPage.getFirstRowIndex`.
2. Introduced a new API `ParquetVectorUpdater.skipValues`, which skips a batch of values from a Parquet value reader. As part of the process, also renamed the existing `updateBatch` to `readValues` and `update` to `readValue` to keep the method names consistent.
3. Correspondingly, also introduced new APIs `VectorizedValuesReader.skipXXX` for the different data types, along with their implementations. These are useful when the reader knows that a given batch of values can be skipped, for instance because the batch is not covered by the row ranges generated by column-index filtering.
4. Changed `VectorizedRleValuesReader` to handle column-index filtering. This is done by comparing the range that is going to be read next within the current RLE/PACKED block (call this the block range) against the current row range. There are three cases (see the sketch after this list):
    * if the block range is before the current row range, skip all the values in the block range
    * if the block range is after the current row range, advance the row range and repeat the steps
    * if the block range overlaps with the current row range, read only the values within the overlapping area and skip the rest
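
A sketch of those three cases under simplified assumptions (hypothetical helpers; the real `VectorizedRleValuesReader` additionally manages RLE/PACKED decoding state and definition levels):

```java
// Hypothetical sketch of the three-case comparison; a "block" is the run of
// rows covered by the current RLE/PACKED group of definition levels.
abstract class BlockRangeSketch {
  static final class RowRange {
    final long start, end;  // inclusive bounds of a surviving row range
    RowRange(long start, long end) { this.start = start; this.end = end; }
  }

  abstract long currentRowIndex();      // row index of the next value in the block
  abstract int valuesLeftInBlock();
  abstract RowRange currentRowRange();  // current surviving row range, or null if exhausted
  abstract void advanceRowRange();
  abstract void skipValues(int n);      // decode and discard n values
  abstract void readValues(int n);      // decode n values into the output vector

  void processBlock() {
    while (valuesLeftInBlock() > 0) {
      RowRange range = currentRowRange();
      if (range == null) { skipValues(valuesLeftInBlock()); return; }  // nothing left to keep
      long blockStart = currentRowIndex();
      long blockEnd = blockStart + valuesLeftInBlock() - 1;
      if (blockEnd < range.start) {
        skipValues(valuesLeftInBlock());   // case 1: block entirely before the row range
      } else if (blockStart > range.end) {
        advanceRowRange();                 // case 2: block after the row range; retry
      } else {
        long from = Math.max(blockStart, range.start);  // case 3: overlap
        long to = Math.min(blockEnd, range.end);
        skipValues((int) (from - blockStart));  // skip values before the overlap
        readValues((int) (to - from + 1));      // read only the overlapping values
      }
    }
  }
}
```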

### Why are the changes needed?

[Parquet Column Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md) is a new feature in Parquet 1.11 which allows very efficient filtering at the page level (some benchmark numbers can be found [here](https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/)), especially when data is sorted. The feature is largely implemented in parquet-mr (via classes such as `ColumnIndex` and `ColumnIndexFilter`). In Spark, the non-vectorized Parquet reader can automatically benefit from the feature after upgrading to Parquet 1.11.x, without any code change. However, the same is not true for the vectorized Parquet reader, since Spark implements its own logic for reading Parquet pages, handling definition levels, reading values into columnar batches, and so on.

Previously, [SPARK-26345](https://issues.apache.org/jira/browse/SPARK-26345) (#31393) updated Spark to only scan pages filtered by the column index on the parquet-mr side. This is done by calling the `ParquetFileReader.readNextFilteredRowGroup` and `ParquetFileReader.getFilteredRecordCount` APIs. The implementation, however, only works in a few limited cases: in the scenario where there are multiple columns and their type widths differ (e.g., `int` and `bigint`), it can return incorrect results. See SPARK-34859 for a detailed description of this issue.

In order to fix the above, Spark needs to leverage the APIs `PageReadStore.getRowIndexes` and `DataPage.getFirstRowIndex`. The former returns the indexes, after filtering, of all rows within a Parquet row group (note the difference between rows and values: for a flat schema the two coincide, but for a nested schema they differ). The latter returns the first row index within a single data page. Combining the two, one can determine which rows/values should be kept while scanning a Parquet page (see the sketch below).
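
For illustration, a small sketch of how the two APIs combine for a flat schema (assuming the parquet-mr 1.11 signatures, where both accessors return `Optional` because row indexes are only present when column-index filtering applies; `RowIndexProbe` is an illustrative helper, not Spark code):

```java
import java.util.Optional;
import java.util.PrimitiveIterator;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.PageReadStore;

class RowIndexProbe {
  // For a flat schema, a value in this page must be read iff its row index
  // (firstRowIndex + offset within the page) appears in the filtered iterator.
  static void printSurvivingRows(PageReadStore rowGroup, DataPage page) {
    Optional<PrimitiveIterator.OfLong> rowIndexes = rowGroup.getRowIndexes();
    if (!rowIndexes.isPresent()) return;  // no filtering: every row survives

    long firstRowIndex = page.getFirstRowIndex().orElse(0L);
    long lastRowIndex = firstRowIndex + page.getValueCount() - 1;  // flat schema: values == rows

    PrimitiveIterator.OfLong it = rowIndexes.get();
    while (it.hasNext()) {
      long rowIndex = it.nextLong();
      if (rowIndex >= firstRowIndex && rowIndex <= lastRowIndex) {
        System.out.println("row " + rowIndex + " must be read from this page");
      }
    }
  }
}
```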

### Does this PR introduce _any_ user-facing change?

Yes. Now the vectorized Parquet reader should work correctly with the column index.

### How was this patch tested?

Borrowed tests from #31998 and added a few more tests.

Closes #32753 from sunchao/SPARK-34859.

Lead-authored-by: Chao Sun <sunchao@apple.com>
Co-authored-by: Li Xian <lxian2shell@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@HyukjinKwon closed this Jul 7, 2021