1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/mod.rs
@@ -30,6 +30,7 @@ pub mod metadata;
mod metrics;
mod opener;
mod page_filter;
mod push_decoder;
mod reader;
mod row_filter;
mod row_group_filter;
310 changes: 37 additions & 273 deletions datafusion/datasource-parquet/src/opener.rs
@@ -19,16 +19,17 @@

use crate::access_plan::PreparedAccessPlan;
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_filter::{self, ParquetReadPlan, build_projection_read_plan};
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
use crate::{
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
apply_file_schema_type_coercions, coerce_int96_to_resolution,
};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::array::RecordBatch;
use arrow::datatypes::DataType;
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::replace_columns_with_literals;
use std::collections::{HashMap, VecDeque};
Expand All @@ -39,12 +40,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
use arrow::datatypes::{SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
};
use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
@@ -53,8 +52,8 @@ use datafusion_physical_expr_common::physical_expr::{
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
MetricCategory, PruningMetrics,
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};

@@ -66,18 +65,14 @@ use futures::{
FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream,
};
use log::debug;
use parquet::DecodeResult;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelectionPolicy,
};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::parquet_column;
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use parquet::basic::Type;
use parquet::bloom_filter::Sbbf;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

/// Stateless Parquet morselizer implementation.
///
@@ -108,7 +103,7 @@ pub(super) struct ParquetMorselizer {
/// Factory for instantiating parquet reader
pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
/// Should the filters be evaluated during the parquet scan using
/// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)?
/// [`DatafusionArrowPredicate`](crate::row_filter::DatafusionArrowPredicate)?
pub pushdown_filters: bool,
/// Should the filters be reordered to optimize the scan?
pub reorder_filters: bool,
@@ -1158,8 +1153,17 @@ impl RowGroupsPrunedParquetOpen {
);

let (decoder, pending_decoders, remaining_limit) = {
let mut row_filter_generator =
RowFilterGenerator::new(&prepared, file_metadata.as_ref());
let pushdown_predicate = prepared
.pushdown_filters
.then_some(prepared.predicate.as_ref())
.flatten();
let mut row_filter_generator = RowFilterGenerator::new(
pushdown_predicate,
&prepared.physical_file_schema,
file_metadata.as_ref(),
prepared.reorder_predicates,
&prepared.file_metrics,
);

// Split into consecutive runs of row groups that share the same filter
// requirement. Fully matched row groups skip the RowFilter; others need it.
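The splitting logic itself is collapsed in this diff. As a rough sketch (illustrative only, not part of the patch), grouping consecutive row groups by whether they still need the RowFilter could look like the following, with `needs_filter` standing in for whatever per-row-group flag the access plan provides:

fn split_into_runs(needs_filter: &[(usize, bool)]) -> Vec<(bool, Vec<usize>)> {
    // Each run is a maximal stretch of consecutive row groups that share the
    // same filter requirement, kept in their original order.
    let mut runs: Vec<(bool, Vec<usize>)> = Vec::new();
    for &(row_group_idx, needs) in needs_filter {
        match runs.last_mut() {
            // Same requirement as the current run: extend it.
            Some((run_needs, idxs)) if *run_needs == needs => idxs.push(row_group_idx),
            // Requirement changed (or first row group): start a new run.
            _ => runs.push((needs, vec![row_group_idx])),
        }
    }
    runs
}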
@@ -1227,272 +1231,32 @@
let output_schema = Arc::clone(&prepared.output_schema);
let files_ranges_pruned_statistics =
prepared.file_metrics.files_ranges_pruned_statistics.clone();
let stream = futures::stream::unfold(
PushDecoderStreamState {
decoder,
pending_decoders,
remaining_limit,
reader: prepared.async_file_reader,
projector,
output_schema,
replace_schema,
arrow_reader_metrics,
predicate_cache_inner_records,
predicate_cache_records,
baseline_metrics: prepared.baseline_metrics,
},
|state| async move { state.transition().await },
)
.fuse();
let stream = PushDecoderStreamState {
decoder,
pending_decoders,
remaining_limit,
reader: prepared.async_file_reader,
projector,
output_schema,
replace_schema,
arrow_reader_metrics,
predicate_cache_inner_records,
predicate_cache_records,
baseline_metrics: prepared.baseline_metrics,
}
.into_stream();

// Wrap the stream so a dynamic filter can stop the file scan early.
if let Some(file_pruner) = prepared.file_pruner {
let stream = stream.boxed();
Ok(EarlyStoppingStream::new(
stream,
file_pruner,
files_ranges_pruned_statistics,
)
.boxed())
} else {
Ok(stream.boxed())
}
}
}

/// Builds row filters for decoder runs.
///
/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
/// decoder runs need a fresh filter for each run that evaluates row predicates.
struct RowFilterGenerator<'a> {
predicate: Option<&'a Arc<dyn PhysicalExpr>>,
physical_file_schema: &'a SchemaRef,
file_metadata: &'a ParquetMetaData,
reorder_predicates: bool,
file_metrics: &'a ParquetFileMetrics,
first_row_filter: Option<RowFilter>,
}

impl<'a> RowFilterGenerator<'a> {
fn new(
prepared: &'a PreparedParquetOpen,
file_metadata: &'a ParquetMetaData,
) -> Self {
let predicate = prepared
.pushdown_filters
.then_some(prepared.predicate.as_ref())
.flatten();

let mut generator = Self {
predicate,
physical_file_schema: &prepared.physical_file_schema,
file_metadata,
reorder_predicates: prepared.reorder_predicates,
file_metrics: &prepared.file_metrics,
first_row_filter: None,
};
generator.first_row_filter = generator.build_row_filter();
generator
}

fn has_row_filter(&self) -> bool {
self.first_row_filter.is_some()
}

fn next_filter(&mut self) -> Option<RowFilter> {
self.first_row_filter
.take()
.or_else(|| self.build_row_filter())
}

fn build_row_filter(&self) -> Option<RowFilter> {
let predicate = self.predicate?;
match row_filter::build_row_filter(
predicate,
self.physical_file_schema,
self.file_metadata,
self.reorder_predicates,
self.file_metrics,
) {
Ok(Some(filter)) => Some(filter),
Ok(None) => None,
Err(e) => {
debug!("Ignoring error building row filter for '{predicate:?}': {e}");
None
}
}
}
}
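A small usage note (illustrative only): the first call to `next_filter` hands back the filter built eagerly in `new`, the same one that backs `has_row_filter`; every later call rebuilds a fresh filter from the predicate, because each `RowFilter` is moved into a decoder and cannot be reused across runs.

// Illustration only, assuming a generator constructed with a pushdown predicate:
let filter_for_run_0 = row_filter_generator.next_filter(); // the prebuilt filter
let filter_for_run_1 = row_filter_generator.next_filter(); // rebuilt on demand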

/// State shared while building [`ParquetPushDecoder`]s for one file scan.
///
/// A scan can be split into multiple decoder runs when row groups have
/// different filtering requirements. This config holds the options that apply
/// to every [`ParquetPushDecoderBuilder`], while each run supplies its own
/// [`PreparedAccessPlan`] and optional row filter.
struct DecoderBuilderConfig<'a> {
read_plan: &'a ParquetReadPlan,
batch_size: usize,
arrow_reader_metrics: &'a ArrowReaderMetrics,
force_filter_selections: bool,
decoder_limit: Option<usize>,
}

impl DecoderBuilderConfig<'_> {
fn build(
&self,
prepared_access_plan: PreparedAccessPlan,
metadata: ArrowReaderMetadata,
) -> ParquetPushDecoderBuilder {
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
.with_projection(self.read_plan.projection_mask.clone())
.with_batch_size(self.batch_size)
.with_metrics(self.arrow_reader_metrics.clone());
if self.force_filter_selections {
builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
}
if let Some(row_selection) = prepared_access_plan.row_selection {
builder = builder.with_row_selection(row_selection);
}
builder = builder.with_row_groups(prepared_access_plan.row_group_indexes);
if let Some(limit) = self.decoder_limit {
builder = builder.with_limit(limit);
}
builder
}
}
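Putting the two helpers together, the per-run wiring (which lives in the hunk above and in the new push_decoder module, and is mostly collapsed here) presumably looks something like the sketch below. `finish_decoder` and `runs` are placeholders, and attaching the filter via `ArrowReaderBuilder::with_row_filter` is an assumption, not something shown in this diff:

/// Illustration only: build one push decoder per consecutive run of row groups.
fn build_decoders_for_runs(
    config: &DecoderBuilderConfig<'_>,
    metadata: &ArrowReaderMetadata,
    row_filter_generator: &mut RowFilterGenerator<'_>,
    runs: Vec<(bool, PreparedAccessPlan)>,
) -> Result<VecDeque<ParquetPushDecoder>> {
    let mut decoders = VecDeque::new();
    for (needs_filter, prepared_access_plan) in runs {
        let mut builder = config.build(prepared_access_plan, metadata.clone());
        // Fully matched runs skip the RowFilter; the others each take a fresh
        // filter, since the decoder owns the filter it is given.
        if needs_filter {
            if let Some(row_filter) = row_filter_generator.next_filter() {
                builder = builder.with_row_filter(row_filter);
            }
        }
        // Placeholder for however the builder is turned into a ParquetPushDecoder.
        decoders.push_back(finish_decoder(builder)?);
    }
    Ok(decoders)
}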

/// State for a stream that decodes a single Parquet file using a push-based decoder.
///
/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
/// fully consumed.
struct PushDecoderStreamState {
decoder: ParquetPushDecoder,
/// Additional decoders to process after the current one finishes.
/// Used when fully matched row groups split the scan into consecutive
/// runs with different filter configurations, maintaining original order.
pending_decoders: VecDeque<ParquetPushDecoder>,
/// Global remaining row limit across all decoder runs.
///
/// Decoder-local limits are only safe for single-run scans. When the scan
/// is split across multiple decoders, the combined stream limit is enforced
/// here instead.
remaining_limit: Option<usize>,
reader: Box<dyn AsyncFileReader>,
projector: Projector,
output_schema: Arc<Schema>,
replace_schema: bool,
arrow_reader_metrics: ArrowReaderMetrics,
predicate_cache_inner_records: Gauge,
predicate_cache_records: Gauge,
baseline_metrics: BaselineMetrics,
}

impl PushDecoderStreamState {
/// Advances the decoder state machine until the next [`RecordBatch`] is
/// produced, the file is fully consumed, or an error occurs.
///
/// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
/// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
/// fetched from the [`AsyncFileReader`] and fed back into the decoder.
/// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
/// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
///
/// Takes `self` by value (rather than `&mut self`) so the generated future
/// owns the state directly. This avoids a Stacked Borrows violation under
/// miri where `&mut self` creates a single opaque borrow that conflicts
/// with `unfold`'s ownership across yield points.
async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
loop {
if self.remaining_limit == Some(0) {
return None;
}
match self.decoder.try_decode() {
Ok(DecodeResult::NeedsData(ranges)) => {
let data = self
.reader
.get_byte_ranges(ranges.clone())
.await
.map_err(DataFusionError::from);
match data {
Ok(data) => {
if let Err(e) = self.decoder.push_ranges(ranges, data) {
return Some((Err(DataFusionError::from(e)), self));
}
}
Err(e) => return Some((Err(e), self)),
}
}
Ok(DecodeResult::Data(batch)) => {
let batch = if let Some(remaining_limit) = self.remaining_limit {
if batch.num_rows() > remaining_limit {
self.remaining_limit = Some(0);
batch.slice(0, remaining_limit)
} else {
self.remaining_limit =
Some(remaining_limit - batch.num_rows());
batch
}
} else {
batch
};
let mut timer = self.baseline_metrics.elapsed_compute().timer();
self.copy_arrow_reader_metrics();
let result = self.project_batch(&batch);
timer.stop();
// Release the borrow on baseline_metrics before moving self
drop(timer);
return Some((result, self));
}
Ok(DecodeResult::Finished) => {
// If there are pending decoders (e.g. for consecutive runs
// with different filter configurations), switch to the next.
if let Some(next) = self.pending_decoders.pop_front() {
self.decoder = next;
continue;
}
return None;
}
Err(e) => {
return Some((Err(DataFusionError::from(e)), self));
}
}
}
}

/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
fn copy_arrow_reader_metrics(&self) {
if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
self.predicate_cache_inner_records.set(v);
}
if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
self.predicate_cache_records.set(v);
}
}

fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
let mut batch = self.projector.project_batch(batch)?;
if self.replace_schema {
// Ensure the output batch has the expected schema.
// This handles things like schema level and field level metadata, which may not be present
// in the physical file schema.
// It is also possible for nullability to differ; some writers create files with
// OPTIONAL fields even when there are no nulls in the data.
// In these cases it may make sense for the logical schema to be `NOT NULL`.
// RecordBatch::try_new_with_options checks that if the schema is NOT NULL
// the array cannot contain nulls, amongst other checks.
let (_stream_schema, arrays, num_rows) = batch.into_parts();
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
batch = RecordBatch::try_new_with_options(
Arc::clone(&self.output_schema),
arrays,
&options,
)?;
Ok(stream)
}
Ok(batch)
}
}
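The replacement code earlier in this diff calls `PushDecoderStreamState::into_stream()` instead of building the stream inline. That method lives in the new push_decoder module and is not shown here; presumably it wraps `transition` in the same `unfold` plus `fuse` pattern the removed code used, roughly:

impl PushDecoderStreamState {
    /// Sketch only: drive `transition` via `futures::stream::unfold`, yielding
    /// each decoded batch together with the updated state, and fuse the result.
    fn into_stream(self) -> impl Stream<Item = Result<RecordBatch>> {
        futures::stream::unfold(self, |state| async move { state.transition().await })
            .fuse()
    }
}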
