Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait-change API endpoint and client function #63

Merged
merged 5 commits into from Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions client/changes.go
Expand Up @@ -157,3 +157,30 @@ func (client *Client) Changes(opts *ChangesOptions) ([]*Change, error) {

return chgs, err
}

// WaitChangeOptions holds the options for the WaitChange call.
type WaitChangeOptions struct {
// If nonzero, wait at most this long before returning. If a timeout
// occurs, WaitChange will return an error.
Timeout time.Duration
}

// WaitChange waits for the change to be finished. If the wait operation
// succeeds, the returned Change.Err string will be non-empty if the change
// itself had an error.
func (client *Client) WaitChange(id string, opts *WaitChangeOptions) (*Change, error) {
var chgd changeAndData

query := url.Values{}
if opts != nil && opts.Timeout != 0 {
query.Set("timeout", opts.Timeout.String())
}

_, err := client.doSync("GET", "/v1/changes/"+id+"/wait", query, nil, nil, &chgd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the timeout behavior of the underlying http library, and do we have to account for that and implement retries here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I had observed it to be a very long timeout / no timeout in my manual testing, but just confirmed that now in the code -- we always create the http.Client with no Timeout specified, so no timeout applies here. Arguably that's bad practice, but we can follow up in a subsequent PR if we want to change that.

if err != nil {
return nil, err
}

chgd.Change.data = chgd.Data
return &chgd.Change, nil
}
44 changes: 44 additions & 0 deletions client/changes_test.go
Expand Up @@ -15,6 +15,7 @@
package client_test

import (
"fmt"
"io/ioutil"
"time"

Expand Down Expand Up @@ -56,6 +57,49 @@ func (cs *clientSuite) TestClientChange(c *check.C) {
})
}

func (cs *clientSuite) TestClientWaitChange(c *check.C) {
cs.rsp = `{"type": "sync", "result": {
"id": "uno",
"kind": "foo",
"summary": "...",
"status": "Do",
"ready": false,
"spawn-time": "2016-04-21T01:02:03Z",
"ready-time": "2016-04-21T01:02:04Z",
"tasks": [{"kind": "bar", "summary": "...", "status": "Do", "progress": {"done": 0, "total": 1}, "spawn-time": "2016-04-21T01:02:03Z", "ready-time": "2016-04-21T01:02:04Z"}]
}}`

chg, err := cs.cli.WaitChange("foo", nil)
c.Assert(err, check.IsNil)
c.Assert(cs.req.URL.String(), check.Equals, "http://localhost/v1/changes/foo/wait")
c.Check(chg, check.DeepEquals, &client.Change{
ID: "uno",
Kind: "foo",
Summary: "...",
Status: "Do",
Tasks: []*client.Task{{
Kind: "bar",
Summary: "...",
Status: "Do",
Progress: client.TaskProgress{Done: 0, Total: 1},
SpawnTime: time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC),
ReadyTime: time.Date(2016, 04, 21, 1, 2, 4, 0, time.UTC),
}},
SpawnTime: time.Date(2016, 04, 21, 1, 2, 3, 0, time.UTC),
ReadyTime: time.Date(2016, 04, 21, 1, 2, 4, 0, time.UTC),
})
}

func (cs *clientSuite) TestClientWaitChangeTimeout(c *check.C) {
cs.err = fmt.Errorf(`timed out waiting for change`)
opts := &client.WaitChangeOptions{
Timeout: 30 * time.Second,
}
_, err := cs.cli.WaitChange("bar", opts)
c.Assert(cs.req.URL.String(), check.Equals, "http://localhost/v1/changes/bar/wait?timeout=30s")
c.Assert(err, check.ErrorMatches, `.*timed out waiting for change.*`)
}

