Skip to content

Commit

Permalink
feat: handle serverless tasks (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
vince5678 committed Oct 28, 2021
1 parent e0d74be commit bde252e
Show file tree
Hide file tree
Showing 7 changed files with 573 additions and 58 deletions.
28 changes: 25 additions & 3 deletions docs/resources/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@ resource snowflake_task task {
when = "foo AND bar"
enabled = true
}
resource snowflake_task serverless_task {
comment = "my serverless task"
database = "db"
schema = "schema"
name = "serverless_task"
schedule = "10"
sql_statement = "select * from foo;"
session_parameters = {
"foo" : "bar",
}
user_task_timeout_ms = 10000
user_task_managed_initial_warehouse_size = "XSMALL"
after = "preceding_task"
when = "foo AND bar"
enabled = true
}
```

<!-- schema generated by tfplugindocs -->
Expand All @@ -44,17 +65,18 @@ resource snowflake_task task {
- **name** (String) Specifies the identifier for the task; must be unique for the database and schema in which the task is created.
- **schema** (String) The schema in which to create the task.
- **sql_statement** (String) Any single SQL statement, or a call to a stored procedure, executed when the task runs.
- **warehouse** (String) The warehouse the task will use.

### Optional

- **after** (String) Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag).
- **after** (String) Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag). (Conflict with schedule)
- **comment** (String) Specifies a comment for the task.
- **enabled** (Boolean) Specifies if the task should be started (enabled) after creation or should remain suspended (default).
- **id** (String) The ID of this resource.
- **schedule** (String) The schedule for periodically running the task. This can be a cron or interval in minutes.
- **schedule** (String) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflict with after)
- **session_parameters** (Map of String) Specifies session parameters to set for the session when the task runs. A task supports all session parameters.
- **user_task_managed_initial_warehouse_size** (String) Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. (Conflicts with warehouse)
- **user_task_timeout_ms** (Number) Specifies the time limit on a single run of the task before it times out (in milliseconds).
- **warehouse** (String) The warehouse the task will use. Omit this parameter to use Snowflake-managed compute resources for runs of this task. (Conflicts with user_task_managed_initial_warehouse_size)
- **when** (String) Specifies a Boolean SQL expression; multiple conditions joined with AND/OR are supported.

## Import
Expand Down
21 changes: 21 additions & 0 deletions examples/resources/snowflake_task/resource.tf
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,24 @@ resource snowflake_task task {
when = "foo AND bar"
enabled = true
}

resource snowflake_task serverless_task {
comment = "my serverless task"

database = "db"
schema = "schema"

name = "serverless_task"
schedule = "10"
sql_statement = "select * from foo;"

session_parameters = {
"foo" : "bar",
}

user_task_timeout_ms = 10000
user_task_managed_initial_warehouse_size = "XSMALL"
after = "preceding_task"
when = "foo AND bar"
enabled = true
}
142 changes: 102 additions & 40 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ var taskSchema = map[string]*schema.Schema{
ForceNew: true,
},
"warehouse": {
Type: schema.TypeString,
Required: true,
Description: "The warehouse the task will use.",
ForceNew: false,
Type: schema.TypeString,
Optional: true,
Description: "The warehouse the task will use. Omit this parameter to use Snowflake-managed compute resources for runs of this task. (Conflicts with user_task_managed_initial_warehouse_size)",
ForceNew: false,
ConflictsWith: []string{"user_task_managed_initial_warehouse_size"},
},
"schedule": {
Type: schema.TypeString,
Optional: true,
Description: "The schedule for periodically running the task. This can be a cron or interval in minutes.",
Type: schema.TypeString,
Optional: true,
Description: "The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflict with after)",
ConflictsWith: []string{"after"},
},
"session_parameters": {
Type: schema.TypeMap,
Expand All @@ -72,9 +74,10 @@ var taskSchema = map[string]*schema.Schema{
Description: "Specifies a comment for the task.",
},
"after": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag).",
Type: schema.TypeString,
Optional: true,
Description: "Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag). (Conflict with schedule)",
ConflictsWith: []string{"schedule"},
},
"when": {
Type: schema.TypeString,
Expand All @@ -88,6 +91,15 @@ var taskSchema = map[string]*schema.Schema{
ForceNew: false,
DiffSuppressFunc: DiffSuppressStatement,
},
"user_task_managed_initial_warehouse_size": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringInSlice([]string{
"XSMALL", "X-SMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE", "X-LARGE", "XXLARGE", "X2LARGE", "2X-LARGE",
}, true),
Description: "Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. (Conflicts with warehouse)",
ConflictsWith: []string{"warehouse"},
},
}

type taskID struct {
Expand Down Expand Up @@ -333,19 +345,32 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {

if len(params) > 0 {
paramMap := map[string]interface{}{}
var userTaskManagedInitialWarehouseSize = ""

for _, param := range params {
log.Printf("[TRACE] %+v\n", param)
if param.Value == param.DefaultValue || param.Level == "ACCOUNT" {

if param.Level != "TASK" {
continue
}

paramMap[param.Key] = param.Value
if param.Key == "USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE" {
userTaskManagedInitialWarehouseSize = param.Value

} else {
paramMap[param.Key] = param.Value
}
}

err := d.Set("session_parameters", paramMap)
if err != nil {
return err
}

err = d.Set("user_task_managed_initial_warehouse_size", userTaskManagedInitialWarehouseSize)
if err != nil {
return err
}
}

return nil
Expand All @@ -359,15 +384,21 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error {
database := d.Get("database").(string)
dbSchema := d.Get("schema").(string)
name := d.Get("name").(string)
warehouse := d.Get("warehouse").(string)
sql := d.Get("sql_statement").(string)
enabled := d.Get("enabled").(bool)

builder := snowflake.Task(name, database, dbSchema)
builder.WithWarehouse(warehouse)
builder.WithStatement(sql)

// Set optionals
if v, ok := d.GetOk("warehouse"); ok {
builder.WithWarehouse(v.(string))
}

if v, ok := d.GetOk("user_task_managed_initial_warehouse_size"); ok {
builder.WithInitialWarehouseSize(v.(string))
}

if v, ok := d.GetOk("schedule"); ok {
builder.WithSchedule(v.(string))
}
Expand Down Expand Up @@ -429,6 +460,7 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error {
// UpdateTask implements schema.UpdateFunc
func UpdateTask(d *schema.ResourceData, meta interface{}) error {
taskID, err := taskIDFromString(d.Id())
var needResumeCurrentTask = false
if err != nil {
return err
}
Expand All @@ -438,22 +470,66 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
dbSchema := taskID.SchemaName
name := taskID.TaskName
builder := snowflake.Task(name, database, dbSchema)

root, err := getActiveRootTaskAndSuspend(d, meta)
if err != nil {
return err
}
defer resumeTask(root, meta)

if d.HasChange("warehouse") {
new := d.Get("warehouse")
q := builder.ChangeWarehouse(new.(string))
var q string
newWarehouse := d.Get("warehouse")

if newWarehouse == "" {
q = builder.SwitchWarehouseToManaged()
} else {
q = builder.ChangeWarehouse(newWarehouse.(string))
}

err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating warehouse on task %v", d.Id())
}
}

if d.HasChange("user_task_managed_initial_warehouse_size") {
newSize := d.Get("user_task_managed_initial_warehouse_size")
warehouse := d.Get("warehouse")

if warehouse == "" && newSize != "" {
var q = builder.SwitchManagedWithInitialSize(newSize.(string))
err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating user_task_managed_initial_warehouse_size on task %v", d.Id())
}
}

}

// Need to remove dependency before adding schedule if needed
if d.HasChange("after") {
var (
q string
err error
)

old, _ := d.GetChange("after")

q = builder.Suspend()
err = snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error suspending task %v", d.Id())
}
needResumeCurrentTask = d.Get("enabled").(bool)

if old != "" {
q = builder.RemoveDependency(old.(string))
err = snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error removing old after dependency from task %v", d.Id())
}
}
}

if d.HasChange("schedule") {
var q string
old, new := d.GetChange("schedule")
Expand Down Expand Up @@ -498,29 +574,9 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {

if d.HasChange("after") {
var (
q string
err error
q string
)

old, new := d.GetChange("after")
enabled := d.Get("enabled").(bool)

if enabled {
q = builder.Suspend()
err = snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error suspending task %v", d.Id())
}
defer resumeTask(builder, meta)
}

if old != "" {
q = builder.RemoveDependency(old.(string))
err = snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error removing old after dependency from task %v", d.Id())
}
}
_, new := d.GetChange("after")

if new != "" {
q = builder.AddDependency(new.(string))
Expand Down Expand Up @@ -593,6 +649,7 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
q = builder.Suspend()
// make sure defer doesn't enable task again
// when standalone or root task and status is supsended
needResumeCurrentTask = false
if root != nil && builder.QualifiedName() == root.QualifiedName() {
root = root.SetDisabled() //nolint
}
Expand All @@ -604,6 +661,11 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {
}
}

if needResumeCurrentTask {
resumeTask(builder, meta)
}

resumeTask(root, meta)
return ReadTask(d, meta)
}

Expand Down

0 comments on commit bde252e

Please sign in to comment.