From a7cf60c0ff7f554f903f158f4613a878557e5318 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 May 2024 15:13:08 -0400 Subject: [PATCH 1/3] Proposal: typed statistics iterators --- .../physical_plan/parquet/statistics.rs | 80 +++++++++++++++++-- 1 file changed, 75 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index ae8395aef6a4..63f7030db097 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -20,13 +20,13 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 use arrow::{array::ArrayRef, datatypes::DataType}; -use arrow_array::{new_empty_array, new_null_array, UInt64Array}; +use arrow_array::{new_empty_array, new_null_array, BooleanArray, UInt64Array}; use arrow_schema::{Field, FieldRef, Schema}; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, }; use parquet::file::metadata::ParquetMetaData; -use parquet::file::statistics::Statistics as ParquetStatistics; +use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics}; use parquet::schema::types::SchemaDescriptor; use std::sync::Arc; @@ -52,6 +52,65 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] { result } +/// Define an adapter iterator for extracting statistics from an iterator of +/// `ParquetStatistics` +/// +/// Handles checking if the statistics are present and valid with the correct type +/// +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `BoolStatsIterator`) +/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`) +macro_rules! make_stats_iterator { + ($iterator_type:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { + /// Maps an iterator of `ParquetStatistics` into an iterator of + /// `ValueStatistics<$stat_value_type>` + /// + /// Yielded elements: + /// * Some(stats) if valid + /// * None if the statistics are not present, not valid, or not $stat_value_type + struct $iterator_type<'a, I> + where + I: Iterator>, + { + inner: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator>, + { + /// create a new iterator to extract $stat_value_type statistics + fn new(inner: I) -> Self { + Self { inner } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator>, + { + type Item = Option<&'a ValueStatistics<$stat_value_type>>; + + /// return the next statistics value + fn next(&mut self) -> Option { + let next_stat = self.inner.next()?; + let next_stat = next_stat.and_then(|stats| match stats { + $parquet_statistics_type(s) if stats.has_min_max_set() => Some(s), + _ => None, + }); + Some(next_stat) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } + } + }; +} + +make_stats_iterator!(BoolStatsIterator, ParquetStatistics::Boolean, bool); + /// Extract a single min/max statistics from a [`ParquetStatistics`] object /// /// * `$column_statistics` is the `ParquetStatistics` object @@ -211,9 +270,20 @@ pub(crate) fn min_statistics<'a, I: Iterator Result { - let scalars = iterator - .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))); - collect_scalars(data_type, scalars) + match data_type { + DataType::Boolean => { + // build a boolean array from the min statistics directly + let mins = BoolStatsIterator::new(iterator).map(|v| v.map(|v| *v.min())); + Ok(Arc::new(BooleanArray::from_iter(mins))) + } + _ => { + // fallback to scalars + let scalars = iterator.map(|x| { + x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type))) + }); + collect_scalars(data_type, scalars) + } + } } /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] From 992e2a324ca14bbb7199636ef1d0b747975fc624 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 May 2024 15:18:50 -0400 Subject: [PATCH 2/3] make other iterators --- .../physical_plan/parquet/statistics.rs | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 63f7030db097..db0c6ff13748 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -20,11 +20,14 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 use arrow::{array::ArrayRef, datatypes::DataType}; -use arrow_array::{new_empty_array, new_null_array, BooleanArray, UInt64Array}; +use arrow_array::{ + new_empty_array, new_null_array, BooleanArray, Int8Array, UInt64Array, +}; use arrow_schema::{Field, FieldRef, Schema}; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, }; +use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::ParquetMetaData; use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics}; use parquet::schema::types::SchemaDescriptor; @@ -110,6 +113,20 @@ macro_rules! make_stats_iterator { } make_stats_iterator!(BoolStatsIterator, ParquetStatistics::Boolean, bool); +make_stats_iterator!(Int32StatsIterator, ParquetStatistics::Int32, i32); +make_stats_iterator!(Int64StatsIterator, ParquetStatistics::Int64, i64); +make_stats_iterator!(F32StatsIterator, ParquetStatistics::Float, f32); +make_stats_iterator!(F64StatsIterator, ParquetStatistics::Double, f64); +make_stats_iterator!( + ByteArrayStatsIterator, + ParquetStatistics::ByteArray, + ByteArray +); +make_stats_iterator!( + FixedLenByteArrayStatsIterator, + ParquetStatistics::FixedLenByteArray, + FixedLenByteArray +); /// Extract a single min/max statistics from a [`ParquetStatistics`] object /// @@ -276,6 +293,15 @@ pub(crate) fn min_statistics<'a, I: Iterator { + let mins = Int32StatsIterator::new(iterator).map(|v| { + v.map(|v| { + let v: i8 = (*v.min()).try_into().unwrap(); + v + }) + }); + Ok(Arc::new(Int8Array::from_iter(mins))) + } _ => { // fallback to scalars let scalars = iterator.map(|x| { From cec9b440db74fd06c8be94d0a37d6a78ec6c959e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 29 May 2024 15:33:51 -0400 Subject: [PATCH 3/3] make min --- .../physical_plan/parquet/statistics.rs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index db0c6ff13748..25cead612bcd 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -29,7 +29,7 @@ use datafusion_common::{ }; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::statistics::{Statistics as ParquetStatistics, ValueStatistics}; +use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; use std::sync::Arc; @@ -58,16 +58,18 @@ fn sign_extend_be(b: &[u8]) -> [u8; 16] { /// Define an adapter iterator for extracting statistics from an iterator of /// `ParquetStatistics` /// +/// /// Handles checking if the statistics are present and valid with the correct type /// /// Parameters: /// * `$iterator_type` is the name of the iterator type (e.g. `BoolStatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) /// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`) /// * `$stat_value_type` is the type of the statistics value (e.g. `bool`) macro_rules! make_stats_iterator { - ($iterator_type:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { + ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { /// Maps an iterator of `ParquetStatistics` into an iterator of - /// `ValueStatistics<$stat_value_type>` + /// `&$stat_value_type`` /// /// Yielded elements: /// * Some(stats) if valid @@ -93,13 +95,15 @@ macro_rules! make_stats_iterator { where I: Iterator>, { - type Item = Option<&'a ValueStatistics<$stat_value_type>>; + type Item = Option<&'a $stat_value_type>; /// return the next statistics value fn next(&mut self) -> Option { let next_stat = self.inner.next()?; let next_stat = next_stat.and_then(|stats| match stats { - $parquet_statistics_type(s) if stats.has_min_max_set() => Some(s), + $parquet_statistics_type(s) if stats.has_min_max_set() => { + Some(s.$func()) + } _ => None, }); Some(next_stat) @@ -112,18 +116,20 @@ macro_rules! make_stats_iterator { }; } -make_stats_iterator!(BoolStatsIterator, ParquetStatistics::Boolean, bool); -make_stats_iterator!(Int32StatsIterator, ParquetStatistics::Int32, i32); -make_stats_iterator!(Int64StatsIterator, ParquetStatistics::Int64, i64); -make_stats_iterator!(F32StatsIterator, ParquetStatistics::Float, f32); -make_stats_iterator!(F64StatsIterator, ParquetStatistics::Double, f64); +make_stats_iterator!(MinBoolStatsIterator, min, ParquetStatistics::Boolean, bool); +make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, i32); +make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, i64); +make_stats_iterator!(MinF32StatsIterator, min, ParquetStatistics::Float, f32); +make_stats_iterator!(MinF64StatsIterator, min, ParquetStatistics::Double, f64); make_stats_iterator!( - ByteArrayStatsIterator, + MinByteArrayStatsIterator, + min, ParquetStatistics::ByteArray, ByteArray ); make_stats_iterator!( - FixedLenByteArrayStatsIterator, + MinFixedLenByteArrayStatsIterator, + min, ParquetStatistics::FixedLenByteArray, FixedLenByteArray ); @@ -289,17 +295,11 @@ pub(crate) fn min_statistics<'a, I: Iterator Result { match data_type { DataType::Boolean => { - // build a boolean array from the min statistics directly - let mins = BoolStatsIterator::new(iterator).map(|v| v.map(|v| *v.min())); + let mins = MinBoolStatsIterator::new(iterator).map(|v| v.cloned()); Ok(Arc::new(BooleanArray::from_iter(mins))) } DataType::Int8 => { - let mins = Int32StatsIterator::new(iterator).map(|v| { - v.map(|v| { - let v: i8 = (*v.min()).try_into().unwrap(); - v - }) - }); + let mins = MinInt32StatsIterator::new(iterator).map(|v| v.map(|v| *v as i8)); Ok(Arc::new(Int8Array::from_iter(mins))) } _ => {