Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SQL syntax support for alter, execute, describe, show, drop task #13344

Merged
merged 1 commit into from Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/query/ast/src/ast/statements/statement.rs
Expand Up @@ -233,6 +233,11 @@ pub enum Statement {

// tasks
CreateTask(CreateTaskStmt),
AlterTask(AlterTaskStmt),
ExecuteTask(ExecuteTaskStmt),
DescribeTask(DescribeTaskStmt),
DropTask(DropTaskStmt),
ShowTasks(ShowTasksStmt),
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -530,9 +535,12 @@ impl Display for Statement {
Statement::DropNetworkPolicy(stmt) => write!(f, "{stmt}")?,
Statement::DescNetworkPolicy(stmt) => write!(f, "{stmt}")?,
Statement::ShowNetworkPolicies => write!(f, "SHOW NETWORK POLICIES")?,
Statement::CreateTask(stmt) => {
write!(f, "{stmt}", stmt = stmt)?;
}
Statement::CreateTask(stmt) => write!(f, "{stmt}")?,
Statement::AlterTask(stmt) => write!(f, "{stmt}")?,
Statement::ExecuteTask(stmt) => write!(f, "{stmt}")?,
Statement::DropTask(stmt) => write!(f, "{stmt}")?,
Statement::ShowTasks(stmt) => write!(f, "{stmt}")?,
Statement::DescribeTask(stmt) => write!(f, "{stmt}")?,
}
Ok(())
}
Expand Down
129 changes: 129 additions & 0 deletions src/query/ast/src/ast/statements/task.rs
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Display;
use std::fmt::Formatter;

use crate::ast::ShowLimit;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateTaskStmt {
pub if_not_exists: bool,
Expand Down Expand Up @@ -46,6 +48,7 @@ impl Display for CreateTaskStmt {
write!(f, " COMMENTS = '{}'", self.comments)?;
}

write!(f, " AS {}", self.sql)?;
Ok(())
}
}
Expand Down Expand Up @@ -86,3 +89,129 @@ impl Display for ScheduleOptions {
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AlterTaskStmt {
pub if_exists: bool,
pub name: String,
pub options: AlterTaskOptions,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlterTaskOptions {
Resume,
Suspend,
Set {
warehouse: Option<String>,
schedule: Option<ScheduleOptions>,
suspend_task_after_num_failures: Option<u64>,
comments: Option<String>,
},
Unset {
warehouse: bool,
},
// Change SQL
ModifyAs(String),
}

impl Display for AlterTaskOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
AlterTaskOptions::Resume => write!(f, " RESUME"),
AlterTaskOptions::Suspend => write!(f, " SUSPEND"),
AlterTaskOptions::Set {
warehouse,
schedule,
suspend_task_after_num_failures,
comments,
} => {
if let Some(wh) = warehouse {
write!(f, " SET WAREHOUSE = {}", wh)?;
}
if let Some(schedule) = schedule {
write!(f, " SET {}", schedule)?;
}
if let Some(num) = suspend_task_after_num_failures {
write!(f, " SUSPEND TASK AFTER {} FAILURES", num)?;
}
if let Some(comments) = comments {
write!(f, " COMMENTS = '{}'", comments)?;
}
Ok(())
}
AlterTaskOptions::Unset { warehouse } => {
if *warehouse {
write!(f, " UNSET WAREHOUSE")?;
}
Ok(())
}
AlterTaskOptions::ModifyAs(sql) => write!(f, " AS {}", sql),
}
}
}

impl Display for AlterTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ALTER TASK")?;
if self.if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {}", self.name)?;
write!(f, "{}", self.options)?;
Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DropTaskStmt {
pub if_exists: bool,
pub name: String,
}

impl Display for DropTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DROP TASK")?;
if self.if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {}", self.name)
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct ShowTasksStmt {
pub limit: Option<ShowLimit>,
}

impl Display for ShowTasksStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SHOW ")?;
write!(f, "TASKS")?;
if let Some(limit) = &self.limit {
write!(f, " {limit}")?;
}

Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecuteTaskStmt {
pub name: String,
}

impl Display for ExecuteTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "EXECUTE TASK {}", self.name)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeTaskStmt {
pub name: String,
}

