8 changes: 5 additions & 3 deletions datafusion/datasource/src/file_scan_config.rs
@@ -581,7 +581,8 @@ impl DataSource for FileScanConfig {
if let Some(filter) = self.file_source.filter() {
// We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
// Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
match Self::add_filter_equivalence_info(filter, &mut eq_properties, &schema) {
match Self::add_filter_equivalence_info(&filter, &mut eq_properties, &schema)
{
Ok(()) => {}
Err(e) => {
warn!("Failed to add filter equivalence info: {e}");
@@ -782,12 +783,12 @@ impl FileScanConfig {
}

fn add_filter_equivalence_info(
filter: Arc<dyn PhysicalExpr>,
filter: &Arc<dyn PhysicalExpr>,
eq_properties: &mut EquivalenceProperties,
schema: &Schema,
) -> Result<()> {
// Gather valid equality pairs from the filter expression
let equal_pairs = split_conjunction(&filter).into_iter().filter_map(|expr| {
let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
// Ignore any binary expressions that reference non-existent columns in the current schema
// (e.g. due to unnecessary projections being removed)
reassign_expr_columns(Arc::clone(expr), schema)
@@ -1145,6 +1146,7 @@ impl PartitionColumnProjector {
// to the right positions as deduced from `projected_schema`
// - `file_batch`: batch read from the file, with internal projection applied
// - `partition_values`: the list of partition values, one for each partition column
#[expect(clippy::needless_pass_by_value)]
pub fn project(
&mut self,
file_batch: RecordBatch,
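The change to add_filter_equivalence_info above is the core pattern of this PR: borrow the Arc'd expression instead of taking it by value, which is what clippy::needless_pass_by_value flags. Because split_conjunction already accepts a reference, the extra &filter borrow in the old body also disappears. Below is a minimal, self-contained sketch of the idea; the trait and function names are illustrative stand-ins, not the DataFusion API.

use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `PhysicalExpr`.
trait Expr {
    fn describe(&self) -> String;
}

struct Column(&'static str);

impl Expr for Column {
    fn describe(&self) -> String {
        format!("column {}", self.0)
    }
}

// Borrowing the `Arc` (`&Arc<dyn Expr>`) leaves ownership with the caller and
// touches no reference count; taking `Arc<dyn Expr>` by value when the
// function only reads it is what the lint reports.
fn add_filter_info(filter: &Arc<dyn Expr>) {
    println!("registering {}", filter.describe());
}

fn main() {
    let filter: Arc<dyn Expr> = Arc::new(Column("a"));
    add_filter_info(&filter);
    // `filter` is still usable here; no `Arc::clone` was needed at the call site.
    println!("still own {}", filter.describe());
}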
9 changes: 5 additions & 4 deletions datafusion/datasource/src/memory.rs
@@ -282,6 +282,7 @@ impl MemorySourceConfig {
}

/// Create a new execution plan from a list of constant values (`ValuesExec`)
#[expect(clippy::needless_pass_by_value)]
pub fn try_new_as_values(
Review comment (Contributor): ditto
schema: SchemaRef,
data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
@@ -339,6 +340,7 @@ impl MemorySourceConfig {
///
/// Errors if any of the batches don't match the provided schema, or if no
/// batches are provided.
#[expect(clippy::needless_pass_by_value)]
pub fn try_new_from_batches(
schema: SchemaRef,
batches: Vec<RecordBatch>,
@@ -943,10 +945,9 @@ mod tests {
vec![lit(ScalarValue::Null)],
];
let rows = data.len();
let values = MemorySourceConfig::try_new_as_values(
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
data,
)?;
let schema =
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
let values = MemorySourceConfig::try_new_as_values(schema, data)?;

assert_eq!(
values.partition_statistics(None)?,
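Where the owned parameters belong to an established public constructor signature (try_new_as_values, try_new_from_batches), the PR opts out per function with #[expect(clippy::needless_pass_by_value)] instead of changing the API. The sketch below shows how expect differs from allow under stated assumptions; the function is illustrative and not part of MemorySourceConfig.

// `expect` behaves like `allow`, but additionally warns if the expected lint
// never fires, so stale suppressions get noticed (stable since Rust 1.81).
#[expect(clippy::needless_pass_by_value)]
pub fn try_new_from_parts(name: String, rows: Vec<u64>) -> Result<usize, String> {
    // Both parameters are only read here, so clippy would otherwise suggest
    // `&str` and `&[u64]`.
    if rows.is_empty() {
        return Err(format!("{name}: no rows provided"));
    }
    Ok(rows.len())
}

fn main() {
    let n = try_new_from_parts("t".to_string(), vec![1, 2, 3]).unwrap();
    println!("{n} rows");
}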
4 changes: 4 additions & 0 deletions datafusion/datasource/src/mod.rs
@@ -23,6 +23,10 @@
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
// Enforce lint rule to prevent needless pass by value
// https://github.com/apache/datafusion/issues/18503
#![deny(clippy::needless_pass_by_value)]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.
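These crate-root attributes are what turn the lint on for the whole datasource crate while leaving test code unconstrained. A toy crate-root sketch follows, assuming a standalone binary rather than the actual module.

// Deny in regular builds, allow in tests, mirroring the mod.rs change above.
#![deny(clippy::needless_pass_by_value)]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

// Under `cargo clippy`, a by-value signature that only reads its argument,
// e.g. `fn longest(names: Vec<String>) -> usize`, would now fail the build;
// the compliant version borrows a slice instead.
fn longest(names: &[String]) -> usize {
    names.iter().map(|n| n.len()).max().unwrap_or(0)
}

fn main() {
    let names = vec!["a".to_string(), "bc".to_string()];
    println!("{}", longest(&names));
}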
9 changes: 4 additions & 5 deletions datafusion/datasource/src/source.rs
@@ -348,8 +348,7 @@ impl ExecutionPlan for DataSourceExec {
let mut new_node = self.clone();
new_node.data_source = data_source;
// Re-compute properties since we have new filters which will impact equivalence info
new_node.cache =
Self::compute_properties(Arc::clone(&new_node.data_source));
new_node.cache = Self::compute_properties(&new_node.data_source);

Ok(FilterPushdownPropagation {
filters: res.filters,
@@ -371,7 +370,7 @@ impl DataSourceExec {

// Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
pub fn new(data_source: Arc<dyn DataSource>) -> Self {
let cache = Self::compute_properties(Arc::clone(&data_source));
let cache = Self::compute_properties(&data_source);
Self { data_source, cache }
}

@@ -381,7 +380,7 @@
}

pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
self.cache = Self::compute_properties(Arc::clone(&data_source));
self.cache = Self::compute_properties(&data_source);
self.data_source = data_source;
self
}
@@ -398,7 +397,7 @@
self
}

fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
fn compute_properties(data_source: &Arc<dyn DataSource>) -> PlanProperties {
PlanProperties::new(
data_source.eq_properties(),
data_source.output_partitioning(),
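compute_properties is the same pattern applied to an internal helper: borrowing the Arc removes the Arc::clone at every call site (new, with_data_source, and the filter-pushdown path). A self-contained sketch with stand-in types, not the real DataSource / PlanProperties:

use std::sync::Arc;

// Hypothetical stand-ins for `DataSource` and `PlanProperties`.
trait Source {
    fn partition_count(&self) -> usize;
}

struct Mem;

impl Source for Mem {
    fn partition_count(&self) -> usize {
        1
    }
}

struct Properties {
    partitions: usize,
}

// Taking `&Arc<dyn Source>` means callers no longer need
// `Arc::clone(&data_source)` just to derive the properties.
fn compute_properties(source: &Arc<dyn Source>) -> Properties {
    Properties {
        partitions: source.partition_count(),
    }
}

fn main() {
    let source: Arc<dyn Source> = Arc::new(Mem);
    let cache = compute_properties(&source);
    // `source` is still owned here and can be stored next to the cache,
    // just as `DataSourceExec::new` stores `data_source` after the call.
    println!("{} partition(s), {}", cache.partitions, source.partition_count());
}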
39 changes: 19 additions & 20 deletions datafusion/datasource/src/statistics.rs
@@ -152,28 +152,25 @@ impl MinMaxStatistics {
.into_iter()
.unzip();

Self::new(
&min_max_sort_order,
&min_max_schema,
RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
|e| {
DataFusionError::ArrowError(
Box::new(e),
Some("\ncreate min batch".to_string()),
)
},
)?,
RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
|e| {
DataFusionError::ArrowError(
Box::new(e),
Some("\ncreate max batch".to_string()),
)
},
)?,
)
let min_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), min_values)
.map_err(|e| {
DataFusionError::ArrowError(
Box::new(e),
Some("\ncreate min batch".to_string()),
)
})?;
let max_batch = RecordBatch::try_new(Arc::clone(&min_max_schema), max_values)
.map_err(|e| {
DataFusionError::ArrowError(
Box::new(e),
Some("\ncreate max batch".to_string()),
)
})?;

Self::new(&min_max_sort_order, &min_max_schema, min_batch, max_batch)
}

#[expect(clippy::needless_pass_by_value)]
pub fn new(
Review comment (Contributor): ditto
sort_order: &LexOrdering,
schema: &SchemaRef,
@@ -421,6 +418,7 @@ pub async fn get_statistics_with_limit(
///
/// # Returns
/// A new file group with summary statistics attached
#[expect(clippy::needless_pass_by_value)]
pub fn compute_file_group_statistics(
file_group: FileGroup,
file_schema: SchemaRef,
@@ -456,6 +454,7 @@ pub fn compute_file_group_statistics(
/// A tuple containing:
/// * The processed file groups with their individual statistics attached
/// * The summary statistics across all file groups, aka all files summary statistics
#[expect(clippy::needless_pass_by_value)]
pub fn compute_all_files_statistics(
Review comment (Contributor): ditto
file_groups: Vec<FileGroup>,
table_schema: SchemaRef,
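The MinMaxStatistics::new call site is only reshaped: each fallible RecordBatch::try_new is bound to a named batch and unwrapped with ? before the final call, instead of being nested inside the argument list. A tiny sketch of that restructuring with stand-in types, not the Arrow API:

// Stand-in for a fallible `RecordBatch::try_new`.
fn try_build(label: &str) -> Result<String, String> {
    if label.is_empty() {
        Err("empty label".to_string())
    } else {
        Ok(label.to_uppercase())
    }
}

fn combine(min_part: String, max_part: String) -> String {
    format!("{min_part}/{max_part}")
}

// Binding each result to a name keeps the final call flat and lets each error
// carry its own context, which is the shape the statistics.rs hunk moves to.
fn build() -> Result<String, String> {
    let min_part = try_build("min").map_err(|e| format!("create min batch: {e}"))?;
    let max_part = try_build("max").map_err(|e| format!("create max batch: {e}"))?;
    Ok(combine(min_part, max_part))
}

fn main() {
    println!("{}", build().unwrap());
}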
4 changes: 2 additions & 2 deletions datafusion/datasource/src/write/demux.rs
@@ -296,7 +296,7 @@ async fn hive_style_partitions_demuxer(
let all_partition_values = compute_partition_keys_by_row(&rb, &partition_by)?;

// Next compute how the batch should be split up to take each distinct key to its own batch
let take_map = compute_take_arrays(&rb, all_partition_values);
let take_map = compute_take_arrays(&rb, &all_partition_values);

// Divide up the batch into distinct partition key batches and send each batch
for (part_key, mut builder) in take_map.into_iter() {
@@ -516,7 +516,7 @@ fn compute_partition_keys_by_row<'a>(

fn compute_take_arrays(
rb: &RecordBatch,
all_partition_values: Vec<Vec<Cow<str>>>,
all_partition_values: &[Vec<Cow<str>>],
) -> HashMap<Vec<String>, UInt64Builder> {
let mut take_map = HashMap::new();
for i in 0..rb.num_rows() {
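compute_take_arrays now borrows the partition values as a slice, so the demuxer keeps ownership of all_partition_values after the take map is built. A self-contained sketch of the signature change; the counting logic below is illustrative, not the real take-array construction:

use std::borrow::Cow;
use std::collections::HashMap;

// Accepting `&[Vec<Cow<str>>]` instead of `Vec<Vec<Cow<str>>>` means only a
// borrow crosses the call boundary.
fn count_partition_keys(
    all_partition_values: &[Vec<Cow<'_, str>>],
) -> HashMap<Vec<String>, usize> {
    let mut map: HashMap<Vec<String>, usize> = HashMap::new();
    for row in all_partition_values {
        let key: Vec<String> = row.iter().map(|v| v.to_string()).collect();
        *map.entry(key).or_insert(0) += 1;
    }
    map
}

fn main() {
    let values = vec![
        vec![Cow::Borrowed("2024"), Cow::Borrowed("us")],
        vec![Cow::Borrowed("2024"), Cow::Borrowed("eu")],
    ];
    let counts = count_partition_keys(&values);
    // `values` was only borrowed, so it is still available to the caller.
    println!("{} distinct keys from {} rows", counts.len(), values.len());
}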