From 396788c429cdfb13710ed9f5250e74f9e0493c3d Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 17:24:12 +0200 Subject: [PATCH 1/8] Implement missing join types for Python dataframe --- datafusion/src/logical_plan/plan.rs | 24 ++++++++++++++++++++++++ python/Cargo.toml | 2 +- python/src/dataframe.rs | 18 ++++-------------- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 25cf9e33d2ca..63a463069a60 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -24,9 +24,11 @@ use super::{ display::{GraphvizVisitor, IndentVisitor}, }; use crate::datasource::TableProvider; +use crate::error::DataFusionError; use crate::logical_plan::dfschema::DFSchemaRef; use crate::sql::parser::FileType; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use std::str::FromStr; use std::{ cmp::min, fmt::{self, Display}, @@ -50,6 +52,28 @@ pub enum JoinType { Anti, } +impl FromStr for JoinType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "inner" => Ok(JoinType::Inner), + "left" => Ok(JoinType::Left), + "right" => Ok(JoinType::Right), + "full" => Ok(JoinType::Full), + "semi" => Ok(JoinType::Semi), + "anti" => Ok(JoinType::Semi), + how => { + return Err(DataFusionError::Internal(format!( + "The join type {} does not exist or is not implemented", + how + )) + .into()) + } + } + } +} + /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by /// the SQL query planner and the DataFrame API. diff --git a/python/Cargo.toml b/python/Cargo.toml index 859cf350ca51..282c9c8b86e6 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -31,7 +31,7 @@ libc = "0.2" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.7" pyo3 = { version = "0.13.2", features = ["extension-module"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "c3fc0c75af5ff2ebb99dba197d9d2ccd83eb5952" } +datafusion = { path = "../datafusion" } [lib] name = "datafusion" diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index 66e6916b6815..0c9d156b5531 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -26,8 +26,8 @@ use datafusion::logical_plan::{JoinType, LogicalPlanBuilder}; use datafusion::physical_plan::collect; use datafusion::{execution::context::ExecutionContextState, logical_plan}; +use crate::expression; use crate::{errors, to_py}; -use crate::{errors::DataFusionError, expression}; /// A DataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. @@ -142,19 +142,9 @@ impl DataFrame { /// Returns the join of two DataFrames `on`. fn join(&self, right: &DataFrame, on: Vec<&str>, how: &str) -> PyResult { let builder = LogicalPlanBuilder::from(&self.plan); - - let join_type = match how { - "inner" => JoinType::Inner, - "left" => JoinType::Left, - "right" => JoinType::Right, - how => { - return Err(DataFusionError::Common(format!( - "The join type {} does not exist or is not implemented", - how - )) - .into()) - } - }; + let join_type = how + .parse::() + .map_err(|e| -> errors::DataFusionError { e.into() })?; let builder = errors::wrap(builder.join( &right.plan, From 38fc1645226e7e0401f5e66863b17aa8a5c53ab4 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 17:25:48 +0200 Subject: [PATCH 2/8] Fix mapping --- datafusion/src/logical_plan/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 63a463069a60..66f61f3d2909 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -62,7 +62,7 @@ impl FromStr for JoinType { "right" => Ok(JoinType::Right), "full" => Ok(JoinType::Full), "semi" => Ok(JoinType::Semi), - "anti" => Ok(JoinType::Semi), + "anti" => Ok(JoinType::Anti), how => { return Err(DataFusionError::Internal(format!( "The join type {} does not exist or is not implemented", From 4d2a42a604d92fdbf00a24b905d40aec830662da Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 17:56:17 +0200 Subject: [PATCH 3/8] Use commit hash instead --- python/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index 282c9c8b86e6..8f1480deedbc 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -31,7 +31,7 @@ libc = "0.2" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.7" pyo3 = { version = "0.13.2", features = ["extension-module"] } -datafusion = { path = "../datafusion" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "c92079dfb3045a9a46d12c3bc22361a44d11b8bc" } [lib] name = "datafusion" From 40fc29c2cb9b4b5f7851c7963c2c3293e8cacd43 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 18:01:14 +0200 Subject: [PATCH 4/8] undo some changes --- datafusion/src/logical_plan/plan.rs | 22 ---------------------- python/src/dataframe.rs | 27 ++++++++++++++++----------- 2 files changed, 16 insertions(+), 33 deletions(-) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 66f61f3d2909..b1a2854cfe4b 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -52,28 +52,6 @@ pub enum JoinType { Anti, } -impl FromStr for JoinType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - match s { - "inner" => Ok(JoinType::Inner), - "left" => Ok(JoinType::Left), - "right" => Ok(JoinType::Right), - "full" => Ok(JoinType::Full), - "semi" => Ok(JoinType::Semi), - "anti" => Ok(JoinType::Anti), - how => { - return Err(DataFusionError::Internal(format!( - "The join type {} does not exist or is not implemented", - how - )) - .into()) - } - } - } -} - /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by /// the SQL query planner and the DataFrame API. diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index 0c9d156b5531..f8601eb7d2d0 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -26,8 +26,8 @@ use datafusion::logical_plan::{JoinType, LogicalPlanBuilder}; use datafusion::physical_plan::collect; use datafusion::{execution::context::ExecutionContextState, logical_plan}; -use crate::expression; use crate::{errors, to_py}; +use crate::{errors::DataFusionError, expression}; /// A DataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. @@ -142,16 +142,21 @@ impl DataFrame { /// Returns the join of two DataFrames `on`. fn join(&self, right: &DataFrame, on: Vec<&str>, how: &str) -> PyResult { let builder = LogicalPlanBuilder::from(&self.plan); - let join_type = how - .parse::() - .map_err(|e| -> errors::DataFusionError { e.into() })?; - - let builder = errors::wrap(builder.join( - &right.plan, - join_type, - on.as_slice(), - on.as_slice(), - ))?; + let join_type = match how { + "inner" => JoinType::Inner, + "left" => JoinType::Left, + "right" => JoinType::Right, + "full" => JoinType::Full, + "semi" => JoinType::Semi, + "anti" => JoinType::Anti, + how => { + return Err(DataFusionError::Common(format!( + "The join type {} does not exist or is not implemented", + how + )) + .into()) + } + }; let plan = errors::wrap(builder.build())?; From 21f7d96cc38b644f012f46b5aa2b2567e12dad29 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 18:02:10 +0200 Subject: [PATCH 5/8] Remove imports --- datafusion/src/logical_plan/plan.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index b1a2854cfe4b..25cf9e33d2ca 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -24,11 +24,9 @@ use super::{ display::{GraphvizVisitor, IndentVisitor}, }; use crate::datasource::TableProvider; -use crate::error::DataFusionError; use crate::logical_plan::dfschema::DFSchemaRef; use crate::sql::parser::FileType; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use std::str::FromStr; use std::{ cmp::min, fmt::{self, Display}, From 69bb4e9fa3f05e9b29c67b551ba99adf6113453f Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 18:03:39 +0200 Subject: [PATCH 6/8] Undo removed part --- python/src/dataframe.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index f8601eb7d2d0..ac71ad7e3723 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -158,7 +158,12 @@ impl DataFrame { } }; - let plan = errors::wrap(builder.build())?; + let builder = errors::wrap(builder.join( + &right.plan, + join_type, + on.as_slice(), + on.as_slice(), + ))?; Ok(DataFrame { ctx_state: self.ctx_state.clone(), From d69bb9276a04c16d7775463a89d13464491f122c Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 18:04:04 +0200 Subject: [PATCH 7/8] Undo removed part --- python/src/dataframe.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index ac71ad7e3723..52df716238e1 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -165,6 +165,8 @@ impl DataFrame { on.as_slice(), ))?; + let plan = errors::wrap(builder.build())?; + Ok(DataFrame { ctx_state: self.ctx_state.clone(), plan, From c327e0081c484162bea4d4944cd29d485c7f7d7e Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Fri, 4 Jun 2021 18:04:36 +0200 Subject: [PATCH 8/8] minimize changes --- python/src/dataframe.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index 52df716238e1..8ceac64741e9 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -142,6 +142,7 @@ impl DataFrame { /// Returns the join of two DataFrames `on`. fn join(&self, right: &DataFrame, on: Vec<&str>, how: &str) -> PyResult { let builder = LogicalPlanBuilder::from(&self.plan); + let join_type = match how { "inner" => JoinType::Inner, "left" => JoinType::Left,