Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl IndexTableProvider {
// analyze the predicate. In a real system, using
// `PruningPredicate::prune` would likely be easier to do.
let pruning_predicate =
PruningPredicate::try_new(Arc::clone(predicate), self.schema().clone())?;
PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;

// The PruningPredicate's guarantees must all be satisfied in order for
// the predicate to possibly evaluate to true.
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/file_stream_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod non_windows {
) {
// Timeout for a long period of BrokenPipe error
let broken_pipe_timeout = Duration::from_secs(10);
let sa = file_path.clone();
let sa = file_path;
// Spawn a new thread to write to the FIFO file
#[allow(clippy::disallowed_methods)] // spawn allowed only in tests
tasks.spawn_blocking(move || {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,10 +1242,9 @@ mod tests {
#[test]
fn into() {
// Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
let metadata = test_metadata();
let arrow_schema = Schema::new_with_metadata(
vec![Field::new("c0", DataType::Int64, true)],
metadata.clone(),
test_metadata(),
);
let arrow_schema_ref = Arc::new(arrow_schema.clone());

Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ mod tests {
column_options_with_non_defaults(&parquet_options),
)]
.into(),
key_value_metadata: [(key.clone(), value.clone())].into(),
key_value_metadata: [(key, value)].into(),
};

let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
Expand Down
8 changes: 4 additions & 4 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4905,7 +4905,7 @@ mod tests {
let data_type =
DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));

assert_eq!(non_null_list_scalar.data_type(), data_type.clone());
assert_eq!(non_null_list_scalar.data_type(), data_type);
assert_eq!(null_list_scalar.data_type(), data_type);
}

Expand Down Expand Up @@ -5582,13 +5582,13 @@ mod tests {

// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
let nl0_array = ScalarValue::iter_to_array(vec![s0, s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl0_array)));

let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
let nl1_array = ScalarValue::iter_to_array(vec![s2]).unwrap();
let nl1 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl1_array)));

let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
let nl2_array = ScalarValue::iter_to_array(vec![s1]).unwrap();
let nl2 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl2_array)));

// iter_to_array for list-of-struct
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ mod tests {
let precision: Precision<ScalarValue> =
Precision::Exact(ScalarValue::Int64(Some(42)));
// Clippy would complain about this if it were Copy
#[allow(clippy::redundant_clone)]
let p2 = precision.clone();
assert_eq!(precision, p2);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async fn prune_partitions(
Default::default(),
)?;

let batch = RecordBatch::try_new(schema.clone(), arrays)?;
let batch = RecordBatch::try_new(schema, arrays)?;

// TODO: Plumb this down
let props = ExecutionProps::new();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ impl ListingTable {
.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.clone()),
Some(statistics) => Ok(statistics),
None => {
let statistics = self
.options
Expand Down
7 changes: 2 additions & 5 deletions datafusion/core/src/datasource/physical_plan/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ mod test {
#[test]
fn repartition_empty_file_only() {
let partitioned_file_empty = pfile("empty", 0);
let file_group = vec![vec![partitioned_file_empty.clone()]];
let file_group = vec![vec![partitioned_file_empty]];

let partitioned_files = FileGroupPartitioner::new()
.with_target_partitions(4)
Expand Down Expand Up @@ -817,10 +817,7 @@ mod test {
.with_preserve_order_within_groups(true)
.repartition_file_groups(&file_groups);

assert_partitioned_files(
repartitioned.clone(),
repartitioned_preserving_sort.clone(),
);
assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
repartitioned
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ mod tests {
schema.clone(),
Some(vec![0, 3, 5, schema.fields().len()]),
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
to_partition_cols(partition_cols),
)
.projected_file_schema();

Expand Down Expand Up @@ -941,7 +941,7 @@ mod tests {
schema.clone(),
None,
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
to_partition_cols(partition_cols),
)
.projected_file_schema();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ mod tests {
let f1 = Field::new("id", DataType::Int32, true);
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
let schema = Arc::new(Schema::new(vec![f1, f2]));

let extra_column = Arc::new(StringArray::from(vec!["foo"]));
let mut new_columns = batch.columns().to_vec();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ impl SessionContext {
// check schema uniqueness
let mut batches = batches.into_iter().peekable();
let schema = if let Some(batch) = batches.peek() {
batch.schema().clone()
batch.schema()
} else {
Arc::new(Schema::empty())
};
Expand Down
23 changes: 11 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3908,7 +3908,7 @@ pub(crate) mod tests {
let alias = vec![("a".to_string(), "a".to_string())];
let plan_parquet =
aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);

let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
Expand All @@ -3934,7 +3934,7 @@ pub(crate) mod tests {
let alias = vec![("a".to_string(), "a".to_string())];
let plan_parquet =
aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);

let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
Expand Down Expand Up @@ -3964,7 +3964,7 @@ pub(crate) mod tests {
options: SortOptions::default(),
}];
let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), false));
let plan_csv = limit_exec(sort_exec(sort_key.clone(), csv_exec(), false));
let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false));

let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
Expand Down Expand Up @@ -4000,8 +4000,7 @@ pub(crate) mod tests {
parquet_exec(),
false,
)));
let plan_csv =
limit_exec(filter_exec(sort_exec(sort_key.clone(), csv_exec(), false)));
let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(), false)));

