Skip to content

Commit

Permalink
feat: add SQL syntax support for alter, execute, describe, show, drop…
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhiHanZ authored and andylokandy committed Nov 27, 2023
1 parent 82f4016 commit 34018d9
Show file tree
Hide file tree
Showing 18 changed files with 710 additions and 26 deletions.
14 changes: 11 additions & 3 deletions src/query/ast/src/ast/statements/statement.rs
Expand Up @@ -233,6 +233,11 @@ pub enum Statement {

// tasks
CreateTask(CreateTaskStmt),
AlterTask(AlterTaskStmt),
ExecuteTask(ExecuteTaskStmt),
DescribeTask(DescribeTaskStmt),
DropTask(DropTaskStmt),
ShowTasks(ShowTasksStmt),
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -530,9 +535,12 @@ impl Display for Statement {
Statement::DropNetworkPolicy(stmt) => write!(f, "{stmt}")?,
Statement::DescNetworkPolicy(stmt) => write!(f, "{stmt}")?,
Statement::ShowNetworkPolicies => write!(f, "SHOW NETWORK POLICIES")?,
Statement::CreateTask(stmt) => {
write!(f, "{stmt}", stmt = stmt)?;
}
Statement::CreateTask(stmt) => write!(f, "{stmt}")?,
Statement::AlterTask(stmt) => write!(f, "{stmt}")?,
Statement::ExecuteTask(stmt) => write!(f, "{stmt}")?,
Statement::DropTask(stmt) => write!(f, "{stmt}")?,
Statement::ShowTasks(stmt) => write!(f, "{stmt}")?,
Statement::DescribeTask(stmt) => write!(f, "{stmt}")?,
}
Ok(())
}
Expand Down
129 changes: 129 additions & 0 deletions src/query/ast/src/ast/statements/task.rs
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Display;
use std::fmt::Formatter;

use crate::ast::ShowLimit;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateTaskStmt {
pub if_not_exists: bool,
Expand Down Expand Up @@ -46,6 +48,7 @@ impl Display for CreateTaskStmt {
write!(f, " COMMENTS = '{}'", self.comments)?;
}

write!(f, " AS {}", self.sql)?;
Ok(())
}
}
Expand Down Expand Up @@ -86,3 +89,129 @@ impl Display for ScheduleOptions {
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AlterTaskStmt {
pub if_exists: bool,
pub name: String,
pub options: AlterTaskOptions,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlterTaskOptions {
Resume,
Suspend,
Set {
warehouse: Option<String>,
schedule: Option<ScheduleOptions>,
suspend_task_after_num_failures: Option<u64>,
comments: Option<String>,
},
Unset {
warehouse: bool,
},
// Change SQL
ModifyAs(String),
}

impl Display for AlterTaskOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
AlterTaskOptions::Resume => write!(f, " RESUME"),
AlterTaskOptions::Suspend => write!(f, " SUSPEND"),
AlterTaskOptions::Set {
warehouse,
schedule,
suspend_task_after_num_failures,
comments,
} => {
if let Some(wh) = warehouse {
write!(f, " SET WAREHOUSE = {}", wh)?;
}
if let Some(schedule) = schedule {
write!(f, " SET {}", schedule)?;
}
if let Some(num) = suspend_task_after_num_failures {
write!(f, " SUSPEND TASK AFTER {} FAILURES", num)?;
}
if let Some(comments) = comments {
write!(f, " COMMENTS = '{}'", comments)?;
}
Ok(())
}
AlterTaskOptions::Unset { warehouse } => {
if *warehouse {
write!(f, " UNSET WAREHOUSE")?;
}
Ok(())
}
AlterTaskOptions::ModifyAs(sql) => write!(f, " AS {}", sql),
}
}
}

impl Display for AlterTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ALTER TASK")?;
if self.if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {}", self.name)?;
write!(f, "{}", self.options)?;
Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DropTaskStmt {
pub if_exists: bool,
pub name: String,
}

impl Display for DropTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DROP TASK")?;
if self.if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {}", self.name)
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct ShowTasksStmt {
pub limit: Option<ShowLimit>,
}

impl Display for ShowTasksStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SHOW ")?;
write!(f, "TASKS")?;
if let Some(limit) = &self.limit {
write!(f, " {limit}")?;
}

Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecuteTaskStmt {
pub name: String,
}

