From 6b0c7aac5f3855e743ee524b209109948a69529d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 29 Jul 2025 18:02:44 -0400 Subject: [PATCH 01/13] [Parquet] Refactor InMemoryRowGroup to separate CPU and IO --- parquet/src/arrow/async_reader/mod.rs | 86 +++++++++++++++++++++------ 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index fe6414705282..5ae015e94204 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -977,6 +977,15 @@ struct InMemoryRowGroup<'a> { metadata: &'a ParquetMetaData, } +/// What ranges to fetch for the columns in this row group +#[derive(Debug)] +struct FetchRanges { + /// The byte ranges to fetch + ranges: Vec>, + /// If `Some`, the start offsets of each page for each column chunk + page_start_offsets: Option>>, +} + impl InMemoryRowGroup<'_> { /// Fetches any additional column data specified in `projection` that is not already /// present in `self.column_chunks`. @@ -991,6 +1000,25 @@ impl InMemoryRowGroup<'_> { batch_size: usize, cache_mask: Option<&ProjectionMask>, ) -> Result<()> { + // Figure out what ranges to fetch + let FetchRanges { + ranges, + page_start_offsets, + } = self.fetch_ranges(projection, selection); + // do the actual fetch + let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); + // update our in memory buffers (self.column_chunks) with the fetched data + self.fill_column_chunks(projection, page_start_offsets, chunk_data); + Ok(()) + } + + /// Returns the byte ranges to fetch for the columns specified in + /// `projection` and `selection`. + fn fetch_ranges( + &self, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + ) -> FetchRanges { let metadata = self.metadata.row_group(self.row_group_idx); if let Some((selection, offset_index)) = selection.zip(self.offset_index) { let expanded_selection = @@ -999,7 +1027,7 @@ impl InMemoryRowGroup<'_> { // `RowSelection` let mut page_start_offsets: Vec> = vec![]; - let fetch_ranges = self + let ranges = self .column_chunks .iter() .zip(metadata.columns()) @@ -1033,8 +1061,46 @@ impl InMemoryRowGroup<'_> { ranges }) .collect(); + FetchRanges { + ranges, + page_start_offsets: Some(page_start_offsets), + } + } else { + let ranges = self + .column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .map(|(idx, _chunk)| { + let column = metadata.column(idx); + let (start, length) = column.byte_range(); + start..(start + length) + }) + .collect(); + FetchRanges { + ranges, + page_start_offsets: None, + } + } + } - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + /// Fills in `self.column_chunks` with the data fetched from `chunk_data`. + /// + /// This function **must** be called with the data from the ranges returned by + /// `fetch_ranges` and the corresponding page_start_offsets, with the exact same and `selection`. + fn fill_column_chunks( + &mut self, + projection: &ProjectionMask, + page_start_offsets: Option>>, + chunk_data: I, + ) where + I: IntoIterator, + { + let mut chunk_data = chunk_data.into_iter(); + let metadata = self.metadata.row_group(self.row_group_idx); + if let Some(page_start_offsets) = page_start_offsets { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` let mut page_start_offsets = page_start_offsets.into_iter(); for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { @@ -1059,20 +1125,6 @@ impl InMemoryRowGroup<'_> { } } } else { - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start..(start + length) - }) - .collect(); - - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { if chunk.is_some() || !projection.leaf_included(idx) { continue; @@ -1086,8 +1138,6 @@ impl InMemoryRowGroup<'_> { } } } - - Ok(()) } } From 477fbafe0bb6817d5b6b77677efaa98fd1db92fe Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 8 Aug 2025 15:48:43 -0400 Subject: [PATCH 02/13] Implement ParquetDecoder push API --- parquet/src/arrow/arrow_reader/mod.rs | 16 + parquet/src/arrow/arrow_reader/read_plan.rs | 5 +- parquet/src/arrow/arrow_reader/selection.rs | 1 - parquet/src/arrow/async_reader/mod.rs | 279 +--- parquet/src/arrow/in_memory_row_group.rs | 296 +++++ parquet/src/arrow/mod.rs | 6 + parquet/src/arrow/push_decoder/mod.rs | 1150 +++++++++++++++++ .../arrow/push_decoder/reader_builder/data.rs | 231 ++++ .../push_decoder/reader_builder/filter.rs | 136 ++ .../arrow/push_decoder/reader_builder/mod.rs | 654 ++++++++++ parquet/src/arrow/push_decoder/remaining.rs | 118 ++ parquet/src/file/metadata/mod.rs | 1 - parquet/src/lib.rs | 16 +- parquet/src/util/push_buffers.rs | 25 + 14 files changed, 2650 insertions(+), 284 deletions(-) create mode 100644 parquet/src/arrow/in_memory_row_group.rs create mode 100644 parquet/src/arrow/push_decoder/mod.rs create mode 100644 parquet/src/arrow/push_decoder/reader_builder/data.rs create mode 100644 parquet/src/arrow/push_decoder/reader_builder/filter.rs create mode 100644 parquet/src/arrow/push_decoder/reader_builder/mod.rs create mode 100644 parquet/src/arrow/push_decoder/remaining.rs diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2506aa9110eb..c363213559c9 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -58,6 +58,7 @@ pub mod statistics; /// /// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`] /// * `async` API: [`ParquetRecordBatchStreamBuilder::new`] +/// * decoder API: [`ParquetDecoderBuilder::new`] /// /// # Features /// * Projection pushdown: [`Self::with_projection`] @@ -93,6 +94,7 @@ pub mod statistics; /// Millisecond Latency] Arrow blog post. /// /// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new +/// [`ParquetDecoderBuilder::new`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder::new /// [Apache Arrow]: https://arrow.apache.org/ /// [`StatisticsConverter`]: statistics::StatisticsConverter /// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/ @@ -991,12 +993,26 @@ impl PageIterator for ReaderPageIterator {} /// An `Iterator>` that yields [`RecordBatch`] /// read from a parquet data source +/// +/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all +/// the buffered state (DataPages, etc) necessary to decode the parquet data into +/// Arrow arrays. pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, read_plan: ReadPlan, } +impl Debug for ParquetRecordBatchReader { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetRecordBatchReader") + .field("array_reader", &"...") + .field("schema", &self.schema) + .field("read_plan", &self.read_plan) + .finish() + } +} + impl Iterator for ParquetRecordBatchReader { type Item = Result; diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 754fcd339c5b..2210f47df2c1 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; /// A builder for [`ReadPlan`] -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ReadPlanBuilder { batch_size: usize, /// Current to apply, includes all filters @@ -51,7 +51,6 @@ impl ReadPlanBuilder { } /// Returns the current selection, if any - #[cfg(feature = "async")] pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() } @@ -76,7 +75,6 @@ impl ReadPlanBuilder { } /// Returns the number of rows selected, or `None` if all rows are selected. - #[cfg(feature = "async")] pub fn num_rows_selected(&self) -> Option { self.selection.as_ref().map(|s| s.row_count()) } @@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder { /// A plan reading specific rows from a Parquet Row Group. /// /// See [`ReadPlanBuilder`] to create `ReadPlan`s +#[derive(Debug)] pub struct ReadPlan { /// The number of rows to read in each batch batch_size: usize, diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 9c3caec0b4a5..b23709b7e534 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -447,7 +447,6 @@ impl RowSelection { /// Expands the selection to align with batch boundaries. /// This is needed when using cached array readers to ensure that /// the cached data covers full batches. - #[cfg(feature = "async")] pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self { if batch_size == 0 { return self.clone(); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5ae015e94204..0f88179a9091 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -29,7 +29,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use futures::ready; use futures::stream::Stream; @@ -38,10 +38,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; -use crate::arrow::ProjectionMask; -use crate::arrow::array_reader::{ - ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups, -}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, @@ -51,11 +47,8 @@ use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash use crate::bloom_filter::{ SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset, }; -use crate::column::page::{PageIterator, PageReader}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; -use crate::file::page_index::offset_index::OffsetIndexMetaData; -use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; mod metadata; pub use metadata::*; @@ -63,8 +56,11 @@ pub use metadata::*; #[cfg(feature = "object_store")] mod store; +use crate::arrow::ProjectionMask; +use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache}; use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup}; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; @@ -967,32 +963,13 @@ where } } -/// An in-memory collection of column chunks -struct InMemoryRowGroup<'a> { - offset_index: Option<&'a [OffsetIndexMetaData]>, - /// Column chunks for this row group - column_chunks: Vec>>, - row_count: usize, - row_group_idx: usize, - metadata: &'a ParquetMetaData, -} - -/// What ranges to fetch for the columns in this row group -#[derive(Debug)] -struct FetchRanges { - /// The byte ranges to fetch - ranges: Vec>, - /// If `Some`, the start offsets of each page for each column chunk - page_start_offsets: Option>>, -} - impl InMemoryRowGroup<'_> { /// Fetches any additional column data specified in `projection` that is not already /// present in `self.column_chunks`. /// /// If `selection` is provided, only the pages required for the selection /// are fetched. Otherwise, all pages are fetched. - async fn fetch( + pub(crate) async fn fetch( &mut self, input: &mut T, projection: &ProjectionMask, @@ -1004,258 +981,14 @@ impl InMemoryRowGroup<'_> { let FetchRanges { ranges, page_start_offsets, - } = self.fetch_ranges(projection, selection); + } = self.fetch_ranges(projection, selection, batch_size, cache_mask); // do the actual fetch let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); // update our in memory buffers (self.column_chunks) with the fetched data self.fill_column_chunks(projection, page_start_offsets, chunk_data); Ok(()) } - - /// Returns the byte ranges to fetch for the columns specified in - /// `projection` and `selection`. - fn fetch_ranges( - &self, - projection: &ProjectionMask, - selection: Option<&RowSelection>, - ) -> FetchRanges { - let metadata = self.metadata.row_group(self.row_group_idx); - if let Some((selection, offset_index)) = selection.zip(self.offset_index) { - let expanded_selection = - selection.expand_to_batch_boundaries(batch_size, self.row_count); - // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the - // `RowSelection` - let mut page_start_offsets: Vec> = vec![]; - - let ranges = self - .column_chunks - .iter() - .zip(metadata.columns()) - .enumerate() - .filter(|&(idx, (chunk, _chunk_meta))| { - chunk.is_none() && projection.leaf_included(idx) - }) - .flat_map(|(idx, (_chunk, chunk_meta))| { - // If the first page does not start at the beginning of the column, - // then we need to also fetch a dictionary page. - let mut ranges: Vec> = vec![]; - let (start, _len) = chunk_meta.byte_range(); - match offset_index[idx].page_locations.first() { - Some(first) if first.offset as u64 != start => { - ranges.push(start..first.offset as u64); - } - _ => (), - } - - // Expand selection to batch boundaries only for cached columns - let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); - if use_expanded { - ranges.extend( - expanded_selection.scan_ranges(&offset_index[idx].page_locations), - ); - } else { - ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); - } - page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); - - ranges - }) - .collect(); - FetchRanges { - ranges, - page_start_offsets: Some(page_start_offsets), - } - } else { - let ranges = self - .column_chunks - .iter() - .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start..(start + length) - }) - .collect(); - FetchRanges { - ranges, - page_start_offsets: None, - } - } - } - - /// Fills in `self.column_chunks` with the data fetched from `chunk_data`. - /// - /// This function **must** be called with the data from the ranges returned by - /// `fetch_ranges` and the corresponding page_start_offsets, with the exact same and `selection`. - fn fill_column_chunks( - &mut self, - projection: &ProjectionMask, - page_start_offsets: Option>>, - chunk_data: I, - ) where - I: IntoIterator, - { - let mut chunk_data = chunk_data.into_iter(); - let metadata = self.metadata.row_group(self.row_group_idx); - if let Some(page_start_offsets) = page_start_offsets { - // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the - // `RowSelection` - let mut page_start_offsets = page_start_offsets.into_iter(); - - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; - } - - if let Some(offsets) = page_start_offsets.next() { - let mut chunks = Vec::with_capacity(offsets.len()); - for _ in 0..offsets.len() { - chunks.push(chunk_data.next().unwrap()); - } - - *chunk = Some(Arc::new(ColumnChunkData::Sparse { - length: metadata.column(idx).byte_range().1 as usize, - data: offsets - .into_iter() - .map(|x| x as usize) - .zip(chunks.into_iter()) - .collect(), - })) - } - } - } else { - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; - } - - if let Some(data) = chunk_data.next() { - *chunk = Some(Arc::new(ColumnChunkData::Dense { - offset: metadata.column(idx).byte_range().0 as usize, - data, - })); - } - } - } - } -} - -impl RowGroups for InMemoryRowGroup<'_> { - fn num_rows(&self) -> usize { - self.row_count - } - - /// Return chunks for column i - fn column_chunks(&self, i: usize) -> Result> { - match &self.column_chunks[i] { - None => Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))), - Some(data) => { - let page_locations = self - .offset_index - // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) - .map(|index| index[i].page_locations.clone()); - let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i); - let page_reader = SerializedPageReader::new( - data.clone(), - column_chunk_metadata, - self.row_count, - page_locations, - )?; - let page_reader = page_reader.add_crypto_context( - self.row_group_idx, - i, - self.metadata, - column_chunk_metadata, - )?; - - let page_reader: Box = Box::new(page_reader); - - Ok(Box::new(ColumnChunkIterator { - reader: Some(Ok(page_reader)), - })) - } - } - } -} - -/// An in-memory column chunk -#[derive(Clone)] -enum ColumnChunkData { - /// Column chunk data representing only a subset of data pages - Sparse { - /// Length of the full column chunk - length: usize, - /// Subset of data pages included in this sparse chunk. - /// - /// Each element is a tuple of (page offset within file, page data). - /// Each entry is a complete page and the list is ordered by offset. - data: Vec<(usize, Bytes)>, - }, - /// Full column chunk and the offset within the original file - Dense { offset: usize, data: Bytes }, } - -impl ColumnChunkData { - /// Return the data for this column chunk at the given offset - fn get(&self, start: u64) -> Result { - match &self { - ColumnChunkData::Sparse { data, .. } => data - .binary_search_by_key(&start, |(offset, _)| *offset as u64) - .map(|idx| data[idx].1.clone()) - .map_err(|_| { - ParquetError::General(format!( - "Invalid offset in sparse column chunk data: {start}" - )) - }), - ColumnChunkData::Dense { offset, data } => { - let start = start as usize - *offset; - Ok(data.slice(start..)) - } - } - } -} - -impl Length for ColumnChunkData { - /// Return the total length of the full column chunk - fn len(&self) -> u64 { - match &self { - ColumnChunkData::Sparse { length, .. } => *length as u64, - ColumnChunkData::Dense { data, .. } => data.len() as u64, - } - } -} - -impl ChunkReader for ColumnChunkData { - type T = bytes::buf::Reader; - - fn get_read(&self, start: u64) -> Result { - Ok(self.get(start)?.reader()) - } - - fn get_bytes(&self, start: u64, length: usize) -> Result { - Ok(self.get(start)?.slice(..length)) - } -} - -/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] -struct ColumnChunkIterator { - reader: Option>>, -} - -impl Iterator for ColumnChunkIterator { - type Item = Result>; - - fn next(&mut self) -> Option { - self.reader.take() - } -} - -impl PageIterator for ColumnChunkIterator {} - #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs new file mode 100644 index 000000000000..b39a45131b19 --- /dev/null +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::ProjectionMask; +use crate::arrow::array_reader::RowGroups; +use crate::arrow::arrow_reader::RowSelection; +use crate::column::page::{PageIterator, PageReader}; +use crate::errors::ParquetError; +use crate::file::metadata::ParquetMetaData; +use crate::file::page_index::offset_index::OffsetIndexMetaData; +use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; +use bytes::{Buf, Bytes}; +use std::ops::Range; +use std::sync::Arc; + +/// An in-memory collection of column chunks +#[derive(Debug)] +pub(crate) struct InMemoryRowGroup<'a> { + pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>, + /// Column chunks for this row group + pub(crate) column_chunks: Vec>>, + pub(crate) row_count: usize, + pub(crate) row_group_idx: usize, + pub(crate) metadata: &'a ParquetMetaData, +} + +/// What ranges to fetch for the columns in this row group +#[derive(Debug)] +pub(crate) struct FetchRanges { + /// The byte ranges to fetch + pub(crate) ranges: Vec>, + /// If `Some`, the start offsets of each page for each column chunk + pub(crate) page_start_offsets: Option>>, +} + +impl InMemoryRowGroup<'_> { + /// Returns the byte ranges to fetch for the columns specified in + /// `projection` and `selection`. + pub(crate) fn fetch_ranges( + &self, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + batch_size: usize, + cache_mask: Option<&ProjectionMask>, + ) -> FetchRanges { + let metadata = self.metadata.row_group(self.row_group_idx); + if let Some((selection, offset_index)) = selection.zip(self.offset_index) { + let expanded_selection = + selection.expand_to_batch_boundaries(batch_size, self.row_count); + + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut page_start_offsets: Vec> = vec![]; + + let ranges = self + .column_chunks + .iter() + .zip(metadata.columns()) + .enumerate() + .filter(|&(idx, (chunk, _chunk_meta))| { + chunk.is_none() && projection.leaf_included(idx) + }) + .flat_map(|(idx, (_chunk, chunk_meta))| { + // If the first page does not start at the beginning of the column, + // then we need to also fetch a dictionary page. + let mut ranges: Vec> = vec![]; + let (start, _len) = chunk_meta.byte_range(); + match offset_index[idx].page_locations.first() { + Some(first) if first.offset as u64 != start => { + ranges.push(start..first.offset as u64); + } + _ => (), + } + + // Expand selection to batch boundaries only for cached columns + let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); + if use_expanded { + ranges.extend( + expanded_selection.scan_ranges(&offset_index[idx].page_locations), + ); + } else { + ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + } + page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); + + ranges + }) + .collect(); + FetchRanges { + ranges, + page_start_offsets: Some(page_start_offsets), + } + } else { + let ranges = self + .column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .map(|(idx, _chunk)| { + let column = metadata.column(idx); + let (start, length) = column.byte_range(); + start..(start + length) + }) + .collect(); + FetchRanges { + ranges, + page_start_offsets: None, + } + } + } + + /// Fills in `self.column_chunks` with the data fetched from `chunk_data`. + /// + /// This function **must** be called with the data from the ranges returned by + /// `fetch_ranges` and the corresponding page_start_offsets, with the exact same and `selection`. + pub(crate) fn fill_column_chunks( + &mut self, + projection: &ProjectionMask, + page_start_offsets: Option>>, + chunk_data: I, + ) where + I: IntoIterator, + { + let mut chunk_data = chunk_data.into_iter(); + let metadata = self.metadata.row_group(self.row_group_idx); + if let Some(page_start_offsets) = page_start_offsets { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut page_start_offsets = page_start_offsets.into_iter(); + + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(offsets) = page_start_offsets.next() { + let mut chunks = Vec::with_capacity(offsets.len()); + for _ in 0..offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(Arc::new(ColumnChunkData::Sparse { + length: metadata.column(idx).byte_range().1 as usize, + data: offsets + .into_iter() + .map(|x| x as usize) + .zip(chunks.into_iter()) + .collect(), + })) + } + } + } else { + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(Arc::new(ColumnChunkData::Dense { + offset: metadata.column(idx).byte_range().0 as usize, + data, + })); + } + } + } + } +} + +impl RowGroups for InMemoryRowGroup<'_> { + fn num_rows(&self) -> usize { + self.row_count + } + + /// Return chunks for column i + fn column_chunks(&self, i: usize) -> crate::errors::Result> { + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {i}, column was not fetched" + ))), + Some(data) => { + let page_locations = self + .offset_index + // filter out empty offset indexes (old versions specified Some(vec![]) when no present) + .filter(|index| !index.is_empty()) + .map(|index| index[i].page_locations.clone()); + let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i); + let page_reader = SerializedPageReader::new( + data.clone(), + column_chunk_metadata, + self.row_count, + page_locations, + )?; + let page_reader = page_reader.add_crypto_context( + self.row_group_idx, + i, + self.metadata, + column_chunk_metadata, + )?; + + let page_reader: Box = Box::new(page_reader); + + Ok(Box::new(ColumnChunkIterator { + reader: Some(Ok(page_reader)), + })) + } + } + } +} + +/// An in-memory column chunk +#[derive(Clone, Debug)] +pub(crate) enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + /// Subset of data pages included in this sparse chunk. + /// + /// Each element is a tuple of (page offset within file, page data). + /// Each entry is a complete page and the list is ordered by offset. + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk and the offset within the original file + Dense { offset: usize, data: Bytes }, +} + +impl ColumnChunkData { + /// Return the data for this column chunk at the given offset + fn get(&self, start: u64) -> crate::errors::Result { + match &self { + ColumnChunkData::Sparse { data, .. } => data + .binary_search_by_key(&start, |(offset, _)| *offset as u64) + .map(|idx| data[idx].1.clone()) + .map_err(|_| { + ParquetError::General(format!( + "Invalid offset in sparse column chunk data: {start}" + )) + }), + ColumnChunkData::Dense { offset, data } => { + let start = start as usize - *offset; + Ok(data.slice(start..)) + } + } + } +} + +impl Length for ColumnChunkData { + /// Return the total length of the full column chunk + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, + } + } +} + +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64) -> crate::errors::Result { + Ok(self.get(start)?.reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> crate::errors::Result { + Ok(self.get(start)?.slice(..length)) + } +} + +/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] +struct ColumnChunkIterator { + reader: Option>>, +} + +impl Iterator for ColumnChunkIterator { + type Item = crate::errors::Result>; + + fn next(&mut self) -> Option { + self.reader.take() + } +} + +impl PageIterator for ColumnChunkIterator {} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 94e359065107..34aac8b08aa0 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -190,9 +190,15 @@ pub mod async_reader; #[cfg(feature = "async")] pub mod async_writer; +pub mod push_decoder; + +mod in_memory_row_group; mod record_reader; + experimental!(mod schema); +use std::fmt::Debug; + pub use self::arrow_writer::ArrowWriter; #[cfg(feature = "async")] pub use self::async_reader::ParquetRecordBatchStreamBuilder; diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs new file mode 100644 index 000000000000..bbe1f1adf713 --- /dev/null +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -0,0 +1,1150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the +//! caller (rather than from an underlying reader). + +mod reader_builder; +mod remaining; + +use crate::DecodeResult; +use crate::arrow::arrow_reader::{ + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, +}; +use crate::errors::ParquetError; +use crate::file::metadata::ParquetMetaData; +use crate::util::push_buffers::PushBuffers; +use arrow_array::RecordBatch; +use bytes::Bytes; +use reader_builder::RowGroupReaderBuilder; +use remaining::RemainingRowGroups; +use std::ops::Range; +use std::sync::Arc; + +/// A builder for [`ParquetPushDecoder`]. +/// +/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`] and pass +/// the file length and metadata of the Parquet file to decode. +/// +/// You can decode the metadata from a Parquet file using either +/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`]. +/// +/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader +/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder +/// +/// Note the "input" type is `u64` which represents the length of the Parquet file +/// being decoded. This is needed to initialize the internal buffers that track +/// what data has been provided to the decoder. +/// +/// # Example +/// ``` +/// # use std::ops::Range; +/// # use std::sync::Arc; +/// # use bytes::Bytes; +/// # use arrow_array::record_batch; +/// # use parquet::DecodeResult; +/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; +/// # use parquet::arrow::ArrowWriter; +/// # use parquet::file::metadata::ParquetMetaDataPushDecoder; +/// # let file_bytes = { +/// # let mut buffer = vec![]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # // mimic IO by returning a function that returns the bytes for a given range +/// # let get_range = |range: &Range| -> Bytes { +/// # let start = range.start as usize; +/// # let end = range.end as usize; +/// # file_bytes.slice(start..end) +/// # }; +/// # let file_length = file_bytes.len() as u64; +/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap(); +/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap(); +/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") }; +/// # let parquet_metadata = Arc::new(parquet_metadata); +/// // The file length and metadata are required to create the decoder +/// let mut decoder = +/// ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata) +/// .unwrap() +/// // Optionally configure the decoder, e.g. batch size +/// .with_batch_size(1024) +/// // Build the decoder +/// .build() +/// .unwrap(); +/// +/// // In a loop, ask the decoder what it needs next, and provide it with the required data +/// loop { +/// match decoder.try_decode().unwrap() { +/// DecodeResult::NeedsData(ranges) => { +/// // The decoder needs more data. Fetch the data for the given ranges +/// let data = ranges.iter().map(|r| get_range(r)).collect::>(); +/// // Push the data to the decoder +/// decoder.push_ranges(ranges, data).unwrap(); +/// // After pushing the data, we can try to decode again on the next iteration +/// } +/// DecodeResult::Data(batch) => { +/// // Successfully decoded a batch of data +/// assert!(batch.num_rows() > 0); +/// } +/// DecodeResult::Finished => { +/// // The decoder has finished decoding exit the loop +/// break; +/// } +/// } +/// } +/// ``` +pub type ParquetPushDecoderBuilder = ArrowReaderBuilder; + +/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for +/// more options that can be configured. +impl ParquetPushDecoderBuilder { + /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file. + /// + /// See [`ParquetMetadataDecoder`] for a builder that can read the metadata from a Parquet file. + /// + /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder + /// + /// See example on [`ParquetPushDecoderBuilder`] + pub fn try_new_decoder( + file_len: u64, + parquet_metadata: Arc, + ) -> Result { + Self::try_new_decoder_with_options( + file_len, + parquet_metadata, + ArrowReaderOptions::default(), + ) + } + + /// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file + /// with the given reader options. + /// + /// This is similar to [`Self::try_new_decoder`] but allows configuring + /// options such as Arrow schema + pub fn try_new_decoder_with_options( + file_len: u64, + parquet_metadata: Arc, + arrow_reader_options: ArrowReaderOptions, + ) -> Result { + let arrow_reader_metadata = + ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?; + Ok(Self::new_with_metadata(file_len, arrow_reader_metadata)) + } + + /// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`]. + /// + /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from + /// the Parquet metadata and reader options. + pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self { + Self::new_builder(file_len, arrow_reader_metadata) + } + + /// Create a [`ParquetPushDecoder`] with the configured options + pub fn build(self) -> Result { + let Self { + input: file_len, + metadata: parquet_metadata, + schema: _, + fields, + batch_size, + row_groups, + projection, + filter, + selection, + limit, + offset, + metrics, + max_predicate_cache_size, + } = self; + + // If no row groups were specified, read all of them + let row_groups = + row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect()); + + // Prepare to build RowGroup readers + let buffers = PushBuffers::new(file_len); + let row_group_reader_builder = RowGroupReaderBuilder::new( + batch_size, + projection, + Arc::clone(&parquet_metadata), + fields, + filter, + limit, + offset, + metrics, + max_predicate_cache_size, + buffers, + ); + + // Initialize the decoder with the configured options + let remaining_row_groups = RemainingRowGroups::new( + parquet_metadata, + row_groups, + selection, + row_group_reader_builder, + ); + + Ok(ParquetPushDecoder { + state: ParquetDecoderState::ReadingRowGroup { + remaining_row_groups: Box::new(remaining_row_groups), + }, + }) + } +} + +/// A push based Parquet Decoder +/// +/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder. +/// +/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an +/// underlying reader for performing IO, and thus offers fine grained control +/// over how data is fetched and decoded. +/// +/// When more data is needed to make progress, instead of reading data directly +/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges +/// are needed. Once the caller provides the requested ranges via +/// [`Self::push_ranges`], they try to decode again by calling +/// [`Self::try_decode`]. +/// +/// The decoder's internal state tracks what has been already decoded and what +/// is needed next. +#[derive(Debug)] +pub struct ParquetPushDecoder { + /// The inner state. + /// + /// This state is consumed on every transition and a new state is produced + /// so the Rust compiler can ensure that the state is always valid and + /// transitions are not missed. + state: ParquetDecoderState, +} + +impl ParquetPushDecoder { + /// Attempt to decode the next batch of data, or return what data is needed + /// + /// The the decoder communicates the next state with a [`DecodeResult`] + /// + /// See full example in [`ParquetPushDecoderBuilder`] + /// + /// ```no_run + /// # use parquet::arrow::push_decoder::ParquetPushDecoder; + /// use parquet::DecodeResult; + /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() } + /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec>) { unimplemented!() } + /// let mut decoder = get_decoder(); + /// loop { + /// match decoder.try_decode().unwrap() { + /// DecodeResult::NeedsData(ranges) => { + /// // The decoder needs more data. Fetch the data for the given ranges + /// // call decoder.push_ranges(ranges, data) and call again + /// push_data(&mut decoder, ranges); + /// } + /// DecodeResult::Data(batch) => { + /// // Successfully decoded the next batch of data + /// println!("Got batch with {} rows", batch.num_rows()); + /// } + /// DecodeResult::Finished => { + /// // The decoder has finished decoding all data + /// break; + /// } + /// } + /// } + ///``` + pub fn try_decode(&mut self) -> Result, ParquetError> { + let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished); + let (new_state, decode_result) = current_state.try_transition()?; + self.state = new_state; + Ok(decode_result) + } + + /// Push data into the decoder for processing + /// + /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a + /// single range of data. + /// + /// Note this can be the entire file or just a part of it. If it is part of the file, + /// the ranges should correspond to the data ranges requested by the decoder. + /// + /// See example in [`ParquetPushDecoderBuilder`] + pub fn push_range(&mut self, range: Range, data: Bytes) -> Result<(), ParquetError> { + self.push_ranges(vec![range], vec![data]) + } + + /// Push data into the decoder for processing + /// + /// This should correspond to the data ranges requested by the decoder + pub fn push_ranges( + &mut self, + ranges: Vec>, + data: Vec, + ) -> Result<(), ParquetError> { + let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished); + self.state = current_state.push_data(ranges, data)?; + Ok(()) + } + + /// Returns the total number of buffered bytes in the decoder + /// + /// This is the sum of the size of all [`Bytes`] that has been pushed to the + /// decoder but not yet consumed. + /// + /// Note that this does not include any overhead of the internal data + /// structures and that since [`Bytes`] are ref counted memory, this may not + /// reflect additional memory usage. + /// + /// This can be used to monitor memory usage of the decoder. + pub fn buffered_bytes(&self) -> u64 { + self.state.buffered_bytes() + } +} + +/// Internal state machine for the [`ParquetPushDecoder`] +#[derive(Debug)] +enum ParquetDecoderState { + /// Waiting for data needed to decode the next RowGroup + ReadingRowGroup { + remaining_row_groups: Box, + }, + /// The decoder is actively decoding a RowGroup + DecodingRowGroup { + /// Current active reader + record_batch_reader: Box, + remaining_row_groups: Box, + }, + /// The decoder has finished processing all data + Finished, +} + +impl ParquetDecoderState { + /// Current state --> next state + output + /// + /// This function is called to check if the decoder has any RecordBatches + /// and [`Self::push_data`] is called when new data is available. + /// + /// # Notes + /// + /// This structure is used to reduce the indentation level of the main loop + /// in try_build + fn try_transition(self) -> Result<(Self, DecodeResult), ParquetError> { + match self { + Self::ReadingRowGroup { + mut remaining_row_groups, + } => { + match remaining_row_groups.try_next_reader()? { + // If we have a next reader, we can transition to decoding it + DecodeResult::Data(record_batch_reader) => { + // Transition to decoding the row group + Self::DecodingRowGroup { + record_batch_reader: Box::new(record_batch_reader), + remaining_row_groups, + } + .try_transition() + } + // If there are no more readers, we are finished + DecodeResult::NeedsData(ranges) => { + // If we need more data, we return the ranges needed and stay in Reading + // RowGroup state + Ok(( + Self::ReadingRowGroup { + remaining_row_groups, + }, + DecodeResult::NeedsData(ranges), + )) + } + DecodeResult::Finished => { + // No more row groups to read, we are finished + Ok((Self::Finished, DecodeResult::Finished)) + } + } + } + Self::DecodingRowGroup { + mut record_batch_reader, + remaining_row_groups, + } => { + // Decide the next record batch + match record_batch_reader.next() { + Some(Ok(batch)) => { + // Successfully decoded a batch, return it + Ok(( + Self::DecodingRowGroup { + record_batch_reader, + remaining_row_groups, + }, + DecodeResult::Data(batch), + )) + } + None => { + // No more batches in this row group, move to the next row group + // or finish if there are no more row groups + Self::ReadingRowGroup { + remaining_row_groups, + } + .try_transition() + } + Some(Err(e)) => Err(ParquetError::from(e)), // some error occurred while decoding + } + } + Self::Finished => Ok((Self::Finished, DecodeResult::Finished)), + } + } + + /// Push data, and transition state if needed + /// + /// This should correspond to the data ranges requested by the decoder + pub fn push_data( + self, + ranges: Vec>, + data: Vec, + ) -> Result { + match self { + ParquetDecoderState::ReadingRowGroup { + mut remaining_row_groups, + } => { + // Push data to the RowGroupReaderBuilder + remaining_row_groups.push_data(ranges, data); + Ok(ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + }) + } + // it is ok to get data before we asked for it + ParquetDecoderState::DecodingRowGroup { + record_batch_reader, + mut remaining_row_groups, + } => { + remaining_row_groups.push_data(ranges, data); + Ok(ParquetDecoderState::DecodingRowGroup { + record_batch_reader, + remaining_row_groups, + }) + } + ParquetDecoderState::Finished => Err(ParquetError::General( + "Cannot push data to a finished decoder".to_string(), + )), + } + } + + /// How many bytes are currently buffered in the decoder? + fn buffered_bytes(&self) -> u64 { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.buffered_bytes(), + ParquetDecoderState::DecodingRowGroup { + record_batch_reader: _, + remaining_row_groups, + } => remaining_row_groups.buffered_bytes(), + ParquetDecoderState::Finished => 0, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::DecodeResult; + use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector}; + use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; + use crate::arrow::{ArrowWriter, ProjectionMask}; + use crate::errors::ParquetError; + use crate::file::metadata::ParquetMetaDataPushDecoder; + use crate::file::properties::WriterProperties; + use arrow::compute::kernels::cmp::{gt, lt}; + use arrow_array::cast::AsArray; + use arrow_array::types::Int64Type; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray}; + use arrow_select::concat::concat_batches; + use bytes::Bytes; + use std::fmt::Debug; + use std::ops::Range; + use std::sync::{Arc, LazyLock}; + + /// Test decoder struct size (as they are copied around on each transition, they + /// should not grow too large) + #[test] + fn test_decoder_size() { + assert_eq!(std::mem::size_of::(), 24); + } + + /// Decode the entire file at once, simulating a scenario where all data is + /// available in memory + #[test] + fn test_decoder_all_data() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + .build() + .unwrap(); + + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + let results = vec![ + // first row group should be decoded without needing more data + expect_data(decoder.try_decode()), + // second row group should be decoded without needing more data + expect_data(decoder.try_decode()), + ]; + expect_finished(decoder.try_decode()); + + let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap(); + // Check that the output matches the input batch + assert_eq!(all_output, *TEST_BATCH); + } + + /// Decode the entire file incrementally, simulating a scenario where data is + /// fetched as needed + #[test] + fn test_decoder_incremental() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + .build() + .unwrap(); + + let mut results = vec![]; + + // First row group, expect a single request + let ranges = expect_needs_data(decoder.try_decode()); + let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + push_ranges_to_decoder(&mut decoder, ranges); + // The decoder should currently only store the data it needs to decode the first row group + assert_eq!(decoder.buffered_bytes(), num_bytes_requested); + results.push(expect_data(decoder.try_decode())); + // the decoder should have consumed the data for the first row group and freed it + assert_eq!(decoder.buffered_bytes(), 0); + + // Second row group, + let ranges = expect_needs_data(decoder.try_decode()); + let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + push_ranges_to_decoder(&mut decoder, ranges); + // The decoder should currently only store the data it needs to decode the second row group + assert_eq!(decoder.buffered_bytes(), num_bytes_requested); + results.push(expect_data(decoder.try_decode())); + // the decoder should have consumed the data for the second row group and freed it + assert_eq!(decoder.buffered_bytes(), 0); + expect_finished(decoder.try_decode()); + + // Check that the output matches the input batch + let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap(); + assert_eq!(all_output, *TEST_BATCH); + } + + /// Decode the entire file incrementally, simulating partial reads + #[test] + fn test_decoder_partial() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + .build() + .unwrap(); + + // First row group, expect a single request for all data needed to read "a" and "b" + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(0, 200); + assert_eq!(batch1, expected1); + + // Second row group, this time provide the data in two steps + let ranges = expect_needs_data(decoder.try_decode()); + let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2); + assert!(!ranges1.is_empty()); + assert!(!ranges2.is_empty()); + // push first half to simulate partial read + push_ranges_to_decoder(&mut decoder, ranges1.to_vec()); + + // still expect more data + let ranges = expect_needs_data(decoder.try_decode()); + assert_eq!(ranges, ranges2); // should be the remaining ranges + // push empty ranges should be a no-op + push_ranges_to_decoder(&mut decoder, vec![]); + let ranges = expect_needs_data(decoder.try_decode()); + assert_eq!(ranges, ranges2); // should be the remaining ranges + push_ranges_to_decoder(&mut decoder, ranges); + + let batch2 = expect_data(decoder.try_decode()); + let expected2 = TEST_BATCH.slice(200, 200); + assert_eq!(batch2, expected2); + + expect_finished(decoder.try_decode()); + } + + /// Decode multiple columns "a" and "b", expect that the decoder requests + /// only a single request per row group + #[test] + fn test_decoder_selection_does_one_request() { + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let mut decoder = builder + .with_projection( + ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b" + ) + .build() + .unwrap(); + + // First row group, expect a single request for all data needed to read "a" and "b" + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap(); + assert_eq!(batch1, expected1); + + // Second row group, similarly expect a single request for all data needed to read "a" and "b" + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + let batch2 = expect_data(decoder.try_decode()); + let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap(); + assert_eq!(batch2, expected2); + + expect_finished(decoder.try_decode()); + } + + /// Decode with a filter that requires multiple requests, but only provide part + /// of the data needed for the filter at a time simulating partial reads. + #[test] + fn test_decoder_single_filter_partial() { + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + + // Values in column "a" range 0..399 + // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1) + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // a > 250 + let row_filter_a = ArrowPredicateFn::new( + // claim to use both a and b so we get two ranges requests for the filter pages + ProjectionMask::columns(&schema_descr, ["a", "b"]), + |batch: RecordBatch| { + let scalar_250 = Int64Array::new_scalar(250); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_250) + }, + ); + + let mut decoder = builder + .with_projection( + // read only column "a" to test that filter pages are reused + ProjectionMask::columns(&schema_descr, ["a"]), // read "a" + ) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)])) + .build() + .unwrap(); + + // First row group, + let ranges = expect_needs_data(decoder.try_decode()); + // only provide half the ranges + let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2); + assert!(!ranges1.is_empty()); + assert!(!ranges2.is_empty()); + push_ranges_to_decoder(&mut decoder, ranges1.to_vec()); + // still expect more data + let ranges = expect_needs_data(decoder.try_decode()); + assert_eq!(ranges, ranges2); // should be the remaining ranges + let ranges = expect_needs_data(decoder.try_decode()); + assert_eq!(ranges, ranges2); // should be the remaining ranges + push_ranges_to_decoder(&mut decoder, ranges2.to_vec()); + + // expect the first row group to be filtered out (no rows match) + + // Second row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + let batch = expect_data(decoder.try_decode()); + let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap(); + assert_eq!(batch, expected); + + expect_finished(decoder.try_decode()); + } + + /// Decode with a filter where we also skip one of the RowGroups via a RowSelection + #[test] + fn test_decoder_single_filter_and_row_selection() { + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + + // Values in column "a" range 0..399 + // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1) + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // a > 250 + let row_filter_a = ArrowPredicateFn::new( + ProjectionMask::columns(&schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_250 = Int64Array::new_scalar(250); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_250) + }, + ); + + let mut decoder = builder + .with_projection( + // read only column "a" to test that filter pages are reused + ProjectionMask::columns(&schema_descr, ["b"]), // read "b" + ) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)])) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(200), // skip first row group + RowSelector::select(100), // first 100 rows of second row group + RowSelector::skip(100), + ])) + .build() + .unwrap(); + + // expect the first row group to be filtered out (no filter is evaluated due to row selection) + + // Second row group, first filter (a > 250) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // Second row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + let batch = expect_data(decoder.try_decode()); + let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap(); + assert_eq!(batch, expected); + + expect_finished(decoder.try_decode()); + } + + /// Decode with multiple filters that require multiple requests + #[test] + fn test_decoder_multi_filters() { + // Create a decoder for decoding parquet data (note it does not have any IO / readers) + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + + // Values in column "a" range 0..399 + // Values in column "b" range 400..799 + // First filter: "a" > 175 (last data page in Row Group 0) + // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // a > 175 + let row_filter_a = ArrowPredicateFn::new( + ProjectionMask::columns(&schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_175 = Int64Array::new_scalar(175); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_175) + }, + ); + + // b < 625 + let row_filter_b = ArrowPredicateFn::new( + ProjectionMask::columns(&schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + lt(column, &scalar_625) + }, + ); + + let mut decoder = builder + .with_projection( + ProjectionMask::columns(&schema_descr, ["c"]), // read "c" + ) + .with_row_filter(RowFilter::new(vec![ + Box::new(row_filter_a), + Box::new(row_filter_b), + ])) + .build() + .unwrap(); + + // First row group, first filter (a > 175) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // first row group, second filter (b < 625) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // first row group, data pages for "c" + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first batch to be decoded: rows 176..199, column "c" + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap(); + assert_eq!(batch1, expected1); + + // Second row group, first filter (a > 175) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // Second row group, second filter (b < 625) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // Second row group, data pages for "c" + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the second batch to be decoded: rows 200..224, column "c" + let batch2 = expect_data(decoder.try_decode()); + let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap(); + assert_eq!(batch2, expected2); + + expect_finished(decoder.try_decode()); + } + + /// Decode with a filter that uses a column that is also projected, and expect + /// that the filter pages are reused (don't refetch them) + #[test] + fn test_decoder_reuses_filter_pages() { + // Create a decoder for decoding parquet data (note it does not have any IO / readers) + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + + // Values in column "a" range 0..399 + // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1) + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // a > 250 + let row_filter_a = ArrowPredicateFn::new( + ProjectionMask::columns(&schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_250 = Int64Array::new_scalar(250); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_250) + }, + ); + + let mut decoder = builder + .with_projection( + // read only column "a" to test that filter pages are reused + ProjectionMask::columns(&schema_descr, ["a"]), // read "a" + ) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)])) + .build() + .unwrap(); + + // First row group, first filter (a > 175) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first row group to be filtered out (no rows match) + + // Second row group, first filter (a > 250) + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect that the second row group is decoded: rows 251..399, column "a" + // Note that the filter pages for "a" should be reused and no additional data + // should be requested + let batch = expect_data(decoder.try_decode()); + let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap(); + assert_eq!(batch, expected); + + expect_finished(decoder.try_decode()); + } + + #[test] + fn test_decoder_empty_filters() { + let builder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap(); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // only read column "c", but with empty filters + let mut decoder = builder + .with_projection( + ProjectionMask::columns(&schema_descr, ["c"]), // read "c" + ) + .with_row_filter(RowFilter::new(vec![ + // empty filters should be ignored + ])) + .build() + .unwrap(); + + // First row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first batch to be decoded: rows 0..199, column "c" + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap(); + assert_eq!(batch1, expected1); + + // Second row group, + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the second batch to be decoded: rows 200..399, column "c" + let batch2 = expect_data(decoder.try_decode()); + let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap(); + + assert_eq!(batch2, expected2); + + expect_finished(decoder.try_decode()); + } + + #[test] + fn test_decoder_offset_limit() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + // skip entire first row group (200 rows) and first 25 rows of second row group + .with_offset(225) + // and limit to 20 rows + .with_limit(20) + .build() + .unwrap(); + + // First row group should be skipped, + + // Second row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first ane only batch to be decoded + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(225, 20); + assert_eq!(batch1, expected1); + + expect_finished(decoder.try_decode()); + } + + #[test] + fn test_decoder_row_group_selection() { + // take only the second row group + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + .with_row_groups(vec![1]) + .build() + .unwrap(); + + // First row group should be skipped, + + // Second row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first ane only batch to be decoded + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(200, 200); + assert_eq!(batch1, expected1); + + expect_finished(decoder.try_decode()); + } + + #[test] + fn test_decoder_row_selection() { + // take only the second row group + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( + test_file_len(), + test_file_parquet_metadata(), + ) + .unwrap() + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(225), // skip first row group and 25 rows of second]) + RowSelector::select(20), // take 20 rows + ])) + .build() + .unwrap(); + + // First row group should be skipped, + + // Second row group + let ranges = expect_needs_data(decoder.try_decode()); + push_ranges_to_decoder(&mut decoder, ranges); + + // expect the first ane only batch to be decoded + let batch1 = expect_data(decoder.try_decode()); + let expected1 = TEST_BATCH.slice(225, 20); + assert_eq!(batch1, expected1); + + expect_finished(decoder.try_decode()); + } + + /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c" + /// + /// Note c is a different types (so the data page sizes will be different) + static TEST_BATCH: LazyLock = LazyLock::new(|| { + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() + }); + + /// Create a parquet file in memory for testing. + /// + /// See [`TEST_BATCH`] for the data in the file. + /// + /// Each column is written in 4 data pages, each with 100 rows, across 2 + /// row groups. Each column in each row group has two data pages. + /// + /// The data is split across row groups like this + /// + /// Column | Values | Data Page | Row Group + /// -------|------------------------|-----------|----------- + /// a | 0..99 | 1 | 0 + /// a | 100..199 | 2 | 0 + /// a | 200..299 | 1 | 1 + /// a | 300..399 | 2 | 1 + /// + /// b | 400..499 | 1 | 0 + /// b | 500..599 | 2 | 0 + /// b | 600..699 | 1 | 1 + /// b | 700..799 | 2 | 1 + /// + /// c | "string_0".."string_99" | 1 | 0 + /// c | "string_100".."string_199" | 2 | 0 + /// c | "string_200".."string_299" | 1 | 1 + /// c | "string_300".."string_399" | 2 | 1 + static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + let input_batch = &TEST_BATCH; + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) + }); + + /// Return the length of [`TEST_FILE_DATA`], in bytes + fn test_file_len() -> u64 { + TEST_FILE_DATA.len() as u64 + } + + /// Return a range that covers the entire [`TEST_FILE_DATA`] + fn test_file_range() -> Range { + 0..test_file_len() + } + + /// Return a slice of the test file data from the given range + pub fn test_file_slice(range: Range) -> Bytes { + let start: usize = range.start.try_into().unwrap(); + let end: usize = range.end.try_into().unwrap(); + TEST_FILE_DATA.slice(start..end) + } + + /// return the metadata for the test file + pub fn test_file_parquet_metadata() -> Arc { + let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap(); + push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]); + let metadata = metadata_decoder.try_decode().unwrap(); + let DecodeResult::Data(metadata) = metadata else { + panic!("Expected metadata to be decoded successfully"); + }; + Arc::new(metadata) + } + + /// Push the given ranges to the metadata decoder, simulating reading from a file + fn push_ranges_to_metadata_decoder( + metadata_decoder: &mut ParquetMetaDataPushDecoder, + ranges: Vec>, + ) { + let data = ranges + .iter() + .map(|range| test_file_slice(range.clone())) + .collect::>(); + metadata_decoder.push_ranges(ranges, data).unwrap(); + } + + fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec>) { + let data = ranges + .iter() + .map(|range| test_file_slice(range.clone())) + .collect::>(); + decoder.push_ranges(ranges, data).unwrap(); + } + + /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element + fn expect_data(result: Result, ParquetError>) -> T { + match result.expect("Expected Ok(DecodeResult::Data(T))") { + DecodeResult::Data(data) => data, + result => panic!("Expected DecodeResult::Data, got {result:?}"), + } + } + + /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges + fn expect_needs_data( + result: Result, ParquetError>, + ) -> Vec> { + match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") { + DecodeResult::NeedsData(ranges) => ranges, + result => panic!("Expected DecodeResult::NeedsData, got {result:?}"), + } + } + + fn expect_finished(result: Result, ParquetError>) { + match result.expect("Expected Ok(DecodeResult::Finished)") { + DecodeResult::Finished => {} + result => panic!("Expected DecodeResult::Finished, got {result:?}"), + } + } +} diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs new file mode 100644 index 000000000000..7b95300ddb0c --- /dev/null +++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`DataRequest`] tracks and holds data needed to construct InMemoryRowGroups + +use crate::arrow::ProjectionMask; +use crate::arrow::arrow_reader::RowSelection; +use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges, InMemoryRowGroup}; +use crate::errors::ParquetError; +use crate::file::metadata::ParquetMetaData; +use crate::file::page_index::offset_index::OffsetIndexMetaData; +use crate::file::reader::ChunkReader; +use crate::util::push_buffers::PushBuffers; +use bytes::Bytes; +use std::ops::Range; +use std::sync::Arc; + +/// Contains in-progress state to construct InMemoryRowGroups +/// +/// See [`DataRequestBuilder`] for creating new requests +#[derive(Debug)] +pub(super) struct DataRequest { + /// Any previously read column chunk data + column_chunks: Vec>>, + /// The ranges of data that are needed next + ranges: Vec>, + /// Optional page start offsets each requested range. This is used + /// to create the relevant InMemoryRowGroup + page_start_offsets: Option>>, +} + +impl DataRequest { + /// return what ranges are still needed to satisfy this request. Returns an empty vec + /// if all ranges are satisfied + pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec> { + self.ranges + .iter() + .filter(|&range| !buffers.has_range(range)) + .cloned() + .collect() + } + + /// Returns the chunks from the buffers that satisfy this request + fn get_chunks(&self, buffers: &PushBuffers) -> Result, ParquetError> { + self.ranges + .iter() + .map(|range| { + let length: usize = (range.end - range.start) + .try_into() + .expect("overflow for offset"); + // should have all the data due to the check above + buffers.get_bytes(range.start, length).map_err(|e| { + ParquetError::General(format!( + "Internal Error missing data for range {range:?} in buffers: {e}", + )) + }) + }) + .collect() + } + + /// Create a new InMemoryRowGroup, and fill it with provided data + /// + /// Assumes that all needed data is present in the buffers + /// and clears any explicitly requested ranges + pub fn try_into_in_memory_row_group<'a>( + self, + row_group_idx: usize, + row_count: usize, + parquet_metadata: &'a ParquetMetaData, + projection: &ProjectionMask, + buffers: &mut PushBuffers, + ) -> Result, ParquetError> { + let chunks = self.get_chunks(buffers)?; + + let Self { + column_chunks, + ranges, + page_start_offsets, + } = self; + + // Create an InMemoryRowGroup to hold the column chunks, this is a + // temporary structure used to tell the ArrowReaders what pages are + // needed for decoding + let mut in_memory_row_group = InMemoryRowGroup { + row_count, + column_chunks, + offset_index: get_offset_index(parquet_metadata, row_group_idx), + row_group_idx, + metadata: parquet_metadata, + }; + + in_memory_row_group.fill_column_chunks(projection, page_start_offsets, chunks); + + // Clear the ranges that were explicitly requested + buffers.clear_ranges(&ranges); + + Ok(in_memory_row_group) + } +} + +/// Builder for [`DataRequest`] +pub(super) struct DataRequestBuilder<'a> { + /// The row group index + row_group_idx: usize, + /// The number of rows in the row group + row_count: usize, + /// The batch size to read + batch_size: usize, + /// The parquet metadata + parquet_metadata: &'a ParquetMetaData, + /// The projection mask (which columns to read) + projection: &'a ProjectionMask, + /// Optional row selection to apply + selection: Option<&'a RowSelection>, + /// Optional projection mask for caching purposes + cache_projection: Option<&'a ProjectionMask>, + /// Any previously read column chunks + column_chunks: Option>>>, +} + +impl<'a> DataRequestBuilder<'a> { + pub(super) fn new( + row_group_idx: usize, + row_count: usize, + batch_size: usize, + parquet_metadata: &'a ParquetMetaData, + projection: &'a ProjectionMask, + ) -> Self { + Self { + row_group_idx, + row_count, + batch_size, + parquet_metadata, + projection, + selection: None, + cache_projection: None, + column_chunks: None, + } + } + + /// Set an optional row selection to apply + pub(super) fn with_selection(mut self, selection: Option<&'a RowSelection>) -> Self { + self.selection = selection; + self + } + + /// set columns to cache, if any + pub(super) fn with_cache_projection( + mut self, + cache_projection: Option<&'a ProjectionMask>, + ) -> Self { + self.cache_projection = cache_projection; + self + } + + /// Provide any previously read column chunks + pub(super) fn with_column_chunks( + mut self, + column_chunks: Option>>>, + ) -> Self { + self.column_chunks = column_chunks; + self + } + + pub(crate) fn build(self) -> DataRequest { + let Self { + row_group_idx, + row_count, + batch_size, + parquet_metadata, + projection, + selection, + cache_projection, + column_chunks, + } = self; + + let row_group_meta_data = parquet_metadata.row_group(row_group_idx); + + // If no previously read column chunks are provided, create a new location to hold them + let column_chunks = + column_chunks.unwrap_or_else(|| vec![None; row_group_meta_data.columns().len()]); + + // Create an InMemoryRowGroup to hold the column chunks, this is a + // temporary structure used to tell the ArrowReaders what pages are + // needed for decoding + let row_group = InMemoryRowGroup { + row_count, + column_chunks, + offset_index: get_offset_index(parquet_metadata, row_group_idx), + row_group_idx, + metadata: parquet_metadata, + }; + + let FetchRanges { + ranges, + page_start_offsets, + } = row_group.fetch_ranges(projection, selection, batch_size, cache_projection); + + DataRequest { + // Save any previously read column chunks + column_chunks: row_group.column_chunks, + ranges, + page_start_offsets, + } + } +} + +fn get_offset_index( + parquet_metadata: &ParquetMetaData, + row_group_idx: usize, +) -> Option<&[OffsetIndexMetaData]> { + parquet_metadata + .offset_index() + // filter out empty offset indexes (old versions specified Some(vec![]) when no present) + .filter(|index| !index.is_empty()) + .map(|x| x[row_group_idx].as_slice()) +} diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs new file mode 100644 index 000000000000..9a73059f8a0d --- /dev/null +++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`FilterInfo`] state machine for evaluating row filters + +use crate::arrow::ProjectionMask; +use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache}; +use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter}; +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; + +/// State machine for evaluating a sequence of predicates +/// +/// This is somewhat more complicated than one might expect because the +/// RowFilter must be owned by the FilterInfo so that predicates can +/// be evaluated (requires mutable access). +#[derive(Debug)] +pub(super) struct FilterInfo { + /// The predicates to evaluate, in order + /// + /// These must be owned by FilterInfo because they may be mutated as part of + /// evaluation so there is a bunch of complexity of handing them back and forth + filter: RowFilter, + /// The next filter to be evaluated + next_predicate: NonZeroUsize, + /// Stores previously computed filter results + cache_info: CacheInfo, +} + +/// Predicate cache +/// +/// Note this is basically the same as CacheOptionsBuilder +/// but it owns the ProjectionMask and RowGroupCache +#[derive(Debug)] +pub(super) struct CacheInfo { + /// The columns to cache in the predicate cache + cache_projection: ProjectionMask, + row_group_cache: Arc>, +} + +impl CacheInfo { + pub(super) fn new( + cache_projection: ProjectionMask, + row_group_cache: Arc>, + ) -> Self { + Self { + cache_projection, + row_group_cache, + } + } + + pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> { + CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache) + } +} + +pub(super) enum AdvanceResult { + /// advanced to the next predicate + Continue(FilterInfo), + /// no more predicates returns the row filter and cache info + Done(RowFilter, CacheInfo), +} + +impl FilterInfo { + /// Create a new FilterInfo + pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self { + Self { + filter, + next_predicate: NonZeroUsize::new(1).expect("1 is always non-zero"), + cache_info, + } + } + + /// Advance to the next predicate, returning either the updated FilterInfo + /// or the completed RowFilter if there are no more predicates + pub(super) fn advance(mut self) -> AdvanceResult { + if self.next_predicate.get() >= self.filter.predicates.len() { + AdvanceResult::Done(self.filter, self.cache_info) + } else { + self.next_predicate = self + .next_predicate + .checked_add(1) + .expect("no usize overflow"); + AdvanceResult::Continue(self) + } + } + + /// Return the current predicate to evaluate, mutablely + /// Panics if done() is true + pub(super) fn current_mut(&mut self) -> &mut dyn ArrowPredicate { + self.filter + .predicates + .get_mut(self.next_predicate.get() - 1) + .expect("current predicate out of bounds") + .as_mut() + } + + /// Return the current predicate to evaluate + /// Panics if done() is true + pub(super) fn current(&self) -> &dyn ArrowPredicate { + self.filter + .predicates + .get(self.next_predicate.get() - 1) + .expect("current predicate out of bounds") + .as_ref() + } + + /// Return a reference to the cache projection + pub(super) fn cache_projection(&self) -> &ProjectionMask { + &self.cache_info.cache_projection + } + + /// Return a cache builder to save the results of predicate evaluation + pub(super) fn cache_builder(&self) -> CacheOptionsBuilder<'_> { + self.cache_info.builder() + } + + /// Returns the inner filter, consuming this FilterInfo + pub(super) fn into_filter(self) -> RowFilter { + self.filter + } +} diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs new file mode 100644 index 000000000000..1c5bae8caf40 --- /dev/null +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -0,0 +1,654 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod data; +mod filter; + +use crate::DecodeResult; +use crate::arrow::ProjectionMask; +use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::arrow::arrow_reader::{ + ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, +}; +use crate::arrow::in_memory_row_group::ColumnChunkData; +use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; +use crate::arrow::push_decoder::reader_builder::filter::CacheInfo; +use crate::arrow::schema::ParquetField; +use crate::errors::ParquetError; +use crate::file::metadata::ParquetMetaData; +use crate::util::push_buffers::PushBuffers; +use bytes::Bytes; +use data::DataRequest; +use filter::AdvanceResult; +use filter::FilterInfo; +use std::ops::Range; +use std::sync::{Arc, Mutex}; + +/// The current row group being read and the read plan +#[derive(Debug)] +struct RowGroupInfo { + row_group_idx: usize, + row_count: usize, + plan_builder: ReadPlanBuilder, +} + +/// This is the inner state machine for reading a single row group. +#[derive(Debug)] +enum RowGroupDecoderState { + Start { + row_group_info: RowGroupInfo, + }, + /// Planning filters, but haven't yet requested data to evaluate them + Filters { + row_group_info: RowGroupInfo, + /// Any previously read column chunk data from prior filters + column_chunks: Option>>>, + filter_info: FilterInfo, + }, + /// Needs data to evaluate current filter + WaitingOnFilterData { + row_group_info: RowGroupInfo, + filter_info: FilterInfo, + data_request: DataRequest, + }, + /// Know what data to actually read, after all predicates + StartData { + row_group_info: RowGroupInfo, + /// Any previously read column chunk data from the filtering phase + column_chunks: Option>>>, + /// Any cached filter results + cache_info: Option, + }, + /// Needs data to proceed with reading the output + WaitingOnData { + row_group_info: RowGroupInfo, + data_request: DataRequest, + /// Any cached filter results + cache_info: Option, + }, + /// Finished (or not yet started) reading this group + Finished, +} + +/// Result of a state transition +#[derive(Debug)] +struct NextState { + next_state: RowGroupDecoderState, + /// result to return, if any + /// + /// * `Some`: the processing should stop and return the result + /// * `None`: processing should continue + result: Option>, +} + +impl NextState { + /// The next state with no result. + /// + /// This indicates processing should continue + fn again(next_state: RowGroupDecoderState) -> Self { + Self { + next_state, + result: None, + } + } + + /// Create a NextState with a result that should be returned + fn result( + next_state: RowGroupDecoderState, + result: DecodeResult, + ) -> Self { + Self { + next_state, + result: Some(result), + } + } +} + +/// Builder for [`ParquetRecordBatchReader`] for a single row group +/// +/// This struct drives the main state machine for decoding each row group -- it +/// determines what data is needed, and then assembles the +/// `ParquetRecordBatchReader` when all data is available. +#[derive(Debug)] +pub(crate) struct RowGroupReaderBuilder { + /// The output batch size + batch_size: usize, + + /// What columns to project (produce in each output batch) + projection: ProjectionMask, + + /// The Parquet file metadata + metadata: Arc, + + /// Top level parquet schema and arrow schema mapping + fields: Option>, + + /// Optional filter + filter: Option, + + /// Limit to apply to remaining row groups (decremented as rows are read) + limit: Option, + + /// Offset to apply to remaining row groups (decremented as rows are read) + offset: Option, + + /// The size in bytes of the predicate cache + max_predicate_cache_size: usize, + + /// The metrics collector + metrics: ArrowReaderMetrics, + + /// Current state of the decoder. + /// + /// It is taken when processing, and must be put back before returning + /// it is a bug error if it is not put back after transitioning states. + state: Option, + + /// The underlying data store + buffers: PushBuffers, +} + +impl RowGroupReaderBuilder { + /// Create a new RowGroupReaderBuilder + #[expect(clippy::too_many_arguments)] + pub(crate) fn new( + batch_size: usize, + projection: ProjectionMask, + metadata: Arc, + fields: Option>, + filter: Option, + limit: Option, + offset: Option, + metrics: ArrowReaderMetrics, + max_predicate_cache_size: usize, + buffers: PushBuffers, + ) -> Self { + Self { + batch_size, + projection, + metadata, + fields, + filter, + limit, + offset, + metrics, + max_predicate_cache_size, + state: Some(RowGroupDecoderState::Finished), + buffers, + } + } + + /// Push new data buffers that can be used to satisfy pending requests + pub fn push_data(&mut self, ranges: Vec>, buffers: Vec) { + self.buffers.push_ranges(ranges, buffers); + } + + /// Returns the total number of buffered bytes available + pub fn buffered_bytes(&self) -> u64 { + self.buffers.buffered_bytes() + } + + /// take the current state, leaving None in its place. + /// + /// Returns an error if there the state wasn't put back after the previous + /// call to [`Self::take_state`]. + /// + /// Any code that calls this method must ensure that the state is put back + /// before returning, otherwise the reader error next time it is called + fn take_state(&mut self) -> Result { + self.state.take().ok_or_else(|| { + ParquetError::General(String::from( + "Internal Error: RowGroupReader in invalid state", + )) + }) + } + + /// Setup this reader to read the next row group + pub(crate) fn next_row_group( + &mut self, + row_group_idx: usize, + row_count: usize, + selection: Option, + ) -> Result<(), ParquetError> { + let state = self.take_state()?; + if !matches!(state, RowGroupDecoderState::Finished) { + return Err(ParquetError::General(format!( + "Internal Error: next_row_group called while still reading a row group. Expected Finished state, got {state:?}" + ))); + } + let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection); + + let row_group_info = RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + }; + + self.state = Some(RowGroupDecoderState::Start { row_group_info }); + Ok(()) + } + + /// Try to build the next `ParquetRecordBatchReader` from this RowGroupReader. + /// + /// If more data is needed, returns [`DecodeResult::NeedsData`] with the + /// ranges of data that are needed to proceed. + /// + /// If a [`ParquetRecordBatchReader`] is ready, it is returned in + /// `DecodeResult::Data`. + pub(crate) fn try_build( + &mut self, + ) -> Result, ParquetError> { + loop { + let current_state = self.take_state()?; + match self.try_transition(current_state)? { + NextState { + next_state, + result: Some(result), + } => { + // put back the next state + self.state = Some(next_state); + return Ok(result); + } + NextState { + next_state, + result: None, + } => { + // continue processing + self.state = Some(next_state); + } + } + } + } + + /// Current state --> next state + optional output + /// + /// This is the main state transition function for the row group reader + /// and encodes the row group decoding state machine. + /// + /// # Notes + /// + /// This structure is used to reduce the indentation level of the main loop + /// in try_build + fn try_transition( + &mut self, + current_state: RowGroupDecoderState, + ) -> Result { + let result = match current_state { + RowGroupDecoderState::Start { row_group_info } => { + let column_chunks = None; // no prior column chunks + + let Some(filter) = self.filter.take() else { + // no filter, start trying to read data immediately + return Ok(NextState::again(RowGroupDecoderState::StartData { + row_group_info, + column_chunks, + cache_info: None, + })); + }; + // no predicates in filter, so start reading immediately + if filter.predicates.is_empty() { + return Ok(NextState::again(RowGroupDecoderState::StartData { + row_group_info, + column_chunks, + cache_info: None, + })); + }; + + // we have predicates to evaluate + let cache_projection = + self.compute_cache_projection(row_group_info.row_group_idx, &filter); + + let cache_info = CacheInfo::new( + cache_projection, + Arc::new(Mutex::new(RowGroupCache::new( + self.batch_size, + self.max_predicate_cache_size, + ))), + ); + + let filter_info = FilterInfo::new(filter, cache_info); + NextState::again(RowGroupDecoderState::Filters { + row_group_info, + filter_info, + column_chunks, + }) + } + // need to evaluate filters + RowGroupDecoderState::Filters { + row_group_info, + column_chunks, + filter_info, + } => { + let RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + } = row_group_info; + + // If nothing is selected, we are done with this row group + if !plan_builder.selects_any() { + // ruled out entire row group + self.filter = Some(filter_info.into_filter()); + return Ok(NextState::result( + RowGroupDecoderState::Finished, + DecodeResult::Finished, + )); + } + + // Make a request for the data needed to evaluate the current predicate + let predicate = filter_info.current(); + + // need to fetch pages the column needs for decoding, figure + // that out based on the current selection and projection + let data_request = DataRequestBuilder::new( + row_group_idx, + row_count, + self.batch_size, + &self.metadata, + predicate.projection(), // use the predicate's projection + ) + .with_selection(plan_builder.selection()) + // Fetch predicate columns; expand selection only for cached predicate columns + .with_cache_projection(Some(filter_info.cache_projection())) + .with_column_chunks(column_chunks) + .build(); + + let row_group_info = RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + }; + + NextState::again(RowGroupDecoderState::WaitingOnFilterData { + row_group_info, + filter_info, + data_request, + }) + } + RowGroupDecoderState::WaitingOnFilterData { + row_group_info, + data_request, + mut filter_info, + } => { + // figure out what ranges we still need + let needed_ranges = data_request.needed_ranges(&self.buffers); + if !needed_ranges.is_empty() { + // still need data + return Ok(NextState::result( + RowGroupDecoderState::WaitingOnFilterData { + row_group_info, + filter_info, + data_request, + }, + DecodeResult::NeedsData(needed_ranges), + )); + } + + // otherwise we have all the data we need to evaluate the predicate + let RowGroupInfo { + row_group_idx, + row_count, + mut plan_builder, + } = row_group_info; + + let predicate = filter_info.current(); + + let row_group = data_request.try_into_in_memory_row_group( + row_group_idx, + row_count, + &self.metadata, + predicate.projection(), + &mut self.buffers, + )?; + + let cache_options = filter_info.cache_builder().producer(); + + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) + .build_array_reader(self.fields.as_deref(), predicate.projection())?; + + plan_builder = + plan_builder.with_predicate(array_reader, filter_info.current_mut())?; + + let row_group_info = RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + }; + + // Take back the column chunks that were read + let column_chunks = Some(row_group.column_chunks); + + // advance to the next predicate, if any + match filter_info.advance() { + AdvanceResult::Continue(filter_info) => { + NextState::again(RowGroupDecoderState::Filters { + row_group_info, + column_chunks, + filter_info, + }) + } + // done with predicates, proceed to reading data + AdvanceResult::Done(filter, cache_info) => { + // remember we need to put back the filter + assert!(self.filter.is_none()); + self.filter = Some(filter); + NextState::again(RowGroupDecoderState::StartData { + row_group_info, + column_chunks, + cache_info: Some(cache_info), + }) + } + } + } + RowGroupDecoderState::StartData { + row_group_info, + column_chunks, + cache_info, + } => { + let RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + } = row_group_info; + + // Compute the number of rows in the selection before applying limit and offset + let rows_before = plan_builder.num_rows_selected().unwrap_or(row_count); + + if rows_before == 0 { + // ruled out entire row group + return Ok(NextState::result( + RowGroupDecoderState::Finished, + DecodeResult::Finished, + )); + } + + // Apply any limit and offset + let plan_builder = plan_builder + .limited(row_count) + .with_offset(self.offset) + .with_limit(self.limit) + .build_limited(); + + let rows_after = plan_builder.num_rows_selected().unwrap_or(row_count); + + // Update running offset and limit for after the current row group is read + if let Some(offset) = &mut self.offset { + // Reduction is either because of offset or limit, as limit is applied + // after offset has been "exhausted" can just use saturating sub here + *offset = offset.saturating_sub(rows_before - rows_after) + } + + if rows_after == 0 { + // no rows left after applying limit/offset + return Ok(NextState::result( + RowGroupDecoderState::Finished, + DecodeResult::Finished, + )); + } + + if let Some(limit) = &mut self.limit { + *limit -= rows_after; + } + + let data_request = DataRequestBuilder::new( + row_group_idx, + row_count, + self.batch_size, + &self.metadata, + &self.projection, + ) + .with_selection(plan_builder.selection()) + .with_column_chunks(column_chunks) + // Final projection fetch shouldn't expand selection for cache + // so don't call with_cache_projection here + .build(); + + let row_group_info = RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + }; + + NextState::again(RowGroupDecoderState::WaitingOnData { + row_group_info, + data_request, + cache_info, + }) + } + // Waiting on data to proceed with reading the output + RowGroupDecoderState::WaitingOnData { + row_group_info, + data_request, + cache_info, + } => { + let needed_ranges = data_request.needed_ranges(&self.buffers); + if !needed_ranges.is_empty() { + // still need data + return Ok(NextState::result( + RowGroupDecoderState::WaitingOnData { + row_group_info, + data_request, + cache_info, + }, + DecodeResult::NeedsData(needed_ranges), + )); + } + + // otherwise we have all the data we need to proceed + let RowGroupInfo { + row_group_idx, + row_count, + plan_builder, + } = row_group_info; + + let row_group = data_request.try_into_in_memory_row_group( + row_group_idx, + row_count, + &self.metadata, + &self.projection, + &mut self.buffers, + )?; + + let plan = plan_builder.build(); + + // if we have any cached results, connect them up + let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics); + let array_reader = if let Some(cache_info) = cache_info.as_ref() { + let cache_options = cache_info.builder().consumer(); + array_reader_builder + .with_cache_options(Some(&cache_options)) + .build_array_reader(self.fields.as_deref(), &self.projection) + } else { + array_reader_builder + .build_array_reader(self.fields.as_deref(), &self.projection) + }?; + + let reader = ParquetRecordBatchReader::new(array_reader, plan); + NextState::result(RowGroupDecoderState::Finished, DecodeResult::Data(reader)) + } + RowGroupDecoderState::Finished => { + // nothing left to read + NextState::result(RowGroupDecoderState::Finished, DecodeResult::Finished) + } + }; + Ok(result) + } + + /// Which columns should be cached? + /// + /// Returns the columns that are used by the filters *and* then used in the + /// final projection, excluding any nested columns. + fn compute_cache_projection(&self, row_group_idx: usize, filter: &RowFilter) -> ProjectionMask { + let meta = self.metadata.row_group(row_group_idx); + match self.compute_cache_projection_inner(filter) { + Some(projection) => projection, + None => ProjectionMask::none(meta.columns().len()), + } + } + + fn compute_cache_projection_inner(&self, filter: &RowFilter) -> Option { + let mut cache_projection = filter.predicates.first()?.projection().clone(); + for predicate in filter.predicates.iter() { + cache_projection.union(predicate.projection()); + } + cache_projection.intersect(&self.projection); + self.exclude_nested_columns_from_cache(&cache_projection) + } + + /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns) + fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option { + let schema = self.metadata.file_metadata().schema_descr(); + let num_leaves = schema.num_columns(); + + // Count how many leaves each root column has + let num_roots = schema.root_schema().get_fields().len(); + let mut root_leaf_counts = vec![0usize; num_roots]; + for leaf_idx in 0..num_leaves { + let root_idx = schema.get_column_root_idx(leaf_idx); + root_leaf_counts[root_idx] += 1; + } + + // Keep only leaves whose root has exactly one leaf (non-nested) + let mut included_leaves = Vec::new(); + for leaf_idx in 0..num_leaves { + if mask.leaf_included(leaf_idx) { + let root_idx = schema.get_column_root_idx(leaf_idx); + if root_leaf_counts[root_idx] == 1 { + included_leaves.push(leaf_idx); + } + } + } + + if included_leaves.is_empty() { + None + } else { + Some(ProjectionMask::leaves(schema, included_leaves)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + // Verify that the size of RowGroupDecoderState does not grow too large + fn test_structure_size() { + assert_eq!(std::mem::size_of::(), 184); + } +} diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs new file mode 100644 index 000000000000..4613fda08749 --- /dev/null +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::DecodeResult; +use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; +use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder; +use crate::errors::ParquetError; +use crate::file::metadata::ParquetMetaData; +use bytes::Bytes; +use std::collections::VecDeque; +use std::ops::Range; +use std::sync::Arc; + +/// State machine that tracks the remaining high level chunks (row groups) of +/// Parquet data are left to read. +/// +/// This is currently a row group, but the author aspires to extend the pattern +/// to data boundaries other than RowGroups in the future. +#[derive(Debug)] +pub(crate) struct RemainingRowGroups { + /// The underlying Parquet metadata + parquet_metadata: Arc, + + /// The row groups that have not yet been read + row_groups: VecDeque, + + /// Remaining selection to apply to the next row groups + selection: Option, + + /// State for building the reader for the current row group + row_group_reader_builder: RowGroupReaderBuilder, +} + +impl RemainingRowGroups { + pub fn new( + parquet_metadata: Arc, + row_groups: Vec, + selection: Option, + row_group_reader_builder: RowGroupReaderBuilder, + ) -> Self { + Self { + parquet_metadata, + row_groups: VecDeque::from(row_groups), + selection, + row_group_reader_builder, + } + } + + /// Push new data buffers that can be used to satisfy pending requests + pub fn push_data(&mut self, ranges: Vec>, buffers: Vec) { + self.row_group_reader_builder.push_data(ranges, buffers); + } + + /// Return the total number of bytes buffered so far + pub fn buffered_bytes(&self) -> u64 { + self.row_group_reader_builder.buffered_bytes() + } + + /// returns [`ParquetRecordBatchReader`] suitable for reading the next + /// group of rows from the Parquet data, or the list of data ranges still + /// needed to proceed + pub fn try_next_reader( + &mut self, + ) -> Result, ParquetError> { + loop { + // Are we ready yet to start reading? + let result: DecodeResult = + self.row_group_reader_builder.try_build()?; + match result { + DecodeResult::Finished => { + // reader is done, proceed to the next row group + // fall through to the next row group + // This happens if the row group was completely filtered out + } + DecodeResult::NeedsData(ranges) => { + // need more data to proceed + return Ok(DecodeResult::NeedsData(ranges)); + } + DecodeResult::Data(batch_reader) => { + // ready to read the row group + return Ok(DecodeResult::Data(batch_reader)); + } + } + + // No current reader, proceed to the next row group if any + let row_group_idx = match self.row_groups.pop_front() { + None => return Ok(DecodeResult::Finished), + Some(idx) => idx, + }; + + let row_count: usize = self + .parquet_metadata + .row_group(row_group_idx) + .num_rows() + .try_into() + .map_err(|e| ParquetError::General(format!("Row count overflow: {e}")))?; + + let selection = self.selection.as_mut().map(|s| s.split_off(row_count)); + self.row_group_reader_builder + .next_row_group(row_group_idx, row_count, selection)?; + // the next iteration will try to build the reader for the new row group + } + } +} diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index b6da97a03739..10162cd0da8b 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -42,7 +42,6 @@ //! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O //! * [`ParquetMetaDataWriter`] for writing. //! -//! //! # Examples //! //! Please see [`external_metadata.rs`] diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index d9cff3462395..a6a739f5fbe5 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -55,25 +55,28 @@ //! ## Reading and Writing Arrow (`arrow` feature) //! //! The [`arrow`] module supports reading and writing Parquet data to/from -//! Arrow `RecordBatch`es. Using Arrow is simple and performant, and allows workloads +//! Arrow [`RecordBatch`]es. Using Arrow is simple and performant, and allows workloads //! to leverage the wide range of data transforms provided by the [arrow] crate, and by the //! ecosystem of [Arrow] compatible systems. //! //! Most users will use [`ArrowWriter`] for writing and [`ParquetRecordBatchReaderBuilder`] for -//! reading. +//! reading from synchronous IO sources such as files or in-memory buffers. //! -//! Lower level APIs include [`ArrowColumnWriter`] for writing using multiple -//! threads, and [`RowFilter`] to apply filters during decode. +//! Lower level APIs include +//! * [`ParquetPushDecoder`] for file grained control over interleaving of IO and CPU. +//! * [`ArrowColumnWriter`] for writing using multiple threads, +//! * [`RowFilter`] to apply filters during decode //! //! [`ArrowWriter`]: arrow::arrow_writer::ArrowWriter //! [`ParquetRecordBatchReaderBuilder`]: arrow::arrow_reader::ParquetRecordBatchReaderBuilder +//! [`ParquetPushDecoder`]: arrow::push_decoder::ParquetPushDecoder //! [`ArrowColumnWriter`]: arrow::arrow_writer::ArrowColumnWriter //! [`RowFilter`]: arrow::arrow_reader::RowFilter //! -//! ## `async` Reading and Writing Arrow (`async` feature) +//! ## `async` Reading and Writing Arrow (`arrow` feature + `async` feature) //! //! The [`async_reader`] and [`async_writer`] modules provide async APIs to -//! read and write `RecordBatch`es asynchronously. +//! read and write [`RecordBatch`]es asynchronously. //! //! Most users will use [`AsyncArrowWriter`] for writing and [`ParquetRecordBatchStreamBuilder`] //! for reading. When the `object_store` feature is enabled, [`ParquetObjectReader`] @@ -104,6 +107,7 @@ //! //! [arrow]: https://docs.rs/arrow/latest/arrow/index.html //! [Arrow]: https://arrow.apache.org/ +//! [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/array/struct.RecordBatch.html //! [CSV]: https://en.wikipedia.org/wiki/Comma-separated_values //! [Dremel]: https://research.google/pubs/pub36632/ //! [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index b30f91a81b70..0475d48768f0 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -129,6 +129,31 @@ impl PushBuffers { self.offset = offset; self } + + /// Return the total of all buffered ranges + #[cfg(feature = "arrow")] + pub fn buffered_bytes(&self) -> u64 { + self.ranges.iter().map(|r| r.end - r.start).sum() + } + + /// Clear any range and corresponding buffer that is exactly in the ranges_to_clear + #[cfg(feature = "arrow")] + pub fn clear_ranges(&mut self, ranges_to_clear: &[Range]) { + let mut new_ranges = Vec::new(); + let mut new_buffers = Vec::new(); + + for (range, buffer) in self.iter() { + if !ranges_to_clear + .iter() + .any(|r| r.start == range.start && r.end == range.end) + { + new_ranges.push(range.clone()); + new_buffers.push(buffer.clone()); + } + } + self.ranges = new_ranges; + self.buffers = new_buffers; + } } impl Length for PushBuffers { From e4cd99d80d240d398b07ffae248ddf92eb8c528e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Oct 2025 13:52:28 -0400 Subject: [PATCH 03/13] Apply suggestions from code review Co-authored-by: Matthijs Brobbel --- parquet/src/arrow/push_decoder/mod.rs | 36 +++++++++---------- .../arrow/push_decoder/reader_builder/data.rs | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index bbe1f1adf713..440c2323f933 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -60,20 +60,20 @@ use std::sync::Arc; /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; /// # use parquet::arrow::ArrowWriter; /// # use parquet::file::metadata::ParquetMetaDataPushDecoder; -/// # let file_bytes = { -/// # let mut buffer = vec![]; -/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); -/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); -/// # writer.write(&batch).unwrap(); -/// # writer.close().unwrap(); -/// # Bytes::from(buffer) -/// # }; -/// # // mimic IO by returning a function that returns the bytes for a given range -/// # let get_range = |range: &Range| -> Bytes { -/// # let start = range.start as usize; -/// # let end = range.end as usize; -/// # file_bytes.slice(start..end) -/// # }; +/// # let file_bytes = { +/// # let mut buffer = vec![]; +/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); +/// # writer.write(&batch).unwrap(); +/// # writer.close().unwrap(); +/// # Bytes::from(buffer) +/// # }; +/// # // mimic IO by returning a function that returns the bytes for a given range +/// # let get_range = |range: &Range| -> Bytes { +/// # let start = range.start as usize; +/// # let end = range.end as usize; +/// # file_bytes.slice(start..end) +/// # }; /// # let file_length = file_bytes.len() as u64; /// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap(); /// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap(); @@ -550,7 +550,7 @@ mod test { assert_eq!(all_output, *TEST_BATCH); } - /// Decode the entire file incrementally, simulating partial reads + /// Decode the entire file incrementally, simulating partial reads #[test] fn test_decoder_partial() { let mut decoder = ParquetPushDecoderBuilder::try_new_decoder( @@ -731,7 +731,7 @@ mod test { // expect the first row group to be filtered out (no filter is evaluated due to row selection) - // Second row group, first filter (a > 250) + // First row group, first filter (a > 250) let ranges = expect_needs_data(decoder.try_decode()); push_ranges_to_decoder(&mut decoder, ranges); @@ -946,7 +946,7 @@ mod test { let ranges = expect_needs_data(decoder.try_decode()); push_ranges_to_decoder(&mut decoder, ranges); - // expect the first ane only batch to be decoded + // expect the first and only batch to be decoded let batch1 = expect_data(decoder.try_decode()); let expected1 = TEST_BATCH.slice(225, 20); assert_eq!(batch1, expected1); @@ -972,7 +972,7 @@ mod test { let ranges = expect_needs_data(decoder.try_decode()); push_ranges_to_decoder(&mut decoder, ranges); - // expect the first ane only batch to be decoded + // expect the first and only batch to be decoded let batch1 = expect_data(decoder.try_decode()); let expected1 = TEST_BATCH.slice(200, 200); assert_eq!(batch1, expected1); diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs index 7b95300ddb0c..b3435de8da35 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/data.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs @@ -38,7 +38,7 @@ pub(super) struct DataRequest { column_chunks: Vec>>, /// The ranges of data that are needed next ranges: Vec>, - /// Optional page start offsets each requested range. This is used + /// Optional page start offsets for each requested range. This is used /// to create the relevant InMemoryRowGroup page_start_offsets: Option>>, } From 5a99fcafa6cda6b91103adc712a87d43c370187a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 15:35:23 -0400 Subject: [PATCH 04/13] Move InMemoryRowGroup::fetch into the same module --- parquet/src/arrow/async_reader/mod.rs | 28 +----------------------- parquet/src/arrow/in_memory_row_group.rs | 28 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index c65ecd99313a..111c387c2da6 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -60,7 +60,7 @@ use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache}; use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup}; +use crate::arrow::in_memory_row_group::InMemoryRowGroup; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; @@ -963,32 +963,6 @@ where } } -impl InMemoryRowGroup<'_> { - /// Fetches any additional column data specified in `projection` that is not already - /// present in `self.column_chunks`. - /// - /// If `selection` is provided, only the pages required for the selection - /// are fetched. Otherwise, all pages are fetched. - pub(crate) async fn fetch( - &mut self, - input: &mut T, - projection: &ProjectionMask, - selection: Option<&RowSelection>, - batch_size: usize, - cache_mask: Option<&ProjectionMask>, - ) -> Result<()> { - // Figure out what ranges to fetch - let FetchRanges { - ranges, - page_start_offsets, - } = self.fetch_ranges(projection, selection, batch_size, cache_mask); - // do the actual fetch - let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); - // update our in memory buffers (self.column_chunks) with the fetched data - self.fill_column_chunks(projection, page_start_offsets, chunk_data); - Ok(()) - } -} #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index b39a45131b19..dd3d48a7a01d 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -18,6 +18,7 @@ use crate::arrow::ProjectionMask; use crate::arrow::array_reader::RowGroups; use crate::arrow::arrow_reader::RowSelection; +use crate::arrow::async_reader::AsyncFileReader; use crate::column::page::{PageIterator, PageReader}; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; @@ -48,6 +49,33 @@ pub(crate) struct FetchRanges { } impl InMemoryRowGroup<'_> { + /// Fetches any additional column data specified in `projection` that is not already + /// present in `self.column_chunks`. + /// + /// If `selection` is provided, only the pages required for the selection + /// are fetched. Otherwise, all pages are fetched. + /// + /// See [`Self::fetch_ranges`] for details on how `cache_mask` affects the fetch. + pub(crate) async fn fetch( + &mut self, + input: &mut T, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + batch_size: usize, + cache_mask: Option<&ProjectionMask>, + ) -> crate::errors::Result<()> { + // Figure out what ranges to fetch + let FetchRanges { + ranges, + page_start_offsets, + } = self.fetch_ranges(projection, selection, batch_size, cache_mask); + // do the actual fetch + let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); + // update our in memory buffers (self.column_chunks) with the fetched data + self.fill_column_chunks(projection, page_start_offsets, chunk_data); + Ok(()) + } + /// Returns the byte ranges to fetch for the columns specified in /// `projection` and `selection`. pub(crate) fn fetch_ranges( From 4cfc174992c7ad06987967a7d64a576aeec0c64b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 15:40:07 -0400 Subject: [PATCH 05/13] Add comments about caching --- parquet/src/arrow/in_memory_row_group.rs | 13 ++++++++++--- .../src/arrow/push_decoder/reader_builder/data.rs | 4 +++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index dd3d48a7a01d..c81f9b5adffd 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -78,6 +78,11 @@ impl InMemoryRowGroup<'_> { /// Returns the byte ranges to fetch for the columns specified in /// `projection` and `selection`. + /// + /// `cache_mask` indicates which columns, if any, are being cached by + /// [`RowGroupCache`](crate::arrow::array_reader::row_group_cache::RowGroupCache). + /// The `selection` for Cached columns is expanded to batch boundaries to simplify + /// accounting for what data is cached. pub(crate) fn fetch_ranges( &self, projection: &ProjectionMask, @@ -90,8 +95,9 @@ impl InMemoryRowGroup<'_> { let expanded_selection = selection.expand_to_batch_boundaries(batch_size, self.row_count); - // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the - // `RowSelection` + // If we have a `RowSelection` and an `OffsetIndex` then only fetch + // pages required for the `RowSelection` + // Consider preallocating outer vec: https://github.com/apache/arrow-rs/issues/8667 let mut page_start_offsets: Vec> = vec![]; let ranges = self @@ -114,7 +120,8 @@ impl InMemoryRowGroup<'_> { _ => (), } - // Expand selection to batch boundaries only for cached columns + // Expand selection to batch boundaries if needed for caching + // (see doc comment for this function for details on `cache_mask`) let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); if use_expanded { ranges.extend( diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs index b3435de8da35..749eef517496 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/data.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs @@ -126,7 +126,9 @@ pub(super) struct DataRequestBuilder<'a> { projection: &'a ProjectionMask, /// Optional row selection to apply selection: Option<&'a RowSelection>, - /// Optional projection mask for caching purposes + /// Optional projection mask if using + /// [`RowGroupCache`](crate::arrow::array_reader::row_group_cache::RowGroupCache) + /// for caching decoded columns. cache_projection: Option<&'a ProjectionMask>, /// Any previously read column chunks column_chunks: Option>>>, From 41aba3b674ed614793e5fb9fb307dfee8a529a73 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:09:17 -0400 Subject: [PATCH 06/13] clarify test comments / intent --- parquet/src/arrow/push_decoder/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 440c2323f933..4b932fb08034 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -665,7 +665,7 @@ mod test { .build() .unwrap(); - // First row group, + // First row group, evaluating filters let ranges = expect_needs_data(decoder.try_decode()); // only provide half the ranges let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2); @@ -679,7 +679,8 @@ mod test { assert_eq!(ranges, ranges2); // should be the remaining ranges push_ranges_to_decoder(&mut decoder, ranges2.to_vec()); - // expect the first row group to be filtered out (no rows match) + // Since no rows in the first row group pass the filters, there is no + // additional requests to read data pages for "b" here // Second row group let ranges = expect_needs_data(decoder.try_decode()); From 95230615ae3828a400726944bdfd1aeea4da1c79 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:18:31 -0400 Subject: [PATCH 07/13] Clarify more comments --- parquet/src/arrow/in_memory_row_group.rs | 2 +- .../arrow/push_decoder/reader_builder/data.rs | 2 +- .../push_decoder/reader_builder/filter.rs | 23 +++++++++++-------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index c81f9b5adffd..7a2688ef4b0a 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -80,7 +80,7 @@ impl InMemoryRowGroup<'_> { /// `projection` and `selection`. /// /// `cache_mask` indicates which columns, if any, are being cached by - /// [`RowGroupCache`](crate::arrow::array_reader::row_group_cache::RowGroupCache). + /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache). /// The `selection` for Cached columns is expanded to batch boundaries to simplify /// accounting for what data is cached. pub(crate) fn fetch_ranges( diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs b/parquet/src/arrow/push_decoder/reader_builder/data.rs index 749eef517496..6fbc2090b06e 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/data.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs @@ -127,7 +127,7 @@ pub(super) struct DataRequestBuilder<'a> { /// Optional row selection to apply selection: Option<&'a RowSelection>, /// Optional projection mask if using - /// [`RowGroupCache`](crate::arrow::array_reader::row_group_cache::RowGroupCache) + /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache) /// for caching decoded columns. cache_projection: Option<&'a ProjectionMask>, /// Any previously read column chunks diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs index 9a73059f8a0d..e0643fbac2fd 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/filter.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs @@ -23,21 +23,21 @@ use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter}; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; -/// State machine for evaluating a sequence of predicates +/// State machine for evaluating a sequence of predicates. /// -/// This is somewhat more complicated than one might expect because the -/// RowFilter must be owned by the FilterInfo so that predicates can -/// be evaluated (requires mutable access). +/// The `FilterInfo` owns the [`RowFilter`] being evaluated and tracks the current +/// predicate to evaluate. #[derive(Debug)] pub(super) struct FilterInfo { /// The predicates to evaluate, in order /// - /// These must be owned by FilterInfo because they may be mutated as part of - /// evaluation so there is a bunch of complexity of handing them back and forth + /// RowFilter is owned by `FilterInfo` because they may be mutated as part + /// of evaluation. Specifically, [`ArrowPredicate`] requires &mut self for + /// evaluation. filter: RowFilter, /// The next filter to be evaluated next_predicate: NonZeroUsize, - /// Stores previously computed filter results + /// Previously computed filter results cache_info: CacheInfo, } @@ -85,8 +85,13 @@ impl FilterInfo { } } - /// Advance to the next predicate, returning either the updated FilterInfo - /// or the completed RowFilter if there are no more predicates + /// Advance to the next predicate + /// + /// Returns + /// * [`AdvanceResult::Continue`] returning the `FilterInfo` if there are + /// more predicate to evaluate. + /// * [`AdvanceResult::Done`] with the inner [`RowFilter`] and [`CacheInfo]` + /// if there are no more predicates pub(super) fn advance(mut self) -> AdvanceResult { if self.next_predicate.get() >= self.filter.predicates.len() { AdvanceResult::Done(self.filter, self.cache_info) From d18b6c20b0e25302d4757ed1163e8c83b60f6d26 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:23:31 -0400 Subject: [PATCH 08/13] clarify FilterInfo predicates --- .../src/arrow/push_decoder/reader_builder/filter.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs index e0643fbac2fd..495abccc0c8d 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/filter.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs @@ -104,23 +104,23 @@ impl FilterInfo { } } - /// Return the current predicate to evaluate, mutablely - /// Panics if done() is true + /// Return a mutable reference to the current predicate pub(super) fn current_mut(&mut self) -> &mut dyn ArrowPredicate { self.filter .predicates .get_mut(self.next_predicate.get() - 1) - .expect("current predicate out of bounds") + // advance ensures next_predicate is always in bounds + .unwrap() .as_mut() } /// Return the current predicate to evaluate - /// Panics if done() is true pub(super) fn current(&self) -> &dyn ArrowPredicate { self.filter .predicates .get(self.next_predicate.get() - 1) - .expect("current predicate out of bounds") + // advance ensures next_predicate is always in bounds + .unwrap() .as_ref() } From 255ee028866b80912105859dfad03d87c67345c1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:25:03 -0400 Subject: [PATCH 09/13] Add link to RowGroupCache --- parquet/src/arrow/async_reader/mod.rs | 2 ++ parquet/src/arrow/push_decoder/reader_builder/mod.rs | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 111c387c2da6..2c696a83764a 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -567,6 +567,8 @@ struct ReaderFactory { metrics: ArrowReaderMetrics, /// Maximum size of the predicate cache + /// + /// See [`RowGroupCache`] for details. max_predicate_cache_size: usize, } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 1c5bae8caf40..5d97b0e0bc62 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -147,7 +147,9 @@ pub(crate) struct RowGroupReaderBuilder { /// Offset to apply to remaining row groups (decremented as rows are read) offset: Option, - /// The size in bytes of the predicate cache + /// The size in bytes of the predicate cache to use + /// + /// See [`RowGroupCache`] for details. max_predicate_cache_size: usize, /// The metrics collector From 01867594e1567d67127980d544230586099647f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:29:55 -0400 Subject: [PATCH 10/13] Clarify transitions in RowGroupReaderBuilder --- parquet/src/arrow/push_decoder/reader_builder/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 5d97b0e0bc62..27af638503be 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -148,7 +148,7 @@ pub(crate) struct RowGroupReaderBuilder { offset: Option, /// The size in bytes of the predicate cache to use - /// + /// /// See [`RowGroupCache`] for details. max_predicate_cache_size: usize, @@ -257,7 +257,9 @@ impl RowGroupReaderBuilder { ) -> Result, ParquetError> { loop { let current_state = self.take_state()?; + // Try to transition the decoder. match self.try_transition(current_state)? { + // Either produced a batch reader, needed input, or finished NextState { next_state, result: Some(result), @@ -266,6 +268,7 @@ impl RowGroupReaderBuilder { self.state = Some(next_state); return Ok(result); } + // completed one internal state, maybe can proceed further NextState { next_state, result: None, From 7ccdb1a1c5b1d3fe405e709214f03f9e42aa9cea Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Oct 2025 16:34:07 -0400 Subject: [PATCH 11/13] Apply suggestions from code review Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- parquet/src/arrow/in_memory_row_group.rs | 8 ++++++-- .../src/arrow/push_decoder/reader_builder/filter.rs | 10 ++++++---- parquet/src/arrow/push_decoder/reader_builder/mod.rs | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index b39a45131b19..4c984b3b2a62 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -221,10 +221,14 @@ impl RowGroups for InMemoryRowGroup<'_> { } } -/// An in-memory column chunk +/// An in-memory column chunk. +/// This allows us to hold either dense column chunks or sparse column chunks and easily +/// access them by offset. #[derive(Clone, Debug)] pub(crate) enum ColumnChunkData { - /// Column chunk data representing only a subset of data pages + /// Column chunk data representing only a subset of data pages. + /// For example if a row selection (possibly caused by a filter in a query) causes us to read only + /// a subset of the rows in the column. Sparse { /// Length of the full column chunk length: usize, diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs b/parquet/src/arrow/push_decoder/reader_builder/filter.rs index 9a73059f8a0d..ac0561da2aaa 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/filter.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs @@ -47,7 +47,9 @@ pub(super) struct FilterInfo { /// but it owns the ProjectionMask and RowGroupCache #[derive(Debug)] pub(super) struct CacheInfo { - /// The columns to cache in the predicate cache + /// The columns to cache in the predicate cache. + /// Normally these are the columns that filters may look at such that + /// if we have a filter like `(a + 10 > 5) AND (a + b = 0)` we cache `a` to avoid re-reading it between evaluating `a + 10 > 5` and `a + b = 0`. cache_projection: ProjectionMask, row_group_cache: Arc>, } @@ -69,9 +71,9 @@ impl CacheInfo { } pub(super) enum AdvanceResult { - /// advanced to the next predicate + /// Advanced to the next predicate Continue(FilterInfo), - /// no more predicates returns the row filter and cache info + /// No more predicates returns the row filter and cache info Done(RowFilter, CacheInfo), } @@ -99,7 +101,7 @@ impl FilterInfo { } } - /// Return the current predicate to evaluate, mutablely + /// Return the current predicate to evaluate as a mutable reference /// Panics if done() is true pub(super) fn current_mut(&mut self) -> &mut dyn ArrowPredicate { self.filter diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 1c5bae8caf40..6ce9c5a88f90 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -209,7 +209,7 @@ impl RowGroupReaderBuilder { /// call to [`Self::take_state`]. /// /// Any code that calls this method must ensure that the state is put back - /// before returning, otherwise the reader error next time it is called + /// before returning, otherwise the reader will error next time it is called fn take_state(&mut self) -> Result { self.state.take().ok_or_else(|| { ParquetError::General(String::from( From 11380b94251ec90c175c83edf3374e7d995c2665 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 Oct 2025 05:55:16 -0400 Subject: [PATCH 12/13] Revert "Move InMemoryRowGroup::fetch into the same module" This reverts commit 5a99fcafa6cda6b91103adc712a87d43c370187a. --- parquet/src/arrow/async_reader/mod.rs | 28 +++++++++++++++++++++++- parquet/src/arrow/in_memory_row_group.rs | 28 ------------------------ 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 2c696a83764a..e22c5f2ff5cb 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -60,7 +60,7 @@ use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache}; use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::in_memory_row_group::InMemoryRowGroup; +use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup}; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; @@ -965,6 +965,32 @@ where } } +impl InMemoryRowGroup<'_> { + /// Fetches any additional column data specified in `projection` that is not already + /// present in `self.column_chunks`. + /// + /// If `selection` is provided, only the pages required for the selection + /// are fetched. Otherwise, all pages are fetched. + pub(crate) async fn fetch( + &mut self, + input: &mut T, + projection: &ProjectionMask, + selection: Option<&RowSelection>, + batch_size: usize, + cache_mask: Option<&ProjectionMask>, + ) -> Result<()> { + // Figure out what ranges to fetch + let FetchRanges { + ranges, + page_start_offsets, + } = self.fetch_ranges(projection, selection, batch_size, cache_mask); + // do the actual fetch + let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); + // update our in memory buffers (self.column_chunks) with the fetched data + self.fill_column_chunks(projection, page_start_offsets, chunk_data); + Ok(()) + } +} #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index 585a2ce23630..34e46cd34e91 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -18,7 +18,6 @@ use crate::arrow::ProjectionMask; use crate::arrow::array_reader::RowGroups; use crate::arrow::arrow_reader::RowSelection; -use crate::arrow::async_reader::AsyncFileReader; use crate::column::page::{PageIterator, PageReader}; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; @@ -49,33 +48,6 @@ pub(crate) struct FetchRanges { } impl InMemoryRowGroup<'_> { - /// Fetches any additional column data specified in `projection` that is not already - /// present in `self.column_chunks`. - /// - /// If `selection` is provided, only the pages required for the selection - /// are fetched. Otherwise, all pages are fetched. - /// - /// See [`Self::fetch_ranges`] for details on how `cache_mask` affects the fetch. - pub(crate) async fn fetch( - &mut self, - input: &mut T, - projection: &ProjectionMask, - selection: Option<&RowSelection>, - batch_size: usize, - cache_mask: Option<&ProjectionMask>, - ) -> crate::errors::Result<()> { - // Figure out what ranges to fetch - let FetchRanges { - ranges, - page_start_offsets, - } = self.fetch_ranges(projection, selection, batch_size, cache_mask); - // do the actual fetch - let chunk_data = input.get_byte_ranges(ranges).await?.into_iter(); - // update our in memory buffers (self.column_chunks) with the fetched data - self.fill_column_chunks(projection, page_start_offsets, chunk_data); - Ok(()) - } - /// Returns the byte ranges to fetch for the columns specified in /// `projection` and `selection`. /// From a3492160de7c688f435107fe8ccf04f83e853d44 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 Oct 2025 05:57:22 -0400 Subject: [PATCH 13/13] Add comments about why InMemoryRowGroup implementation is split --- parquet/src/arrow/async_reader/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index e22c5f2ff5cb..9b81e8e56905 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -965,6 +965,9 @@ where } } +// Note this implementation is not with the rest of the InMemoryRowGroup +// implementation because it relies on several async traits and types +// that are only available when the "async" feature is enabled. impl InMemoryRowGroup<'_> { /// Fetches any additional column data specified in `projection` that is not already /// present in `self.column_chunks`.