Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 100 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_array::{
new_empty_array, new_null_array, BooleanArray, Int8Array, UInt64Array,
};
use arrow_schema::{Field, FieldRef, Schema};
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
};
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
Expand Down Expand Up @@ -52,6 +55,85 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] {
result
}

/// Define an adapter iterator for extracting statistics from an iterator of
/// `ParquetStatistics`
///
///
/// Handles checking if the statistics are present and valid with the correct type
///
/// Parameters:
/// * `$iterator_type` is the name of the iterator type (e.g. `BoolStatsIterator`)
/// * `$func` is the function to call to get the value (e.g. `min` or `max`)
/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`)
/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`)
macro_rules! make_stats_iterator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an idea to create structs like BoolStatsIterator which will return an iterator of Option<ValueStatistics<bool>> and do the validity / type checking

($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => {
/// Maps an iterator of `Option<&ParquetStatistics>` into an iterator of
/// `Option<&$stat_value_type>`.
///
/// Exactly one element is yielded for every element of the wrapped iterator:
/// * `Some(value)` if the statistics are present, are of the
///   `$parquet_statistics_type` variant, and `has_min_max_set()` is true
/// * `None` if the statistics are not present, not valid, or not $stat_value_type
struct $iterator_type<'a, I>
where
I: Iterator<Item = Option<&'a ParquetStatistics>>,
{
/// The wrapped iterator of optional statistics
inner: I,
}

impl<'a, I> $iterator_type<'a, I>
where
I: Iterator<Item = Option<&'a ParquetStatistics>>,
{
/// Create a new iterator to extract $stat_value_type statistics from `inner`
fn new(inner: I) -> Self {
Self { inner }
}
}

impl<'a, I> Iterator for $iterator_type<'a, I>
where
I: Iterator<Item = Option<&'a ParquetStatistics>>,
{
type Item = Option<&'a $stat_value_type>;

/// Return the next statistics value.
///
/// The outer `Option` is `None` only once the wrapped iterator is
/// exhausted; the inner `Option` reports whether a usable value was found.
fn next(&mut self) -> Option<Self::Item> {
// `?` ends iteration as soon as the wrapped iterator is exhausted
let next_stat = self.inner.next()?;
// Extract the value only for the expected variant, and only when
// `has_min_max_set()` says min/max are set; everything else maps to `None`
let next_stat = next_stat.and_then(|stats| match stats {
$parquet_statistics_type(s) if stats.has_min_max_set() => {
Some(s.$func())
}
_ => None,
});
Some(next_stat)
}

/// Delegates to the wrapped iterator: `next` yields one element per
/// element of `inner`, so the bounds are unchanged.
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
};
}

// Instantiate one `min`-extracting iterator per `ParquetStatistics` variant
// listed below. Each invocation names the generated struct, the accessor to
// call (`min`), the variant to match, and the value type yielded by reference.
make_stats_iterator!(MinBoolStatsIterator, min, ParquetStatistics::Boolean, bool);
make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, i32);
make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, i64);
make_stats_iterator!(MinF32StatsIterator, min, ParquetStatistics::Float, f32);
make_stats_iterator!(MinF64StatsIterator, min, ParquetStatistics::Double, f64);
make_stats_iterator!(
MinByteArrayStatsIterator,
min,
ParquetStatistics::ByteArray,
ByteArray
);
make_stats_iterator!(
MinFixedLenByteArrayStatsIterator,
min,
ParquetStatistics::FixedLenByteArray,
FixedLenByteArray
);

/// Extract a single min/max statistics from a [`ParquetStatistics`] object
///
/// * `$column_statistics` is the `ParquetStatistics` object
Expand Down Expand Up @@ -211,9 +293,23 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
let scalars = iterator
.map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))));
collect_scalars(data_type, scalars)
match data_type {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example of how they work -- you make BoolStatsIterator::new(iterator) which then returns a sequence of Option<&bool> and then the min/max can be extracted out

DataType::Boolean => {
let mins = MinBoolStatsIterator::new(iterator).map(|v| v.cloned());
Ok(Arc::new(BooleanArray::from_iter(mins)))
}
DataType::Int8 => {
let mins = MinInt32StatsIterator::new(iterator).map(|v| v.map(|v| *v as i8));
Ok(Arc::new(Int8Array::from_iter(mins)))
}
_ => {
// fallback to scalars
let scalars = iterator.map(|x| {
x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))
});
collect_scalars(data_type, scalars)
}
}
}

/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
Expand Down