From 30281852d6f4e621ea54fd5550a8e683c5069120 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 6 Nov 2022 18:50:34 -0800 Subject: [PATCH] feat: Add buffering to provisioner job logs (#4918) * feat: Add bufferring to provisioner job logs This should improve overall build performance, and especially under load. It removes the old `id` column on the `provisioner_job_logs` table and replaces it with an auto-incrementing big integer to preserve order. Funny enough, we never had to care about order before because inserts would at minimum be 1ms different. Now they aren't, so the order needs to be preserved. * Fix log bufferring * Fix frontend log streaming * Fix JS test --- cli/cliui/provisionerjob.go | 4 +- cli/create.go | 6 +- cli/delete.go | 3 +- cli/portforward.go | 2 +- cli/ssh.go | 2 +- cli/start.go | 3 +- cli/state.go | 4 +- cli/stop.go | 3 +- cli/templatecreate.go | 3 +- cli/update.go | 4 +- coderd/database/databasefake/databasefake.go | 14 +- coderd/database/dump.sql | 15 +- .../000071_provisioner_log_lines.down.sql | 3 + .../000071_provisioner_log_lines.up.sql | 3 + coderd/database/models.go | 2 +- coderd/database/queries.sql.go | 33 ++- .../database/queries/provisionerjoblogs.sql | 9 +- coderd/provisionerdaemons.go | 10 +- coderd/provisionerjobs.go | 60 ++--- coderd/provisionerjobs_internal_test.go | 181 -------------- coderd/provisionerjobs_test.go | 10 +- coderd/templateversions_test.go | 7 +- coderd/workspacebuilds_test.go | 3 +- codersdk/provisionerdaemons.go | 14 +- codersdk/templateversions.go | 16 +- codersdk/workspacebuilds.go | 8 +- loadtest/workspacebuild/run.go | 10 +- provisionerd/provisionerd.go | 7 +- provisionerd/provisionerd_test.go | 32 ++- provisionerd/runner/runner.go | 226 +++++++++--------- site/src/api/typesGenerated.ts | 2 +- .../WorkspaceBuildLogs.test.ts | 6 +- site/src/testHelpers/entities.ts | 66 ++--- .../workspaceBuild/workspaceBuildXService.ts | 5 +- 34 files changed, 300 insertions(+), 476 deletions(-) create mode 100644 coderd/database/migrations/000071_provisioner_log_lines.down.sql create mode 100644 coderd/database/migrations/000071_provisioner_log_lines.up.sql diff --git a/cli/cliui/provisionerjob.go b/cli/cliui/provisionerjob.go index a7c2dcc86bad2..36f7d9c78c470 100644 --- a/cli/cliui/provisionerjob.go +++ b/cli/cliui/provisionerjob.go @@ -16,14 +16,14 @@ import ( "github.com/coder/coder/codersdk" ) -func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID, before time.Time) error { +func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID) error { return ProvisionerJob(ctx, writer, ProvisionerJobOptions{ Fetch: func() (codersdk.ProvisionerJob, error) { build, err := client.WorkspaceBuild(ctx, build) return build.Job, err }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.WorkspaceBuildLogsAfter(ctx, build, before) + return client.WorkspaceBuildLogsAfter(ctx, build, 0) }, }) } diff --git a/cli/create.go b/cli/create.go index 4b268f7161d8f..854ba78565c51 100644 --- a/cli/create.go +++ b/cli/create.go @@ -139,7 +139,6 @@ func create() *cobra.Command { return err } - after := time.Now() workspace, err := client.CreateWorkspace(cmd.Context(), organization.ID, codersdk.Me, codersdk.CreateWorkspaceRequest{ TemplateID: template.ID, Name: workspaceName, @@ -151,7 +150,7 @@ func create() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID, after) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID) if err != nil { return err } @@ -238,7 +237,6 @@ PromptParamLoop: _, _ = fmt.Fprintln(cmd.OutOrStdout()) // Run a dry-run with the given parameters to check correctness - after := time.Now() dryRun, err := client.CreateTemplateVersionDryRun(cmd.Context(), templateVersion.ID, codersdk.CreateTemplateVersionDryRunRequest{ WorkspaceName: args.NewWorkspaceName, ParameterValues: parameters, @@ -255,7 +253,7 @@ PromptParamLoop: return client.CancelTemplateVersionDryRun(cmd.Context(), templateVersion.ID, dryRun.ID) }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, after) + return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, 0) }, // Don't show log output for the dry-run unless there's an error. Silent: true, diff --git a/cli/delete.go b/cli/delete.go index fd30fe95a1c1e..4c655339d8a8f 100644 --- a/cli/delete.go +++ b/cli/delete.go @@ -47,7 +47,6 @@ func deleteWorkspace() *cobra.Command { ) } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, ProvisionerState: state, @@ -57,7 +56,7 @@ func deleteWorkspace() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/portforward.go b/cli/portforward.go index 911e8fb5208d7..ca7cb51f14719 100644 --- a/cli/portforward.go +++ b/cli/portforward.go @@ -79,7 +79,7 @@ func portForward() *cobra.Command { return xerrors.New("workspace must be in start transition to port-forward") } if workspace.LatestBuild.Job.CompletedAt == nil { - err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt) + err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID) if err != nil { return err } diff --git a/cli/ssh.go b/cli/ssh.go index b4d4f6420da78..7d5b25337b3fb 100644 --- a/cli/ssh.go +++ b/cli/ssh.go @@ -250,7 +250,7 @@ func getWorkspaceAndAgent(ctx context.Context, cmd *cobra.Command, client *coder return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, xerrors.New("workspace must be in start transition to ssh") } if workspace.LatestBuild.Job.CompletedAt == nil { - err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt) + err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID) if err != nil { return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, err } diff --git a/cli/start.go b/cli/start.go index 7abcd036a4456..7bf4782e14bad 100644 --- a/cli/start.go +++ b/cli/start.go @@ -25,7 +25,6 @@ func start() *cobra.Command { if err != nil { return err } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionStart, }) @@ -33,7 +32,7 @@ func start() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/state.go b/cli/state.go index 6a5858db704d7..bd1c0ccad4a6f 100644 --- a/cli/state.go +++ b/cli/state.go @@ -5,7 +5,6 @@ import ( "io" "os" "strconv" - "time" "github.com/spf13/cobra" @@ -100,7 +99,6 @@ func statePush() *cobra.Command { return err } - before := time.Now() build, err = client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ TemplateVersionID: build.TemplateVersionID, Transition: build.Transition, @@ -109,7 +107,7 @@ func statePush() *cobra.Command { if err != nil { return err } - return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID, before) + return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID) }, } cmd.Flags().IntVarP(&buildNumber, "build", "b", 0, "Specify a workspace build to target by name.") diff --git a/cli/stop.go b/cli/stop.go index 1b99c0251ccee..9bb355ef0bd5a 100644 --- a/cli/stop.go +++ b/cli/stop.go @@ -33,7 +33,6 @@ func stop() *cobra.Command { if err != nil { return err } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionStop, }) @@ -41,7 +40,7 @@ func stop() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/templatecreate.go b/cli/templatecreate.go index 7c0d25f0f7e2f..458cb1c2015b0 100644 --- a/cli/templatecreate.go +++ b/cli/templatecreate.go @@ -160,7 +160,6 @@ type createValidTemplateVersionArgs struct { } func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVersionArgs, parameters ...codersdk.CreateParameterRequest) (*codersdk.TemplateVersion, []codersdk.CreateParameterRequest, error) { - before := time.Now() client := args.Client req := codersdk.CreateTemplateVersionRequest{ @@ -187,7 +186,7 @@ func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVers return client.CancelTemplateVersion(cmd.Context(), version.ID) }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, before) + return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, 0) }, }) if err != nil { diff --git a/cli/update.go b/cli/update.go index 1ccfaad33a0b3..d419cac9389c4 100644 --- a/cli/update.go +++ b/cli/update.go @@ -2,7 +2,6 @@ package cli import ( "fmt" - "time" "github.com/spf13/cobra" @@ -57,7 +56,6 @@ func update() *cobra.Command { return nil } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ TemplateVersionID: template.ActiveVersionID, Transition: workspace.LatestBuild.Transition, @@ -66,7 +64,7 @@ func update() *cobra.Command { if err != nil { return err } - logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, 0) if err != nil { return err } diff --git a/coderd/database/databasefake/databasefake.go b/coderd/database/databasefake/databasefake.go index a9117f2b2efca..19cab8054db5d 100644 --- a/coderd/database/databasefake/databasefake.go +++ b/coderd/database/databasefake/databasefake.go @@ -2052,17 +2052,14 @@ func (q *fakeQuerier) GetProvisionerLogsByIDBetween(_ context.Context, arg datab if jobLog.JobID != arg.JobID { continue } - if !arg.CreatedBefore.IsZero() && jobLog.CreatedAt.After(arg.CreatedBefore) { + if arg.CreatedBefore != 0 && jobLog.ID > arg.CreatedBefore { continue } - if !arg.CreatedAfter.IsZero() && jobLog.CreatedAt.Before(arg.CreatedAfter) { + if arg.CreatedAfter != 0 && jobLog.ID < arg.CreatedAfter { continue } logs = append(logs, jobLog) } - if len(logs) == 0 { - return nil, sql.ErrNoRows - } return logs, nil } @@ -2212,10 +2209,15 @@ func (q *fakeQuerier) InsertProvisionerJobLogs(_ context.Context, arg database.I defer q.mutex.Unlock() logs := make([]database.ProvisionerJobLog, 0) + id := int64(1) + if len(q.provisionerJobLogs) > 0 { + id = q.provisionerJobLogs[len(q.provisionerJobLogs)-1].ID + } for index, output := range arg.Output { + id++ logs = append(logs, database.ProvisionerJobLog{ + ID: id, JobID: arg.JobID, - ID: arg.ID[index], CreatedAt: arg.CreatedAt[index], Source: arg.Source[index], Level: arg.Level[index], diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 8eed05162e176..2267fdc108232 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -272,15 +272,24 @@ CREATE TABLE provisioner_daemons ( ); CREATE TABLE provisioner_job_logs ( - id uuid NOT NULL, job_id uuid NOT NULL, created_at timestamp with time zone NOT NULL, source log_source NOT NULL, level log_level NOT NULL, stage character varying(128) NOT NULL, - output character varying(1024) NOT NULL + output character varying(1024) NOT NULL, + id bigint NOT NULL ); +CREATE SEQUENCE provisioner_job_logs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE provisioner_job_logs_id_seq OWNED BY provisioner_job_logs.id; + CREATE TABLE provisioner_jobs ( id uuid NOT NULL, created_at timestamp with time zone NOT NULL, @@ -463,6 +472,8 @@ CREATE TABLE workspaces ( ALTER TABLE ONLY licenses ALTER COLUMN id SET DEFAULT nextval('licenses_id_seq'::regclass); +ALTER TABLE ONLY provisioner_job_logs ALTER COLUMN id SET DEFAULT nextval('provisioner_job_logs_id_seq'::regclass); + ALTER TABLE ONLY agent_stats ADD CONSTRAINT agent_stats_pkey PRIMARY KEY (id); diff --git a/coderd/database/migrations/000071_provisioner_log_lines.down.sql b/coderd/database/migrations/000071_provisioner_log_lines.down.sql new file mode 100644 index 0000000000000..d851ab01360da --- /dev/null +++ b/coderd/database/migrations/000071_provisioner_log_lines.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE provisioner_job_logs DROP COLUMN id; + +ALTER TABLE provisioner_job_logs ADD COLUMN id uuid NOT NULL DEFAULT gen_random_uuid(); diff --git a/coderd/database/migrations/000071_provisioner_log_lines.up.sql b/coderd/database/migrations/000071_provisioner_log_lines.up.sql new file mode 100644 index 0000000000000..cb6678890d7e9 --- /dev/null +++ b/coderd/database/migrations/000071_provisioner_log_lines.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE provisioner_job_logs DROP COLUMN id; + +ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY; diff --git a/coderd/database/models.go b/coderd/database/models.go index 4f637ce5b1cec..bdebbf1a849cb 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -545,13 +545,13 @@ type ProvisionerJob struct { } type ProvisionerJobLog struct { - ID uuid.UUID `db:"id" json:"id"` JobID uuid.UUID `db:"job_id" json:"job_id"` CreatedAt time.Time `db:"created_at" json:"created_at"` Source LogSource `db:"source" json:"source"` Level LogLevel `db:"level" json:"level"` Stage string `db:"stage" json:"stage"` Output string `db:"output" json:"output"` + ID int64 `db:"id" json:"id"` } type Replica struct { diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 63732c6fc9ff6..6b1ea95c119de 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -2345,23 +2345,21 @@ func (q *sqlQuerier) UpdateProvisionerDaemonByID(ctx context.Context, arg Update const getProvisionerLogsByIDBetween = `-- name: GetProvisionerLogsByIDBetween :many SELECT - id, job_id, created_at, source, level, stage, output + job_id, created_at, source, level, stage, output, id FROM provisioner_job_logs WHERE job_id = $1 AND ( - created_at >= $2 - OR created_at <= $3 - ) -ORDER BY - created_at DESC + id > $2 + OR id < $3 + ) ORDER BY id ` type GetProvisionerLogsByIDBetweenParams struct { JobID uuid.UUID `db:"job_id" json:"job_id"` - CreatedAfter time.Time `db:"created_after" json:"created_after"` - CreatedBefore time.Time `db:"created_before" json:"created_before"` + CreatedAfter int64 `db:"created_after" json:"created_after"` + CreatedBefore int64 `db:"created_before" json:"created_before"` } func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetProvisionerLogsByIDBetweenParams) ([]ProvisionerJobLog, error) { @@ -2374,13 +2372,13 @@ func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetP for rows.Next() { var i ProvisionerJobLog if err := rows.Scan( - &i.ID, &i.JobID, &i.CreatedAt, &i.Source, &i.Level, &i.Stage, &i.Output, + &i.ID, ); err != nil { return nil, err } @@ -2399,17 +2397,15 @@ const insertProvisionerJobLogs = `-- name: InsertProvisionerJobLogs :many INSERT INTO provisioner_job_logs SELECT - unnest($1 :: uuid [ ]) AS id, - $2 :: uuid AS job_id, - unnest($3 :: timestamptz [ ]) AS created_at, - unnest($4 :: log_source [ ]) AS source, - unnest($5 :: log_level [ ]) AS LEVEL, - unnest($6 :: VARCHAR(128) [ ]) AS stage, - unnest($7 :: VARCHAR(1024) [ ]) AS output RETURNING id, job_id, created_at, source, level, stage, output + $1 :: uuid AS job_id, + unnest($2 :: timestamptz [ ]) AS created_at, + unnest($3 :: log_source [ ]) AS source, + unnest($4 :: log_level [ ]) AS LEVEL, + unnest($5 :: VARCHAR(128) [ ]) AS stage, + unnest($6 :: VARCHAR(1024) [ ]) AS output RETURNING job_id, created_at, source, level, stage, output, id ` type InsertProvisionerJobLogsParams struct { - ID []uuid.UUID `db:"id" json:"id"` JobID uuid.UUID `db:"job_id" json:"job_id"` CreatedAt []time.Time `db:"created_at" json:"created_at"` Source []LogSource `db:"source" json:"source"` @@ -2420,7 +2416,6 @@ type InsertProvisionerJobLogsParams struct { func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertProvisionerJobLogsParams) ([]ProvisionerJobLog, error) { rows, err := q.db.QueryContext(ctx, insertProvisionerJobLogs, - pq.Array(arg.ID), arg.JobID, pq.Array(arg.CreatedAt), pq.Array(arg.Source), @@ -2436,13 +2431,13 @@ func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertPro for rows.Next() { var i ProvisionerJobLog if err := rows.Scan( - &i.ID, &i.JobID, &i.CreatedAt, &i.Source, &i.Level, &i.Stage, &i.Output, + &i.ID, ); err != nil { return nil, err } diff --git a/coderd/database/queries/provisionerjoblogs.sql b/coderd/database/queries/provisionerjoblogs.sql index d9fb35af477c8..0034b8652dc4a 100644 --- a/coderd/database/queries/provisionerjoblogs.sql +++ b/coderd/database/queries/provisionerjoblogs.sql @@ -6,17 +6,14 @@ FROM WHERE job_id = @job_id AND ( - created_at >= @created_after - OR created_at <= @created_before - ) -ORDER BY - created_at DESC; + id > @created_after + OR id < @created_before + ) ORDER BY id; -- name: InsertProvisionerJobLogs :many INSERT INTO provisioner_job_logs SELECT - unnest(@id :: uuid [ ]) AS id, @job_id :: uuid AS job_id, unnest(@created_at :: timestamptz [ ]) AS created_at, unnest(@source :: log_source [ ]) AS source, diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 87a9b9a98320a..8fd27c9a00b38 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -368,7 +368,6 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto. if err != nil { return nil, xerrors.Errorf("convert log source: %w", err) } - insertParams.ID = append(insertParams.ID, uuid.New()) insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt)) insertParams.Level = append(insertParams.Level, logLevel) insertParams.Stage = append(insertParams.Stage, log.Stage) @@ -384,10 +383,15 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto. server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err)) return nil, xerrors.Errorf("insert job logs: %w", err) } + // Publish by the lowest log ID inserted so the + // log stream will fetch everything from that point. + lowestID := logs[0].ID server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID)) - data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs}) + data, err := json.Marshal(provisionerJobLogsMessage{ + CreatedAfter: lowestID, + }) if err != nil { - return nil, xerrors.Errorf("marshal job log: %w", err) + return nil, xerrors.Errorf("marshal: %w", err) } err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data) if err != nil { diff --git a/coderd/provisionerjobs.go b/coderd/provisionerjobs.go index 77a6589c71ac4..dda3c2607fffa 100644 --- a/coderd/provisionerjobs.go +++ b/coderd/provisionerjobs.go @@ -24,8 +24,8 @@ import ( // Returns provisioner logs based on query parameters. // The intended usage for a client to stream all logs (with JS API): // const timestamp = new Date().getTime(); -// 1. GET /logs?before= -// 2. GET /logs?after=&follow +// 1. GET /logs?before= +// 2. GET /logs?after=&follow // The combination of these responses should provide all current logs // to the consumer, and future logs are streamed in the follow request. func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) { @@ -74,10 +74,11 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job } } - var after time.Time + var after int64 // Only fetch logs created after the time provided. if afterRaw != "" { - afterMS, err := strconv.ParseInt(afterRaw, 10, 64) + var err error + after, err = strconv.ParseInt(afterRaw, 10, 64) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Query param \"after\" must be an integer.", @@ -87,16 +88,12 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job }) return } - after = time.UnixMilli(afterMS) - } else { - if follow { - after = database.Now() - } } - var before time.Time + var before int64 // Only fetch logs created before the time provided. if beforeRaw != "" { - beforeMS, err := strconv.ParseInt(beforeRaw, 10, 64) + var err error + before, err = strconv.ParseInt(beforeRaw, 10, 64) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Query param \"before\" must be an integer.", @@ -106,12 +103,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job }) return } - before = time.UnixMilli(beforeMS) - } else { - // If we're following, we don't want logs before a timestamp! - if !follow { - before = database.Now() - } } logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{ @@ -156,7 +147,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText) defer wsNetConn.Close() // Also closes conn. - logIdsDone := make(map[uuid.UUID]bool) + logIdsDone := make(map[int64]bool) // The Go stdlib JSON encoder appends a newline character after message write. encoder := json.NewEncoder(wsNetConn) @@ -370,8 +361,8 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string { // provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel type provisionerJobLogsMessage struct { - EndOfLogs bool `json:"end_of_logs,omitempty"` - Logs []database.ProvisionerJobLog `json:"logs,omitempty"` + CreatedAfter int64 `json:"created_after"` + EndOfLogs bool `json:"end_of_logs,omitempty"` } func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) { @@ -389,23 +380,32 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, return default: } - jlMsg := provisionerJobLogsMessage{} err := json.Unmarshal(message, &jlMsg) if err != nil { logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err)) return } + if jlMsg.CreatedAfter != 0 { + logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{ + JobID: jobID, + CreatedAfter: jlMsg.CreatedAfter, + }) + if err != nil { + logger.Warn(ctx, "get provisioner logs", slog.Error(err)) + return + } - for _, log := range jlMsg.Logs { - select { - case bufferedLogs <- log: - logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage)) - default: - // If this overflows users could miss logs streaming. This can happen - // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub, - // so just drop them. - logger.Warn(ctx, "provisioner job log overflowing channel") + for _, log := range logs { + select { + case bufferedLogs <- log: + logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage)) + default: + // If this overflows users could miss logs streaming. This can happen + // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub, + // so just drop them. + logger.Warn(ctx, "provisioner job log overflowing channel") + } } } if jlMsg.EndOfLogs { diff --git a/coderd/provisionerjobs_internal_test.go b/coderd/provisionerjobs_internal_test.go index 6ac466a57644a..54512e5da5cdc 100644 --- a/coderd/provisionerjobs_internal_test.go +++ b/coderd/provisionerjobs_internal_test.go @@ -1,160 +1,15 @@ package coderd import ( - "context" - "crypto/sha256" "database/sql" - "encoding/json" - "net/http/httptest" - "net/url" - "sync" "testing" - "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/coderd/database" - "github.com/coder/coder/coderd/database/databasefake" - "github.com/coder/coder/coderd/rbac" "github.com/coder/coder/codersdk" - "github.com/coder/coder/testutil" ) -func TestProvisionerJobLogs_Unit(t *testing.T) { - t.Parallel() - - t.Run("QueryPubSubDupes", func(t *testing.T) { - t.Parallel() - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - // mDB := mocks.NewStore(t) - fDB := databasefake.New() - fPubsub := &fakePubSub{t: t, cond: sync.NewCond(&sync.Mutex{})} - opts := Options{ - Logger: logger, - Database: fDB, - Pubsub: fPubsub, - } - api := New(&opts) - defer api.Close() - - server := httptest.NewServer(api.RootHandler) - defer server.Close() - userID := uuid.New() - keyID, keySecret, err := generateAPIKeyIDSecret() - require.NoError(t, err) - hashed := sha256.Sum256([]byte(keySecret)) - - u, err := url.Parse(server.URL) - require.NoError(t, err) - client := codersdk.Client{ - HTTPClient: server.Client(), - SessionToken: keyID + "-" + keySecret, - URL: u, - } - - buildID := uuid.New() - workspaceID := uuid.New() - jobID := uuid.New() - - expectedLogs := []database.ProvisionerJobLog{ - {ID: uuid.New(), JobID: jobID, Stage: "Stage0"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage1"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage2"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage3"}, - } - - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) - defer cancel() - - // wow there are a lot of DB rows we touch... - _, err = fDB.InsertAPIKey(ctx, database.InsertAPIKeyParams{ - ID: keyID, - HashedSecret: hashed[:], - UserID: userID, - ExpiresAt: time.Now().Add(5 * time.Hour), - LoginType: database.LoginTypePassword, - Scope: database.APIKeyScopeAll, - }) - require.NoError(t, err) - _, err = fDB.InsertUser(ctx, database.InsertUserParams{ - ID: userID, - RBACRoles: []string{rbac.RoleOwner()}, - }) - require.NoError(t, err) - _, err = fDB.InsertWorkspaceBuild(ctx, database.InsertWorkspaceBuildParams{ - ID: buildID, - WorkspaceID: workspaceID, - JobID: jobID, - }) - require.NoError(t, err) - _, err = fDB.InsertWorkspace(ctx, database.InsertWorkspaceParams{ - ID: workspaceID, - }) - require.NoError(t, err) - _, err = fDB.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{ - ID: jobID, - }) - require.NoError(t, err) - for _, l := range expectedLogs[:2] { - _, err := fDB.InsertProvisionerJobLogs(ctx, database.InsertProvisionerJobLogsParams{ - ID: []uuid.UUID{l.ID}, - JobID: jobID, - Stage: []string{l.Stage}, - }) - require.NoError(t, err) - } - - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, time.Now()) - require.NoError(t, err) - defer closer.Close() - - // when the endpoint calls subscribe, we get the listener here. - fPubsub.cond.L.Lock() - for fPubsub.listener == nil { - fPubsub.cond.Wait() - } - - // endpoint should now be listening - assert.False(t, fPubsub.canceled) - assert.False(t, fPubsub.closed) - - // send all the logs in two batches, duplicating what we already returned on the DB query. - msg := provisionerJobLogsMessage{} - msg.Logs = expectedLogs[:2] - data, err := json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - msg.Logs = expectedLogs[2:] - data, err = json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - - // send end of logs - msg.Logs = nil - msg.EndOfLogs = true - data, err = json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - - var stages []string - for l := range logs { - logger.Info(ctx, "got log", - slog.F("id", l.ID), - slog.F("stage", l.Stage)) - stages = append(stages, l.Stage) - } - assert.Equal(t, []string{"Stage0", "Stage1", "Stage2", "Stage3"}, stages) - for !fPubsub.canceled { - fPubsub.cond.Wait() - } - assert.False(t, fPubsub.closed) - }) -} - func TestConvertProvisionerJob_Unit(t *testing.T) { t.Parallel() validNullTimeMock := sql.NullTime{ @@ -260,39 +115,3 @@ func TestConvertProvisionerJob_Unit(t *testing.T) { }) } } - -type fakePubSub struct { - t *testing.T - cond *sync.Cond - listener database.Listener - canceled bool - closed bool -} - -func (f *fakePubSub) Subscribe(_ string, listener database.Listener) (cancel func(), err error) { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.listener = listener - f.cond.Signal() - return f.cancel, nil -} - -func (f *fakePubSub) Publish(_ string, _ []byte) error { - f.t.Fail() - return nil -} - -func (f *fakePubSub) Close() error { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.closed = true - f.cond.Signal() - return nil -} - -func (f *fakePubSub) cancel() { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.canceled = true - f.cond.Signal() -} diff --git a/coderd/provisionerjobs_test.go b/coderd/provisionerjobs_test.go index f23c3672e3a13..d55374fef4c46 100644 --- a/coderd/provisionerjobs_test.go +++ b/coderd/provisionerjobs_test.go @@ -3,12 +3,10 @@ package coderd_test import ( "context" "testing" - "time" "github.com/stretchr/testify/require" "github.com/coder/coder/coderd/coderdtest" - "github.com/coder/coder/coderd/database" "github.com/coder/coder/provisioner/echo" "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/testutil" @@ -38,13 +36,12 @@ func TestProvisionerJobLogs(t *testing.T) { template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - before := time.Now().UTC() coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -78,12 +75,11 @@ func TestProvisionerJobLogs(t *testing.T) { template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - before := database.Now() ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -121,7 +117,7 @@ func TestProvisionerJobLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, time.Now()) + logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) require.Greater(t, len(logs), 1) }) diff --git a/coderd/templateversions_test.go b/coderd/templateversions_test.go index 1bcb5ba8a0aaa..02ed6f1b9f30e 100644 --- a/coderd/templateversions_test.go +++ b/coderd/templateversions_test.go @@ -4,7 +4,6 @@ import ( "context" "net/http" "testing" - "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -430,7 +429,6 @@ func TestTemplateVersionLogs(t *testing.T) { t.Parallel() client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - before := time.Now() version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ Parse: echo.ParseComplete, ProvisionDryRun: echo.ProvisionComplete, @@ -465,7 +463,7 @@ func TestTemplateVersionLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, before) + logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -625,7 +623,6 @@ func TestTemplateVersionDryRun(t *testing.T) { defer cancel() // Create template version dry-run - after := time.Now() job, err := client.CreateTemplateVersionDryRun(ctx, version.ID, codersdk.CreateTemplateVersionDryRunRequest{ ParameterValues: []codersdk.CreateParameterRequest{}, }) @@ -637,7 +634,7 @@ func TestTemplateVersionDryRun(t *testing.T) { require.Equal(t, job.ID, newJob.ID) // Stream logs - logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, after) + logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, 0) require.NoError(t, err) defer closer.Close() diff --git a/coderd/workspacebuilds_test.go b/coderd/workspacebuilds_test.go index 3d152932bf866..10245ce490bfc 100644 --- a/coderd/workspacebuilds_test.go +++ b/coderd/workspacebuilds_test.go @@ -452,7 +452,6 @@ func TestWorkspaceBuildLogs(t *testing.T) { t.Parallel() client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - before := time.Now() version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ Parse: echo.ParseComplete, Provision: []*proto.Provision_Response{{ @@ -487,7 +486,7 @@ func TestWorkspaceBuildLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before.Add(-time.Hour)) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { diff --git a/codersdk/provisionerdaemons.go b/codersdk/provisionerdaemons.go index adce0321be691..fb88e42aecb8d 100644 --- a/codersdk/provisionerdaemons.go +++ b/codersdk/provisionerdaemons.go @@ -76,7 +76,7 @@ type ProvisionerJob struct { } type ProvisionerJobLog struct { - ID uuid.UUID `json:"id"` + ID int64 `json:"id"` CreatedAt time.Time `json:"created_at"` Source LogSource `json:"log_source"` Level LogLevel `json:"log_level"` @@ -87,10 +87,10 @@ type ProvisionerJobLog struct { // provisionerJobLogsBefore provides log output that occurred before a time. // This is abstracted from a specific job type to provide consistency between // APIs. Logs is the only shared route between jobs. -func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before time.Time) ([]ProvisionerJobLog, error) { +func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before int64) ([]ProvisionerJobLog, error) { values := url.Values{} - if !before.IsZero() { - values["before"] = []string{strconv.FormatInt(before.UTC().UnixMilli(), 10)} + if before != 0 { + values["before"] = []string{strconv.FormatInt(before, 10)} } res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?%s", path, values.Encode()), nil) if err != nil { @@ -106,10 +106,10 @@ func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, befo } // provisionerJobLogsAfter streams logs that occurred after a specific time. -func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { afterQuery := "" - if !after.IsZero() { - afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli()) + if after != 0 { + afterQuery = fmt.Sprintf("&after=%d", after) } followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery)) if err != nil { diff --git a/codersdk/templateversions.go b/codersdk/templateversions.go index 3aee18ec80a18..3273b31b28f86 100644 --- a/codersdk/templateversions.go +++ b/codersdk/templateversions.go @@ -93,13 +93,13 @@ func (c *Client) TemplateVersionResources(ctx context.Context, version uuid.UUID return resources, json.NewDecoder(res.Body).Decode(&resources) } -// TemplateVersionLogsBefore returns logs that occurred before a specific time. -func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// TemplateVersionLogsBefore returns logs that occurred before a specific log ID. +func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), before) } -// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific time. -func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific log ID. +func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), after) } @@ -159,14 +159,14 @@ func (c *Client) TemplateVersionDryRunResources(ctx context.Context, version, jo } // TemplateVersionDryRunLogsBefore returns logs for a template version dry-run -// that occurred before a specific time. -func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// that occurred before a specific log ID. +func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), before) } // TemplateVersionDryRunLogsAfter streams logs for a template version dry-run -// that occurred after a specific time. -func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// that occurred after a specific log ID. +func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), after) } diff --git a/codersdk/workspacebuilds.go b/codersdk/workspacebuilds.go index b9241d09ecafc..19ad6acecf6f5 100644 --- a/codersdk/workspacebuilds.go +++ b/codersdk/workspacebuilds.go @@ -117,13 +117,13 @@ func (c *Client) CancelWorkspaceBuild(ctx context.Context, id uuid.UUID) error { return nil } -// WorkspaceBuildLogsBefore returns logs that occurred before a specific time. -func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// WorkspaceBuildLogsBefore returns logs that occurred before a specific log ID. +func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), before) } -// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific time. -func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific log ID. +func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), after) } diff --git a/loadtest/workspacebuild/run.go b/loadtest/workspacebuild/run.go index d5ffd37596e1e..473a3fa9eb41f 100644 --- a/loadtest/workspacebuild/run.go +++ b/loadtest/workspacebuild/run.go @@ -41,14 +41,13 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { req.Name = "test-" + randName } - after := time.Now() workspace, err := r.client.CreateWorkspace(ctx, r.cfg.OrganizationID, r.cfg.UserID, req) if err != nil { return xerrors.Errorf("create workspace: %w", err) } r.workspaceID = workspace.ID - err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID, after) + err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID) if err != nil { return xerrors.Errorf("wait for build: %w", err) } @@ -68,7 +67,6 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { return nil } - after := time.Now() build, err := r.client.CreateWorkspaceBuild(ctx, r.workspaceID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, }) @@ -78,7 +76,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { // TODO: capture these logs logs := io.Discard - err = waitForBuild(ctx, logs, r.client, build.ID, after) + err = waitForBuild(ctx, logs, r.client, build.ID) if err != nil { return xerrors.Errorf("wait for build: %w", err) } @@ -86,7 +84,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { return nil } -func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID, after time.Time) error { +func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID) error { _, _ = fmt.Fprint(w, "Build is currently queued...") // Wait for build to start. @@ -106,7 +104,7 @@ func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, bui _, _ = fmt.Fprintln(w, "\nBuild started! Streaming logs below:") - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, after) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, 0) if err != nil { return xerrors.Errorf("start streaming build logs: %w", err) } diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go index fa71ed3057a03..feb0637c62f3e 100644 --- a/provisionerd/provisionerd.go +++ b/provisionerd/provisionerd.go @@ -50,6 +50,7 @@ type Options struct { ForceCancelInterval time.Duration UpdateInterval time.Duration + LogDebounceInterval time.Duration PollInterval time.Duration Provisioners Provisioners WorkDirectory string @@ -66,6 +67,9 @@ func New(clientDialer Dialer, opts *Options) *Server { if opts.ForceCancelInterval == 0 { opts.ForceCancelInterval = time.Minute } + if opts.LogDebounceInterval == 0 { + opts.LogDebounceInterval = 50 * time.Millisecond + } if opts.Filesystem == nil { opts.Filesystem = afero.NewOsFs() } @@ -315,7 +319,7 @@ func (p *Server) acquireJob(ctx context.Context) { return } - p.activeJob = runner.NewRunner( + p.activeJob = runner.New( ctx, job, p, @@ -325,6 +329,7 @@ func (p *Server) acquireJob(ctx context.Context) { provisioner, p.opts.UpdateInterval, p.opts.ForceCancelInterval, + p.opts.LogDebounceInterval, p.tracer, p.opts.Metrics.Runner, ) diff --git a/provisionerd/provisionerd_test.go b/provisionerd/provisionerd_test.go index 05becb863ebd7..a65ae60c1150a 100644 --- a/provisionerd/provisionerd_test.go +++ b/provisionerd/provisionerd_test.go @@ -558,11 +558,17 @@ func TestProvisionerd(t *testing.T) { }, nil }, updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) { - if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER { - // Close on a log so we know when the job is in progress! - updated.Do(func() { - close(updateChan) - }) + if len(update.Logs) > 0 { + for _, log := range update.Logs { + if log.Source != proto.LogSource_PROVISIONER { + continue + } + // Close on a log so we know when the job is in progress! + updated.Do(func() { + close(updateChan) + }) + break + } } return &proto.UpdateJobResponse{}, nil }, @@ -634,11 +640,17 @@ func TestProvisionerd(t *testing.T) { }, updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) { resp := &proto.UpdateJobResponse{} - if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER { - // Close on a log so we know when the job is in progress! - updated.Do(func() { - close(updateChan) - }) + if len(update.Logs) > 0 { + for _, log := range update.Logs { + if log.Source != proto.LogSource_PROVISIONER { + continue + } + // Close on a log so we know when the job is in progress! + updated.Do(func() { + close(updateChan) + }) + break + } } // start returning Canceled once we've gotten at least one log. select { diff --git a/provisionerd/runner/runner.go b/provisionerd/runner/runner.go index 13aafdbb1d373..dc4fd63954cd3 100644 --- a/provisionerd/runner/runner.go +++ b/provisionerd/runner/runner.go @@ -33,6 +33,10 @@ const ( MissingParameterErrorText = "missing parameter" ) +var ( + errUpdateSkipped = xerrors.New("update skipped; job complete or failed") +) + type Runner struct { tracer trace.Tracer metrics Metrics @@ -44,6 +48,7 @@ type Runner struct { provisioner sdkproto.DRPCProvisionerClient updateInterval time.Duration forceCancelInterval time.Duration + logBufferInterval time.Duration // closed when the Runner is finished sending any updates/failed/complete. done chan struct{} @@ -57,9 +62,11 @@ type Runner struct { // mutex controls access to all the following variables. mutex *sync.Mutex // used to wait for the failedJob or completedJob to be populated - cond *sync.Cond - failedJob *proto.FailedJob - completedJob *proto.CompletedJob + cond *sync.Cond + flushLogsTimer *time.Timer + queuedLogs []*proto.Log + failedJob *proto.FailedJob + completedJob *proto.CompletedJob // setting this false signals that no more messages about this job should be sent. Usually this // means that a terminal message like FailedJob or CompletedJob has been sent, even in the case // of a Cancel(). However, when someone calls Fail() or ForceStop(), we might not send the @@ -79,7 +86,7 @@ type JobUpdater interface { CompleteJob(ctx context.Context, in *proto.CompletedJob) error } -func NewRunner( +func New( ctx context.Context, job *proto.AcquiredJob, updater JobUpdater, @@ -89,6 +96,7 @@ func NewRunner( provisioner sdkproto.DRPCProvisionerClient, updateInterval time.Duration, forceCancelInterval time.Duration, + logDebounceInterval time.Duration, tracer trace.Tracer, metrics Metrics, ) *Runner { @@ -109,6 +117,8 @@ func NewRunner( provisioner: provisioner, updateInterval: updateInterval, forceCancelInterval: forceCancelInterval, + logBufferInterval: logDebounceInterval, + queuedLogs: make([]*proto.Log, 0), mutex: m, cond: sync.NewCond(m), done: make(chan struct{}), @@ -262,7 +272,7 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto. r.mutex.Lock() defer r.mutex.Unlock() if !r.okToSend { - return nil, xerrors.New("update skipped; job complete or failed") + return nil, errUpdateSkipped } return r.sender.UpdateJob(ctx, u) } @@ -291,19 +301,12 @@ func (r *Runner) doCleanFinish(ctx context.Context) { ctx, span := r.startTrace(ctx, tracing.FuncName()) defer span.End() - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Cleaning Up", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Cleaning Up", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - r.logger.Warn(ctx, "failed to log cleanup") - return - } // Cleanup the work directory after execution. for attempt := 0; attempt < 5; attempt++ { @@ -320,6 +323,8 @@ func (r *Runner) doCleanFinish(ctx context.Context) { r.logger.Debug(ctx, "cleaned up work directory") break } + + r.flushQueuedLogs(ctx) }() completedJob, failedJob = r.do(ctx) @@ -335,14 +340,11 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) return nil, r.failedJobf("create work directory %q: %s", r.workDirectory, err) } - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Setting up", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Setting up", + CreatedAt: time.Now().UnixMilli(), }) if err != nil { return nil, r.failedJobf("write log: %s", err) @@ -402,7 +404,6 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) ) } } - switch jobType := r.job.Type.(type) { case *proto.AcquiredJob_TemplateImport_: r.logger.Debug(context.Background(), "acquired job is template import") @@ -489,19 +490,12 @@ func (r *Runner) runReadmeParse(ctx context.Context) *proto.FailedJob { fi, err := afero.ReadFile(r.filesystem, path.Join(r.workDirectory, ReadmeFile)) if err != nil { - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_DEBUG, - Stage: "No README.md provided", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_DEBUG, + Stage: "No README.md provided", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return r.failedJobf("write log: %s", err) - } - return nil } @@ -526,18 +520,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p defer span.End() // Parse parameters and update the job with the parameter specs - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Parsing template parameters", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Parsing template parameters", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } parameterSchemas, err := r.runTemplateImportParse(ctx) if err != nil { return nil, r.failedJobf("run parse: %s", err) @@ -562,18 +550,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p } // Determine persistent resources - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Detecting persistent resources", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Detecting persistent resources", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } startResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{ CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl, WorkspaceTransition: sdkproto.WorkspaceTransition_START, @@ -583,18 +565,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p } // Determine ephemeral resources. - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Detecting ephemeral resources", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Detecting ephemeral resources", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } stopResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{ CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl, WorkspaceTransition: sdkproto.WorkspaceTransition_STOP, @@ -638,19 +614,13 @@ func (r *Runner) runTemplateImportParse(ctx context.Context) ([]*sdkproto.Parame slog.F("output", msgType.Log.Output), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: "Parse parameters", - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: "Parse parameters", }) - if err != nil { - return nil, xerrors.Errorf("update job: %w", err) - } case *sdkproto.Parse_Response_Complete: r.logger.Info(context.Background(), "parse complete", slog.F("parameter_schemas", msgType.Complete.ParameterSchemas)) @@ -721,19 +691,13 @@ func (r *Runner) runTemplateImportProvision(ctx context.Context, values []*sdkpr slog.F("level", msgType.Log.Level), slog.F("output", msgType.Log.Output), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: stage, - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: stage, }) - if err != nil { - return nil, xerrors.Errorf("send job update: %w", err) - } case *sdkproto.Provision_Response_Complete: if msgType.Complete.Error != "" { r.logger.Info(context.Background(), "dry-run provision failure", @@ -822,18 +786,12 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p stage = "Destroying workspace" } - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: stage, - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: stage, + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } // use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us // to send the cancel to the provisioner @@ -881,19 +839,13 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: stage, - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: stage, }) - if err != nil { - return nil, r.failedJobf("send job update: %s", err) - } case *sdkproto.Provision_Response_Complete: if msgType.Complete.Error != "" { r.logger.Info(context.Background(), "provision failed; updating state", @@ -945,3 +897,41 @@ func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.Span semconv.ServiceNameKey.String("coderd.provisionerd"), ))...) } + +func (r *Runner) queueLog(ctx context.Context, log *proto.Log) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.queuedLogs = append(r.queuedLogs, log) + if r.flushLogsTimer != nil { + r.flushLogsTimer.Reset(r.logBufferInterval) + return + } + if len(r.queuedLogs) > 100 { + // Flushing logs requires a lock, so this can happen async. + go r.flushQueuedLogs(ctx) + return + } + r.flushLogsTimer = time.AfterFunc(r.logBufferInterval, func() { + r.flushQueuedLogs(ctx) + }) +} + +func (r *Runner) flushQueuedLogs(ctx context.Context) { + r.mutex.Lock() + if r.flushLogsTimer != nil { + r.flushLogsTimer.Stop() + } + logs := r.queuedLogs[:] + r.queuedLogs = make([]*proto.Log, 0) + r.mutex.Unlock() + _, err := r.update(ctx, &proto.UpdateJobRequest{ + JobId: r.job.JobId, + Logs: logs, + }) + if err != nil { + if errors.Is(err, errUpdateSkipped) { + return + } + r.logger.Error(ctx, "flush queued logs", slog.Error(err)) + } +} diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index 764aab5c6e761..26a8e79115a2b 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -538,7 +538,7 @@ export interface ProvisionerJob { // From codersdk/provisionerdaemons.go export interface ProvisionerJobLog { - readonly id: string + readonly id: number readonly created_at: string readonly log_source: LogSource readonly log_level: LogLevel diff --git a/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts b/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts index ec5d24463a3a6..e9fc6aed079c1 100644 --- a/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts +++ b/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts @@ -5,7 +5,7 @@ describe("groupLogsByStage", () => { it("should group them by stage", () => { const input: ProvisionerJobLog[] = [ { - id: "1", + id: 1, created_at: "oct 13", log_source: "provisioner", log_level: "debug", @@ -13,7 +13,7 @@ describe("groupLogsByStage", () => { output: "test", }, { - id: "2", + id: 2, created_at: "oct 13", log_source: "provisioner", log_level: "debug", @@ -21,7 +21,7 @@ describe("groupLogsByStage", () => { output: "test", }, { - id: "3", + id: 3, created_at: "oct 13", log_source: "provisioner", log_level: "debug", diff --git a/site/src/testHelpers/entities.ts b/site/src/testHelpers/entities.ts index 81cfe8f6a5041..8b93801dcd70b 100644 --- a/site/src/testHelpers/entities.ts +++ b/site/src/testHelpers/entities.ts @@ -543,7 +543,7 @@ export const MockGitSSHKey: TypesGen.GitSSHKey = { export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ { - id: "836f8ab6-5202-4711-afa5-293394ced011", + id: 1, created_at: "2022-05-19T16:45:31.005Z", log_source: "provisioner_daemon", log_level: "info", @@ -551,7 +551,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "2db0ae92-b310-4a6e-8b1f-23380b70ac7f", + id: 2, created_at: "2022-05-19T16:45:31.006Z", log_source: "provisioner_daemon", log_level: "info", @@ -559,7 +559,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "37a5b7b1-b3eb-47cf-b80b-bd16e2e08a3d", + id: 3, created_at: "2022-05-19T16:45:31.072Z", log_source: "provisioner", log_level: "debug", @@ -567,7 +567,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "5e4e37a1-c217-48bc-84f5-7f1c3efbd042", + id: 4, created_at: "2022-05-19T16:45:31.073Z", log_source: "provisioner", log_level: "debug", @@ -575,7 +575,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Initializing the backend...", }, { - id: "060ed132-5d12-4584-9005-5c9557febe2f", + id: 5, created_at: "2022-05-19T16:45:31.077Z", log_source: "provisioner", log_level: "debug", @@ -583,7 +583,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "b2e70a1c-1943-4616-8ac9-25326c9f7e7b", + id: 6, created_at: "2022-05-19T16:45:31.078Z", log_source: "provisioner", log_level: "debug", @@ -591,7 +591,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Initializing provider plugins...", }, { - id: "993107fe-6dfb-42ec-912a-b32f50e60d62", + id: 7, created_at: "2022-05-19T16:45:31.078Z", log_source: "provisioner", log_level: "debug", @@ -599,7 +599,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: '- Finding hashicorp/google versions matching "~\u003e 4.15"...', }, { - id: "2ad2e2a1-7a75-4827-8cb9-928acfc6fc07", + id: 8, created_at: "2022-05-19T16:45:31.123Z", log_source: "provisioner", log_level: "debug", @@ -607,7 +607,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: '- Finding coder/coder versions matching "0.3.4"...', }, { - id: "7c723a90-0190-4c2f-9d97-ede39ef3d55f", + id: 9, created_at: "2022-05-19T16:45:31.137Z", log_source: "provisioner", log_level: "debug", @@ -615,7 +615,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "- Using hashicorp/google v4.21.0 from the shared cache directory", }, { - id: "3910144b-411b-4a53-9900-88d406ed9bf4", + id: 10, created_at: "2022-05-19T16:45:31.344Z", log_source: "provisioner", log_level: "debug", @@ -623,7 +623,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "- Using coder/coder v0.3.4 from the shared cache directory", }, { - id: "e3a02ad4-edc0-442f-8b9a-39d01d56b43b", + id: 11, created_at: "2022-05-19T16:45:31.388Z", log_source: "provisioner", log_level: "debug", @@ -631,7 +631,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "440cceb3-aabf-4838-979b-1fd37fe2d8d8", + id: 12, created_at: "2022-05-19T16:45:31.388Z", log_source: "provisioner", log_level: "debug", @@ -640,7 +640,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "Terraform has created a lock file .terraform.lock.hcl to record the provider", }, { - id: "90e1f244-78ff-4d95-871e-b2bebcabc39a", + id: 13, created_at: "2022-05-19T16:45:31.389Z", log_source: "provisioner", log_level: "debug", @@ -649,7 +649,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "selections it made above. Include this file in your version control repository", }, { - id: "e4527d6c-2412-452b-a946-5870787caf6b", + id: 14, created_at: "2022-05-19T16:45:31.389Z", log_source: "provisioner", log_level: "debug", @@ -658,7 +658,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "so that Terraform can guarantee to make the same selections by default when", }, { - id: "02f96d19-d94b-4d0e-a1c4-313a0d2ff9e3", + id: 15, created_at: "2022-05-19T16:45:31.39Z", log_source: "provisioner", log_level: "debug", @@ -666,7 +666,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: 'you run "terraform init" in the future.', }, { - id: "667c03ca-1b24-4f36-a598-f0322cf3e2a1", + id: 16, created_at: "2022-05-19T16:45:31.39Z", log_source: "provisioner", log_level: "debug", @@ -674,7 +674,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "48039d6a-9b21-460f-9ca3-4b0e2becfd18", + id: 17, created_at: "2022-05-19T16:45:31.391Z", log_source: "provisioner", log_level: "debug", @@ -682,7 +682,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Terraform has been successfully initialized!", }, { - id: "6fe4b64f-3aa6-4850-96e9-6db8478a53be", + id: 18, created_at: "2022-05-19T16:45:31.42Z", log_source: "provisioner", log_level: "info", @@ -690,7 +690,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Terraform 1.1.9", }, { - id: "fa7b6321-7ecd-492d-a671-6366186fad08", + id: 19, created_at: "2022-05-19T16:45:33.537Z", log_source: "provisioner", log_level: "info", @@ -698,7 +698,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "coder_agent.dev: Plan to create", }, { - id: "e677e49f-c5ba-417c-8c9d-78bdad744ce1", + id: 20, created_at: "2022-05-19T16:45:33.537Z", log_source: "provisioner", log_level: "info", @@ -706,7 +706,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Plan to create", }, { - id: "4b0e6168-29e4-4419-bf81-b57e31087666", + id: 21, created_at: "2022-05-19T16:45:33.538Z", log_source: "provisioner", log_level: "info", @@ -714,7 +714,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Plan to create", }, { - id: "5902f89c-8acd-45e2-9bd6-de4d6fd8fc9c", + id: 22, created_at: "2022-05-19T16:45:33.539Z", log_source: "provisioner", log_level: "info", @@ -722,7 +722,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Plan: 3 to add, 0 to change, 0 to destroy.", }, { - id: "a8107907-7c53-4aae-bb48-9a5f9759c7d5", + id: 23, created_at: "2022-05-19T16:45:33.712Z", log_source: "provisioner", log_level: "info", @@ -730,7 +730,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "coder_agent.dev: Creating...", }, { - id: "aaf13503-2f1a-4f6c-aced-b8fc48304dc1", + id: 24, created_at: "2022-05-19T16:45:33.719Z", log_source: "provisioner", log_level: "info", @@ -739,7 +739,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "coder_agent.dev: Creation complete after 0s [id=d07f5bdc-4a8d-4919-9cdb-0ac6ba9e64d6]", }, { - id: "4ada8886-f5b3-4fee-a1a3-72064b50d5ae", + id: 25, created_at: "2022-05-19T16:45:34.139Z", log_source: "provisioner", log_level: "info", @@ -747,7 +747,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Creating...", }, { - id: "8ffc59e8-a4d0-4ffe-9bcc-cb84ca51cc22", + id: 26, created_at: "2022-05-19T16:45:44.14Z", log_source: "provisioner", log_level: "info", @@ -755,7 +755,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Still creating... [10s elapsed]", }, { - id: "063189fd-75ad-415a-ac77-8c34b9e202b2", + id: 27, created_at: "2022-05-19T16:45:47.106Z", log_source: "provisioner", log_level: "info", @@ -764,7 +764,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "google_compute_disk.root: Creation complete after 13s [id=projects/bruno-coder-v2/zones/europe-west4-b/disks/coder-developer-bruno-dev-123-root]", }, { - id: "6fd554a1-a7a2-439f-b8d8-369d6c1ead21", + id: 28, created_at: "2022-05-19T16:45:47.118Z", log_source: "provisioner", log_level: "info", @@ -772,7 +772,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Creating...", }, { - id: "87388f7e-ab01-44b1-b35e-8e06636164d3", + id: 29, created_at: "2022-05-19T16:45:57.122Z", log_source: "provisioner", log_level: "info", @@ -780,7 +780,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Still creating... [10s elapsed]", }, { - id: "baa40120-3f18-40d2-a35c-b11f421a1ce1", + id: 30, created_at: "2022-05-19T16:46:00.837Z", log_source: "provisioner", log_level: "info", @@ -789,7 +789,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "google_compute_instance.dev[0]: Creation complete after 14s [id=projects/bruno-coder-v2/zones/europe-west4-b/instances/coder-developer-bruno-dev-123]", }, { - id: "00e18953-fba6-4b43-97a3-ecf376553c08", + id: 31, created_at: "2022-05-19T16:46:00.846Z", log_source: "provisioner", log_level: "info", @@ -797,7 +797,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Apply complete! Resources: 3 added, 0 changed, 0 destroyed.", }, { - id: "431811da-b534-4d92-b6e5-44814548c812", + id: 32, created_at: "2022-05-19T16:46:00.847Z", log_source: "provisioner", log_level: "info", @@ -805,7 +805,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Outputs: 0", }, { - id: "70459334-4878-4bda-a546-98eee166c4c6", + id: 33, created_at: "2022-05-19T16:46:02.283Z", log_source: "provisioner_daemon", log_level: "info", diff --git a/site/src/xServices/workspaceBuild/workspaceBuildXService.ts b/site/src/xServices/workspaceBuild/workspaceBuildXService.ts index fdd171e0bfde4..f77789c3c7567 100644 --- a/site/src/xServices/workspaceBuild/workspaceBuildXService.ts +++ b/site/src/xServices/workspaceBuild/workspaceBuildXService.ts @@ -125,11 +125,14 @@ export const workspaceBuildMachine = createMachine( API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor), streamWorkspaceBuildLogs: (ctx) => async (callback) => { return new Promise((resolve, reject) => { + if (!ctx.logs) { + return reject("logs must be set") + } const proto = location.protocol === "https:" ? "wss:" : "ws:" const socket = new WebSocket( `${proto}//${location.host}/api/v2/workspacebuilds/${ ctx.buildId - }/logs?follow=true&after=${ctx.timeCursor.getTime()}`, + }/logs?follow=true&after=${ctx.logs[ctx.logs.length - 1].id}`, ) socket.binaryType = "blob" socket.addEventListener("message", (event) => {