impl Display for ExecuteTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "EXECUTE TASK {}", self.name)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeTaskStmt {
pub name: String,
}

impl Display for DescribeTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DESCRIBE TASK {}", self.name)
}
}
114 changes: 114 additions & 0 deletions src/query/ast/src/parser/statement.rs
Expand Up @@ -135,6 +135,61 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
},
);

let alter_task = map(
rule! {
ALTER ~ TASK ~ ( IF ~ ^EXISTS )?
~ #ident ~ #alter_task_option
},
|(_, _, opt_if_exists, task, options)| {
Statement::AlterTask(AlterTaskStmt {
if_exists: opt_if_exists.is_some(),
name: task.to_string(),
options,
})
},
);

let drop_task = map(
rule! {
DROP ~ TASK ~ ( IF ~ ^EXISTS )?
~ #ident
},
|(_, _, opt_if_exists, task)| {
Statement::DropTask(DropTaskStmt {
if_exists: opt_if_exists.is_some(),
name: task.to_string(),
})
},
);
let show_tasks = map(
rule! {
SHOW ~ TASKS ~ #show_limit?
},
|(_, _, limit)| Statement::ShowTasks(ShowTasksStmt { limit }),
);

let execute_task = map(
rule! {
EXECUTE ~ TASK ~ #ident
},
|(_, _, task)| {
Statement::ExecuteTask(ExecuteTaskStmt {
name: task.to_string(),
})
},
);

let desc_task = map(
rule! {
( DESC | DESCRIBE ) ~ TASK ~ #ident
},
|(_, _, task)| {
Statement::DescribeTask(DescribeTaskStmt {
name: task.to_string(),
})
},
);

let insert = map(
rule! {
INSERT ~ #hint? ~ ( INTO | OVERWRITE ) ~ TABLE?
Expand Down Expand Up @@ -1594,6 +1649,11 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
[ COMMENT = '<string_literal>' ]
AS
<sql>`"
| #drop_task : "`DROP TASK [ IF EXISTS ] <name>`"
| #alter_task : "`ALTER TASK [ IF EXISTS ] <name> SUSPEND | RESUME | SET <option> = <value>` | UNSET <option> | MODIFY AS <sql>`"
| #show_tasks : "`SHOW TASKS [<show_limit>]`"
| #desc_task : "`DESC | DESCRIBE TASK <name>`"
| #execute_task: "`EXECUTE TASK <name>`"
),
));

Expand Down Expand Up @@ -2393,6 +2453,60 @@ pub fn vacuum_table_option(i: Input) -> IResult<VacuumTableOption> {
),))(i)
}

