This repository has been archived by the owner on Jul 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
job_runner.go
253 lines (210 loc) · 7.51 KB
/
job_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
// Copyright 2018 the Service Broker Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tf
import (
"context"
"errors"
"fmt"
"time"
"code.cloudfoundry.org/lager"
"github.com/GoogleCloudPlatform/gcp-service-broker/brokerapi/brokers/models"
"github.com/GoogleCloudPlatform/gcp-service-broker/db_service"
"github.com/GoogleCloudPlatform/gcp-service-broker/pkg/providers/tf/wrapper"
"github.com/GoogleCloudPlatform/gcp-service-broker/utils"
)
// Values written to a deployment's LastOperationState field to describe the
// most recent operation run against it.
const (
// InProgress is recorded by markJobStarted when an operation begins.
InProgress = "in progress"
// Succeeded is recorded by operationFinished when the operation returned no
// error.
Succeeded = "succeeded"
// Failed is recorded by operationFinished when the operation returned an
// error or the workspace could not be serialized.
Failed = "failed"
)
// NewTfJobRunerFromEnv builds a TfJobRunner for the project reported by
// utils.GetDefaultProjectId. When that lookup fails, the runner is nil and the
// lookup error is returned unchanged.
func NewTfJobRunerFromEnv() (*TfJobRunner, error) {
	defaultProject, lookupErr := utils.GetDefaultProjectId()
	if lookupErr != nil {
		return nil, lookupErr
	}

	runner := NewTfJobRunnerForProject(defaultProject)
	return runner, nil
}
// NewTfJobRunnerForProject builds a TfJobRunner bound to projectId. The
// service account is taken from utils.GetServiceAccountJson; Executor is left
// at its zero value.
func NewTfJobRunnerForProject(projectId string) *TfJobRunner {
	runner := new(TfJobRunner)
	runner.ProjectId = projectId
	runner.ServiceAccount = utils.GetServiceAccountJson()
	return runner
}
// TfJobRunner is responsible for executing terraform jobs in the background and
// providing a way to log and access the state of those background tasks.
//
// Jobs are given an ID and a workspace to operate in, and then the TfJobRunner
// is told which Terraform commands to execute on the given job.
// The TfJobRunner executes those commands in the background and keeps track of
// their state in a database table which gets updated once the task is completed.
//
// The TfJobRunner keeps track of the workspace and the Terraform state file so
// subsequent commands will operate on the same structure.
type TfJobRunner struct {
// ProjectId is supplied as GOOGLE_PROJECT in the environment map handed to
// wrapper.CustomEnvironmentExecutor when a stored workspace is hydrated.
ProjectId string
// ServiceAccount is supplied as GOOGLE_CREDENTIALS in that same environment
// map; presumably the service account's JSON key — confirm against
// utils.GetServiceAccountJson.
ServiceAccount string
// Executor holds a custom executor that will be called when commands are run.
// StageJob assigns it to the workspace directly; hydrated workspaces receive
// it wrapped by wrapper.CustomEnvironmentExecutor.
Executor wrapper.TerraformExecutor
}
// StageJob validates workspace with Terraform and, if it passes, records it in
// the database under jobId so later operations can run against it.
//
// A validation or serialization error is returned before anything is saved.
// Otherwise the deployment is stored through operationFinished with operation
// type "validation", and the result of that save is returned.
func (runner *TfJobRunner) StageJob(ctx context.Context, jobId string, workspace *wrapper.TerraformWorkspace) error {
	workspace.Executor = runner.Executor

	// Let Terraform reject bad modules or inputs before persisting anything.
	validationErr := workspace.Validate()
	if validationErr != nil {
		return validationErr
	}

	serialized, serializeErr := workspace.Serialize()
	if serializeErr != nil {
		return serializeErr
	}

	staged := models.TerraformDeployment{
		ID:                jobId,
		Workspace:         serialized,
		LastOperationType: "validation",
	}

	// A nil operation error marks the staged deployment as succeeded.
	return runner.operationFinished(nil, workspace, &staged)
}
// markJobStarted flags deployment as having an operation of operationType in
// progress, clears any message left by a previous operation, and saves the
// deployment. It returns the error from the save, if any.
func (runner *TfJobRunner) markJobStarted(ctx context.Context, deployment *models.TerraformDeployment, operationType string) error {
	deployment.LastOperationMessage = ""
	deployment.LastOperationState = InProgress
	deployment.LastOperationType = operationType

	return db_service.SaveTerraformDeployment(ctx, deployment)
}
// hydrateWorkspace rebuilds the workspace stored on deployment and attaches an
// executor that carries the runner's Google credentials and project in its
// environment. The hydrated workspace is logged before it is returned.
//
// A deserialization error is returned with a nil workspace.
func (runner *TfJobRunner) hydrateWorkspace(ctx context.Context, deployment *models.TerraformDeployment) (*wrapper.TerraformWorkspace, error) {
	hydrated, parseErr := wrapper.DeserializeWorkspace(deployment.Workspace)
	if parseErr != nil {
		return nil, parseErr
	}

	// TODO(josephlewis42) don't assume every pak needs Google specific creds
	// GOOGLE_CREDENTIALS carries the JSON key and GOOGLE_PROJECT the project.
	hydrated.Executor = wrapper.CustomEnvironmentExecutor(map[string]string{
		"GOOGLE_CREDENTIALS": runner.ServiceAccount,
		"GOOGLE_PROJECT":     runner.ProjectId,
	}, runner.Executor)

	utils.NewLogger("job-runner").Info("wrapping", lager.Data{"wrapper": hydrated})

	return hydrated, nil
}
// Create starts a background `terraform apply` for the deployment stored
// under id. Poll Status to find out when it has finished.
//
// The deployment is loaded, its workspace hydrated, and the job marked as an
// in-progress provision operation before the goroutine is launched; an error
// from any of those steps is returned and nothing is started.
func (runner *TfJobRunner) Create(ctx context.Context, id string) error {
	deployment, lookupErr := db_service.GetTerraformDeploymentById(ctx, id)
	if lookupErr != nil {
		return lookupErr
	}

	workspace, hydrateErr := runner.hydrateWorkspace(ctx, deployment)
	if hydrateErr != nil {
		return hydrateErr
	}

	startErr := runner.markJobStarted(ctx, deployment, models.ProvisionOperationType)
	if startErr != nil {
		return startErr
	}

	applyInBackground := func() {
		applyErr := workspace.Apply()
		// The error from saving the final state is discarded: nothing is
		// listening for it once the goroutine has been detached.
		_ = runner.operationFinished(applyErr, workspace, deployment)
	}
	go applyInBackground()

	return nil
}
// Destroy starts a background `terraform destroy` for the deployment stored
// under id. Poll Status to find out when it has finished.
//
// The deployment is loaded, its workspace hydrated, and the job marked as an
// in-progress deprovision operation before the goroutine is launched; an error
// from any of those steps is returned and nothing is started.
func (runner *TfJobRunner) Destroy(ctx context.Context, id string) error {
	deployment, lookupErr := db_service.GetTerraformDeploymentById(ctx, id)
	if lookupErr != nil {
		return lookupErr
	}

	workspace, hydrateErr := runner.hydrateWorkspace(ctx, deployment)
	if hydrateErr != nil {
		return hydrateErr
	}

	startErr := runner.markJobStarted(ctx, deployment, models.DeprovisionOperationType)
	if startErr != nil {
		return startErr
	}

	destroyInBackground := func() {
		destroyErr := workspace.Destroy()
		// The error from saving the final state is discarded: nothing is
		// listening for it once the goroutine has been detached.
		_ = runner.operationFinished(destroyErr, workspace, deployment)
	}
	go destroyInBackground()

	return nil
}
// operationFinished records the outcome of an operation on deployment and
// saves it so that polling clients can see the result.
//
// A nil err marks the operation Succeeded with an empty message; a non-nil err
// marks it Failed with err's text as the message. The workspace is then
// re-serialized and stored on the deployment; if serialization fails, the
// state is forced to Failed and the message is replaced with the serialization
// error. The save uses context.Background() rather than a caller's context.
// The returned error is the one from the save.
func (runner *TfJobRunner) operationFinished(err error, workspace *wrapper.TerraformWorkspace, deployment *models.TerraformDeployment) error {
	deployment.LastOperationState, deployment.LastOperationMessage = Succeeded, ""
	if err != nil {
		deployment.LastOperationState, deployment.LastOperationMessage = Failed, err.Error()
	}

	serialized, serializeErr := workspace.Serialize()
	if serializeErr != nil {
		deployment.LastOperationState = Failed
		deployment.LastOperationMessage = fmt.Sprintf("couldn't serialize workspace, contact your operator for cleanup: %s", serializeErr.Error())
	}

	// NOTE(review): Workspace is overwritten even when serialization failed,
	// with whatever string Serialize returned alongside its error.
	deployment.Workspace = serialized

	return db_service.SaveTerraformDeployment(context.Background(), deployment)
}
// Status reports on the most recent operation recorded for the deployment
// stored under id.
//
// isDone is true once the outcome is final: the operation succeeded (err is
// nil), it failed (err carries the recorded failure message), or the
// deployment could not be loaded (err is the lookup error). isDone is false
// with a nil err for any other recorded state, meaning the operation is still
// running.
func (runner *TfJobRunner) Status(ctx context.Context, id string) (isDone bool, err error) {
	deployment, lookupErr := db_service.GetTerraformDeploymentById(ctx, id)
	if lookupErr != nil {
		return true, lookupErr
	}

	state := deployment.LastOperationState
	if state == Succeeded {
		return true, nil
	}
	if state == Failed {
		return true, errors.New(deployment.LastOperationMessage)
	}

	return false, nil
}
// Outputs returns the output variables of the module instance instanceName
// from the workspace stored for the deployment under id.
//
// An error loading the deployment or deserializing its workspace is returned
// with a nil map; otherwise the result of the workspace's own Outputs call is
// passed through.
func (runner *TfJobRunner) Outputs(ctx context.Context, id, instanceName string) (map[string]interface{}, error) {
	deployment, lookupErr := db_service.GetTerraformDeploymentById(ctx, id)
	if lookupErr != nil {
		return nil, lookupErr
	}

	stored, parseErr := wrapper.DeserializeWorkspace(deployment.Workspace)
	if parseErr != nil {
		return nil, parseErr
	}

	return stored.Outputs(instanceName)
}
// Wait blocks until the most recent operation on the deployment stored under
// id is done, polling Status once per second. The first poll happens one
// second after Wait is called.
//
// It returns nil when the operation succeeded and the error reported by Status
// when the operation failed or its status could not be read. If ctx is
// cancelled or its deadline passes before the operation is done, Wait returns
// ctx.Err() so callers can tell an abandoned wait from a completed operation.
func (runner *TfJobRunner) Wait(ctx context.Context, id string) error {
	// One ticker for the whole wait avoids allocating a timer per iteration.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			isDone, err := runner.Status(ctx, id)
			if isDone {
				return err
			}
		}
	}
}