diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index bc019725f36c..67977b1795a6 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -29,7 +29,7 @@ use crate::{ }, }; -use crate::protobuf::{proto_error, FromProtoError, ToProtoError}; +use crate::protobuf::{proto_error, ToProtoError}; use arrow::datatypes::{DataType, Schema, SchemaRef}; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -66,11 +66,10 @@ use datafusion_expr::{ }; use datafusion_expr::{AggregateUDF, Unnest}; +use self::to_proto::{serialize_expr, serialize_exprs}; use prost::bytes::BufMut; use prost::Message; -use self::to_proto::serialize_expr; - pub mod file_formats; pub mod from_proto; pub mod to_proto; @@ -273,13 +272,7 @@ impl AsLogicalPlan for LogicalPlanNode { values .values_list .chunks_exact(n_cols) - .map(|r| { - r.iter() - .map(|expr| { - from_proto::parse_expr(expr, ctx, extension_codec) - }) - .collect::, FromProtoError>>() - }) + .map(|r| from_proto::parse_exprs(r, ctx, extension_codec)) .collect::, _>>() .map_err(|e| e.into()) }?; @@ -288,11 +281,8 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::Projection(projection) => { let input: LogicalPlan = into_logical_plan!(projection.input, ctx, extension_codec)?; - let expr: Vec = projection - .expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let expr: Vec = + from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?; let new_proj = project(input, expr)?; match projection.optional_alias.as_ref() { @@ -324,26 +314,17 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::Window(window) => { let input: LogicalPlan = into_logical_plan!(window.input, ctx, extension_codec)?; - let window_expr = window - .window_expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let window_expr = + from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?; LogicalPlanBuilder::from(input).window(window_expr)?.build() } LogicalPlanType::Aggregate(aggregate) => { let input: LogicalPlan = into_logical_plan!(aggregate.input, ctx, extension_codec)?; - let group_expr = aggregate - .group_expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; - let aggr_expr = aggregate - .aggr_expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let group_expr = + from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?; + let aggr_expr = + from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?; LogicalPlanBuilder::from(input) .aggregate(group_expr, aggr_expr)? .build() @@ -361,20 +342,16 @@ impl AsLogicalPlan for LogicalPlanNode { projection = Some(column_indices); } - let filters = scan - .filters - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let filters = + from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?; let mut all_sort_orders = vec![]; for order in &scan.file_sort_order { - let file_sort_order = order - .logical_expr_nodes - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; - all_sort_orders.push(file_sort_order) + all_sort_orders.push(from_proto::parse_exprs( + &order.logical_expr_nodes, + ctx, + extension_codec, + )?) } let file_format: Arc = @@ -475,11 +452,8 @@ impl AsLogicalPlan for LogicalPlanNode { projection = Some(column_indices); } - let filters = scan - .filters - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let filters = + from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?; let table_name = from_table_reference(scan.table_name.as_ref(), "CustomScan")?; @@ -502,11 +476,8 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::Sort(sort) => { let input: LogicalPlan = into_logical_plan!(sort.input, ctx, extension_codec)?; - let sort_expr: Vec = sort - .expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let sort_expr: Vec = + from_proto::parse_exprs(&sort.expr, ctx, extension_codec)?; LogicalPlanBuilder::from(input).sort(sort_expr)?.build() } LogicalPlanType::Repartition(repartition) => { @@ -525,12 +496,7 @@ impl AsLogicalPlan for LogicalPlanNode { hash_expr: pb_hash_expr, partition_count, }) => Partitioning::Hash( - pb_hash_expr - .iter() - .map(|expr| { - from_proto::parse_expr(expr, ctx, extension_codec) - }) - .collect::, _>>()?, + from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?, *partition_count as usize, ), PartitionMethod::RoundRobin(partition_count) => { @@ -570,12 +536,11 @@ impl AsLogicalPlan for LogicalPlanNode { let mut order_exprs = vec![]; for expr in &create_extern_table.order_exprs { - let order_expr = expr - .logical_expr_nodes - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; - order_exprs.push(order_expr) + order_exprs.push(from_proto::parse_exprs( + &expr.logical_expr_nodes, + ctx, + extension_codec, + )?); } let mut column_defaults = @@ -693,16 +658,10 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanBuilder::from(input).limit(skip, fetch)?.build() } LogicalPlanType::Join(join) => { - let left_keys: Vec = join - .left_join_key - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; - let right_keys: Vec = join - .right_join_key - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let left_keys: Vec = + from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?; + let right_keys: Vec = + from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?; let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| { proto_error(format!( @@ -804,27 +763,20 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::DistinctOn(distinct_on) => { let input: LogicalPlan = into_logical_plan!(distinct_on.input, ctx, extension_codec)?; - let on_expr = distinct_on - .on_expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; - let select_expr = distinct_on - .select_expr - .iter() - .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec)) - .collect::, _>>()?; + let on_expr = + from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?; + let select_expr = from_proto::parse_exprs( + &distinct_on.select_expr, + ctx, + extension_codec, + )?; let sort_expr = match distinct_on.sort_expr.len() { 0 => None, - _ => Some( - distinct_on - .sort_expr - .iter() - .map(|expr| { - from_proto::parse_expr(expr, ctx, extension_codec) - }) - .collect::, _>>()?, - ), + _ => Some(from_proto::parse_exprs( + &distinct_on.sort_expr, + ctx, + extension_codec, + )?), }; LogicalPlanBuilder::from(input) .distinct_on(on_expr, select_expr, sort_expr)? @@ -943,11 +895,8 @@ impl AsLogicalPlan for LogicalPlanNode { } else { values[0].len() } as u64; - let values_list = values - .iter() - .flatten() - .map(|v| serialize_expr(v, extension_codec)) - .collect::, _>>()?; + let values_list = + serialize_exprs(values.iter().flatten(), extension_codec)?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Values( protobuf::ValuesNode { @@ -982,10 +931,8 @@ impl AsLogicalPlan for LogicalPlanNode { }; let schema: protobuf::Schema = schema.as_ref().try_into()?; - let filters: Vec = filters - .iter() - .map(|filter| serialize_expr(filter, extension_codec)) - .collect::, _>>()?; + let filters: Vec = + serialize_exprs(filters, extension_codec)?; if let Some(listing_table) = source.downcast_ref::() { let any = listing_table.options().format.as_any(); @@ -1037,10 +984,7 @@ impl AsLogicalPlan for LogicalPlanNode { let mut exprs_vec: Vec = vec![]; for order in &options.file_sort_order { let expr_vec = LogicalExprNodeCollection { - logical_expr_nodes: order - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, ToProtoError>>()?, + logical_expr_nodes: serialize_exprs(order, extension_codec)?, }; exprs_vec.push(expr_vec); } @@ -1118,10 +1062,7 @@ impl AsLogicalPlan for LogicalPlanNode { extension_codec, )?, )), - expr: expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, ToProtoError>>()?, + expr: serialize_exprs(expr, extension_codec)?, optional_alias: None, }, ))), @@ -1173,22 +1114,13 @@ impl AsLogicalPlan for LogicalPlanNode { )?; let sort_expr = match sort_expr { None => vec![], - Some(sort_expr) => sort_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, + Some(sort_expr) => serialize_exprs(sort_expr, extension_codec)?, }; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new( protobuf::DistinctOnNode { - on_expr: on_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, - select_expr: select_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, + on_expr: serialize_exprs(on_expr, extension_codec)?, + select_expr: serialize_exprs(select_expr, extension_codec)?, sort_expr, input: Some(Box::new(input)), }, @@ -1207,10 +1139,7 @@ impl AsLogicalPlan for LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Window(Box::new( protobuf::WindowNode { input: Some(Box::new(input)), - window_expr: window_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, + window_expr: serialize_exprs(window_expr, extension_codec)?, }, ))), }) @@ -1230,14 +1159,8 @@ impl AsLogicalPlan for LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new( protobuf::AggregateNode { input: Some(Box::new(input)), - group_expr: group_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, - aggr_expr: aggr_expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, _>>()?, + group_expr: serialize_exprs(group_expr, extension_codec)?, + aggr_expr: serialize_exprs(aggr_expr, extension_codec)?, }, ))), }) @@ -1335,10 +1258,8 @@ impl AsLogicalPlan for LogicalPlanNode { input.as_ref(), extension_codec, )?; - let selection_expr: Vec = expr - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, ToProtoError>>()?; + let selection_expr: Vec = + serialize_exprs(expr, extension_codec)?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Sort(Box::new( protobuf::SortNode { @@ -1367,10 +1288,7 @@ impl AsLogicalPlan for LogicalPlanNode { let pb_partition_method = match partitioning_scheme { Partitioning::Hash(exprs, partition_count) => { PartitionMethod::Hash(protobuf::HashRepartition { - hash_expr: exprs - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, ToProtoError>>()?, + hash_expr: serialize_exprs(exprs, extension_codec)?, partition_count: *partition_count as u64, }) } @@ -1419,10 +1337,7 @@ impl AsLogicalPlan for LogicalPlanNode { let mut converted_order_exprs: Vec = vec![]; for order in order_exprs { let temp = LogicalExprNodeCollection { - logical_expr_nodes: order - .iter() - .map(|expr| serialize_expr(expr, extension_codec)) - .collect::, ToProtoError>>()?, + logical_expr_nodes: serialize_exprs(order, extension_codec)?, }; converted_order_exprs.push(temp); }