diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 29b593a70ca0..c83d33cb2e02 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -18,6 +18,7 @@ //! Helper functions for the table implementation use std::collections::HashMap; +use std::mem; use std::sync::Arc; use super::PartitionedFile; @@ -138,10 +139,19 @@ pub fn split_files( // effectively this is div with rounding up instead of truncating let chunk_size = (partitioned_files.len() + n - 1) / n; - partitioned_files - .chunks(chunk_size) - .map(|c| c.to_vec()) - .collect() + let mut chunks = Vec::with_capacity(n); + + let mut current_chunk = Vec::with_capacity(chunk_size); + for file in partitioned_files.drain(..) { + current_chunk.push(file); + if current_chunk.len() == chunk_size { + chunks.push(mem::take(&mut current_chunk)); + } + } + if !current_chunk.is_empty() { + chunks.push(current_chunk) + } + chunks } struct Partition { diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 44f92760908d..c5a742d50e53 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -78,10 +78,11 @@ pub struct PartitionedFile { /// /// DataFusion relies on these statistics for planning (in particular to sort file groups), /// so if they are incorrect, incorrect answers may result. - pub statistics: Option, + pub statistics: Option>, /// An optional field for user defined per object metadata pub extensions: Option>, } + impl PartitionedFile { /// Create a simple file without metadata or partition pub fn new(path: impl Into, size: u64) -> Self { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 80f49e4eb8e6..c971f357c7b8 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -978,10 +978,12 @@ impl ListingTable { let statistics = self.do_collect_statistics(ctx, &store, &part_file).await?; part_file.statistics = Some(statistics.clone()); - Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)> + Ok((part_file, statistics)) } else { - Ok((part_file, Statistics::new_unknown(&self.file_schema))) - as Result<(PartitionedFile, Statistics)> + Ok(( + part_file, + Arc::new(Statistics::new_unknown(&self.file_schema)), + )) } }) .boxed() @@ -1011,12 +1013,12 @@ impl ListingTable { ctx: &SessionState, store: &Arc, part_file: &PartitionedFile, - ) -> Result { + ) -> Result> { let statistics_cache = self.collected_statistics.clone(); - return match statistics_cache + match statistics_cache .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) { - Some(statistics) => Ok(statistics.as_ref().clone()), + Some(statistics) => Ok(statistics.clone()), None => { let statistics = self .options @@ -1028,14 +1030,15 @@ impl ListingTable { &part_file.object_meta, ) .await?; + let statistics = Arc::new(statistics); statistics_cache.put_with_extra( &part_file.object_meta.location, - statistics.clone().into(), + statistics.clone(), &part_file.object_meta, ); Ok(statistics) } - }; + } } } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 17850ea7585a..566464cdfab0 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -1193,7 +1193,7 @@ mod tests { }, partition_values: vec![ScalarValue::from(file.date)], range: None, - statistics: Some(Statistics { + statistics: Some(Arc::new(Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: file @@ -1213,7 +1213,7 @@ mod tests { .unwrap_or_default() }) .collect::>(), - }), + })), extensions: None, } } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 8c789e461b08..d9cbe4f8ac29 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::mem; +use std::sync::Arc; + use super::listing::PartitionedFile; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; @@ -26,8 +29,6 @@ use datafusion_common::stats::Precision; use datafusion_common::ScalarValue; use futures::{Stream, StreamExt}; -use itertools::izip; -use itertools::multiunzip; /// Get all files as well as the file level summary statistics (no statistic for partition columns). /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to @@ -35,7 +36,7 @@ use itertools::multiunzip; /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive /// call to `multiunzip` for constructing file level summary statistics. pub async fn get_statistics_with_limit( - all_files: impl Stream>, + all_files: impl Stream)>>, file_schema: SchemaRef, limit: Option, collect_stats: bool, @@ -48,9 +49,7 @@ pub async fn get_statistics_with_limit( // - zero for summations, and // - neutral element for extreme points. let size = file_schema.fields().len(); - let mut null_counts: Vec> = vec![Precision::Absent; size]; - let mut max_values: Vec> = vec![Precision::Absent; size]; - let mut min_values: Vec> = vec![Precision::Absent; size]; + let mut col_stats_set = vec![ColumnStatistics::default(); size]; let mut num_rows = Precision::::Absent; let mut total_byte_size = Precision::::Absent; @@ -62,12 +61,14 @@ pub async fn get_statistics_with_limit( result_files.push(file); // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() { - null_counts[index] = file_column.null_count; - max_values[index] = file_column.max_value; - min_values[index] = file_column.min_value; + num_rows = file_stats.num_rows.clone(); + total_byte_size = file_stats.total_byte_size.clone(); + for (index, file_column) in + file_stats.column_statistics.clone().into_iter().enumerate() + { + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; } // If the number of rows exceeds the limit, we can stop processing @@ -90,38 +91,28 @@ pub async fn get_statistics_with_limit( // counts across all the files in question. If any file does not // provide any information or provides an inexact value, we demote // the statistic precision to inexact. - num_rows = add_row_stats(file_stats.num_rows, num_rows); + num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows); total_byte_size = - add_row_stats(file_stats.total_byte_size, total_byte_size); + add_row_stats(file_stats.total_byte_size.clone(), total_byte_size); - (null_counts, max_values, min_values) = multiunzip( - izip!( - file_stats.column_statistics.into_iter(), - null_counts.into_iter(), - max_values.into_iter(), - min_values.into_iter() - ) - .map( - |( - ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - distinct_count: _, - }, - null_count, - max_value, - min_value, - )| { - ( - add_row_stats(file_nc, null_count), - set_max_if_greater(file_max, max_value), - set_min_if_lesser(file_min, min_value), - ) - }, - ), - ); + for (file_col_stats, col_stats) in file_stats + .column_statistics + .iter() + .zip(col_stats_set.iter_mut()) + { + let ColumnStatistics { + null_count: file_nc, + max_value: file_max, + min_value: file_min, + distinct_count: _, + } = file_col_stats; + + col_stats.null_count = + add_row_stats(file_nc.clone(), col_stats.null_count.clone()); + set_max_if_greater(file_max, &mut col_stats.max_value); + set_min_if_lesser(file_min, &mut col_stats.min_value) + } // If the number of rows exceeds the limit, we can stop processing // files. This only applies when we know the number of rows. It also @@ -139,7 +130,7 @@ pub async fn get_statistics_with_limit( let mut statistics = Statistics { num_rows, total_byte_size, - column_statistics: get_col_stats_vec(null_counts, max_values, min_values), + column_statistics: col_stats_set, }; if all_files.next().await.is_some() { // If we still have files in the stream, it means that the limit kicked @@ -182,21 +173,6 @@ fn add_row_stats( } } -pub(crate) fn get_col_stats_vec( - null_counts: Vec>, - max_values: Vec>, - min_values: Vec>, -) -> Vec { - izip!(null_counts, max_values, min_values) - .map(|(null_count, max_value, min_value)| ColumnStatistics { - null_count, - max_value, - min_value, - distinct_count: Precision::Absent, - }) - .collect() -} - pub(crate) fn get_col_stats( schema: &Schema, null_counts: Vec>, @@ -238,45 +214,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { /// If the given value is numerically greater than the original maximum value, /// return the new maximum value with appropriate exactness information. fn set_max_if_greater( - max_nominee: Precision, - max_values: Precision, -) -> Precision { - match (&max_values, &max_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee, + max_nominee: &Precision, + max_value: &mut Precision, +) { + match (&max_value, max_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { + *max_value = max_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 < val2 => { - max_nominee.to_inexact() + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_max = mem::take(max_value); + *max_value = exact_max.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *max_value = max_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => max_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => max_values, + _ => {} } } /// If the given value is numerically lesser than the original minimum value, /// return the new minimum value with appropriate exactness information. fn set_min_if_lesser( - min_nominee: Precision, - min_values: Precision, -) -> Precision { - match (&min_values, &min_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee, + min_nominee: &Precision, + min_value: &mut Precision, +) { + match (&min_value, min_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { + *min_value = min_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 > val2 => { - min_nominee.to_inexact() + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_min = mem::take(min_value); + *min_value = exact_min.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *min_value = min_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => min_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => min_values, + _ => {} } } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index bc0a19336bae..44fa8a9a717c 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -559,7 +559,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: val.range.as_ref().map(|v| v.try_into()).transpose()?, - statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?, + statistics: val + .statistics + .as_ref() + .map(|v| v.try_into().map(Arc::new)) + .transpose()?, extensions: None, }) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 57cd22a99ae1..ae330e7421d4 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile { .map(|v| v.try_into()) .collect::, _>>()?, range: pf.range.as_ref().map(|r| r.try_into()).transpose()?, - statistics: pf.statistics.as_ref().map(|s| s.into()), + statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()), }) } }