From a97accacd3548124716f43065ab5b87ca0a9336e Mon Sep 17 00:00:00 2001 From: Aryan Bagade <73382554+AryanBagade@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:57:02 -0800 Subject: [PATCH 1/2] chore: Enforce lint rule `clippy::needless_pass_by_value` to datafusion-datasource This commit enforces the `clippy::needless_pass_by_value` lint rule to prevent unnecessary data clones and improve performance in the datafusion-datasource crate. Changes: - Added lint rule to datafusion/datasource/src/mod.rs - Fixed 11 violations across 5 files by changing pass-by-value to pass-by-reference - Updated callers in datafusion-core and datafusion-catalog-listing Fixes #18611 Part of #18503 --- datafusion/catalog-listing/src/table.rs | 2 +- datafusion/core/src/physical_planner.rs | 3 +- datafusion/datasource/src/file_scan_config.rs | 17 +++--- datafusion/datasource/src/file_stream.rs | 2 +- datafusion/datasource/src/memory.rs | 48 ++++++++--------- datafusion/datasource/src/mod.rs | 4 ++ datafusion/datasource/src/source.rs | 9 ++-- datafusion/datasource/src/statistics.rs | 54 ++++++++----------- datafusion/datasource/src/write/demux.rs | 4 +- 9 files changed, 66 insertions(+), 77 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 33d5c86bf88d..6a485743c47e 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -656,7 +656,7 @@ impl ListingTable { let file_groups = file_group.split_files(self.options.target_partitions); let (mut file_groups, mut stats) = compute_all_files_statistics( file_groups, - self.schema(), + &self.schema(), self.options.collect_stat, inexact_stats, )?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f10755a4594c..e7fcf61b360c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -482,8 +482,7 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)? - as _ + MemorySourceConfig::try_new_as_values(schema.inner(), &exprs)? as _ } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 82052ee4c39c..e2ba48bc7e3b 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -581,7 +581,8 @@ impl DataSource for FileScanConfig { if let Some(filter) = self.file_source.filter() { // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with. // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence. - match Self::add_filter_equivalence_info(filter, &mut eq_properties, &schema) { + match Self::add_filter_equivalence_info(&filter, &mut eq_properties, &schema) + { Ok(()) => {} Err(e) => { warn!("Failed to add filter equivalence info: {e}"); @@ -782,12 +783,12 @@ impl FileScanConfig { } fn add_filter_equivalence_info( - filter: Arc, + filter: &Arc, eq_properties: &mut EquivalenceProperties, schema: &Schema, ) -> Result<()> { // Gather valid equality pairs from the filter expression - let equal_pairs = split_conjunction(&filter).into_iter().filter_map(|expr| { + let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| { // Ignore any binary expressions that reference non-existent columns in the current schema // (e.g. due to unnecessary projections being removed) reassign_expr_columns(Arc::clone(expr), schema) @@ -1147,7 +1148,7 @@ impl PartitionColumnProjector { // - `partition_values`: the list of partition values, one for each partition column pub fn project( &mut self, - file_batch: RecordBatch, + file_batch: &RecordBatch, partition_values: &[ScalarValue], ) -> Result { let expected_cols = @@ -1672,7 +1673,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - file_batch, + &file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1700,7 +1701,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - file_batch, + &file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1730,7 +1731,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - file_batch, + &file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1758,7 +1759,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - file_batch, + &file_batch, &[ ScalarValue::from("2021"), ScalarValue::from("10"), diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 0568b4cc4e5f..85fc29830c57 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -215,7 +215,7 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); let result = self .pc_projector - .project(batch, partition_values) + .project(&batch, partition_values) .map(|batch| match &mut self.remain { Some(remain) => { if *remain > batch.num_rows() { diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 7d5c8c4834ea..97bfeea8b908 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -283,8 +283,8 @@ impl MemorySourceConfig { /// Create a new execution plan from a list of constant values (`ValuesExec`) pub fn try_new_as_values( - schema: SchemaRef, - data: Vec>>, + schema: &SchemaRef, + data: &[Vec>], ) -> Result> { if data.is_empty() { return plan_err!("Values list cannot be empty"); @@ -326,13 +326,13 @@ impl MemorySourceConfig { .collect::>>()?; let batch = RecordBatch::try_new_with_options( - Arc::clone(&schema), + Arc::clone(schema), arrays, &RecordBatchOptions::new().with_row_count(Some(n_row)), )?; let partitions = vec![batch]; - Self::try_new_from_batches(Arc::clone(&schema), partitions) + Self::try_new_from_batches(schema, partitions) } /// Create a new plan using the provided schema and batches. @@ -340,7 +340,7 @@ impl MemorySourceConfig { /// Errors if any of the batches don't match the provided schema, or if no /// batches are provided. pub fn try_new_from_batches( - schema: SchemaRef, + schema: &SchemaRef, batches: Vec, ) -> Result> { if batches.is_empty() { @@ -349,7 +349,7 @@ impl MemorySourceConfig { for batch in &batches { let batch_schema = batch.schema(); - if batch_schema != schema { + if batch_schema != *schema { return plan_err!( "Batch has invalid schema. Expected: {}, got: {}", schema, @@ -361,8 +361,8 @@ impl MemorySourceConfig { let partitions = vec![batches]; let source = Self { partitions, - schema: Arc::clone(&schema), - projected_schema: Arc::clone(&schema), + schema: Arc::clone(schema), + projected_schema: Arc::clone(schema), projection: None, sort_information: vec![], show_sizes: true, @@ -859,7 +859,7 @@ mod tests { let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap(); + let exec = MemorySourceConfig::try_new_from_batches(&schema, batches).unwrap(); assert_eq!(exec.fetch(), None); let exec = exec.with_fetch(Some(4)).unwrap(); @@ -881,7 +881,7 @@ mod tests { #[tokio::test] async fn values_empty_case() -> Result<()> { let schema = aggr_test_schema(); - let empty = MemorySourceConfig::try_new_as_values(schema, vec![]); + let empty = MemorySourceConfig::try_new_as_values(&schema, &[]); assert!(empty.is_err()); Ok(()) } @@ -891,14 +891,15 @@ mod tests { let batch = make_partition(7); let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap(); + let _exec = MemorySourceConfig::try_new_from_batches(&schema, batches).unwrap(); } #[test] fn new_exec_with_batches_empty() { let batch = make_partition(7); let schema = batch.schema(); - let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err(); + let _ = + MemorySourceConfig::try_new_from_batches(&schema, Vec::new()).unwrap_err(); } #[test] @@ -910,7 +911,7 @@ mod tests { Field::new("col0", DataType::UInt32, false), Field::new("col1", DataType::Utf8, false), ])); - let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches) + let _ = MemorySourceConfig::try_new_from_batches(&invalid_schema, batches) .unwrap_err(); } @@ -922,17 +923,11 @@ mod tests { DataType::UInt32, false, )])); - let _ = MemorySourceConfig::try_new_as_values( - Arc::clone(&schema), - vec![vec![lit(1u32)]], - ) - .unwrap(); + let data1 = vec![vec![lit(1u32)]]; + let _ = MemorySourceConfig::try_new_as_values(&schema, &data1).unwrap(); // Test that a null value is rejected - let _ = MemorySourceConfig::try_new_as_values( - schema, - vec![vec![lit(ScalarValue::UInt32(None))]], - ) - .unwrap_err(); + let data2 = vec![vec![lit(ScalarValue::UInt32(None))]]; + let _ = MemorySourceConfig::try_new_as_values(&schema, &data2).unwrap_err(); } #[test] @@ -943,10 +938,9 @@ mod tests { vec![lit(ScalarValue::Null)], ]; let rows = data.len(); - let values = MemorySourceConfig::try_new_as_values( - Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), - data, - )?; + let schema = + Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])); + let values = MemorySourceConfig::try_new_as_values(&schema, &data)?; assert_eq!( values.partition_statistics(None)?, diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 8d988bdb31be..2c7d40d2fb3b 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -23,6 +23,10 @@ // Make sure fast / cheap clones on Arc are explicit: // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] +// Enforce lint rule to prevent needless pass by value +// https://github.com/apache/datafusion/issues/18503 +#![deny(clippy::needless_pass_by_value)] +#![cfg_attr(test, allow(clippy::needless_pass_by_value))] //! A table that uses the `ObjectStore` listing capability //! to get the list of files to process. diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 11a8a3867b80..de79512a4101 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -348,8 +348,7 @@ impl ExecutionPlan for DataSourceExec { let mut new_node = self.clone(); new_node.data_source = data_source; // Re-compute properties since we have new filters which will impact equivalence info - new_node.cache = - Self::compute_properties(Arc::clone(&new_node.data_source)); + new_node.cache = Self::compute_properties(&new_node.data_source); Ok(FilterPushdownPropagation { filters: res.filters, @@ -371,7 +370,7 @@ impl DataSourceExec { // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`. pub fn new(data_source: Arc) -> Self { - let cache = Self::compute_properties(Arc::clone(&data_source)); + let cache = Self::compute_properties(&data_source); Self { data_source, cache } } @@ -381,7 +380,7 @@ impl DataSourceExec { } pub fn with_data_source(mut self, data_source: Arc) -> Self { - self.cache = Self::compute_properties(Arc::clone(&data_source)); + self.cache = Self::compute_properties(&data_source); self.data_source = data_source; self } @@ -398,7 +397,7 @@ impl DataSourceExec { self } - fn compute_properties(data_source: Arc) -> PlanProperties { + fn compute_properties(data_source: &Arc) -> PlanProperties { PlanProperties::new( data_source.eq_properties(), data_source.output_partitioning(), diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 0dd9bdb87c40..a922c31bb917 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -152,33 +152,29 @@ impl MinMaxStatistics { .into_iter() .unzip(); - Self::new( - &min_max_sort_order, - &min_max_schema, - RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err( - |e| { - DataFusionError::ArrowError( - Box::new(e), - Some("\ncreate min batch".to_string()), - ) - }, - )?, - RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err( - |e| { - DataFusionError::ArrowError( - Box::new(e), - Some("\ncreate max batch".to_string()), - ) - }, - )?, - ) + let min_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), min_values) + .map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some("\ncreate min batch".to_string()), + ) + })?; + let max_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), max_values) + .map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some("\ncreate max batch".to_string()), + ) + })?; + + Self::new(&min_max_sort_order, &min_max_schema, &min_batch, &max_batch) } pub fn new( sort_order: &LexOrdering, schema: &SchemaRef, - min_values: RecordBatch, - max_values: RecordBatch, + min_values: &RecordBatch, + max_values: &RecordBatch, ) -> Result { use arrow::row::*; @@ -423,7 +419,7 @@ pub async fn get_statistics_with_limit( /// A new file group with summary statistics attached pub fn compute_file_group_statistics( file_group: FileGroup, - file_schema: SchemaRef, + file_schema: &SchemaRef, collect_stats: bool, ) -> Result { if !collect_stats { @@ -434,7 +430,7 @@ pub fn compute_file_group_statistics( let stats = file.statistics.as_ref()?; Some(stats.as_ref()) }); - let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?; + let statistics = Statistics::try_merge_iter(file_group_stats, file_schema)?; Ok(file_group.with_statistics(Arc::new(statistics))) } @@ -458,18 +454,14 @@ pub fn compute_file_group_statistics( /// * The summary statistics across all file groups, aka all files summary statistics pub fn compute_all_files_statistics( file_groups: Vec, - table_schema: SchemaRef, + table_schema: &SchemaRef, collect_stats: bool, inexact_stats: bool, ) -> Result<(Vec, Statistics)> { let file_groups_with_stats = file_groups .into_iter() .map(|file_group| { - compute_file_group_statistics( - file_group, - Arc::clone(&table_schema), - collect_stats, - ) + compute_file_group_statistics(file_group, table_schema, collect_stats) }) .collect::>>()?; @@ -479,7 +471,7 @@ pub fn compute_all_files_statistics( .filter_map(|file_group| file_group.file_statistics(None)); let mut statistics = - Statistics::try_merge_iter(file_groups_statistics, &table_schema)?; + Statistics::try_merge_iter(file_groups_statistics, table_schema)?; if inexact_stats { statistics = statistics.to_inexact() diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 52cb17c10453..3fe6149b58b2 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs @@ -296,7 +296,7 @@ async fn hive_style_partitions_demuxer( let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?; // Next compute how the batch should be split up to take each distinct key to its own batch - let take_map = compute_take_arrays(&rb, all_partition_values); + let take_map = compute_take_arrays(&rb, &all_partition_values); // Divide up the batch into distinct partition key batches and send each batch for (part_key, mut builder) in take_map.into_iter() { @@ -516,7 +516,7 @@ fn compute_partition_keys_by_row<'a>( fn compute_take_arrays( rb: &RecordBatch, - all_partition_values: Vec>>, + all_partition_values: &[Vec>], ) -> HashMap, UInt64Builder> { let mut take_map = HashMap::new(); for i in 0..rb.num_rows() { From 482ab4a8af5f77784e84984b97290b1c9142ddb2 Mon Sep 17 00:00:00 2001 From: Aryan Bagade <73382554+AryanBagade@users.noreply.github.com> Date: Thu, 13 Nov 2025 19:37:03 -0800 Subject: [PATCH 2/2] Address review feedback: Keep public APIs unchanged - Reverted public API changes to maintain stability - Added #[expect(clippy::needless_pass_by_value)] to public methods - Kept all internal/private function improvements - RecordBatch clone is shallow, performance impact is minimal --- datafusion/catalog-listing/src/table.rs | 2 +- datafusion/core/src/physical_planner.rs | 3 +- datafusion/datasource/src/file_scan_config.rs | 11 ++--- datafusion/datasource/src/file_stream.rs | 2 +- datafusion/datasource/src/memory.rs | 45 +++++++++++-------- datafusion/datasource/src/statistics.rs | 23 ++++++---- 6 files changed, 51 insertions(+), 35 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 6a485743c47e..33d5c86bf88d 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -656,7 +656,7 @@ impl ListingTable { let file_groups = file_group.split_files(self.options.target_partitions); let (mut file_groups, mut stats) = compute_all_files_statistics( file_groups, - &self.schema(), + self.schema(), self.options.collect_stat, inexact_stats, )?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e7fcf61b360c..f10755a4594c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -482,7 +482,8 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(schema.inner(), &exprs)? as _ + MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)? + as _ } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e2ba48bc7e3b..897206c3d56e 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1146,9 +1146,10 @@ impl PartitionColumnProjector { // to the right positions as deduced from `projected_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column + #[expect(clippy::needless_pass_by_value)] pub fn project( &mut self, - file_batch: &RecordBatch, + file_batch: RecordBatch, partition_values: &[ScalarValue], ) -> Result { let expected_cols = @@ -1673,7 +1674,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - &file_batch, + file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1701,7 +1702,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - &file_batch, + file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1731,7 +1732,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - &file_batch, + file_batch, &[ wrap_partition_value_in_dict(ScalarValue::from("2021")), wrap_partition_value_in_dict(ScalarValue::from("10")), @@ -1759,7 +1760,7 @@ mod tests { let projected_batch = proj .project( // file_batch is ok here because we kept all the file cols in the projection - &file_batch, + file_batch, &[ ScalarValue::from("2021"), ScalarValue::from("10"), diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 85fc29830c57..0568b4cc4e5f 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -215,7 +215,7 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); let result = self .pc_projector - .project(&batch, partition_values) + .project(batch, partition_values) .map(|batch| match &mut self.remain { Some(remain) => { if *remain > batch.num_rows() { diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 97bfeea8b908..a604c56fa99a 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -282,9 +282,10 @@ impl MemorySourceConfig { } /// Create a new execution plan from a list of constant values (`ValuesExec`) + #[expect(clippy::needless_pass_by_value)] pub fn try_new_as_values( - schema: &SchemaRef, - data: &[Vec>], + schema: SchemaRef, + data: Vec>>, ) -> Result> { if data.is_empty() { return plan_err!("Values list cannot be empty"); @@ -326,21 +327,22 @@ impl MemorySourceConfig { .collect::>>()?; let batch = RecordBatch::try_new_with_options( - Arc::clone(schema), + Arc::clone(&schema), arrays, &RecordBatchOptions::new().with_row_count(Some(n_row)), )?; let partitions = vec![batch]; - Self::try_new_from_batches(schema, partitions) + Self::try_new_from_batches(Arc::clone(&schema), partitions) } /// Create a new plan using the provided schema and batches. /// /// Errors if any of the batches don't match the provided schema, or if no /// batches are provided. + #[expect(clippy::needless_pass_by_value)] pub fn try_new_from_batches( - schema: &SchemaRef, + schema: SchemaRef, batches: Vec, ) -> Result> { if batches.is_empty() { @@ -349,7 +351,7 @@ impl MemorySourceConfig { for batch in &batches { let batch_schema = batch.schema(); - if batch_schema != *schema { + if batch_schema != schema { return plan_err!( "Batch has invalid schema. Expected: {}, got: {}", schema, @@ -361,8 +363,8 @@ impl MemorySourceConfig { let partitions = vec![batches]; let source = Self { partitions, - schema: Arc::clone(schema), - projected_schema: Arc::clone(schema), + schema: Arc::clone(&schema), + projected_schema: Arc::clone(&schema), projection: None, sort_information: vec![], show_sizes: true, @@ -859,7 +861,7 @@ mod tests { let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - let exec = MemorySourceConfig::try_new_from_batches(&schema, batches).unwrap(); + let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap(); assert_eq!(exec.fetch(), None); let exec = exec.with_fetch(Some(4)).unwrap(); @@ -881,7 +883,7 @@ mod tests { #[tokio::test] async fn values_empty_case() -> Result<()> { let schema = aggr_test_schema(); - let empty = MemorySourceConfig::try_new_as_values(&schema, &[]); + let empty = MemorySourceConfig::try_new_as_values(schema, vec![]); assert!(empty.is_err()); Ok(()) } @@ -891,15 +893,14 @@ mod tests { let batch = make_partition(7); let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - let _exec = MemorySourceConfig::try_new_from_batches(&schema, batches).unwrap(); + let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap(); } #[test] fn new_exec_with_batches_empty() { let batch = make_partition(7); let schema = batch.schema(); - let _ = - MemorySourceConfig::try_new_from_batches(&schema, Vec::new()).unwrap_err(); + let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err(); } #[test] @@ -911,7 +912,7 @@ mod tests { Field::new("col0", DataType::UInt32, false), Field::new("col1", DataType::Utf8, false), ])); - let _ = MemorySourceConfig::try_new_from_batches(&invalid_schema, batches) + let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches) .unwrap_err(); } @@ -923,11 +924,17 @@ mod tests { DataType::UInt32, false, )])); - let data1 = vec![vec![lit(1u32)]]; - let _ = MemorySourceConfig::try_new_as_values(&schema, &data1).unwrap(); + let _ = MemorySourceConfig::try_new_as_values( + Arc::clone(&schema), + vec![vec![lit(1u32)]], + ) + .unwrap(); // Test that a null value is rejected - let data2 = vec![vec![lit(ScalarValue::UInt32(None))]]; - let _ = MemorySourceConfig::try_new_as_values(&schema, &data2).unwrap_err(); + let _ = MemorySourceConfig::try_new_as_values( + schema, + vec![vec![lit(ScalarValue::UInt32(None))]], + ) + .unwrap_err(); } #[test] @@ -940,7 +947,7 @@ mod tests { let rows = data.len(); let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])); - let values = MemorySourceConfig::try_new_as_values(&schema, &data)?; + let values = MemorySourceConfig::try_new_as_values(schema, data)?; assert_eq!( values.partition_statistics(None)?, diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index a922c31bb917..980677e488b8 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -167,14 +167,15 @@ impl MinMaxStatistics { ) })?; - Self::new(&min_max_sort_order, &min_max_schema, &min_batch, &max_batch) + Self::new(&min_max_sort_order, &min_max_schema, min_batch, max_batch) } + #[expect(clippy::needless_pass_by_value)] pub fn new( sort_order: &LexOrdering, schema: &SchemaRef, - min_values: &RecordBatch, - max_values: &RecordBatch, + min_values: RecordBatch, + max_values: RecordBatch, ) -> Result { use arrow::row::*; @@ -417,9 +418,10 @@ pub async fn get_statistics_with_limit( /// /// # Returns /// A new file group with summary statistics attached +#[expect(clippy::needless_pass_by_value)] pub fn compute_file_group_statistics( file_group: FileGroup, - file_schema: &SchemaRef, + file_schema: SchemaRef, collect_stats: bool, ) -> Result { if !collect_stats { @@ -430,7 +432,7 @@ pub fn compute_file_group_statistics( let stats = file.statistics.as_ref()?; Some(stats.as_ref()) }); - let statistics = Statistics::try_merge_iter(file_group_stats, file_schema)?; + let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?; Ok(file_group.with_statistics(Arc::new(statistics))) } @@ -452,16 +454,21 @@ pub fn compute_file_group_statistics( /// A tuple containing: /// * The processed file groups with their individual statistics attached /// * The summary statistics across all file groups, aka all files summary statistics +#[expect(clippy::needless_pass_by_value)] pub fn compute_all_files_statistics( file_groups: Vec, - table_schema: &SchemaRef, + table_schema: SchemaRef, collect_stats: bool, inexact_stats: bool, ) -> Result<(Vec, Statistics)> { let file_groups_with_stats = file_groups .into_iter() .map(|file_group| { - compute_file_group_statistics(file_group, table_schema, collect_stats) + compute_file_group_statistics( + file_group, + Arc::clone(&table_schema), + collect_stats, + ) }) .collect::>>()?; @@ -471,7 +478,7 @@ pub fn compute_all_files_statistics( .filter_map(|file_group| file_group.file_statistics(None)); let mut statistics = - Statistics::try_merge_iter(file_groups_statistics, table_schema)?; + Statistics::try_merge_iter(file_groups_statistics, &table_schema)?; if inexact_stats { statistics = statistics.to_inexact()