diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index ae8395aef6a4..25cead612bcd 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -20,11 +20,14 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 use arrow::{array::ArrayRef, datatypes::DataType}; -use arrow_array::{new_empty_array, new_null_array, UInt64Array}; +use arrow_array::{ + new_empty_array, new_null_array, BooleanArray, Int8Array, UInt64Array, +}; use arrow_schema::{Field, FieldRef, Schema}; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, }; +use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::ParquetMetaData; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; @@ -52,6 +55,85 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] { result } +/// Define an adapter iterator for extracting statistics from an iterator of +/// `ParquetStatistics` +/// +/// +/// Handles checking if the statistics are present and valid with the correct type +/// +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `BoolStatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) +/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`) +macro_rules! make_stats_iterator { + ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { + /// Maps an iterator of `ParquetStatistics` into an iterator of + /// `&$stat_value_type`` + /// + /// Yielded elements: + /// * Some(stats) if valid + /// * None if the statistics are not present, not valid, or not $stat_value_type + struct $iterator_type<'a, I> + where + I: Iterator>, + { + inner: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator>, + { + /// create a new iterator to extract $stat_value_type statistics + fn new(inner: I) -> Self { + Self { inner } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator>, + { + type Item = Option<&'a $stat_value_type>; + + /// return the next statistics value + fn next(&mut self) -> Option { + let next_stat = self.inner.next()?; + let next_stat = next_stat.and_then(|stats| match stats { + $parquet_statistics_type(s) if stats.has_min_max_set() => { + Some(s.$func()) + } + _ => None, + }); + Some(next_stat) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } + } + }; +} + +make_stats_iterator!(MinBoolStatsIterator, min, ParquetStatistics::Boolean, bool); +make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, i32); +make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, i64); +make_stats_iterator!(MinF32StatsIterator, min, ParquetStatistics::Float, f32); +make_stats_iterator!(MinF64StatsIterator, min, ParquetStatistics::Double, f64); +make_stats_iterator!( + MinByteArrayStatsIterator, + min, + ParquetStatistics::ByteArray, + ByteArray +); +make_stats_iterator!( + MinFixedLenByteArrayStatsIterator, + min, + ParquetStatistics::FixedLenByteArray, + FixedLenByteArray +); + /// Extract a single min/max statistics from a [`ParquetStatistics`] object /// /// * `$column_statistics` is the `ParquetStatistics` object @@ -211,9 +293,23 @@ pub(crate) fn min_statistics<'a, I: Iterator Result { - let scalars = iterator - .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))); - collect_scalars(data_type, scalars) + match data_type { + DataType::Boolean => { + let mins = MinBoolStatsIterator::new(iterator).map(|v| v.cloned()); + Ok(Arc::new(BooleanArray::from_iter(mins))) + } + DataType::Int8 => { + let mins = MinInt32StatsIterator::new(iterator).map(|v| v.map(|v| *v as i8)); + Ok(Arc::new(Int8Array::from_iter(mins))) + } + _ => { + // fallback to scalars + let scalars = iterator.map(|x| { + x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))) + }); + collect_scalars(data_type, scalars) + } + } } /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]