Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix actions acks delay #2406

Merged
merged 4 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1680101776-Fix-actions-acks-delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix actions acks delay

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/2406

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/2410
42 changes: 29 additions & 13 deletions internal/pkg/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ type requestClient struct {
lastErrOcc time.Time
}

func (r *requestClient) SetLastError(err error) {
r.lastUsed = time.Now().UTC()
r.lastErr = err
if err != nil {
r.lastErrOcc = r.lastUsed
} else {
r.lastErrOcc = time.Time{}
}
}

// Client wraps a http.Client and takes care of making the raw calls, the client should
// stay simple and specifics should be implemented in external action instead of adding new methods
// to the client. For authenticated calls or sending fields on every request, create a custom RoundTripper
Expand Down Expand Up @@ -160,14 +170,13 @@ func (c *Client) Send(
}

c.log.Debugf("Request method: %s, path: %s, reqID: %s", method, path, reqID)
c.clientLock.Lock()
defer c.clientLock.Unlock()

var resp *http.Response
var multiErr error

c.sortClients()
for i, requester := range c.clients {
clients := c.sortClients()

for i, requester := range clients {
req, err := requester.newRequest(method, path, params, body)
if err != nil {
return nil, fmt.Errorf(
Expand Down Expand Up @@ -195,24 +204,23 @@ func (c *Client) Send(
}
}

requester.lastUsed = time.Now().UTC()

resp, err = requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()

// Using the same lock that was used for sorting above
c.clientLock.Lock()
requester.SetLastError(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this is completely correct if Send() is called concurrently for two different API endpoints on the same host.

This assumes that errors from one HTTP route have equal weight for other routes on the same host which isn't actually true. It might be true if the error is a 5xx error but isn't necessarily true for 4xx.

At the same time I'm not sure that fixing this makes any functional difference since this just alters the order in which hosts are tried.

This is probably worth a comment though in case the logic here changes in the future. Can you add some comments making it obvious that this function is intended to be used concurrently for multiple APIs on the same host?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only obvious fixes for this are having the errors be per API, having the agent use two clients, etc. Those seems like they add complexity vs just accepting that under some circumstances the priority of the hosts won't be exactly optimal when we are sorting them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not debating the existing code logic here. The logic of the errors "per client" is still the same, and I'm not exactly sure why this was implemented the way it is, just making the existing code safe for using concurrently.

c.clientLock.Unlock()

if err != nil {
msg := fmt.Sprintf("requester %d/%d to host %s errored",
i, len(c.clients), requester.host)
i, len(clients), requester.host)
multiErr = multierror.Append(multiErr, fmt.Errorf("%s: %w", msg, err))

// Using debug level as the error is only relevant if all clients fail.
c.log.With("error", err).Debugf(msg)
continue
}

requester.lastErr = nil
requester.lastErrOcc = time.Time{}
return resp, nil
}

Expand Down Expand Up @@ -250,7 +258,10 @@ func newClient(
// - last errored.
//
// It also removes the last error after retryOnBadConnTimeout has elapsed.
func (c *Client) sortClients() {
func (c *Client) sortClients() []*requestClient {
c.clientLock.Lock()
defer c.clientLock.Unlock()

now := time.Now().UTC()

sort.Slice(c.clients, func(i, j int) bool {
Expand Down Expand Up @@ -293,6 +304,11 @@ func (c *Client) sortClients() {
// Lastly, the one that errored last
return c.clients[i].lastUsed.Before(c.clients[j].lastUsed)
})

// return a copy of the slice so we can iterate over it without the lock
res := make([]*requestClient, len(c.clients))
copy(res, c.clients)
return res
}

func (r requestClient) newRequest(method string, path string, params url.Values, body io.Reader) (*http.Request, error) {
Expand Down
111 changes: 95 additions & 16 deletions internal/pkg/remote/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -58,8 +59,8 @@ func TestPortDefaults(t *testing.T) {
c, err := NewWithConfig(l, cfg, nil)
require.NoError(t, err)

c.sortClients()
r, err := c.clients[0].newRequest(http.MethodGet, "/", nil, strings.NewReader(""))
clients := c.sortClients()
r, err := clients[0].newRequest(http.MethodGet, "/", nil, strings.NewReader(""))
require.NoError(t, err)

if tc.ExpectedPort > 0 {
Expand Down Expand Up @@ -133,6 +134,84 @@ func TestHTTPClient(t *testing.T) {
},
))

// This test for the bug that was introduced in agent 8.6 where the long polling checkin request was blocking the second request for acks
//
// There are two requests being issued in the test in the following sequence:
// 1. The first request starts.
// 2. The second request starts only after the first request handler is started execution.
// 3. The second request should complete, while the first request is still in progress.
// 4. The first request handler is signaled to complete only after the second request completes.
//
// This test timed out before the fix https://github.com/elastic/elastic-agent/pull/2406
//
// ➜ go test -timeout 30s -run "^\QTestHTTPClient\E$/^\QTwo_requests\E$" github.com/elastic/elastic-agent/internal/pkg/remote
// panic: test timed out after 30s
// running tests:
// TestHTTPClient (30s)
// TestHTTPClient/Two_requests (30s)
//
// The test passes after the fix https://github.com/elastic/elastic-agent/pull/2406
var wgInReq, wgSecondReq sync.WaitGroup
t.Run("Two requests blocking test", withServer(
func(t *testing.T) *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/longpoll", func(w http.ResponseWriter, r *http.Request) {
// Signal that the long poll request handle is called
// The second request is waiting on this to test that the second request doesn't block
wgInReq.Done()

// Wait until the second request is done
wgSecondReq.Wait()

// This will block this request until the second request completes
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, successResp)
})
mux.HandleFunc("/second", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, successResp)
})
return mux
}, func(t *testing.T, host string) {
cfg := config.MustNewConfigFrom(map[string]interface{}{
"host": host,
})

client, err := NewWithRawConfig(nil, cfg, nil)
require.NoError(t, err)

issueRequest := func(ctx context.Context, path string) error {
resp, err := client.Send(ctx, http.MethodGet, path, nil, nil, nil)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}

wgInReq.Add(1)
wgSecondReq.Add(1)

// Issue long poll request
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return issueRequest(ctx, "/longpoll")
})

// The second request should not block waiting on the first request to complete
g.Go(func() error {
// Wait until the first request handler is hit
wgInReq.Wait()
err := issueRequest(ctx, "/second")
wgSecondReq.Done()
return err
})

err = g.Wait()
require.NoError(t, err)
},
))

