Loading page index breaks skipping of pages with nested types #4921

Closed
marekgalovic opened this issue Oct 11, 2023 · 5 comments · Fixed by #4943
Assignees
tustvold
Labels
bug, parquet (Changes to the parquet crate)

Comments

@marekgalovic

Describe the bug
I'm running into an issue where loading the page index in order to prune pages breaks page skipping for columns with nested types in GenericColumnReader::skip_records. This method assumes that if the page metadata has num_records set, it is equal to the number of records that should be skipped, which seems not to be true for columns with list-typed values because it does not account for repetition levels. Additionally, SerializedPageReader::peek_next_page sets num_rows to a value based on PageLocation, which also does not consider the column's repetition levels.

This leads to GenericColumnReader::skip_records skipping over more pages than it should, which causes the next read to fail with the following error:

"selection contains less than the number of selected rows"

To Reproduce

  • Two row selectors: the first one filters on a uint64 column and the second one on a List<utf8> column.
  • The value that fails to read is the last entry in the second column chunk (the List<utf8> column).
  • The first selector produces the following input selection for the second selector:
RowSelection { selectors: [RowSelector { row_count: 16313, skip: true }, RowSelector { row_count: 3569, skip: false }, RowSelector { row_count: 48237, skip: true }, RowSelector { row_count: 6097, skip: false }, RowSelector { row_count: 25783, skip: true }, RowSelector { row_count: 1, skip: false }] }
  • Row group size: 100000
  • Page size: 1024

Expected behavior
Based on the input selection, it should read 9667 rows (3569 + 6097 + 1), but it only reads 9666 because the page that contains the last record gets unintentionally skipped.

Additional context

@tustvold
Contributor

which seems not to be true for columns with list-typed values because it does not account for repetition levels

It should be true. Do you have a parquet file you can share? How was it written?

@marekgalovic
Author

@tustvold unfortunately, I cannot share the file that fails. Below is an example that reproduces what I suspect is the underlying issue, but it fails with a slightly different error since there are no predicates. Using only the first column in the projection mask, it correctly reads 9667 rows, but when I add the second one (or use the second one alone) it fails with:

ArrowError("Parquet argument error: Parquet error: Invalid offset in sparse column chunk data: 804367")
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, ListArray, ListBuilder, StringBuilder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use itertools::Itertools;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use tempdir::TempDir;

pub fn build_list_array<I: Iterator<Item = impl Into<Option<Vec<String>>>>>(
    values: I,
) -> ListArray {
    let mut list_builder = ListBuilder::new(StringBuilder::new());
    for s in values {
        if let Some(v) = s.into() {
            for value in v.into_iter() {
                list_builder.values().append_value(value);
            }
        }
        // `None` entries are appended as empty lists rather than nulls.
        list_builder.append(true);
    }
    list_builder.finish()
}

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_1", DataType::UInt64, false),
        Field::new(
            "col_2",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
    ]));

    // Writer properties: page-level statistics, ~1024-row pages, no dictionary
    let props = WriterProperties::builder()
        .set_write_batch_size(1024)
        .set_data_page_row_count_limit(1024)
        .set_max_row_group_size(100_000)
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_bloom_filter_enabled(false);

    // Write data
    let workdir = TempDir::new("parquet").unwrap();
    let file_path = workdir.path().join("data.parquet");
    let mut writer = ArrowWriter::try_new(
        File::create(&file_path).unwrap(),
        schema.clone(),
        Some(props.build()),
    )
    .unwrap();

    (0..200_000_u64).chunks(1024).into_iter().for_each(|ids| {
        let ids: Vec<_> = ids.collect();
        let list_vals = ids
            .iter()
            .map(|id| match id % 3 {
                0 => Some(vec!["val_1".to_string(), format!("id_{id}")]),
                1 => Some(vec![format!("id_{id}")]),
                _ => None,
            })
            .collect_vec();
        let refs = vec![
            Arc::new(UInt64Array::from(ids)) as ArrayRef,
            Arc::new(build_list_array(list_vals.into_iter())) as ArrayRef,
        ];

        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
    });

    writer.close().unwrap();

    // Read data
    let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
        tokio::fs::File::open(&file_path).await.unwrap(),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();

    let proj_mask = ProjectionMask::roots(reader.parquet_schema(), [0, 1]);

    reader = reader
        .with_projection(proj_mask)
        .with_batch_size(1024)
        .with_row_groups(vec![0])
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(16313),
            RowSelector::select(3569),
            RowSelector::skip(48237),
            RowSelector::select(6097),
            RowSelector::skip(25783),
            RowSelector::select(1),
        ]));

    let mut stream = reader.build().unwrap();

    let mut total_rows = 0;
    while let Some(rb) = stream.next().await {
        let rb = rb.unwrap();
        total_rows += rb.num_rows();
    }

    println!("read rows: {total_rows}");
}

@tustvold
Contributor

Apologies for the delay, I'll try to find some time to sit down with this in the coming days.

Am I correct in thinking this only occurs with the async reader? If memory serves, the linked error can only occur on that codepath.

@marekgalovic
Author

Yes, that seems to be the case. Using the exact same setup with the sync record batch reader works fine.
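
For reference, a minimal sketch of the equivalent sync read path (not from the original report; it assumes the same data.parquet file and row selection as the async example above), which per the comment above returns the expected row count:

use std::fs::File;
use std::path::Path;

use parquet::arrow::arrow_reader::{
    ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
use parquet::arrow::ProjectionMask;

fn read_sync(file_path: &Path) -> usize {
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        File::open(file_path).unwrap(),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .unwrap();

    let proj_mask = ProjectionMask::roots(builder.parquet_schema(), [0, 1]);

    let reader = builder
        .with_projection(proj_mask)
        .with_batch_size(1024)
        .with_row_groups(vec![0])
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(16313),
            RowSelector::select(3569),
            RowSelector::skip(48237),
            RowSelector::select(6097),
            RowSelector::skip(25783),
            RowSelector::select(1),
        ]))
        .build()
        .unwrap();

    // Per the report, this sync path reads the expected 9667 rows.
    reader.map(|rb| rb.unwrap().num_rows()).sum()
}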

@tustvold tustvold self-assigned this Oct 16, 2023
tustvold added a commit to tustvold/arrow-rs that referenced this issue Oct 16, 2023
tustvold added a commit that referenced this issue Oct 17, 2023
* Assume records not split across pages (#4921)

* More test

* Add PageReader::at_record_boundary

* Fix flush partial
@tustvold
Contributor

label_issue.py automatically added labels {'parquet'} from #4943

@tustvold tustvold added the parquet Changes to the parquet crate label Oct 18, 2023