diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 272cdc6da892..652834d73ae3 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -290,15 +290,17 @@ impl SessionContext { name, input, if_not_exists, + or_replace, }) => { let table = self.table(name.as_str()); - match (if_not_exists, table) { - (true, Ok(_)) => { + match (if_not_exists, or_replace, table) { + (true, false, Ok(_)) => { let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (_, Err(_)) => { + (false, true, Ok(_)) => { + self.deregister_table(name.as_str())?; let plan = self.optimize(&input)?; let physical = Arc::new(DataFrame::new(self.state.clone(), &plan)); @@ -312,7 +314,24 @@ impl SessionContext { self.register_table(name.as_str(), table)?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - (false, Ok(_)) => Err(DataFusionError::Execution(format!( + (true, true, Ok(_)) => Err(DataFusionError::Internal( + "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(), + )), + (_, _, Err(_)) => { + let plan = self.optimize(&input)?; + let physical = + Arc::new(DataFrame::new(self.state.clone(), &plan)); + + let batches: Vec<_> = physical.collect_partitioned().await?; + let table = Arc::new(MemTable::try_new( + Arc::new(plan.schema().as_ref().into()), + batches, + )?); + + self.register_table(name.as_str(), table)?; + Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + } + (false, false, Ok(_)) => Err(DataFusionError::Execution(format!( "Table '{:?}' already exists", name ))), diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index 59df5d404395..1d28f2e3371e 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -45,6 +45,53 @@ async fn create_table_as() -> Result<()> { Ok(()) } +#[tokio::test] +async fn create_or_replace_table_as() -> Result<()> { + // the information schema used to introduce cyclic Arcs + let ctx = + SessionContext::with_config(SessionConfig::new().with_information_schema(true)); + + // Create table + ctx.sql("CREATE TABLE y AS VALUES (1,2),(3,4)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Replace table + ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let sql_all = "SELECT * FROM y"; + let results_all = execute_to_batches(&ctx, sql_all).await; + + let expected = vec![ + "+---------+---------+", + "| column1 | column2 |", + "+---------+---------+", + "| 5 | 6 |", + "+---------+---------+", + ]; + + assert_batches_eq!(expected, &results_all); + + // 'IF NOT EXISTS' cannot coexist with 'REPLACE' + let result = ctx + .sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)") + .await; + assert!( + result.is_err(), + "'IF NOT EXISTS' cannot coexist with 'REPLACE'" + ); + + Ok(()) +} + #[tokio::test] async fn drop_table() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index acdb9092062a..ea8075bf2be0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1063,6 +1063,8 @@ pub struct CreateMemoryTable { pub input: Arc, /// Option to not error if table already exists pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, } /// Creates a view. diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index bb81d6776f6c..483b12b49a5e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -427,11 +427,13 @@ pub fn from_plan( LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, if_not_exists, + or_replace, .. }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable { input: Arc::new(inputs[0].clone()), name: name.clone(), if_not_exists: *if_not_exists, + or_replace: *or_replace, })), LogicalPlan::CreateView(CreateView { name, or_replace, .. diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 3556b0203598..3feb870baa1c 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -161,6 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { table_properties, with_options, if_not_exists, + or_replace, .. } if columns.is_empty() && constraints.is_empty() @@ -173,6 +174,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { name: name.to_string(), input: Arc::new(plan), if_not_exists, + or_replace, })) } Statement::CreateView {