From 082e35e76912159fd37d87d687fb8600c69efda6 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 12 Jun 2021 15:19:56 +0800
Subject: [PATCH 1/3] turn on clippy rule for needless borrow

---
 .../core/src/execution_plans/query_stage.rs   |  2 +-
 .../core/src/serde/logical_plan/to_proto.rs   |  2 +-
 ballista/rust/executor/src/flight_service.rs  |  2 +-
 ballista/rust/scheduler/src/state/mod.rs      |  2 +-
 benchmarks/src/bin/tpch.rs                    | 16 ++---
 datafusion/benches/aggregate_query_sql.rs     |  2 +-
 datafusion/benches/filter_query_sql.rs        |  2 +-
 datafusion/benches/math_query_sql.rs          |  2 +-
 datafusion/benches/sort_limit_query_sql.rs    |  2 +-
 datafusion/src/datasource/csv.rs              |  2 +-
 datafusion/src/datasource/json.rs             |  2 +-
 datafusion/src/execution/context.rs           |  8 +--
 datafusion/src/execution/dataframe_impl.rs    |  2 +-
 datafusion/src/lib.rs                         |  1 +
 datafusion/src/logical_plan/dfschema.rs       |  4 +-
 datafusion/src/logical_plan/plan.rs           | 32 ++++-----
 datafusion/src/optimizer/filter_push_down.rs  | 20 +++---
 .../src/optimizer/projection_push_down.rs     | 14 ++--
 .../src/optimizer/simplify_expressions.rs     |  4 +-
 datafusion/src/physical_optimizer/pruning.rs  | 12 ++--
 datafusion/src/physical_plan/aggregates.rs    |  2 +-
 .../src/physical_plan/expressions/case.rs     |  2 +-
 .../physical_plan/expressions/row_number.rs   |  2 +-
 datafusion/src/physical_plan/filter.rs        |  2 +-
 datafusion/src/physical_plan/functions.rs     |  6 +-
 .../src/physical_plan/hash_aggregate.rs       |  8 +--
 datafusion/src/physical_plan/hash_join.rs     | 16 ++---
 datafusion/src/physical_plan/planner.rs       | 22 +++----
 datafusion/src/physical_plan/projection.rs    |  2 +-
 datafusion/src/physical_plan/repartition.rs   |  2 +-
 .../physical_plan/sort_preserving_merge.rs    |  2 +-
 .../src/physical_plan/string_expressions.rs   |  6 +-
 datafusion/src/physical_plan/type_coercion.rs |  6 +-
 datafusion/src/physical_plan/windows.rs       |  6 +-
 datafusion/src/sql/planner.rs                 | 66 +++++++++----------
 datafusion/src/sql/utils.rs                   |  4 +-
 datafusion/tests/sql.rs                       | 28 ++++----
 37 files changed, 158 insertions(+), 157 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index 233dee5b9b52..264c44dc43dc 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -139,7 +139,7 @@ impl ExecutionPlan for QueryStageExec {
         info!("Writing results to {}", path);

         // stream results to disk
-        let stats = utils::write_stream_to_disk(&mut stream, &path)
+        let stats = utils::write_stream_to_disk(&mut stream, path)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 5d996843d624..3a509dcbf263 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1034,7 +1034,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                     .collect::<Result<Vec<_>, _>>()?;
                 let window_frame = window_frame.map(|window_frame| {
                     protobuf::window_expr_node::WindowFrame::Frame(
-                        window_frame.clone().into(),
+                        window_frame.into(),
                     )
                 });
                 let window_expr = Box::new(protobuf::WindowExprNode {
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index d4eb1229c294..99424b6e8db4 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -279,7 +279,7 @@ fn create_flight_iter(
     options: &IpcWriteOptions,
 ) -> Box<dyn Iterator<Item = Result<FlightData, Status>>> {
     let (flight_dictionaries, flight_batch) =
-        arrow_flight::utils::flight_data_from_arrow_batch(batch, &options);
+        arrow_flight::utils::flight_data_from_arrow_batch(batch, options);
     Box::new(
         flight_dictionaries
             .into_iter()
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index a15efd618ff1..1c184f3a945b 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -223,7 +223,7 @@ impl SchedulerState {
             .collect();
         let executors = self.get_executors_metadata().await?;
         'tasks: for (_key, value) in kvs.iter() {
-            let mut status: TaskStatus = decode_protobuf(&value)?;
+            let mut status: TaskStatus = decode_protobuf(value)?;
             if status.status.is_none() {
                 let partition = status.partition_id.as_ref().unwrap();
                 let plan = self
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 9ac66e136dbd..34b8d3a27b19 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -350,7 +350,7 @@ async fn execute_query(
     if debug {
         println!("Logical plan:\n{:?}", plan);
     }
-    let plan = ctx.optimize(&plan)?;
+    let plan = ctx.optimize(plan)?;
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
@@ -921,9 +921,9 @@ mod tests {
                 .iter()
                 .map(|field| {
                     Field::new(
-                        Field::name(&field),
-                        DataType::Utf8,
-                        Field::is_nullable(&field),
+                        Field::name(field),
+                        DataType::Utf8,
+                        Field::is_nullable(field),
                     )
                 })
                 .collect::<Vec<_>>(),
@@ -939,8 +939,8 @@ mod tests {
                 .iter()
                 .map(|field| {
                     Field::new(
-                        Field::name(&field),
-                        Field::data_type(&field).to_owned(),
+                        Field::name(field),
+                        Field::data_type(field).to_owned(),
                         true,
                     )
                 })
                .collect::<Vec<_>>(),
@@ -990,10 +990,10 @@ mod tests {
                 .map(|field| {
                     Expr::Alias(
                         Box::new(Cast {
-                            expr: Box::new(trim(col(Field::name(&field)))),
-                            data_type: Field::data_type(&field).to_owned(),
+                            expr: Box::new(trim(col(Field::name(field)))),
+                            data_type: Field::data_type(field).to_owned(),
                         }),
-                        Field::name(&field).to_string(),
+                        Field::name(field).to_string(),
                     )
                 })
                 .collect::<Vec<_>>(),
diff --git a/datafusion/benches/aggregate_query_sql.rs b/datafusion/benches/aggregate_query_sql.rs
index 8f1a97e198d3..74798ae572cd 100644
--- a/datafusion/benches/aggregate_query_sql.rs
+++ b/datafusion/benches/aggregate_query_sql.rs
@@ -47,7 +47,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
     let rt = Runtime::new().unwrap();

     // execute the query
-    let df = ctx.lock().unwrap().sql(&sql).unwrap();
+    let df = ctx.lock().unwrap().sql(sql).unwrap();
     rt.block_on(df.collect()).unwrap();
 }

diff --git a/datafusion/benches/filter_query_sql.rs b/datafusion/benches/filter_query_sql.rs
index 8600bdc88c6a..253ef455f5af 100644
--- a/datafusion/benches/filter_query_sql.rs
+++ b/datafusion/benches/filter_query_sql.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;

 async fn query(ctx: &mut ExecutionContext, sql: &str) {
     // execute the query
-    let df = ctx.sql(&sql).unwrap();
+    let df = ctx.sql(sql).unwrap();
     let results = df.collect().await.unwrap();

     // display the relation
diff --git a/datafusion/benches/math_query_sql.rs b/datafusion/benches/math_query_sql.rs
index 1aaa2d3403cf..51e52e8acddb 100644
--- a/datafusion/benches/math_query_sql.rs
+++ b/datafusion/benches/math_query_sql.rs
@@ -40,7 +40,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
     let rt = Runtime::new().unwrap();

     // execute the query
-    let df = ctx.lock().unwrap().sql(&sql).unwrap();
+    let df = ctx.lock().unwrap().sql(sql).unwrap();
     rt.block_on(df.collect()).unwrap();
 }

diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 1e8339ea31eb..5a875d3d8799 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -35,7 +35,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
     let rt = Runtime::new().unwrap();

     // execute the query
-    let df = ctx.lock().unwrap().sql(&sql).unwrap();
+    let df = ctx.lock().unwrap().sql(sql).unwrap();
     rt.block_on(df.collect()).unwrap();
 }

diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index e1a61595f2ee..906a1ce415f6 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -204,7 +204,7 @@ impl TableProvider for CsvFile {
                 }
             }
             Source::Path(p) => {
-                CsvExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
+                CsvExec::try_new(p, opts, projection.clone(), batch_size, limit)?
             }
         };
         Ok(Arc::new(exec))
diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs
index f916f6c1e382..90fedfd6f528 100644
--- a/datafusion/src/datasource/json.rs
+++ b/datafusion/src/datasource/json.rs
@@ -149,7 +149,7 @@ impl TableProvider for NdJsonFile {
                 }
             }
             Source::Path(p) => {
-                NdJsonExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
+                NdJsonExec::try_new(p, opts, projection.clone(), batch_size, limit)?
             }
         };
         Ok(Arc::new(exec))
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 950ba2b88691..f09d7f4f90c9 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -275,7 +275,7 @@ impl ExecutionContext {
     ) -> Result<Arc<dyn DataFrame>> {
         Ok(Arc::new(DataFrameImpl::new(
             self.state.clone(),
-            &LogicalPlanBuilder::scan_csv(&filename, options, None)?.build()?,
+            &LogicalPlanBuilder::scan_csv(filename, options, None)?.build()?,
         )))
     }
@@ -284,7 +284,7 @@ impl ExecutionContext {
         Ok(Arc::new(DataFrameImpl::new(
             self.state.clone(),
             &LogicalPlanBuilder::scan_parquet(
-                &filename,
+                filename,
                 None,
                 self.state.lock().unwrap().config.concurrency,
             )?
             .build()?,
@@ -328,7 +328,7 @@ impl ExecutionContext {
     /// executed against this context.
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
         let table = ParquetTable::try_new(
-            &filename,
+            filename,
             self.state.lock().unwrap().config.concurrency,
         )?;
         self.register_table(name, Arc::new(table))?;
@@ -3205,7 +3205,7 @@ mod tests {
             .expect("Executing CREATE EXTERNAL TABLE");

         let sql = "SELECT * from csv_with_timestamps";
-        let result = plan_and_collect(&mut ctx, &sql).await.unwrap();
+        let result = plan_and_collect(&mut ctx, sql).await.unwrap();
         let expected = vec![
             "+--------+-------------------------+",
             "| name   | ts                      |",
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 19f71eb79268..a674e3cdb0f1 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -373,7 +373,7 @@ mod tests {
         ctx.register_csv(
             "aggregate_test_100",
             &format!("{}/csv/aggregate_test_100.csv", testdata),
-            CsvReadOptions::new().schema(&schema.as_ref()),
+            CsvReadOptions::new().schema(schema.as_ref()),
         )?;
         Ok(())
     }
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index e4501a78ada4..cb4c5eef14f0 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -24,6 +24,7 @@
     clippy::type_complexity,
     clippy::upper_case_acronyms
 )]
+#![deny(clippy::needless_borrow)]

 //! [DataFusion](https://github.com/apache/arrow-datafusion)
 //! is an extensible query execution framework that uses
diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs
index 9adb22b43d07..5a9167e58b05 100644
--- a/datafusion/src/logical_plan/dfschema.rs
+++ b/datafusion/src/logical_plan/dfschema.rs
@@ -325,12 +325,12 @@ impl DFField {
     /// Returns an immutable reference to the `DFField`'s unqualified name
     pub fn name(&self) -> &String {
-        &self.field.name()
+        self.field.name()
     }

     /// Returns an immutable reference to the `DFField`'s data-type
     pub fn data_type(&self) -> &DataType {
-        &self.field.data_type()
+        self.field.data_type()
     }

     /// Indicates whether this `DFField` supports null values
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 3344dce1d81d..a80bc54b4a2f 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -221,23 +221,23 @@ impl LogicalPlan {
     /// Get a reference to the logical plan's schema
     pub fn schema(&self) -> &DFSchemaRef {
         match self {
-            LogicalPlan::EmptyRelation { schema, .. } => &schema,
+            LogicalPlan::EmptyRelation { schema, .. } => schema,
             LogicalPlan::TableScan {
                 projected_schema, ..
-            } => &projected_schema,
-            LogicalPlan::Projection { schema, .. } => &schema,
+            } => projected_schema,
+            LogicalPlan::Projection { schema, .. } => schema,
             LogicalPlan::Filter { input, .. } => input.schema(),
-            LogicalPlan::Window { schema, .. } => &schema,
-            LogicalPlan::Aggregate { schema, .. } => &schema,
+            LogicalPlan::Window { schema, .. } => schema,
+            LogicalPlan::Aggregate { schema, .. } => schema,
             LogicalPlan::Sort { input, .. } => input.schema(),
-            LogicalPlan::Join { schema, .. } => &schema,
-            LogicalPlan::CrossJoin { schema, .. } => &schema,
+            LogicalPlan::Join { schema, .. } => schema,
+            LogicalPlan::CrossJoin { schema, .. } => schema,
             LogicalPlan::Repartition { input, .. } => input.schema(),
             LogicalPlan::Limit { input, .. } => input.schema(),
-            LogicalPlan::CreateExternalTable { schema, .. } => &schema,
-            LogicalPlan::Explain { schema, .. } => &schema,
-            LogicalPlan::Extension { node } => &node.schema(),
-            LogicalPlan::Union { schema, .. } => &schema,
+            LogicalPlan::CreateExternalTable { schema, .. } => schema,
+            LogicalPlan::Explain { schema, .. } => schema,
+            LogicalPlan::Extension { node } => node.schema(),
+            LogicalPlan::Union { schema, .. } => schema,
         }
     }
@@ -246,12 +246,12 @@ impl LogicalPlan {
         match self {
             LogicalPlan::TableScan {
                 projected_schema, ..
-            } => vec![&projected_schema],
+            } => vec![projected_schema],
             LogicalPlan::Window { input, schema, .. }
             | LogicalPlan::Aggregate { input, schema, .. }
             | LogicalPlan::Projection { input, schema, .. } => {
                 let mut schemas = input.all_schemas();
-                schemas.insert(0, &schema);
+                schemas.insert(0, schema);
                 schemas
             }
             LogicalPlan::Join {
                 left, right, schema, ..
             }
             | LogicalPlan::CrossJoin {
                 left, right, schema, ..
             } => {
                 let mut schemas = left.all_schemas();
                 schemas.extend(right.all_schemas());
-                schemas.insert(0, &schema);
+                schemas.insert(0, schema);
                 schemas
             }
             LogicalPlan::Union { schema, .. } => {
                 vec![schema]
             }
-            LogicalPlan::Extension { node } => vec![&node.schema()],
+            LogicalPlan::Extension { node } => vec![node.schema()],
             LogicalPlan::Explain { schema, .. }
             | LogicalPlan::EmptyRelation { schema, .. }
-            | LogicalPlan::CreateExternalTable { schema, .. } => vec![&schema],
+            | LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
             LogicalPlan::Limit { input, .. }
             | LogicalPlan::Repartition { input, .. }
             | LogicalPlan::Sort { input, .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 4b1ae76927b4..85d1f812f41a 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -137,7 +137,7 @@ fn get_join_predicates<'a>(
             let all_in_right = right.len() == columns.len();
             !all_in_left && !all_in_right
         })
-        .map(|((ref a, ref b), _)| (a, b))
+        .map(|((a, b), _)| (a, b))
         .unzip();
     (pushable_to_left, pushable_to_right, keep)
 }
@@ -151,7 +151,7 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
         .collect::<Result<Vec<_>>>()?;

     let expr = plan.expressions();
-    utils::from_plan(&plan, &expr, &new_inputs)
+    utils::from_plan(plan, &expr, &new_inputs)
 }

 /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
@@ -225,8 +225,8 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
             op: Operator::And,
             left,
         } => {
-            split_members(&left, predicates);
-            split_members(&right, predicates);
+            split_members(left, predicates);
+            split_members(right, predicates);
         }
         other => predicates.push(other),
     }
@@ -297,7 +297,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // optimize inner
             let new_input = optimize(input, state)?;
-            utils::from_plan(&plan, &expr, &[new_input])
+            utils::from_plan(plan, expr, &[new_input])
         }
         LogicalPlan::Aggregate {
             input, aggr_expr, ..
@@ -335,7 +335,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
         LogicalPlan::Join { left, right, .. }
         | LogicalPlan::CrossJoin { left, right, .. } => {
             let (pushable_to_left, pushable_to_right, keep) =
-                get_join_predicates(&state, &left.schema(), &right.schema());
+                get_join_predicates(&state, left.schema(), right.schema());

             let mut left_state = state.clone();
             left_state.filters = keep_filters(&left_state.filters, &pushable_to_left);
@@ -347,7 +347,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // create a new Join with the new `left` and `right`
             let expr = plan.expressions();
-            let plan = utils::from_plan(&plan, &expr, &[left, right])?;
+            let plan = utils::from_plan(plan, &expr, &[left, right])?;

             if keep.0.is_empty() {
                 Ok(plan)
@@ -437,11 +437,11 @@ impl FilterPushDown {
 /// replaces columns by its name on the projection.
 fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
-    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = utils::expr_sub_expressions(expr)?;

     let expressions = expressions
         .iter()
-        .map(|e| rewrite(e, &projection))
+        .map(|e| rewrite(e, projection))
         .collect::<Result<Vec<_>>>()?;

     if let Expr::Column(name) = expr {
@@ -450,7 +450,7 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
         }
     }

-    utils::rewrite_expression(&expr, &expressions)
+    utils::rewrite_expression(expr, &expressions)
 }

 #[cfg(test)]
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index f0b364ab9852..ad795f5f5dd5 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -146,7 +146,7 @@ fn optimize_plan(
             let new_input = optimize_plan(
                 optimizer,
-                &input,
+                input,
                 &new_required_columns,
                 true,
                 execution_props,
@@ -176,14 +176,14 @@ fn optimize_plan(
             Ok(LogicalPlan::Join {
                 left: Arc::new(optimize_plan(
                     optimizer,
-                    &left,
+                    left,
                     &new_required_columns,
                     true,
                     execution_props,
                 )?),
                 right: Arc::new(optimize_plan(
                     optimizer,
-                    &right,
+                    right,
                     &new_required_columns,
                     true,
                     execution_props,
                 )?),
@@ -204,7 +204,7 @@ fn optimize_plan(
             let mut new_window_expr = Vec::new();
             {
                 window_expr.iter().try_for_each(|expr| {
-                    let name = &expr.name(&schema)?;
+                    let name = &expr.name(schema)?;
                     if required_columns.contains(name) {
                         new_window_expr.push(expr.clone());
                         new_required_columns.insert(name.clone());
@@ -235,7 +235,7 @@ fn optimize_plan(
                 window_expr: new_window_expr,
                 input: Arc::new(optimize_plan(
                     optimizer,
-                    &input,
+                    input,
                     &new_required_columns,
                     true,
                     execution_props,
@@ -259,7 +259,7 @@ fn optimize_plan(
             // Gather all columns needed for expressions in this Aggregate
             let mut new_aggr_expr = Vec::new();
             aggr_expr.iter().try_for_each(|expr| {
-                let name = &expr.name(&schema)?;
+                let name = &expr.name(schema)?;

                 if required_columns.contains(name) {
                     new_aggr_expr.push(expr.clone());
@@ -286,7 +286,7 @@ fn optimize_plan(
                 aggr_expr: new_aggr_expr,
                 input: Arc::new(optimize_plan(
                     optimizer,
-                    &input,
+                    input,
                     &new_required_columns,
                     true,
                     execution_props,
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index 0697d689c401..9ad7a94d8bfe 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -248,7 +248,7 @@ fn simplify(expr: &Expr) -> Expr {
             })
             .unwrap_or_else(|| expr.clone()),
         Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
-            left: Box::new(simplify(&left)),
+            left: Box::new(simplify(left)),
             op: *op,
             right: Box::new(simplify(right)),
         },
@@ -267,7 +267,7 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         .into_iter()
         .map(|x| simplify(&x))
         .collect::<Vec<_>>();
-    utils::from_plan(&plan, &expr, &new_inputs)
+    utils::from_plan(plan, &expr, &new_inputs)
 }

 impl OptimizerRule for SimplifyExpressions {
diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index c65733bd7526..8e0459b16073 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -420,7 +420,7 @@ impl<'a> PruningExpressionBuilder<'a> {
     fn min_column_expr(&mut self) -> Result<Expr> {
         self.required_columns.min_column_expr(
             &self.column_name,
-            &self.column_expr,
+            self.column_expr,
             self.field,
         )
     }

     fn max_column_expr(&mut self) -> Result<Expr> {
         self.required_columns.max_column_expr(
             &self.column_name,
-            &self.column_expr,
+            self.column_expr,
             self.field,
         )
     }
@@ -440,7 +440,7 @@ fn rewrite_column_expr(
     column_old_name: &str,
     column_new_name: &str,
 ) -> Result<Expr> {
-    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = utils::expr_sub_expressions(expr)?;
     let expressions = expressions
         .iter()
         .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
@@ -451,7 +451,7 @@ fn rewrite_column_expr(
             return Ok(Expr::Column(column_new_name.to_string()));
         }
     }
-    utils::rewrite_expression(&expr, &expressions)
+    utils::rewrite_expression(expr, &expressions)
 }

 /// Given a column reference to `column_name`, returns a pruning
@@ -515,7 +515,7 @@ fn build_predicate_expression(
     let (left, op, right) = match expr {
         Expr::BinaryExpr { left, op, right } => (left, *op, right),
         Expr::Column(name) => {
-            let expr = build_single_column_expr(&name, schema, required_columns, false)
+            let expr = build_single_column_expr(name, schema, required_columns, false)
                 .unwrap_or(unhandled);
             return Ok(expr);
         }
@@ -523,7 +523,7 @@ fn build_predicate_expression(
         Expr::Not(input) => {
             if let Expr::Column(name) = input.as_ref() {
                 let expr =
-                    build_single_column_expr(&name, schema, required_columns, true)
+                    build_single_column_expr(name, schema, required_columns, true)
                         .unwrap_or(unhandled);
                 return Ok(expr);
             } else {
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 60025a316228..897c78fd46ff 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -127,7 +127,7 @@ pub fn create_aggregate_expr(
         .map(|e| e.data_type(input_schema))
         .collect::<Result<Vec<_>>>()?;

-    let return_type = return_type(&fun, &arg_types)?;
+    let return_type = return_type(fun, &arg_types)?;

     Ok(match (fun, distinct) {
         (AggregateFunction::Count, false) => {
diff --git a/datafusion/src/physical_plan/expressions/case.rs b/datafusion/src/physical_plan/expressions/case.rs
index 95ae5325af11..f89ea8d1e296 100644
--- a/datafusion/src/physical_plan/expressions/case.rs
+++ b/datafusion/src/physical_plan/expressions/case.rs
@@ -377,7 +377,7 @@ impl CaseExpr {
             let then_value = then_value.into_array(batch.num_rows());

             current_value = Some(if_then_else(
-                &when_value,
+                when_value,
                 then_value,
                 current_value.unwrap(),
                 &return_type,
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index f399995461f7..eaf9b21cbc64 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -49,7 +49,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
     fn field(&self) -> Result<Field> {
         let nullable = false;
         let data_type = DataType::UInt64;
-        Ok(Field::new(&self.name(), data_type, nullable))
+        Ok(Field::new(self.name(), data_type, nullable))
     }

     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index bc2b17aa4f47..0a8c825aba1a 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -151,7 +151,7 @@ fn batch_filter(
     predicate: &Arc<dyn PhysicalExpr>,
 ) -> ArrowResult<RecordBatch> {
     predicate
-        .evaluate(&batch)
+        .evaluate(batch)
         .map(|v| v.into_array(batch.num_rows()))
         .map_err(DataFusionError::into_arrow_external_error)
         .and_then(|array| {
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index eb312cabd7f0..49ca79a00496 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -344,7 +344,7 @@ pub fn return_type(
     // or the execution panics.

     // verify that this is a valid set of data types for this function
-    data_types(&arg_types, &signature(fun))?;
+    data_types(arg_types, &signature(fun))?;

     // the return type of the built in function.
     // Some built-in functions' return type depends on the incoming type.
@@ -624,7 +624,7 @@ pub fn create_physical_expr(
                 &format!("{}", fun),
                 fun_expr,
                 args,
-                &return_type(&fun, &arg_types)?,
+                &return_type(fun, &arg_types)?,
             )));
         }
         BuiltinScalarFunction::InitCap => |args| match args[0].data_type() {
@@ -953,7 +953,7 @@ pub fn create_physical_expr(
         &format!("{}", fun),
         fun_expr,
         args,
-        &return_type(&fun, &arg_types)?,
+        &return_type(fun, &arg_types)?,
     )))
 }

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index ffb51b2e8a1f..453d500e98bd 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -120,8 +120,8 @@ fn create_schema(
     for (expr, name) in group_expr {
         fields.push(Field::new(
             name,
-            expr.data_type(&input_schema)?,
-            expr.nullable(&input_schema)?,
+            expr.data_type(input_schema)?,
+            expr.nullable(input_schema)?,
         ))
     }
@@ -413,7 +413,7 @@ fn group_aggregate_batch(
     let mut offset_so_far = 0;
     for key in batch_keys.iter() {
         let (_, _, indices) = accumulators.get_mut(key).unwrap();
-        batch_indices.append_slice(&indices)?;
+        batch_indices.append_slice(indices)?;
         offset_so_far += indices.len();
         offsets.push(offset_so_far);
     }
@@ -779,7 +779,7 @@ fn evaluate(
     batch: &RecordBatch,
 ) -> Result<Vec<ArrayRef>> {
     expr.iter()
-        .map(|expr| expr.evaluate(&batch))
+        .map(|expr| expr.evaluate(batch))
        .map(|r| r.map(|v| v.into_array(batch.num_rows())))
        .collect::<Result<Vec<_>>>()
 }
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index d12e249cbe34..54648158a11f 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -133,13 +133,13 @@ impl HashJoinExec {
     ) -> Result<Self> {
         let left_schema = left.schema();
         let right_schema = right.schema();
-        check_join_is_valid(&left_schema, &right_schema, &on)?;
+        check_join_is_valid(&left_schema, &right_schema, on)?;

         let schema = Arc::new(build_join_schema(
             &left_schema,
             &right_schema,
             on,
-            &join_type,
+            join_type,
         ));

         let on = on
@@ -289,7 +289,7 @@ impl ExecutionPlan for HashJoinExec {
                         hashes_buffer.resize(batch.num_rows(), 0);
                         update_hash(
                             &on_left,
-                            &batch,
+                            batch,
                             &mut hashmap,
                             offset,
                             &self.random_state,
@@ -342,7 +342,7 @@ impl ExecutionPlan for HashJoinExec {
                         hashes_buffer.resize(batch.num_rows(), 0);
                         update_hash(
                             &on_left,
-                            &batch,
+                            batch,
                             &mut hashmap,
                             offset,
                             &self.random_state,
@@ -436,7 +436,7 @@ fn update_hash(
         .collect::<Result<Vec<_>>>()?;

     // calculate the hash values
-    let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?;
+    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
@@ -539,8 +539,8 @@ fn build_batch(
     random_state: &RandomState,
 ) -> ArrowResult<(RecordBatch, UInt64Array)> {
     let (left_indices, right_indices) = build_join_indexes(
-        &left_data,
-        &batch,
+        left_data,
+        batch,
         join_type,
         on_left,
         on_right,
@@ -613,7 +613,7 @@ fn build_join_indexes(
         })
         .collect::<Result<Vec<_>>>()?;
     let hashes_buffer = &mut vec![0; keys_values[0].len()];
-    let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?;
+    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
     let left = &left_data.0;

     match join_type {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d42948a8666c..e58749ffc10f 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -155,7 +155,7 @@ impl DefaultPhysicalPlanner {
                     .map(|e| {
                         self.create_window_expr(
                             e,
-                            &logical_input_schema,
+                            logical_input_schema,
                             &physical_input_schema,
                             ctx_state,
                         )
@@ -189,7 +189,7 @@ impl DefaultPhysicalPlanner {
                                 &physical_input_schema,
                                 ctx_state,
                             ),
-                            e.name(&logical_input_schema),
+                            e.name(logical_input_schema),
                         ))
                     })
                     .collect::<Result<Vec<_>>>()?;
@@ -198,7 +198,7 @@ impl DefaultPhysicalPlanner {
                     .map(|e| {
                         self.create_aggregate_expr(
                             e,
-                            &logical_input_schema,
+                            logical_input_schema,
                             &physical_input_schema,
                             ctx_state,
                         )
@@ -275,9 +275,9 @@ impl DefaultPhysicalPlanner {
                             self.create_physical_expr(
                                 e,
                                 &input_exec.schema(),
-                                &ctx_state,
+                                ctx_state,
                             ),
-                            e.name(&input_schema),
+                            e.name(input_schema),
                         ))
                     })
                     .collect::<Result<Vec<_>>>()?;
@@ -313,7 +313,7 @@ impl DefaultPhysicalPlanner {
                         let runtime_expr = expr
                             .iter()
                             .map(|e| {
-                                self.create_physical_expr(e, &input_schema, &ctx_state)
+                                self.create_physical_expr(e, &input_schema, ctx_state)
                             })
                             .collect::<Result<Vec<_>>>()?;
                         Partitioning::Hash(runtime_expr, *n)
@@ -384,7 +384,7 @@ impl DefaultPhysicalPlanner {
                             right,
                             Partitioning::Hash(right_expr, ctx_state.config.concurrency),
                         )?),
-                        &keys,
+                        keys,
                         &physical_join_type,
                         PartitionMode::Partitioned,
                     )?))
                 }
                     Ok(Arc::new(HashJoinExec::try_new(
                         left,
                         right,
-                        &keys,
+                        keys,
                         &physical_join_type,
                         PartitionMode::CollectLeft,
                     )?))
                 }
@@ -510,7 +510,7 @@ impl DefaultPhysicalPlanner {
             }
             Expr::Column(name) => {
                 // check that name exists
-                input_schema.field_with_name(&name)?;
+                input_schema.field_with_name(name)?;
                 Ok(Arc::new(Column::new(name)))
             }
             Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
@@ -768,12 +768,12 @@ impl DefaultPhysicalPlanner {
                 nulls_first,
             } => self.create_physical_sort_expr(
                 expr,
-                &physical_input_schema,
+                physical_input_schema,
                 SortOptions {
                     descending: !*asc,
                     nulls_first: *nulls_first,
                 },
-                &ctx_state,
+                ctx_state,
             ),
             _ => Err(DataFusionError::Plan(
                 "Sort only accepts sort expressions".to_string(),
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index c0d78ff7168b..d4c0459c211b 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -166,7 +166,7 @@ fn batch_project(
 ) -> ArrowResult<RecordBatch> {
     expressions
         .iter()
-        .map(|expr| expr.evaluate(&batch))
+        .map(|expr| expr.evaluate(batch))
         .map(|r| r.map(|v| v.into_array(batch.num_rows())))
         .collect::<ArrowResult<Vec<_>>>()
         .map_or_else(
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 37d98c7d118b..5d1f8d7760cf 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -479,7 +479,7 @@ mod tests {
             partitions,
             Partitioning::Hash(
                 vec![Arc::new(crate::physical_plan::expressions::Column::new(
-                    &"c0",
+                    "c0",
                 ))],
                 8,
             ),
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 283294a43ec7..c39acc474d31 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -376,7 +376,7 @@ impl SortPreservingMergeStream {
             match min_cursor {
                 None => min_cursor = Some((idx, candidate)),
-                Some((_, ref min)) => {
+                Some((_, min)) => {
                     if min.compare(candidate, &self.sort_options)?
                         == Ordering::Greater
                     {
diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs
index 882fe30502fd..09e19c4dfa47 100644
--- a/datafusion/src/physical_plan/string_expressions.rs
+++ b/datafusion/src/physical_plan/string_expressions.rs
@@ -299,7 +299,7 @@ pub fn concat(args: &[ColumnarValue]) -> Result<ColumnarValue> {
                     ColumnarValue::Array(v) => {
                         if v.is_valid(index) {
                             let v = v.as_any().downcast_ref::<StringArray>().unwrap();
-                            owned_string.push_str(&v.value(index));
+                            owned_string.push_str(v.value(index));
                         }
                     }
                     _ => unreachable!(),
@@ -353,10 +353,10 @@ pub fn concat_ws(args: &[ArrayRef]) -> Result<ArrayRef> {
             for arg_index in 1..args.len() {
                 let arg = &args[arg_index];
                 if !arg.is_null(index) {
-                    owned_string.push_str(&arg.value(index));
+                    owned_string.push_str(arg.value(index));
                     // if not last push separator
                     if arg_index != args.len() - 1 {
-                        owned_string.push_str(&sep);
+                        owned_string.push_str(sep);
                     }
                 }
             }
diff --git a/datafusion/src/physical_plan/type_coercion.rs b/datafusion/src/physical_plan/type_coercion.rs
index 06d3739b53b2..fe87ecda872c 100644
--- a/datafusion/src/physical_plan/type_coercion.rs
+++ b/datafusion/src/physical_plan/type_coercion.rs
@@ -60,7 +60,7 @@ pub fn coerce(
     expressions
         .iter()
         .enumerate()
-        .map(|(i, expr)| try_cast(expr.clone(), &schema, new_types[i].clone()))
+        .map(|(i, expr)| try_cast(expr.clone(), schema, new_types[i].clone()))
         .collect::<Result<Vec<_>>>()
 }
@@ -85,7 +85,7 @@ pub fn data_types(
     }

     for valid_types in valid_types {
-        if let Some(types) = maybe_data_types(&valid_types, &current_types) {
+        if let Some(types) = maybe_data_types(&valid_types, current_types) {
             return Ok(types);
         }
     }
@@ -157,7 +157,7 @@ fn maybe_data_types(
             new_type.push(current_type.clone())
         } else {
             // attempt to coerce
-            if can_coerce_from(valid_type, &current_type) {
+            if can_coerce_from(valid_type, current_type) {
                 new_type.push(valid_type.clone())
             } else {
                 // not possible
diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
index 565a9eef2857..f95dd446844d 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -145,7 +145,7 @@ impl WindowExpr for BuiltInWindowExpr {
     }

     fn name(&self) -> &str {
-        &self.window.name()
+        self.window.name()
     }

     fn field(&self) -> Result<Field> {
@@ -191,7 +191,7 @@ impl WindowExpr for AggregateWindowExpr {
     }

     fn name(&self) -> &str {
-        &self.aggregate.name()
+        self.aggregate.name()
     }

     fn field(&self) -> Result<Field> {
@@ -351,7 +351,7 @@ fn window_aggregate_batch(
         .map(|(window_acc, expr)| {
             let values = &expr
                 .iter()
-                .map(|e| e.evaluate(&batch))
+                .map(|e| e.evaluate(batch))
                 .map(|r| r.map(|v| v.into_array(batch.num_rows())))
                 .collect::<Result<Vec<_>>>()?;
             window_acc.scan_batch(batch.num_rows(), values)
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 860d21714ec6..7e7462ef390e 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -86,8 +86,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     /// Generate a logical plan from an DataFusion SQL statement
     pub fn statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan> {
         match statement {
-            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(&s),
-            DFStatement::Statement(s) => self.sql_statement_to_plan(&s),
+            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
+            DFStatement::Statement(s) => self.sql_statement_to_plan(s),
         }
     }
@@ -98,9 +98,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 verbose,
                 statement,
                 analyze: _,
-            } => self.explain_statement_to_plan(*verbose, &statement),
-            Statement::Query(query) => self.query_to_plan(&query),
-            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
+            } => self.explain_statement_to_plan(*verbose, statement),
+            Statement::Query(query) => self.query_to_plan(query),
+            Statement::ShowVariable { variable } => self.show_variable_to_plan(variable),
             Statement::ShowColumns {
                 extended,
                 full,
@@ -232,7 +232,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             FileType::NdJson => {}
         };

-        let schema = self.build_schema(&columns)?;
+        let schema = self.build_schema(columns)?;

         Ok(LogicalPlan::CreateExternalTable {
             schema: schema.to_dfschema_ref()?,
@@ -250,7 +250,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         verbose: bool,
         statement: &Statement,
     ) -> Result<LogicalPlan> {
-        let plan = self.sql_statement_to_plan(&statement)?;
+        let plan = self.sql_statement_to_plan(statement)?;

         let stringified_plans = vec![StringifiedPlan::new(
             PlanType::LogicalPlan,
@@ -370,7 +370,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         left: &LogicalPlan,
         right: &LogicalPlan,
     ) -> Result<LogicalPlan> {
-        LogicalPlanBuilder::from(&left).cross_join(&right)?.build()
+        LogicalPlanBuilder::from(left).cross_join(right)?.build()
     }

     fn parse_join(
@@ -383,7 +383,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         match constraint {
             JoinConstraint::On(sql_expr) => {
                 let mut keys: Vec<(String, String)> = vec![];
-                let join_schema = left.schema().join(&right.schema())?;
+                let join_schema = left.schema().join(right.schema())?;

                 // parse ON expression
                 let expr = self.sql_to_rex(sql_expr, &join_schema)?;
@@ -396,14 +396,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     keys.iter().map(|pair| pair.1.as_str()).collect();

                 // return the logical plan representing the join
-                LogicalPlanBuilder::from(&left)
-                    .join(&right, join_type, &left_keys, &right_keys)?
+                LogicalPlanBuilder::from(left)
+                    .join(right, join_type, &left_keys, &right_keys)?
                     .build()
             }
             JoinConstraint::Using(idents) => {
                 let keys: Vec<&str> = idents.iter().map(|x| x.value.as_str()).collect();
-                LogicalPlanBuilder::from(&left)
-                    .join(&right, join_type, &keys, &keys)?
+                LogicalPlanBuilder::from(left)
+                    .join(right, join_type, &keys, &keys)?
                     .build()
             }
             JoinConstraint::Natural => {
@@ -472,7 +472,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // build join schema
         let mut fields = vec![];
         for plan in &plans {
-            fields.extend_from_slice(&plan.schema().fields());
+            fields.extend_from_slice(plan.schema().fields());
         }
         let join_schema = DFSchema::new(fields)?;
@@ -673,16 +673,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         Ok(projection
             .iter()
-            .map(|expr| self.sql_select_to_rex(&expr, &input_schema))
+            .map(|expr| self.sql_select_to_rex(expr, input_schema))
             .collect::<Result<Vec<Expr>>>()?
             .iter()
-            .flat_map(|expr| expand_wildcard(&expr, &input_schema))
+            .flat_map(|expr| expand_wildcard(expr, input_schema))
             .collect::<Vec<Expr>>())
     }

     /// Wrap a plan in a projection
     fn project(&self, input: &LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
-        self.validate_schema_satisfies_exprs(&input.schema(), &expr)?;
+        self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
         LogicalPlanBuilder::from(input).project(expr)?.build()
     }
@@ -733,7 +733,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .cloned()
             .collect::<Vec<Expr>>();

-        let plan = LogicalPlanBuilder::from(&input)
+        let plan = LogicalPlanBuilder::from(input)
             .aggregate(group_by_exprs, aggr_exprs)?
             .build()?;
@@ -784,14 +784,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     fn limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
         match *limit {
             Some(ref limit_expr) => {
-                let n = match self.sql_to_rex(&limit_expr, &input.schema())? {
+                let n = match self.sql_to_rex(limit_expr, input.schema())? {
                     Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
                     _ => Err(DataFusionError::Plan(
                         "Unexpected expression for LIMIT clause".to_string(),
                     )),
                 }?;

-                LogicalPlanBuilder::from(&input).limit(n)?.build()
+                LogicalPlanBuilder::from(input).limit(n)?.build()
             }
             _ => Ok(input.clone()),
         }
@@ -812,7 +812,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .map(|e| self.order_by_to_sort_expr(e))
             .collect::<Result<Vec<_>>>()?;

-        LogicalPlanBuilder::from(&plan).sort(order_by_rex)?.build()
+        LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
     }

     /// convert sql OrderByExpr to Expr::Sort
@@ -836,7 +836,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .iter()
             .try_for_each(|col| match col {
                 Expr::Column(name) => {
-                    schema.field_with_unqualified_name(&name).map_err(|_| {
+                    schema.field_with_unqualified_name(name).map_err(|_| {
                         DataFusionError::Plan(format!(
                             "Invalid identifier '{}' for schema {}",
                             name,
@@ -854,7 +854,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         match sql {
             SelectItem::UnnamedExpr(expr) => self.sql_to_rex(expr, schema),
             SelectItem::ExprWithAlias { expr, alias } => Ok(Alias(
-                Box::new(self.sql_to_rex(&expr, schema)?),
+                Box::new(self.sql_to_rex(expr, schema)?),
                 alias.value.clone(),
             )),
             SelectItem::Wildcard => Ok(Expr::Wildcard),
@@ -977,7 +977,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 ref expr,
                 ref data_type,
             } => Ok(Expr::Cast {
-                expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
+                expr: Box::new(self.sql_expr_to_logical_expr(expr)?),
                 data_type: convert_data_type(data_type)?,
             }),

             SQLExpr::TryCast {
                 ref expr,
                 ref data_type,
             } => Ok(Expr::TryCast {
-                expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
+                expr: Box::new(self.sql_expr_to_logical_expr(expr)?),
                 data_type: convert_data_type(data_type)?,
             }),
@@ -1040,10 +1040,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 ref low,
                 ref high,
             } => Ok(Expr::Between {
-                expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
+                expr: Box::new(self.sql_expr_to_logical_expr(expr)?),
                 negated: *negated,
-                low: Box::new(self.sql_expr_to_logical_expr(&low)?),
-                high: Box::new(self.sql_expr_to_logical_expr(&high)?),
+                low: Box::new(self.sql_expr_to_logical_expr(low)?),
+                high: Box::new(self.sql_expr_to_logical_expr(high)?),
             }),

             SQLExpr::InList {
                     .collect::<Result<Vec<_>>>()?;

                 Ok(Expr::InList {
-                    expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
+                    expr: Box::new(self.sql_expr_to_logical_expr(expr)?),
                     list: list_expr,
                     negated: *negated,
                 })
@@ -1091,9 +1091,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }?;

                 Ok(Expr::BinaryExpr {
-                    left: Box::new(self.sql_expr_to_logical_expr(&left)?),
+                    left: Box::new(self.sql_expr_to_logical_expr(left)?),
                     op: operator,
-                    right: Box::new(self.sql_expr_to_logical_expr(&right)?),
+                    right: Box::new(self.sql_expr_to_logical_expr(right)?),
                 })
             }
@@ -1209,7 +1209,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }
             }

-            SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(&e),
+            SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(e),

             _ => Err(DataFusionError::NotImplemented(format!(
                 "Unsupported ast node {:?} in sqltorel",
@@ -3167,7 +3167,7 @@ mod tests {
     fn logical_plan(sql: &str) -> Result<LogicalPlan> {
         let planner = SqlToRel::new(&MockContextProvider {});
-        let result = DFParser::parse_sql(&sql);
+        let result = DFParser::parse_sql(sql);
         let ast = result.unwrap();
         planner.statement_to_plan(&ast[0])
     }
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 5e9b9526ea83..82431c2314ab 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -146,7 +146,7 @@ where
 pub(crate) fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
     match expr {
         Expr::Column(_) => Ok(expr.clone()),
-        _ => Ok(Expr::Column(expr.name(&plan.schema())?)),
+        _ => Ok(Expr::Column(expr.name(plan.schema())?)),
     }
 }
@@ -448,7 +448,7 @@ fn generate_sort_key(partition_by: &[Expr], order_by: &[Expr]) -> WindowSortKey {
         }
     });
     order_by.iter().for_each(|e| {
-        if !sort_key.contains(&e) {
+        if !sort_key.contains(e) {
             sort_key.push(e.clone());
         }
     });
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 5ce1884049d8..d9d77648c742 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -130,7 +130,7 @@ async fn parquet_single_nan_schema() {
     ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
         .unwrap();
     let sql = "SELECT mycol FROM single_nan";
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = collect(plan).await.unwrap();
@@ -165,7 +165,7 @@ async fn parquet_list_columns() {
     ]));

     let sql = "SELECT int64_list, utf8_list FROM list_columns";
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = collect(plan).await.unwrap();
@@ -647,7 +647,7 @@ async fn csv_query_error() -> Result<()> {
     let mut ctx = create_ctx()?;
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT sin(c1) FROM aggregate_test_100";
-    let plan = ctx.create_logical_plan(&sql);
+    let plan = ctx.create_logical_plan(sql);
     assert!(plan.is_err());
     Ok(())
 }
@@ -748,7 +748,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT avg(c12) FROM aggregate_test_100";
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = collect(plan).await.unwrap();
@@ -1615,7 +1615,7 @@ async fn csv_explain_plans() {
     // Logical plan
     // Create plan
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(&sql).expect(&msg);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
     let logical_schema = plan.schema();
     //
     // println!("SQL: {}", sql);
@@ -1812,7 +1812,7 @@ async fn csv_explain_verbose_plans() {
     // Logical plan
     // Create plan
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(&sql).expect(&msg);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
     let logical_schema = plan.schema();
     //
     // println!("SQL: {}", sql);
@@ -2088,7 +2088,7 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
 /// `result[row][column]`
 async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
     let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(&sql).expect(&msg);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
     let logical_schema = plan.schema();

     let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
@@ -2561,7 +2561,7 @@ async fn query_cte_incorrect() -> Result<()> {
     // self reference
     let sql = "WITH t AS (SELECT * FROM t) SELECT * from u";
-    let plan = ctx.create_logical_plan(&sql);
+    let plan = ctx.create_logical_plan(sql);
     assert!(plan.is_err());
     assert_eq!(
         format!("{}", plan.unwrap_err()),
@@ -2570,7 +2570,7 @@ async fn query_cte_incorrect() -> Result<()> {
     // forward referencing
     let sql = "WITH t AS (SELECT * FROM u), u AS (SELECT 1) SELECT * from u";
-    let plan = ctx.create_logical_plan(&sql);
+    let plan = ctx.create_logical_plan(sql);
     assert!(plan.is_err());
     assert_eq!(
         format!("{}", plan.unwrap_err()),
@@ -2579,7 +2579,7 @@ async fn query_cte_incorrect() -> Result<()> {
     // wrapping should hide u
     let sql = "WITH t AS (WITH u as (SELECT 1) SELECT 1) SELECT * from u";
-    let plan = ctx.create_logical_plan(&sql);
+    let plan = ctx.create_logical_plan(sql);
     assert!(plan.is_err());
     assert_eq!(
         format!("{}", plan.unwrap_err()),
@@ -3326,7 +3326,7 @@ async fn test_cast_expressions_error() -> Result<()> {
     let mut ctx = create_ctx()?;
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100";
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
     let result = collect(plan).await;
@@ -3355,7 +3355,7 @@ async fn test_physical_plan_display_indent() {
                GROUP BY c1 \
                ORDER BY the_min DESC \
                LIMIT 10";
-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();

     let physical_plan = ctx.create_physical_plan(&plan).unwrap();
@@ -3403,7 +3403,7 @@ async fn test_physical_plan_display_indent_multi_children() {
             ON c1=c2\
             ";

-    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();

     let physical_plan = ctx.create_physical_plan(&plan).unwrap();
@@ -3443,7 +3443,7 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
-    let logical_plan = ctx.create_logical_plan(&sql)?;
+    let logical_plan = ctx.create_logical_plan(sql)?;
     let physical_plan = ctx.create_physical_plan(&logical_plan);
     let err = physical_plan.unwrap_err();
     assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");

From 435cc79f4dfc24d230bb8676c31cbcd7201a2b95 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 12 Jun 2021 15:22:06 +0800
Subject: [PATCH 2/3] do a format round

---
 .../rust/core/src/serde/logical_plan/to_proto.rs |  4 +---
 datafusion/src/physical_optimizer/pruning.rs     |  5 ++---
 datafusion/src/physical_plan/hash_join.rs        | 12 +++---------
 datafusion/src/physical_plan/planner.rs          |  6 +-----
 4 files changed, 7 insertions(+), 20 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 3a509dcbf263..c454d03257f0 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1033,9 +1033,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                     .map(|e| e.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
                 let window_frame = window_frame.map(|window_frame| {
-                    protobuf::window_expr_node::WindowFrame::Frame(
-                        window_frame.into(),
-                    )
+                    protobuf::window_expr_node::WindowFrame::Frame(window_frame.into())
                 });
                 let window_expr = Box::new(protobuf::WindowExprNode {
                     expr: Some(Box::new(arg.try_into()?)),
diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 8e0459b16073..da82d53871a8 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -522,9 +522,8 @@ fn build_predicate_expression(
         // match !col (don't do so recursively)
         Expr::Not(input) => {
             if let Expr::Column(name) = input.as_ref() {
-                let expr =
-                    build_single_column_expr(name, schema, required_columns, true)
-                        .unwrap_or(unhandled);
+                let expr = build_single_column_expr(name, schema, required_columns, true)
+                    .unwrap_or(unhandled);
                 return Ok(expr);
             } else {
                 return Ok(unhandled);
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 54648158a11f..1b0322b521a5 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -538,15 +538,9 @@ fn build_batch(
     column_indices: &[ColumnIndex],
     random_state: &RandomState,
 ) -> ArrowResult<(RecordBatch, UInt64Array)> {
-    let (left_indices, right_indices) = build_join_indexes(
-        left_data,
-        batch,
-        join_type,
-        on_left,
-        on_right,
-        random_state,
-    )
-    .unwrap();
+    let (left_indices, right_indices) =
+        build_join_indexes(left_data, batch, join_type, on_left, on_right, random_state)
+            .unwrap();

     if matches!(join_type, JoinType::Semi | JoinType::Anti) {
         return Ok((
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index e58749ffc10f..409fc93abf40 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -272,11 +272,7 @@ impl DefaultPhysicalPlanner {
                     .iter()
                     .map(|e| {
                         tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                &input_exec.schema(),
-                                ctx_state,
-                            ),
+                            self.create_physical_expr(e, &input_exec.schema(), ctx_state),
                             e.name(input_schema),
                         ))
                     })

From f14084a66ef3d378a9b4c3b32ead68a64f21a984 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 12 Jun 2021 15:24:15 +0800
Subject: [PATCH 3/3] use warn not deny

---
 datafusion/src/lib.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index cb4c5eef14f0..64cc0a1349a2 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -14,7 +14,7 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#![warn(missing_docs)]
+#![warn(missing_docs, clippy::needless_borrow)]
 // Clippy lints, some should be disabled incrementally
 #![allow(
     clippy::float_cmp,
     clippy::module_inception,
     clippy::new_without_default,
     clippy::type_complexity,
     clippy::upper_case_acronyms
 )]
-#![deny(clippy::needless_borrow)]

 //! [DataFusion](https://github.com/apache/arrow-datafusion)
 //! is an extensible query execution framework that uses
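
Editor's note (not part of the original patch series): for readers unfamiliar with the lint these patches enable, below is a minimal, self-contained Rust sketch of the pattern `clippy::needless_borrow` reports. The `run_query` function is hypothetical — a stand-in for callees such as `ctx.sql(sql)` touched in PATCH 1 — and the only assumption is stable Rust with clippy available.

    // Illustrative only -- not part of the patch series.
    #![warn(clippy::needless_borrow)]

    // Hypothetical stand-in for APIs like `ExecutionContext::sql(&str)`.
    fn run_query(sql: &str) -> usize {
        sql.len()
    }

    fn main() {
        let sql: &str = "SELECT 1";
        // `&sql` has type `&&str`; the compiler immediately auto-dereferences
        // it back to `&str`, so clippy flags the extra borrow as needless.
        let _ = run_query(&sql);
        // The fixed form, mirroring every hunk in PATCH 1: pass the existing
        // reference through unchanged.
        let _ = run_query(sql);
    }

PATCH 3 then downgrades the lint from `deny` to `warn`, folding it into the existing `#![warn(missing_docs)]` attribute, so new violations surface as warnings rather than hard build failures.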