diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 6145df65032e..d1a5235c7523 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -716,18 +716,21 @@ impl protobuf::PhysicalPlanNode {
             })?;
 
         let filter_selectivity = filter.default_filter_selectivity.try_into();
-        let projection = if !filter.projection.is_empty() {
-            Some(
-                filter
-                    .projection
-                    .iter()
-                    .map(|i| *i as usize)
-                    .collect::<Vec<_>>(),
-            )
-        } else {
+        // A projection that lists every input column in order is the identity
+        // projection; decode it as `None`, which the encoder writes as `0..n`.
+        let num_fields = input.schema().fields().len();
+        let mut is_full_projection = filter.projection.len() == num_fields;
+        let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
+        for (i, idx) in filter.projection.iter().enumerate() {
+            let idx = *idx as usize;
+            is_full_projection &= idx == i;
+            projection_vec.push(idx);
+        }
+        let projection = if is_full_projection {
             None
+        } else {
+            Some(projection_vec)
         };
-
         let filter = FilterExecBuilder::new(predicate, input)
             .apply_projection(projection)?
             .with_batch_size(filter.batch_size as usize)
@@ -2339,9 +2342,12 @@ impl protobuf::PhysicalPlanNode {
                             .physical_expr_to_proto(exec.predicate(), codec)?,
                     ),
                     default_filter_selectivity: exec.default_selectivity() as u32,
-                    projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
-                        v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
-                    }),
+                    projection: match exec.projection() {
+                        None => (0..exec.input().schema().fields().len())
+                            .map(|i| i as u32)
+                            .collect(),
+                        Some(v) => v.iter().map(|x| *x as u32).collect(),
+                    },
                     batch_size: exec.batch_size() as u32,
                     fetch: exec.fetch().map(|f| f as u32),
                 },
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index e7d38b57a152..65b16079c9e6 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -3681,3 +3681,63 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> {
 
     roundtrip_test_sql_with_context(sql, &ctx).await
 }
+
+#[tokio::test]
+async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> {
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, false),
+        Field::new("b", DataType::Int32, false),
+        Field::new("c", DataType::Int32, false),
+    ]));
+
+    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new("a", 0)),
+        Operator::Gt,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
+    ));
+
+    // Case 1: None -> should round-trip as None (return all columns)
+    let filter =
+        FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?;
+    let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
+    let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
+    let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
+    assert_eq!(
+        rt.projection().as_deref(),
+        None,
+        "None projection must stay None after roundtrip"
+    );
+
+    // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None
+    let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
+        .apply_projection(Some(vec![]))?
+        .build()?;
+    let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
+    let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
+    let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
+    assert_eq!(
+        rt.projection().as_deref(),
+        Some(&[][..]),
+        "Empty projection Some([]) must survive roundtrip, not become None"
+    );
+
+    // Case 3: Some(vec![2, 0]) -> partial projection must survive
+    let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
+        .apply_projection(Some(vec![2, 0]))?
+        .build()?;
+    let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
+    let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
+    let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
+    assert_eq!(
+        rt.projection().as_deref(),
+        Some(&[2_usize, 0_usize][..]),
+        "Partial projection must survive roundtrip"
+    );
+
+    Ok(())
+}