Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait-change API endpoint and client function #63

Merged
merged 5 commits into from Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions client/changes.go
Expand Up @@ -157,3 +157,31 @@ func (client *Client) Changes(opts *ChangesOptions) ([]*Change, error) {

return chgs, err
}

// WaitChangeOptions holds the options for the WaitChange call.
type WaitChangeOptions struct {
// If non-zero, wait at most this long before returning the current change
// data. The timeout elapsing is not considered an error, so if a timeout
// is specified, the caller should check Change.Ready to determine whether
// the change is actually finished.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout certainly sounds like an error, unless the operation did finish and there was a race condition. But otherwise, the intent of the operation of waiting for it to be finished did not succeed in intent, so an error seems appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, Harry and I went back and forth on that and ended up that it shouldn't be an error, primarily because when Pebble returns an error it can't also return other data (the Change in this case), and I thought it might be useful to have the current state of the change at the time of the timeout.

However, on reflection it makes for a less error-prone API if it's just an error, and in the success case the change is always ready. If the caller really wants the current state of a change in the case of a timeout/error, they can always make an additional call to client.Change() to get the current status. However, it's unlikely you'll want to.

In any case, I've used HTTP status code 504 Gateway Timeout for this (along with a reasonable error message). It's not quite a perfect semantic match, but it seems close enough.

Timeout time.Duration
}

// WaitChange waits for a given change to be finished (whether or not there
// was an error, which is indicated by Change.Err being set).
benhoyt marked this conversation as resolved.
Show resolved Hide resolved
func (client *Client) WaitChange(id string, opts *WaitChangeOptions) (*Change, error) {
var chgd changeAndData

query := url.Values{}
if opts != nil && opts.Timeout != 0 {
query.Set("timeout", opts.Timeout.String())
}

_, err := client.doSync("GET", "/v1/changes/"+id+"/wait", query, nil, nil, &chgd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the timeout behavior of the underlying http library, and do we have to account for that and implement retries here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I had observed it to be a very long timeout / no timeout in my manual testing, but just confirmed that now in the code -- we always create the http.Client with no Timeout specified, so no timeout applies here. Arguably that's bad practice, but we can follow up in a subsequent PR if we want to change that.

if err != nil {
return nil, err
}

chgd.Change.data = chgd.Data
return &chgd.Change, nil
}
44 changes: 44 additions & 0 deletions client/changes_test.go
Expand Up @@ -56,6 +56,50 @@ func (cs *clientSuite) TestClientChange(c *check.C) {
})
}

func (cs *clientSuite) TestClientWaitChange(c *check.C) {
cs.testClientWaitChange(c, "foo", nil, "http://localhost/v1/changes/foo/wait")
}

func (cs *clientSuite) TestClientWaitChangeTimeout(c *check.C) {
opts := &client.WaitChangeOptions{
Timeout: 30 * time.Second,
}
cs.testClientWaitChange(c, "bar", opts, "http://localhost/v1/changes/bar/wait?timeout=30s")
}

func (cs *clientSuite) testClientWaitChange(c *check.C, changeID string, opts *client.WaitChangeOptions, expectedURL string) {
cs.rsp = `{"type": "sync", "result": {
"id": "uno",
"kind": "foo",
"summary": "...",
"status": "Do",
"ready": false,
"spawn-time": "2016-04-21T01:02:03Z",
"ready-time": "2016-04-21T01:02:04Z",
"tasks": [{"kind": "bar", "summary": "...", "status": "Do", "progress": {"done": 0, "total": 1}, "spawn-time": "2016-04-21T01:02:03Z", "ready-time": "2016-04-21T01:02:04Z"}]
}}`

chg, err := cs.cli.WaitChange(changeID, opts)
c.Assert(err, check.IsNil)
c.Assert(cs.req.URL.String(), check.Equals, expectedURL)
c.Check(chg, check.DeepEquals, &client.Change{
ID: "uno",
Kind: "foo",
Summary: "...",
Status: "Do",
Tasks: []*client.Task{{
Kind: "bar",
Summary: "...",
Status: "Do",
Progress: client.TaskProgress{Done: 0, Total: 1},
SpawnTime: time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC),
ReadyTime: time.Date(2016, 04, 21, 1, 2, 4, 0, time.UTC),
}},
SpawnTime: time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC),
ReadyTime: time.Date(2016, 04, 21, 1, 2, 4, 0, time.UTC),
})
}

