Skip to content

Commit

Permalink
Skip RowSelectors with zero rows (#2678)
Browse files Browse the repository at this point in the history
* Skip RowSelectors with zero rows

* include test for zero RowSelector fix

Co-authored-by: srib <srib@MacDesktop.local>
  • Loading branch information
askoa and srib committed Sep 8, 2022
1 parent df4906d commit 0b59f05
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
6 changes: 6 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ impl Iterator for ParquetRecordBatchReader {
continue;
}

//Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
//Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
if front.row_count == 0 {
continue;
}

// try to read record
let need_read = self.batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read) {
Expand Down
69 changes: 69 additions & 0 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,75 @@ mod tests {
}
}

#[tokio::test]
async fn test_async_reader_zero_row_selector() {
//See https://github.com/apache/arrow-rs/issues/2669
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let mut rand = thread_rng();

let mut expected_rows = 0;
let mut total_rows = 0;
let mut skip = false;
let mut selectors = vec![];

selectors.push(RowSelector {
row_count: 0,
skip: false,
});

while total_rows < 7300 {
let row_count: usize = rand.gen_range(1..100);

let row_count = row_count.min(7300 - total_rows);

selectors.push(RowSelector { row_count, skip });

total_rows += row_count;
if !skip {
expected_rows += row_count;
}

skip = !skip;
}

let selection = RowSelection::from(selectors);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();

let col_idx: usize = rand.gen_range(0..13);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);

let stream = builder
.with_projection(mask.clone())
.with_row_selection(selection.clone())
.build()
.expect("building stream");

let async_batches: Vec<_> = stream.try_collect().await.unwrap();

let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();

assert_eq!(actual_rows, expected_rows);
}

#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
Expand Down

0 comments on commit 0b59f05

Please sign in to comment.