impl Display for DescribeTaskStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DESCRIBE TASK {}", self.name)
}
}
114 changes: 114 additions & 0 deletions src/query/ast/src/parser/statement.rs
Expand Up @@ -135,6 +135,61 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
},
);

let alter_task = map(
rule! {
ALTER ~ TASK ~ ( IF ~ ^EXISTS )?
~ #ident ~ #alter_task_option
},
|(_, _, opt_if_exists, task, options)| {
Statement::AlterTask(AlterTaskStmt {
if_exists: opt_if_exists.is_some(),
name: task.to_string(),
options,
})
},
);

let drop_task = map(
rule! {
DROP ~ TASK ~ ( IF ~ ^EXISTS )?
~ #ident
},
|(_, _, opt_if_exists, task)| {
Statement::DropTask(DropTaskStmt {
if_exists: opt_if_exists.is_some(),
name: task.to_string(),
})
},
);
let show_tasks = map(
rule! {
SHOW ~ TASKS ~ #show_limit?
},
|(_, _, limit)| Statement::ShowTasks(ShowTasksStmt { limit }),
);

let execute_task = map(
rule! {
EXECUTE ~ TASK ~ #ident
},
|(_, _, task)| {
Statement::ExecuteTask(ExecuteTaskStmt {
name: task.to_string(),
})
},
);

let desc_task = map(
rule! {
( DESC | DESCRIBE ) ~ TASK ~ #ident
},
|(_, _, task)| {
Statement::DescribeTask(DescribeTaskStmt {
name: task.to_string(),
})
},
);

let insert = map(
rule! {
INSERT ~ #hint? ~ ( INTO | OVERWRITE ) ~ TABLE?
Expand Down Expand Up @@ -1594,6 +1649,11 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
[ COMMENT = '<string_literal>' ]
AS
<sql>`"
| #drop_task : "`DROP TASK [ IF EXISTS ] <name>`"
| #alter_task : "`ALTER TASK [ IF EXISTS ] <name> SUSPEND | RESUME | SET <option> = <value>` | UNSET <option> | MODIFY AS <sql>`"
| #show_tasks : "`SHOW TASKS [<show_limit>]`"
| #desc_task : "`DESC | DESCRIBE TASK <name>`"
| #execute_task: "`EXECUTE TASK <name>`"
),
));

Expand Down Expand Up @@ -2393,6 +2453,60 @@ pub fn vacuum_table_option(i: Input) -> IResult<VacuumTableOption> {
),))(i)
}

