Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
)),
LogicalPlan::DropTable { .. } => Err(proto_error(
"Error converting DropTable. Not yet supported in Ballista",
)),
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,19 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}

LogicalPlan::DropTable { name, if_exist, .. } => {
let returned = self.deregister_table(name.as_str())?;
if !if_exist && returned.is_none() {
Err(DataFusionError::Execution(format!(
"Memory table {:?} doesn't exist.",
name
)))
} else {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
}

plan => Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&self.optimize(&plan)?,
Expand Down
23 changes: 21 additions & 2 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ pub enum LogicalPlan {
/// The logical plan
input: Arc<LogicalPlan>,
},
/// Drops a table.
DropTable {
/// The table name
name: String,
/// If the table exists
if_exist: bool,
/// Dummy schema
schema: DFSchemaRef,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if it's valuable to actually just attach the fetched table schema

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise you can just remove this placeholder

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a schema for dropping the table?

Copy link
Member Author

@viirya viirya Nov 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, seems an empty schema is better here.

},
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down Expand Up @@ -274,6 +283,7 @@ impl LogicalPlan {
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
LogicalPlan::DropTable { schema, .. } => schema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could return an empty schema here instead of keeping it the plan node.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried creating an empty schema before, but schema returns &DFSchemaRef, so rustc complains about a reference to dropped temporary object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that is unfortunate but I think the comments in the LogicalPlan struct are reasonable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Another solution could be having / referencing a static object for an empty schema, but I'm ok with the current situation.

}
}

Expand Down Expand Up @@ -320,6 +330,7 @@ impl LogicalPlan {
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable { input, .. }
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
LogicalPlan::DropTable { .. } => vec![],
}
}

Expand Down Expand Up @@ -366,6 +377,7 @@ impl LogicalPlan {
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
Expand Down Expand Up @@ -397,7 +409,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => vec![],
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::DropTable { .. } => vec![],
}
}

Expand Down Expand Up @@ -545,7 +558,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. } => true,
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::DropTable { .. } => true,
};
if !recurse {
return Ok(false);
Expand Down Expand Up @@ -863,6 +877,11 @@ impl LogicalPlan {
LogicalPlan::CreateMemoryTable { ref name, .. } => {
write!(f, "CreateMemoryTable: {:?}", name)
}
LogicalPlan::DropTable {
ref name, if_exist, ..
} => {
write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ pub fn from_plan(
}
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable { .. } => {
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::DropTable { .. } => {
// All of these plan types have no inputs / exprs so should not be called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
LogicalPlan::CreateExternalTable { .. }=> {
LogicalPlan::CreateExternalTable { .. } => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
Expand All @@ -811,7 +811,7 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
| LogicalPlan::CreateMemoryTable {..} => {
| LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
false,
Expand Down
22 changes: 19 additions & 3 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
ToStringifiedPlan,
DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
ToDFSchema, ToStringifiedPlan,
};
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
Expand All @@ -53,7 +53,7 @@ use sqlparser::ast::{
TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
use sqlparser::parser::ParserError::ParserError;

use super::{
Expand Down Expand Up @@ -163,6 +163,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})
}

Statement::Drop {
object_type: ObjectType::Table,
if_exists,
names,
cascade: _,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to add cascade and purge to the statement to the logical plan, or otherwise have a warning here saying that they do not take any effect

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just added a comment here, is it okay?

purge: _,
} =>
// We don't support cascade and purge for now.
{
Ok(LogicalPlan::DropTable {
name: names.get(0).unwrap().to_string(),
if_exist: *if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
})
}

Statement::ShowColumns {
extended,
full,
Expand Down
20 changes: 20 additions & 0 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,26 @@ async fn create_table_as() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn drop_table() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_simple_csv(&mut ctx).await?;

let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
ctx.sql(sql).await.unwrap();

let sql = "DROP TABLE my_table";
ctx.sql(sql).await.unwrap();

let result = ctx.table("my_table");
assert!(result.is_err(), "drop table should deregister table.");

let sql = "DROP TABLE IF EXISTS my_table";
ctx.sql(sql).await.unwrap();

Ok(())
}

#[tokio::test]
async fn select_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down