func (cs *clientSuite) TestClientChangeData(c *check.C) {
cs.rsp = `{"type": "sync", "result": {
"id": "uno",
Expand Down
4 changes: 4 additions & 0 deletions internal/daemon/api.go
Expand Up @@ -41,6 +41,10 @@ var api = []*Command{{
UserOK: true,
GET: v1GetChange,
POST: v1PostChange,
}, {
Path: "/v1/changes/{id}/wait",
UserOK: true,
GET: v1GetChangeWait,
}, {
Path: "/v1/services",
UserOK: true,
Expand Down
53 changes: 47 additions & 6 deletions internal/daemon/api_changes.go
Expand Up @@ -165,18 +165,59 @@ func v1GetChanges(c *Command, r *http.Request, _ *userState) Response {
}

func v1GetChange(c *Command, r *http.Request, _ *userState) Response {
chID := muxVars(r)["id"]
state := c.d.overlord.State()
state.Lock()
defer state.Unlock()
chg := state.Change(chID)
changeID := muxVars(r)["id"]
st := c.d.overlord.State()
st.Lock()
defer st.Unlock()
chg := st.Change(changeID)
if chg == nil {
return statusNotFound("cannot find change with id %q", chID)
return statusNotFound("cannot find change with id %q", changeID)
}

return SyncResponse(change2changeInfo(chg))
}

func v1GetChangeWait(c *Command, r *http.Request, _ *userState) Response {
changeID := muxVars(r)["id"]
st := c.d.overlord.State()
st.Lock()
change := st.Change(changeID)
st.Unlock()
if change == nil {
return statusNotFound("cannot find change with id %q", changeID)
}

timeoutStr := r.URL.Query().Get("timeout")
if timeoutStr != "" {
// Timeout specified, wait till change is ready or timeout occurs,
// whichever is first.
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return statusBadRequest("invalid timeout %q: %v", timeoutStr, err)
}
timer := time.NewTimer(timeout)
select {
case <-change.Ready():
timer.Stop() // change ready, release timer resources
case <-timer.C:
benhoyt marked this conversation as resolved.
Show resolved Hide resolved
return statusGatewayTimeout("timed out waiting for change after %s", timeout)
case <-r.Context().Done():
return statusInternalError("request cancelled")
}
} else {
// No timeout, wait indefinitely for change to be ready.
select {
case <-change.Ready():
case <-r.Context().Done():
return statusInternalError("request cancelled")
}
}

st.Lock()
defer st.Unlock()
return SyncResponse(change2changeInfo(change))
}

func v1PostChange(c *Command, r *http.Request, _ *userState) Response {
chID := muxVars(r)["id"]
state := c.d.overlord.State()
Expand Down
98 changes: 98 additions & 0 deletions internal/daemon/api_changes_test.go
Expand Up @@ -16,6 +16,7 @@ package daemon

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -364,3 +365,100 @@ func (s *apiSuite) TestStateChangeAbortIsReady(c *check.C) {
"message": fmt.Sprintf("cannot abort change %s with nothing pending", ids[0]),
})
}

func (s *apiSuite) TestWaitChangeNotFound(c *check.C) {
s.daemon(c)
req, err := http.NewRequest("GET", "/v1/changes/x/wait", nil)
c.Assert(err, check.IsNil)
rsp := v1GetChangeWait(apiCmd("/v1/changes/{id}/wait"), req, nil).(*resp)
c.Check(rsp.Status, check.Equals, 404)
}

func (s *apiSuite) TestWaitChangeInvalidTimeout(c *check.C) {
rec, rsp, _ := s.testWaitChange(context.Background(), c, "?timeout=BAD", nil)
c.Check(rec.Code, check.Equals, 400)
c.Check(rsp.Status, check.Equals, 400)
c.Check(rsp.Type, check.Equals, ResponseTypeError)
result := rsp.Result.(*errorResult)
c.Check(result.Message, check.Matches, "invalid timeout .*")
}

func (s *apiSuite) TestWaitChangeSuccess(c *check.C) {
rec, rsp, changeID := s.testWaitChange(context.Background(), c, "", func(st *state.State, change *state.Change) {
// Mark change ready after a short interval
time.Sleep(10 * time.Millisecond)
st.Lock()
change.SetStatus(state.DoneStatus)
st.Unlock()
})

c.Check(rec.Code, check.Equals, 200)
c.Check(rsp.Status, check.Equals, 200)
c.Check(rsp.Type, check.Equals, ResponseTypeSync)
c.Check(rsp.Result, check.NotNil)

var body map[string]interface{}
err := json.Unmarshal(rec.Body.Bytes(), &body)
c.Check(err, check.IsNil)
result := body["result"].(map[string]interface{})
c.Check(result["id"].(string), check.Equals, changeID)
c.Check(result["kind"].(string), check.Equals, "exec")
c.Check(result["ready"].(bool), check.Equals, true)
}

func (s *apiSuite) TestWaitChangeCancel(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

rec, rsp, _ := s.testWaitChange(ctx, c, "", nil)
c.Check(rec.Code, check.Equals, 500)
c.Check(rsp.Status, check.Equals, 500)
c.Check(rsp.Type, check.Equals, ResponseTypeError)
result := rsp.Result.(*errorResult)
c.Check(result.Message, check.Equals, "request cancelled")
}

func (s *apiSuite) TestWaitChangeTimeout(c *check.C) {
rec, rsp, _ := s.testWaitChange(context.Background(), c, "?timeout=10ms", nil)
c.Check(rec.Code, check.Equals, 504)
c.Check(rsp.Status, check.Equals, 504)
c.Check(rsp.Type, check.Equals, ResponseTypeError)
result := rsp.Result.(*errorResult)
c.Check(result.Message, check.Matches, "timed out waiting for change .*")
}

func (s *apiSuite) TestWaitChangeTimeoutCancel(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

rec, rsp, _ := s.testWaitChange(ctx, c, "?timeout=20ms", nil)
c.Check(rec.Code, check.Equals, 500)
c.Check(rsp.Status, check.Equals, 500)
c.Check(rsp.Type, check.Equals, ResponseTypeError)
result := rsp.Result.(*errorResult)
c.Check(result.Message, check.Equals, "request cancelled")
}

func (s *apiSuite) testWaitChange(ctx context.Context, c *check.C, query string, markReady func(st *state.State, change *state.Change)) (*httptest.ResponseRecorder, *resp, string) {
// Setup
d := s.daemon(c)
st := d.overlord.State()
st.Lock()
change := st.NewChange("exec", "Exec")
task := st.NewTask("exec", "Exec")
change.AddAll(state.NewTaskSet(task))
st.Unlock()

if markReady != nil {
go markReady(st, change)
}

// Execute
s.vars = map[string]string{"id": change.ID()}
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/changes/"+change.ID()+"/wait"+query, nil)
c.Assert(err, check.IsNil)
rsp := v1GetChangeWait(apiCmd("/v1/changes/{id}/wait"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)
return rec, rsp, change.ID()
}
7 changes: 3 additions & 4 deletions internal/daemon/response.go
Expand Up @@ -194,12 +194,11 @@ type errorResponder func(string, ...interface{}) Response

// Standard error responses.
var (
statusBadRequest = makeErrorResponder(400)
statusUnauthorized = makeErrorResponder(401)
statusForbidden = makeErrorResponder(403)
statusNotFound = makeErrorResponder(404)
statusBadRequest = makeErrorResponder(400)
statusMethodNotAllowed = makeErrorResponder(405)
statusInternalError = makeErrorResponder(500)
statusNotImplemented = makeErrorResponder(501)
statusForbidden = makeErrorResponder(403)
statusConflict = makeErrorResponder(409)
statusGatewayTimeout = makeErrorResponder(504)
)