14 changes: 5 additions & 9 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -236,8 +236,7 @@ impl GroupValues for GroupValuesRows {
// https://github.com/apache/datafusion/issues/7647
for (field, array) in self.schema.fields.iter().zip(&mut output) {
let expected = field.data_type();
*array =
dictionary_encode_if_necessary(Arc::<dyn Array>::clone(array), expected)?;
*array = dictionary_encode_if_necessary(array, expected)?;
}

self.group_values = Some(group_values);
@@ -259,7 +258,7 @@ impl GroupValues for GroupValuesRows {
}

fn dictionary_encode_if_necessary(
array: ArrayRef,
array: &ArrayRef,
expected: &DataType,
) -> Result<ArrayRef> {
match (expected, array.data_type()) {
@@ -269,10 +268,7 @@ fn dictionary_encode_if_necessary(
.iter()
.zip(struct_array.columns())
.map(|(expected_field, column)| {
dictionary_encode_if_necessary(
Arc::<dyn Array>::clone(column),
expected_field.data_type(),
)
dictionary_encode_if_necessary(column, expected_field.data_type())
})
.collect::<Result<Vec<_>>>()?;

@@ -289,13 +285,13 @@ fn dictionary_encode_if_necessary(
Arc::<arrow::datatypes::Field>::clone(expected_field),
list.offsets().clone(),
dictionary_encode_if_necessary(
Arc::<dyn Array>::clone(list.values()),
list.values(),
expected_field.data_type(),
)?,
list.nulls().cloned(),
)?))
}
(DataType::Dictionary(_, _), _) => Ok(cast(array.as_ref(), expected)?),
(_, _) => Ok(Arc::<dyn Array>::clone(&array)),
(_, _) => Ok(Arc::<dyn Array>::clone(array)),
}
}
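The hunk above changes `dictionary_encode_if_necessary` to borrow its input rather than take an owned `ArrayRef`. Below is a minimal, self-contained sketch of that borrowing pattern, not the DataFusion code itself: only the dictionary and pass-through branches are shown (the struct and list recursion is omitted), and arrow's `ArrowError` stands in for DataFusion's `Result`.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

// Borrow the ArrayRef; bump the Arc refcount only on the pass-through branch.
// (The real function also recurses into Struct and List columns.)
fn dictionary_encode_if_necessary(
    array: &ArrayRef,
    expected: &DataType,
) -> Result<ArrayRef, ArrowError> {
    match (expected, array.data_type()) {
        // Re-encode when the output schema expects a dictionary type.
        (DataType::Dictionary(_, _), _) => cast(array.as_ref(), expected),
        // Otherwise hand back a cheap clone of the Arc: no data is copied.
        (_, _) => Ok(Arc::clone(array)),
    }
}

fn main() -> Result<(), ArrowError> {
    let input: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));
    let expected =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let encoded = dictionary_encode_if_necessary(&input, &expected)?;
    assert!(matches!(encoded.data_type(), DataType::Dictionary(_, _)));

    // `input` is still usable here because the callee only borrowed it.
    assert_eq!(input.len(), 3);
    Ok(())
}
```

Because `ArrayRef` is just `Arc<dyn Array>`, the pass-through branch costs one atomic refcount increment rather than a deep copy, and callers keep their array without writing `Arc::clone` at every call site.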
11 changes: 6 additions & 5 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -626,7 +626,7 @@ impl AggregateExec {
fn execute_typed(
&self,
partition: usize,
context: Arc<TaskContext>,
context: &Arc<TaskContext>,
) -> Result<StreamType> {
// no group by at all
if self.group_by.expr.is_empty() {
@@ -761,7 +761,7 @@ impl AggregateExec {
&self.input_order_mode
}

fn statistics_inner(&self, child_statistics: Statistics) -> Result<Statistics> {
fn statistics_inner(&self, child_statistics: &Statistics) -> Result<Statistics> {
// TODO stats: group expressions:
// - once expressions will be able to compute their own stats, use it here
// - case where we group by on a column for which with have the `distinct` stat
@@ -1020,7 +1020,7 @@ impl ExecutionPlan for AggregateExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.execute_typed(partition, context)
self.execute_typed(partition, &context)
.map(|stream| stream.into())
}

@@ -1033,7 +1033,8 @@ impl ExecutionPlan for AggregateExec {
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.statistics_inner(self.input().partition_statistics(partition)?)
let child_statistics = self.input().partition_statistics(partition)?;
self.statistics_inner(&child_statistics)
}

fn cardinality_effect(&self) -> CardinalityEffect {
@@ -2220,7 +2221,7 @@ mod tests {
Arc::clone(&input_schema),
)?);

let stream = partial_aggregate.execute_typed(0, Arc::clone(&task_ctx))?;
let stream = partial_aggregate.execute_typed(0, &task_ctx)?;

// ensure that we really got the version we wanted
match version {
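The `&Arc<TaskContext>` changes in this file follow the same idea: crate-internal helpers only read the context and forward it once, so they can borrow the `Arc` and clone it exactly where `ExecutionPlan::execute` demands ownership. A small sketch under that assumption, with a toy `TaskContext` standing in for the real type:

```rust
use std::sync::Arc;

// Stand-in for TaskContext; the real one lives in datafusion-execution.
struct TaskContext {
    batch_size: usize,
}

// Stand-in for ExecutionPlan::execute, whose signature requires an owned Arc.
fn execute(_partition: usize, context: Arc<TaskContext>) -> usize {
    context.batch_size
}

// Internal helper: it only reads the context and forwards it once, so it
// borrows the Arc and clones at the single call that needs ownership.
fn execute_typed(partition: usize, context: &Arc<TaskContext>) -> usize {
    let batch_size = context.batch_size; // plain read through the borrow
    let rows = execute(partition, Arc::clone(context)); // refcount bump only here
    batch_size.min(rows)
}

fn main() {
    let ctx = Arc::new(TaskContext { batch_size: 8192 });
    let n = execute_typed(0, &ctx);
    assert_eq!(n, 8192);
    // The caller still owns its Arc after the call.
    assert_eq!(Arc::strong_count(&ctx), 1);
}
```

The public `execute` entry point keeps its owned-`Arc` signature, so callers are unaffected; only the crate-private plumbing changes.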
12 changes: 6 additions & 6 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -68,14 +68,14 @@ impl AggregateStream {
/// Create a new AggregateStream
pub fn new(
Contributor Author: this is a pub(crate) function.

agg: &AggregateExec,
context: Arc<TaskContext>,
context: &Arc<TaskContext>,
partition: usize,
) -> Result<Self> {
let agg_schema = Arc::clone(&agg.schema);
let agg_filter_expr = agg.filter_expr.clone();

let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(&context))?;
let input = agg.input.execute(partition, Arc::clone(context))?;

let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?;
let filter_expressions = match agg.mode {
@@ -115,7 +115,7 @@ impl AggregateStream {
let timer = elapsed_compute.timer();
let result = aggregate_batch(
&this.mode,
batch,
&batch,
&mut this.accumulators,
&this.aggregate_expressions,
&this.filter_expressions,
@@ -195,7 +195,7 @@ impl RecordBatchStream for AggregateStream {
/// TODO: Make this a member function
fn aggregate_batch(
mode: &AggregateMode,
batch: RecordBatch,
batch: &RecordBatch,
accumulators: &mut [AccumulatorItem],
expressions: &[Vec<Arc<dyn PhysicalExpr>>],
filters: &[Option<Arc<dyn PhysicalExpr>>],
@@ -215,8 +215,8 @@ fn aggregate_batch(
.try_for_each(|((accum, expr), filter)| {
// 1.2
let batch = match filter {
Some(filter) => Cow::Owned(batch_filter(&batch, filter)?),
None => Cow::Borrowed(&batch),
Some(filter) => Cow::Owned(batch_filter(batch, filter)?),
None => Cow::Borrowed(batch),
};

// 1.3
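`aggregate_batch` now borrows the `RecordBatch`, and the existing `Cow` keeps both paths cheap: the unfiltered case is a free borrow, the filtered case owns the freshly built batch. A stripped-down sketch of that pattern; the helper name `maybe_filtered` and the direct use of arrow's `filter_record_batch` are illustrative, not the DataFusion code.

```rust
use std::borrow::Cow;
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Take the batch by reference: no filter means a borrow, a filter means a
// new owned batch.
fn maybe_filtered<'a>(
    batch: &'a RecordBatch,
    predicate: Option<&BooleanArray>,
) -> Result<Cow<'a, RecordBatch>, ArrowError> {
    Ok(match predicate {
        Some(p) => Cow::Owned(filter_record_batch(batch, p)?),
        None => Cow::Borrowed(batch),
    })
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    let unfiltered = maybe_filtered(&batch, None)?;
    assert_eq!(unfiltered.num_rows(), 3); // borrows the original, no copy

    let mask = BooleanArray::from(vec![true, false, true]);
    let filtered = maybe_filtered(&batch, Some(&mask))?;
    assert_eq!(filtered.num_rows(), 2); // owns a freshly filtered batch

    Ok(())
}
```

Either way the caller's batch stays usable after the call, which is what lets the loop above hand the same borrowed batch to each accumulator with its own optional filter.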
32 changes: 16 additions & 16 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -445,7 +445,7 @@ impl GroupedHashAggregateStream {
/// Create a new GroupedHashAggregateStream
pub fn new(
Contributor Author: this is a pub(crate) function.

agg: &AggregateExec,
context: Arc<TaskContext>,
context: &Arc<TaskContext>,
partition: usize,
) -> Result<Self> {
debug!("Creating GroupedHashAggregateStream");
@@ -454,7 +454,7 @@ impl GroupedHashAggregateStream {
let agg_filter_expr = agg.filter_expr.clone();

let batch_size = context.session_config().batch_size();
let input = agg.input.execute(partition, Arc::clone(&context))?;
let input = agg.input.execute(partition, Arc::clone(context))?;
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition);

@@ -685,7 +685,7 @@ impl Stream for GroupedHashAggregateStream {
}

// Do the grouping
self.group_aggregate_batch(batch)?;
self.group_aggregate_batch(&batch)?;

self.update_skip_aggregation_probe(input_rows);

@@ -728,7 +728,7 @@ impl Stream for GroupedHashAggregateStream {
self.spill_previous_if_necessary(&batch)?;

// Do the grouping
self.group_aggregate_batch(batch)?;
self.group_aggregate_batch(&batch)?;

// If we can begin emitting rows, do so,
// otherwise keep consuming input
@@ -777,7 +777,7 @@ impl Stream for GroupedHashAggregateStream {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
probe.record_skipped(&batch);
}
let states = self.transform_to_states(batch)?;
let states = self.transform_to_states(&batch)?;
return Poll::Ready(Some(Ok(
states.record_output(&self.baseline_metrics)
)));
@@ -854,12 +854,12 @@ impl RecordBatchStream for GroupedHashAggregateStream {

impl GroupedHashAggregateStream {
/// Perform group-by aggregation for the given [`RecordBatch`].
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
fn group_aggregate_batch(&mut self, batch: &RecordBatch) -> Result<()> {
// Evaluate the grouping expressions
let group_by_values = if self.spill_state.is_stream_merging {
evaluate_group_by(&self.spill_state.merging_group_by, &batch)?
evaluate_group_by(&self.spill_state.merging_group_by, batch)?
} else {
evaluate_group_by(&self.group_by, &batch)?
evaluate_group_by(&self.group_by, batch)?
};

// Only create the timer if there are actual aggregate arguments to evaluate
@@ -876,18 +876,18 @@ impl GroupedHashAggregateStream {

// Evaluate the aggregation expressions.
let input_values = if self.spill_state.is_stream_merging {
evaluate_many(&self.spill_state.merging_aggregate_arguments, &batch)?
evaluate_many(&self.spill_state.merging_aggregate_arguments, batch)?
} else {
evaluate_many(&self.aggregate_arguments, &batch)?
evaluate_many(&self.aggregate_arguments, batch)?
};
drop(timer);

// Evaluate the filter expressions, if any, against the inputs
let filter_values = if self.spill_state.is_stream_merging {
let filter_expressions = vec![None; self.accumulators.len()];
evaluate_optional(&filter_expressions, &batch)?
evaluate_optional(&filter_expressions, batch)?
} else {
evaluate_optional(&self.filter_expressions, &batch)?
evaluate_optional(&self.filter_expressions, batch)?
};

for group_values in &group_by_values {
@@ -1217,10 +1217,10 @@ impl GroupedHashAggregateStream {
}

/// Transforms input batch to intermediate aggregate state, without grouping it
fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
let mut group_values = evaluate_group_by(&self.group_by, &batch)?;
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
fn transform_to_states(&self, batch: &RecordBatch) -> Result<RecordBatch> {
let mut group_values = evaluate_group_by(&self.group_by, batch)?;
let input_values = evaluate_many(&self.aggregate_arguments, batch)?;
let filter_values = evaluate_optional(&self.filter_expressions, batch)?;

assert_eq_or_internal_err!(
group_values.len(),
14 changes: 4 additions & 10 deletions datafusion/physical-plan/src/aggregates/topk/heap.rs
@@ -326,13 +326,7 @@ impl<VAL: ValueType> TopKHeap<VAL> {
}
}

fn _tree_print(
&self,
idx: usize,
prefix: String,
is_tail: bool,
output: &mut String,
) {
fn _tree_print(&self, idx: usize, prefix: &str, is_tail: bool, output: &mut String) {
if let Some(Some(hi)) = self.heap.get(idx) {
let connector = if idx != 0 {
if is_tail {
@@ -357,10 +351,10 @@ impl<VAL: ValueType> TopKHeap<VAL> {
let right_exists = right_idx < self.len;

if left_exists {
self._tree_print(left_idx, child_prefix.clone(), !right_exists, output);
self._tree_print(left_idx, &child_prefix, !right_exists, output);
}
if right_exists {
self._tree_print(right_idx, child_prefix, true, output);
self._tree_print(right_idx, &child_prefix, true, output);
}
}
}
@@ -370,7 +364,7 @@ impl<VAL: ValueType> Display for TopKHeap<VAL> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut output = String::new();
if !self.heap.is_empty() {
self._tree_print(0, String::new(), true, &mut output);
self._tree_print(0, "", true, &mut output);
}
write!(f, "{output}")
}
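`_tree_print` now takes the prefix as `&str`, so the recursion builds each child prefix once and lends it to both subtree calls instead of cloning a `String` for one of them. A self-contained toy version of the same shape, using plain `std` and an array-backed heap of `Option`s as in `TopKHeap`; the connector and prefix strings are illustrative.

```rust
// Recursive printer that borrows its prefix: the caller formats the child
// prefix once and lends it to both recursive calls.
fn tree_print(nodes: &[Option<i32>], idx: usize, prefix: &str, is_tail: bool, out: &mut String) {
    let Some(Some(value)) = nodes.get(idx) else {
        return;
    };
    let connector = if idx == 0 {
        ""
    } else if is_tail {
        "└── "
    } else {
        "├── "
    };
    out.push_str(&format!("{prefix}{connector}{value}\n"));

    // Build the child prefix once; both children only borrow it.
    let child_prefix = format!(
        "{prefix}{}",
        if is_tail || idx == 0 { "    " } else { "│   " }
    );
    let (left, right) = (2 * idx + 1, 2 * idx + 2);
    let right_exists = matches!(nodes.get(right), Some(Some(_)));
    tree_print(nodes, left, &child_prefix, !right_exists, out);
    if right_exists {
        tree_print(nodes, right, &child_prefix, true, out);
    }
}

fn main() {
    // A small binary heap laid out in an array: 1 is the root, 3 and 5 its children.
    let heap = vec![Some(1), Some(3), Some(5)];
    let mut out = String::new();
    tree_print(&heap, 0, "", true, &mut out);
    print!("{out}");
}
```

Running it prints the root and its two children with the same `├──`/`└──` connectors as the heap's `Display` output.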
11 changes: 6 additions & 5 deletions datafusion/physical-plan/src/aggregates/topk_stream.rs
@@ -54,13 +54,13 @@ pub struct GroupedTopKAggregateStream {
impl GroupedTopKAggregateStream {
pub fn new(
Contributor Author: this API is private inside the physical-plan/aggregates module. (Do we have a convenient way to check external visibility? For now I'm manually traversing its parent modules to verify.)

aggr: &AggregateExec,
context: Arc<TaskContext>,
context: &Arc<TaskContext>,
partition: usize,
limit: usize,
) -> Result<Self> {
let agg_schema = Arc::clone(&aggr.schema);
let group_by = aggr.group_by.clone();
let input = aggr.input.execute(partition, Arc::clone(&context))?;
let input = aggr.input.execute(partition, Arc::clone(context))?;
let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition);
let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition);
let aggregate_arguments =
@@ -97,11 +97,12 @@ impl RecordBatchStream for GroupedTopKAggregateStream {
}

impl GroupedTopKAggregateStream {
fn intern(&mut self, ids: ArrayRef, vals: ArrayRef) -> Result<()> {
fn intern(&mut self, ids: &ArrayRef, vals: &ArrayRef) -> Result<()> {
let _timer = self.group_by_metrics.time_calculating_group_ids.timer();

let len = ids.len();
self.priority_map.set_batch(ids, Arc::clone(&vals));
self.priority_map
.set_batch(Arc::clone(ids), Arc::clone(vals));

let has_nulls = vals.null_count() > 0;
for row_idx in 0..len {
@@ -167,7 +168,7 @@ impl Stream for GroupedTopKAggregateStream {
let input_values = Arc::clone(&input_values[0][0]);

// iterate over each column of group_by values
(*self).intern(group_by_values, input_values)?;
(*self).intern(&group_by_values, &input_values)?;
}
// inner is done, emit all rows and switch to producing output
None => {
10 changes: 5 additions & 5 deletions datafusion/physical-plan/src/analyze.rs
@@ -206,8 +206,8 @@ impl ExecutionPlan for AnalyzeExec {
show_statistics,
total_rows,
duration,
captured_input,
captured_schema,
&captured_input,
&captured_schema,
&metric_types,
)
};
@@ -225,8 +225,8 @@ fn create_output_batch(
show_statistics: bool,
total_rows: usize,
duration: std::time::Duration,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
input: &Arc<dyn ExecutionPlan>,
schema: &SchemaRef,
metric_types: &[MetricType],
) -> Result<RecordBatch> {
let mut type_builder = StringBuilder::with_capacity(1, 1024);
@@ -262,7 +262,7 @@ fn create_output_batch(
}

RecordBatch::try_new(
schema,
Arc::clone(schema),
vec![
Arc::new(type_builder.finish()),
Arc::new(plan_builder.finish()),
13 changes: 13 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -1118,6 +1118,7 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
#[expect(clippy::needless_pass_by_value)]
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
plan.properties().evaluation_type == EvaluationType::Eager
}
@@ -1172,6 +1173,10 @@ pub async fn collect(
///
/// Dropping the stream will abort the execution of the query, and free up
/// any allocated resources
#[expect(
clippy::needless_pass_by_value,
reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
@@ -1236,6 +1241,10 @@ pub async fn collect_partitioned(
///
/// Dropping the stream will abort the execution of the query, and free up
/// any allocated resources
#[expect(
clippy::needless_pass_by_value,
reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_stream_partitioned(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
@@ -1267,6 +1276,10 @@ pub fn execute_stream_partitioned(
/// violate the `not null` constraints specified in the `sink_schema`. If there are
/// such columns, it wraps the resulting stream to enforce the `not null` constraints
/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
#[expect(
clippy::needless_pass_by_value,
reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_input_stream(
input: Arc<dyn ExecutionPlan>,
sink_schema: SchemaRef,
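For the public entry points (`need_data_exchange`, `execute_stream`, `execute_stream_partitioned`, `execute_input_stream`) the owned-`Arc` signatures are kept and the lint is silenced with `#[expect]` instead. The sketch below, with a toy `TaskContext`, shows how that attribute behaves: `#[expect]` (stabilized in Rust 1.81) acts like `#[allow]`, but additionally reports `unfulfilled_lint_expectations` if the expected lint ever stops firing, so the annotation cannot silently go stale.

```rust
// Check with: cargo clippy -- -W clippy::needless_pass_by_value
use std::sync::Arc;

struct TaskContext;

// Changing this public signature to `&Arc<TaskContext>` would break callers,
// so the lint is expected (not just allowed) and the reason is recorded.
#[expect(
    clippy::needless_pass_by_value,
    reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_stream(context: Arc<TaskContext>) -> usize {
    // The argument is only used by reference inside the body, which is what
    // triggers needless_pass_by_value in the first place.
    Arc::strong_count(&context)
}

fn main() {
    let ctx = Arc::new(TaskContext);
    // Two strong references are alive during the call: `ctx` and the clone.
    assert_eq!(execute_stream(Arc::clone(&ctx)), 2);
}
```

Without the attribute clippy flags the function; with it the expectation is fulfilled and nothing is reported, while a future refactor to a reference would surface the now-unneeded expectation.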