Skip to content

Commit

Permalink
feat: Add graceful exits to provisionerd (#372)
Browse files Browse the repository at this point in the history
* ci: Update DataDog GitHub branch to fallback to GITHUB_REF

This was detecting branches, but not our "main" branch before.
Hopefully this fixes it!

* Add basic Terraform Provider

* Rename post files to upload

* Add tests for resources

* Skip instance identity test

* Add tests for ensuring agent get's passed through properly

* Fix linting errors

* Add echo path

* Fix agent authentication

* fix: Convert all jobs to use a common resource and agent type

This enables a consistent API for project import and provisioned resources.

* Add "coder_workspace" data source

* feat: Remove magical parameters from being injected

This is a much cleaner abstraction. Explicitly declaring the user
parameters for each provisioner makes for significantly simpler
testing.

* feat: Add graceful exits to provisionerd

Terraform (or other provisioners) may need to cleanup state, or
cancel actions before exit. This adds the ability to gracefully
exit provisionerd.

* Fix cancel error check
  • Loading branch information
kylecarbs committed Feb 28, 2022
1 parent e5c9555 commit 9d2803e
Show file tree
Hide file tree
Showing 13 changed files with 1,091 additions and 537 deletions.
32 changes: 24 additions & 8 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
return &proto.UpdateJobResponse{}, nil
}

func (server *provisionerdServer) CancelJob(ctx context.Context, cancelJob *proto.CancelledJob) (*proto.Empty, error) {
jobID, err := uuid.Parse(cancelJob.JobId)
func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.FailedJob) (*proto.Empty, error) {
jobID, err := uuid.Parse(failJob.JobId)
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
Expand All @@ -422,19 +422,35 @@ func (server *provisionerdServer) CancelJob(ctx context.Context, cancelJob *prot
Time: database.Now(),
Valid: true,
},
CancelledAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
UpdatedAt: database.Now(),
Error: sql.NullString{
String: cancelJob.Error,
Valid: cancelJob.Error != "",
String: failJob.Error,
Valid: failJob.Error != "",
},
})
if err != nil {
return nil, xerrors.Errorf("update provisioner job: %w", err)
}
switch jobType := failJob.Type.(type) {
case *proto.FailedJob_WorkspaceProvision_:
if jobType.WorkspaceProvision.State == nil {
break
}
var input workspaceProvisionJob
err = json.Unmarshal(job.Input, &input)
if err != nil {
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
}
err = server.Database.UpdateWorkspaceHistoryByID(ctx, database.UpdateWorkspaceHistoryByIDParams{
ID: jobID,
UpdatedAt: database.Now(),
ProvisionerState: jobType.WorkspaceProvision.State,
})
if err != nil {
return nil, xerrors.Errorf("update workspace history state: %w", err)
}
case *proto.FailedJob_ProjectImport_:
}
return &proto.Empty{}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,18 @@ func convertProvisionerJob(provisionerJob database.ProvisionerJob) ProvisionerJo
case !provisionerJob.StartedAt.Valid:
job.Status = ProvisionerJobStatusPending
case provisionerJob.CompletedAt.Valid:
job.Status = ProvisionerJobStatusSucceeded
if job.Error == "" {
job.Status = ProvisionerJobStatusSucceeded
} else {
job.Status = ProvisionerJobStatusFailed
}
case database.Now().Sub(provisionerJob.UpdatedAt) > 30*time.Second:
job.Status = ProvisionerJobStatusFailed
job.Error = "Worker failed to update job in time."
default:
job.Status = ProvisionerJobStatusRunning
}

if !provisionerJob.CancelledAt.Valid && job.Error != "" {
job.Status = ProvisionerJobStatusFailed
}

return job
}

Expand Down
4 changes: 4 additions & 0 deletions provisioner/echo/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (*echo) Provision(request *proto.Provision_Request, stream proto.DRPCProvis
return stream.Context().Err()
}

func (*echo) Shutdown(_ context.Context, _ *proto.Empty) (*proto.Empty, error) {
return &proto.Empty{}, nil
}