pub fn alter_task_option(i: Input) -> IResult<AlterTaskOptions> {
let suspend = map(
rule! {
SUSPEND
},
|_| AlterTaskOptions::Suspend,
);
let resume = map(
rule! {
RESUME
},
|_| AlterTaskOptions::Resume,
);
let modify_as = map(
rule! {
MODIFY ~ AS ~ #statement
},
|(_, _, sql)| {
let sql = pretty_statement(sql.stmt, 10)
.map_err(|_| ErrorKind::Other("invalid statement"))
.expect("failed to alter task");
AlterTaskOptions::ModifyAs(sql)
},
);
let set = map(
rule! {
SET
~ ( WAREHOUSE ~ "=" ~ #literal_string )?
~ ( SCHEDULE ~ "=" ~ #task_schedule_option )?
~ ( SUSPEND_TASK_AFTER_NUM_FAILURES ~ "=" ~ #literal_u64 )?
~ ( COMMENT ~ "=" ~ #literal_string )?
},
|(_, warehouse_opts, schedule_opts, suspend_opts, comment)| AlterTaskOptions::Set {
warehouse: warehouse_opts.map(|(_, _, warehouse)| warehouse),
schedule: schedule_opts.map(|(_, _, schedule)| schedule),
suspend_task_after_num_failures: suspend_opts.map(|(_, _, num)| num),
comments: comment.map(|(_, _, comment)| comment),
},
);
let unset = map(
rule! {
UNSET ~ WAREHOUSE
},
|_| AlterTaskOptions::Unset { warehouse: true },
);
rule!(
#suspend
| #resume
| #modify_as
| #set
| #unset
)(i)
}

pub fn task_warehouse_option(i: Input) -> IResult<WarehouseOptions> {
alt((map(
rule! {
Expand Down
9 changes: 9 additions & 0 deletions src/query/ast/src/parser/token.rs
Expand Up @@ -1018,6 +1018,8 @@ pub enum TokenKind {
LANGUAGE,
#[token("TASK", ignore(ascii_case))]
TASK,
#[token("TASKS", ignore(ascii_case))]
TASKS,
#[token("WAREHOUSE", ignore(ascii_case))]
WAREHOUSE,
#[token("SCHEDULE", ignore(ascii_case))]
Expand All @@ -1026,6 +1028,12 @@ pub enum TokenKind {
SUSPEND_TASK_AFTER_NUM_FAILURES,
#[token("CRON", ignore(ascii_case))]
CRON,
#[token("EXECUTE", ignore(ascii_case))]
EXECUTE,
#[token("SUSPEND", ignore(ascii_case))]
SUSPEND,
#[token("RESUME", ignore(ascii_case))]
RESUME,
}

// Reference: https://www.postgresql.org/docs/current/sql-keywords-appendix.html
Expand Down Expand Up @@ -1367,6 +1375,7 @@ impl TokenKind {
| TokenKind::IGNORE_RESULT
| TokenKind::MASKING
| TokenKind::POLICY
| TokenKind::TASK
if !after_as => true,
_ => false
}
Expand Down
10 changes: 10 additions & 0 deletions src/query/ast/src/visitors/visitor.rs
Expand Up @@ -588,6 +588,16 @@ pub trait Visitor<'ast>: Sized {

fn visit_create_task(&mut self, _stmt: &'ast CreateTaskStmt) {}

fn visit_drop_task(&mut self, _stmt: &'ast DropTaskStmt) {}

fn visit_show_tasks(&mut self, _stmt: &'ast ShowTasksStmt) {}

fn visit_execute_task(&mut self, _stmt: &'ast ExecuteTaskStmt) {}

fn visit_describe_task(&mut self, _stmt: &'ast DescribeTaskStmt) {}

fn visit_alter_task(&mut self, _stmt: &'ast AlterTaskStmt) {}

fn visit_with(&mut self, with: &'ast With) {
let With { ctes, .. } = with;
for cte in ctes.iter() {
Expand Down
10 changes: 10 additions & 0 deletions src/query/ast/src/visitors/visitor_mut.rs
Expand Up @@ -603,6 +603,16 @@ pub trait VisitorMut: Sized {

fn visit_create_task(&mut self, _stmt: &mut CreateTaskStmt) {}

fn visit_drop_task(&mut self, _stmt: &mut DropTaskStmt) {}

fn visit_show_tasks(&mut self, _stmt: &mut ShowTasksStmt) {}

fn visit_execute_task(&mut self, _stmt: &mut ExecuteTaskStmt) {}

fn visit_describe_task(&mut self, _stmt: &mut DescribeTaskStmt) {}

fn visit_alter_task(&mut self, _stmt: &mut AlterTaskStmt) {}

fn visit_with(&mut self, with: &mut With) {
let With { ctes, .. } = with;
for cte in ctes.iter_mut() {
Expand Down
5 changes: 5 additions & 0 deletions src/query/ast/src/visitors/walk.rs
Expand Up @@ -473,5 +473,10 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt),
Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),
Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
Statement::DropTask(stmt) => visitor.visit_drop_task(stmt),
Statement::AlterTask(stmt) => visitor.visit_alter_task(stmt),
Statement::ShowTasks(stmt) => visitor.visit_show_tasks(stmt),
Statement::DescribeTask(stmt) => visitor.visit_describe_task(stmt),
}
}
5 changes: 5 additions & 0 deletions src/query/ast/src/visitors/walk_mut.rs
Expand Up @@ -449,5 +449,10 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),

Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
Statement::DropTask(stmt) => visitor.visit_drop_task(stmt),
Statement::AlterTask(stmt) => visitor.visit_alter_task(stmt),
Statement::ShowTasks(stmt) => visitor.visit_show_tasks(stmt),
Statement::DescribeTask(stmt) => visitor.visit_describe_task(stmt),
}
}

0 comments on commit 34018d9

Please sign in to comment.