Skip to content

Commit

Permalink
provisionerd sends failed or complete last (#2732)
Browse files Browse the repository at this point in the history
* provisionerd sends failed or complete last

Signed-off-by: Spike Curtis <spike@coder.com>

* Move runner into package

Signed-off-by: Spike Curtis <spike@coder.com>

* Remove jobRunner interface

Signed-off-by: Spike Curtis <spike@coder.com>

* renames and slight reworking from code review

Signed-off-by: Spike Curtis <spike@coder.com>

* Reword comment about okToSend

Signed-off-by: Spike Curtis <spike@coder.com>
  • Loading branch information
spikecurtis committed Jul 1, 2022
1 parent e5d5fa7 commit 22febc7
Show file tree
Hide file tree
Showing 8 changed files with 1,099 additions and 805 deletions.
2 changes: 1 addition & 1 deletion agent/reaper/reaper_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ func IsInitProcess() bool {
return false
}

func ForkReap(opt ...Option) error {
func ForkReap(_ ...Option) error {
return nil
}
1 change: 1 addition & 0 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ func UpdateTemplateVersion(t *testing.T, client *codersdk.Client, organizationID

// AwaitTemplateImportJob awaits for an import job to reach completed status.
func AwaitTemplateVersionJob(t *testing.T, client *codersdk.Client, version uuid.UUID) codersdk.TemplateVersion {
t.Logf("waiting for template version job %s", version)
var templateVersion codersdk.TemplateVersion
require.Eventually(t, func() bool {
var err error
Expand Down
12 changes: 12 additions & 0 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
server.Logger.Debug(ctx, "UpdateJob starting", slog.F("job_id", parsedID))
job, err := server.Database.GetProvisionerJobByID(ctx, parsedID)
if err != nil {
return nil, xerrors.Errorf("get job: %w", err)
Expand Down Expand Up @@ -368,19 +369,27 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
insertParams.Stage = append(insertParams.Stage, log.Stage)
insertParams.Source = append(insertParams.Source, logSource)
insertParams.Output = append(insertParams.Output, log.Output)
server.Logger.Debug(ctx, "job log",
slog.F("job_id", parsedID),
slog.F("stage", log.Stage),
slog.F("output", log.Output))
}
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
if err != nil {
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
return nil, xerrors.Errorf("insert job logs: %w", err)
}
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
data, err := json.Marshal(logs)
if err != nil {
return nil, xerrors.Errorf("marshal job log: %w", err)
}
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
if err != nil {
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
return nil, xerrors.Errorf("publish job log: %w", err)
}
server.Logger.Debug(ctx, "published job logs", slog.F("job_id", parsedID))
}

if len(request.Readme) > 0 {
Expand Down Expand Up @@ -489,6 +498,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
server.Logger.Debug(ctx, "FailJob starting", slog.F("job_id", jobID))
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
if err != nil {
return nil, xerrors.Errorf("get provisioner job: %w", err)
Expand Down Expand Up @@ -548,6 +558,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
server.Logger.Debug(ctx, "CompleteJob starting", slog.F("job_id", jobID))
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
if err != nil {
return nil, xerrors.Errorf("get job by id: %w", err)
Expand Down Expand Up @@ -700,6 +711,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
reflect.TypeOf(completed.Type).String())
}

server.Logger.Debug(ctx, "CompleteJob done", slog.F("job_id", jobID))
return &proto.Empty{}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
for _, log := range logs {
select {
case bufferedLogs <- log:
api.Logger.Debug(r.Context(), "subscribe buffered log", slog.F("job_id", job.ID), slog.F("stage", log.Stage))
default:
// If this overflows users could miss logs streaming. This can happen
// if a database request takes a long amount of time, and we get a lot of logs.
Expand Down Expand Up @@ -176,8 +177,10 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
for {
select {
case <-r.Context().Done():
api.Logger.Debug(context.Background(), "job logs context canceled", slog.F("job_id", job.ID))
return
case log := <-bufferedLogs:
api.Logger.Debug(r.Context(), "subscribe encoding log", slog.F("job_id", job.ID), slog.F("stage", log.Stage))
err = encoder.Encode(convertProvisionerJobLog(log))
if err != nil {
return
Expand All @@ -189,6 +192,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
continue
}
if job.CompletedAt.Valid {
api.Logger.Debug(context.Background(), "streaming job logs done; job done", slog.F("job_id", job.ID))
return
}
}
Expand Down

0 comments on commit 22febc7

Please sign in to comment.