Added control_run_state flag to the databricks_job resource for continuous jobs (#2466)

This PR introduces a new flag, `control_run_state`, to replace the `always_running` flag. This flag only applies to continuous jobs. Its behavior is described below:

- For jobs with `pause_status = PAUSED`, it is a no-op on create and stops the active job run on update (if applicable).
- For jobs with `pause_status = UNPAUSED`, it starts a job run on create and stops the active job run on update (if applicable).

The job does not need to be started, as that is handled by the Jobs service itself.
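
As a minimal sketch of the new flag in use (the resource name and notebook path are hypothetical, and the `databricks_spark_version`/`databricks_node_type` data sources are assumed to be declared as in the acceptance test below):

```
resource "databricks_job" "this" {
  name = "continuous-job-example" # illustrative name

  task {
    task_key = "main"
    new_cluster {
      num_workers   = 1
      spark_version = data.databricks_spark_version.latest.id
      node_type_id  = data.databricks_node_type.smallest.id
    }
    notebook_task {
      notebook_path = "/Shared/streaming-notebook" # hypothetical path
    }
  }

  # With pause_status = "UNPAUSED", a run starts on create; with
  # "PAUSED", create is a no-op and updates stop the active run.
  continuous {
    pause_status = "UNPAUSED"
  }

  control_run_state = true
}
```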

This fixes #2130.
mgyucht committed Jul 21, 2023
1 parent a2e3ff7 commit ce26b1c
Showing 4 changed files with 501 additions and 43 deletions.
9 changes: 8 additions & 1 deletion docs/resources/job.md
@@ -1,6 +1,7 @@
---
subcategory: "Compute"
---

# databricks_job Resource

The `databricks_job` resource allows you to manage [Databricks Jobs](https://docs.databricks.com/jobs.html) to run non-interactive code in a [databricks_cluster](cluster.md).
@@ -78,7 +79,13 @@ The resource supports the following arguments:

* `name` - (Optional) An optional name for the job. The default value is `Untitled`.
* `job_cluster` - (Optional) A list of job [databricks_cluster](cluster.md) specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings. *Multi-task syntax*
* `always_running` - (Optional) (Bool) Whether this job is always running, like a Spark Streaming application: on every update, restart the current active run or start a new one if none is running. False by default. Any job runs are started with the `parameters` specified in the `spark_jar_task`, `spark_submit_task`, `spark_python_task`, or `notebook_task` blocks.
* `always_running` - (Optional, Deprecated) (Bool) Whether this job is always running, like a Spark Streaming application: on every update, restart the current active run or start a new one if none is running. False by default. Any job runs are started with the `parameters` specified in the `spark_jar_task`, `spark_submit_task`, `spark_python_task`, or `notebook_task` blocks. Deprecated in favor of `control_run_state`.
* `control_run_state` - (Optional) (Bool) If true, the Databricks provider will stop and start the job as needed to ensure that the active run for the job reflects the deployed configuration. For continuous jobs, the provider respects the `pause_status` by stopping the current active run. This flag cannot be set for non-continuous jobs.

When migrating from `always_running` to `control_run_state`, set `continuous` as follows:
```
continuous { }
```
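
For example, a hedged before/after sketch of the migration (the task configuration is elided and unchanged):

```
# Before
resource "databricks_job" "this" {
  # ... task { ... } ...
  always_running = true
}

# After: replace always_running with an empty continuous block
# plus control_run_state.
resource "databricks_job" "this" {
  # ... task { ... } ...
  continuous { }
  control_run_state = true
}
```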
* `library` - (Optional) (Set) An optional list of libraries to be installed on the cluster that will execute the job. See the [libraries section](cluster.md#libraries) of the [databricks_cluster](cluster.md) resource for details.
* `retry_on_timeout` - (Optional) (Bool) An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout.
* `max_retries` - (Optional) (Integer) An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with the `FAILED` or `INTERNAL_ERROR` lifecycle state. The value `-1` means to retry indefinitely and the value `0` means to never retry. The default behavior is to never retry. A run can have one of the following lifecycle states: `PENDING`, `RUNNING`, `TERMINATING`, `TERMINATED`, `SKIPPED`, or `INTERNAL_ERROR`.
131 changes: 131 additions & 0 deletions internal/acceptance/job_test.go
@@ -1,7 +1,16 @@
package acceptance

import (
"context"
"errors"
"strconv"
"testing"
"time"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/stretchr/testify/assert"
)

func TestAccJobTasks(t *testing.T) {
@@ -83,3 +92,125 @@ func TestAccJobTasks(t *testing.T) {
}`,
})
}

// An integration test which creates a continuous job with control_run_state = true, verifying
// that a job run is triggered within the polling timeout (100 attempts at 5-second intervals).
// The test then updates the job, verifying that the existing run is cancelled within the same timeout.
func TestAccJobControlRunState(t *testing.T) {
getJobTemplate := func(name string, continuousBlock string) string {
return `
data "databricks_current_user" "me" {}
data "databricks_spark_version" "latest" {}
data "databricks_node_type" "smallest" {
local_disk = true
}
resource "databricks_notebook" "this" {
path = "${data.databricks_current_user.me.home}/Terraform` + name + `"
language = "PYTHON"
content_base64 = base64encode(<<-EOT
# created from ${abspath(path.module)}
import time
display(spark.range(10))
time.sleep(3600)
EOT
)
}
resource "databricks_job" "this" {
name = "{var.RANDOM}"
task {
task_key = "a"
new_cluster {
num_workers = 1
spark_version = data.databricks_spark_version.latest.id
node_type_id = data.databricks_node_type.smallest.id
}
notebook_task {
notebook_path = databricks_notebook.this.path
}
}
continuous {
` + continuousBlock + `
}
control_run_state = true
}`
}
// previousRunIds records run IDs already observed, so that checkIfRunHasStarted
// only reports success when a brand-new run reaches the RUNNING state.
previousRunIds := make([]int64, 0)
checkIfRunHasStarted := func(ctx context.Context, w *databricks.WorkspaceClient, jobID int64) (bool, error) {
runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: jobID})
assert.NoError(t, err)
runIdsMap := make(map[int64]bool)
for _, id := range previousRunIds {
runIdsMap[id] = true
}

for _, run := range runs {
if _, ok := runIdsMap[run.RunId]; !ok && run.State.LifeCycleState == "RUNNING" {
previousRunIds = append(previousRunIds, run.RunId)
return true, nil
}
}
return false, nil
}
// checkIfAllRunsHaveEnded reports success once no run of the job is in the RUNNING state.
checkIfAllRunsHaveEnded := func(ctx context.Context, w *databricks.WorkspaceClient, jobID int64) (bool, error) {
runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: jobID})
assert.NoError(t, err)
for _, run := range runs {
if run.State.LifeCycleState == "RUNNING" {
return false, nil
}
}
return true, nil
}
// retryFor polls f up to 100 times at 5-second intervals, returning lastErr on timeout.
retryFor := func(ctx context.Context, client *common.DatabricksClient, id string, lastErr error, f func(context.Context, *databricks.WorkspaceClient, int64) (bool, error)) error {
// Pin the client to version 2.1 of the Jobs API.
ctx = context.WithValue(ctx, common.Api, common.API_2_1)
w, err := client.WorkspaceClient()
assert.NoError(t, err)
jobID, err := strconv.ParseInt(id, 10, 64)
assert.NoError(t, err)
for i := 0; i < 100; i++ {
success, err := f(ctx, w, jobID)
if err != nil {
return err
}
if success {
return nil
}
// sleep for 5 seconds
time.Sleep(5 * time.Second)
}
return lastErr
}
waitForRunToStart := func(ctx context.Context, client *common.DatabricksClient, id string) error {
return retryFor(ctx, client, id, errors.New("timed out waiting for job run to start"), checkIfRunHasStarted)
}
waitForAllRunsToEnd := func(ctx context.Context, client *common.DatabricksClient, id string) error {
return retryFor(ctx, client, id, errors.New("timed out waiting for job run to end"), checkIfAllRunsHaveEnded)
}
randomName1 := RandomName("notebook-")
randomName2 := RandomName("updated-notebook-")
workspaceLevel(t, step{
// A new continuous job with empty block should be started automatically
Template: getJobTemplate(randomName1, ``),
Check: resourceCheck("databricks_job.this", waitForRunToStart),
}, step{
// Updating the notebook should cancel the existing run
Template: getJobTemplate(randomName2, ``),
Check: resourceCheck("databricks_job.this", waitForRunToStart),
}, step{
// Marking the job as paused should cancel existing run and not start a new one
Template: getJobTemplate(randomName2, `pause_status = "PAUSED"`),
Check: resourceCheck("databricks_job.this", waitForAllRunsToEnd),
}, step{
// No pause status should be the equivalent of unpaused
Template: getJobTemplate(randomName2, `pause_status = "UNPAUSED"`),
Check: resourceCheck("databricks_job.this", waitForRunToStart),
})
}
