
Conversation

@mingmwang (Contributor) commented Oct 17, 2022

Which issue does this PR close?

Closes #3854
Closes #3653
Closes #3400
Closes #189

Rationale for this change

What changes are included in this PR?

  1. Add a required_input_ordering() method to the ExecutionPlan trait to specify ordering requirements
  2. Fix output_partitioning(), output_ordering(), required_input_distribution() in a couple of trait implementations
  3. Add a BasicEnforcement rule to ensure the Distribution and Ordering requirements in the physical plan tree
  4. Add an equivalence_properties() method to the ExecutionPlan trait to discover the equivalence properties in the physical plan tree
  5. Add convenience utils for writing optimizer rules:
    transform()/transform_down()/transform_up()
  6. Implement PartialEq<dyn Any> for PhysicalExpr

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Changes to the physical-expr crates labels Oct 17, 2022
@mingmwang (Contributor Author)

@alamb @Dandandan @andygrove @tustvold @thinkharderdev @yahoNanJing
Please take a look.

@mingmwang (Contributor Author)

Looks like the build failed with the following error, any ideas?

cargo check --manifest-path datafusion-cli/Cargo.toml --locked
shell: sh -e {0}
error: the lock file /__w/arrow-datafusion/arrow-datafusion/datafusion-cli/Cargo.lock needs to be updated but --locked was passed to prevent this

@alamb (Contributor) commented Oct 17, 2022

cargo check --manifest-path datafusion-cli/Cargo.toml --locked

I think you need to do cd datafusion-cli && cargo update && git commit -a -m 'Update lock' -- will try and review this PR later today

@alamb (Contributor) commented Oct 17, 2022

I ran out of time today to review this but I plan to do so tomorrow

@alamb (Contributor) left a comment

This is looking pretty neat @mingmwang but it is basically impossible for me to carefully review such a large PR. I wonder if you can break it down into pieces so we can get it in?

Possible split:

  1. Add PhysicalExpr::children() and PhysicalExpr::new_with_children
  2. the code for iterating / manipulating physical exprs
  3. The code for calculating equivalence properties (and tests)
  4. The code in datafusion/core/src/physical_optimizer/enforcement.rs

let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(host);

- if let Some(path) = env::var("GCP_SERVICE_ACCOUNT_PATH").ok() {
+ if let Ok(path) = env::var("GCP_SERVICE_ACCOUNT_PATH") {
Contributor:

these are nice cleanups but not directly connected to the other tickets in this PR. Were they suggested by clippy or just things you saw as you were reviewing the code?

@mingmwang (Contributor Author) commented Oct 19, 2022

Yes, they were suggested by clippy, so I changed them accordingly. (The if let Some(x) = expr.ok() pattern is redundant because if let Ok(x) = expr matches the Ok variant directly.)

.any(|dist| matches!(dist, Distribution::SinglePartition))
}

/// Get a list of equivalence properties within the plan
Contributor:

Can you please expand on this function's doc string -- specifically it would be nice to:

  1. define what "equivalence" means (is it equality? Does it include nullability?)
  2. explain what the Vec<Vec<Column>> represents (a set of equivalence properties?). Can there be duplicates, e.g. vec![vec![col("A"), col("B")], vec![col("A"), col("C")]]?
  3. explain what happens if an operator does not return any equivalence properties

Some examples of what changes are made based on this information would likely also be helpful

@mingmwang (Contributor Author) commented Oct 19, 2022

Sure, I will add more docs. I will also refine the interface to make it clearer, something like this:

fn equivalence_properties(&self) -> Vec<EquivalenceProperties>;

struct EquivalenceProperties {
    /// First column in the equivalence class
    head: Column,
    /// Other columns known to be equal to `head`
    others: HashSet<Column>,
}

It does not allow duplicates or intersections: two EquivalenceProperties are combined/merged if they intersect.

See this method:

pub fn combine_equivalence_properties(
    eq_properties: &mut Vec<Vec<Column>>,
    new_condition: (&Column, &Column),
)
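
To illustrate the intended merge semantics, here is a minimal standalone sketch (using String in place of DataFusion's Column type, and a hypothetical combine helper in place of the real method):

fn combine(classes: &mut Vec<Vec<String>>, (l, r): (&str, &str)) {
    // Adding the condition l = r joins any existing classes that contain
    // l or r, so no two equivalence classes ever intersect and no class
    // contains duplicates.
    let pos_l = classes.iter().position(|c| c.iter().any(|x| x == l));
    let pos_r = classes.iter().position(|c| c.iter().any(|x| x == r));
    match (pos_l, pos_r) {
        // The new condition bridges two existing classes: merge them.
        (Some(i), Some(j)) if i != j => {
            let donor = classes.remove(i.max(j));
            classes[i.min(j)].extend(donor);
        }
        // Both columns already belong to the same class: nothing to do.
        (Some(_), Some(_)) => {}
        // One side is already tracked: add the other to its class.
        (Some(i), None) => classes[i].push(r.to_string()),
        (None, Some(j)) => classes[j].push(l.to_string()),
        // Neither side is known: start a new class.
        (None, None) => classes.push(vec![l.to_string(), r.to_string()]),
    }
}

fn main() {
    let mut classes: Vec<Vec<String>> = vec![];
    combine(&mut classes, ("a", "b")); // [["a", "b"]]
    combine(&mut classes, ("c", "d")); // [["a", "b"], ["c", "d"]]
    combine(&mut classes, ("b", "c")); // intersecting classes are merged
    assert_eq!(classes.len(), 1);
    assert_eq!(classes[0], ["a", "b", "c", "d"]);
}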

If an operator does not return any equivalence properties, correctness is not affected, but the optimizer might choose a non-optimal plan (with additional unnecessary SortExec/RepartitionExec operators).

Equivalence properties are a very useful concept; they are also discussed in the "Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer" paper:

A set of columns that are known to have the same value in all tuples of a relation belong to a column equivalence class. An equivalence class may also contain a constant c, which implies that all columns in the class have the value c. Equivalence classes are generated by equality predicates, typically equijoin conditions and equality comparisons with a constant.

http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf
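
For example, an equality predicate such as t1.a = t2.b combined with t2.b = 5 places t1.a, t2.b, and the constant 5 into a single equivalence class.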

The PostgreSQL optimizer also implements EquivalenceClasses, and join reordering can leverage them:

EquivalenceClasses selection

https://github.com/postgres/postgres/tree/f4c7c410ee4a7baa06f51ebb8d5333c169691dd3/src/backend/optimizer

In the future, I would suggest introducing equivalence properties to the logical plan as well; a couple of logical optimization rules could benefit from it, like FilterPushDown, EliminateFilter, etc.

A negative example is Spark SQL, which does not support equivalence properties explicitly. This caused performance issues when it tried to infer additional filter constraints, because the inference led to exponential growth in the number of constraints. There is an interesting talk on this; see the section on rewriting Spark's constraint propagation mechanism:
https://www.databricks.com/session_na21/optimizing-the-catalyst-optimizer-for-complex-plans

Partitioning::RoundRobinBatch(count2),
) if count1 == count2 => true,
(Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
if expr_list_eq_any_order(exprs1, exprs2) && (count1 == count2) =>
Contributor:

I don't know if this is true for our hash algorithm -- are we sure that hash(expr1, expr2) is equivalent to hash(expr2, expr1)?

Contributor Author:

Yeah, this is my fault; I will fix it. This code is used by the partition-aware union to check whether the partitioning specs are the same.
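
To see why key order matters for a typical composite hash, here is a minimal standalone illustration (it uses std's DefaultHasher purely for illustration, not DataFusion's actual row hash algorithm):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Feeding the same values into a hasher in a different order generally
// yields a different hash, so Hash([a, b], n) and Hash([b, a], n) would
// route the same row to different partitions.
fn hash_pair(x: u64, y: u64) -> u64 {
    let mut h = DefaultHasher::new();
    x.hash(&mut h);
    y.hash(&mut h);
    h.finish()
}

fn main() {
    assert_ne!(hash_pair(1, 2), hash_pair(2, 1));
}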

Contributor Author:

BTW, there is a UT for the partition-aware union in dataframe.rs; I will add one more UT to cover the case where the join key order differs.

#[tokio::test]
    async fn partition_aware_union() -> Result<()> {
        let left = test_table().await?.select_columns(&["c1", "c2"])?;
        let right = test_table_with_name("c2")
            .await?
            .select_columns(&["c1", "c3"])?
            .with_column_renamed("c2.c1", "c2_c1")?;

        let left_rows = left.collect().await?;
        let right_rows = right.collect().await?;
        let join1 =
            left.join(right.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
        let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;

        let union = join1.union(join2)?;

        let union_rows = union.collect().await?;

        assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
        assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
        assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

        let physical_plan = union.create_physical_plan().await?;
        let partition_count = SessionContext::new().copied_config().target_partitions;
        assert_eq!(
            physical_plan.output_partitioning().partition_count(),
            partition_count
        );
        Ok(())
    }

transform_down(expr, op)
}

/// Convenience utils for writing optimizer rules: recursively apply the given 'op' to the PhysicalExpr and all of its
Contributor:

I wonder what you would think about following the same visitor pattern as rewriting Exprs, the ExprRewriter.
https://github.com/apache/arrow-datafusion/blob/488b2cec3c700821dfdfece2d85c4cd7956e718d/datafusion/expr/src/expr_rewriter.rs#L46-L56

This code is straightforward and well commented; it is simply a different structure than the logical Exprs and uses different terms (like down and up rather than pre and post).

@mingmwang (Contributor Author) commented Oct 19, 2022

I think the tree visit pattern is common among rules, either post-order or pre-order, so we do not need to implement a visitor again and again; the only difference between rules is the mutation behavior applied during the traversal. So we can have a common tree visitor pattern and pass the differing behavior in as a closure. This will make writing optimization rules much more straightforward and more FP-style.

For example, I rewrote the CoalesceBatches rule using the new utils:

impl PhysicalOptimizerRule for CoalesceBatches {
    fn optimize(
        &self,
        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
        _config: &crate::execution::context::SessionConfig,
    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
        let target_batch_size = self.target_batch_size;
        transform_up(plan, &|plan| {
            let plan_any = plan.as_any();
            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
                || plan_any.downcast_ref::<HashJoinExec>().is_some()
                || plan_any.downcast_ref::<RepartitionExec>().is_some();
            if wrap_in_coalesce {
                Some(Arc::new(CoalesceBatchesExec::new(
                    plan.clone(),
                    target_batch_size,
                )))
            } else {
                None
            }
        })
    }
}

This is very similar to how we iterate over a vec/list in Rust or any other FP language today: we do not need to implement different iterators again and again, we just pass the behavior as a closure/lambda expression and the compiler generates the different iterators/visitors for us.

Contributor:

makes sense

@mingmwang (Contributor Author) commented Oct 20, 2022

@alamb
I have combined those transform utils and the visitor pattern into a trait, TreeNodeRewritable; users are free to use closures/lambda expressions or pass in a visitor to do the plan/expr rewriting.
TreeNodeRewritable is general enough now that it can be applied to physical plans, physical exprs, or even logical plans. Please take a look.

Unfortunately, due to the limitation of Rust's orphan rule, the TreeNodeRewritable trait is copied into two crates to make it a local trait in each, so there is still duplicated code.


/// a Trait for marking tree node types that are rewritable
pub trait TreeNodeRewritable: Clone {
    /// Transform the tree node using the given [TreeNodeRewriter]
    /// It performs a depth-first walk of a node and its children.
    ///
    /// For a node tree such as
    /// ```text
    /// ParentNode
    ///    left: ChildNode1
    ///    right: ChildNode2
    /// ```
    ///
    /// The nodes are visited using the following order
    /// ```text
    /// pre_visit(ParentNode)
    /// pre_visit(ChildNode1)
    /// mutate(ChildNode1)
    /// pre_visit(ChildNode2)
    /// mutate(ChildNode2)
    /// mutate(ParentNode)
    /// ```
    ///
    /// If an Err result is returned, recursion is stopped immediately
    ///
    /// If [`false`] is returned on a call to pre_visit, no
    /// children of that node are visited, nor is mutate
    /// called on that node
    ///
    fn transform_using<R: TreeNodeRewriter<Self>>(
        self,
        rewriter: &mut R,
    ) -> Result<Self> {
        let need_mutate = match rewriter.pre_visit(&self)? {
            RewriteRecursion::Mutate => return rewriter.mutate(self),
            RewriteRecursion::Stop => return Ok(self),
            RewriteRecursion::Continue => true,
            RewriteRecursion::Skip => false,
        };

        let after_op_children =
            self.map_children(|node| node.transform_using(rewriter))?;

        // now rewrite this node itself
        if need_mutate {
            rewriter.mutate(after_op_children)
        } else {
            Ok(after_op_children)
        }
    }

    /// Convenience utils for writing optimizer rules: recursively apply the given `op` to the node tree.
    /// When `op` does not apply to a given node, it is left unchanged.
    /// The default tree traversal direction is transform_up (post-order traversal).
    fn transform<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Option<Self>,
    {
        self.transform_up(op)
    }

    /// Convenience utils for writing optimizer rules: recursively apply the given 'op' to the node and all of its
    /// children (pre-order traversal).
    /// When the `op` does not apply to a given node, it is left unchanged.
    fn transform_down<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Option<Self>,
    {
        let node_cloned = self.clone();
        let after_op = match op(node_cloned) {
            Some(value) => value,
            None => self,
        };
        after_op.map_children(|node| node.transform_down(op))
    }

    /// Convenience utils for writing optimizer rules: recursively apply the given 'op' first to all of its
    /// children and then to itself (post-order traversal).
    /// When the `op` does not apply to a given node, it is left unchanged.
    fn transform_up<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Option<Self>,
    {
        let after_op_children = self.map_children(|node| node.transform_up(op))?;

        let after_op_children_clone = after_op_children.clone();
        let new_node = match op(after_op_children) {
            Some(value) => value,
            None => after_op_children_clone,
        };
        Ok(new_node)
    }

    /// Apply transform `F` to the node's children; the transform `F` might have a direction (pre-order or post-order)
    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>;
}
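
A minimal usage sketch of the trait above (Node is a hypothetical toy type, and the crate's Result alias is assumed to be in scope; an implementor only supplies map_children and gets transform/transform_up/transform_down for free):

#[derive(Clone, Debug)]
struct Node {
    value: i32,
    children: Vec<Node>,
}

impl TreeNodeRewritable for Node {
    fn map_children<F>(mut self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        // Apply the transform to each child, propagating the first error.
        self.children = self
            .children
            .into_iter()
            .map(transform)
            .collect::<Result<Vec<_>>>()?;
        Ok(self)
    }
}

fn double_all(tree: Node) -> Result<Node> {
    // Post-order traversal: children are rewritten before their parent.
    tree.transform_up(&|mut n| {
        n.value *= 2;
        Some(n)
    })
}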

} else {
Ok(expr)
}
}
Contributor:

Some unit tests (or doctests) for these functions would be good

}

impl PartialEq<dyn Any> for Column {
    fn eq(&self, other: &dyn Any) -> bool {
Contributor:

👍

}

/// Return the equal Column-Pairs and non-equal Column-Pairs
fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
Contributor:

I think seeing examples of this code working in tests would help verify its correctness

Contributor:

Update: they are in datafusion/core/src/physical_optimizer/enforcement.rs

}

/// Apply transform `F` to the plan's children; the transform `F` might have a direction (pre-order or post-order)
fn map_children<F>(
Contributor:

How is this code different than what is in datafusion/physical-expr/src/utils.rs ?

@mingmwang (Contributor Author) commented Oct 19, 2022

Yes, those are duplications: these utils are applied to Arc<dyn ExecutionPlan> and the others to Arc<dyn PhysicalExpr>. Maybe I should define a common TreeVisitor struct to hold those methods, but I don't know how to make it work for both Arc<dyn ExecutionPlan> and Arc<dyn PhysicalExpr> in Rust.

I need help; any suggestions here?

Contributor:

Ah, I missed the subtlety that they were for (slightly) different classes. 🤔

///
/// This rule only chooses the exact match and satisfies a Distribution(a, b, c) requirement with a HashPartition(a, b, c).
#[derive(Default)]
pub struct BasicEnforcement {}
Contributor:

Maybe we could call this DistributionAndOrdering to better reflect what it is doing?

@mingmwang (Contributor Author) commented Oct 19, 2022

Some Cascades-style optimizers have enforcer rules that do similar things, so I named this rule BasicEnforcement; it is for Phase 1.

In the following PRs, I will also try to implement a more dynamic enforcement rule that considers different alternative partitioning options and chooses the best plan, with the goal of minimizing the number of RepartitionExec operators. Below are some papers I read recently; I will refer to them when implementing the DynamicEnforcement rule.

Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer
http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf

Another recent paper; see the EXCHANGE PLACEMENT sections:
https://vldb.org/pvldb/vol15/p936-rajan.pdf

We could also have an optimizer config option like 'prefer_join_keys_exactly_match': if this option is set to true, the DataFusion physical optimizer will run BasicEnforcement, otherwise it will run DynamicEnforcement.

#[derive(Default)]
pub struct DynamicEnforcement {}

// TODO
Contributor:

What is this supposed to do?

Contributor Author:

> This is looking pretty neat @mingmwang but it is basically impossible for me to carefully review such a large PR. I wonder if you can break it down into pieces so we can get it in?
>
> Possible split:
>
>   1. Add PhysicalExpr::children() and PhysicalExpr::new_with_children
>   2. the code for iterating / manipulating physical exprs
>   3. The code for calculating equivalence properties (and tests)
>   4. The code in datafusion/core/src/physical_optimizer/enforcement.rs

Sure, I will split this PR into 3 PRs:

  1. PhysicalExpr-related changes: implement PartialEq for PhysicalExpr, add PhysicalExpr::children() and PhysicalExpr::new_with_children(), and the utils code for iterating / manipulating physical exprs.
  2. ExecutionPlan-related changes: output_partitioning(), output_ordering(), required_input_distribution(), and the code for calculating equivalence properties (and tests).
  3. The code in datafusion/core/src/physical_optimizer/enforcement.rs and the tests.

Contributor:

Thank you very much

@alamb alamb marked this pull request as draft October 27, 2022 14:26
@alamb (Contributor) commented Oct 27, 2022

Marking this PR as a draft so it is not in the review queue, as I believe @mingmwang is breaking it up into smaller PRs for review and merge.
