Description
Is your feature request related to a problem or challenge?
In distributed contexts, the part of the plan that acts as the producer of a DynamicFilterPhysicalExpr might be executing on a different host than the part that consumes that same expression.
Distributed systems built on top of DataFusion need a way of broadcasting updates from DynamicFilterPhysicalExpr producers over the network, so that their consumer counterparts can subscribe to and receive those updates across network boundaries.
A challenge currently standing in the way is: given a specific ExecutionPlan, how can these systems discover all the DynamicFilterPhysicalExpr instances present in it, so that they can subscribe to or produce updates for them?
Describe the solution you'd like
Probably one of the following:
- Adding an fn expressions(&self) -> Vec<&Arc<dyn PhysicalExpr>> method to the ExecutionPlan trait
ExecutionPlan implementations can return all the PhysicalExprs they reference, and users can recursively walk these expressions looking for Arc<DynamicFilterPhysicalExpr> references, which they can then manipulate through whatever API DynamicFilterPhysicalExpr decides to expose.
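A minimal sketch of how the first option could be used to discover dynamic filters, assuming the proposed expressions() method exists. To stay self-contained it does not depend on the datafusion crate: PhysicalExpr and ExecutionPlan below are trimmed-down local stand-ins whose as_any/children methods only loosely mirror DataFusion's, and DynamicFilter, set_predicate, Column, And, FilteredScan, Passthrough and find_dynamic_filters are hypothetical names made up for illustration. The walk visits every plan node, then every expression tree that node exposes, and keeps the expressions whose concrete type is the dynamic filter.

```rust
use std::any::Any;
use std::sync::{Arc, RwLock};

// Trimmed-down local stand-ins for DataFusion's traits; NOT the real signatures.
trait PhysicalExpr: Send + Sync {
    fn as_any(&self) -> &dyn Any;
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
}

trait ExecutionPlan: Send + Sync {
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
    // The proposed method: every PhysicalExpr this node references directly.
    fn expressions(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
}

// Hypothetical dynamic filter holding a predicate that can change at runtime.
struct DynamicFilter {
    predicate: RwLock<String>,
}

impl DynamicFilter {
    // Hypothetical update method; a real filter would expose its own API.
    fn set_predicate(&self, predicate: &str) {
        *self.predicate.write().unwrap() = predicate.to_string();
    }
}

// Hypothetical leaf and binary expressions used to build a small tree.
struct Column;
struct And {
    left: Arc<dyn PhysicalExpr>,
    right: Arc<dyn PhysicalExpr>,
}

impl PhysicalExpr for DynamicFilter {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { vec![] }
}
impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { vec![] }
}
impl PhysicalExpr for And {
    fn as_any(&self) -> &dyn Any { self }
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { vec![&self.left, &self.right] }
}

// Hypothetical plan nodes: a scan with a filter, and a pass-through parent.
struct FilteredScan {
    filter: Arc<dyn PhysicalExpr>,
}
struct Passthrough {
    input: Arc<dyn ExecutionPlan>,
}

impl ExecutionPlan for FilteredScan {
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
    fn expressions(&self) -> Vec<&Arc<dyn PhysicalExpr>> { vec![&self.filter] }
}
impl ExecutionPlan for Passthrough {
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![&self.input] }
    fn expressions(&self) -> Vec<&Arc<dyn PhysicalExpr>> { vec![] }
}

// Walk every plan node and every expression tree it references,
// collecting the expressions whose concrete type is DynamicFilter.
fn find_dynamic_filters(root: &Arc<dyn ExecutionPlan>) -> Vec<Arc<dyn PhysicalExpr>> {
    let mut found = Vec::new();
    let mut plans = vec![Arc::clone(root)];
    while let Some(node) = plans.pop() {
        let mut exprs: Vec<Arc<dyn PhysicalExpr>> =
            node.expressions().into_iter().cloned().collect();
        while let Some(expr) = exprs.pop() {
            if expr.as_any().is::<DynamicFilter>() {
                found.push(Arc::clone(&expr));
            }
            exprs.extend(expr.children().into_iter().cloned());
        }
        plans.extend(node.children().into_iter().cloned());
    }
    found
}
```

Because the walk returns the shared Arc handles rather than copies, a caller that downcasts one of them and updates it is updating the same filter instance the plan holds, which is what a network-facing producer or subscriber would need.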
- Storing DynamicFilterPhysicalExpr's state in a "global arena" accessible from the Arc<TaskContext>
Rather than storing the state in DynamicFilterPhysicalExpr's struct fields, make it a stateless struct that only holds a unique identifier into a "global arena" of dynamic filter states, and broadcast/subscribe to updates there. Users are then free to access this "global arena" from the Arc<TaskContext> and manipulate the dynamic filter values there, rather than directly accessing the DynamicFilterPhysicalExpr instances present in the plan.
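A minimal sketch of the second option, assuming a hypothetical arena type that DataFusion does not provide: DynamicFilterId, DynamicFilterExpr, DynamicFilterArena, publish, subscribe and current are all made up for illustration, the predicate is a plain String instead of a PhysicalExpr, and std::sync::mpsc channels stand in for whatever notification mechanism a real implementation would choose. The expression carries only an id; producers publish into the arena and consumers (for example, a task that forwards updates over the network) subscribe to it. In the proposal the arena would be reachable from the Arc<TaskContext>; here it is a standalone value.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Hypothetical id that a stateless dynamic filter expression would hold.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct DynamicFilterId(u64);

// Hypothetical stateless expression: nothing but an id into the arena.
#[derive(Clone, Copy, Debug)]
struct DynamicFilterExpr {
    id: DynamicFilterId,
}

// Per-filter state kept in the arena (simplified: predicate as a String).
#[derive(Default)]
struct Entry {
    predicate: Option<String>,
    subscribers: Vec<Sender<String>>,
}

// Hypothetical "global arena"; in the proposal it would hang off Arc<TaskContext>.
#[derive(Default)]
struct DynamicFilterArena {
    entries: Mutex<HashMap<DynamicFilterId, Entry>>,
}

impl DynamicFilterArena {
    // Producer side: store the new predicate and notify every subscriber.
    fn publish(&self, id: DynamicFilterId, predicate: &str) {
        let mut entries = self.entries.lock().unwrap();
        let entry = entries.entry(id).or_default();
        entry.predicate = Some(predicate.to_string());
        // Keep only subscribers whose receiving end is still alive.
        entry.subscribers.retain(|tx| tx.send(predicate.to_string()).is_ok());
    }

    // Consumer side: receive every future update for this filter.
    fn subscribe(&self, id: DynamicFilterId) -> Receiver<String> {
        let (tx, rx) = channel();
        let mut entries = self.entries.lock().unwrap();
        let entry = entries.entry(id).or_default();
        // Replay the latest value so a late subscriber does not miss it.
        if let Some(p) = &entry.predicate {
            let _ = tx.send(p.clone());
        }
        entry.subscribers.push(tx);
        rx
    }

    // Read the current predicate for an expression, if one was published.
    fn current(&self, expr: &DynamicFilterExpr) -> Option<String> {
        self.entries.lock().unwrap().get(&expr.id).and_then(|e| e.predicate.clone())
    }
}
```

Replaying the latest predicate on subscribe is one possible design choice, so that a consumer registering after the producer has already published still sees the current value.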
Describe alternatives you've considered
- Provide callbacks during protobuf serialization/deserialization of DynamicFilterPhysicalExpr
The user can provide callbacks to the protobuf serde machinery in datafusion-proto that fire when a DynamicFilterPhysicalExpr is serialized/deserialized. When those callbacks fire, the user can store references to these expressions wherever they want and manipulate them through whatever API DynamicFilterPhysicalExpr decides to expose.
The issue with this approach is that there is no guarantee that every DynamicFilterPhysicalExpr will be serialized/deserialized; some of them might never be (for example, those in plan fragments that are never sent over the network), which would leave the user with an incomplete registry of DynamicFilterPhysicalExpr.
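To illustrate that drawback, here is a toy model under stated assumptions: DynamicFilterSerdeCallback, on_deserialize, CallbackRegistry, decode_fragments and missing are hypothetical and are not part of datafusion-proto, and a plan fragment is reduced to the list of filter ids it contains. A registry that is populated only by a deserialization hook can only ever contain the filters that actually went through serde, so any filter in a fragment that never leaves the local host is absent from it.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Hypothetical identifier for a dynamic filter; only identity matters here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct DynamicFilterId(u64);

// Hypothetical hook the serde layer would call for each dynamic filter it decodes.
trait DynamicFilterSerdeCallback {
    fn on_deserialize(&self, id: DynamicFilterId);
}

// Registry that learns about filters exclusively through the callback.
#[derive(Default)]
struct CallbackRegistry {
    seen: Mutex<HashSet<DynamicFilterId>>,
}

impl DynamicFilterSerdeCallback for CallbackRegistry {
    fn on_deserialize(&self, id: DynamicFilterId) {
        self.seen.lock().unwrap().insert(id);
    }
}

// Toy decoder: each shipped fragment lists the filter ids it contains,
// and only fragments that were actually sent over the network are decoded.
fn decode_fragments(shipped: &[Vec<DynamicFilterId>], callback: &dyn DynamicFilterSerdeCallback) {
    for fragment in shipped {
        for id in fragment {
            callback.on_deserialize(*id);
        }
    }
}

// Filters present in the full plan that the callback never reported.
fn missing(all: &[DynamicFilterId], registry: &CallbackRegistry) -> Vec<DynamicFilterId> {
    let seen = registry.seen.lock().unwrap();
    all.iter().copied().filter(|id| !seen.contains(id)).collect()
}
```

For a plan with filters 1 and 2 where only the fragment holding filter 1 is shipped, missing reports filter 2: the registry has no way to learn about it.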
Additional context
Related PRs: