Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,18 +716,21 @@ impl protobuf::PhysicalPlanNode {
})?;

let filter_selectivity = filter.default_filter_selectivity.try_into();
let projection = if !filter.projection.is_empty() {
Some(
filter
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>(),
)
} else {
// Determine if the projection is full to optimize used memory,
// storing `None` in this case.
Comment on lines +719 to +720
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"optimize used memory" seems the wrong intention here? Isn't it more to try to accurately recreate a None state which the proto definition can't encode?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems valid!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can store Some(vec![0,1,2...n-1]) to represent a full projection, so I thought to highlight why we focus on None usage.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should focus more on the fact this logic is trying to preserve a None across proto boundaries because of the limitations we face (with how its encoded the same as Some(vec![])) instead of explaining it only as a memory efficiency gain

let num_fields = input.schema().fields().len();
Comment thread
Adez017 marked this conversation as resolved.
let mut is_full_projection = filter.projection.len() == num_fields;
let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
for (i, idx) in filter.projection.iter().enumerate() {
let idx = *idx as usize;
is_full_projection &= idx == i;
projection_vec.push(idx);
}
let projection = if is_full_projection {
None
} else {
Some(projection_vec)
};

let filter = FilterExecBuilder::new(predicate, input)
.apply_projection(projection)?
.with_batch_size(filter.batch_size as usize)
Expand Down Expand Up @@ -2339,9 +2342,12 @@ impl protobuf::PhysicalPlanNode {
.physical_expr_to_proto(exec.predicate(), codec)?,
),
default_filter_selectivity: exec.default_selectivity() as u32,
projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
projection: match exec.projection() {
None => (0..exec.input().schema().fields().len())
.map(|i| i as u32)
.collect(),
Some(v) => v.iter().map(|x| *x as u32).collect(),
},
batch_size: exec.batch_size() as u32,
fetch: exec.fetch().map(|f| f as u32),
},
Expand Down
60 changes: 60 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3681,3 +3681,63 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> {

roundtrip_test_sql_with_context(sql, &ctx).await
}

#[tokio::test]
async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> {
let ctx = SessionContext::new();
let codec = DefaultPhysicalExtensionCodec {};

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));

let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));

let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
));

// Case 1: None -> should round-trip as None (return all columns)
let filter =
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
None,
"None projection must stay None after roundtrip"
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we can use roundtrip_test here as others do above, e.g.

roundtrip_test(Arc::new(FilterExec::try_new(
expr,
Arc::new(EmptyExec::new(schema)),
)?))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

absolutely , i just added a different test to use it in a better way .

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?

Copy link
Copy Markdown
Contributor Author

@Adez017 Adez017 Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue occurs because when a FilterExec has an empty projection, the previous proto serialization didn't explicitly encode the 'empty' state. This caused the physical plan to default back to a full projection or an invalid state upon deserialization. By explicitly handling the projection field even when empty, we ensure that the execution plan remains consistent across the network/serde boundary, which is critical for count-only queries.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment is suggesting we use roundtrip_test() function to streamline these tests. See the code snippet I linked

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment is suggesting we use roundtrip_test() function to streamline these tests. See the code snippet I linked

should i remove the additional test i had added ? @Jefffrey

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I am saying we should refactor these tests to use roundtrip_test() instead of manually asserting a specific property of the roundtripped struct

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I am saying we should refactor these tests to use roundtrip_test() instead of manually asserting a specific property of the roundtripped struct

sounds good . i'll try producing it

// Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![]))?
.build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
Some(&[][..]),
"Empty projection Some([]) must survive roundtrip, not become None"
);

// Case 3: Some(vec![2, 0]) -> partial projection must survive
let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
.apply_projection(Some(vec![2, 0]))?
.build()?;
let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?;
let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?;
let rt = roundtripped.as_ref().downcast_ref::<FilterExec>().unwrap();
assert_eq!(
rt.projection().as_deref(),
Some(&[2_usize, 0_usize][..]),
"Partial projection must survive roundtrip"
);

Ok(())
}
Loading