From e08ef93ca66bfb6e34ac17acfdcba6ab013c78af Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Thu, 19 Oct 2023 16:07:47 +0100 Subject: [PATCH 01/14] feat: Adding a batch updater to allow usage updates to be batched This will allows a client to update the usage asynchronously. The updater will only call the API when a configurable number of rows have been updated or a timeout is reached. In addition the updater will flush any remaining rows before closing. --- go.mod | 1 + go.sum | 3 + premium/usage.go | 193 +++++++++++++++++++++++++++++++++ premium/usage_test.go | 242 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 439 insertions(+) create mode 100644 premium/usage.go create mode 100644 premium/usage_test.go diff --git a/go.mod b/go.mod index c4d1c0b6d4..020c7383ca 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/CloudyKit/jet/v6 v6.2.0 // indirect github.com/Joker/jade v1.1.3 // indirect github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect + github.com/adrg/xdg v0.4.0 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 427a27a662..e57d68e4e8 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0= github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM= +github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls= +github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E= github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= @@ -556,6 +558,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/premium/usage.go b/premium/usage.go new file mode 100644 index 0000000000..509764ac79 --- /dev/null +++ b/premium/usage.go @@ -0,0 +1,193 @@ +package premium + +import ( + "context" + "fmt" + cqapi "github.com/cloudquery/cloudquery-api-go" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "net/http" + "sync" + "sync/atomic" + "time" +) + +const ( + defaultBatchLimit = 1000 + defaultDurationMS = 10000 +) + +type UsageClient interface { + // Increase updates the usage by the given number of rows + 
Increase(context.Context, uint32) + // HasQuota returns true if the quota has not been exceeded + HasQuota(context.Context) (bool, error) + // Close flushes any remaining rows and closes the quota service + Close() error +} + +type UpdaterOptions func(updater *BatchUpdater) + +func WithBatchLimit(batchLimit uint32) UpdaterOptions { + return func(updater *BatchUpdater) { + updater.batchLimit = batchLimit + } +} + +func WithTickerDuration(durationms int) UpdaterOptions { + return func(updater *BatchUpdater) { + updater.tickerDuration = durationms + } +} + +type BatchUpdater struct { + apiClient *cqapi.ClientWithResponses + + teamName string + pluginTeam string + pluginKind string + pluginName string + + batchLimit uint32 + tickerDuration int + rowsToUpdate atomic.Uint32 + triggerUpdate chan struct{} + done chan struct{} + wg *sync.WaitGroup + isClosed bool +} + +func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, teamName, pluginTeam, pluginKind, pluginName string, ops ...UpdaterOptions) *BatchUpdater { + u := &BatchUpdater{ + apiClient: apiClient, + + teamName: teamName, + pluginTeam: pluginTeam, + pluginKind: pluginKind, + pluginName: pluginName, + + batchLimit: defaultBatchLimit, + tickerDuration: defaultDurationMS, + triggerUpdate: make(chan struct{}), + done: make(chan struct{}), + wg: &sync.WaitGroup{}, + } + for _, op := range ops { + op(u) + } + + u.backgroundUpdater(ctx) + + return u +} + +func (u *BatchUpdater) Increase(_ context.Context, rows uint32) error { + if rows <= 0 { + return fmt.Errorf("rows must be greater than zero got %d", rows) + } + + if u.isClosed { + return fmt.Errorf("usage updater is closed") + } + + u.rowsToUpdate.Add(rows) + + // Trigger an update unless an update is already in process + select { + case u.triggerUpdate <- struct{}{}: + default: + return nil + } + + return nil +} + +func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) { + usage, err := u.apiClient.GetTeamPluginUsageWithResponse(ctx, u.teamName, u.pluginTeam, cqapi.PluginKind(u.pluginKind), u.pluginName) + if err != nil { + return false, fmt.Errorf("failed to get usage: %w", err) + } + if usage.StatusCode() != http.StatusOK { + return false, fmt.Errorf("failed to get usage: %s", usage.Status()) + } + return *usage.JSON200.RemainingRows > 0, nil +} + +func (u *BatchUpdater) Close(_ context.Context) error { + u.isClosed = true + + close(u.done) + u.wg.Wait() + + return nil +} + +func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { + started := make(chan struct{}) + u.wg.Add(1) + + duration := time.Duration(u.tickerDuration) * time.Millisecond + ticker := time.NewTicker(duration) + + go func() { + defer u.wg.Done() + started <- struct{}{} + for { + select { + case <-u.triggerUpdate: + rowsToUpdate := u.rowsToUpdate.Load() + if rowsToUpdate < u.batchLimit { + // Not enough rows to update + continue + } + log.Info().Msgf("updating usage: %d", rowsToUpdate) + if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil { + log.Error().Err(err).Msg("failed to update usage") + // TODO: what to do with an update error + continue + } + u.rowsToUpdate.Add(-rowsToUpdate) + case <-ticker.C: + rowsToUpdate := u.rowsToUpdate.Load() + if rowsToUpdate == 0 { + continue + } + if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil { + log.Error().Err(err).Msg("failed to update usage") + // TODO: what to do with an update error + continue + } + u.rowsToUpdate.Add(-rowsToUpdate) + case <-u.done: + remainingRows := 
u.rowsToUpdate.Load() + if remainingRows != 0 { + log.Info().Msgf("updating usage: %d", remainingRows) + if err := u.updateUsageWithRetryAndBackoff(ctx, remainingRows); err != nil { + log.Error().Err(err).Msg("failed to update usage") + } + u.rowsToUpdate.Add(-remainingRows) + } + log.Info().Msg("background updater exiting") + return + } + } + }() + <-started +} + +func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numberToUpdate uint32) error { + resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ + RequestId: uuid.New(), + PluginTeam: u.pluginTeam, + PluginKind: cqapi.PluginKind(u.pluginKind), + PluginName: u.pluginName, + Rows: int(numberToUpdate), + }) + if err != nil { + return fmt.Errorf("failed to update usage: %w", err) + } + if resp.StatusCode() != http.StatusOK { + return fmt.Errorf("failed to update usage: %s", resp.Status()) + } + return nil +} diff --git a/premium/usage_test.go b/premium/usage_test.go new file mode 100644 index 0000000000..cfe17e17e9 --- /dev/null +++ b/premium/usage_test.go @@ -0,0 +1,242 @@ +package premium + +import ( + "context" + "encoding/json" + "fmt" + cqapi "github.com/cloudquery/cloudquery-api-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "math" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestUsageService_HasQuota_NoRowsRemaining(t *testing.T) { + ctx := context.Background() + + s := createTestServerWithRemainingRows(t, 0) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + hasQuota, err := usageClient.HasQuota(ctx) + require.NoError(t, err) + + assert.False(t, hasQuota, "should not have quota") +} + +func TestUsageService_HasQuota_WithRowsRemaining(t *testing.T) { + ctx := context.Background() + + s := createTestServerWithRemainingRows(t, 100) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + hasQuota, err := usageClient.HasQuota(ctx) + require.NoError(t, err) + + assert.True(t, hasQuota, "should have quota") +} + +func TestUsageService_ZeroBatchSize(t *testing.T) { + ctx := context.Background() + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + for i := 0; i < 10000; i++ { + err = usageClient.Increase(ctx, 1) + require.NoError(t, err) + } + + err = usageClient.Close(ctx) + require.NoError(t, err) + + assert.Equal(t, 10000, s.sumOfUpdates(), "total should equal number of updated rows") +} + +func TestUsageService_WithBatchSize(t *testing.T) { + ctx := context.Background() + batchSize := 2000 + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize))) + + for i := 0; i < 10000; i++ { + err = usageClient.Increase(ctx, 1) + require.NoError(t, err) + } + err = usageClient.Close(ctx) + require.NoError(t, err) + + 
assert.Equal(t, 10000, s.sumOfUpdates(), "total should equal number of updated rows") + assert.True(t, true, s.minExcludingClose() > batchSize, "minimum should be greater than batch size") +} + +func TestUsageService_WithTicker(t *testing.T) { + ctx := context.Background() + batchSize := 2000 + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize)), WithTickerDuration(1)) + + for i := 0; i < 10; i++ { + err = usageClient.Increase(ctx, 10) + require.NoError(t, err) + time.Sleep(5 * time.Millisecond) + } + err = usageClient.Close(ctx) + require.NoError(t, err) + + assert.Equal(t, 100, s.sumOfUpdates(), "total should equal number of updated rows") + assert.True(t, s.minExcludingClose() < batchSize, "we should see updates less than batchsize if ticker is firing") +} + +func TestUsageService_NoUpdates(t *testing.T) { + ctx := context.Background() + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + err = usageClient.Close(ctx) + require.NoError(t, err) + + assert.Equal(t, 0, s.numberOfUpdates(), "total number of updates should be zero") +} + +func TestUsageService_UpdatesWithZeroRows(t *testing.T) { + ctx := context.Background() + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + err = usageClient.Increase(ctx, 0) + require.Error(t, err, "should not be able to update with zero rows") + + err = usageClient.Close(ctx) + require.NoError(t, err) + + assert.Equal(t, 0, s.numberOfUpdates(), "total number of updates should be zero") +} + +func TestUsageService_ShouldNotUpdateClosedService(t *testing.T) { + ctx := context.Background() + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + + // Close the service first + err = usageClient.Close(ctx) + require.NoError(t, err) + + err = usageClient.Increase(ctx, 10) + require.Error(t, err, "should not be able to update closed service") + + assert.Equal(t, 0, s.numberOfUpdates(), "total number of updates should be zero") +} + +func createTestServerWithRemainingRows(t *testing.T, remainingRows int) *testStage { + stage := testStage{ + remainingRows: remainingRows, + update: make([]int, 0), + } + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + w.Header().Set("Content-Type", "application/json") + if _, err := fmt.Fprintf(w, `{"remaining_rows": %d}`, stage.remainingRows); err != nil { + t.Fatal(err) + } + w.WriteHeader(http.StatusOK) + return + } + if r.Method == "POST" { + dec := json.NewDecoder(r.Body) + var req cqapi.IncreaseTeamPluginUsageJSONRequestBody + err := dec.Decode(&req) + require.NoError(t, err) + + stage.update = append(stage.update, req.Rows) + + w.WriteHeader(http.StatusOK) + return + } + }) + + stage.server = httptest.NewServer(handler) + + return &stage +} 
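// --- Illustrative usage sketch (not part of the patch) ---
// A minimal sketch of how a caller might drive the batch updater introduced in
// this commit. The team/plugin names, API URL and tuning numbers below are
// placeholders, it relies on the imports already present in usage_test.go
// (context, cqapi), and error handling is reduced to panics for brevity.
func sketchUsageClientLifecycle() {
	ctx := context.Background()

	// In a real plugin the API client would carry credentials; the URL here is a placeholder.
	apiClient, err := cqapi.NewClientWithResponses("https://api.cloudquery.io")
	if err != nil {
		panic(err)
	}

	// Batch up to 500 rows, but flush at least every 5 seconds (5000 ms).
	usage := NewUsageClient(ctx, apiClient, "my-team", "my-plugin-team", "source", "my-plugin",
		WithBatchLimit(500),
		WithTickerDuration(5000),
	)

	// Stop early if the team has no remaining row quota.
	if ok, err := usage.HasQuota(ctx); err != nil || !ok {
		panic("no quota remaining")
	}

	// Record rows as they are synced; the updater batches the API calls in the background.
	for i := 0; i < 10; i++ {
		if err := usage.Increase(ctx, 100); err != nil {
			panic(err)
		}
	}

	// Close flushes any remaining rows before returning.
	if err := usage.Close(ctx); err != nil {
		panic(err)
	}
}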
+ +func createTestServer(t *testing.T) *testStage { + return createTestServerWithRemainingRows(t, 0) +} + +type testStage struct { + server *httptest.Server + + remainingRows int + update []int +} + +func (s *testStage) numberOfUpdates() int { + return len(s.update) +} + +func (s *testStage) sumOfUpdates() int { + sum := 0 + for _, val := range s.update { + sum += val + } + return sum +} + +func (s *testStage) minExcludingClose() int { + m := math.MaxInt + for i := 0; i < len(s.update); i++ { + if s.update[i] < m { + m = s.update[i] + } + } + return m +} From 2e00acd1f4cad3c3b71c2788ab4118b4200b8e81 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Fri, 20 Oct 2023 16:00:18 +0100 Subject: [PATCH 02/14] Adding retry logic with `maxRetries` and `maxWaitTime` --- premium/usage.go | 68 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 509764ac79..2cef852390 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -13,8 +13,10 @@ import ( ) const ( - defaultBatchLimit = 1000 - defaultDurationMS = 10000 + defaultBatchLimit = 1000 + defaultDurationMS = 10000 + defaultMaxRetries = 5 + defaultMaxWaitTime = 60 * time.Second ) type UsageClient interface { @@ -28,18 +30,34 @@ type UsageClient interface { type UpdaterOptions func(updater *BatchUpdater) +// WithBatchLimit sets the maximum number of rows to update in a single request func WithBatchLimit(batchLimit uint32) UpdaterOptions { return func(updater *BatchUpdater) { updater.batchLimit = batchLimit } } +// WithTickerDuration sets the duration between updates if the number of rows to update have not reached the batch limit func WithTickerDuration(durationms int) UpdaterOptions { return func(updater *BatchUpdater) { updater.tickerDuration = durationms } } +// WithMaxRetries sets the maximum number of retries to update the usage in case of an API error +func WithMaxRetries(maxRetries int) UpdaterOptions { + return func(updater *BatchUpdater) { + updater.maxRetries = maxRetries + } +} + +// WithMaxWaitTime sets the maximum time to wait before retrying a failed update +func WithMaxWaitTime(maxWaitTime time.Duration) UpdaterOptions { + return func(updater *BatchUpdater) { + updater.maxWaitTime = maxWaitTime + } +} + type BatchUpdater struct { apiClient *cqapi.ClientWithResponses @@ -50,6 +68,8 @@ type BatchUpdater struct { batchLimit uint32 tickerDuration int + maxRetries int + maxWaitTime time.Duration rowsToUpdate atomic.Uint32 triggerUpdate chan struct{} done chan struct{} @@ -68,6 +88,8 @@ func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, t batchLimit: defaultBatchLimit, tickerDuration: defaultDurationMS, + maxRetries: defaultMaxRetries, + maxWaitTime: defaultMaxWaitTime, triggerUpdate: make(chan struct{}), done: make(chan struct{}), wg: &sync.WaitGroup{}, @@ -176,18 +198,34 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { } func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numberToUpdate uint32) error { - resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ - RequestId: uuid.New(), - PluginTeam: u.pluginTeam, - PluginKind: cqapi.PluginKind(u.pluginKind), - PluginName: u.pluginName, - Rows: int(numberToUpdate), - }) - if err != nil { - return fmt.Errorf("failed to update usage: %w", err) - } - if resp.StatusCode() != http.StatusOK { - return fmt.Errorf("failed to update usage: %s", resp.Status()) + var 
retryDelay time.Duration + + for retry := 0; retry < u.maxRetries; retry++ { + startTime := time.Now() + + resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ + RequestId: uuid.New(), + PluginTeam: u.pluginTeam, + PluginKind: cqapi.PluginKind(u.pluginKind), + PluginName: u.pluginName, + Rows: int(numberToUpdate), + }) + if err != nil { + return fmt.Errorf("failed to update usage: %w", err) + } + if resp.StatusCode() == http.StatusOK { + return nil + } + + retryDelay = time.Duration(1< u.maxWaitTime { + retryDelay = u.maxWaitTime + } + + sleepDuration := retryDelay - time.Since(startTime) + if sleepDuration > 0 { + time.Sleep(sleepDuration) + } } - return nil + return fmt.Errorf("failed to update usage: max retries exceeded") } From d4b85a34428dcf46f10b886c847725ac7aea121a Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Fri, 20 Oct 2023 16:20:05 +0100 Subject: [PATCH 03/14] Replace waitGroup with final close error --- premium/usage.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 2cef852390..60aa6df514 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" "github.com/rs/zerolog/log" "net/http" - "sync" "sync/atomic" "time" ) @@ -73,7 +72,7 @@ type BatchUpdater struct { rowsToUpdate atomic.Uint32 triggerUpdate chan struct{} done chan struct{} - wg *sync.WaitGroup + closeError chan error isClosed bool } @@ -92,7 +91,7 @@ func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, t maxWaitTime: defaultMaxWaitTime, triggerUpdate: make(chan struct{}), done: make(chan struct{}), - wg: &sync.WaitGroup{}, + closeError: make(chan error), } for _, op := range ops { op(u) @@ -139,20 +138,17 @@ func (u *BatchUpdater) Close(_ context.Context) error { u.isClosed = true close(u.done) - u.wg.Wait() - return nil + return <-u.closeError } func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { started := make(chan struct{}) - u.wg.Add(1) duration := time.Duration(u.tickerDuration) * time.Millisecond ticker := time.NewTicker(duration) go func() { - defer u.wg.Done() started <- struct{}{} for { select { @@ -176,7 +172,7 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { } if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil { log.Error().Err(err).Msg("failed to update usage") - // TODO: what to do with an update error + // TODO: what to do with a timer error continue } u.rowsToUpdate.Add(-rowsToUpdate) @@ -185,11 +181,13 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { if remainingRows != 0 { log.Info().Msgf("updating usage: %d", remainingRows) if err := u.updateUsageWithRetryAndBackoff(ctx, remainingRows); err != nil { - log.Error().Err(err).Msg("failed to update usage") + u.closeError <- err + return } u.rowsToUpdate.Add(-remainingRows) } log.Info().Msg("background updater exiting") + u.closeError <- nil return } } From 22de9a3f65076d13ff57223dc37dd708b9d8356f Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 12:40:36 +0100 Subject: [PATCH 04/14] Add support for `retry-after` header --- premium/usage.go | 36 ++++++++---- premium/usage_test.go | 125 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 10 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 60aa6df514..26f95257b2 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -196,10 +196,8 @@ func (u 
*BatchUpdater) backgroundUpdater(ctx context.Context) { } func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numberToUpdate uint32) error { - var retryDelay time.Duration - for retry := 0; retry < u.maxRetries; retry++ { - startTime := time.Now() + queryStartTime := time.Now() resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), @@ -215,15 +213,33 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe return nil } - retryDelay = time.Duration(1< u.maxWaitTime { - retryDelay = u.maxWaitTime + retryDuration, err := u.calculateRetryDuration(resp.StatusCode(), resp.HTTPResponse.Header, queryStartTime, retry) + if err != nil { + return fmt.Errorf("failed to calculate retry duration: %w", err) } - - sleepDuration := retryDelay - time.Since(startTime) - if sleepDuration > 0 { - time.Sleep(sleepDuration) + if retryDuration > 0 { + time.Sleep(retryDuration) } } return fmt.Errorf("failed to update usage: max retries exceeded") } + +// calculateRetryDuration calculates the duration to sleep relative to the query start time before retrying an update +func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Header, queryStartTime time.Time, retry int) (time.Duration, error) { + if statusCode == http.StatusTooManyRequests { + retryAfter := headers.Get("Retry-After") + if retryAfter != "" { + retryDelay, err := time.ParseDuration(retryAfter + "s") + if err != nil { + return 0, fmt.Errorf("failed to parse retry-after header: %w", err) + } + if retryDelay > u.maxWaitTime { + return 0, fmt.Errorf("retry-after header exceeds max wait time: %s > %s", retryDelay, u.maxWaitTime) + } + return retryDelay, nil + } + } + + retryDelay := time.Duration(1< 30s"), + }, + } + + for _, tt := range tests { + usageClient := NewUsageClient(context.Background(), nil, "myteam", "mnorbury-team", "source", "vault") + if tt.ops != nil { + tt.ops(usageClient) + } + t.Run(tt.name, func(t *testing.T) { + retryDuration, err := usageClient.calculateRetryDuration(tt.statusCode, tt.headers, time.Now(), tt.retry) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + assert.Contains(t, err.Error(), tt.wantErr.Error()) + } + + assert.InDeltaf(t, tt.expectedSeconds, retryDuration.Seconds(), 0.1, "retry duration should be %d seconds", tt.expectedSeconds) + }) + } +} + func createTestServerWithRemainingRows(t *testing.T, remainingRows int) *testStage { stage := testStage{ remainingRows: remainingRows, From 8e4964f81e81925c6c971508b30b2339cfcf051b Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 12:52:58 +0100 Subject: [PATCH 05/14] Remove log messages --- premium/usage.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 26f95257b2..f0ece5e8bb 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -158,10 +158,8 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { // Not enough rows to update continue } - log.Info().Msgf("updating usage: %d", rowsToUpdate) if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil { - log.Error().Err(err).Msg("failed to update usage") - // TODO: what to do with an update error + log.Warn().Err(err).Msg("failed to update usage") continue } u.rowsToUpdate.Add(-rowsToUpdate) @@ -171,22 +169,19 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { continue } if err := u.updateUsageWithRetryAndBackoff(ctx, 
rowsToUpdate); err != nil { - log.Error().Err(err).Msg("failed to update usage") - // TODO: what to do with a timer error + log.Warn().Err(err).Msg("failed to update usage") continue } u.rowsToUpdate.Add(-rowsToUpdate) case <-u.done: remainingRows := u.rowsToUpdate.Load() if remainingRows != 0 { - log.Info().Msgf("updating usage: %d", remainingRows) if err := u.updateUsageWithRetryAndBackoff(ctx, remainingRows); err != nil { u.closeError <- err return } u.rowsToUpdate.Add(-remainingRows) } - log.Info().Msg("background updater exiting") u.closeError <- nil return } From ba03f2d71e1145df79cce8a5173b16b9ab37c925 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 13:35:20 +0100 Subject: [PATCH 06/14] Replace with duration --- premium/usage.go | 51 +++++++++++++++++++++---------------------- premium/usage_test.go | 4 ++-- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index f0ece5e8bb..4013b2dd39 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -12,10 +12,10 @@ import ( ) const ( - defaultBatchLimit = 1000 - defaultDurationMS = 10000 - defaultMaxRetries = 5 - defaultMaxWaitTime = 60 * time.Second + defaultBatchLimit = 1000 + defaultFlushDuration = 10 * time.Second + defaultMaxRetries = 5 + defaultMaxWaitTime = 60 * time.Second ) type UsageClient interface { @@ -36,10 +36,10 @@ func WithBatchLimit(batchLimit uint32) UpdaterOptions { } } -// WithTickerDuration sets the duration between updates if the number of rows to update have not reached the batch limit -func WithTickerDuration(durationms int) UpdaterOptions { +// WithFlushEvery sets the flush duration - the time at which an update will be triggered even if the batch limit is not reached +func WithFlushEvery(flushDuration time.Duration) UpdaterOptions { return func(updater *BatchUpdater) { - updater.tickerDuration = durationms + updater.flushDuration = flushDuration } } @@ -65,15 +65,15 @@ type BatchUpdater struct { pluginKind string pluginName string - batchLimit uint32 - tickerDuration int - maxRetries int - maxWaitTime time.Duration - rowsToUpdate atomic.Uint32 - triggerUpdate chan struct{} - done chan struct{} - closeError chan error - isClosed bool + batchLimit uint32 + flushDuration time.Duration + maxRetries int + maxWaitTime time.Duration + rowsToUpdate atomic.Uint32 + triggerUpdate chan struct{} + done chan struct{} + closeError chan error + isClosed bool } func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, teamName, pluginTeam, pluginKind, pluginName string, ops ...UpdaterOptions) *BatchUpdater { @@ -85,13 +85,13 @@ func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, t pluginKind: pluginKind, pluginName: pluginName, - batchLimit: defaultBatchLimit, - tickerDuration: defaultDurationMS, - maxRetries: defaultMaxRetries, - maxWaitTime: defaultMaxWaitTime, - triggerUpdate: make(chan struct{}), - done: make(chan struct{}), - closeError: make(chan error), + batchLimit: defaultBatchLimit, + flushDuration: defaultFlushDuration, + maxRetries: defaultMaxRetries, + maxWaitTime: defaultMaxWaitTime, + triggerUpdate: make(chan struct{}), + done: make(chan struct{}), + closeError: make(chan error), } for _, op := range ops { op(u) @@ -145,8 +145,7 @@ func (u *BatchUpdater) Close(_ context.Context) error { func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { started := make(chan struct{}) - duration := time.Duration(u.tickerDuration) * time.Millisecond - ticker := time.NewTicker(duration) + flushDuration := 
time.NewTicker(u.flushDuration) go func() { started <- struct{}{} @@ -163,7 +162,7 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { continue } u.rowsToUpdate.Add(-rowsToUpdate) - case <-ticker.C: + case <-flushDuration.C: rowsToUpdate := u.rowsToUpdate.Load() if rowsToUpdate == 0 { continue diff --git a/premium/usage_test.go b/premium/usage_test.go index 05504968ed..6b04dc932d 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -94,7 +94,7 @@ func TestUsageService_WithBatchSize(t *testing.T) { assert.True(t, true, s.minExcludingClose() > batchSize, "minimum should be greater than batch size") } -func TestUsageService_WithTicker(t *testing.T) { +func TestUsageService_WithFlushDuration(t *testing.T) { ctx := context.Background() batchSize := 2000 @@ -104,7 +104,7 @@ func TestUsageService_WithTicker(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize)), WithTickerDuration(1)) + usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize)), WithFlushEvery(1*time.Millisecond)) for i := 0; i < 10; i++ { err = usageClient.Increase(ctx, 10) From 59e4ab28f23cc93ab2acc49bdb2024d3c969dab2 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 14:09:09 +0100 Subject: [PATCH 07/14] Add a minimum update check - we don't want to hit the API too much, so allow a minimum update interval to be set also --- premium/usage.go | 65 ++++++++++++++++++++++++++++++------------- premium/usage_test.go | 22 +++++++++++++++ 2 files changed, 67 insertions(+), 20 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 4013b2dd39..7d1fd3a461 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -12,10 +12,11 @@ import ( ) const ( - defaultBatchLimit = 1000 - defaultFlushDuration = 10 * time.Second - defaultMaxRetries = 5 - defaultMaxWaitTime = 60 * time.Second + defaultBatchLimit = 1000 + defaultMaxRetries = 5 + defaultMaxWaitTime = 60 * time.Second + defaultMinimumUpdateDuration = 10 * time.Second + defaultFlushDuration = 30 * time.Second ) type UsageClient interface { @@ -43,6 +44,13 @@ func WithFlushEvery(flushDuration time.Duration) UpdaterOptions { } } +// WithMinimumUpdateDuration sets the minimum time between updates +func WithMinimumUpdateDuration(minimumUpdateDuration time.Duration) UpdaterOptions { + return func(updater *BatchUpdater) { + updater.minimumUpdateDuration = minimumUpdateDuration + } +} + // WithMaxRetries sets the maximum number of retries to update the usage in case of an API error func WithMaxRetries(maxRetries int) UpdaterOptions { return func(updater *BatchUpdater) { @@ -60,20 +68,26 @@ func WithMaxWaitTime(maxWaitTime time.Duration) UpdaterOptions { type BatchUpdater struct { apiClient *cqapi.ClientWithResponses + // Plugin details teamName string pluginTeam string pluginKind string pluginName string - batchLimit uint32 - flushDuration time.Duration - maxRetries int - maxWaitTime time.Duration - rowsToUpdate atomic.Uint32 - triggerUpdate chan struct{} - done chan struct{} - closeError chan error - isClosed bool + // Configuration + batchLimit uint32 + maxRetries int + maxWaitTime time.Duration + minimumUpdateDuration time.Duration + flushDuration time.Duration + + // State + lastUpdateTime time.Time + rowsToUpdate atomic.Uint32 + triggerUpdate chan struct{} + done chan struct{} + closeError chan 
error + isClosed bool } func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, teamName, pluginTeam, pluginKind, pluginName string, ops ...UpdaterOptions) *BatchUpdater { @@ -85,13 +99,14 @@ func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, t pluginKind: pluginKind, pluginName: pluginName, - batchLimit: defaultBatchLimit, - flushDuration: defaultFlushDuration, - maxRetries: defaultMaxRetries, - maxWaitTime: defaultMaxWaitTime, - triggerUpdate: make(chan struct{}), - done: make(chan struct{}), - closeError: make(chan error), + batchLimit: defaultBatchLimit, + minimumUpdateDuration: defaultMinimumUpdateDuration, + flushDuration: defaultFlushDuration, + maxRetries: defaultMaxRetries, + maxWaitTime: defaultMaxWaitTime, + triggerUpdate: make(chan struct{}), + done: make(chan struct{}), + closeError: make(chan error), } for _, op := range ops { op(u) @@ -152,6 +167,11 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { for { select { case <-u.triggerUpdate: + if time.Since(u.lastUpdateTime) < u.minimumUpdateDuration { + // Not enough time since last update + continue + } + rowsToUpdate := u.rowsToUpdate.Load() if rowsToUpdate < u.batchLimit { // Not enough rows to update @@ -163,6 +183,10 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { } u.rowsToUpdate.Add(-rowsToUpdate) case <-flushDuration.C: + if time.Since(u.lastUpdateTime) < u.minimumUpdateDuration { + // Not enough time since last update + continue + } rowsToUpdate := u.rowsToUpdate.Load() if rowsToUpdate == 0 { continue @@ -204,6 +228,7 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe return fmt.Errorf("failed to update usage: %w", err) } if resp.StatusCode() == http.StatusOK { + u.lastUpdateTime = time.Now().UTC() return nil } diff --git a/premium/usage_test.go b/premium/usage_test.go index 6b04dc932d..47b0afc44d 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -118,6 +118,28 @@ func TestUsageService_WithFlushDuration(t *testing.T) { assert.True(t, s.minExcludingClose() < batchSize, "we should see updates less than batchsize if ticker is firing") } +func TestUsageService_WithMinimumUpdateDuration(t *testing.T) { + ctx := context.Background() + + s := createTestServer(t) + defer s.server.Close() + + apiClient, err := cqapi.NewClientWithResponses(s.server.URL) + require.NoError(t, err) + + usageClient := newClient(ctx, apiClient, WithBatchLimit(0), WithMinimumUpdateDuration(30*time.Second)) + + for i := 0; i < 10000; i++ { + err = usageClient.Increase(ctx, 1) + require.NoError(t, err) + } + err = usageClient.Close(ctx) + require.NoError(t, err) + + assert.Equal(t, 10000, s.sumOfUpdates(), "total should equal number of updated rows") + assert.Equal(t, 2, s.numberOfUpdates(), "should only update first time and on close if minimum update duration is set") +} + func TestUsageService_NoUpdates(t *testing.T) { ctx := context.Background() From c2f5be47987ff2db8038fd6032f4d8f1e7caff28 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 14:09:57 +0100 Subject: [PATCH 08/14] Tidy up tests --- premium/usage_test.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/premium/usage_test.go b/premium/usage_test.go index 47b0afc44d..9394933d89 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -24,7 +24,7 @@ func TestUsageService_HasQuota_NoRowsRemaining(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) 
require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) hasQuota, err := usageClient.HasQuota(ctx) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestUsageService_HasQuota_WithRowsRemaining(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) hasQuota, err := usageClient.HasQuota(ctx) require.NoError(t, err) @@ -58,7 +58,7 @@ func TestUsageService_ZeroBatchSize(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) for i := 0; i < 10000; i++ { err = usageClient.Increase(ctx, 1) @@ -81,7 +81,7 @@ func TestUsageService_WithBatchSize(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize))) + usageClient := newClient(ctx, apiClient, WithBatchLimit(uint32(batchSize))) for i := 0; i < 10000; i++ { err = usageClient.Increase(ctx, 1) @@ -104,7 +104,7 @@ func TestUsageService_WithFlushDuration(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(uint32(batchSize)), WithFlushEvery(1*time.Millisecond)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(uint32(batchSize)), WithFlushEvery(1*time.Millisecond), WithMinimumUpdateDuration(0*time.Millisecond)) for i := 0; i < 10; i++ { err = usageClient.Increase(ctx, 10) @@ -149,7 +149,7 @@ func TestUsageService_NoUpdates(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) err = usageClient.Close(ctx) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestUsageService_UpdatesWithZeroRows(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) err = usageClient.Increase(ctx, 0) require.Error(t, err, "should not be able to update with zero rows") @@ -186,7 +186,7 @@ func TestUsageService_ShouldNotUpdateClosedService(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", WithBatchLimit(0)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0)) // Close the service first err = usageClient.Close(ctx) @@ -248,7 +248,7 @@ func TestUsageService_CalculateRetryDuration_Exp(t *testing.T) { } for _, tt := range tests { - usageClient := NewUsageClient(context.Background(), nil, "myteam", "mnorbury-team", "source", "vault") + usageClient := newClient(context.Background(), nil) if tt.ops 
!= nil { tt.ops(usageClient) } @@ -305,7 +305,7 @@ func TestUsageService_CalculateRetryDuration_ServerBackPressure(t *testing.T) { } for _, tt := range tests { - usageClient := NewUsageClient(context.Background(), nil, "myteam", "mnorbury-team", "source", "vault") + usageClient := newClient(context.Background(), nil) if tt.ops != nil { tt.ops(usageClient) } @@ -322,6 +322,10 @@ func TestUsageService_CalculateRetryDuration_ServerBackPressure(t *testing.T) { } } +func newClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, ops ...UpdaterOptions) *BatchUpdater { + return NewUsageClient(ctx, apiClient, "myteam", "mnorbury-team", "source", "vault", ops...) +} + func createTestServerWithRemainingRows(t *testing.T, remainingRows int) *testStage { stage := testStage{ remainingRows: remainingRows, From 0184ea72049bb481ae93244845e6cc1f5ac01b4d Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 15:02:50 +0100 Subject: [PATCH 09/14] Clean up min/max time between flushes naming --- premium/usage.go | 30 +++++++++++++++--------------- premium/usage_test.go | 4 ++-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 7d1fd3a461..454cd13eb0 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -15,8 +15,8 @@ const ( defaultBatchLimit = 1000 defaultMaxRetries = 5 defaultMaxWaitTime = 60 * time.Second - defaultMinimumUpdateDuration = 10 * time.Second - defaultFlushDuration = 30 * time.Second + defaultMinTimeBetweenFlushes = 10 * time.Second + defaultMaxTimeBetweenFlushes = 30 * time.Second ) type UsageClient interface { @@ -37,17 +37,17 @@ func WithBatchLimit(batchLimit uint32) UpdaterOptions { } } -// WithFlushEvery sets the flush duration - the time at which an update will be triggered even if the batch limit is not reached -func WithFlushEvery(flushDuration time.Duration) UpdaterOptions { +// WithMaxTimeBetweenFlushes sets the flush duration - the time at which an update will be triggered even if the batch limit is not reached +func WithMaxTimeBetweenFlushes(maxTimeBetweenFlushes time.Duration) UpdaterOptions { return func(updater *BatchUpdater) { - updater.flushDuration = flushDuration + updater.maxTimeBetweenFlushes = maxTimeBetweenFlushes } } -// WithMinimumUpdateDuration sets the minimum time between updates -func WithMinimumUpdateDuration(minimumUpdateDuration time.Duration) UpdaterOptions { +// WithMinTimeBetweenFlushes sets the minimum time between updates +func WithMinTimeBetweenFlushes(minTimeBetweenFlushes time.Duration) UpdaterOptions { return func(updater *BatchUpdater) { - updater.minimumUpdateDuration = minimumUpdateDuration + updater.minTimeBetweenFlushes = minTimeBetweenFlushes } } @@ -78,8 +78,8 @@ type BatchUpdater struct { batchLimit uint32 maxRetries int maxWaitTime time.Duration - minimumUpdateDuration time.Duration - flushDuration time.Duration + minTimeBetweenFlushes time.Duration + maxTimeBetweenFlushes time.Duration // State lastUpdateTime time.Time @@ -100,8 +100,8 @@ func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, t pluginName: pluginName, batchLimit: defaultBatchLimit, - minimumUpdateDuration: defaultMinimumUpdateDuration, - flushDuration: defaultFlushDuration, + minTimeBetweenFlushes: defaultMinTimeBetweenFlushes, + maxTimeBetweenFlushes: defaultMaxTimeBetweenFlushes, maxRetries: defaultMaxRetries, maxWaitTime: defaultMaxWaitTime, triggerUpdate: make(chan struct{}), @@ -160,14 +160,14 @@ func (u *BatchUpdater) Close(_ context.Context) error { func (u 
*BatchUpdater) backgroundUpdater(ctx context.Context) { started := make(chan struct{}) - flushDuration := time.NewTicker(u.flushDuration) + flushDuration := time.NewTicker(u.maxTimeBetweenFlushes) go func() { started <- struct{}{} for { select { case <-u.triggerUpdate: - if time.Since(u.lastUpdateTime) < u.minimumUpdateDuration { + if time.Since(u.lastUpdateTime) < u.minTimeBetweenFlushes { // Not enough time since last update continue } @@ -183,7 +183,7 @@ func (u *BatchUpdater) backgroundUpdater(ctx context.Context) { } u.rowsToUpdate.Add(-rowsToUpdate) case <-flushDuration.C: - if time.Since(u.lastUpdateTime) < u.minimumUpdateDuration { + if time.Since(u.lastUpdateTime) < u.minTimeBetweenFlushes { // Not enough time since last update continue } diff --git a/premium/usage_test.go b/premium/usage_test.go index 9394933d89..ac6f72fa45 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -104,7 +104,7 @@ func TestUsageService_WithFlushDuration(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := newClient(ctx, apiClient, WithBatchLimit(uint32(batchSize)), WithFlushEvery(1*time.Millisecond), WithMinimumUpdateDuration(0*time.Millisecond)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(uint32(batchSize)), WithMaxTimeBetweenFlushes(1*time.Millisecond), WithMinTimeBetweenFlushes(0*time.Millisecond)) for i := 0; i < 10; i++ { err = usageClient.Increase(ctx, 10) @@ -127,7 +127,7 @@ func TestUsageService_WithMinimumUpdateDuration(t *testing.T) { apiClient, err := cqapi.NewClientWithResponses(s.server.URL) require.NoError(t, err) - usageClient := newClient(ctx, apiClient, WithBatchLimit(0), WithMinimumUpdateDuration(30*time.Second)) + usageClient := newClient(ctx, apiClient, WithBatchLimit(0), WithMinTimeBetweenFlushes(30*time.Second)) for i := 0; i < 10000; i++ { err = usageClient.Increase(ctx, 1) From 8375810e984c63fbe1c75326f34ce839fd3323f3 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 15:07:00 +0100 Subject: [PATCH 10/14] Do not error if server wait time exceeds max wait time In this case we want to respect the wait time the server has suggested, so just wait. 
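To illustrate the behaviour this commit describes (a sketch only, not the patch's code; the helper name is invented, and the real method also subtracts the time already spent on the failed request before sleeping): a server-supplied Retry-After is now honoured as-is, even when it exceeds maxWaitTime, while the client-chosen exponential backoff remains capped.

package retrysketch

import (
	"fmt"
	"net/http"
	"time"
)

// retryDelaySketch shows the precedence as of this commit: server back-pressure
// wins outright; otherwise fall back to exponential backoff capped at maxWaitTime.
func retryDelaySketch(headers http.Header, retry int, maxWaitTime time.Duration) (time.Duration, error) {
	if retryAfter := headers.Get("Retry-After"); retryAfter != "" {
		// The Retry-After value is whole seconds, matching the patch's ParseDuration(retryAfter + "s").
		d, err := time.ParseDuration(retryAfter + "s")
		if err != nil {
			return 0, fmt.Errorf("failed to parse retry-after header: %w", err)
		}
		return d, nil // no cap: respect whatever the server asked for
	}
	d := time.Duration(1<<retry) * time.Second
	if d > maxWaitTime {
		d = maxWaitTime
	}
	return d, nil
}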
--- premium/usage.go | 3 --- premium/usage_test.go | 11 ----------- 2 files changed, 14 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 454cd13eb0..5aea6a0b9d 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -252,9 +252,6 @@ func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Heade if err != nil { return 0, fmt.Errorf("failed to parse retry-after header: %w", err) } - if retryDelay > u.maxWaitTime { - return 0, fmt.Errorf("retry-after header exceeds max wait time: %s > %s", retryDelay, u.maxWaitTime) - } return retryDelay, nil } } diff --git a/premium/usage_test.go b/premium/usage_test.go index ac6f72fa45..18f59f0d54 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -3,7 +3,6 @@ package premium import ( "context" "encoding/json" - "errors" "fmt" cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/stretchr/testify/assert" @@ -292,16 +291,6 @@ func TestUsageService_CalculateRetryDuration_ServerBackPressure(t *testing.T) { retry: 0, expectedSeconds: 5, }, - { - name: "should raise an error if the server wants us to wait longer than max wait time", - statusCode: http.StatusTooManyRequests, - headers: http.Header{"Retry-After": []string{"40"}}, - retry: 0, - ops: func(client *BatchUpdater) { - client.maxWaitTime = 30 * time.Second - }, - wantErr: errors.New("retry-after header exceeds max wait time: 40s > 30s"), - }, } for _, tt := range tests { From c54c81c98a4c0fe14be9686eb3f968222cab9fa3 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 15:09:45 +0100 Subject: [PATCH 11/14] We are happy with any 2xx response going forward --- premium/usage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/premium/usage.go b/premium/usage.go index 5aea6a0b9d..510ccc2f1c 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -227,7 +227,7 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe if err != nil { return fmt.Errorf("failed to update usage: %w", err) } - if resp.StatusCode() == http.StatusOK { + if resp.StatusCode() >= 200 && resp.StatusCode() < 300 { u.lastUpdateTime = time.Now().UTC() return nil } From ba46c4a5af82a74cafe6e3d39bb4c0e4218f7d07 Mon Sep 17 00:00:00 2001 From: Martin Norbury Date: Tue, 24 Oct 2023 15:20:06 +0100 Subject: [PATCH 12/14] Only retry on `429` and `503` status codes --- premium/usage.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/premium/usage.go b/premium/usage.go index 510ccc2f1c..4ede0e7705 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -245,7 +245,7 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe // calculateRetryDuration calculates the duration to sleep relative to the query start time before retrying an update func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Header, queryStartTime time.Time, retry int) (time.Duration, error) { - if statusCode == http.StatusTooManyRequests { + if retryableStatusCode(statusCode) { retryAfter := headers.Get("Retry-After") if retryAfter != "" { retryDelay, err := time.ParseDuration(retryAfter + "s") @@ -259,3 +259,7 @@ func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Heade retryDelay := time.Duration(1< Date: Tue, 24 Oct 2023 15:24:39 +0100 Subject: [PATCH 13/14] Add some jitter to retry delay --- premium/usage.go | 7 +++++-- premium/usage_test.go | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index 
4ede0e7705..bae5a2e635 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -6,6 +6,7 @@ import ( cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/google/uuid" "github.com/rs/zerolog/log" + "math/rand" "net/http" "sync/atomic" "time" @@ -256,8 +257,10 @@ func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Heade } } - retryDelay := time.Duration(1< Date: Tue, 24 Oct 2023 16:02:36 +0100 Subject: [PATCH 14/14] Only calculate retry duration on retryable errors --- premium/usage.go | 20 ++++++++++++-------- premium/usage_test.go | 14 +++++++------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/premium/usage.go b/premium/usage.go index bae5a2e635..e8016949aa 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -246,17 +246,21 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe // calculateRetryDuration calculates the duration to sleep relative to the query start time before retrying an update func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Header, queryStartTime time.Time, retry int) (time.Duration, error) { - if retryableStatusCode(statusCode) { - retryAfter := headers.Get("Retry-After") - if retryAfter != "" { - retryDelay, err := time.ParseDuration(retryAfter + "s") - if err != nil { - return 0, fmt.Errorf("failed to parse retry-after header: %w", err) - } - return retryDelay, nil + if !retryableStatusCode(statusCode) { + return 0, fmt.Errorf("non-retryable status code: %d", statusCode) + } + + // Check if we have a retry-after header + retryAfter := headers.Get("Retry-After") + if retryAfter != "" { + retryDelay, err := time.ParseDuration(retryAfter + "s") + if err != nil { + return 0, fmt.Errorf("failed to parse retry-after header: %w", err) } + return retryDelay, nil } + // Calculate exponential backoff baseRetry := min(time.Duration(1<
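Stepping back from the retry details, the batching behaviour the series as a whole establishes can be summarised with a simplified sketch. This is illustrative only: the type, field and function names are invented, and the usage API call (with its retry logic) is stubbed out as send.

package flushsketch

import (
	"sync/atomic"
	"time"
)

// flusher is a simplified stand-in for BatchUpdater.
type flusher struct {
	batchLimit            uint32
	minTimeBetweenFlushes time.Duration
	maxTimeBetweenFlushes time.Duration

	lastFlush     time.Time
	pending       atomic.Uint32
	triggerUpdate chan struct{} // poked by Increase
	done          chan struct{} // closed by Close
	closeErr      chan error    // reports the final flush result back to Close

	send func(rows uint32) error // stands in for the usage API call, including retries
}

func (f *flusher) run() {
	ticker := time.NewTicker(f.maxTimeBetweenFlushes)
	defer ticker.Stop()
	for {
		select {
		case <-f.triggerUpdate:
			// Flush when a full batch is pending, but never more often than the minimum interval.
			if time.Since(f.lastFlush) < f.minTimeBetweenFlushes {
				continue
			}
			if rows := f.pending.Load(); rows >= f.batchLimit {
				f.flush(rows)
			}
		case <-ticker.C:
			// Periodic flush of whatever is pending, rate-limited the same way.
			if time.Since(f.lastFlush) < f.minTimeBetweenFlushes {
				continue
			}
			if rows := f.pending.Load(); rows > 0 {
				f.flush(rows)
			}
		case <-f.done:
			// Final flush on Close; its error (or nil) is handed back to the caller of Close.
			if rows := f.pending.Load(); rows > 0 {
				if err := f.send(rows); err != nil {
					f.closeErr <- err
					return
				}
				f.pending.Add(-rows)
			}
			f.closeErr <- nil
			return
		}
	}
}

func (f *flusher) flush(rows uint32) {
	if err := f.send(rows); err != nil {
		return // in the real updater this is logged and retried on a later trigger
	}
	f.pending.Add(-rows)
	f.lastFlush = time.Now()
}

In the real updater, Increase adds to the pending counter and pokes triggerUpdate, while Close closes done and then blocks on the error channel, so the final flush has completed (or failed) by the time Close returns.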