Fixed databricks_job resource to clear instance-specific attributes when `instance_pool_id` is specified (#2507)

NodeTypeID cannot be set in jobsAPI.Update() when InstancePoolID is specified.
If both are present, InstancePoolID takes precedence and NodeTypeID is treated as a computed attribute only (see the sketch below).

Closes #2502.
Closes #2141.
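For illustration, a minimal standalone sketch of that precedence rule, using the Cluster fields and the ModifyRequestOnInstancePool helper that appear in the diff below (the provider's module path is assumed to be github.com/databricks/terraform-provider-databricks):

package main

import (
	"fmt"

	// Assumed module path for the provider's clusters package.
	"github.com/databricks/terraform-provider-databricks/clusters"
)

func main() {
	// A cluster spec that still carries explicit node types alongside an
	// instance pool, e.g. after a job is switched from node_type_id to
	// instance_pool_id in the Terraform configuration.
	c := clusters.Cluster{
		InstancePoolID:   "instance-pool-worker",
		NodeTypeID:       "node-type-id-worker",
		DriverNodeTypeID: "node-type-id-driver",
		SparkVersion:     "spark-1",
		NumWorkers:       1,
	}

	// Clear the instance-specific attributes so that the jobs/reset payload
	// only references the pool; NodeTypeID and DriverNodeTypeID end up empty.
	c.ModifyRequestOnInstancePool()

	fmt.Printf("node_type_id=%q driver_node_type_id=%q\n", c.NodeTypeID, c.DriverNodeTypeID)
}

Calling this helper on every cluster spec before jobsAPI.Update() is what the new prepareJobSettingsForUpdate function below does, so switching an existing job from node_type_id to instance_pool_id no longer sends both attributes in jobs/reset.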
marekbrysa committed Jul 29, 2023
1 parent 6b7220a commit 754aad4
Showing 4 changed files with 128 additions and 2 deletions.
clusters/clusters_api.go: 2 changes (1 addition & 1 deletion)
@@ -458,7 +458,7 @@ func (cluster *Cluster) ModifyRequestOnInstancePool() {
		cluster.AwsAttributes = &awsAttributes
	}
	if cluster.AzureAttributes != nil {
-		cluster.AzureAttributes = nil
+		cluster.AzureAttributes = &AzureAttributes{}
	}
	if cluster.GcpAttributes != nil {
		gcpAttributes := GcpAttributes{
clusters/resource_cluster_test.go: 2 changes (1 addition & 1 deletion)
@@ -1567,7 +1567,7 @@ func TestModifyClusterRequestAzure(t *testing.T) {
		DriverNodeTypeID: "e",
	}
	c.ModifyRequestOnInstancePool()
-	assert.Nil(t, c.AzureAttributes)
+	assert.Equal(t, &AzureAttributes{}, c.AzureAttributes)
	assert.Equal(t, "", c.NodeTypeID)
	assert.Equal(t, "", c.DriverNodeTypeID)
	assert.Equal(t, false, c.EnableElasticDisk)
jobs/resource_job.go: 19 changes (19 additions & 0 deletions)
@@ -747,6 +747,22 @@ func (c controlRunStateLifecycleManager) OnUpdate(ctx context.Context) error {
	return api.StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate))
}

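// prepareJobSettingsForUpdate clears instance-specific attributes from every
// cluster spec that references an instance pool, since NodeTypeID cannot be
// sent to jobs/reset together with InstancePoolID.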
func prepareJobSettingsForUpdate(js JobSettings) {
	if js.NewCluster != nil {
		js.NewCluster.ModifyRequestOnInstancePool()
	}
	for _, task := range js.Tasks {
		if task.NewCluster != nil {
			task.NewCluster.ModifyRequestOnInstancePool()
		}
	}
	for _, jc := range js.JobClusters {
		if jc.NewCluster != nil {
			jc.NewCluster.ModifyRequestOnInstancePool()
		}
	}
}

func ResourceJob() *schema.Resource {
	getReadCtx := func(ctx context.Context, d *schema.ResourceData) context.Context {
		var js JobSettings
@@ -823,6 +839,9 @@ func ResourceJob() *schema.Resource {
			if js.isMultiTask() {
				ctx = context.WithValue(ctx, common.Api, common.API_2_1)
			}

			prepareJobSettingsForUpdate(js)

			jobsAPI := NewJobsAPI(ctx, c)
			err := jobsAPI.Update(d.Id(), js)
			if err != nil {
jobs/resource_job_test.go: 107 changes (107 additions & 0 deletions)
@@ -1419,6 +1419,113 @@ func TestResourceJobUpdate(t *testing.T) {
assert.Equal(t, "Featurizer New", d.Get("name"))
}

func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) {
	d, err := qa.ResourceFixture{
		Fixtures: []qa.HTTPFixture{
			{
				Method:   "POST",
				Resource: "/api/2.1/jobs/reset",
				ExpectedRequest: UpdateJobRequest{
					JobID: 789,
					NewSettings: &JobSettings{
						NewCluster: &clusters.Cluster{
							InstancePoolID:       "instance-pool-worker",
							DriverInstancePoolID: "instance-pool-driver",
							SparkVersion:         "spark-1",
							NumWorkers:           1,
						},
						Tasks: []JobTaskSettings{
							{
								NewCluster: &clusters.Cluster{
									InstancePoolID:       "instance-pool-worker-task",
									DriverInstancePoolID: "instance-pool-driver-task",
									SparkVersion:         "spark-2",
									NumWorkers:           2,
								},
							},
						},
						JobClusters: []JobCluster{
							{
								NewCluster: &clusters.Cluster{
									InstancePoolID:       "instance-pool-worker-job",
									DriverInstancePoolID: "instance-pool-driver-job",
									SparkVersion:         "spark-3",
									NumWorkers:           3,
								},
							},
						},
						Name:                   "Featurizer New",
						MaxRetries:             3,
						MinRetryIntervalMillis: 5000,
						RetryOnTimeout:         true,
						MaxConcurrentRuns:      1,
					},
				},
			},
			{
				Method:   "GET",
				Resource: "/api/2.1/jobs/get?job_id=789",
				Response: Job{
					JobID: 789,
					Settings: &JobSettings{
						NewCluster: &clusters.Cluster{
							NodeTypeID:       "node-type-id",
							DriverNodeTypeID: "driver-node-type-id",
						},
						Name:                   "Featurizer New",
						MaxRetries:             3,
						MinRetryIntervalMillis: 5000,
						RetryOnTimeout:         true,
						MaxConcurrentRuns:      1,
					},
				},
			},
		},
		ID:       "789",
		Update:   true,
		Resource: ResourceJob(),
		InstanceState: map[string]string{
			"new_cluster.0.node_type_id":                      "node-type-id-worker",
			"new_cluster.0.driver_node_type_id":               "node-type-id-driver",
			"task.0.new_cluster.0.node_type_id":               "node-type-id-worker-task",
			"task.0.new_cluster.0.driver_node_type_id":        "node-type-id-driver-task",
			"job_cluster.0.new_cluster.0.node_type_id":        "node-type-id-worker-job",
			"job_cluster.0.new_cluster.0.driver_node_type_id": "node-type-id-driver-job",
		},
		HCL: `
		new_cluster = {
			instance_pool_id = "instance-pool-worker"
			driver_instance_pool_id = "instance-pool-driver"
			spark_version = "spark-1"
			num_workers = 1
		}
		task = {
			new_cluster = {
				instance_pool_id = "instance-pool-worker-task"
				driver_instance_pool_id = "instance-pool-driver-task"
				spark_version = "spark-2"
				num_workers = 2
			}
		}
		job_cluster = {
			new_cluster = {
				instance_pool_id = "instance-pool-worker-job"
				driver_instance_pool_id = "instance-pool-driver-job"
				spark_version = "spark-3"
				num_workers = 3
			}
		}
		max_concurrent_runs = 1
		max_retries = 3
		min_retry_interval_millis = 5000
		name = "Featurizer New"
		retry_on_timeout = true`,
	}.Apply(t)
	assert.NoError(t, err)
	assert.Equal(t, "789", d.Id(), "Id should be the same as in reading")
	assert.Equal(t, "Featurizer New", d.Get("name"))
}

func TestResourceJobUpdate_Tasks(t *testing.T) {
	qa.ResourceFixture{
		Fixtures: []qa.HTTPFixture{
