Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make workspace watching realtime instead of polling #4922

Merged
merged 2 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion coderd/activitybump.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas

newDeadline := database.Now().Add(bumpAmount)

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: build.ProvisionerState,
Expand Down
6 changes: 3 additions & 3 deletions coderd/database/databasefake/databasefake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2823,7 +2823,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
return sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand All @@ -2835,9 +2835,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
workspaceBuild.ProvisionerState = arg.ProvisionerState
workspaceBuild.Deadline = arg.Deadline
q.workspaceBuilds[index] = workspaceBuild
return nil
return workspaceBuild, nil
}
return sql.ErrNoRows
return database.WorkspaceBuild{}, sql.ErrNoRows
}

func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 20 additions & 5 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions coderd/database/queries/workspacebuilds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ INSERT INTO
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;

-- name: UpdateWorkspaceBuildByID :exec
-- name: UpdateWorkspaceBuildByID :one
UPDATE
workspace_builds
SET
updated_at = $2,
provisioner_state = $3,
deadline = $4
WHERE
id = $1;
id = $1 RETURNING *;
14 changes: 10 additions & 4 deletions coderd/httpapi/httpapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)

_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
if err != nil {
return err
}

err = enc.Encode(sse.Data)
if err != nil {
return err
if sse.Data != nil {
_, err = buf.WriteString("data: ")
if err != nil {
return err
}
err = enc.Encode(sse.Data)
if err != nil {
return err
}
}

err = buf.WriteByte('\n')
Expand Down
17 changes: 15 additions & 2 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, failJob(fmt.Sprintf("get owner: %s", err))
}
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
if err != nil {
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
}

// Compute parameters for the workspace to consume.
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
Expand Down Expand Up @@ -543,7 +547,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
}
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: input.WorkspaceBuildID,
UpdatedAt: database.Now(),
ProvisionerState: jobType.WorkspaceBuild.State,
Expand All @@ -552,6 +556,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
if err != nil {
return nil, xerrors.Errorf("update workspace build state: %w", err)
}
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.FailedJob_TemplateImport_:
}

Expand Down Expand Up @@ -657,7 +665,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return xerrors.Errorf("update provisioner job: %w", err)
}
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: workspaceBuild.ID,
Deadline: workspaceDeadline,
ProvisionerState: jobType.WorkspaceBuild.State,
Expand Down Expand Up @@ -692,6 +700,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
if err != nil {
return nil, xerrors.Errorf("complete job: %w", err)
}

err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err)
}
case *proto.CompletedJob_TemplateDryRun_:
for _, resource := range jobType.TemplateDryRun.Resources {
server.Logger.Info(ctx, "inserting template dry-run job resource",
Expand Down
28 changes: 28 additions & 0 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,15 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
Valid: true,
}
_ = updateConnectionTimes()
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
}()

err = updateConnectionTimes()
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, err.Error())
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)

// End span so we don't get long lived trace data.
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
Expand Down Expand Up @@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
}
}

resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace resource.",
Detail: err.Error(),
})
return
}
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace build.",
Detail: err.Error(),
})
return
}
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(r.Context(), workspace.ID)

httpapi.Write(r.Context(), rw, http.StatusOK, nil)
}

Expand Down
5 changes: 5 additions & 0 deletions coderd/workspacebuilds.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
}

Expand Down Expand Up @@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
})
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
Message: "Job has been marked as canceled...",
})
Expand Down
105 changes: 68 additions & 37 deletions coderd/workspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
return
}

api.publishWorkspaceUpdate(ctx, workspace.ID)

aReq.New = newWorkspace
rw.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
return err
}

if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: build.UpdatedAt,
ProvisionerState: build.ProvisionerState,
Expand Down Expand Up @@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
// Ignore all trace spans after this, they're not too useful.
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)

t := time.NewTicker(time.Second * 1)
defer t.Stop()
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error subscribing to workspace events.",
Detail: err.Error(),
},
})
return
}
defer cancelSubscribe()

// An initial ping signals to the request that the server is now ready
// and the client can begin servicing a channel with data.
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypePing,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed because ServerSentEventSender() doesn't send the first ping until the timeout has been hit once? Should we "prime" it in there instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want the caller to indicate readiness by sending the ping themselves.

If we don't a race can occur where the client's request has been completed but subscribe hasn't been triggered yet. We could fix this by moving the initialization below the subscribe and making it send a ping, which seems more reasonable.

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is a bit trickier than I thought... because we use the sendEvent callback in the Subscribe function, we can't initialize after safely. Seems like we'll have to keep it the way it is, but I'll add a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment and will leave it as-is for now because nothing comes to mind as being cleaner... if you have ideas leave em here and I'll implement!

Copy link
Member

@mafredri mafredri Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't fully understand the race scenario, but I just realized that the httpapi.Write above (if err) could be problematic since we've already primed rw for server side events in httpapi.ServerSentEventSender (and started a timer for pings). So it might be a good idea to change things around a bit 👍🏻. If the subscription fails, I believe this handler wouldn't return until the first SSE auto-ping has been sent (because of the defer <-chan above).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed that error case! Goooood catch


for {
select {
case <-ctx.Done():
return
case <-senderClosed:
return
case <-t.C:
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace.",
Detail: err.Error(),
},
})
return
}

data, err := api.workspaceData(ctx, []database.Workspace{workspace})
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error fetching workspace data.",
Detail: err.Error(),
},
})
return
}

_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspace(
workspace,
data.builds[0],
data.templates[0],
findUser(workspace.OwnerID, data.users),
),
})
}
}
}
Expand Down Expand Up @@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes

return parts
}

func watchWorkspaceChannel(id uuid.UUID) string {
return fmt.Sprintf("workspace:%s", id)
}

func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
if err != nil {
api.Logger.Warn(ctx, "failed to publish workspace update",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}