From 5d244d8d4439b6e4763d0dd80fb7e90011110f57 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 6 Jun 2023 17:10:52 -0400 Subject: [PATCH] Move JoinType to datafusion_common --- datafusion/common/src/join_type.rs | 98 +++++++++++++++++++ datafusion/common/src/lib.rs | 2 + .../core/src/physical_plan/joins/hash_join.rs | 2 +- .../physical_plan/joins/sort_merge_join.rs | 4 +- .../joins/symmetric_hash_join.rs | 2 +- .../core/src/physical_plan/joins/utils.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 77 +-------------- 7 files changed, 108 insertions(+), 79 deletions(-) create mode 100644 datafusion/common/src/join_type.rs diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs new file mode 100644 index 000000000000..9da9e5625f72 --- /dev/null +++ b/datafusion/common/src/join_type.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`JoinType`] and [`JoinConstraint`] + +use std::{ + fmt::{self, Display, Formatter}, + str::FromStr, +}; + +use crate::{DataFusionError, Result}; + +/// Join type +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum JoinType { + /// Inner Join + Inner, + /// Left Join + Left, + /// Right Join + Right, + /// Full Join + Full, + /// Left Semi Join + LeftSemi, + /// Right Semi Join + RightSemi, + /// Left Anti Join + LeftAnti, + /// Right Anti Join + RightAnti, +} + +impl JoinType { + pub fn is_outer(self) -> bool { + self == JoinType::Left || self == JoinType::Right || self == JoinType::Full + } +} + +impl Display for JoinType { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let join_type = match self { + JoinType::Inner => "Inner", + JoinType::Left => "Left", + JoinType::Right => "Right", + JoinType::Full => "Full", + JoinType::LeftSemi => "LeftSemi", + JoinType::RightSemi => "RightSemi", + JoinType::LeftAnti => "LeftAnti", + JoinType::RightAnti => "RightAnti", + }; + write!(f, "{join_type}") + } +} + +impl FromStr for JoinType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let s = s.to_uppercase(); + match s.as_str() { + "INNER" => Ok(JoinType::Inner), + "LEFT" => Ok(JoinType::Left), + "RIGHT" => Ok(JoinType::Right), + "FULL" => Ok(JoinType::Full), + "LEFTSEMI" => Ok(JoinType::LeftSemi), + "RIGHTSEMI" => Ok(JoinType::RightSemi), + "LEFTANTI" => Ok(JoinType::LeftAnti), + "RIGHTANTI" => Ok(JoinType::RightAnti), + _ => Err(DataFusionError::NotImplemented(format!( + "The join type {s} does not exist or is not implemented" + ))), + } + } +} + +/// Join constraint +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum JoinConstraint { + /// Join ON + On, + /// Join USING + Using, +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 7bbb3fbb7190..f58964bc2288 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -22,6 +22,7 @@ pub mod delta; mod dfschema; mod error; pub mod from_slice; +mod join_type; pub mod parsers; #[cfg(feature = "pyarrow")] mod pyarrow; @@ -39,6 +40,7 @@ pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, SharedResult, }; +pub use join_type::{JoinConstraint, JoinType}; pub use scalar::{ScalarType, ScalarValue}; pub use schema_reference::{OwnedSchemaReference, SchemaReference}; pub use stats::{ColumnStatistics, Statistics}; diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 2c8f8b55daaa..a6cb1c6f8231 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -57,7 +57,6 @@ use crate::arrow::array::BooleanBufferBuilder; use crate::arrow::datatypes::TimeUnit; use crate::error::{DataFusionError, Result}; use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer}; -use crate::logical_expr::JoinType; use crate::physical_plan::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide, @@ -78,6 +77,7 @@ use crate::physical_plan::{ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_common::JoinType; use super::{ utils::{OnceAsync, OnceFut}, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 85bd18e592f9..67e111fd46cb 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -40,7 +40,6 @@ use futures::{Stream, StreamExt}; use crate::error::DataFusionError; use crate::error::Result; -use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::joins::utils::{ @@ -52,6 +51,7 @@ use crate::physical_plan::{ metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_common::JoinType; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -1396,7 +1396,6 @@ mod tests { use crate::common::assert_contains; use crate::error::Result; - use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; use crate::physical_plan::joins::utils::JoinOn; use crate::physical_plan::joins::SortMergeJoinExec; @@ -1405,6 +1404,7 @@ mod tests { use crate::prelude::{SessionConfig, SessionContext}; use crate::test::{build_table_i32, columns}; use crate::{assert_batches_eq, assert_batches_sorted_eq}; + use datafusion_common::JoinType; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; fn build_table( diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index b5c5b06c3f5f..eaaec759b95a 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -51,7 +51,6 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound}; use crate::error::{DataFusionError, Result}; -use crate::logical_expr::JoinType; use crate::physical_plan::common::SharedMemoryReservation; use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema; use crate::physical_plan::joins::hash_join_utils::JoinHashMap; @@ -71,6 +70,7 @@ use crate::physical_plan::{ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_common::JoinType; use datafusion_execution::TaskContext; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 412e774806ef..88da4726ec8d 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -42,8 +42,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_physical_expr::{EquivalentClass, PhysicalExpr}; use crate::error::{DataFusionError, Result}; -use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; +use datafusion_common::JoinType; use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use crate::physical_plan::SchemaRef; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 798a54273c94..e19b327785a2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -40,9 +40,11 @@ use datafusion_common::{ use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; -use std::str::FromStr; use std::sync::Arc; +// backwards compatible +pub use datafusion_common::{JoinConstraint, JoinType}; + use super::DdlStatement; /// A LogicalPlan represents the different types of relational @@ -1128,79 +1130,6 @@ impl ToStringifiedPlan for LogicalPlan { } } -/// Join type -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum JoinType { - /// Inner Join - Inner, - /// Left Join - Left, - /// Right Join - Right, - /// Full Join - Full, - /// Left Semi Join - LeftSemi, - /// Right Semi Join - RightSemi, - /// Left Anti Join - LeftAnti, - /// Right Anti Join - RightAnti, -} - -impl JoinType { - pub fn is_outer(self) -> bool { - self == JoinType::Left || self == JoinType::Right || self == JoinType::Full - } -} - -impl Display for JoinType { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - let join_type = match self { - JoinType::Inner => "Inner", - JoinType::Left => "Left", - JoinType::Right => "Right", - JoinType::Full => "Full", - JoinType::LeftSemi => "LeftSemi", - JoinType::RightSemi => "RightSemi", - JoinType::LeftAnti => "LeftAnti", - JoinType::RightAnti => "RightAnti", - }; - write!(f, "{join_type}") - } -} - -impl FromStr for JoinType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - let s = s.to_uppercase(); - match s.as_str() { - "INNER" => Ok(JoinType::Inner), - "LEFT" => Ok(JoinType::Left), - "RIGHT" => Ok(JoinType::Right), - "FULL" => Ok(JoinType::Full), - "LEFTSEMI" => Ok(JoinType::LeftSemi), - "RIGHTSEMI" => Ok(JoinType::RightSemi), - "LEFTANTI" => Ok(JoinType::LeftAnti), - "RIGHTANTI" => Ok(JoinType::RightAnti), - _ => Err(DataFusionError::NotImplemented(format!( - "The join type {s} does not exist or is not implemented" - ))), - } - } -} - -/// Join constraint -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum JoinConstraint { - /// Join ON - On, - /// Join USING - Using, -} - /// Produces no rows: An empty relation with an empty schema #[derive(Clone, PartialEq, Eq, Hash)] pub struct EmptyRelation {