diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index ca83ffdaf437..e4c7656cc8a1 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1011,6 +1011,9 @@ impl TryInto for &LogicalPlan { LogicalPlan::CreateMemoryTable { .. } => Err(proto_error( "Error converting CreateMemoryTable. Not yet supported in Ballista", )), + LogicalPlan::DropTable { .. } => Err(proto_error( + "Error converting DropTable. Not yet supported in Ballista", + )), } } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 6295184208ff..ca160d52e74b 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -255,6 +255,19 @@ impl ExecutionContext { Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } + LogicalPlan::DropTable { name, if_exist, .. } => { + let returned = self.deregister_table(name.as_str())?; + if !if_exist && returned.is_none() { + Err(DataFusionError::Execution(format!( + "Memory table {:?} doesn't exist.", + name + ))) + } else { + let plan = LogicalPlanBuilder::empty(false).build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + } + } + plan => Ok(Arc::new(DataFrameImpl::new( self.state.clone(), &self.optimize(&plan)?, diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 6faac01d7360..d1e1678b6617 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -212,6 +212,15 @@ pub enum LogicalPlan { /// The logical plan input: Arc, }, + /// Drops a table. + DropTable { + /// The table name + name: String, + /// If the table exists + if_exist: bool, + /// Dummy schema + schema: DFSchemaRef, + }, /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. @@ -274,6 +283,7 @@ impl LogicalPlan { LogicalPlan::Extension { node } => node.schema(), LogicalPlan::Union { schema, .. } => schema, LogicalPlan::CreateMemoryTable { input, .. } => input.schema(), + LogicalPlan::DropTable { schema, .. } => schema, } } @@ -320,6 +330,7 @@ impl LogicalPlan { | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable { input, .. } | LogicalPlan::Filter { input, .. } => input.all_schemas(), + LogicalPlan::DropTable { .. } => vec![], } } @@ -366,6 +377,7 @@ impl LogicalPlan { | LogicalPlan::Limit { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::CreateMemoryTable { .. } + | LogicalPlan::DropTable { .. } | LogicalPlan::CrossJoin { .. } | LogicalPlan::Analyze { .. } | LogicalPlan::Explain { .. } @@ -397,7 +409,8 @@ impl LogicalPlan { LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::CreateExternalTable { .. } => vec![], + | LogicalPlan::CreateExternalTable { .. } + | LogicalPlan::DropTable { .. } => vec![], } } @@ -545,7 +558,8 @@ impl LogicalPlan { LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::CreateExternalTable { .. } => true, + | LogicalPlan::CreateExternalTable { .. } + | LogicalPlan::DropTable { .. } => true, }; if !recurse { return Ok(false); @@ -863,6 +877,11 @@ impl LogicalPlan { LogicalPlan::CreateMemoryTable { ref name, .. } => { write!(f, "CreateMemoryTable: {:?}", name) } + LogicalPlan::DropTable { + ref name, if_exist, .. + } => { + write!(f, "DropTable: {:?} if not exist:={}", name, if_exist) + } LogicalPlan::Explain { .. } => write!(f, "Explain"), LogicalPlan::Analyze { .. } => write!(f, "Analyze"), LogicalPlan::Union { .. } => write!(f, "Union"), diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 9f2f2afea6a4..c88032e38b2e 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -206,6 +206,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { // apply the optimization to all inputs of the plan let expr = plan.expressions(); diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 5299b9a219ef..a0bc04a0caf5 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -71,6 +71,7 @@ impl OptimizerRule for ConstantFolding { | LogicalPlan::Repartition { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::CreateMemoryTable { .. } + | LogicalPlan::DropTable { .. } | LogicalPlan::Values { .. } | LogicalPlan::Extension { .. } | LogicalPlan::Sort { .. } diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index a30523d21deb..7c087a17a986 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -437,6 +437,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::CreateMemoryTable { .. } + | LogicalPlan::DropTable { .. } | LogicalPlan::CrossJoin { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index c94d48c8defb..52beb695529c 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -263,7 +263,8 @@ pub fn from_plan( } LogicalPlan::EmptyRelation { .. } | LogicalPlan::TableScan { .. } - | LogicalPlan::CreateExternalTable { .. } => { + | LogicalPlan::CreateExternalTable { .. } + | LogicalPlan::DropTable { .. } => { // All of these plan types have no inputs / exprs so should not be called assert!(expr.is_empty(), "{:?} should have no exprs", plan); assert!(inputs.is_empty(), "{:?} should have no inputs", plan); diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index e1170ed891b1..402f119e8ea0 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(GlobalLimitExec::new(input, limit))) } - LogicalPlan::CreateExternalTable { .. }=> { + LogicalPlan::CreateExternalTable { .. } => { // There is no default plan for "CREATE EXTERNAL // TABLE" -- it must be handled at a higher level (so // that the appropriate table can be registered with @@ -811,7 +811,7 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: CreateExternalTable".to_string(), )) } - | LogicalPlan::CreateMemoryTable {..} => { + | LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => { // Create a dummy exec. Ok(Arc::new(EmptyExec::new( false, diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 50875a8f3779..1731c905d779 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -29,8 +29,8 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column, - DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, - ToStringifiedPlan, + DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, + ToDFSchema, ToStringifiedPlan, }; use crate::optimizer::utils::exprlist_to_columns; use crate::prelude::JoinType; @@ -53,7 +53,7 @@ use sqlparser::ast::{ TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption}; -use sqlparser::ast::{OrderByExpr, Statement}; +use sqlparser::ast::{ObjectType, OrderByExpr, Statement}; use sqlparser::parser::ParserError::ParserError; use super::{ @@ -163,6 +163,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }) } + Statement::Drop { + object_type: ObjectType::Table, + if_exists, + names, + cascade: _, + purge: _, + } => + // We don't support cascade and purge for now. + { + Ok(LogicalPlan::DropTable { + name: names.get(0).unwrap().to_string(), + if_exist: *if_exists, + schema: DFSchemaRef::new(DFSchema::empty()), + }) + } + Statement::ShowColumns { extended, full, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index e6064afe7142..ad5c31bd13c5 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -711,6 +711,26 @@ async fn create_table_as() -> Result<()> { Ok(()) } +#[tokio::test] +async fn drop_table() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple"; + ctx.sql(sql).await.unwrap(); + + let sql = "DROP TABLE my_table"; + ctx.sql(sql).await.unwrap(); + + let result = ctx.table("my_table"); + assert!(result.is_err(), "drop table should deregister table."); + + let sql = "DROP TABLE IF EXISTS my_table"; + ctx.sql(sql).await.unwrap(); + + Ok(()) +} + #[tokio::test] async fn select_distinct() -> Result<()> { let mut ctx = ExecutionContext::new();