From bde3c91f820a51034de27b0597caeea3ef0cdd9e Mon Sep 17 00:00:00 2001 From: askoa <112126368+askoa@users.noreply.github.com> Date: Thu, 1 Dec 2022 09:30:37 -0500 Subject: [PATCH] Replace `&Option<T>` with `Option<&T>` (#4446) * Replace &Option<T> with Option<&T> * empty commit to start build * Change &Option<Vec> to Option<&Vec> Co-authored-by: askoa --- .../examples/custom_datasource.rs | 8 ++++---- datafusion/common/src/scalar.rs | 12 +++++++++--- datafusion/core/src/dataframe.rs | 3 +-- datafusion/core/src/datasource/datasource.rs | 2 +- datafusion/core/src/datasource/empty.rs | 4 ++-- .../core/src/datasource/listing/table.rs | 12 ++++++------ datafusion/core/src/datasource/memory.rs | 18 +++++++----------- datafusion/core/src/datasource/view.rs | 6 +++--- .../src/physical_optimizer/join_selection.rs | 10 +++++----- .../core/src/physical_plan/joins/hash_join.rs | 8 ++++---- datafusion/core/src/physical_plan/planner.rs | 2 +- datafusion/core/src/test_util.rs | 2 +- datafusion/core/tests/custom_sources.rs | 4 ++-- .../core/tests/provider_filter_pushdown.rs | 2 +- datafusion/core/tests/row.rs | 8 ++++---- .../core/tests/sql/information_schema.rs | 2 +- datafusion/core/tests/statistics.rs | 4 ++-- datafusion/expr/src/type_coercion/other.rs | 2 +- datafusion/optimizer/src/type_coercion.rs | 2 +- .../physical-expr/src/window/lead_lag.rs | 6 +++--- datafusion/proto/src/logical_plan.rs | 2 +- datafusion/sql/src/planner.rs | 4 ++-- 22 files changed, 62 insertions(+), 61 deletions(-) diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 7256c94ff0c..563f2cd3d32 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -118,7 +118,7 @@ impl Debug for CustomDataSource { impl CustomDataSource { pub(crate) async fn create_physical_plan( &self, - projections: &Option>, + projections: Option<&Vec>, schema: SchemaRef, ) -> Result> { 
Ok(Arc::new(CustomExec::new(projections, schema, self.clone()))) @@ -177,7 +177,7 @@ impl TableProvider for CustomDataSource { async fn scan( &self, _state: &SessionState, - projection: &Option>, + projection: Option<&Vec>, // filters and limit can be used here to inject some push-down operations if needed _filters: &[Expr], _limit: Option, @@ -194,11 +194,11 @@ struct CustomExec { impl CustomExec { fn new( - projections: &Option>, + projections: Option<&Vec>, schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections.as_ref()).unwrap(); + let projected_schema = project_schema(&schema, projections).unwrap(); Self { db, projected_schema, diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 79bfcad65c8..d7c5df0656e 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2160,7 +2160,7 @@ impl ScalarValue { fn eq_array_decimal( array: &ArrayRef, index: usize, - value: &Option, + value: Option<&i128>, precision: u8, scale: i8, ) -> Result { @@ -2196,8 +2196,14 @@ impl ScalarValue { pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool { match self { ScalarValue::Decimal128(v, precision, scale) => { - ScalarValue::eq_array_decimal(array, index, v, *precision, *scale) - .unwrap() + ScalarValue::eq_array_decimal( + array, + index, + v.as_ref(), + *precision, + *scale, + ) + .unwrap() } ScalarValue::Boolean(val) => { eq_array_primitive!(array, index, BooleanArray, val) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 4a35332274a..d7b7ccc942c 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -795,12 +795,11 @@ impl TableProvider for DataFrame { async fn scan( &self, _ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { let mut expr = projection - .as_ref() // construct projections .map_or_else( || { diff 
--git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 5a2ba4c2ead..95fdf05fa56 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -61,7 +61,7 @@ pub trait TableProvider: Sync + Send { async fn scan( &self, ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], // limit can be used to reduce the amount scanned // from the datasource as a performance optimization. diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index df459955539..56b38c2944d 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -69,12 +69,12 @@ impl TableProvider for EmptyTable { async fn scan( &self, _ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = project_schema(&self.schema, projection.as_ref())?; + let projected_schema = project_schema(&self.schema, projection)?; Ok(Arc::new( EmptyExec::new(false, projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 11b295a48fc..37469906563 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -523,7 +523,7 @@ impl TableProvider for ListingTable { async fn scan( &self, ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { @@ -533,7 +533,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { let schema = self.schema(); - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection)?; 
return Ok(Arc::new(EmptyExec::new(false, projected_schema))); } @@ -562,7 +562,7 @@ impl TableProvider for ListingTable { file_schema: Arc::clone(&self.file_schema), file_groups: partitioned_file_lists, statistics, - projection: projection.clone(), + projection: projection.cloned(), limit, output_ordering: self.try_create_output_ordering()?, table_partition_cols, @@ -686,7 +686,7 @@ mod tests { let table = load_table(&ctx, "alltypes_plain.parquet").await?; let projection = None; let exec = table - .scan(&ctx.state(), &projection, &[], None) + .scan(&ctx.state(), projection, &[], None) .await .expect("Scan table"); @@ -716,7 +716,7 @@ mod tests { .with_schema(schema); let table = ListingTable::try_new(config)?; - let exec = table.scan(&state, &None, &[], None).await?; + let exec = table.scan(&state, None, &[], None).await?; assert_eq!(exec.statistics().num_rows, Some(8)); assert_eq!(exec.statistics().total_byte_size, Some(671)); @@ -855,7 +855,7 @@ mod tests { let filter = Expr::not_eq(col("p1"), lit("v1")); let scan = table - .scan(&ctx.state(), &None, &[filter], None) + .scan(&ctx.state(), None, &[filter], None) .await .expect("Empty execution plan"); diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 38e16fb03e8..632ef8d287d 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -69,7 +69,7 @@ impl MemTable { ctx: &SessionState, ) -> Result { let schema = t.schema(); - let exec = t.scan(ctx, &None, &[], None).await?; + let exec = t.scan(ctx, None, &[], None).await?; let partition_count = exec.output_partitioning().partition_count(); let tasks = (0..partition_count) @@ -136,14 +136,14 @@ impl TableProvider for MemTable { async fn scan( &self, _ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> Result> { Ok(Arc::new(MemoryExec::try_new( &self.batches.clone(), self.schema(), - projection.clone(), + 
projection.cloned(), )?)) } } @@ -184,7 +184,7 @@ mod tests { // scan with projection let exec = provider - .scan(&session_ctx.state(), &Some(vec![2, 1]), &[], None) + .scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None) .await?; let mut it = exec.execute(0, task_ctx)?; @@ -218,9 +218,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; - let exec = provider - .scan(&session_ctx.state(), &None, &[], None) - .await?; + let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); @@ -253,7 +251,7 @@ mod tests { let projection: Vec = vec![0, 4]; match provider - .scan(&session_ctx.state(), &Some(projection), &[], None) + .scan(&session_ctx.state(), Some(&projection), &[], None) .await { Err(DataFusionError::ArrowError(ArrowError::SchemaError(e))) => { @@ -381,9 +379,7 @@ mod tests { let provider = MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?; - let exec = provider - .scan(&session_ctx.state(), &None, &[], None) - .await?; + let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 34a427030ae..e5c27f8944c 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -61,8 +61,8 @@ impl ViewTable { } /// Get definition ref - pub fn definition(&self) -> &Option { - &self.definition + pub fn definition(&self) -> Option<&String> { + self.definition.as_ref() } /// Get logical_plan ref @@ -104,7 +104,7 @@ impl TableProvider for ViewTable { async fn scan( &self, state: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { diff --git 
a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 7428c5ed671..7b9873c65c6 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -181,8 +181,8 @@ fn swap_reverting_projection( } /// Swaps join sides for filter column indices and produces new JoinFilter -fn swap_join_filter(filter: &Option) -> Option { - filter.as_ref().map(|filter| { +fn swap_join_filter(filter: Option<&JoinFilter>) -> Option { + filter.map(|filter| { let column_indices = filter .column_indices() .iter() @@ -334,7 +334,7 @@ fn try_collect_left( Arc::clone(left), Arc::clone(right), hash_join.on().to_vec(), - hash_join.filter().clone(), + hash_join.filter().cloned(), hash_join.join_type(), PartitionMode::CollectLeft, hash_join.null_equals_null(), @@ -345,7 +345,7 @@ fn try_collect_left( Arc::clone(left), Arc::clone(right), hash_join.on().to_vec(), - hash_join.filter().clone(), + hash_join.filter().cloned(), hash_join.join_type(), PartitionMode::CollectLeft, hash_join.null_equals_null(), @@ -377,7 +377,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result &Option { - &self.filter + pub fn filter(&self) -> Option<&JoinFilter> { + self.filter.as_ref() } /// How the join is performed @@ -698,7 +698,7 @@ fn build_join_indices( left_data: &JoinLeftData, on_left: &[Column], on_right: &[Column], - filter: &Option, + filter: Option<&JoinFilter>, random_state: &RandomState, null_equals_null: &bool, ) -> Result<(UInt64Array, UInt32Array)> { @@ -1363,7 +1363,7 @@ impl HashJoinStream { left_data, &self.on_left, &self.on_right, - &self.filter, + self.filter.as_ref(), &self.random_state, &self.null_equals_null, ); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index e325b361b23..d11b27670bf 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ 
b/datafusion/core/src/physical_plan/planner.rs @@ -480,7 +480,7 @@ impl DefaultPhysicalPlanner { // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); let unaliased: Vec = filters.into_iter().map(unalias).collect(); - source.scan(session_state, projection, &unaliased, *fetch).await + source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await } LogicalPlan::Values(Values { values, diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 089cc5a2360..162e41a3403 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -321,7 +321,7 @@ impl TableProvider for TestTableProvider { async fn scan( &self, _ctx: &SessionState, - _projection: &Option>, + _projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> datafusion_common::Result> { diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index fde296d8542..260b991f530 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -193,12 +193,12 @@ impl TableProvider for CustomTableProvider { async fn scan( &self, _state: &SessionState, - projection: &Option>, + projection: Option<&Vec>, _filters: &[Expr], _limit: Option, ) -> Result> { Ok(Arc::new(CustomExecutionPlan { - projection: projection.clone(), + projection: projection.cloned(), })) } } diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index 2cc57f8b056..7276820f6f5 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -143,7 +143,7 @@ impl TableProvider for CustomProvider { async fn scan( &self, _state: &SessionState, - _: &Option>, + _: Option<&Vec>, filters: &[Expr], _: Option, ) -> Result> { diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 6567c8e758a..1849fc9f8b8 100644 --- a/datafusion/core/tests/row.rs 
+++ b/datafusion/core/tests/row.rs @@ -34,7 +34,7 @@ async fn test_with_parquet() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); - let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; + let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -55,7 +55,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); - let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; + let exec = get_exec("alltypes_plain.parquet", projection.as_ref(), None).await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -73,7 +73,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> { async fn get_exec( file_name: &str, - projection: &Option>, + projection: Option<&Vec>, limit: Option, ) -> Result> { let testdata = datafusion::test_util::parquet_test_data(); @@ -103,7 +103,7 @@ async fn get_exec( file_schema, file_groups, statistics, - projection: projection.clone(), + projection: projection.cloned(), limit, table_partition_cols: vec![], config_options: ConfigOptions::new().into_shareable(), diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index fbd740faeff..fa2e2056955 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -191,7 +191,7 @@ async fn information_schema_tables_table_types() { async fn scan( &self, _ctx: &SessionState, - _: &Option>, + _: Option<&Vec>, _: &[Expr], _: Option, ) -> Result> { diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs index 686d54f210d..bb43c5c4dbb 100644 --- 
a/datafusion/core/tests/statistics.rs +++ b/datafusion/core/tests/statistics.rs @@ -75,7 +75,7 @@ impl TableProvider for StatisticsValidation { async fn scan( &self, _ctx: &SessionState, - projection: &Option>, + projection: Option<&Vec>, filters: &[Expr], // limit is ignored because it is not mandatory for a `TableProvider` to honor it _limit: Option, @@ -86,7 +86,7 @@ impl TableProvider for StatisticsValidation { filters.len(), "Unsupported expressions should not be pushed down" ); - let projection = match projection.clone() { + let projection = match projection.cloned() { Some(p) => p, None => (0..self.schema.fields().len()).collect(), }; diff --git a/datafusion/expr/src/type_coercion/other.rs b/datafusion/expr/src/type_coercion/other.rs index 2419f8d1bb3..6ff1300f64e 100644 --- a/datafusion/expr/src/type_coercion/other.rs +++ b/datafusion/expr/src/type_coercion/other.rs @@ -39,7 +39,7 @@ pub fn get_coerce_type_for_list( /// Returns the common data type for `then_types` and `else_type` pub fn get_coerce_type_for_case_when( then_types: &[DataType], - else_type: &Option, + else_type: Option<&DataType>, ) -> Option { let else_type = match else_type { None => then_types[0].clone(), diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index 266f557cb60..50f4e0838e9 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -319,7 +319,7 @@ impl ExprRewriter for TypeCoercionRewriter { Some(expr) => expr.get_type(&self.schema).map(Some), }?; let case_when_coerce_type = - get_coerce_type_for_case_when(&then_types, &else_type); + get_coerce_type_for_case_when(&then_types, else_type.as_ref()); match case_when_coerce_type { None => Err(DataFusionError::Internal(format!( "Failed to coerce then ({:?}) and else ({:?}) to common types in CASE WHEN expression", diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 
c50df3c1c9b..860df716af9 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -120,7 +120,7 @@ pub(crate) struct WindowShiftEvaluator { } fn create_empty_array( - value: &Option, + value: Option<&ScalarValue>, data_type: &DataType, size: usize, ) -> Result { @@ -140,7 +140,7 @@ fn create_empty_array( fn shift_with_default_value( array: &ArrayRef, offset: i64, - value: &Option, + value: Option<&ScalarValue>, ) -> Result { use arrow::compute::concat; @@ -172,7 +172,7 @@ impl PartitionEvaluator for WindowShiftEvaluator { fn evaluate_partition(&self, partition: Range) -> Result { let value = &self.values[0]; let value = value.slice(partition.start, partition.end - partition.start); - shift_with_default_value(&value, self.shift_offset, &self.default_value) + shift_with_default_value(&value, self.shift_offset, self.default_value.as_ref()) } } diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 4c2827ed7f9..346143348fb 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -929,7 +929,7 @@ impl AsLogicalPlan for LogicalPlanNode { projection, definition: view_table .definition() - .clone() + .map(|s| s.to_string()) .unwrap_or_default(), }, ))), diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index f2f5acd71a7..154e36f48fa 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -1129,7 +1129,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.aggregate( plan, &select_exprs, - &having_expr_opt, + having_expr_opt.as_ref(), group_by_exprs, aggr_exprs, )? @@ -1267,7 +1267,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { &self, input: LogicalPlan, select_exprs: &[Expr], - having_expr_opt: &Option, + having_expr_opt: Option<&Expr>, group_by_exprs: Vec, aggr_exprs: Vec, ) -> Result<(LogicalPlan, Vec, Option)> {