From 7cfeef3a21e363ba631bd1b9827a7b839abf7678 Mon Sep 17 00:00:00 2001 From: AssHero Date: Wed, 25 May 2022 12:53:09 +0800 Subject: [PATCH 1/3] support create or replace table ... --- datafusion/core/src/execution/context.rs | 24 ++++++++++++++++++++---- datafusion/expr/src/logical_plan/plan.rs | 2 ++ datafusion/expr/src/utils.rs | 2 ++ datafusion/sql/src/planner.rs | 2 ++ 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 619ac13b1365..f1cba209c4f4 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -289,15 +289,17 @@ impl SessionContext { name, input, if_not_exists, + or_replace, }) => { let table = self.table(name.as_str()); - match (if_not_exists, table) { - (true, Ok(_)) => { + match (if_not_exists, or_replace, table) { + (true, false, Ok(_)) => { let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (_, Err(_)) => { + (_, true, Ok(_)) => { + self.deregister_table(name.as_str())?; let plan = self.optimize(&input)?; let physical = Arc::new(DataFrame::new(self.state.clone(), &plan)); @@ -311,7 +313,21 @@ impl SessionContext { self.register_table(name.as_str(), table)?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (false, Ok(_)) => Err(DataFusionError::Execution(format!( + (_, _, Err(_)) => { + let plan = self.optimize(&input)?; + let physical = + Arc::new(DataFrame::new(self.state.clone(), &plan)); + + let batches: Vec<_> = physical.collect_partitioned().await?; + let table = Arc::new(MemTable::try_new( + Arc::new(plan.schema().as_ref().into()), + batches, + )?); + + self.register_table(name.as_str(), table)?; + Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + } + (false, false, Ok(_)) => Err(DataFusionError::Execution(format!( "Table '{:?}' already exists", name ))), diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 05c8a22e7d26..1bb091d3123a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1051,6 +1051,8 @@ pub struct CreateMemoryTable { pub input: Arc, /// Option to not error if table already exists pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, } /// Creates a view. diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index a7aed1799dd5..fe830f4ba267 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -425,11 +425,13 @@ pub fn from_plan( LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, if_not_exists, + or_replace, .. }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { input: Arc::new(inputs[0].clone()), name: name.clone(), if_not_exists: *if_not_exists, + or_replace: *or_replace, })), LogicalPlan::CreateView(CreateView { name, or_replace, .. diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index ba05a224523d..e70af950c81b 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -161,6 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { table_properties, with_options, if_not_exists, + or_replace, .. } if columns.is_empty() && constraints.is_empty() @@ -173,6 +174,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { name: name.to_string(), input: Arc::new(plan), if_not_exists, + or_replace, })) } Statement::CreateView { From ba33ff001818e44f5555596b3e30f74846ba7d22 Mon Sep 17 00:00:00 2001 From: AssHero Date: Wed, 25 May 2022 22:38:28 +0800 Subject: [PATCH 2/3] 1. add test case for 'create or replace table'. 2. do not allow 'IF NOT EXISTS' coexist with 'REPLACE' --- datafusion/core/src/execution/context.rs | 7 +++- datafusion/core/tests/sql/create_drop.rs | 43 ++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index f1cba209c4f4..be89608a7d91 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -298,7 +298,7 @@ impl SessionContext { let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (_, true, Ok(_)) => { + (false, true, Ok(_)) => { self.deregister_table(name.as_str())?; let plan = self.optimize(&input)?; let physical = @@ -313,6 +313,11 @@ impl SessionContext { self.register_table(name.as_str(), table)?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } + (true, true, Ok(_)) => { + Err(DataFusionError::Execution(format!( + "'IF NOT EXISTS' cannot coexist with 'REPLACE'" + ))) + } (_, _, Err(_)) => { let plan = self.optimize(&input)?; let physical = diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index 59df5d404395..e0afefbd8378 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -45,6 +45,49 @@ async fn create_table_as() -> Result<()> { Ok(()) } +#[tokio::test] +async fn create_or_replace_table_as() -> Result<()> { + // the information schema used to introduce cyclic Arcs + let ctx = + SessionContext::with_config(SessionConfig::new().with_information_schema(true)); + + // Create table + ctx.sql("CREATE TABLE y AS VALUES (1,2),(3,4)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Replace table + ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let sql_all = "SELECT * FROM y"; + let results_all = execute_to_batches(&ctx, sql_all).await; + + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 5 | 6 |", + "+---------+---------+", + ]; + + assert_batches_eq!(expected, &results_all); + + // 'IF NOT EXISTS' cannot coexist with 'REPLACE' + let result = ctx.sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)") + .await; + assert!(result.is_err(), "'IF NOT EXISTS' cannot coexist with 'REPLACE'"); + + Ok(()) +} + #[tokio::test] async fn drop_table() -> Result<()> { let ctx = SessionContext::new(); From adff5440275a293b82447fad01c2424868e42e0a Mon Sep 17 00:00:00 2001 From: AssHero Date: Thu, 26 May 2022 10:43:28 +0800 Subject: [PATCH 3/3] refine the code format --- datafusion/core/src/execution/context.rs | 8 +++----- datafusion/core/tests/sql/create_drop.rs | 12 ++++++++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index be89608a7d91..28ceff8ee1bb 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -313,11 +313,9 @@ impl SessionContext { self.register_table(name.as_str(), table)?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (true, true, Ok(_)) => { - Err(DataFusionError::Execution(format!( - "'IF NOT EXISTS' cannot coexist with 'REPLACE'" - ))) - } + (true, true, Ok(_)) => Err(DataFusionError::Internal( + "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(), + )), (_, _, Err(_)) => { let plan = self.optimize(&input)?; let physical = diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index e0afefbd8378..1d28f2e3371e 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -59,7 +59,7 @@ async fn create_or_replace_table_as() -> Result<()> { .await .unwrap(); - // Replace table + // Replace table ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)") .await .unwrap() @@ -81,9 +81,13 @@ async fn create_or_replace_table_as() -> Result<()> { assert_batches_eq!(expected, &results_all); // 'IF NOT EXISTS' cannot coexist with 'REPLACE' - let result = ctx.sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)") - .await; - assert!(result.is_err(), "'IF NOT EXISTS' cannot coexist with 'REPLACE'"); + let result = ctx + .sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)") + .await; + assert!( + result.is_err(), + "'IF NOT EXISTS' cannot coexist with 'REPLACE'" + ); Ok(()) }