Compactor blocks cleaner: retry operations that could interfere with rewriting bucket index #8071

Open · wants to merge 15 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Compactor: Make stale bucket indexes less common by retrying block cleanup operations. #8071
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
76 changes: 70 additions & 6 deletions pkg/compactor/blocks_cleaner.go
@@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
@@ -46,6 +47,9 @@ type BlocksCleanerConfig struct {
CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs.
}

type readIndexFunc func(context.Context, objstore.Bucket, string, bucket.TenantConfigProvider, log.Logger) (*bucketindex.Index, error)
type writeIndexFunc func(context.Context, objstore.Bucket, string, bucket.TenantConfigProvider, *bucketindex.Index) error

type BlocksCleaner struct {
services.Service

@@ -55,7 +59,10 @@ type BlocksCleaner struct {
bucketClient objstore.Bucket
usersScanner *mimir_tsdb.UsersScanner
ownUser func(userID string) (bool, error)
readIndex readIndexFunc
writeIndex writeIndexFunc
singleFlight *concurrency.LimitedConcurrencySingleFlight
retryConfig backoff.Config

// Keep track of the last owned users.
lastOwnedUsers []string
@@ -78,13 +85,22 @@ type BlocksCleaner struct {
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
retryConfig := backoff.Config{
MinBackoff: 20 * time.Millisecond,
MaxBackoff: 250 * time.Millisecond,
MaxRetries: 3,
}

c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: mimir_tsdb.NewUsersScanner(bucketClient, ownUser, logger),
ownUser: ownUser,
readIndex: bucketindex.ReadIndex,
writeIndex: bucketindex.WriteIndex,
cfgProvider: cfgProvider,
singleFlight: concurrency.NewLimitedConcurrencySingleFlight(cfg.CleanupConcurrency),
retryConfig: retryConfig,
logger: log.With(logger, "component", "cleaner"),
runsStarted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_block_cleanup_started_total",
@@ -397,6 +413,46 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
return nil
}

// withRetries invokes the given function, retrying according to the backoff
// config until it succeeds or returns a non-retryable error. Each invocation of
// f is given perCallTimeout to complete. If perCallTimeout is non-positive, f is
// invoked exactly once, with no timeout and no retries. This is specifically
// designed to retry timeouts caused by flaky connectivity to the objstore backend.
func withRetries(ctx context.Context, perCallTimeout time.Duration, bc backoff.Config, logger log.Logger, f func(context.Context) error) error {
@jhalterman (Member) commented on May 10, 2024:

I like this functional approach towards doing retries. Maybe something like this could go into dskit? Alternatively, for this functional approach to retries, we could use Failsafe-go (which we're using for circuit breaking).

The PR author (Contributor) replied:

I was just looking at failsafe-go yesterday, randomly. Whoever wrote that has been around the block a few times. :)

> Maybe something like this could go into dskit?

Yeah, I looked to see if it already existed in dskit. There's probably room for something there. For this application, the coupling of per-call timeout contexts and a shouldRetry function that looks for DeadlineExceeded seemed a little single-purpose.

Failsafe's backoff looks nice. But there is a plus to sticking with dskit/backoff as it is so pervasive in this codebase.

I expect to come back into this file to add similar retries inside of UpdateIndex ("sometime") so maybe we can keep this dialogue open/rule of three and all that?

if perCallTimeout <= 0 {
return f(ctx)
}

var err error
b := backoff.New(ctx, bc)

for b.Ongoing() {
rctx, cancel := context.WithTimeout(ctx, perCallTimeout)
err = f(rctx)
cancel()
if err == nil || !shouldRetry(err) {
return err
}
level.Info(logger).Log("msg", "single call failed with error", "err", err)
b.Wait()
}

level.Warn(logger).Log("msg", "retries exhausted")
return fmt.Errorf("failed with retries: %w (last err: %w)", b.Err(), err)
}

func shouldRetry(err error) bool {
var tempErr interface{ Temporary() bool }

switch {
case errors.Is(err, context.DeadlineExceeded):
return true
case errors.As(err, &tempErr):
return tempErr.Temporary()
}

return false
}
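
As the review thread above suggests, a more general form of this helper could eventually live in dskit. The following is a rough sketch of what that might look like, built only on the dskit backoff API already used in this file; the package, function name, and the idea of passing the retry predicate in as a parameter are hypothetical, not an existing dskit API.

package retry // hypothetical home; the review thread suggests dskit

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// RetryWithPerCallTimeout is a generalized sketch of withRetries: the caller
// supplies its own retry predicate instead of the DeadlineExceeded/Temporary()
// check hard-coded in shouldRetry above.
func RetryWithPerCallTimeout(ctx context.Context, cfg backoff.Config, perCallTimeout time.Duration, retryable func(error) bool, f func(context.Context) error) error {
	var err error
	b := backoff.New(ctx, cfg)
	for b.Ongoing() {
		// Bound each attempt separately so one hung call can't consume the whole budget.
		callCtx, cancel := context.WithTimeout(ctx, perCallTimeout)
		err = f(callCtx)
		cancel()
		if err == nil || !retryable(err) {
			return err
		}
		b.Wait()
	}
	return fmt.Errorf("retries exhausted: %w (last error: %w)", b.Err(), err)
}

Whether something like this belongs in dskit, or whether failsafe-go already covers it, is exactly the question left open in the thread above.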

func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger log.Logger) (returnErr error) {
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
startTime := time.Now()
@@ -411,11 +467,17 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
}()

// Read the bucket index.
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, userLogger)

var idx *bucketindex.Index
err := withRetries(ctx, 1*time.Minute, c.retryConfig, log.With(userLogger, "op", "readIndex"), func(ctx context.Context) error {
var err error
idx, err = c.readIndex(ctx, c.bucketClient, userID, c.cfgProvider, userLogger)
A reviewer (Member) commented:

I'd prefer to just call bucketindex.ReadIndex here, and same for c.writeIndex vs bucketindex.WriteIndex later.

If we want to inject errors from read/write index calls, I'd suggest doing it at bucket level (see ErrorInjectedBucketClient, deceivingUploadBucket or errBucket), instead of introducing indirection in BlocksCleaner.

return err
})
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
return err
return fmt.Errorf("read index: %w", err)
}

level.Info(userLogger).Log("msg", "fetched existing bucket index")
@@ -435,7 +497,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, userLogger)
idx, partials, err := w.UpdateIndex(ctx, idx)
if err != nil {
return err
return fmt.Errorf("update index: %w", err)
}

c.deleteBlocksMarkedForDeletion(ctx, idx, userBucket, userLogger)
@@ -459,11 +521,13 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
// Otherwise upload the updated index to the storage.
if c.cfg.NoBlocksFileCleanupEnabled && len(idx.Blocks) == 0 {
if err := c.deleteRemainingData(ctx, userBucket, userID, userLogger); err != nil {
return err
return fmt.Errorf("delete remaining: %w", err)
}
} else {
if err := bucketindex.WriteIndex(ctx, c.bucketClient, userID, c.cfgProvider, idx); err != nil {
return err
if err := withRetries(ctx, 3*time.Minute, c.retryConfig, log.With(userLogger, "op", "writeIndex"), func(ctx context.Context) error {
return c.writeIndex(ctx, c.bucketClient, userID, c.cfgProvider, idx)
}); err != nil {
return fmt.Errorf("write index: %w", err)
}
}

171 changes: 163 additions & 8 deletions pkg/compactor/blocks_cleaner_test.go
@@ -10,14 +10,17 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
@@ -42,11 +45,14 @@ type testBlocksCleanerOptions struct {
concurrency int
tenantDeletionDelay time.Duration
user4FilesExist bool // User 4 has "FinishedTime" in tenant deletion marker set to "1h" ago.
objStoreTimeouts bool
}

func (o testBlocksCleanerOptions) String() string {
return fmt.Sprintf("concurrency=%d, tenant deletion delay=%v",
o.concurrency, o.tenantDeletionDelay)
return fmt.Sprintf(
"concurrency=%d, tenant deletion delay=%v, user 4 files=%v, obj store timeouts=%v",
o.concurrency, o.tenantDeletionDelay, o.user4FilesExist, o.objStoreTimeouts,
)
}

func TestBlocksCleaner(t *testing.T) {
@@ -56,12 +62,14 @@ func TestBlocksCleaner(t *testing.T) {
{concurrency: 2},
{concurrency: 10},
} {
options := options

t.Run(options.String(), func(t *testing.T) {
t.Parallel()
testBlocksCleanerWithOptions(t, options)
})
for _, objStoreTimeouts := range []bool{false, true} {
options := options
options.objStoreTimeouts = objStoreTimeouts
t.Run(options.String(), func(t *testing.T) {
t.Parallel()
testBlocksCleanerWithOptions(t, options)
})
}
}
}

@@ -113,6 +121,20 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, logger, reg)
// Don't waste time sleeping in tests.
cleaner.retryConfig.MinBackoff = 0
cleaner.retryConfig.MaxBackoff = 0

var mockIndexLayer *mockIndexLayerWithTimeouts

if options.objStoreTimeouts {
mockIndexLayer = &mockIndexLayerWithTimeouts{
initialTimeouts: 2,
}
cleaner.readIndex = mockIndexLayer.ReadIndex
cleaner.writeIndex = mockIndexLayer.WriteIndex
}

require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -192,6 +214,14 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
}

if options.objStoreTimeouts {
readCalls, readSuccess, writeCalls, writeSuccess := mockIndexLayer.State()
assert.Greater(t, readCalls, 0)
assert.True(t, readSuccess)
assert.Greater(t, writeCalls, 0)
assert.True(t, writeSuccess)
}

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.
# TYPE cortex_bucket_blocks_count gauge
@@ -1239,6 +1269,89 @@ func TestConvertBucketIndexToMetasForCompactionJobPlanning(t *testing.T) {
}
}

type tmpErr struct{}

func (e *tmpErr) Error() string { return "invalid widget" }
func (e *tmpErr) Temporary() bool { return true }

func TestBucketCleaner_withRetries(t *testing.T) {
rc := backoff.Config{
MinBackoff: 0,
MaxBackoff: 0,
MaxRetries: 3,
}
l := log.NewNopLogger()
t.Run("eventually succeeds on deadline exceeded", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 10*time.Hour, rc, l, func(ctx context.Context) error {
calls++
if calls <= 2 {
return context.DeadlineExceeded
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 3, calls)
})
t.Run("eventually succeeds on temp err", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 64000*time.Hour, rc, l, func(ctx context.Context) error {
calls++
if calls <= 2 {
return fmt.Errorf("problem: %w", &tmpErr{})
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 3, calls)
})
t.Run("exhausts retries", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 10*time.Hour, rc, l, func(ctx context.Context) error {
calls++
if calls <= 900 {
return context.DeadlineExceeded
}
return nil
})
assert.Error(t, err)
assert.ErrorContains(t, err, "failed with retries:")
assert.Equal(t, 3, calls)
})
t.Run("no retries attempted", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 0, rc, l, func(ctx context.Context) error {
calls++
if calls <= 9 {
return context.DeadlineExceeded
}
return nil
})
assert.Error(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Equal(t, 1, calls)
})
t.Run("no retries needed", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 94000*time.Hour, rc, l, func(ctx context.Context) error {
calls++
return nil
})
assert.NoError(t, err)
assert.Equal(t, 1, calls)
})
t.Run("doesn't retry things that aren't timeouts", func(t *testing.T) {
calls := 0
err := withRetries(context.Background(), 10*time.Hour, rc, l, func(ctx context.Context) error {
calls++
return io.ErrUnexpectedEOF
})
assert.Error(t, err)
assert.ErrorIs(t, err, io.ErrUnexpectedEOF)
assert.Equal(t, 1, calls)
})
}

type mockBucketFailure struct {
objstore.Bucket

@@ -1252,6 +1365,48 @@ func (m *mockBucketFailure) Delete(ctx context.Context, name string) error {
return m.Bucket.Delete(ctx, name)
}
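
The review comment on blocks_cleaner.go above suggests injecting faults at the bucket level instead of swapping readIndex/writeIndex on BlocksCleaner. Modeled on mockBucketFailure directly above, a timeout-injecting wrapper might look roughly like the sketch below; the type name and fields are hypothetical and not part of this PR, and it reuses the test file's existing imports (context, io, sync, objstore).

// timeoutInjectingBucket fails the first failFirstN Get/Upload calls with
// context.DeadlineExceeded, then delegates to the wrapped bucket, so the
// cleaner's retry path is exercised without any indirection in BlocksCleaner.
type timeoutInjectingBucket struct {
	objstore.Bucket

	mu         sync.Mutex
	calls      int
	failFirstN int
}

func (b *timeoutInjectingBucket) shouldFail() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.calls++
	return b.calls <= b.failFirstN
}

func (b *timeoutInjectingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	if b.shouldFail() {
		return nil, context.DeadlineExceeded
	}
	return b.Bucket.Get(ctx, name)
}

func (b *timeoutInjectingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	if b.shouldFail() {
		return context.DeadlineExceeded
	}
	return b.Bucket.Upload(ctx, name, r)
}

In testBlocksCleanerWithOptions this would replace the readIndex/writeIndex overrides: wrap bucketClient before constructing the cleaner and assert on the wrapper's call count instead.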

// mockIndexLayerWithTimeouts holds a pair of ReadIndex/WriteIndex functions
// that time out on their initial calls before delegating to the real implementations.
type mockIndexLayerWithTimeouts struct {
initialTimeouts int

mu sync.Mutex
readCalls int
readSuccess bool
writeCalls int
writeSuccess bool
}

func (m *mockIndexLayerWithTimeouts) ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) (*bucketindex.Index, error) {
m.mu.Lock()
m.readCalls++
if m.readCalls <= m.initialTimeouts {
m.mu.Unlock()
return nil, context.DeadlineExceeded
}
m.readSuccess = true
m.mu.Unlock()
return bucketindex.ReadIndex(ctx, bkt, userID, cfgProvider, logger)
}

func (m *mockIndexLayerWithTimeouts) WriteIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, idx *bucketindex.Index) error {
m.mu.Lock()
m.writeCalls++
if m.writeCalls <= m.initialTimeouts {
m.mu.Unlock()
return context.DeadlineExceeded
}
m.writeSuccess = true
m.mu.Unlock()
return bucketindex.WriteIndex(ctx, bkt, userID, cfgProvider, idx)
}

func (m *mockIndexLayerWithTimeouts) State() (int, bool, int, bool) {
m.mu.Lock()
defer m.mu.Unlock()
return m.readCalls, m.readSuccess, m.writeCalls, m.writeSuccess
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
splitAndMergeShards map[string]int