Skip to content

Commit

Permalink
feat: Make workspace watching realtime instead of polling (#4922)
Browse files Browse the repository at this point in the history
* feat: Make workspace watching realtime instead of polling

This was leading to performance issues on the frontend, where
the page should only be rendered if changes occur. While this
could be changed on the frontend, it was always the intention
to make this socket ~realtime anyways.

* Fix workspace tests waiting, erroring on workspace update, and add comments to workspace events
  • Loading branch information
kylecarbs committed Nov 7, 2022
1 parent a5cc197 commit 56b963a
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 76 deletions.
2 changes: 1 addition & 1 deletion coderd/activitybump.go
Expand Up @@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas

newDeadline := database.Now().Add(bumpAmount)

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: build.ProvisionerState,
Expand Down
6 changes: 3 additions & 3 deletions coderd/database/databasefake/databasefake.go
Expand Up @@ -2825,7 +2825,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
return sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand All @@ -2837,9 +2837,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
workspaceBuild.ProvisionerState = arg.ProvisionerState
workspaceBuild.Deadline = arg.Deadline
q.workspaceBuilds[index] = workspaceBuild
return nil
return workspaceBuild, nil
}
return sql.ErrNoRows
return database.WorkspaceBuild{}, sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 20 additions & 5 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions coderd/database/queries/workspacebuilds.sql
Expand Up @@ -124,12 +124,12 @@ INSERT INTO
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;

-- name: UpdateWorkspaceBuildByID :exec
-- name: UpdateWorkspaceBuildByID :one
UPDATE
workspace_builds
SET
updated_at = $2,
provisioner_state = $3,
deadline = $4
WHERE
id = $1;
id = $1 RETURNING *;
14 changes: 10 additions & 4 deletions coderd/httpapi/httpapi.go
Expand Up @@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)

_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
if err != nil {
return err
}

err = enc.Encode(sse.Data)
if err != nil {
return err
if sse.Data != nil {
_, err = buf.WriteString("data: ")
if err != nil {
return err
}
err = enc.Encode(sse.Data)
if err != nil {
return err
}
}

err = buf.WriteByte('\n')
Expand Down
17 changes: 15 additions & 2 deletions coderd/provisionerdaemons.go
Expand Up @@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, failJob(fmt.Sprintf("get owner: %s", err))
}
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
if err != nil {
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
}

// Compute parameters for the workspace to consume.
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
Expand Down Expand Up @@ -547,7 +551,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
}
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: input.WorkspaceBuildID,
UpdatedAt: database.Now(),
ProvisionerState: jobType.WorkspaceBuild.State,
Expand All @@ -556,6 +560,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("update workspace build state: %w", err)
}
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.FailedJob_TemplateImport_:
}

Expand Down Expand Up @@ -661,7 +669,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return xerrors.Errorf("update provisioner job: %w", err)
}
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: workspaceBuild.ID,
Deadline: workspaceDeadline,
ProvisionerState: jobType.WorkspaceBuild.State,
Expand Down Expand Up @@ -696,6 +704,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return nil, xerrors.Errorf("complete job: %w", err)
}

err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.CompletedJob_TemplateDryRun_:
for _, resource := range jobType.TemplateDryRun.Resources {
server.Logger.Info(ctx, "inserting template dry-run job resource",
Expand Down
28 changes: 28 additions & 0 deletions coderd/workspaceagents.go
Expand Up @@ -539,13 +539,15 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
Valid: true,
}
_ = updateConnectionTimes()
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
}()

err = updateConnectionTimes()
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, err.Error())
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)

// End span so we don't get long lived trace data.
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
Expand Down Expand Up @@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
}
}

resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace resource.",
Detail: err.Error(),
})
return
}
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace build.",
Detail: err.Error(),
})
return
}
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(r.Context(), workspace.ID)

httpapi.Write(r.Context(), rw, http.StatusOK, nil)
}

Expand Down
5 changes: 5 additions & 0 deletions coderd/workspacebuilds.go
Expand Up @@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
}

Expand Down Expand Up @@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
})
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
Message: "Job has been marked as canceled...",
})
Expand Down
105 changes: 68 additions & 37 deletions coderd/workspaces.go
Expand Up @@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

aReq.New = newWorkspace
rw.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
return err
}

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: build.UpdatedAt,
ProvisionerState: build.ProvisionerState,
Expand Down Expand Up @@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
// Ignore all trace spans after this, they're not too useful.
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)

t := time.NewTicker(time.Second * 1)
defer t.Stop()
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error subscribing to workspace events.",
Detail: err.Error(),
},
})
return
}
defer cancelSubscribe()

// An initial ping signals to the request that the server is now ready
// and the client can begin servicing a channel with data.
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypePing,
})

for {
select {
case <-ctx.Done():
return
case <-senderClosed:
return
case <-t.C:
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
}
}
}
Expand Down Expand Up @@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes

return parts
}

func watchWorkspaceChannel(id uuid.UUID) string {
return fmt.Sprintf("workspace:%s", id)
}

func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
if err != nil {
api.Logger.Warn(ctx, "failed to publish workspace update",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}

0 comments on commit 56b963a

Please sign in to comment.