type Responses struct {
Parse []*proto.Parse_Response
Provision []*proto.Provision_Response
Expand Down
67 changes: 53 additions & 14 deletions provisioner/terraform/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
Expand Down Expand Up @@ -253,25 +254,21 @@ func (t *terraform) runTerraformPlan(ctx context.Context, terraform *tfexec.Terr
}

func (t *terraform) runTerraformApply(ctx context.Context, terraform *tfexec.Terraform, request *proto.Provision_Request, stream proto.DRPCProvisioner_ProvisionStream, statefilePath string) error {
env := map[string]string{
"CODER_URL": request.Metadata.CoderUrl,
"CODER_WORKSPACE_TRANSITION": strings.ToLower(request.Metadata.WorkspaceTransition.String()),
env := []string{
"CODER_URL=" + request.Metadata.CoderUrl,
"CODER_WORKSPACE_TRANSITION=" + strings.ToLower(request.Metadata.WorkspaceTransition.String()),
}
options := []tfexec.ApplyOption{tfexec.JSON(true)}
vars := []string{}
for _, param := range request.ParameterValues {
switch param.DestinationScheme {
case proto.ParameterDestination_ENVIRONMENT_VARIABLE:
env[param.Name] = param.Value
env = append(env, fmt.Sprintf("%s=%s", param.Name, param.Value))
case proto.ParameterDestination_PROVISIONER_VARIABLE:
options = append(options, tfexec.Var(fmt.Sprintf("%s=%s", param.Name, param.Value)))
vars = append(vars, fmt.Sprintf("%s=%s", param.Name, param.Value))
default:
return xerrors.Errorf("unsupported parameter type %q for %q", param.DestinationScheme, param.Name)
}
}
err := terraform.SetEnv(env)
if err != nil {
return xerrors.Errorf("apply environment variables: %w", err)
}

reader, writer := io.Pipe()
defer reader.Close()
Expand Down Expand Up @@ -319,11 +316,24 @@ func (t *terraform) runTerraformApply(ctx context.Context, terraform *tfexec.Ter
}
}()

terraform.SetStdout(writer)
t.logger.Debug(ctx, "running apply", slog.F("options", options))
err = terraform.Apply(ctx, options...)
t.logger.Debug(ctx, "running apply", slog.F("vars", len(vars)), slog.F("env", len(env)))
err := runApplyCommand(ctx, t.shutdownCtx, terraform.ExecPath(), terraform.WorkingDir(), writer, env, vars)
if err != nil {
return xerrors.Errorf("apply terraform: %w", err)
errorMessage := err.Error()
// Terraform can fail and apply and still need to store it's state.
// In this case, we return Complete with an explicit error message.
statefileContent, err := os.ReadFile(statefilePath)
if err != nil {
return xerrors.Errorf("read file %q: %w", statefilePath, err)
}
return stream.Send(&proto.Provision_Response{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
State: statefileContent,
Error: errorMessage,
},
},
})
}
t.logger.Debug(ctx, "ran apply")

Expand Down Expand Up @@ -428,6 +438,35 @@ func (t *terraform) runTerraformApply(ctx context.Context, terraform *tfexec.Ter
})
}

// This couldn't use terraform-exec, because it doesn't support cancellation, and there didn't appear
// to be a straight-forward way to add it.
func runApplyCommand(ctx, shutdownCtx context.Context, bin, dir string, stdout io.Writer, env, vars []string) error {
args := []string{
"apply",
"-no-color",
"-auto-approve",
"-input=false",
"-json",
"-refresh=true",
}
for _, variable := range vars {
args = append(args, "-var", variable)
}
cmd := exec.CommandContext(ctx, bin, args...)
go func() {
select {
case <-ctx.Done():
return
case <-shutdownCtx.Done():
_ = cmd.Process.Signal(os.Kill)
}
}()
cmd.Stdout = stdout
cmd.Env = env
cmd.Dir = dir
return cmd.Run()
}

type terraformProvisionLog struct {
Level string `json:"@level"`
Message string `json:"@message"`
Expand Down
18 changes: 15 additions & 3 deletions provisioner/terraform/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"cdr.dev/slog"

"github.com/coder/coder/provisionersdk"
"github.com/coder/coder/provisionersdk/proto"
)

var (
Expand Down Expand Up @@ -43,14 +44,25 @@ func Serve(ctx context.Context, options *ServeOptions) error {
}
options.BinaryPath = binaryPath
}

shutdownCtx, shutdownCancel := context.WithCancel(ctx)
return provisionersdk.Serve(ctx, &terraform{
binaryPath: options.BinaryPath,
logger: options.Logger,
binaryPath: options.BinaryPath,
logger: options.Logger,
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
}, options.ServeOptions)
}

type terraform struct {
binaryPath string
logger slog.Logger

shutdownCtx context.Context
shutdownCancel context.CancelFunc
}

// Shutdown signals to begin graceful shutdown of any running operations.
func (t *terraform) Shutdown(_ context.Context, _ *proto.Empty) (*proto.Empty, error) {
t.shutdownCancel()
return &proto.Empty{}, nil
}

0 comments on commit 9d2803e

Please sign in to comment.