From 8562f7296315a471d6f36f6845381f681d21e6ba Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 10:55:57 +0530 Subject: [PATCH 1/8] Refactor projection handling in FilterExecBuilder --- datafusion/proto/src/physical_plan/mod.rs | 27 ++++++++++------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6145df65032e..73a7ad3dda1a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -716,18 +716,14 @@ impl protobuf::PhysicalPlanNode { })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); - let projection = if !filter.projection.is_empty() { - Some( - filter - .projection - .iter() - .map(|i| *i as usize) - .collect::>(), - ) - } else { - None - }; - + let num_fields = input.schema().fields().len(); + let mut is_full_projection = filter.projection.len() == num_fields; + let mut projection_vec: Vec = Vec::with_capacity(filter.projection.len()); + for (i, idx) in filter.projection.iter().enumerate() { + let idx = *idx as usize; + is_full_projection &= idx == i; + projection_vec.push(idx); + } let filter = FilterExecBuilder::new(predicate, input) .apply_projection(projection)? .with_batch_size(filter.batch_size as usize) @@ -2339,9 +2335,10 @@ impl protobuf::PhysicalPlanNode { .physical_expr_to_proto(exec.predicate(), codec)?, ), default_filter_selectivity: exec.default_selectivity() as u32, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + projection: match exec.projection() { + None => (0..exec.input().schema().fields().len()).map(|i| i as u32).collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), }, From e890283bdf93dc68425c2edbe8cdfa971274158d Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 10:56:58 +0530 Subject: [PATCH 2/8] Update roundtrip_physical_plan.rs --- .../tests/cases/roundtrip_physical_plan.rs | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index e7d38b57a152..ef02f835efbc 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3681,3 +3681,63 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { roundtrip_test_sql_with_context(sql, &ctx).await } + +#[tokio::test] +async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { + use datafusion::physical_plan::memory::MemoryExec; + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input: Arc = Arc::new( + MemoryExec::try_new(&[], Arc::clone(&schema), None)? + ); + + let predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), + )); + + // Case 1: None -> should round-trip as None (return all columns) + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!(rt.projection(), None, "None projection must stay None after roundtrip"); + + // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![]), + "Empty projection Some([]) must survive roundtrip, not become None" + ); + + // Case 3: Some(vec![2, 0]) -> partial projection must survive + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![2, 0]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![2_usize, 0_usize]), + "Partial projection must survive roundtrip" + ); + + Ok(()) +} From 89c4328e070f1c454abbfd0627d67a2f61cab84c Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 06:12:28 +0000 Subject: [PATCH 3/8] added deserialization --- datafusion/proto/src/physical_plan/mod.rs | 7 ++++-- .../tests/cases/roundtrip_physical_plan.rs | 24 +++++-------------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 73a7ad3dda1a..dbc52b65e8db 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -724,6 +724,7 @@ impl protobuf::PhysicalPlanNode { is_full_projection &= idx == i; projection_vec.push(idx); } + let projection = if is_full_projection { None } else { Some(projection_vec) }; let filter = FilterExecBuilder::new(predicate, input) .apply_projection(projection)? .with_batch_size(filter.batch_size as usize) @@ -2336,8 +2337,10 @@ impl protobuf::PhysicalPlanNode { ), default_filter_selectivity: exec.default_selectivity() as u32, projection: match exec.projection() { - None => (0..exec.input().schema().fields().len()).map(|i| i as u32).collect(), - Some(v) => v.iter().map(|x| *x as u32).collect(), + None => (0..exec.input().schema().fields().len()) + .map(|i| i as u32) + .collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), }, batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index ef02f835efbc..09f25e885ebb 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3684,8 +3684,6 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { #[tokio::test] async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { - use datafusion::physical_plan::memory::MemoryExec; - let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; @@ -3695,9 +3693,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { Field::new("c", DataType::Int32, false), ])); - let input: Arc = Arc::new( - MemoryExec::try_new(&[], Arc::clone(&schema), None)? - ); + let input: Arc = Arc::new(EmptyExec::new(Arc::clone(&schema))); let predicate: Arc = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), @@ -3710,7 +3706,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { .build()?; let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; - let rt = roundtripped.as_any().downcast_ref::().unwrap(); + let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!(rt.projection(), None, "None projection must stay None after roundtrip"); // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None @@ -3719,12 +3715,8 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { .build()?; let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; - let rt = roundtripped.as_any().downcast_ref::().unwrap(); - assert_eq!( - rt.projection(), - Some(&vec![]), - "Empty projection Some([]) must survive roundtrip, not become None" - ); + let rt = roundtripped.as_ref().downcast_ref::().unwrap(); + assert_eq!(rt.projection(), Some(&vec![]), "Empty projection Some([]) must survive roundtrip, not become None"); // Case 3: Some(vec![2, 0]) -> partial projection must survive let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) @@ -3732,12 +3724,8 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { .build()?; let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; - let rt = roundtripped.as_any().downcast_ref::().unwrap(); - assert_eq!( - rt.projection(), - Some(&vec![2_usize, 0_usize]), - "Partial projection must survive roundtrip" - ); + let rt = roundtripped.as_ref().downcast_ref::().unwrap(); + assert_eq!(rt.projection(), Some(&vec![2_usize, 0_usize]), "Partial projection must survive roundtrip"); Ok(()) } From 9951c1678c3036797fc13140af4a775a070db03e Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 06:19:13 +0000 Subject: [PATCH 4/8] fmt --- datafusion/proto/src/physical_plan/mod.rs | 6 ++++- .../tests/cases/roundtrip_physical_plan.rs | 22 ++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index dbc52b65e8db..a9f4ea73a066 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -724,7 +724,11 @@ impl protobuf::PhysicalPlanNode { is_full_projection &= idx == i; projection_vec.push(idx); } - let projection = if is_full_projection { None } else { Some(projection_vec) }; + let projection = if is_full_projection { + None + } else { + Some(projection_vec) + }; let filter = FilterExecBuilder::new(predicate, input) .apply_projection(projection)? .with_batch_size(filter.batch_size as usize) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 09f25e885ebb..6033e2fbfa64 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3702,12 +3702,16 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { )); // Case 1: None -> should round-trip as None (return all columns) - let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) - .build()?; + let filter = + FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?; let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); - assert_eq!(rt.projection(), None, "None projection must stay None after roundtrip"); + assert_eq!( + rt.projection(), + None, + "None projection must stay None after roundtrip" + ); // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) @@ -3716,7 +3720,11 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); - assert_eq!(rt.projection(), Some(&vec![]), "Empty projection Some([]) must survive roundtrip, not become None"); + assert_eq!( + rt.projection(), + Some(&vec![]), + "Empty projection Some([]) must survive roundtrip, not become None" + ); // Case 3: Some(vec![2, 0]) -> partial projection must survive let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) @@ -3725,7 +3733,11 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); - assert_eq!(rt.projection(), Some(&vec![2_usize, 0_usize]), "Partial projection must survive roundtrip"); + assert_eq!( + rt.projection(), + Some(&vec![2_usize, 0_usize]), + "Partial projection must survive roundtrip" + ); Ok(()) } From 896120b30e3946ca90cf193bbe4a6b467031a1d9 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 12:10:04 +0530 Subject: [PATCH 5/8] fix test --- .../proto/tests/cases/roundtrip_physical_plan.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 6033e2fbfa64..a414b08882c5 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3708,7 +3708,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection(), + rt.projection().as_ref(), None, "None projection must stay None after roundtrip" ); @@ -3721,8 +3721,8 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection(), - Some(&vec![]), + rt.projection().as_ref(), + Some(&[]), "Empty projection Some([]) must survive roundtrip, not become None" ); @@ -3734,8 +3734,8 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection(), - Some(&vec![2_usize, 0_usize]), + rt.projection().as_ref(), + Some(&[2_usize, 0_usize]), "Partial projection must survive roundtrip" ); From 2ba3a9762781b58220bbc4dbf09e85f1e9df8e10 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 12:14:39 +0530 Subject: [PATCH 6/8] Fix projection assertions in roundtrip tests --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a414b08882c5..e1bdabaab329 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3708,7 +3708,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection().as_ref(), + rt.projection().as_deref(), None, "None projection must stay None after roundtrip" ); @@ -3721,7 +3721,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection().as_ref(), + rt.projection().as_deref(), Some(&[]), "Empty projection Some([]) must survive roundtrip, not become None" ); @@ -3734,7 +3734,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( - rt.projection().as_ref(), + rt.projection().as_deref(), Some(&[2_usize, 0_usize]), "Partial projection must survive roundtrip" ); From 6847a167ed34bb7f14edcc362439e5ccc7e24192 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 12:29:49 +0530 Subject: [PATCH 7/8] Update roundtrip_physical_plan.rs --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index e1bdabaab329..65b16079c9e6 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3722,7 +3722,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( rt.projection().as_deref(), - Some(&[]), + Some(&[][..]), "Empty projection Some([]) must survive roundtrip, not become None" ); @@ -3735,7 +3735,7 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { let rt = roundtripped.as_ref().downcast_ref::().unwrap(); assert_eq!( rt.projection().as_deref(), - Some(&[2_usize, 0_usize]), + Some(&[2_usize, 0_usize][..]), "Partial projection must survive roundtrip" ); From f702ce8fed263171bf8a8d30a9b9f384fd1a6557 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 13:33:35 +0530 Subject: [PATCH 8/8] added comments --- datafusion/proto/src/physical_plan/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index a9f4ea73a066..d1a5235c7523 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -716,6 +716,8 @@ impl protobuf::PhysicalPlanNode { })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); + // Determine if the projection is full to optimize used memory, + // storing `None` in this case. let num_fields = input.schema().fields().len(); let mut is_full_projection = filter.projection.len() == num_fields; let mut projection_vec: Vec = Vec::with_capacity(filter.projection.len());