Skip to content

Conversation

@timsaucer
Copy link
Member

This is in draft form as a proof of concept

Which issue does this PR close?

Rationale for this change

This trait adds finer grained control over how processing occurs for protobuf serialization and deserialization. See the issue linked above for more discussion on why it is useful.

What changes are included in this PR?

  • Create traits for serialization and deserialization.

  • Add implementations for TaskContext and TaskContext with a PhysicalExtensionCodec

  • Give the same treatment to the logical side if we are in agreement

Are these changes tested?

  • Review unit tests
  • Add additional tests if coverage is not yet sufficient (draft form - not yet evaluated)

Are there any user-facing changes?

Users will need to make some slight changes to their calls to serialization and deserialization.

  • Update migration guide

@github-actions github-actions bot added proto Related to proto crate ffi Changes to the ffi crate labels Nov 19, 2025
Copy link
Contributor

@milenkovicm milenkovicm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proposal does make sense to me

Arc::new(ChildPhysicalExtensionCodec {}),
]);
]));
let mut parser = TaskContextWithPhysicalCodec {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find parser as a name may be a bit confusing

pub fn serialize_physical_aggr_expr<S: PhysicalSerializer>(
aggr_expr: Arc<AggregateFunctionExpr>,
codec: &dyn PhysicalExtensionCodec,
parser: &mut S,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find name parser a bit confusing, but i have no better suggestion at the moment, maybe serializer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easily done.

pub fn parse_physical_sort_expr<D: PhysicalDeserializer>(
proto: &protobuf::PhysicalSortExprNode,
ctx: &TaskContext,
parser: &mut D,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deserializer rather than parser ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed I think deserializer is a better parameter name

let schema = WrappedSchema(FFI_ArrowSchema::try_from(args.schema)?);

let codec = DefaultPhysicalExtensionCodec {};
let mut codec = DefaultPhysicalExtensionCodec {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a plan @timsaucer to make this configurable so we can use our own codec? It’s likely covered in other PRs, is it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is my ultimate goal. I don't want to merge any PR until I've got at least a hacked together demo of it with FFI and used in df-python.

Comment on lines 101 to 110
pub fn parse_physical_sort_exprs<D: PhysicalDeserializer>(
proto: &[protobuf::PhysicalSortExprNode],
ctx: &TaskContext,
parser: &mut D,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
proto
.iter()
.map(|sort_expr| parse_physical_sort_expr(sort_expr, ctx, input_schema, codec))
.map(|sort_expr| parse_physical_sort_expr(sort_expr, parser, input_schema))
.collect()
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I need to change the signature from

pub fn parse_physical_sort_exprs<D: PhysicalDeserializer>(
    proto: &[protobuf::PhysicalSortExprNode],
    parser: &mut D,
    input_schema: &Schema,
) -> Result<Vec<PhysicalSortExpr>> 

into

pub fn parse_physical_sort_exprs(
    proto: &[protobuf::PhysicalSortExprNode],
    parser: &mut dyn PhysicalDeserializer,
    input_schema: &Schema,
) -> Result<Vec<PhysicalSortExpr>> 

My reasoning is that I do want to use this in the FFI crate. In that case I will have libraries where I do not know the underlying implementation. Instead I will probably have something like a Box<dyn PhysicalDeserializer>. So I can't use the generic on the function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

Comment on lines +3070 to +3075
pub trait PhysicalDeserializer {
fn try_decode_execution_plan(
&mut self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
) -> Result<Arc<dyn ExecutionPlan>>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if there is a good use case for having two separate traits - PhysicalDeserializer and PhysicalSerializer. Almost all of the methods so closely mirror the PhysicalExtensionCodec. Maybe we make one trait with a better name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to combine the two traits I could imagine calling it PhysicalCodec or PhysicalSerde. The problem with PhysicalCodec is the name is so close to PhysicalExtensionCodec and the methods are so similar I suspect it will cause more confusion than good.

Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much for taking a stab at this @timsaucer ! I think this achieves one of the goals of #18477 ("Optionality of features. Some use cases demand more information / capabilities for serialization / deserialization ...") but I don't think it enables the flexibility requested there.

Comment on lines 381 to +387
ExprType::Extension(extension) => {
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
.inputs
.iter()
.map(|e| parse_physical_expr(e, ctx, input_schema, codec))
.map(|e| parse_physical_expr(e, parser, input_schema))
.collect::<Result<_>>()?;
(codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
parser.try_decode_expr(extension.expr.as_slice(), &inputs)? as _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this is missing some key points of #18477:

  • It only operates on Extension variants
  • It doesn't allow pre/post/fork control of serde

From #18477:

design goals were:

  • More flexible pre/post processing. The current Codec system only operates as a "fallback" and doesn't support hooking into the moment before trying to deserialize or the moment after a successful deserialization. I have a lot of example use cases for this that I can share.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand what additional hook you need. Here is an example using the latest push. Of course my example doesn't really have an encoder so it doesn't parse through to give a count on each one of the physical expressions, so the output is :

Pre-encoding: 0 expressions.
        post: 1 expressions

Minimal example:

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use datafusion::logical_expr::Operator;
    use datafusion_common::{DataFusionError, ScalarValue};
    use datafusion_expr::{lit, AggregateUDF, ScalarUDF, WindowUDF};
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;
    use crate::physical_plan::{PhysicalSerializer};

    struct PrintOnSerialize {
        num_exprs: u8,
    }

    impl PhysicalSerializer for PrintOnSerialize {
        fn try_encode_execution_plan(&mut self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion_common::Result<()> {
            todo!()
        }

        fn try_encode_udf(&mut self, node: &ScalarUDF, buf: &mut Vec<u8>) -> datafusion_common::Result<()> {
            todo!()
        }

        fn try_encode_expr(&mut self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> datafusion_common::Result<()> {
            println!("Pre-encoding: {} expressions.", self.num_exprs);
            self.num_exprs += 1;
            // Call to inner actual encoder happens here
            buf.push(self.num_exprs);
            println!("        post: {} expressions", self.num_exprs);
            Ok(())
        }

        fn try_encode_udaf(&mut self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> datafusion_common::Result<()> {
            todo!()
        }

        fn try_encode_udwf(&mut self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> datafusion_common::Result<()> {
            todo!()
        }
    }

    #[test]
    fn demonstrate_serialization_hooks() -> Result<(), DataFusionError> {
        let literal_expr = Literal::new(ScalarValue::Boolean(Some(true)));
        let column_expr = Column::new("a", 0);
        let binary_expr = BinaryExpr::new(Arc::new(literal_expr), Operator::And, Arc::new(column_expr));
        let binary_expr = Arc::new(binary_expr) as Arc<dyn PhysicalExpr>;

        let mut custom_serialize = PrintOnSerialize {
            num_exprs: 0,
        };

        let mut bytes = Vec::new();
        custom_serialize.try_encode_expr(&binary_expr, &mut bytes)?;

        Ok(())
    }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The particular use case I have in mind is re-attaching a SchemaAdapterFactory to a ParquetSource after de-serializing it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ffi Changes to the ffi crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Redesign protobuf encode/decode hooks & state

3 participants