Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement non-nested struct statistic extraction #10730

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 161 additions & 39 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_schema::{Field, FieldRef, Schema};
use arrow_array::{new_empty_array, new_null_array, Array, StructArray, UInt64Array};
use arrow_schema::{Field, FieldRef, Fields, Schema};
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
internal_datafusion_err, internal_err, plan_err, DataFusionError, Result, ScalarValue,
};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use std::sync::Arc;
Expand Down Expand Up @@ -190,20 +190,29 @@ pub(crate) fn parquet_column<'a>(
name: &str,
) -> Option<(usize, &'a FieldRef)> {
let (root_idx, field) = arrow_schema.fields.find(name)?;
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
}
match field.data_type() {
DataType::Struct(_) => {
let parquet_idx = (0..parquet_schema.columns().len())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold could you help here -- is this the correct way to find the correct parquet leaf from an arrow struct type?

.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}
_ => {
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}
}
}

/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
Expand All @@ -216,6 +225,98 @@ pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
collect_scalars(data_type, scalars)
}

/// Extract the min statistics for struct array
pub(crate) fn struct_min_statistics(
row_groups: &[RowGroupMetaData],
struct_fields: &Fields,
) -> Result<ArrayRef, DataFusionError> {
let mut child_data = Vec::new();
let mut fields = Vec::new();

if struct_fields.iter().any(|f| f.data_type().is_nested()) {
return Ok(new_empty_array(&DataType::Struct(struct_fields.clone())));
}

for (idx, field) in struct_fields.iter().enumerate() {
// Handle non-nested fields
let max_value = row_groups
.iter()
.map(|x| x.column(idx).statistics())
.map(|x| {
x.and_then(|s| get_statistic!(s, min, min_bytes, Some(field.data_type())))
});
let array = collect_scalars(field.data_type(), max_value)?;
child_data.push(Arc::new(array) as Arc<dyn Array>);
fields.push(Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
)));
}
// Create a StructArray from collected fields and data
let struct_array =
StructArray::from(fields.into_iter().zip(child_data).collect::<Vec<_>>());
println!("the struct array is {:?}", struct_array);
Ok(Arc::new(struct_array) as ArrayRef)
}

/// Extract the max statistics for struct array
pub(crate) fn struct_max_statistics(
row_groups: &[RowGroupMetaData],
struct_fields: &Fields,
) -> Result<ArrayRef, DataFusionError> {
let mut child_data = Vec::new();
let mut fields = Vec::new();

if struct_fields.iter().any(|f| f.data_type().is_nested()) {
return Ok(new_empty_array(&DataType::Struct(struct_fields.clone())));
}

for (idx, field) in struct_fields.iter().enumerate() {
// Handle non-nested fields
let max_value = row_groups
.iter()
.map(|x| x.column(idx).statistics())
.map(|x| {
x.and_then(|s| get_statistic!(s, max, max_bytes, Some(field.data_type())))
});
let array = collect_scalars(field.data_type(), max_value)?;
child_data.push(Arc::new(array) as Arc<dyn Array>);
fields.push(Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
)));
}
// Create a StructArray from collected fields and data
let struct_array =
StructArray::from(fields.into_iter().zip(child_data).collect::<Vec<_>>());
Ok(Arc::new(struct_array) as ArrayRef)
}

/// Extract the nullcount statistics for struct array
///
/// Returns an empty `UInt64` array when any child field is nested (nested
/// statistics extraction is not supported).
///
/// NOTE(review): unlike the non-struct null-count path (which returns one
/// entry per row group), this returns a SINGLE-element array containing the
/// number of row groups in which *every* child column reports at least one
/// null — TODO confirm this is the intended semantic and array length.
pub(crate) fn struct_null_count_statistics(
    row_groups: &[RowGroupMetaData],
    struct_fields: &Fields,
) -> Result<ArrayRef, DataFusionError> {
    if struct_fields.iter().any(|f| f.data_type().is_nested()) {
        return Ok(Arc::new(new_empty_array(&DataType::UInt64)) as ArrayRef);
    }

    // Count row groups where all child columns have statistics reporting a
    // positive null count; row groups with missing statistics are excluded.
    let null_count = row_groups
        .iter()
        .filter(|rg| {
            // assumes child `idx` maps to the row group's leaf column `idx`
            // (children are the first parquet leaves) — TODO confirm
            struct_fields.iter().enumerate().all(|(idx, _)| {
                rg.column(idx)
                    .statistics()
                    .map_or(false, |stats| stats.null_count() > 0)
            })
        })
        .count() as u64;

    Ok(Arc::new(UInt64Array::from(vec![null_count])) as ArrayRef)
}

/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
Expand Down Expand Up @@ -353,35 +454,51 @@ impl<'a> StatisticsConverter<'a> {

let parquet_schema = metadata.file_metadata().schema_descr();
let row_groups = metadata.row_groups();

// find the column in the parquet schema, if not, return a null array
let Some((parquet_idx, matched_field)) =
parquet_column(parquet_schema, self.arrow_schema, self.column_name)
else {
// column was in the arrow schema but not in the parquet schema, so return a null array
return Ok(new_null_array(data_type, num_row_groups));
};
match matched_field.data_type() {
// need to deal with nested struct
DataType::Struct(fields) => match self.statistics_type {
RequestedStatistics::Max => {
Ok(struct_max_statistics(row_groups, fields)?)
}
RequestedStatistics::Min => {
Ok(struct_min_statistics(row_groups, fields)?)
}
RequestedStatistics::NullCount => {
Ok(struct_null_count_statistics(row_groups, fields)?)
}
},
// currently set to return null
v if v.is_nested() => Ok(new_null_array(data_type, num_row_groups)),
_ => {
// sanity check that matching field matches the arrow field
if matched_field.as_ref() != self.arrow_field {
return internal_err!(
"Matched column '{:?}' does not match original matched column '{:?}'",
matched_field,
self.arrow_field
);
}

// sanity check that matching field matches the arrow field
if matched_field.as_ref() != self.arrow_field {
return internal_err!(
"Matched column '{:?}' does not match original matched column '{:?}'",
matched_field,
self.arrow_field
);
}

// Get an iterator over the column statistics
let iter = row_groups
.iter()
.map(|x| x.column(parquet_idx).statistics());

match self.statistics_type {
RequestedStatistics::Min => min_statistics(data_type, iter),
RequestedStatistics::Max => max_statistics(data_type, iter),
RequestedStatistics::NullCount => {
let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
// Get an iterator over the column statistics
let iter = row_groups
.iter()
.map(|x| x.column(parquet_idx).statistics());

match self.statistics_type {
RequestedStatistics::Min => min_statistics(data_type, iter),
RequestedStatistics::Max => max_statistics(data_type, iter),
RequestedStatistics::NullCount => {
let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}
}
}
}
}
Expand Down Expand Up @@ -983,7 +1100,12 @@ mod test {
for field in schema.fields() {
if field.data_type().is_nested() {
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
match field.data_type() {
DataType::Struct(_) => {}
_ => {
assert_eq!(lookup, None);
}
}
continue;
}

Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,7 @@ async fn test_boolean() {
}

// struct array
// BUG
// https://github.com/apache/datafusion/issues/10609
// Note that: since I have not worked on struct array before, there may be a bug in the test code rather than the real bug in the code
#[ignore]
// Currently supports statistics extraction only for non-nested StructArray columns
#[tokio::test]
async fn test_struct() {
// This creates a parquet files of 1 column named "struct"
Expand Down