diff --git a/docs/resources/job.md b/docs/resources/job.md
index e95af2aef3..c242216107 100644
--- a/docs/resources/job.md
+++ b/docs/resources/job.md
@@ -1,6 +1,7 @@
 ---
 subcategory: "Compute"
 ---
+
 # databricks_job Resource
 
 The `databricks_job` resource allows you to manage [Databricks Jobs](https://docs.databricks.com/jobs.html) to run non-interactive code in a [databricks_cluster](cluster.md).
@@ -78,7 +79,13 @@ The resource supports the following arguments:
 
 * `name` - (Optional) An optional name for the job. The default value is Untitled.
 * `job_cluster` - (Optional) A list of job [databricks_cluster](cluster.md) specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings. *Multi-task syntax*
-* `always_running` - (Optional) (Bool) Whenever the job is always running, like a Spark Streaming application, on every update restart the current active run or start it again, if nothing it is not running. False by default. Any job runs are started with `parameters` specified in `spark_jar_task` or `spark_submit_task` or `spark_python_task` or `notebook_task` blocks.
+* `always_running` - (Optional, Deprecated) (Bool) Whether this job should always have an active run, like a Spark Streaming application. When `true`, every update restarts the current active run, or starts a new run if none is running. `false` by default. Job runs are started with the `parameters` specified in the `spark_jar_task`, `spark_submit_task`, `spark_python_task`, or `notebook_task` blocks. Use `control_run_state` instead.
+* `control_run_state` - (Optional) (Bool) If `true`, the Databricks provider stops and starts the job's runs as needed so that the active run reflects the deployed configuration. For continuous jobs, the provider respects `pause_status` by stopping the current active run. This flag can only be set for continuous jobs.
+
+  When migrating from `always_running` to `control_run_state`, remove `always_running`, set `control_run_state = true`, and add an empty `continuous` block (see the example below):
+
+  ```
+  continuous { }
+  ```
 * `library` - (Optional) (Set) An optional list of libraries to be installed on the cluster that will execute the job. Please consult [libraries section](cluster.md#libraries) for [databricks_cluster](cluster.md) resource.
 * `retry_on_timeout` - (Optional) (Bool) An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout.
 * `max_retries` - (Optional) (Integer) An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with a FAILED or INTERNAL_ERROR lifecycle state. The value -1 means to retry indefinitely and the value 0 means to never retry. The default behavior is to never retry. A run can have the following lifecycle state: PENDING, RUNNING, TERMINATING, TERMINATED, SKIPPED or INTERNAL_ERROR
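For illustration, a continuous job managed through `control_run_state` might be declared as in the following minimal sketch; the notebook, cluster, and data source references (`databricks_notebook.this`, `data.databricks_spark_version.latest`, `data.databricks_node_type.smallest`) are placeholders for resources defined elsewhere in the configuration:

```hcl
resource "databricks_job" "this" {
  name                = "Continuous streaming job"
  max_concurrent_runs = 1

  task {
    task_key = "main"

    new_cluster {
      num_workers   = 1
      spark_version = data.databricks_spark_version.latest.id
      node_type_id  = data.databricks_node_type.smallest.id
    }

    notebook_task {
      notebook_path = databricks_notebook.this.path
    }
  }

  # A continuous job has a single active run; with control_run_state the
  # provider restarts that run whenever the deployed configuration changes.
  continuous {
    pause_status = "UNPAUSED"
  }

  control_run_state = true
}
```

Setting `pause_status = "PAUSED"` instead causes the provider to stop the active run on the next apply rather than start a new one.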
diff --git a/internal/acceptance/job_test.go b/internal/acceptance/job_test.go index e1f2c094eb..b1682b6415 100644 --- a/internal/acceptance/job_test.go +++ b/internal/acceptance/job_test.go @@ -1,7 +1,16 @@ package acceptance import ( + "context" + "errors" + "strconv" "testing" + "time" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/terraform-provider-databricks/common" + "github.com/stretchr/testify/assert" ) func TestAccJobTasks(t *testing.T) { @@ -83,3 +92,125 @@ func TestAccJobTasks(t *testing.T) { }`, }) } + +// An integration test which creates a continuous job with control_run_state = true, verifying +// that a job run was triggered within a few minutes of the job creation. Then, the test updates the +// job, verifying that the existing run was cancelled within a few minutes of the update. +func TestAccJobControlRunState(t *testing.T) { + getJobTemplate := func(name string, continuousBlock string) string { + return ` + data "databricks_current_user" "me" {} + data "databricks_spark_version" "latest" {} + data "databricks_node_type" "smallest" { + local_disk = true + } + + resource "databricks_notebook" "this" { + path = "${data.databricks_current_user.me.home}/Terraform` + name + `" + language = "PYTHON" + content_base64 = base64encode(<<-EOT + # created from ${abspath(path.module)} + import time + + display(spark.range(10)) + time.sleep(3600) + EOT + ) + } + + resource "databricks_job" "this" { + name = "{var.RANDOM}" + + task { + task_key = "a" + + new_cluster { + num_workers = 1 + spark_version = data.databricks_spark_version.latest.id + node_type_id = data.databricks_node_type.smallest.id + } + + notebook_task { + notebook_path = databricks_notebook.this.path + } + } + + continuous { + ` + continuousBlock + ` + } + + control_run_state = true + }` + } + previousRunIds := make([]int64, 0) + checkIfRunHasStarted := func(ctx context.Context, w *databricks.WorkspaceClient, jobID int64) (bool, error) { + runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: jobID}) + assert.NoError(t, err) + runIdsMap := make(map[int64]bool) + for _, id := range previousRunIds { + runIdsMap[id] = true + } + + for _, run := range runs { + if _, ok := runIdsMap[run.RunId]; !ok && run.State.LifeCycleState == "RUNNING" { + previousRunIds = append(previousRunIds, run.RunId) + return true, nil + } + } + return false, nil + } + checkIfAllRunsHaveEnded := func(ctx context.Context, w *databricks.WorkspaceClient, jobID int64) (bool, error) { + runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: jobID}) + assert.NoError(t, err) + for _, run := range runs { + if run.State.LifeCycleState == "RUNNING" { + return false, nil + } + } + return true, nil + } + retryFor := func(ctx context.Context, client *common.DatabricksClient, id string, lastErr error, f func(context.Context, *databricks.WorkspaceClient, int64) (bool, error)) error { + ctx = context.WithValue(ctx, common.Api, common.API_2_1) + w, err := client.WorkspaceClient() + assert.NoError(t, err) + jobID, err := strconv.ParseInt(id, 10, 64) + assert.NoError(t, err) + for i := 0; i < 100; i++ { + success, err := f(ctx, w, jobID) + if err != nil { + return err + } + if success { + return nil + } + // sleep for 5 seconds + time.Sleep(5 * time.Second) + } + return lastErr + } + waitForRunToStart := func(ctx context.Context, client
*common.DatabricksClient, id string) error { + return retryFor(ctx, client, id, errors.New("timed out waiting for job run to start"), checkIfRunHasStarted) + } + waitForAllRunsToEnd := func(ctx context.Context, client *common.DatabricksClient, id string) error { + return retryFor(ctx, client, id, errors.New("timed out waiting for job run to end"), checkIfAllRunsHaveEnded) + } + randomName1 := RandomName("notebook-") + randomName2 := RandomName("updated-notebook-") + workspaceLevel(t, step{ + // A new continuous job with empty block should be started automatically + Template: getJobTemplate(randomName1, ``), + Check: resourceCheck("databricks_job.this", waitForRunToStart), + }, step{ + // Updating the notebook should cancel the existing run + Template: getJobTemplate(randomName2, ``), + Check: resourceCheck("databricks_job.this", waitForRunToStart), + }, step{ + // Marking the job as paused should cancel existing run and not start a new one + Template: getJobTemplate(randomName2, `pause_status = "PAUSED"`), + Check: resourceCheck("databricks_job.this", waitForAllRunsToEnd), + }, step{ + // No pause status should be the equivalent of unpaused + Template: getJobTemplate(randomName2, `pause_status = "UNPAUSED"`), + Check: resourceCheck("databricks_job.this", waitForRunToStart), + }) +} diff --git a/jobs/resource_job.go b/jobs/resource_job.go index 5247675f05..c62a790860 100644 --- a/jobs/resource_job.go +++ b/jobs/resource_job.go @@ -334,11 +334,11 @@ type RunState struct { // JobRun is a simplified representation of corresponding entity type JobRun struct { - JobID int64 `json:"job_id"` - RunID int64 `json:"run_id"` - NumberInJob int64 `json:"number_in_job"` + JobID int64 `json:"job_id,omitempty"` + RunID int64 `json:"run_id,omitempty"` + NumberInJob int64 `json:"number_in_job,omitempty"` StartTime int64 `json:"start_time,omitempty"` - State RunState `json:"state"` + State RunState `json:"state,omitempty"` Trigger string `json:"trigger,omitempty"` RuntType string `json:"run_type,omitempty"` @@ -480,19 +480,11 @@ func (a JobsAPI) Start(jobID int64, timeout time.Duration) error { return a.waitForRunState(runID, "RUNNING", timeout) } -func (a JobsAPI) Restart(id string, timeout time.Duration) error { - jobID, err := strconv.ParseInt(id, 10, 64) - if err != nil { - return err - } +func (a JobsAPI) StopActiveRun(jobID int64, timeout time.Duration) error { runs, err := a.RunsList(JobRunsListRequest{JobID: jobID, ActiveOnly: true}) if err != nil { return err } - if len(runs.Runs) == 0 { - // nothing to cancel - return a.Start(jobID, timeout) - } if len(runs.Runs) > 1 { return fmt.Errorf("`always_running` must be specified only with "+ "`max_concurrent_runs = 1`. 
There are %d active runs", len(runs.Runs)) @@ -504,7 +496,7 @@ func (a JobsAPI) Restart(id string, timeout time.Duration) error { return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunID, err) } } - return a.Start(jobID, timeout) + return nil } // Create creates a job on the workspace given the job settings @@ -528,7 +520,7 @@ func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) { // Update updates a job given the id and a new set of job settings func (a JobsAPI) Update(id string, jobSettings JobSettings) error { - jobID, err := strconv.ParseInt(id, 10, 64) + jobID, err := parseJobId(id) if err != nil { return err } @@ -540,7 +532,7 @@ func (a JobsAPI) Update(id string, jobSettings JobSettings) error { // Read returns the job object with all the attributes func (a JobsAPI) Read(id string) (job Job, err error) { - jobID, err := strconv.ParseInt(id, 10, 64) + jobID, err := parseJobId(id) if err != nil { return } @@ -556,7 +548,7 @@ func (a JobsAPI) Read(id string) (job Job, err error) { // Delete deletes the job given a job id func (a JobsAPI) Delete(id string) error { - jobID, err := strconv.ParseInt(id, 10, 64) + jobID, err := parseJobId(id) if err != nil { return err } @@ -631,9 +623,17 @@ var jobSchema = common.StructToSchema(JobSettings{}, Computed: true, } s["always_running"] = &schema.Schema{ - Optional: true, - Default: false, - Type: schema.TypeBool, + Optional: true, + Default: false, + Type: schema.TypeBool, + Deprecated: "always_running will be replaced by control_run_state in the next major release.", + ConflictsWith: []string{"control_run_state", "continuous"}, + } + s["control_run_state"] = &schema.Schema{ + Optional: true, + Default: false, + Type: schema.TypeBool, + ConflictsWith: []string{"always_running"}, } s["schedule"].ConflictsWith = []string{"continuous", "trigger"} s["continuous"].ConflictsWith = []string{"schedule", "trigger"} @@ -641,6 +641,112 @@ var jobSchema = common.StructToSchema(JobSettings{}, return s }) +func parseJobId(id string) (int64, error) { + return strconv.ParseInt(id, 10, 64) +} + +// Callbacks to manage runs for jobs after creation and update. +// +// There are three types of lifecycle management for jobs: +// 1. always_running: When enabled, a new run will be started after the job configuration is updated. +// An existing active run will be cancelled if one exists. +// 2. control_run_state: When enabled, stops the active run of continuous jobs after the job configuration is updated. +// 3. Noop: No lifecycle management. +// +// always_running is deprecated but still supported for backwards compatibility. 
+type jobLifecycleManager interface { + OnCreate(ctx context.Context) error + OnUpdate(ctx context.Context) error +} + +func getJobLifecycleManager(d *schema.ResourceData, m any) jobLifecycleManager { + if d.Get("always_running").(bool) { + return alwaysRunningLifecycleManager{d: d, m: m} + } + if d.Get("control_run_state").(bool) { + return controlRunStateLifecycleManager{d: d, m: m} + } + return noopLifecycleManager{} +} + +type noopLifecycleManager struct{} + +func (n noopLifecycleManager) OnCreate(ctx context.Context) error { + return nil +} +func (n noopLifecycleManager) OnUpdate(ctx context.Context) error { + return nil +} + +type alwaysRunningLifecycleManager struct { + d *schema.ResourceData + m any +} + +func (a alwaysRunningLifecycleManager) OnCreate(ctx context.Context) error { + jobID, err := parseJobId(a.d.Id()) + if err != nil { + return err + } + return NewJobsAPI(ctx, a.m).Start(jobID, a.d.Timeout(schema.TimeoutCreate)) +} +func (a alwaysRunningLifecycleManager) OnUpdate(ctx context.Context) error { + api := NewJobsAPI(ctx, a.m) + jobID, err := parseJobId(a.d.Id()) + if err != nil { + return err + } + err = api.StopActiveRun(jobID, a.d.Timeout(schema.TimeoutUpdate)) + if err != nil { + return err + } + return api.Start(jobID, a.d.Timeout(schema.TimeoutUpdate)) +} + +type controlRunStateLifecycleManager struct { + d *schema.ResourceData + m any +} + +func (c controlRunStateLifecycleManager) OnCreate(ctx context.Context) error { + return nil +} + +func (c controlRunStateLifecycleManager) OnUpdate(ctx context.Context) error { + if c.d.Get("continuous") == nil { + return nil + } + + jobID, err := parseJobId(c.d.Id()) + if err != nil { + return err + } + + api := NewJobsAPI(ctx, c.m) + + // Only use RunNow to stop the active run if the job is unpaused. + pauseStatus := c.d.Get("continuous.0.pause_status").(string) + if pauseStatus == "UNPAUSED" { + // Previously, RunNow() was not supported for continuous jobs. Now, calling RunNow() + // on a continuous job works, cancelling the active run if there is one, and resetting + // the exponential backoff timer. So, we try to call RunNow() first, and if it fails, + // we call StopActiveRun() instead. + _, err = api.RunNow(jobID) + + if err == nil { + return nil + } + + // RunNow() returns 404 when the feature is disabled. 
+ var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode != 404 { + return err + } + } + + return api.StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate)) +} + func ResourceJob() *schema.Resource { getReadCtx := func(ctx context.Context, d *schema.ResourceData) context.Context { var js JobSettings @@ -664,6 +770,15 @@ func ResourceJob() *schema.Resource { if alwaysRunning && js.MaxConcurrentRuns > 1 { return fmt.Errorf("`always_running` must be specified only with `max_concurrent_runs = 1`") } + controlRunState := d.Get("control_run_state").(bool) + if controlRunState { + if js.Continuous == nil { + return fmt.Errorf("`control_run_state` must be specified only with `continuous`") + } + if js.MaxConcurrentRuns > 1 { + return fmt.Errorf("`control_run_state` must be specified only with `max_concurrent_runs = 1`") + } + } for _, task := range js.Tasks { if task.NewCluster == nil { continue @@ -691,10 +806,7 @@ func ResourceJob() *schema.Resource { return err } d.SetId(job.ID()) - if d.Get("always_running").(bool) { - return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate)) - } - return nil + return getJobLifecycleManager(d, c).OnCreate(ctx) }, Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { ctx = getReadCtx(ctx, d) @@ -716,10 +828,7 @@ func ResourceJob() *schema.Resource { if err != nil { return err } - if d.Get("always_running").(bool) { - return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate)) - } - return nil + return getJobLifecycleManager(d, c).OnUpdate(ctx) }, Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { ctx = getReadCtx(ctx, d) diff --git a/jobs/resource_job_test.go b/jobs/resource_job_test.go index 7794fd3ef1..b86ebeb8d6 100644 --- a/jobs/resource_job_test.go +++ b/jobs/resource_job_test.go @@ -615,6 +615,208 @@ func TestResourceJobCreate_AlwaysRunning_Conflict(t *testing.T) { }.ExpectError(t, "`always_running` must be specified only with `max_concurrent_runs = 1`") } +func TestResourceJobCreate_ControlRunState_AlwaysRunningConflict(t *testing.T) { + qa.ResourceFixture{ + Create: true, + Resource: ResourceJob(), + HCL: `control_run_state = true + always_running = true + continuous { + pause_status = "UNPAUSED" + }`, + }.ExpectError(t, "invalid config supplied. [always_running] Conflicting configuration arguments. 
[control_run_state] Conflicting configuration arguments") +} + +func TestResourceJobCreate_ControlRunState_NoContinuous(t *testing.T) { + qa.ResourceFixture{ + Create: true, + Resource: ResourceJob(), + HCL: `control_run_state = true`, + }.ExpectError(t, "`control_run_state` must be specified only with `continuous`") +} + +func TestResourceJobCreate_ControlRunState_ContinuousCreate(t *testing.T) { + qa.ResourceFixture{ + Create: true, + Resource: ResourceJob(), + Fixtures: []qa.HTTPFixture{ + { + Method: "POST", + Resource: "/api/2.0/jobs/create", + ExpectedRequest: JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + Response: Job{ + JobID: 789, + }, + }, + { + Method: "GET", + Resource: "/api/2.0/jobs/get?job_id=789", + Response: Job{ + JobID: 789, + Settings: &JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + }, + }, + }, + HCL: ` + continuous { + pause_status = "UNPAUSED" + } + control_run_state = true + max_concurrent_runs = 1 + name = "Test" + `, + }.Apply(t) +} + +func TestResourceJobCreate_ControlRunState_ContinuousUpdateRunNow(t *testing.T) { + qa.ResourceFixture{ + Update: true, + ID: "789", + Resource: ResourceJob(), + Fixtures: []qa.HTTPFixture{ + { + Method: "POST", + Resource: "/api/2.0/jobs/reset", + ExpectedRequest: UpdateJobRequest{ + JobID: 789, + NewSettings: &JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + }, + }, + { + Method: "GET", + Resource: "/api/2.0/jobs/get?job_id=789", + Response: Job{ + JobID: 789, + Settings: &JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/jobs/run-now", + ExpectedRequest: RunParameters{ + JobID: 789, + }, + Response: JobRun{}, + }, + }, + HCL: ` + continuous { + pause_status = "UNPAUSED" + } + control_run_state = true + max_concurrent_runs = 1 + name = "Test" + `, + }.Apply(t) +} + +func TestResourceJobCreate_ControlRunState_ContinuousUpdateCancel(t *testing.T) { + qa.ResourceFixture{ + Update: true, + ID: "789", + Resource: ResourceJob(), + Fixtures: []qa.HTTPFixture{ + { + Method: "POST", + Resource: "/api/2.0/jobs/reset", + ExpectedRequest: UpdateJobRequest{ + JobID: 789, + NewSettings: &JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + }, + }, + { + Method: "GET", + Resource: "/api/2.0/jobs/get?job_id=789", + Response: Job{ + JobID: 789, + Settings: &JobSettings{ + MaxConcurrentRuns: 1, + Name: "Test", + Continuous: &ContinuousConf{ + PauseStatus: "UNPAUSED", + }, + }, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/jobs/run-now", + ExpectedRequest: RunParameters{ + JobID: 789, + }, + Response: apierr.APIError{StatusCode: 404}, + Status: 404, + }, + { + Method: "GET", + Resource: "/api/2.0/jobs/runs/list?active_only=true&job_id=789", + Response: JobRunsList{ + Runs: []JobRun{ + { + RunID: 567, + }, + }, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/jobs/runs/cancel", + ExpectedRequest: map[string]any{ + "run_id": 567, + }, + }, + { + Method: "GET", + Resource: "/api/2.0/jobs/runs/get?run_id=567", + Response: JobRun{ + RunID: 567, + State: RunState{ + LifeCycleState: "TERMINATED", + }, + }, + }, + }, + HCL: ` + continuous { + pause_status = "UNPAUSED" + } + control_run_state = true + max_concurrent_runs = 1 + name 
= "Test" + `, + }.Apply(t) +} + func TestResourceJobCreateSingleNode(t *testing.T) { cluster := clusters.Cluster{ NumWorkers: 0, SparkVersion: "7.3.x-scala2.12", NodeTypeID: "Standard_DS3_v2", @@ -1479,26 +1681,35 @@ func TestJobRestarts(t *testing.T) { err = ja.waitForRunState(890, "RUNNING", timeout) assert.EqualError(t, err, "run is SOMETHING: Checking...") + testRestart := func(jobID int64, stopErr, startErr string) { + err := ja.StopActiveRun(jobID, timeout) + if stopErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, stopErr) + } + if stopErr == "" { + err = ja.Start(jobID, timeout) + if startErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, startErr) + } + } + } + // no active runs for the first time - err = ja.Restart("123", timeout) - assert.NoError(t, err) + testRestart(123, "", "") // one active run for the second time - err = ja.Restart("123", timeout) - assert.NoError(t, err) - - err = ja.Restart("111", timeout) - assert.EqualError(t, err, "cannot cancel run 567: nope") + testRestart(123, "", "") - err = ja.Restart("a", timeout) - assert.EqualError(t, err, "strconv.ParseInt: parsing \"a\": invalid syntax") + testRestart(111, "cannot cancel run 567: nope", "") - err = ja.Restart("222", timeout) - assert.EqualError(t, err, "nope") + testRestart(222, "nope", "") - err = ja.Restart("678", timeout) - assert.EqualError(t, err, "`always_running` must be specified only "+ - "with `max_concurrent_runs = 1`. There are 2 active runs") + testRestart(678, "`always_running` must be specified only "+ + "with `max_concurrent_runs = 1`. There are 2 active runs", "") }) }