Skip to content

Commit

Permalink
fix: Simplify provisionerd job acquire (#158)
Browse files Browse the repository at this point in the history
This uses a simple channel to detect whether a
job is running or not, and moves all cancels
to be in goroutines.
  • Loading branch information
kylecarbs committed Feb 4, 2022
1 parent 7884b43 commit c65850b
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 127 deletions.
2 changes: 2 additions & 0 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Options struct {
func New(options *Options) http.Handler {
api := &api{
Database: options.Database,
Logger: options.Logger,
Pubsub: options.Pubsub,
}

Expand Down Expand Up @@ -110,5 +111,6 @@ func New(options *Options) http.Handler {
// be added to this struct for code clarity.
type api struct {
Database database.Store
Logger slog.Logger
Pubsub database.Pubsub
}
2 changes: 1 addition & 1 deletion coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(t *testing.T) Server {
}

handler := coderd.New(&coderd.Options{
Logger: slogtest.Make(t, nil),
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
Database: db,
Pubsub: pubsub,
})
Expand Down
19 changes: 15 additions & 4 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"

"cdr.dev/slog"

"github.com/coder/coder/coderd/projectparameter"
"github.com/coder/coder/database"
"github.com/coder/coder/httpapi"
Expand Down Expand Up @@ -84,6 +86,7 @@ func (api *api) provisionerDaemonsServe(rw http.ResponseWriter, r *http.Request)
Database: api.Database,
Pubsub: api.Pubsub,
Provisioners: daemon.Provisioners,
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
})
if err != nil {
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("drpc register provisioner daemon: %s", err))
Expand All @@ -109,6 +112,7 @@ type projectImportJob struct {
// Implementation of the provisioner daemon protobuf server.
type provisionerdServer struct {
ID uuid.UUID
Logger slog.Logger
Provisioners []database.ProvisionerType
Database database.Store
Pubsub database.Pubsub
Expand Down Expand Up @@ -136,9 +140,11 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, xerrors.Errorf("acquire job: %w", err)
}
server.Logger.Debug(ctx, "locked job from database", slog.F("id", job.ID))

// Marks the acquired job as failed with the error message provided.
failJob := func(errorMessage string) error {
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Expand Down Expand Up @@ -381,8 +387,12 @@ func (server *provisionerdServer) CancelJob(ctx context.Context, cancelJob *prot
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
CancelledAt: sql.NullTime{
Time: database.Now(),
Valid: true,
Expand Down Expand Up @@ -476,7 +486,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr

// This must occur in a transaction in case of failure.
err = server.Database.InTx(func(db database.Store) error {
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
UpdatedAt: database.Now(),
CompletedAt: sql.NullTime{
Expand All @@ -495,6 +505,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
return xerrors.Errorf("insert project parameter %q: %w", projectParameter.Name, err)
}
}
server.Logger.Debug(ctx, "marked import job as completed", slog.F("job_id", jobID))
return nil
})
if err != nil {
Expand All @@ -513,7 +524,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
}

err = server.Database.InTx(func(db database.Store) error {
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
UpdatedAt: database.Now(),
CompletedAt: sql.NullTime{
Expand Down
2 changes: 1 addition & 1 deletion coderd/provisioners.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ProvisionerJobStatus string

// Completed returns whether the job is still processing.
func (p ProvisionerJobStatus) Completed() bool {
return p == ProvisionerJobStatusSucceeded || p == ProvisionerJobStatusFailed
return p == ProvisionerJobStatusSucceeded || p == ProvisionerJobStatusFailed || p == ProvisionerJobStatusCancelled
}

const (
Expand Down
4 changes: 4 additions & 0 deletions coderd/workspacehistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (api *api) postWorkspaceHistoryByUser(rw http.ResponseWriter, r *http.Reque
Message: fmt.Sprintf("The provided project history %q has failed to import. You cannot create workspaces using it!", projectHistory.Name),
})
return
case ProvisionerJobStatusCancelled:
httpapi.Write(rw, http.StatusPreconditionFailed, httpapi.Response{
Message: "The provided project history was canceled during import. You cannot create workspaces using it!",
})
}

project, err := api.Database.GetProjectByID(r.Context(), projectHistory.ProjectID)
Expand Down
1 change: 1 addition & 0 deletions coderd/workspacehistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestWorkspaceHistory(t *testing.T) {
require.Eventually(t, func() bool {
hist, err := client.ProjectHistory(context.Background(), user.Organization, project.Name, projectHistory.Name)
require.NoError(t, err)
t.Logf("Import status: %s\n", hist.Import.Status)
return hist.Import.Status.Completed()
}, 15*time.Second, 50*time.Millisecond)
return projectHistory
Expand Down
17 changes: 16 additions & 1 deletion database/databasefake/databasefake.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,24 @@ func (q *fakeQuerier) UpdateProvisionerJobByID(_ context.Context, arg database.U
if arg.ID.String() != job.ID.String() {
continue
}
job.UpdatedAt = arg.UpdatedAt
q.provisionerJobs[index] = job
return nil
}
return sql.ErrNoRows
}

func (q *fakeQuerier) UpdateProvisionerJobWithCompleteByID(_ context.Context, arg database.UpdateProvisionerJobWithCompleteByIDParams) error {
q.mutex.Lock()
defer q.mutex.Unlock()

for index, job := range q.provisionerJobs {
if arg.ID.String() != job.ID.String() {
continue
}
job.UpdatedAt = arg.UpdatedAt
job.CompletedAt = arg.CompletedAt
job.CancelledAt = arg.CancelledAt
job.UpdatedAt = arg.UpdatedAt
job.Error = arg.Error
q.provisionerJobs[index] = job
return nil
Expand Down
1 change: 1 addition & 0 deletions database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions database/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,20 @@ WHERE
id = $1;

-- name: UpdateProvisionerJobByID :exec
UPDATE
provisioner_job
SET
updated_at = $2
WHERE
id = $1;

-- name: UpdateProvisionerJobWithCompleteByID :exec
UPDATE
provisioner_job
SET
updated_at = $2,
cancelled_at = $3,
completed_at = $4,
completed_at = $3,
cancelled_at = $4,
error = $5
WHERE
id = $1;
Expand Down
33 changes: 26 additions & 7 deletions database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c65850b

Please sign in to comment.