func (cs *clientSuite) TestClientChangeData(c *check.C) {
cs.rsp = `{"type": "sync", "result": {
"id": "uno",
Expand Down
4 changes: 4 additions & 0 deletions internal/daemon/api.go
Expand Up @@ -41,6 +41,10 @@ var api = []*Command{{
UserOK: true,
GET: v1GetChange,
POST: v1PostChange,
}, {
Path: "/v1/changes/{id}/wait",
UserOK: true,
GET: v1GetChangeWait,
}, {
Path: "/v1/services",
UserOK: true,
Expand Down
50 changes: 44 additions & 6 deletions internal/daemon/api_changes.go
Expand Up @@ -165,18 +165,56 @@ func v1GetChanges(c *Command, r *http.Request, _ *userState) Response {
}

func v1GetChange(c *Command, r *http.Request, _ *userState) Response {
chID := muxVars(r)["id"]
state := c.d.overlord.State()
state.Lock()
defer state.Unlock()
chg := state.Change(chID)
changeID := muxVars(r)["id"]
st := c.d.overlord.State()
st.Lock()
defer st.Unlock()
chg := st.Change(changeID)
if chg == nil {
return statusNotFound("cannot find change with id %q", chID)
return statusNotFound("cannot find change with id %q", changeID)
}

return SyncResponse(change2changeInfo(chg))
}

func v1GetChangeWait(c *Command, r *http.Request, _ *userState) Response {
changeID := muxVars(r)["id"]
st := c.d.overlord.State()
st.Lock()
change := st.Change(changeID)
st.Unlock()
if change == nil {
return statusNotFound("cannot find change with id %q", changeID)
}

timeoutStr := r.URL.Query().Get("timeout")
if timeoutStr != "" {
// Timeout specified, wait till change is ready or timeout occurs,
// whichever is first.
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return statusBadRequest("invalid timeout %q: %v", timeoutStr, err)
}
timer := time.NewTimer(timeout)
select {
case <-change.Ready():
timer.Stop() // change ready, release timer resources
case <-timer.C:
benhoyt marked this conversation as resolved.
Show resolved Hide resolved
case <-r.Context().Done():
}
} else {
// No timeout, wait indefinitely for change to be ready.
select {
case <-change.Ready():
case <-r.Context().Done():
}
}

st.Lock()
defer st.Unlock()
return SyncResponse(change2changeInfo(change))
}

func v1PostChange(c *Command, r *http.Request, _ *userState) Response {
chID := muxVars(r)["id"]
state := c.d.overlord.State()
Expand Down
92 changes: 92 additions & 0 deletions internal/daemon/api_changes_test.go
Expand Up @@ -16,6 +16,7 @@ package daemon

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -364,3 +365,94 @@ func (s *apiSuite) TestStateChangeAbortIsReady(c *check.C) {
"message": fmt.Sprintf("cannot abort change %s with nothing pending", ids[0]),
})
}

func (s *apiSuite) TestStateChangeNotFound(c *check.C) {
s.daemon(c)
req, err := http.NewRequest("GET", "/v1/changes/x/wait", nil)
c.Assert(err, check.IsNil)
rsp := v1GetChangeWait(apiCmd("/v1/changes/{id}/wait"), req, nil).(*resp)
c.Check(rsp.Status, check.Equals, 404)
}

func (s *apiSuite) TestStateChangeInvalidTimeout(c *check.C) {
// Setup
d := s.daemon(c)
st := d.overlord.State()
st.Lock()
change := st.NewChange("exec", "Exec")
task := st.NewTask("exec", "Exec")
change.AddAll(state.NewTaskSet(task))
st.Unlock()

// Execute
s.vars = map[string]string{"id": change.ID()}
req, err := http.NewRequest("GET", "/v1/changes/"+change.ID()+"/wait?timeout=BAD", nil)
c.Assert(err, check.IsNil)
rsp := v1GetChangeWait(apiCmd("/v1/changes/{id}/wait"), req, nil).(*resp)

// Verify
c.Check(rsp.Status, check.Equals, 400)
}

func (s *apiSuite) TestStateChangeWait(c *check.C) {
ready := s.testStateChangeWait(context.Background(), c, "", func(st *state.State, change *state.Change) {
// Mark change ready after a short interval
time.Sleep(10 * time.Millisecond)
st.Lock()
change.SetStatus(state.DoneStatus)
st.Unlock()
})
c.Check(ready, check.Equals, true)
}

func (s *apiSuite) TestStateChangeWaitCancel(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
ready := s.testStateChangeWait(ctx, c, "", func(st *state.State, change *state.Change) {})
c.Check(ready, check.Equals, false)
}

func (s *apiSuite) TestStateChangeWaitTimeout(c *check.C) {
ready := s.testStateChangeWait(context.Background(), c, "?timeout=10ms", func(st *state.State, change *state.Change) {})
c.Check(ready, check.Equals, false)
}

func (s *apiSuite) TestStateChangeWaitTimeoutCancel(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
ready := s.testStateChangeWait(ctx, c, "?timeout=20ms", func(st *state.State, change *state.Change) {})
c.Check(ready, check.Equals, false)
}

func (s *apiSuite) testStateChangeWait(ctx context.Context, c *check.C, query string, markReady func(st *state.State, change *state.Change)) bool {
// Setup
d := s.daemon(c)
st := d.overlord.State()
st.Lock()
change := st.NewChange("exec", "Exec")
task := st.NewTask("exec", "Exec")
change.AddAll(state.NewTaskSet(task))
st.Unlock()
go markReady(st, change)

// Execute
s.vars = map[string]string{"id": change.ID()}
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/changes/"+change.ID()+"/wait"+query, nil)
c.Assert(err, check.IsNil)
rsp := v1GetChangeWait(apiCmd("/v1/changes/{id}/wait"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)

// Verify
c.Check(rec.Code, check.Equals, 200)
c.Check(rsp.Status, check.Equals, 200)
c.Check(rsp.Result, check.NotNil)

var body map[string]interface{}
err = json.Unmarshal(rec.Body.Bytes(), &body)
c.Check(err, check.IsNil)
result := body["result"].(map[string]interface{})
c.Check(result["id"].(string), check.Equals, change.ID())
c.Check(result["kind"].(string), check.Equals, "exec")
return result["ready"].(bool)
}