pub fn alter_task_option(i: Input) -> IResult<AlterTaskOptions> {
let suspend = map(
rule! {
SUSPEND
},
|_| AlterTaskOptions::Suspend,
);
let resume = map(
rule! {
RESUME
},
|_| AlterTaskOptions::Resume,
);
let modify_as = map(
rule! {
MODIFY ~ AS ~ #statement
},
|(_, _, sql)| {
let sql = pretty_statement(sql.stmt, 10)
.map_err(|_| ErrorKind::Other("invalid statement"))
.expect("failed to alter task");
AlterTaskOptions::ModifyAs(sql)
},
);
let set = map(
rule! {
SET
~ ( WAREHOUSE ~ "=" ~ #literal_string )?
~ ( SCHEDULE ~ "=" ~ #task_schedule_option )?
~ ( SUSPEND_TASK_AFTER_NUM_FAILURES ~ "=" ~ #literal_u64 )?
~ ( COMMENT ~ "=" ~ #literal_string )?
},
|(_, warehouse_opts, schedule_opts, suspend_opts, comment)| AlterTaskOptions::Set {
warehouse: warehouse_opts.map(|(_, _, warehouse)| warehouse),
schedule: schedule_opts.map(|(_, _, schedule)| schedule),
suspend_task_after_num_failures: suspend_opts.map(|(_, _, num)| num),
comments: comment.map(|(_, _, comment)| comment),
},
);
let unset = map(
rule! {
UNSET ~ WAREHOUSE
},
|_| AlterTaskOptions::Unset { warehouse: true },
);
rule!(
#suspend
| #resume
| #modify_as
| #set
| #unset
)(i)
}

pub fn task_warehouse_option(i: Input) -> IResult<WarehouseOptions> {
alt((map(
rule! {
Expand Down
9 changes: 9 additions & 0 deletions src/query/ast/src/parser/token.rs
Expand Up @@ -1018,6 +1018,8 @@ pub enum TokenKind {
LANGUAGE,
#[token("TASK", ignore(ascii_case))]
TASK,
#[token("TASKS", ignore(ascii_case))]
TASKS,
#[token("WAREHOUSE", ignore(ascii_case))]
WAREHOUSE,
#[token("SCHEDULE", ignore(ascii_case))]
Expand All @@ -1026,6 +1028,12 @@ pub enum TokenKind {
SUSPEND_TASK_AFTER_NUM_FAILURES,
#[token("CRON", ignore(ascii_case))]
CRON,
#[token("EXECUTE", ignore(ascii_case))]
EXECUTE,
#[token("SUSPEND", ignore(ascii_case))]
SUSPEND,
#[token("RESUME", ignore(ascii_case))]
RESUME,
}

// Reference: https://www.postgresql.org/docs/current/sql-keywords-appendix.html
Expand Down Expand Up @@ -1367,6 +1375,7 @@ impl TokenKind {
| TokenKind::IGNORE_RESULT
| TokenKind::MASKING
| TokenKind::POLICY
| TokenKind::TASK
if !after_as => true,
_ => false
}
Expand Down
10 changes: 10 additions & 0 deletions src/query/ast/src/visitors/visitor.rs
Expand Up @@ -588,6 +588,16 @@ pub trait Visitor<'ast>: Sized {

fn visit_create_task(&mut self, _stmt: &'ast CreateTaskStmt) {}

fn visit_drop_task(&mut self, _stmt: &'ast DropTaskStmt) {}

fn visit_show_tasks(&mut self, _stmt: &'ast ShowTasksStmt) {}

fn visit_execute_task(&mut self, _stmt: &'ast ExecuteTaskStmt) {}

fn visit_describe_task(&mut self, _stmt: &'ast DescribeTaskStmt) {}

fn visit_alter_task(&mut self, _stmt: &'ast AlterTaskStmt) {}

fn visit_with(&mut self, with: &'ast With) {
let With { ctes, .. } = with;
for cte in ctes.iter() {
Expand Down
10 changes: 10 additions & 0 deletions src/query/ast/src/visitors/visitor_mut.rs
Expand Up @@ -603,6 +603,16 @@ pub trait VisitorMut: Sized {

fn visit_create_task(&mut self, _stmt: &mut CreateTaskStmt) {}

fn visit_drop_task(&mut self, _stmt: &mut DropTaskStmt) {}

fn visit_show_tasks(&mut self, _stmt: &mut ShowTasksStmt) {}

fn visit_execute_task(&mut self, _stmt: &mut ExecuteTaskStmt) {}

fn visit_describe_task(&mut self, _stmt: &mut DescribeTaskStmt) {}

fn visit_alter_task(&mut self, _stmt: &mut AlterTaskStmt) {}

fn visit_with(&mut self, with: &mut With) {
let With { ctes, .. } = with;
for cte in ctes.iter_mut() {
Expand Down
5 changes: 5 additions & 0 deletions src/query/ast/src/visitors/walk.rs
Expand Up @@ -473,5 +473,10 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt),
Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),
Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
Statement::DropTask(stmt) => visitor.visit_drop_task(stmt),
Statement::AlterTask(stmt) => visitor.visit_alter_task(stmt),
Statement::ShowTasks(stmt) => visitor.visit_show_tasks(stmt),
Statement::DescribeTask(stmt) => visitor.visit_describe_task(stmt),
}
}
5 changes: 5 additions & 0 deletions src/query/ast/src/visitors/walk_mut.rs
Expand Up @@ -449,5 +449,10 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),

Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
Statement::DropTask(stmt) => visitor.visit_drop_task(stmt),
Statement::AlterTask(stmt) => visitor.visit_alter_task(stmt),
Statement::ShowTasks(stmt) => visitor.visit_show_tasks(stmt),
Statement::DescribeTask(stmt) => visitor.visit_describe_task(stmt),
}
}