diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b49f073105c4..df63b77572e8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -20,6 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; +use std::hash::Hash; use std::sync::Arc; use crate::error::{DataFusionError, Result, SchemaError}; @@ -496,6 +497,15 @@ impl From for SchemaRef { } } +// Hashing refers to a subset of fields considered in PartialEq. +#[allow(clippy::derive_hash_xor_eq)] +impl Hash for DFSchema { + fn hash(&self, state: &mut H) { + self.fields.hash(state); + self.metadata.len().hash(state); // HashMap is not hashable + } +} + /// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes pub trait ToDFSchema where @@ -587,7 +597,7 @@ impl ExprSchema for DFSchema { } /// DFField wraps an Arrow field and adds an optional qualifier -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DFField { /// Optional qualifier (usually a table or relation name) qualifier: Option, diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs index 4aff7c7eb477..6a61da970d87 100644 --- a/datafusion/common/src/parsers.rs +++ b/datafusion/common/src/parsers.rs @@ -26,7 +26,7 @@ const SECONDS_PER_HOUR: f64 = 3_600_f64; const NANOS_PER_SECOND: f64 = 1_000_000_000_f64; /// Readable file compression type -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum CompressionTypeVariant { /// Gzip-ed file GZIP, diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 4ca41edd9aa2..34656bc114a6 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -69,7 +69,7 @@ pub enum TableReference<'a> { /// Represents a path to a table that may require further resolution /// that owns the underlying names -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum OwnedTableReference { /// An unqualified table reference, e.g. "table" Bare { diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 1d4e7a86c86f..0a1ac4054f3c 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -2373,6 +2373,7 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E } } /// An example extension node that doesn't do anything + #[derive(PartialEq, Eq, Hash)] struct NoOpExtensionNode { schema: DFSchemaRef, } @@ -2425,6 +2426,19 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E ) -> Arc { unimplemented!("NoOp"); } + + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { + match other.as_any().downcast_ref::() { + Some(o) => self == o, + None => false, + } + } + + fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); + } } #[derive(Debug)] diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 3b1ea76a84a2..02c8a1664da2 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -324,6 +324,7 @@ impl OptimizerRule for TopKOptimizerRule { } } +#[derive(PartialEq, Eq, Hash)] struct TopKPlanNode { k: usize, input: LogicalPlan, @@ -376,6 +377,19 @@ impl UserDefinedLogicalNode for TopKPlanNode { expr: exprs[0].clone(), }) } + + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { + match other.as_any().downcast_ref::() { + Some(o) => self == o, + None => false, + } + } + + fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); + } } /// Physical planner for TopK nodes diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs index fd32741445fb..57a8cac6b549 100644 --- a/datafusion/expr/src/logical_plan/extension.rs +++ b/datafusion/expr/src/logical_plan/extension.rs @@ -18,14 +18,15 @@ //! This module defines the interface for logical nodes use crate::{Expr, LogicalPlan}; use datafusion_common::DFSchemaRef; -use std::{any::Any, collections::HashSet, fmt, sync::Arc}; +use std::hash::{Hash, Hasher}; +use std::{any::Any, cmp::Eq, collections::HashSet, fmt, sync::Arc}; /// This defines the interface for `LogicalPlan` nodes that can be /// used to extend DataFusion with custom relational operators. /// /// See the example in /// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an -/// example of how to use this extension API +/// example of how to use this extension API. pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { /// Return a reference to self as Any, to support dynamic downcasting fn as_any(&self) -> &dyn Any; @@ -77,4 +78,26 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { exprs: &[Expr], inputs: &[LogicalPlan], ) -> Arc; + + /// Hashing respecting requirements from [std::hash::Hash]. + fn dyn_hash(&self, state: &mut dyn Hasher); + + /// Comparison respecting requirements from [std::cmp::Eq]. + /// + /// When `other` has an another type than `self`, then the values are *not* equal. + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool; +} + +impl Hash for dyn UserDefinedLogicalNode { + fn hash(&self, state: &mut H) { + self.dyn_hash(state); + } } + +impl std::cmp::PartialEq for dyn UserDefinedLogicalNode { + fn eq(&self, other: &Self) -> bool { + self.dyn_eq(other) + } +} + +impl Eq for dyn UserDefinedLogicalNode {} diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c3ef861eb3b4..f5866d5a96d5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -49,7 +49,7 @@ use std::sync::Arc; /// an output relation (table) with a (potentially) different /// schema. A plan represents a dataflow tree where data flows /// from leaves up to the root to produce the query result. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub enum LogicalPlan { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. @@ -1249,7 +1249,7 @@ impl Display for JoinType { } /// Join constraint -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum JoinConstraint { /// Join ON On, @@ -1258,7 +1258,7 @@ pub enum JoinConstraint { } /// Creates a catalog (aka "Database"). -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct CreateCatalog { /// The catalog name pub catalog_name: String, @@ -1269,7 +1269,7 @@ pub struct CreateCatalog { } /// Creates a schema. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct CreateCatalogSchema { /// The table schema pub schema_name: String, @@ -1280,7 +1280,7 @@ pub struct CreateCatalogSchema { } /// Drops a table. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct DropTable { /// The table name pub name: OwnedTableReference, @@ -1291,7 +1291,7 @@ pub struct DropTable { } /// Drops a view. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct DropView { /// The view name pub name: OwnedTableReference, @@ -1303,7 +1303,7 @@ pub struct DropView { /// Set a Variable's value -- value in /// [`ConfigOptions`](datafusion_common::config::ConfigOptions) -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct SetVariable { /// The variable name pub variable: String, @@ -1314,7 +1314,7 @@ pub struct SetVariable { } /// Produces no rows: An empty relation with an empty schema -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct EmptyRelation { /// Whether to produce a placeholder row pub produce_one_row: bool, @@ -1325,7 +1325,7 @@ pub struct EmptyRelation { /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Values { /// The table schema pub schema: DFSchemaRef, @@ -1335,7 +1335,7 @@ pub struct Values { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] // mark non_exhaustive to encourage use of try_new/new() #[non_exhaustive] pub struct Projection { @@ -1400,7 +1400,7 @@ impl Projection { } /// Aliased subquery -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] // mark non_exhaustive to encourage use of try_new/new() #[non_exhaustive] pub struct SubqueryAlias { @@ -1440,7 +1440,7 @@ impl SubqueryAlias { /// /// Filter should not be created directly but instead use `try_new()` /// and that these fields are only pub to support pattern matching -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub struct Filter { /// The predicate expression, which must have Boolean type. @@ -1488,7 +1488,7 @@ impl Filter { } /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Window { /// The incoming logical plan pub input: Arc, @@ -1515,8 +1515,30 @@ pub struct TableScan { pub fetch: Option, } +impl PartialEq for TableScan { + fn eq(&self, other: &Self) -> bool { + self.table_name == other.table_name + && self.projection == other.projection + && self.projected_schema == other.projected_schema + && self.filters == other.filters + && self.fetch == other.fetch + } +} + +impl Eq for TableScan {} + +impl Hash for TableScan { + fn hash(&self, state: &mut H) { + self.table_name.hash(state); + self.projection.hash(state); + self.projected_schema.hash(state); + self.filters.hash(state); + self.fetch.hash(state); + } +} + /// Apply Cross Join to two logical plans -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct CrossJoin { /// Left input pub left: Arc, @@ -1527,7 +1549,7 @@ pub struct CrossJoin { } /// Repartition the plan based on a partitioning scheme. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Repartition { /// The incoming logical plan pub input: Arc, @@ -1536,7 +1558,7 @@ pub struct Repartition { } /// Union multiple inputs -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Union { /// Inputs to merge pub inputs: Vec>, @@ -1545,7 +1567,7 @@ pub struct Union { } /// Creates an in memory table. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct CreateMemoryTable { /// The table name pub name: OwnedTableReference, @@ -1558,7 +1580,7 @@ pub struct CreateMemoryTable { } /// Creates a view. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct CreateView { /// The table name pub name: OwnedTableReference, @@ -1571,7 +1593,7 @@ pub struct CreateView { } /// Creates an external table. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq)] pub struct CreateExternalTable { /// The table schema pub schema: DFSchemaRef, @@ -1597,7 +1619,25 @@ pub struct CreateExternalTable { pub options: HashMap, } -#[derive(Clone)] +// Hashing refers to a subset of fields considered in PartialEq. +#[allow(clippy::derive_hash_xor_eq)] +impl Hash for CreateExternalTable { + fn hash(&self, state: &mut H) { + self.schema.hash(state); + self.name.hash(state); + self.location.hash(state); + self.file_type.hash(state); + self.has_header.hash(state); + self.delimiter.hash(state); + self.table_partition_cols.hash(state); + self.if_not_exists.hash(state); + self.definition.hash(state); + self.file_compression_type.hash(state); + self.options.len().hash(state); // HashMap is not hashable + } +} + +#[derive(Clone, PartialEq, Eq, Hash)] pub enum WriteOp { Insert, Delete, @@ -1617,7 +1657,7 @@ impl Display for WriteOp { } /// The operator that modifies the content of a database (adapted from substrait WriteRel) -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct DmlStatement { /// The table name pub table_name: OwnedTableReference, @@ -1631,7 +1671,7 @@ pub struct DmlStatement { /// Prepare a statement but do not execute it. Prepare statements can have 0 or more /// `Expr::Placeholder` expressions that are filled in during execution -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Prepare { /// The name of the statement pub name: String, @@ -1642,7 +1682,7 @@ pub struct Prepare { } /// Describe the schema of table -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeTable { /// Table schema pub schema: Arc, @@ -1652,7 +1692,7 @@ pub struct DescribeTable { /// Produces a relation with string representations of /// various parts of the plan -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Explain { /// Should extra (detailed, intermediate plans) be included? pub verbose: bool, @@ -1668,7 +1708,7 @@ pub struct Explain { /// Runs the actual plan, and then prints the physical plan with /// with execution metrics. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Analyze { /// Should extra detail be included? pub verbose: bool, @@ -1679,14 +1719,24 @@ pub struct Analyze { } /// Extension operator defined outside of DataFusion -#[derive(Clone)] +#[allow(clippy::derive_hash_xor_eq)] // see impl PartialEq for explanation +#[derive(Clone, Eq, Hash)] pub struct Extension { /// The runtime extension operator pub node: Arc, } +impl PartialEq for Extension { + #[allow(clippy::op_ref)] // clippy false positive + fn eq(&self, other: &Self) -> bool { + // must be manually derived due to a bug in #[derive(PartialEq)] + // https://github.com/rust-lang/rust/issues/39128 + &self.node == &other.node + } +} + /// Produces the first `n` tuples from its input and discards the rest. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Limit { /// Number of rows to skip before fetch pub skip: usize, @@ -1698,7 +1748,7 @@ pub struct Limit { } /// Removes duplicate rows from the input -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Distinct { /// The logical plan that is being DISTINCT'd pub input: Arc, @@ -1706,7 +1756,7 @@ pub struct Distinct { /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] // mark non_exhaustive to encourage use of try_new/new() #[non_exhaustive] pub struct Aggregate { @@ -1779,7 +1829,7 @@ impl Aggregate { } /// Sorts its input according to a list of sort expressions. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Sort { /// The sort expressions pub expr: Vec, @@ -1790,7 +1840,7 @@ pub struct Sort { } /// Join two logical plans on one or more join columns -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Join { /// Left input pub left: Arc, @@ -1846,7 +1896,7 @@ impl Join { } /// Subquery -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Subquery { /// The subquery pub subquery: Arc, @@ -1873,29 +1923,8 @@ impl Debug for Subquery { } } -impl Hash for Subquery { - fn hash(&self, state: &mut H) { - state.finish(); - } - - fn hash_slice(_data: &[Self], state: &mut H) - where - Self: Sized, - { - state.finish(); - } -} - -impl PartialEq for Subquery { - fn eq(&self, _other: &Self) -> bool { - false - } -} - -impl Eq for Subquery {} - /// Logical partitioning schemes supported by the repartition operator. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Partitioning { /// Allocate batches using a round-robin algorithm and the specified number of partitions RoundRobinBatch(usize), @@ -1908,7 +1937,7 @@ pub enum Partitioning { /// Represents which type of plan, when storing multiple /// for use in EXPLAIN plans -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum PlanType { /// The initial LogicalPlan provided to DataFusion InitialLogicalPlan, @@ -1948,7 +1977,7 @@ impl Display for PlanType { } /// Represents some sort of execution plan, in String form -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[allow(clippy::rc_buffer)] pub struct StringifiedPlan { /// An identifier of what type of plan this string represents @@ -1984,7 +2013,7 @@ pub trait ToStringifiedPlan { } /// Unnest a column that contains a nested list type. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Unnest { /// The incoming logical plan pub input: Arc, diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs index 3efe77e43ea7..19909cf2fbf4 100644 --- a/datafusion/expr/src/signature.rs +++ b/datafusion/expr/src/signature.rs @@ -37,7 +37,7 @@ pub enum Volatility { } /// A function's type signature, which defines the function's supported argument types. -#[derive(Debug, Clone, PartialEq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum TypeSignature { /// arbitrary number of arguments of an common type out of a list of valid types // A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])` @@ -59,7 +59,7 @@ pub enum TypeSignature { } ///The Signature of a function defines its supported input types as well as its volatility. -#[derive(Debug, Clone, PartialEq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Signature { /// type_signature - The types that the function accepts. See [TypeSignature] for more information. pub type_signature: TypeSignature, diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 64a1750ee2e4..5077bed9090d 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -45,6 +45,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace, warn}; +use std::borrow::Cow; use std::sync::Arc; use std::time::Instant; @@ -264,7 +265,7 @@ impl Optimizer { { let options = config.options(); let start_time = Instant::now(); - let mut plan_str = format!("{}", plan.display_indent()); + let mut old_plan = Cow::Borrowed(plan); let mut new_plan = plan.clone(); let mut i = 0; while i < options.optimizer.max_passes { @@ -328,13 +329,12 @@ impl Optimizer { // TODO this is an expensive way to see if the optimizer did anything and // it would be better to change the OptimizerRule trait to return an Option // instead - let new_plan_str = format!("{}", new_plan.display_indent()); - if plan_str == new_plan_str { + if old_plan.as_ref() == &new_plan { // plan did not change, so no need to continue trying to optimize debug!("optimizer pass {} did not make changes", i); break; } - plan_str = new_plan_str; + old_plan = Cow::Owned(new_plan.clone()); i += 1; } log_plan("Final optimized plan", &new_plan); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 0d8da557397e..f004e3d31988 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1074,7 +1074,7 @@ mod tests { assert_optimized_plan_eq(&plan, expected) } - #[derive(Debug)] + #[derive(Debug, PartialEq, Eq, Hash)] struct NoopPlan { input: Vec, schema: DFSchemaRef, @@ -1118,6 +1118,19 @@ mod tests { schema: self.schema.clone(), }) } + + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { + match other.as_any().downcast_ref::() { + Some(o) => self == o, + None => false, + } + } + + fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); + } } #[test] diff --git a/datafusion/optimizer/src/test/user_defined.rs b/datafusion/optimizer/src/test/user_defined.rs index 92b56ee75a7b..a1d5e4623135 100644 --- a/datafusion/optimizer/src/test/user_defined.rs +++ b/datafusion/optimizer/src/test/user_defined.rs @@ -34,6 +34,7 @@ pub fn new(input: LogicalPlan) -> LogicalPlan { LogicalPlan::Extension(Extension { node }) } +#[derive(PartialEq, Eq, Hash)] struct TestUserDefinedPlanNode { input: LogicalPlan, } @@ -76,4 +77,17 @@ impl UserDefinedLogicalNode for TestUserDefinedPlanNode { input: inputs[0].clone(), }) } + + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { + match other.as_any().downcast_ref::() { + Some(o) => self == o, + None => false, + } + } + + fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); + } } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index bf3c733c005a..2f4f83d89d7e 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1639,6 +1639,7 @@ mod roundtrip_tests { } } + #[derive(PartialEq, Eq, Hash)] struct TopKPlanNode { k: usize, input: LogicalPlan, @@ -1695,6 +1696,19 @@ mod roundtrip_tests { expr: exprs[0].clone(), }) } + + fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { + match other.as_any().downcast_ref::() { + Some(o) => self == o, + None => false, + } + } + + fn dyn_hash(&self, state: &mut dyn std::hash::Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); + } } #[derive(Debug)]