From 26e4a70c163d958743f051ea7ea75bcdea5181d3 Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Fri, 28 Jun 2024 11:05:09 -0700
Subject: [PATCH 1/2] fix: Support dictionary type in parquet metadata statistics.

---
 .../src/datasource/file_format/parquet.rs    | 55 ++++++++++++++++++-
 datafusion/core/src/datasource/statistics.rs | 21 ++++-
 2 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 44c9cc4ec4a9..f5b5b84bb237 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1149,7 +1149,8 @@ mod tests {
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
-    use arrow_array::Int64Array;
+    use arrow_array::types::Int32Type;
+    use arrow_array::{DictionaryArray, Int32Array, Int64Array};
     use arrow_schema::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_common::cast::{
@@ -1158,6 +1159,7 @@ mod tests {
     };
     use datafusion_common::config::ParquetOptions;
     use datafusion_common::ScalarValue;
+    use datafusion_common::ScalarValue::Utf8;
    use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -1439,6 +1441,57 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
+        // Data for column c_dic: ["a", "b", "c", "d"]
+        let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
+        let keys = Int32Array::from_iter_values([0, 0, 1, 2]);
+        let dic_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
+        let boxed_array: Box<dyn Array> = Box::new(dic_array);
+        let c_dic: ArrayRef = Arc::from(boxed_array);
+
+        // Define the schema
+        let field = Field::new(
+            "c_dic",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            false,
+        );
+        let schema = Schema::new(vec![field]);
+        // Create the RecordBatch
+        let batch1 = RecordBatch::try_new(Arc::new(schema), vec![c_dic]).unwrap();
+
+        // Use store_parquet to write each batch to its own file
+        // . batch1 written into first file and includes:
+        // - column c_dic that has 4 rows with no nulls. Stats min and max of the string column are missing for this test even though the column has values
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (files, _file_names) = store_parquet(vec![batch1], false).await?;
+
+        let state = SessionContext::new().state();
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+
+        // Fetch statistics for first file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        assert_eq!(stats.num_rows, Precision::Exact(4));
+
+        // column c_dic
+        let c_dic_stats = &stats.column_statistics[0];
+
+        assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
+        assert_eq!(
+            c_dic_stats.max_value,
+            Precision::Exact(Utf8(Some("c".into())))
+        );
+        assert_eq!(
+            c_dic_stats.min_value,
+            Precision::Exact(Utf8(Some("a".into())))
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_statistics_from_parquet_metadata() -> Result<()> {
         // Data for column c1: ["Foo", null, "bar"]
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index c67227f966a2..a243a1c3558f 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -20,6 +20,7 @@ use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use arrow_schema::DataType;
 use datafusion_common::stats::Precision;
 use datafusion_common::ScalarValue;
 
@@ -156,12 +157,16 @@ pub(crate) fn create_max_min_accs(
     let max_values: Vec<Option<MaxAccumulator>> = schema
         .fields()
         .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+        .map(|field| {
+            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
+        })
         .collect();
     let min_values: Vec<Option<MinAccumulator>> = schema
         .fields()
         .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+        .map(|field| {
+            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
+        })
         .collect();
     (max_values, min_values)
 }
@@ -218,6 +223,18 @@ pub(crate) fn get_col_stats(
         .collect()
 }
 
+// Min/max aggregation can take Dictionary-encoded input but always produces unpacked
+// (i.e., non-Dictionary) output, so the output data type must be adjusted to reflect
+// this. Min/max aggregates produce unpacked output because there is only one min/max
+// value per group; there is no need to keep the result Dictionary-encoded.
+fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
+    if let DataType::Dictionary(_, value_type) = input_type {
+        value_type.as_ref()
+    } else {
+        input_type
+    }
+}
+
 /// If the given value is numerically greater than the original maximum value,
 /// return the new maximum value with appropriate exactness information.
 fn set_max_if_greater(
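
A note on PATCH 1/2 (not part of the patch series): below is a minimal, self-contained sketch of the unpacking rule added in datafusion/core/src/datasource/statistics.rs. The function body is copied from the patch itself; the main() and its assertions are illustrative assumptions only, and nothing beyond the arrow_schema crate already used above is required.

use arrow_schema::DataType;

// Copied from the patch: statistics for a Dictionary(_, value_type) column
// are expressed in the unpacked value type.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

fn main() {
    let dict =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // Min/max accumulators for a dictionary-encoded string column are built
    // over Utf8, which is why the test above can assert plain Utf8 scalars.
    assert_eq!(min_max_aggregate_data_type(&dict), &DataType::Utf8);
    // Non-dictionary types pass through unchanged.
    assert_eq!(min_max_aggregate_data_type(&DataType::Int64), &DataType::Int64);
}

This is also why create_max_min_accs constructs MaxAccumulator/MinAccumulator with the unpacked type: the ScalarValue each accumulator produces must match the data type reported for the column's min/max statistics.
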
From 1b56dc5955233ed99f13327007e9edc69180f538 Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Fri, 28 Jun 2024 18:09:46 -0700
Subject: [PATCH 2/2] Simplify tests.

---
 .../src/datasource/file_format/parquet.rs | 21 ++++++---------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index f5b5b84bb237..cecc2f246031 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1445,25 +1445,16 @@ mod tests {
     async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
         // Data for column c_dic: ["a", "b", "c", "d"]
         let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
-        let keys = Int32Array::from_iter_values([0, 0, 1, 2]);
+        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
         let dic_array =
             DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
-        let boxed_array: Box<dyn Array> = Box::new(dic_array);
-        let c_dic: ArrayRef = Arc::from(boxed_array);
-
-        // Define the schema
-        let field = Field::new(
-            "c_dic",
-            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            false,
-        );
-        let schema = Schema::new(vec![field]);
-        // Create the RecordBatch
-        let batch1 = RecordBatch::try_new(Arc::new(schema), vec![c_dic]).unwrap();
+        let c_dic: ArrayRef = Arc::new(dic_array);
+
+        let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)]).unwrap();
 
         // Use store_parquet to write each batch to its own file
         // . batch1 written into first file and includes:
-        // - column c_dic that has 4 rows with no nulls. Stats min and max of the string column are missing for this test even though the column has values
+        // - column c_dic that has 4 rows with no nulls. Stats min and max of the dictionary column are available.
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (files, _file_names) = store_parquet(vec![batch1], false).await?;
 
@@ -1482,7 +1473,7 @@
         assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
         assert_eq!(
             c_dic_stats.max_value,
-            Precision::Exact(Utf8(Some("c".into())))
+            Precision::Exact(Utf8(Some("d".into())))
         );
         assert_eq!(
             c_dic_stats.min_value,