16 changes: 16 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -58,6 +58,7 @@ pub mod statistics;
///
/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
/// * decoder API: [`ParquetDecoderBuilder::new`]
///
/// # Features
/// * Projection pushdown: [`Self::with_projection`]
@@ -93,6 +94,7 @@ pub mod statistics;
/// Millisecond Latency] Arrow blog post.
///
/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
/// [`ParquetDecoderBuilder::new`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
/// [Apache Arrow]: https://arrow.apache.org/
/// [`StatisticsConverter`]: statistics::StatisticsConverter
/// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
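For orientation, here is a minimal sketch of the synchronous entry point listed in the doc comment above, with projection pushdown; the file name is hypothetical, and the `async` and push-decoder builders follow the same builder pattern.

```rust
use std::fs::File;

use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_two_columns() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any `ChunkReader` source works here.
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Projection pushdown: decode only the first two leaf columns.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);
    let reader = builder.with_projection(mask).build()?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```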
@@ -992,12 +994,26 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
///
/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
/// the buffered state (DataPages, etc.) necessary to decode the Parquet data into
/// Arrow arrays.
pub struct ParquetRecordBatchReader {
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ParquetRecordBatchReader")
.field("array_reader", &"...")
.field("schema", &self.schema)
.field("read_plan", &self.read_plan)
.finish()
}
}

impl Iterator for ParquetRecordBatchReader {
type Item = Result<RecordBatch, ArrowError>;

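The new manual `Debug` impl above prints a `"..."` placeholder for `array_reader`, because `dyn ArrayReader` carries no `Debug` bound. The same pattern in isolation, using made-up types:

```rust
use std::fmt::{self, Debug, Formatter};

// Made-up stand-ins for the reader's internals.
trait ArrayDecoder {}

#[derive(Debug)]
struct Plan {
    batch_size: usize,
}

struct Reader {
    decoder: Box<dyn ArrayDecoder>, // no `Debug` bound, so `#[derive(Debug)]` won't work
    plan: Plan,
}

impl Debug for Reader {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Reader")
            .field("decoder", &"...") // placeholder for the trait object
            .field("plan", &self.plan)
            .finish()
    }
}
```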
5 changes: 2 additions & 3 deletions parquet/src/arrow/arrow_reader/read_plan.rs
@@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;

/// A builder for [`ReadPlan`]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
batch_size: usize,
/// Current selection to apply, includes all filters
@@ -51,7 +51,6 @@ impl ReadPlanBuilder {
}

/// Returns the current selection, if any
#[cfg(feature = "async")]
[Author comment] These functions are now used by the push decoder, which is not behind the `async` feature flag.

pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
}
@@ -76,7 +75,6 @@ impl ReadPlanBuilder {
}

/// Returns the number of rows selected, or `None` if all rows are selected.
#[cfg(feature = "async")]
pub fn num_rows_selected(&self) -> Option<usize> {
self.selection.as_ref().map(|s| s.row_count())
}
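For context on `num_rows_selected`, a small sketch of how a `RowSelection` reports its selected row count; the selector lengths are arbitrary, chosen only for illustration.

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Select 100 rows, skip 50, select 25 more: 125 rows selected in total.
    let selection = RowSelection::from(vec![
        RowSelector::select(100),
        RowSelector::skip(50),
        RowSelector::select(25),
    ]);
    assert_eq!(selection.row_count(), 125);
}
```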
@@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder {
/// A plan reading specific rows from a Parquet Row Group.
///
/// See [`ReadPlanBuilder`] to create `ReadPlan`s
#[derive(Debug)]
pub struct ReadPlan {
/// The number of rows to read in each batch
batch_size: usize,
1 change: 0 additions & 1 deletion parquet/src/arrow/arrow_reader/selection.rs
@@ -447,7 +447,6 @@ impl RowSelection {
/// Expands the selection to align with batch boundaries.
/// This is needed when using cached array readers to ensure that
/// the cached data covers full batches.
#[cfg(feature = "async")]
pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
if batch_size == 0 {
return self.clone();
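The body of `expand_to_batch_boundaries` is truncated above. As a rough illustration of the documented intent, here is my own sketch over plain row ranges; it is not the crate's implementation.

```rust
use std::ops::Range;

/// Illustration only: widen each selected row range so it starts and ends on
/// `batch_size` boundaries, clamp to `total_rows`, and merge ranges that now touch.
fn expand_to_batch_boundaries(
    selected: &[Range<usize>],
    batch_size: usize,
    total_rows: usize,
) -> Vec<Range<usize>> {
    if batch_size == 0 {
        return selected.to_vec();
    }
    let mut expanded: Vec<Range<usize>> = Vec::new();
    for r in selected {
        let start = (r.start / batch_size) * batch_size;
        let end = (r.end.div_ceil(batch_size) * batch_size).min(total_rows);
        match expanded.last_mut() {
            // Overlapping or adjacent after expansion: extend the previous range.
            Some(last) if start <= last.end => last.end = last.end.max(end),
            _ => expanded.push(start..end),
        }
    }
    expanded
}

// With batch_size = 100 and total_rows = 1000, a selection of 150..250
// expands to 100..300, so cached array readers always see whole batches.
```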
250 changes: 19 additions & 231 deletions parquet/src/arrow/async_reader/mod.rs
@@ -29,7 +29,7 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
@@ -38,10 +38,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{
ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
RowFilter, RowSelection,
@@ -51,20 +47,20 @@ use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash
use crate::bloom_filter::{
SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

mod metadata;
pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache};
use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;
@@ -571,6 +567,8 @@ struct ReaderFactory<T> {
metrics: ArrowReaderMetrics,

/// Maximum size of the predicate cache
///
/// See [`RowGroupCache`] for details.
max_predicate_cache_size: usize,
}

@@ -967,245 +965,35 @@
}
}

/// An in-memory collection of column chunks
[Author comment] This structure is now used by the push decoder and the async reader, so I refactored it so the code could be shared.

struct InMemoryRowGroup<'a> {
offset_index: Option<&'a [OffsetIndexMetaData]>,
/// Column chunks for this row group
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
row_group_idx: usize,
metadata: &'a ParquetMetaData,
}

// Note this implementation is not with the rest of the InMemoryRowGroup
// implementation because it relies on several async traits and types
// that are only available when the "async" feature is enabled.
impl InMemoryRowGroup<'_> {
/// Fetches any additional column data specified in `projection` that is not already
/// present in `self.column_chunks`.
///
/// If `selection` is provided, only the pages required for the selection
/// are fetched. Otherwise, all pages are fetched.
async fn fetch<T: AsyncFileReader + Send>(
pub(crate) async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
batch_size: usize,
cache_mask: Option<&ProjectionMask>,
) -> Result<()> {
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
let expanded_selection =
selection.expand_to_batch_boundaries(batch_size, self.row_count);
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<u64>> = vec![];

let fetch_ranges = self
.column_chunks
.iter()
.zip(metadata.columns())
.enumerate()
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges: Vec<Range<u64>> = vec![];
let (start, _len) = chunk_meta.byte_range();
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start..first.offset as u64);
}
_ => (),
}

// Expand selection to batch boundaries only for cached columns
let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
if use_expanded {
ranges.extend(
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
);
} else {
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
}
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}

if let Some(offsets) = page_start_offsets.next() {
let mut chunks = Vec::with_capacity(offsets.len());
for _ in 0..offsets.len() {
chunks.push(chunk_data.next().unwrap());
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: metadata.column(idx).byte_range().1 as usize,
data: offsets
.into_iter()
.map(|x| x as usize)
.zip(chunks.into_iter())
.collect(),
}))
}
}
} else {
let fetch_ranges = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: metadata.column(idx).byte_range().0 as usize,
data,
}));
}
}
}

// Figure out what ranges to fetch
let FetchRanges {
ranges,
page_start_offsets,
} = self.fetch_ranges(projection, selection, batch_size, cache_mask);
// do the actual fetch
let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
// update our in memory buffers (self.column_chunks) with the fetched data
self.fill_column_chunks(projection, page_start_offsets, chunk_data);
Ok(())
}
}
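To make the refactor above concrete: `fetch` now delegates range planning to `fetch_ranges`, performs a single `get_byte_ranges` I/O call, and then fills `self.column_chunks` via `fill_column_chunks`. Below is a simplified stand-alone sketch of the planning step for the no-selection case; the types and names are illustrative stand-ins, not the crate's `FetchRanges` API.

```rust
use std::ops::Range;

/// Illustrative stand-in for a column chunk's location: start offset and length in bytes.
struct ColumnByteRange {
    start: u64,
    length: u64,
}

/// Plan byte ranges to fetch when there is no row selection: every projected
/// column chunk that is not already buffered is fetched in full.
fn plan_full_column_fetches(
    columns: &[ColumnByteRange],
    already_buffered: &[bool],
    projected: &[bool],
) -> Vec<Range<u64>> {
    columns
        .iter()
        .enumerate()
        .filter(|&(idx, _)| projected[idx] && !already_buffered[idx])
        .map(|(_, c)| c.start..c.start + c.length)
        .collect()
}
```

When a `RowSelection` and an offset index are available, the real implementation instead derives per-page ranges (plus a possible dictionary page), as in the removed code shown above.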

impl RowGroups for InMemoryRowGroup<'_> {
fn num_rows(&self) -> usize {
self.row_count
}

/// Return chunks for column i
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
let page_locations = self
.offset_index
// filter out empty offset indexes (old versions specified Some(vec![]) when not present)
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());
let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
let page_reader = SerializedPageReader::new(
data.clone(),
column_chunk_metadata,
self.row_count,
page_locations,
)?;
let page_reader = page_reader.add_crypto_context(
self.row_group_idx,
i,
self.metadata,
column_chunk_metadata,
)?;

let page_reader: Box<dyn PageReader> = Box::new(page_reader);

Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
}))
}
}
}
}

/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
/// Column chunk data representing only a subset of data pages
Sparse {
/// Length of the full column chunk
length: usize,
/// Subset of data pages included in this sparse chunk.
///
/// Each element is a tuple of (page offset within file, page data).
/// Each entry is a complete page and the list is ordered by offset.
data: Vec<(usize, Bytes)>,
},
/// Full column chunk and the offset within the original file
Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
/// Return the data for this column chunk at the given offset
fn get(&self, start: u64) -> Result<Bytes> {
match &self {
ColumnChunkData::Sparse { data, .. } => data
.binary_search_by_key(&start, |(offset, _)| *offset as u64)
.map(|idx| data[idx].1.clone())
.map_err(|_| {
ParquetError::General(format!(
"Invalid offset in sparse column chunk data: {start}"
))
}),
ColumnChunkData::Dense { offset, data } => {
let start = start as usize - *offset;
Ok(data.slice(start..))
}
}
}
}
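The `Sparse` branch above maps an absolute file offset to the page buffered at exactly that offset via binary search. The same idea as a self-contained function, with illustrative types in place of `Bytes`:

```rust
/// Pages of a sparse column chunk: (absolute start offset in the file, page bytes),
/// kept sorted by offset.
fn sparse_get(pages: &[(u64, Vec<u8>)], start: u64) -> Option<&[u8]> {
    pages
        .binary_search_by_key(&start, |(offset, _)| *offset)
        .ok()
        .map(|idx| pages[idx].1.as_slice())
}

// Reads must begin exactly at a buffered page's start offset; any other offset
// means the page was never fetched, which the real code reports as an error.
```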

impl Length for ColumnChunkData {
/// Return the total length of the full column chunk
fn len(&self) -> u64 {
match &self {
ColumnChunkData::Sparse { length, .. } => *length as u64,
ColumnChunkData::Dense { data, .. } => data.len() as u64,
}
}
}

impl ChunkReader for ColumnChunkData {
type T = bytes::buf::Reader<Bytes>;

fn get_read(&self, start: u64) -> Result<Self::T> {
Ok(self.get(start)?.reader())
}

fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
Ok(self.get(start)?.slice(..length))
}
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
type Item = Result<Box<dyn PageReader>>;

fn next(&mut self) -> Option<Self::Item> {
self.reader.take()
}
}

impl PageIterator for ColumnChunkIterator {}
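`ColumnChunkIterator`, removed from this module here, is a single-shot iterator: it yields its boxed `PageReader` once and then `None`. The underlying `Option::take` pattern in isolation:

```rust
/// A generic single-shot iterator: yields its value once, then `None` forever.
struct Once<T> {
    item: Option<T>,
}

impl<T> Iterator for Once<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // `take` moves the value out and leaves `None` behind,
        // so every later call returns `None`.
        self.item.take()
    }
}

fn main() {
    let mut pages = Once { item: Some("page reader") };
    assert_eq!(pages.next(), Some("page reader"));
    assert_eq!(pages.next(), None);
}
```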

#[cfg(test)]
mod tests {
use super::*;
Expand Down