let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
Expand Down Expand Up @@ -4042,7 +4041,7 @@ pub(crate) mod tests {
);
let plan_csv = aggregate_exec_with_alias(
limit_exec(filter_exec(limit_exec(csv_exec()))),
alias.clone(),
alias,
);

let expected_parquet = &[
Expand Down Expand Up @@ -4126,7 +4125,7 @@ pub(crate) mod tests {
);
let plan_csv = sort_preserving_merge_exec(
sort_key.clone(),
csv_exec_with_sort(vec![sort_key.clone()]),
csv_exec_with_sort(vec![sort_key]),
);

// parallelization is not beneficial for SortPreservingMerge
Expand Down Expand Up @@ -4154,7 +4153,7 @@ pub(crate) mod tests {
union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]);
let input_csv = union_exec(vec![csv_exec_with_sort(vec![sort_key.clone()]); 2]);
let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), input_parquet);
let plan_csv = sort_preserving_merge_exec(sort_key.clone(), input_csv);
let plan_csv = sort_preserving_merge_exec(sort_key, input_csv);

// should not repartition (union doesn't benefit from increased parallelism)
// should not sort (as the data was already sorted)
Expand Down Expand Up @@ -4224,8 +4223,8 @@ pub(crate) mod tests {
("c".to_string(), "c2".to_string()),
];
let proj_parquet = projection_exec_with_alias(
parquet_exec_with_sort(vec![sort_key.clone()]),
alias_pairs.clone(),
parquet_exec_with_sort(vec![sort_key]),
alias_pairs,
);
let sort_key_after_projection = vec![PhysicalSortExpr {
expr: col("c2", &proj_parquet.schema()).unwrap(),
Expand Down Expand Up @@ -4560,7 +4559,7 @@ pub(crate) mod tests {
}];
let alias = vec![("a".to_string(), "a".to_string())];
let input = parquet_exec_with_sort(vec![sort_key]);
let physical_plan = aggregate_exec_with_alias(input, alias.clone());
let physical_plan = aggregate_exec_with_alias(input, alias);

let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
Expand All @@ -4584,7 +4583,7 @@ pub(crate) mod tests {
let alias = vec![("a".to_string(), "a".to_string())];
let input = parquet_exec_multiple_sorted(vec![sort_key]);
let aggregate = aggregate_exec_with_alias(input, alias.clone());
let physical_plan = aggregate_exec_with_alias(aggregate, alias.clone());
let physical_plan = aggregate_exec_with_alias(aggregate, alias);

let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
Expand Down
21 changes: 10 additions & 11 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ mod tests_statistical {
);

let optimized_join = JoinSelection::new()
.optimize(join.clone(), &ConfigOptions::new())
.optimize(join, &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -964,7 +964,7 @@ mod tests_statistical {
);

let optimized_join = JoinSelection::new()
.optimize(join.clone(), &ConfigOptions::new())
.optimize(join, &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -1140,7 +1140,7 @@ mod tests_statistical {
);

let optimized_join = JoinSelection::new()
.optimize(join.clone(), &ConfigOptions::new())
.optimize(join, &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -1180,7 +1180,7 @@ mod tests_statistical {
);

let optimized_join = JoinSelection::new()
.optimize(join.clone(), &ConfigOptions::new())
.optimize(join, &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -1356,7 +1356,7 @@ mod tests_statistical {
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()) as _,
)];
check_join_partition_mode(
big.clone(),
big,
small.clone(),
join_on,
true,
Expand All @@ -1380,8 +1380,8 @@ mod tests_statistical {
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()) as _,
)];
check_join_partition_mode(
empty.clone(),
small.clone(),
empty,
small,
join_on,
true,
PartitionMode::CollectLeft,
Expand Down Expand Up @@ -1424,7 +1424,7 @@ mod tests_statistical {
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _,
)];
check_join_partition_mode(
bigger.clone(),
bigger,
big.clone(),
join_on,
true,
Expand Down Expand Up @@ -1472,7 +1472,7 @@ mod tests_statistical {
);

let optimized_join = JoinSelection::new()
.optimize(join.clone(), &ConfigOptions::new())
.optimize(join, &ConfigOptions::new())
.unwrap();

if !is_swapped {
Expand Down Expand Up @@ -1913,8 +1913,7 @@ mod hash_join_tests {
false,
)?);

let optimized_join_plan =
hash_join_swap_subrule(join.clone(), &ConfigOptions::new())?;
let optimized_join_plan = hash_join_swap_subrule(join, &ConfigOptions::new())?;

// If swap did happen
let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
Expand Down
18 changes: 6 additions & 12 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1692,12 +1692,9 @@ mod tests {
]));
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![0, 1, 2, 3, 4])),
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![0, 1, 2, 3, 4])),
)
.with_has_header(false)
.with_delimeter(0)
Expand All @@ -1719,12 +1716,9 @@ mod tests {
]));
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![3, 2, 1])),
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![3, 2, 1])),
)
.with_has_header(false)
.with_delimeter(0)
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,6 @@ fn build_predicate_expression(
let change_expr = in_list
.list()
.iter()
.cloned()
.map(|e| {
Arc::new(phys_expr::BinaryExpr::new(
in_list.expr().clone(),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ impl DefaultPhysicalPlanner {
let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
aggregates,
filters.clone(),
input_exec,
physical_input_schema.clone(),
Expand Down Expand Up @@ -2569,7 +2569,7 @@ mod tests {

impl NoOpExecutionPlan {
fn new(schema: SchemaRef) -> Self {
let cache = Self::compute_properties(schema.clone());
let cache = Self::compute_properties(schema);
Self { cache }
}

Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,10 @@ impl JoinFuzzTestCase {
fn left_right(&self) -> (Arc<MemoryExec>, Arc<MemoryExec>) {
let schema1 = self.input1[0].schema();
let schema2 = self.input2[0].schema();
let left = Arc::new(
MemoryExec::try_new(&[self.input1.clone()], schema1.clone(), None).unwrap(),
);
let right = Arc::new(
MemoryExec::try_new(&[self.input2.clone()], schema2.clone(), None).unwrap(),
);
let left =
Arc::new(MemoryExec::try_new(&[self.input1.clone()], schema1, None).unwrap());
let right =
Arc::new(MemoryExec::try_new(&[self.input2.clone()], schema2, None).unwrap());
(left, right)
}

Expand Down
Loading