diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 6ad78a82b9bf..2ca47de990ec 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -33,8 +33,9 @@ use arrow_array::{
 use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
 use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
 use half::f16;
+use parquet::data_type::FixedLenByteArray;
 use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
-use parquet::file::page_index::index::Index;
+use parquet::file::page_index::index::{Index, PageIndex};
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::schema::types::SchemaDescriptor;
 use paste::paste;
@@ -495,7 +496,7 @@ macro_rules! get_statistics {
 }
 
 macro_rules! make_data_page_stats_iterator {
-    ($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
+    ($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
         struct $iterator_type<'a, I>
         where
             I: Iterator,
@@ -526,7 +527,7 @@ macro_rules! make_data_page_stats_iterator {
                             native_index
                                 .indexes
                                 .iter()
-                                .map(|x| x.$func)
+                                .map(|x| $func(x))
                                 .collect::<Vec<_>>(),
                         ),
                         // No matching `Index` found;
@@ -548,11 +549,66 @@ macro_rules! make_data_page_stats_iterator {
     };
 }
 
-make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32, i32);
-make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);
-make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
-make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);
-
+make_data_page_stats_iterator!(
+    MinInt32DataPageStatsIterator,
+    |x: &PageIndex<i32>| { x.min },
+    Index::INT32,
+    i32
+);
+make_data_page_stats_iterator!(
+    MaxInt32DataPageStatsIterator,
+    |x: &PageIndex<i32>| { x.max },
+    Index::INT32,
+    i32
+);
+make_data_page_stats_iterator!(
+    MinInt64DataPageStatsIterator,
+    |x: &PageIndex<i64>| { x.min },
+    Index::INT64,
+    i64
+);
+make_data_page_stats_iterator!(
+    MaxInt64DataPageStatsIterator,
+    |x: &PageIndex<i64>| { x.max },
+    Index::INT64,
+    i64
+);
+make_data_page_stats_iterator!(
+    MinFloat16DataPageStatsIterator,
+    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
+    Index::FIXED_LEN_BYTE_ARRAY,
+    FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+    MaxFloat16DataPageStatsIterator,
+    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
+    Index::FIXED_LEN_BYTE_ARRAY,
+    FixedLenByteArray
+);
+make_data_page_stats_iterator!(
+    MinFloat32DataPageStatsIterator,
+    |x: &PageIndex<f32>| { x.min },
+    Index::FLOAT,
+    f32
+);
+make_data_page_stats_iterator!(
+    MaxFloat32DataPageStatsIterator,
+    |x: &PageIndex<f32>| { x.max },
+    Index::FLOAT,
+    f32
+);
+make_data_page_stats_iterator!(
+    MinFloat64DataPageStatsIterator,
+    |x: &PageIndex<f64>| { x.min },
+    Index::DOUBLE,
+    f64
+);
+make_data_page_stats_iterator!(
+    MaxFloat64DataPageStatsIterator,
+    |x: &PageIndex<f64>| { x.max },
+    Index::DOUBLE,
+    f64
+);
 macro_rules! get_data_page_statistics {
     ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
         paste! {
@@ -581,6 +637,19 @@ macro_rules! get_data_page_statistics {
                 )),
                 Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
                 Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
+                Some(DataType::Float16) => Ok(Arc::new(
+                    Float16Array::from_iter(
+                        [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
+                            .map(|x| {
+                                x.into_iter().filter_map(|x| {
+                                    x.and_then(|x| Some(from_bytes_to_f16(x.data())))
+                                })
+                            })
+                            .flatten()
+                    )
+                )),
+                Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
+                Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
                 _ => unimplemented!()
             }
         }
@@ -677,6 +746,21 @@ where
             .iter()
             .map(|x| x.null_count.map(|x| x as u64))
             .collect::<Vec<_>>(),
+        Index::FLOAT(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
+        Index::DOUBLE(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
+        Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
+            .indexes
+            .iter()
+            .map(|x| x.null_count.map(|x| x as u64))
+            .collect::<Vec<_>>(),
         _ => unimplemented!(),
     });
 
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 4c68a57333e5..bdae9f47867e 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -614,6 +614,94 @@ async fn test_int_8() {
     .run();
 }
 
+#[tokio::test]
+async fn test_float_16() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float16,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float16Array::from(vec![
+            f16::from_f32(-5.),
+            f16::from_f32(-4.),
+            f16::from_f32(-0.),
+            f16::from_f32(5.),
+        ])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float16Array::from(vec![
+            f16::from_f32(-1.),
+            f16::from_f32(0.),
+            f16::from_f32(4.),
+            f16::from_f32(9.),
+        ])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
+#[tokio::test]
+async fn test_float_32() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float32,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float32Array::from(vec![-5., -4., -0., 5.0])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float32Array::from(vec![-1., 0., 4., 9.])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
+#[tokio::test]
+async fn test_float_64() {
+    // This creates a parquet files of 1 column named f
+    let reader = TestReader {
+        scenario: Scenario::Float64,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    Test {
+        reader: &reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Float64Array::from(vec![-5., -4., -0., 5.0])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
+        column_name: "f",
+        check: Check::Both,
+    }
+    .run();
+}
+
 // timestamp
 #[tokio::test]
 async fn test_timestamp() {
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 0434a271c32e..1b68a4aa4eb3 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -90,6 +90,7 @@ enum Scenario {
     /// -MIN, -100, -1, 0, 1, 100, MAX
     NumericLimits,
     Float16,
+    Float32,
     Float64,
     Decimal,
     Decimal256,
@@ -586,6 +587,12 @@ fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
     RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
 }
 
+fn make_f32_batch(v: Vec<f32>) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32, true)]));
+    let array = Arc::new(Float32Array::from(v)) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
 fn make_f16_batch(v: Vec<f16>) -> RecordBatch {
     let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, true)]));
     let array = Arc::new(Float16Array::from(v)) as ArrayRef;
@@ -1003,6 +1010,14 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 ),
             ]
         }
+        Scenario::Float32 => {
+            vec![
+                make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
+                make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
+                make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
+                make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
+            ]
+        }
         Scenario::Float64 => {
             vec![
                 make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),