From d62cef1bae94683522704fe7503c9c4810b76973 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 21 Nov 2025 16:02:51 +0800 Subject: [PATCH] enforce clippy::needless_pass_by_value to datafusion-physical-plan --- .../src/aggregates/group_values/row.rs | 14 ++--- .../physical-plan/src/aggregates/mod.rs | 11 ++-- .../src/aggregates/no_grouping.rs | 12 ++-- .../physical-plan/src/aggregates/row_hash.rs | 32 +++++----- .../physical-plan/src/aggregates/topk/heap.rs | 14 ++--- .../src/aggregates/topk_stream.rs | 11 ++-- datafusion/physical-plan/src/analyze.rs | 10 +-- .../physical-plan/src/execution_plan.rs | 13 ++++ datafusion/physical-plan/src/filter.rs | 19 +++--- .../physical-plan/src/filter_pushdown.rs | 1 + .../physical-plan/src/joins/hash_join/exec.rs | 24 ++++--- .../src/joins/nested_loop_join.rs | 51 +++++++-------- .../piecewise_merge_join/classic_join.rs | 20 +++--- .../src/joins/sort_merge_join/exec.rs | 2 +- .../src/joins/sort_merge_join/stream.rs | 14 ++--- .../src/joins/symmetric_hash_join.rs | 24 +++---- .../physical-plan/src/joins/test_utils.rs | 6 +- datafusion/physical-plan/src/joins/utils.rs | 32 ++++++---- datafusion/physical-plan/src/lib.rs | 3 + datafusion/physical-plan/src/projection.rs | 4 +- .../physical-plan/src/recursive_query.rs | 6 +- .../physical-plan/src/repartition/mod.rs | 63 ++++++++++--------- datafusion/physical-plan/src/sorts/stream.rs | 6 +- datafusion/physical-plan/src/spill/mod.rs | 1 + datafusion/physical-plan/src/test.rs | 5 +- datafusion/physical-plan/src/topk/mod.rs | 8 ++- datafusion/physical-plan/src/union.rs | 30 +++++---- datafusion/physical-plan/src/unnest.rs | 6 +- .../src/windows/window_agg_exec.rs | 6 +- 29 files changed, 239 insertions(+), 209 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index 34893fcc4ed9..d632a7f0ad8a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -236,8 +236,7 @@ impl GroupValues for GroupValuesRows { // https://github.com/apache/datafusion/issues/7647 for (field, array) in self.schema.fields.iter().zip(&mut output) { let expected = field.data_type(); - *array = - dictionary_encode_if_necessary(Arc::::clone(array), expected)?; + *array = dictionary_encode_if_necessary(array, expected)?; } self.group_values = Some(group_values); @@ -259,7 +258,7 @@ impl GroupValues for GroupValuesRows { } fn dictionary_encode_if_necessary( - array: ArrayRef, + array: &ArrayRef, expected: &DataType, ) -> Result { match (expected, array.data_type()) { @@ -269,10 +268,7 @@ fn dictionary_encode_if_necessary( .iter() .zip(struct_array.columns()) .map(|(expected_field, column)| { - dictionary_encode_if_necessary( - Arc::::clone(column), - expected_field.data_type(), - ) + dictionary_encode_if_necessary(column, expected_field.data_type()) }) .collect::>>()?; @@ -289,13 +285,13 @@ fn dictionary_encode_if_necessary( Arc::::clone(expected_field), list.offsets().clone(), dictionary_encode_if_necessary( - Arc::::clone(list.values()), + list.values(), expected_field.data_type(), )?, list.nulls().cloned(), )?)) } (DataType::Dictionary(_, _), _) => Ok(cast(array.as_ref(), expected)?), - (_, _) => Ok(Arc::::clone(&array)), + (_, _) => Ok(Arc::::clone(array)), } } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5fafce0bea16..6bf59fd3d303 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -626,7 +626,7 @@ impl AggregateExec { fn execute_typed( &self, partition: usize, - context: Arc, + context: &Arc, ) -> Result { // no group by at all if self.group_by.expr.is_empty() { @@ -761,7 +761,7 @@ impl AggregateExec { &self.input_order_mode } - fn statistics_inner(&self, child_statistics: Statistics) -> Result { + fn statistics_inner(&self, child_statistics: &Statistics) -> Result { // TODO stats: group expressions: // - once expressions will be able to compute their own stats, use it here // - case where we group by on a column for which with have the `distinct` stat @@ -1020,7 +1020,7 @@ impl ExecutionPlan for AggregateExec { partition: usize, context: Arc, ) -> Result { - self.execute_typed(partition, context) + self.execute_typed(partition, &context) .map(|stream| stream.into()) } @@ -1033,7 +1033,8 @@ impl ExecutionPlan for AggregateExec { } fn partition_statistics(&self, partition: Option) -> Result { - self.statistics_inner(self.input().partition_statistics(partition)?) + let child_statistics = self.input().partition_statistics(partition)?; + self.statistics_inner(&child_statistics) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -2220,7 +2221,7 @@ mod tests { Arc::clone(&input_schema), )?); - let stream = partial_aggregate.execute_typed(0, Arc::clone(&task_ctx))?; + let stream = partial_aggregate.execute_typed(0, &task_ctx)?; // ensure that we really got the version we wanted match version { diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index fc398427ac1f..a4b202f1ae2a 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -68,14 +68,14 @@ impl AggregateStream { /// Create a new AggregateStream pub fn new( agg: &AggregateExec, - context: Arc, + context: &Arc, partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); let agg_filter_expr = agg.filter_expr.clone(); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); - let input = agg.input.execute(partition, Arc::clone(&context))?; + let input = agg.input.execute(partition, Arc::clone(context))?; let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; let filter_expressions = match agg.mode { @@ -115,7 +115,7 @@ impl AggregateStream { let timer = elapsed_compute.timer(); let result = aggregate_batch( &this.mode, - batch, + &batch, &mut this.accumulators, &this.aggregate_expressions, &this.filter_expressions, @@ -195,7 +195,7 @@ impl RecordBatchStream for AggregateStream { /// TODO: Make this a member function fn aggregate_batch( mode: &AggregateMode, - batch: RecordBatch, + batch: &RecordBatch, accumulators: &mut [AccumulatorItem], expressions: &[Vec>], filters: &[Option>], @@ -215,8 +215,8 @@ fn aggregate_batch( .try_for_each(|((accum, expr), filter)| { // 1.2 let batch = match filter { - Some(filter) => Cow::Owned(batch_filter(&batch, filter)?), - None => Cow::Borrowed(&batch), + Some(filter) => Cow::Owned(batch_filter(batch, filter)?), + None => Cow::Borrowed(batch), }; // 1.3 diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 2e1b70da284d..3cbe4828dd4d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -445,7 +445,7 @@ impl GroupedHashAggregateStream { /// Create a new GroupedHashAggregateStream pub fn new( agg: &AggregateExec, - context: Arc, + context: &Arc, partition: usize, ) -> Result { debug!("Creating GroupedHashAggregateStream"); @@ -454,7 +454,7 @@ impl GroupedHashAggregateStream { let agg_filter_expr = agg.filter_expr.clone(); let batch_size = context.session_config().batch_size(); - let input = agg.input.execute(partition, Arc::clone(&context))?; + let input = agg.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition); @@ -685,7 +685,7 @@ impl Stream for GroupedHashAggregateStream { } // Do the grouping - self.group_aggregate_batch(batch)?; + self.group_aggregate_batch(&batch)?; self.update_skip_aggregation_probe(input_rows); @@ -728,7 +728,7 @@ impl Stream for GroupedHashAggregateStream { self.spill_previous_if_necessary(&batch)?; // Do the grouping - self.group_aggregate_batch(batch)?; + self.group_aggregate_batch(&batch)?; // If we can begin emitting rows, do so, // otherwise keep consuming input @@ -777,7 +777,7 @@ impl Stream for GroupedHashAggregateStream { if let Some(probe) = self.skip_aggregation_probe.as_mut() { probe.record_skipped(&batch); } - let states = self.transform_to_states(batch)?; + let states = self.transform_to_states(&batch)?; return Poll::Ready(Some(Ok( states.record_output(&self.baseline_metrics) ))); @@ -854,12 +854,12 @@ impl RecordBatchStream for GroupedHashAggregateStream { impl GroupedHashAggregateStream { /// Perform group-by aggregation for the given [`RecordBatch`]. - fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { + fn group_aggregate_batch(&mut self, batch: &RecordBatch) -> Result<()> { // Evaluate the grouping expressions let group_by_values = if self.spill_state.is_stream_merging { - evaluate_group_by(&self.spill_state.merging_group_by, &batch)? + evaluate_group_by(&self.spill_state.merging_group_by, batch)? } else { - evaluate_group_by(&self.group_by, &batch)? + evaluate_group_by(&self.group_by, batch)? }; // Only create the timer if there are actual aggregate arguments to evaluate @@ -876,18 +876,18 @@ impl GroupedHashAggregateStream { // Evaluate the aggregation expressions. let input_values = if self.spill_state.is_stream_merging { - evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)? + evaluate_many(&self.spill_state.merging_aggregate_arguments, batch)? } else { - evaluate_many(&self.aggregate_arguments, &batch)? + evaluate_many(&self.aggregate_arguments, batch)? }; drop(timer); // Evaluate the filter expressions, if any, against the inputs let filter_values = if self.spill_state.is_stream_merging { let filter_expressions = vec![None; self.accumulators.len()]; - evaluate_optional(&filter_expressions, &batch)? + evaluate_optional(&filter_expressions, batch)? } else { - evaluate_optional(&self.filter_expressions, &batch)? + evaluate_optional(&self.filter_expressions, batch)? }; for group_values in &group_by_values { @@ -1217,10 +1217,10 @@ impl GroupedHashAggregateStream { } /// Transforms input batch to intermediate aggregate state, without grouping it - fn transform_to_states(&self, batch: RecordBatch) -> Result { - let mut group_values = evaluate_group_by(&self.group_by, &batch)?; - let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; - let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + fn transform_to_states(&self, batch: &RecordBatch) -> Result { + let mut group_values = evaluate_group_by(&self.group_by, batch)?; + let input_values = evaluate_many(&self.aggregate_arguments, batch)?; + let filter_values = evaluate_optional(&self.filter_expressions, batch)?; assert_eq_or_internal_err!( group_values.len(), diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index 23ccf5e17ef6..83d76a919e4f 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -326,13 +326,7 @@ impl TopKHeap { } } - fn _tree_print( - &self, - idx: usize, - prefix: String, - is_tail: bool, - output: &mut String, - ) { + fn _tree_print(&self, idx: usize, prefix: &str, is_tail: bool, output: &mut String) { if let Some(Some(hi)) = self.heap.get(idx) { let connector = if idx != 0 { if is_tail { @@ -357,10 +351,10 @@ impl TopKHeap { let right_exists = right_idx < self.len; if left_exists { - self._tree_print(left_idx, child_prefix.clone(), !right_exists, output); + self._tree_print(left_idx, &child_prefix, !right_exists, output); } if right_exists { - self._tree_print(right_idx, child_prefix, true, output); + self._tree_print(right_idx, &child_prefix, true, output); } } } @@ -370,7 +364,7 @@ impl Display for TopKHeap { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut output = String::new(); if !self.heap.is_empty() { - self._tree_print(0, String::new(), true, &mut output); + self._tree_print(0, "", true, &mut output); } write!(f, "{output}") } diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index eb1b7543cbfd..c706b48e348e 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -54,13 +54,13 @@ pub struct GroupedTopKAggregateStream { impl GroupedTopKAggregateStream { pub fn new( aggr: &AggregateExec, - context: Arc, + context: &Arc, partition: usize, limit: usize, ) -> Result { let agg_schema = Arc::clone(&aggr.schema); let group_by = aggr.group_by.clone(); - let input = aggr.input.execute(partition, Arc::clone(&context))?; + let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); let aggregate_arguments = @@ -97,11 +97,12 @@ impl RecordBatchStream for GroupedTopKAggregateStream { } impl GroupedTopKAggregateStream { - fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> { + fn intern(&mut self, ids: &ArrayRef, vals: &ArrayRef) -> Result<()> { let _timer = self.group_by_metrics.time_calculating_group_ids.timer(); let len = ids.len(); - self.priority_map.set_batch(ids, Arc::clone(&vals)); + self.priority_map + .set_batch(Arc::clone(ids), Arc::clone(vals)); let has_nulls = vals.null_count() > 0; for row_idx in 0..len { @@ -167,7 +168,7 @@ impl Stream for GroupedTopKAggregateStream { let input_values = Arc::clone(&input_values[0][0]); // iterate over each column of group_by values - (*self).intern(group_by_values, input_values)?; + (*self).intern(&group_by_values, &input_values)?; } // inner is done, emit all rows and switch to producing output None => { diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index f0a3da7555aa..01f997f23d6a 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -206,8 +206,8 @@ impl ExecutionPlan for AnalyzeExec { show_statistics, total_rows, duration, - captured_input, - captured_schema, + &captured_input, + &captured_schema, &metric_types, ) }; @@ -225,8 +225,8 @@ fn create_output_batch( show_statistics: bool, total_rows: usize, duration: std::time::Duration, - input: Arc, - schema: SchemaRef, + input: &Arc, + schema: &SchemaRef, metric_types: &[MetricType], ) -> Result { let mut type_builder = StringBuilder::with_capacity(1, 1024); @@ -262,7 +262,7 @@ fn create_output_batch( } RecordBatch::try_new( - schema, + Arc::clone(schema), vec![ Arc::new(type_builder.finish()), Arc::new(plan_builder.finish()), diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 245342f5cd12..553e3e26cec0 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -1118,6 +1118,7 @@ pub fn check_default_invariants( /// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee +#[expect(clippy::needless_pass_by_value)] pub fn need_data_exchange(plan: Arc) -> bool { plan.properties().evaluation_type == EvaluationType::Eager } @@ -1172,6 +1173,10 @@ pub async fn collect( /// /// Dropping the stream will abort the execution of the query, and free up /// any allocated resources +#[expect( + clippy::needless_pass_by_value, + reason = "Public API that historically takes owned Arcs" +)] pub fn execute_stream( plan: Arc, context: Arc, @@ -1236,6 +1241,10 @@ pub async fn collect_partitioned( /// /// Dropping the stream will abort the execution of the query, and free up /// any allocated resources +#[expect( + clippy::needless_pass_by_value, + reason = "Public API that historically takes owned Arcs" +)] pub fn execute_stream_partitioned( plan: Arc, context: Arc, @@ -1267,6 +1276,10 @@ pub fn execute_stream_partitioned( /// violate the `not null` constraints specified in the `sink_schema`. If there are /// such columns, it wraps the resulting stream to enforce the `not null` constraints /// by invoking the [`check_not_null_constraints`] function on each batch of the stream. +#[expect( + clippy::needless_pass_by_value, + reason = "Public API that historically takes owned Arcs" +)] pub fn execute_input_stream( input: Arc, sink_schema: SchemaRef, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8a05bfcee3ab..58185c8cdf5b 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -95,6 +95,7 @@ pub struct FilterExec { impl FilterExec { /// Create a FilterExec on an input + #[expect(clippy::needless_pass_by_value)] pub fn try_new( predicate: Arc, input: Arc, @@ -204,12 +205,12 @@ impl FilterExec { /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. fn statistics_helper( - schema: SchemaRef, + schema: &SchemaRef, input_stats: Statistics, predicate: &Arc, default_selectivity: u8, ) -> Result { - if !check_support(predicate, &schema) { + if !check_support(predicate, schema) { let selectivity = default_selectivity as f64 / 100.0; let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); @@ -221,12 +222,10 @@ impl FilterExec { let num_rows = input_stats.num_rows; let total_byte_size = input_stats.total_byte_size; - let input_analysis_ctx = AnalysisContext::try_from_statistics( - &schema, - &input_stats.column_statistics, - )?; + let input_analysis_ctx = + AnalysisContext::try_from_statistics(schema, &input_stats.column_statistics)?; - let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?; + let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?; // Estimate (inexact) selectivity of predicate let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); @@ -283,8 +282,9 @@ impl FilterExec { ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: + let schema = input.schema(); let stats = Self::statistics_helper( - input.schema(), + &schema, input.partition_statistics(None)?, predicate, default_selectivity, @@ -443,8 +443,9 @@ impl ExecutionPlan for FilterExec { fn partition_statistics(&self, partition: Option) -> Result { let input_stats = self.input.partition_statistics(partition)?; + let schema = self.schema(); let stats = Self::statistics_helper( - self.schema(), + &schema, input_stats, self.predicate(), self.default_selectivity, diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index f6b1b7448f88..1274e954eaeb 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -412,6 +412,7 @@ impl FilterDescription { /// This method automatically determines filter routing based on column analysis: /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down /// - Otherwise, it cannot be pushed down to that child + #[expect(clippy::needless_pass_by_value)] pub fn from_children( parent_filters: Vec>, children: &[&Arc], diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 680e4368ae70..03bf516eadd1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -419,7 +419,7 @@ impl HashJoinExec { let cache = Self::compute_properties( &left, &right, - Arc::clone(&join_schema), + &join_schema, *join_type, &on, partition_mode, @@ -550,7 +550,7 @@ impl HashJoinExec { fn compute_properties( left: &Arc, right: &Arc, - schema: SchemaRef, + schema: &SchemaRef, join_type: JoinType, on: JoinOnRef, mode: PartitionMode, @@ -561,7 +561,7 @@ impl HashJoinExec { left.equivalence_properties().clone(), right.equivalence_properties().clone(), &join_type, - Arc::clone(&schema), + Arc::clone(schema), &Self::maintains_input_order(join_type), Some(Self::probe_side()), on, @@ -605,9 +605,8 @@ impl HashJoinExec { // If contains projection, update the PlanProperties. if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection - let projection_mapping = - ProjectionMapping::from_indices(projection, &schema)?; - let out_schema = project_schema(&schema, Some(projection))?; + let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; + let out_schema = project_schema(schema, Some(projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -852,7 +851,7 @@ impl ExecutionPlan for HashJoinExec { cache: Self::compute_properties( &children[0], &children[1], - Arc::clone(&self.join_schema), + &self.join_schema, self.join_type, &self.on, self.mode, @@ -1045,7 +1044,7 @@ impl ExecutionPlan for HashJoinExec { let stats = estimate_join_statistics( self.left.partition_statistics(None)?, self.right.partition_statistics(None)?, - self.on.clone(), + &self.on, &self.join_type, &self.join_schema, )?; @@ -1065,6 +1064,7 @@ impl ExecutionPlan for HashJoinExec { return Ok(None); } + let schema = self.schema(); if let Some(JoinData { projected_left_child, projected_right_child, @@ -1075,7 +1075,7 @@ impl ExecutionPlan for HashJoinExec { self.left(), self.right(), self.on(), - self.schema(), + &schema, self.filter(), )? { Ok(Some(Arc::new(HashJoinExec::try_new( @@ -1299,10 +1299,8 @@ impl BuildSideState { bounds_accumulators: should_compute_dynamic_filters .then(|| { on_left - .iter() - .map(|expr| { - CollectLeftAccumulator::try_new(Arc::clone(expr), schema) - }) + .into_iter() + .map(|expr| CollectLeftAccumulator::try_new(expr, schema)) .collect::>>() }) .transpose()?, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 26533ce4c7e6..f16e2220dfbe 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -219,7 +219,7 @@ impl NestedLoopJoinExec { let cache = Self::compute_properties( &left, &right, - Arc::clone(&join_schema), + &join_schema, *join_type, projection.as_ref(), )?; @@ -266,7 +266,7 @@ impl NestedLoopJoinExec { fn compute_properties( left: &Arc, right: &Arc, - schema: SchemaRef, + schema: &SchemaRef, join_type: JoinType, projection: Option<&Vec>, ) -> Result { @@ -275,7 +275,7 @@ impl NestedLoopJoinExec { left.equivalence_properties().clone(), right.equivalence_properties().clone(), &join_type, - Arc::clone(&schema), + Arc::clone(schema), &Self::maintains_input_order(join_type), None, // No on columns in nested loop join @@ -310,9 +310,8 @@ impl NestedLoopJoinExec { if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection - let projection_mapping = - ProjectionMapping::from_indices(projection, &schema)?; - let out_schema = project_schema(&schema, Some(projection))?; + let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; + let out_schema = project_schema(schema, Some(projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -555,10 +554,11 @@ impl ExecutionPlan for NestedLoopJoinExec { if partition.is_some() { return Ok(Statistics::new_unknown(&self.schema())); } + let join_columns = Vec::new(); estimate_join_statistics( self.left.partition_statistics(None)?, self.right.partition_statistics(None)?, - vec![], + &join_columns, &self.join_type, &self.schema(), ) @@ -576,6 +576,7 @@ impl ExecutionPlan for NestedLoopJoinExec { return Ok(None); } + let schema = self.schema(); if let Some(JoinData { projected_left_child, projected_right_child, @@ -586,7 +587,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self.left(), self.right(), &[], - self.schema(), + &schema, self.filter(), )? { Ok(Some(Arc::new(NestedLoopJoinExec::try_new( @@ -1656,11 +1657,12 @@ impl NestedLoopJoinStream { } let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None); + let right_schema = self.right_data.schema(); build_unmatched_batch( - Arc::clone(&self.output_schema), + &self.output_schema, &left_batch_sliced, bitmap_sliced, - self.right_data.schema(), + &right_schema, &self.column_indices, self.join_type, JoinSide::Left, @@ -1683,10 +1685,10 @@ impl NestedLoopJoinStream { let left_schema = left_data.batch().schema(); let res = build_unmatched_batch( - Arc::clone(&self.output_schema), + &self.output_schema, &cur_right_batch, right_batch_bitmap, - left_schema, + &left_schema, &self.column_indices, self.join_type, JoinSide::Right, @@ -1973,7 +1975,7 @@ fn build_row_join_batch( /// If Some, that's the result batch /// If None, it's not for this special case. Continue execution. fn build_unmatched_batch_empty_schema( - output_schema: SchemaRef, + output_schema: &SchemaRef, batch_bitmap: &BooleanArray, // For left/right/full joins, it needs to fill nulls for another side join_type: JoinType, @@ -1991,7 +1993,7 @@ fn build_unmatched_batch_empty_schema( if output_schema.fields().is_empty() { Ok(Some(create_record_batch_with_empty_schema( - Arc::clone(&output_schema), + Arc::clone(output_schema), result_size, )?)) } else { @@ -2051,11 +2053,11 @@ fn create_record_batch_with_empty_schema( /// Null(bool) Null(Int32) 1 /// Null(bool) Null(Int32) 3 fn build_unmatched_batch( - output_schema: SchemaRef, + output_schema: &SchemaRef, batch: &RecordBatch, batch_bitmap: BooleanArray, // For left/right/full joins, it needs to fill nulls for another side - another_side_schema: SchemaRef, + another_side_schema: &SchemaRef, col_indices: &[ColumnIndex], join_type: JoinType, batch_side: JoinSide, @@ -2065,11 +2067,9 @@ fn build_unmatched_batch( debug_assert_ne!(batch_side, JoinSide::None); // Handle special case (see function comment) - if let Some(batch) = build_unmatched_batch_empty_schema( - Arc::clone(&output_schema), - &batch_bitmap, - join_type, - )? { + if let Some(batch) = + build_unmatched_batch_empty_schema(output_schema, &batch_bitmap, join_type)? + { return Ok(Some(batch)); } @@ -2116,7 +2116,7 @@ fn build_unmatched_batch( debug_assert_ne!(batch_side, JoinSide::None); let opposite_side = batch_side.negate(); - build_row_join_batch(&output_schema, &left_null_batch, 0, batch, Some(flipped_bitmap), col_indices, opposite_side) + build_row_join_batch(output_schema, &left_null_batch, 0, batch, Some(flipped_bitmap), col_indices, opposite_side) }, JoinType::RightSemi | JoinType::RightAnti | JoinType::LeftSemi | JoinType::LeftAnti => { @@ -2149,7 +2149,7 @@ fn build_unmatched_batch( columns.push(filtered_col); } - Ok(Some(RecordBatch::try_new(Arc::clone(&output_schema), columns)?)) + Ok(Some(RecordBatch::try_new(Arc::clone(output_schema), columns)?)) }, JoinType::RightMark | JoinType::LeftMark => { if join_type == JoinType::RightMark { @@ -2181,7 +2181,7 @@ fn build_unmatched_batch( } } - Ok(Some(RecordBatch::try_new(Arc::clone(&output_schema), columns)?)) + Ok(Some(RecordBatch::try_new(Arc::clone(output_schema), columns)?)) } _ => internal_err!("If batch is at right side, this function must be handling Full/Right/RightSemi/RightAnti/RightMark joins"), } @@ -2246,7 +2246,8 @@ pub(crate) mod tests { source = source.try_with_sort_information(vec![ordering]).unwrap(); } - Arc::new(TestMemoryExec::update_cache(Arc::new(source))) + let source = Arc::new(source); + Arc::new(TestMemoryExec::update_cache(&source)) } fn build_left_table() -> Arc { diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index 646905e0d787..695e73f109f3 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -280,7 +280,7 @@ impl ClassicPWMJStream { let batch = resolve_classic_join( buffered_side, stream_batch, - Arc::clone(&self.schema), + &self.schema, self.operator, self.sort_option, self.join_type, @@ -455,7 +455,7 @@ impl Stream for ClassicPWMJStream { fn resolve_classic_join( buffered_side: &mut BufferedSideReadyState, stream_batch: &SortedStreamBatch, - join_schema: Arc, + join_schema: &SchemaRef, operator: Operator, sort_options: SortOptions, join_type: JoinType, @@ -504,7 +504,7 @@ fn resolve_classic_join( buffered_side, stream_batch, join_type, - Arc::clone(&join_schema), + join_schema, )?; batch_process_state.output_batches.push_batch(batch)?; @@ -532,7 +532,7 @@ fn resolve_classic_join( buffered_side, stream_batch, join_type, - Arc::clone(&join_schema), + join_schema, )?; // Flush batch and update pointers if we have a completed batch @@ -579,14 +579,14 @@ fn resolve_classic_join( let batch = create_unmatched_batch( &mut batch_process_state.unmatched_indices, stream_batch, - Arc::clone(&join_schema), + join_schema, )?; batch_process_state.output_batches.push_batch(batch)?; } batch_process_state.continue_process = false; - Ok(RecordBatch::new_empty(Arc::clone(&join_schema))) + Ok(RecordBatch::new_empty(Arc::clone(join_schema))) } // Builds a record batch from indices ranges on the buffered and streamed side. @@ -599,7 +599,7 @@ fn build_matched_indices_and_set_buffered_bitmap( buffered_side: &mut BufferedSideReadyState, stream_batch: &SortedStreamBatch, join_type: JoinType, - join_schema: Arc, + join_schema: &SchemaRef, ) -> Result { // Mark the buffered indices as visited if need_produce_result_in_final(join_type) { @@ -622,7 +622,7 @@ fn build_matched_indices_and_set_buffered_bitmap( buffered_columns.extend(streamed_columns); Ok(RecordBatch::try_new( - Arc::clone(&join_schema), + Arc::clone(join_schema), buffered_columns, )?) } @@ -631,7 +631,7 @@ fn build_matched_indices_and_set_buffered_bitmap( fn create_unmatched_batch( streamed_indices: &mut PrimitiveBuilder, stream_batch: &SortedStreamBatch, - join_schema: Arc, + join_schema: &SchemaRef, ) -> Result { let streamed_indices = streamed_indices.finish(); let new_stream_batch = take_record_batch(&stream_batch.batch, &streamed_indices)?; @@ -649,7 +649,7 @@ fn create_unmatched_batch( buffered_columns.extend(streamed_columns); Ok(RecordBatch::try_new( - Arc::clone(&join_schema), + Arc::clone(join_schema), buffered_columns, )?) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index bc139f43b703..daf47603c217 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -532,7 +532,7 @@ impl ExecutionPlan for SortMergeJoinExec { estimate_join_statistics( self.left.partition_statistics(None)?, self.right.partition_statistics(None)?, - self.on.clone(), + &self.on, &self.join_type, &self.schema, ) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 0325e37d42e7..9acaba1c894c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -857,7 +857,7 @@ impl SortMergeJoinStream { } } - fn free_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()> { + fn free_reservation(&mut self, buffered_batch: &BufferedBatch) -> Result<()> { // Shrink memory usage for in-memory batches only if let BufferedBatchState::InMemory(_) = buffered_batch.batch { self.reservation @@ -915,7 +915,7 @@ impl SortMergeJoinStream { self.buffered_data.batches.pop_front() { self.produce_buffered_not_matched(&mut buffered_batch)?; - self.free_reservation(buffered_batch)?; + self.free_reservation(&buffered_batch)?; } } else { // If the head batch is not fully processed, break the loop. @@ -1533,16 +1533,16 @@ impl SortMergeJoinStream { &out_mask }; - self.filter_record_batch_by_join_type(record_batch, corrected_mask) + self.filter_record_batch_by_join_type(&record_batch, corrected_mask) } fn filter_record_batch_by_join_type( &mut self, - record_batch: RecordBatch, + record_batch: &RecordBatch, corrected_mask: &BooleanArray, ) -> Result { let mut filtered_record_batch = - filter_record_batch(&record_batch, corrected_mask)?; + filter_record_batch(record_batch, corrected_mask)?; let left_columns_length = self.streamed_schema.fields.len(); let right_columns_length = self.buffered_schema.fields.len(); @@ -1551,7 +1551,7 @@ impl SortMergeJoinStream { JoinType::Left | JoinType::LeftMark | JoinType::Right | JoinType::RightMark ) { let null_mask = compute::not(corrected_mask)?; - let null_joined_batch = filter_record_batch(&record_batch, &null_mask)?; + let null_joined_batch = filter_record_batch(record_batch, &null_mask)?; let mut right_columns = create_unmatched_columns( self.join_type, @@ -1609,7 +1609,7 @@ impl SortMergeJoinStream { // Find rows which joined by key but Filter predicate evaluated as false let joined_filter_not_matched_mask = compute::not(corrected_mask)?; let joined_filter_not_matched_batch = - filter_record_batch(&record_batch, &joined_filter_not_matched_mask)?; + filter_record_batch(record_batch, &joined_filter_not_matched_mask)?; // Add left unmatched rows adding the right side as nulls let right_null_columns = self diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 8b1d1d2e1171..8b3677713a46 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -1399,7 +1399,7 @@ impl SymmetricHashJoinStream { return Poll::Ready(Ok(StatefulStreamResult::Continue)); } self.set_state(SHJStreamState::PullLeft); - Poll::Ready(self.process_batch_from_right(batch)) + Poll::Ready(self.process_batch_from_right(&batch)) } Some(Err(e)) => Poll::Ready(Err(e)), None => { @@ -1428,7 +1428,7 @@ impl SymmetricHashJoinStream { return Poll::Ready(Ok(StatefulStreamResult::Continue)); } self.set_state(SHJStreamState::PullRight); - Poll::Ready(self.process_batch_from_left(batch)) + Poll::Ready(self.process_batch_from_left(&batch)) } Some(Err(e)) => Poll::Ready(Err(e)), None => { @@ -1457,7 +1457,7 @@ impl SymmetricHashJoinStream { if batch.num_rows() == 0 { return Poll::Ready(Ok(StatefulStreamResult::Continue)); } - Poll::Ready(self.process_batch_after_right_end(batch)) + Poll::Ready(self.process_batch_after_right_end(&batch)) } Some(Err(e)) => Poll::Ready(Err(e)), None => { @@ -1488,7 +1488,7 @@ impl SymmetricHashJoinStream { if batch.num_rows() == 0 { return Poll::Ready(Ok(StatefulStreamResult::Continue)); } - Poll::Ready(self.process_batch_after_left_end(batch)) + Poll::Ready(self.process_batch_after_left_end(&batch)) } Some(Err(e)) => Poll::Ready(Err(e)), None => { @@ -1518,7 +1518,7 @@ impl SymmetricHashJoinStream { fn process_batch_from_right( &mut self, - batch: RecordBatch, + batch: &RecordBatch, ) -> Result>> { self.perform_join_for_given_side(batch, JoinSide::Right) .map(|maybe_batch| { @@ -1532,7 +1532,7 @@ impl SymmetricHashJoinStream { fn process_batch_from_left( &mut self, - batch: RecordBatch, + batch: &RecordBatch, ) -> Result>> { self.perform_join_for_given_side(batch, JoinSide::Left) .map(|maybe_batch| { @@ -1546,14 +1546,14 @@ impl SymmetricHashJoinStream { fn process_batch_after_left_end( &mut self, - right_batch: RecordBatch, + right_batch: &RecordBatch, ) -> Result>> { self.process_batch_from_right(right_batch) } fn process_batch_after_right_end( &mut self, - left_batch: RecordBatch, + left_batch: &RecordBatch, ) -> Result>> { self.process_batch_from_left(left_batch) } @@ -1632,7 +1632,7 @@ impl SymmetricHashJoinStream { /// 5. Combines the results and returns a combined batch or `None` if no batch was produced. fn perform_join_for_given_side( &mut self, - probe_batch: RecordBatch, + probe_batch: &RecordBatch, probe_side: JoinSide, ) -> Result> { let ( @@ -1662,7 +1662,7 @@ impl SymmetricHashJoinStream { probe_side_metrics.input_batches.add(1); probe_side_metrics.input_rows.add(probe_batch.num_rows()); // Update the internal state of the hash joiner for the build side: - probe_hash_joiner.update_internal_state(&probe_batch, &self.random_state)?; + probe_hash_joiner.update_internal_state(probe_batch, &self.random_state)?; // Join the two sides: let equal_result = join_with_probe_batch( build_hash_joiner, @@ -1670,7 +1670,7 @@ impl SymmetricHashJoinStream { &self.schema, self.join_type, self.filter.as_ref(), - &probe_batch, + probe_batch, &self.column_indices, &self.random_state, self.null_equality, @@ -1691,7 +1691,7 @@ impl SymmetricHashJoinStream { calculate_filter_expr_intervals( &build_hash_joiner.input_buffer, build_side_sorted_filter_expr, - &probe_batch, + probe_batch, probe_side_sorted_filter_expr, )?; let prune_length = build_hash_joiner diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index de288724c446..58338bd86021 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -534,9 +534,11 @@ pub fn create_memory_table( let right_schema = right_partition[0].schema(); let right = TestMemoryExec::try_new(&[right_partition], right_schema, None)? .try_with_sort_information(right_sorted)?; + let left = Arc::new(left); + let right = Arc::new(right); Ok(( - Arc::new(TestMemoryExec::update_cache(Arc::new(left))), - Arc::new(TestMemoryExec::update_cache(Arc::new(right))), + Arc::new(TestMemoryExec::update_cache(&left)), + Arc::new(TestMemoryExec::update_cache(&right)), )) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 4cba85a85128..9087ac415f4b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -414,11 +414,11 @@ struct PartialJoinStatistics { pub(crate) fn estimate_join_statistics( left_stats: Statistics, right_stats: Statistics, - on: JoinOn, + on: &JoinOn, join_type: &JoinType, schema: &Schema, ) -> Result { - let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on); + let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, on); let (num_rows, column_statistics) = match join_stats { Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics), None => (Precision::Absent, Statistics::unknown_column(schema)), @@ -567,16 +567,26 @@ fn estimate_inner_join_cardinality( return Some(estimation); }; + let Statistics { + num_rows: left_num_rows, + column_statistics: left_column_statistics, + .. + } = left_stats; + let Statistics { + num_rows: right_num_rows, + column_statistics: right_column_statistics, + .. + } = right_stats; + // The algorithm here is partly based on the non-histogram selectivity estimation // from Spark's Catalyst optimizer. let mut join_selectivity = Precision::Absent; - for (left_stat, right_stat) in left_stats - .column_statistics + for (left_stat, right_stat) in left_column_statistics .iter() - .zip(right_stats.column_statistics.iter()) + .zip(right_column_statistics.iter()) { - let left_max_distinct = max_distinct_count(&left_stats.num_rows, left_stat); - let right_max_distinct = max_distinct_count(&right_stats.num_rows, right_stat); + let left_max_distinct = max_distinct_count(&left_num_rows, left_stat); + let right_max_distinct = max_distinct_count(&right_num_rows, right_stat); let max_distinct = left_max_distinct.max(&right_max_distinct); if max_distinct.get_value().is_some() { // Seems like there are a few implementations of this algorithm that implement @@ -1131,8 +1141,8 @@ pub(crate) fn append_right_indices( ) -> Result<(UInt64Array, UInt32Array)> { if preserve_order_for_right { Ok(append_probe_indices_in_order( - left_indices, - right_indices, + &left_indices, + &right_indices, adjust_range, )) } else { @@ -1276,8 +1286,8 @@ fn build_range_bitmap( /// - A `PrimitiveArray` of `UInt64Type` with the newly constructed build indices. /// - A `PrimitiveArray` of `UInt32Type` with the newly constructed probe indices. fn append_probe_indices_in_order( - build_indices: PrimitiveArray, - probe_indices: PrimitiveArray, + build_indices: &PrimitiveArray, + probe_indices: &PrimitiveArray, range: Range, ) -> (PrimitiveArray, PrimitiveArray) { // Builders for new indices: diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 17628fd8ad1d..b74baf2d0672 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -23,6 +23,9 @@ // Make sure fast / cheap clones on Arc are explicit: // https://github.com/apache/datafusion/issues/11143 #![deny(clippy::clone_on_ref_ptr)] +// https://github.com/apache/datafusion/issues/18503 +#![deny(clippy::needless_pass_by_value)] +#![cfg_attr(test, allow(clippy::needless_pass_by_value))] //! Traits for physical query plan, supporting parallel execution for partitioned relations. //! diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index abd8daa3fd7e..0b8c4ee5fbec 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -480,7 +480,7 @@ pub fn try_pushdown_through_join( join_left: &Arc, join_right: &Arc, join_on: JoinOnRef, - schema: SchemaRef, + schema: &SchemaRef, filter: Option<&JoinFilter>, ) -> Result> { // Convert projected expressions to columns. We can not proceed if this is not possible. @@ -493,7 +493,7 @@ pub fn try_pushdown_through_join( if !join_allows_pushdown( &projection_as_columns, - &schema, + schema, far_right_left_col_ind, far_left_right_col_ind, ) { diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 2c3965c0b6a0..7b966ed23dbd 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -83,7 +83,7 @@ impl RecursiveQueryExec { // Each recursive query needs its own work table let work_table = Arc::new(WorkTable::new()); // Use the same work table for both the WorkTableExec and the recursive term - let recursive_term = assign_work_table(recursive_term, Arc::clone(&work_table))?; + let recursive_term = assign_work_table(recursive_term, &work_table)?; let cache = Self::compute_properties(static_term.schema()); Ok(RecursiveQueryExec { name, @@ -346,12 +346,12 @@ impl RecursiveQueryStream { fn assign_work_table( plan: Arc, - work_table: Arc, + work_table: &Arc, ) -> Result> { let mut work_table_refs = 0; plan.transform_down(|plan| { if let Some(new_plan) = - plan.with_new_state(Arc::clone(&work_table) as Arc) + plan.with_new_state(Arc::clone(work_table) as Arc) { if work_table_refs > 0 { not_impl_err!( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 6249a0e9b856..9dde0fc96eed 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -228,10 +228,10 @@ impl Debug for RepartitionExecState { impl RepartitionExecState { fn ensure_input_streams_initialized( &mut self, - input: Arc, - metrics: ExecutionPlanMetricsSet, + input: &Arc, + metrics: &ExecutionPlanMetricsSet, output_partitions: usize, - ctx: Arc, + ctx: &Arc, ) -> Result<()> { if !matches!(self, RepartitionExecState::NotInitialized) { return Ok(()); @@ -241,10 +241,10 @@ impl RepartitionExecState { let mut streams_and_metrics = Vec::with_capacity(num_input_partitions); for i in 0..num_input_partitions { - let metrics = RepartitionMetrics::new(i, output_partitions, &metrics); + let metrics = RepartitionMetrics::new(i, output_partitions, metrics); let timer = metrics.fetch_time.timer(); - let stream = input.execute(i, Arc::clone(&ctx))?; + let stream = input.execute(i, Arc::clone(ctx))?; timer.done(); streams_and_metrics.push((stream, metrics)); @@ -256,21 +256,21 @@ impl RepartitionExecState { #[expect(clippy::too_many_arguments)] fn consume_input_streams( &mut self, - input: Arc, - metrics: ExecutionPlanMetricsSet, - partitioning: Partitioning, + input: &Arc, + metrics: &ExecutionPlanMetricsSet, + partitioning: &Partitioning, preserve_order: bool, - name: String, - context: Arc, + name: &str, + context: &Arc, spill_manager: SpillManager, ) -> Result<&mut ConsumingInputStreamsState> { let streams_and_metrics = match self { RepartitionExecState::NotInitialized => { self.ensure_input_streams_initialized( - Arc::clone(&input), - metrics.clone(), + input, + metrics, partitioning.partition_count(), - Arc::clone(&context), + context, )?; let RepartitionExecState::InputStreamsInitialized(value) = self else { // This cannot happen, as ensure_input_streams_initialized() was just called, @@ -854,10 +854,10 @@ impl ExecutionPlan for RepartitionExec { let state = Arc::clone(&self.state); if let Some(mut state) = state.try_lock() { state.ensure_input_streams_initialized( - Arc::clone(&input), - metrics.clone(), + &input, + &metrics, partitioning.partition_count(), - Arc::clone(&context), + &context, )?; } @@ -869,12 +869,12 @@ impl ExecutionPlan for RepartitionExec { // lock mutexes let mut state = state.lock(); let state = state.consume_input_streams( - Arc::clone(&input), - metrics.clone(), - partitioning, + &input, + &metrics, + &partitioning, preserve_order, - name.clone(), - Arc::clone(&context), + &name, + &context, spill_manager.clone(), )?; @@ -1106,8 +1106,7 @@ impl RepartitionExec { partitioning: Partitioning, ) -> Result { let preserve_order = false; - let cache = - Self::compute_properties(&input, partitioning.clone(), preserve_order); + let cache = Self::compute_properties(&input, partitioning, preserve_order); Ok(RepartitionExec { input, state: Default::default(), @@ -2495,7 +2494,8 @@ mod test { // Create physical plan with order preservation let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)? .try_with_sort_information(vec![sort_exprs.clone(), sort_exprs])?; - let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec))); + let exec = Arc::new(exec); + let exec = Arc::new(TestMemoryExec::update_cache(&exec)); // Repartition into 3 partitions with order preservation // We expect 1 batch per output partition after repartitioning let exec = RepartitionExec::try_new(exec, Partitioning::RoundRobinBatch(3))? @@ -2611,7 +2611,8 @@ mod test { // Create physical plan with hash partitioning let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?; - let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec))); + let exec = Arc::new(exec); + let exec = Arc::new(TestMemoryExec::update_cache(&exec)); // Hash partition into 2 partitions by column c0 let hash_expr = col("c0", &schema)?; let exec = @@ -2695,11 +2696,11 @@ mod test { schema: &SchemaRef, sort_exprs: LexOrdering, ) -> Arc { - Arc::new(TestMemoryExec::update_cache(Arc::new( - TestMemoryExec::try_new(&[vec![]], Arc::clone(schema), None) - .unwrap() - .try_with_sort_information(vec![sort_exprs]) - .unwrap(), - ))) + let exec = TestMemoryExec::try_new(&[vec![]], Arc::clone(schema), None) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap(); + let exec = Arc::new(exec); + Arc::new(TestMemoryExec::update_cache(&exec)) } } diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index f419247c82b7..b0c631cf9135 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -99,8 +99,8 @@ impl ReusableRows { }) } // save the Rows - fn save(&mut self, stream_idx: usize, rows: Arc) { - self.inner[stream_idx][1] = Some(Arc::clone(&rows)); + fn save(&mut self, stream_idx: usize, rows: &Arc) { + self.inner[stream_idx][1] = Some(Arc::clone(rows)); // swap the current with the previous one, so that the next poll can reuse the Rows from the previous poll let [a, b] = &mut self.inner[stream_idx]; std::mem::swap(a, b); @@ -177,7 +177,7 @@ impl RowCursorStream { let rows = Arc::new(rows); - self.rows.save(stream_idx, Arc::clone(&rows)); + self.rows.save(stream_idx, &rows); // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 58fd016a63dd..6be7edcf3291 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -227,6 +227,7 @@ impl RecordBatchStream for SpillReaderStream { since = "46.0.0", note = "This method is deprecated. Use `SpillManager::spill_record_batch_by_size` instead." )] +#[expect(clippy::needless_pass_by_value)] pub fn spill_record_batch_by_size( batch: &RecordBatch, path: PathBuf, diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index bc32192917a8..4f7b843262de 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -271,10 +271,9 @@ impl TestMemoryExec { } // Equivalent of `DataSourceExec::new` - pub fn update_cache(source: Arc) -> TestMemoryExec { + pub fn update_cache(source: &Arc) -> TestMemoryExec { let cache = source.compute_properties(); - let source = &*source; - let mut source = source.clone(); + let mut source = (**source).clone(); source.cache = cache; source } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 96356e50d7ec..99af9b8f7ca1 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -178,7 +178,8 @@ impl TopK { /// Create a new [`TopK`] that stores the top `k` values, as /// defined by the sort expressions in `expr`. // TODO: make a builder or some other nicer API - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] + #[expect(clippy::needless_pass_by_value)] pub fn try_new( partition_id: usize, schema: SchemaRef, @@ -226,6 +227,7 @@ impl TopK { /// Insert `batch`, remembering if any of its values are among /// the top k seen so far. + #[expect(clippy::needless_pass_by_value)] pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { // Updates on drop let baseline = self.metrics.baseline.clone(); @@ -377,7 +379,7 @@ impl TopK { }; // Build the filter expression OUTSIDE any synchronization - let predicate = Self::build_filter_expression(&self.expr, thresholds)?; + let predicate = Self::build_filter_expression(&self.expr, &thresholds)?; let new_threshold = new_threshold_row.to_vec(); // update the threshold. Since there was a lock gap, we must check if it is still the best @@ -420,7 +422,7 @@ impl TopK { /// This is now called outside of any locks to reduce critical section time. fn build_filter_expression( sort_exprs: &[PhysicalSortExpr], - thresholds: Vec, + thresholds: &[ScalarValue], ) -> Result>> { // Create filter expressions for each threshold let mut filters: Vec> = diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 1432c2b09b90..1f4bc8817ed5 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -699,7 +699,7 @@ impl Stream for CombinedRecordBatchStream { fn col_stats_union( mut left: ColumnStatistics, - right: ColumnStatistics, + right: &ColumnStatistics, ) -> ColumnStatistics { left.distinct_count = Precision::Absent; left.min_value = left.min_value.min(&right.min_value); @@ -711,12 +711,18 @@ fn col_stats_union( } fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { - left.num_rows = left.num_rows.add(&right.num_rows); - left.total_byte_size = left.total_byte_size.add(&right.total_byte_size); + let Statistics { + num_rows: right_num_rows, + total_byte_size: right_total_bytes, + column_statistics: right_column_statistics, + .. + } = right; + left.num_rows = left.num_rows.add(&right_num_rows); + left.total_byte_size = left.total_byte_size.add(&right_total_bytes); left.column_statistics = left .column_statistics .into_iter() - .zip(right.column_statistics) + .zip(right_column_statistics.iter()) .map(|(a, b)| col_stats_union(a, b)) .collect::>(); left @@ -926,14 +932,14 @@ mod tests { let first_orderings = convert_to_orderings(first_child_orderings); let second_orderings = convert_to_orderings(second_child_orderings); let union_expected_orderings = convert_to_orderings(union_orderings); - let child1 = Arc::new(TestMemoryExec::update_cache(Arc::new( - TestMemoryExec::try_new(&[], Arc::clone(&schema), None)? - .try_with_sort_information(first_orderings)?, - ))); - let child2 = Arc::new(TestMemoryExec::update_cache(Arc::new( - TestMemoryExec::try_new(&[], Arc::clone(&schema), None)? - .try_with_sort_information(second_orderings)?, - ))); + let child1_exec = TestMemoryExec::try_new(&[], Arc::clone(&schema), None)? + .try_with_sort_information(first_orderings)?; + let child1 = Arc::new(child1_exec); + let child1 = Arc::new(TestMemoryExec::update_cache(&child1)); + let child2_exec = TestMemoryExec::try_new(&[], Arc::clone(&schema), None)? + .try_with_sort_information(second_orderings)?; + let child2 = Arc::new(child2_exec); + let child2 = Arc::new(TestMemoryExec::update_cache(&child2)); let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); union_expected_eq.add_orderings(union_expected_orderings); diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 22132f2f8639..3c999b1a40c1 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -90,7 +90,7 @@ impl UnnestExec { &input, &list_column_indices, &struct_column_indices, - Arc::clone(&schema), + &schema, )?; Ok(UnnestExec { @@ -109,7 +109,7 @@ impl UnnestExec { input: &Arc, list_column_indices: &[ListUnnest], struct_column_indices: &[usize], - schema: SchemaRef, + schema: &SchemaRef, ) -> Result { // Find out which indices are not unnested, such that they can be copied over from the input plan let input_schema = input.schema(); @@ -159,7 +159,7 @@ impl UnnestExec { // the unnest operation invalidates any global uniqueness or primary-key constraints. let input_eq_properties = input.equivalence_properties(); let eq_properties = input_eq_properties - .project(&projection_mapping, Arc::clone(&schema)) + .project(&projection_mapping, Arc::clone(schema)) .with_constraints(Constraints::default()); // Output partitioning must use the projection mapping diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index a59b2a5d1d1d..810c97cf4745 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -82,7 +82,7 @@ impl WindowAggExec { let ordered_partition_by_indices = get_ordered_partition_by_indices(window_expr[0].partition_by(), &input)?; - let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr)?; + let cache = Self::compute_properties(&schema, &input, &window_expr)?; Ok(Self { input, window_expr, @@ -120,12 +120,12 @@ impl WindowAggExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( - schema: SchemaRef, + schema: &SchemaRef, input: &Arc, window_exprs: &[Arc], ) -> Result { // Calculate equivalence properties: - let eq_properties = window_equivalence_properties(&schema, input, window_exprs)?; + let eq_properties = window_equivalence_properties(schema, input, window_exprs)?; // Get output partitioning: // Because we can have repartitioning using the partition keys this