t.Run("Simple call with a prefix path", withServer(
func(t *testing.T) *http.ServeMux {
mux := http.NewServeMux()
Expand Down Expand Up @@ -194,8 +273,8 @@ func TestHTTPClient(t *testing.T) {
{host: "http://must.fail-3.co/"},
}}

resp, err := client.Send(ctx, http.MethodGet, "/echo-hello", nil, nil, nil)
assert.Contains(t, err.Error(), "http://must.fail-3.co/") // error contains last host
resp, err := client.Send(ctx, http.MethodGet, "/echo-hello", nil, nil, nil) //nolint:bodyclose // wad
assert.Contains(t, err.Error(), "http://must.fail-3.co/") // error contains last host
assert.Nil(t, resp)
})

Expand Down Expand Up @@ -299,9 +378,9 @@ func TestSortClients(t *testing.T) {
client, err := newClient(nil, Config{}, one, two)
require.NoError(t, err)

client.sortClients()
clients := client.sortClients()

assert.Equal(t, one, client.clients[0])
assert.Equal(t, one, clients[0])
})

t.Run("Picks second requester when first has error", func(t *testing.T) {
Expand All @@ -314,9 +393,9 @@ func TestSortClients(t *testing.T) {
client, err := newClient(nil, Config{}, one, two)
require.NoError(t, err)

client.sortClients()
clients := client.sortClients()

assert.Equal(t, two, client.clients[0])
assert.Equal(t, two, clients[0])
})

t.Run("Picks second requester when first has been used", func(t *testing.T) {
Expand All @@ -327,9 +406,9 @@ func TestSortClients(t *testing.T) {
client, err := newClient(nil, Config{}, one, two)
require.NoError(t, err)

client.sortClients()
clients := client.sortClients()

assert.Equal(t, two, client.clients[0])
assert.Equal(t, two, clients[0])
})

t.Run("Picks second requester when it's the oldest", func(t *testing.T) {
Expand All @@ -345,9 +424,9 @@ func TestSortClients(t *testing.T) {
client, err := newClient(nil, Config{}, one, two, three)
require.NoError(t, err)

client.sortClients()
clients := client.sortClients()

assert.Equal(t, two, client.clients[0])
assert.Equal(t, two, clients[0])
})

t.Run("Picks third requester when second has error and first is last used", func(t *testing.T) {
Expand All @@ -364,9 +443,9 @@ func TestSortClients(t *testing.T) {
}
client := &Client{clients: []*requestClient{one, two, three}}

client.sortClients()
clients := client.sortClients()

assert.Equal(t, three, client.clients[0])
assert.Equal(t, three, clients[0])
})

t.Run("Picks second requester when its oldest and all have old errors", func(t *testing.T) {
Expand All @@ -388,9 +467,9 @@ func TestSortClients(t *testing.T) {
client, err := newClient(nil, Config{}, one, two, three)
require.NoError(t, err)

client.sortClients()
clients := client.sortClients()

assert.Equal(t, two, client.clients[0])
assert.Equal(t, two, clients[0])
})
}

Expand Down