46 changes: 45 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -1149,7 +1149,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_array::Int64Array;
use arrow_array::types::Int32Type;
use arrow_array::{DictionaryArray, Int32Array, Int64Array};
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
@@ -1158,6 +1159,7 @@ mod tests {
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -1439,6 +1441,48 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
// Data for column c_dic: ["a", "b", "c", "d"]
let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
let dic_array =
DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
let c_dic: ArrayRef = Arc::new(dic_array);

let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)]).unwrap();

// Use store_parquet to write each batch to its own file.
// batch1 is written to the first file and includes:
// - column c_dic with 4 rows and no nulls; min and max statistics are available for the dictionary column.
let store = Arc::new(LocalFileSystem::new()) as _;
let (files, _file_names) = store_parquet(vec![batch1], false).await?;

let state = SessionContext::new().state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&state, &store, &files).await.unwrap();

// Fetch statistics for first file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
assert_eq!(stats.num_rows, Precision::Exact(4));

// column c_dic
let c_dic_stats = &stats.column_statistics[0];

assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
assert_eq!(
c_dic_stats.max_value,
Precision::Exact(Utf8(Some("d".into())))
);
assert_eq!(
c_dic_stats.min_value,
Precision::Exact(Utf8(Some("a".into())))
);

Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata() -> Result<()> {
// Data for column c1: ["Foo", null, "bar"]
21 changes: 19 additions & 2 deletions datafusion/core/src/datasource/statistics.rs
@@ -20,6 +20,7 @@ use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use arrow_schema::DataType;

use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
@@ -156,12 +157,16 @@ pub(crate) fn create_max_min_accs(
let max_values: Vec<Option<MaxAccumulator>> = schema
.fields()
.iter()
.map(|field| MaxAccumulator::try_new(field.data_type()).ok())
.map(|field| {
MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
})
.collect();
let min_values: Vec<Option<MinAccumulator>> = schema
.fields()
.iter()
.map(|field| MinAccumulator::try_new(field.data_type()).ok())
.map(|field| {
MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
})
.collect();
(max_values, min_values)
}
@@ -218,6 +223,18 @@ pub(crate) fn get_col_stats(
.collect()
}

// Min/max aggregation can take Dictionary-encoded input but always produces unpacked
// (i.e. non-Dictionary) output. We need to adjust the output data type to reflect this.
// Min/max aggregation produces unpacked output because there is only one min/max value
// per group, so there is no need to keep it Dictionary-encoded.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
}
}

/// If the given value is numerically greater than the original maximum value,
/// return the new maximum value with appropriate exactness information.
fn set_max_if_greater(
Expand Down