Decoupled blocks deletion from compaction #2623

Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -34,6 +34,11 @@
* [ENHANCEMENT] Thanos and Prometheus upgraded to [806479182a6b](https://github.com/thanos-io/thanos/commit/806479182a6b) and [cd73b3d33e06](https://github.com/prometheus/prometheus/commit/cd73b3d33e06) respectively. #2604
  * TSDB now supports isolation of append and queries.
  * TSDB now holds less WAL files after Head Truncation.
* [ENHANCEMENT] Experimental TSDB: decoupled blocks deletion from blocks compaction in the compactor, so that blocks deletion is not blocked by a busy compactor. The following metrics have been added: #2623
  * `cortex_compactor_block_cleanup_started_total`
  * `cortex_compactor_block_cleanup_completed_total`
  * `cortex_compactor_block_cleanup_failed_total`
  * `cortex_compactor_block_cleanup_last_successful_run_timestamp_seconds`
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411: ensure requests are properly routed to the Prometheus API embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
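
To illustrate the decoupling described in the changelog entry above: the new cleaner (added in `pkg/compactor/blocks_cleaner.go` below) is a standalone `services.Service` with its own cleanup interval, so hard deletion of marked blocks no longer waits for a compaction pass to finish. The following is a minimal, hypothetical wiring sketch modelled on the unit test further down in this diff; the directories, intervals, and the `startStandaloneCleaner` helper itself are illustrative and not part of the PR.

```go
package compactor

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// startStandaloneCleaner is a hypothetical helper (not part of this PR) showing how the
// BlocksCleaner runs as its own timer-driven service, independent of compaction.
func startStandaloneCleaner(ctx context.Context) (*BlocksCleaner, error) {
	cfg := BlocksCleanerConfig{
		DataDir:             "/tmp/cleaner-meta", // illustrative local cache dir for block metas
		MetaSyncConcurrency: 10,
		DeletionDelay:       12 * time.Hour,   // how long a deletion mark must age before hard deletion
		CleanupInterval:     15 * time.Minute, // cleanup cadence, unrelated to compaction duration
	}

	// Any objstore.Bucket implementation works; the filesystem backend keeps the sketch local.
	bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: "/tmp/blocks"})
	if err != nil {
		return nil, err
	}

	logger := log.NewNopLogger()

	// Claim ownership of every user for the sake of the sketch; in the real compactor this
	// callback decides which tenants the replica is responsible for.
	scanner := NewUsersScanner(bucketClient, func(_ string) (bool, error) { return true, nil }, logger)

	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, logger, prometheus.NewRegistry())

	// starting() performs one cleanup before the service reports Running, then the ticker
	// repeats it every CleanupInterval, updating the cortex_compactor_block_cleanup_* metrics.
	if err := services.StartAndAwaitRunning(ctx, cleaner); err != nil {
		return nil, err
	}

	return cleaner, nil
}
```

If the cleaner fails or is stopped, compaction itself is unaffected, which is the point of the decoupling; the `cortex_compactor_block_cleanup_last_successful_run_timestamp_seconds` gauge can then be used to alert on stale cleanups.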
178 changes: 178 additions & 0 deletions pkg/compactor/blocks_cleaner.go
@@ -0,0 +1,178 @@
package compactor

import (
	"context"
	"path"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/services"
)

type BlocksCleanerConfig struct {
	DataDir             string
	MetaSyncConcurrency int
	DeletionDelay       time.Duration
	CleanupInterval     time.Duration
}

type BlocksCleaner struct {
	services.Service

	cfg          BlocksCleanerConfig
	logger       log.Logger
	bucketClient objstore.Bucket
	usersScanner *UsersScanner

	// Metrics.
	runsStarted        prometheus.Counter
	runsCompleted      prometheus.Counter
	runsFailed         prometheus.Counter
	runsLastSuccess    prometheus.Gauge
	blocksCleanedTotal prometheus.Counter
	blocksFailedTotal  prometheus.Counter
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *UsersScanner, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
	c := &BlocksCleaner{
		cfg:          cfg,
		bucketClient: bucketClient,
		usersScanner: usersScanner,
		logger:       log.With(logger, "component", "cleaner"),
		runsStarted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_block_cleanup_started_total",
			Help: "Total number of blocks cleanup runs started.",
		}),
		runsCompleted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_block_cleanup_completed_total",
			Help: "Total number of blocks cleanup runs successfully completed.",
		}),
		runsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_block_cleanup_failed_total",
			Help: "Total number of blocks cleanup runs failed.",
		}),
		runsLastSuccess: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "cortex_compactor_block_cleanup_last_successful_run_timestamp_seconds",
			Help: "Unix timestamp of the last successful blocks cleanup run.",
		}),
		blocksCleanedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_blocks_cleaned_total",
			Help: "Total number of blocks deleted.",
		}),
		blocksFailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_block_cleanup_failures_total",
			Help: "Total number of blocks failed to be deleted.",
		}),
	}

	c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, nil)

	return c
}

func (c *BlocksCleaner) starting(ctx context.Context) error {
	// Run a cleanup so that any other service depending on this service
	// is guaranteed to start once the initial cleanup has been done.
	c.runCleanup(ctx)

	return nil
}

func (c *BlocksCleaner) ticker(ctx context.Context) error {
	c.runCleanup(ctx)

	return nil
}

func (c *BlocksCleaner) runCleanup(ctx context.Context) {
	level.Info(c.logger).Log("msg", "started hard deletion of blocks marked for deletion")
	c.runsStarted.Inc()

	if err := c.cleanUsers(ctx); err == nil {
		level.Info(c.logger).Log("msg", "successfully completed hard deletion of blocks marked for deletion")
		c.runsCompleted.Inc()
		c.runsLastSuccess.SetToCurrentTime()
	} else if errors.Is(err, context.Canceled) {
		level.Info(c.logger).Log("msg", "canceled hard deletion of blocks marked for deletion", "err", err)
		return
	} else {
		level.Error(c.logger).Log("msg", "failed to hard delete blocks marked for deletion", "err", err.Error())
		c.runsFailed.Inc()
	}
}

func (c *BlocksCleaner) cleanUsers(ctx context.Context) error {
	users, err := c.usersScanner.ScanUsers(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to discover users from bucket")
	}

	errs := tsdb_errors.MultiError{}
	for _, userID := range users {
		// Ensure the context has not been canceled (i.e. shutdown has been triggered).
		if ctx.Err() != nil {
			return ctx.Err()
		}

		if err = c.cleanUser(ctx, userID); err != nil {
			errs.Add(errors.Wrapf(err, "failed to delete user blocks (user: %s)", userID))
			continue
		}
	}

	return errs.Err()
}

func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
	userLogger := util.WithUserID(userID, c.logger)
	userBucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)

	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, c.cfg.DeletionDelay)

	fetcher, err := block.NewMetaFetcher(
		userLogger,
		c.cfg.MetaSyncConcurrency,
		userBucket,
		// The fetcher stores cached metas in the "meta-syncer/" sub directory,
		// but we prefix it in order to guarantee no clashing with the compactor.
		path.Join(c.cfg.DataDir, "blocks-cleaner-meta-"+userID),
		// No metrics.
		nil,
		[]block.MetadataFilter{ignoreDeletionMarkFilter},
		nil,
	)
	if err != nil {
		return errors.Wrap(err, "error creating metadata fetcher")
	}

	// Runs a bucket scan to get a fresh list of all blocks and populate
	// the list of deleted blocks in filter.
	if _, _, err = fetcher.Fetch(ctx); err != nil {
		return errors.Wrap(err, "error fetching metadata")
	}

	cleaner := compact.NewBlocksCleaner(
		userLogger,
		userBucket,
		ignoreDeletionMarkFilter,
		c.cfg.DeletionDelay,
		c.blocksCleanedTotal,
		c.blocksFailedTotal)

	if err := cleaner.DeleteMarkedBlocks(ctx); err != nil {
		return errors.Wrap(err, "error cleaning blocks")
	}

	return nil
}
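
In `cleanUser`, the same `DeletionDelay` is passed both to the `IgnoreDeletionMarkFilter` and to Thanos' `compact.NewBlocksCleaner`, so a block is hard-deleted only once its deletion mark is older than the configured delay. Below is a minimal sketch of that eligibility rule; the helper is illustrative only and is not part of the PR or of Thanos.

```go
package compactor

import "time"

// eligibleForHardDeletion illustrates (it is not used by the PR) when DeleteMarkedBlocks
// will remove a block: the deletion mark must be at least deletionDelay old.
func eligibleForHardDeletion(markedAt, now time.Time, deletionDelay time.Duration) bool {
	return now.Sub(markedAt) >= deletionDelay
}
```

The unit test in the next file exercises exactly this boundary: with a 12h delay, a block marked 13 hours ago is deleted while one marked 11 hours ago is kept.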
80 changes: 80 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
@@ -0,0 +1,80 @@
package compactor

import (
	"context"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"testing"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
	"github.com/cortexproject/cortex/pkg/util/services"
)

func TestBlocksCleaner(t *testing.T) {
	// Create a temporary directory for local storage.
	storageDir, err := ioutil.TempDir(os.TempDir(), "storage")
	require.NoError(t, err)
	defer os.RemoveAll(storageDir) //nolint:errcheck

	// Create a temporary directory for cleaner.
	dataDir, err := ioutil.TempDir(os.TempDir(), "data")
	require.NoError(t, err)
	defer os.RemoveAll(dataDir) //nolint:errcheck

	// Create blocks.
	now := time.Now()
	deletionDelay := 12 * time.Hour
	block1 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 10, 20, nil)
	block2 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 20, 30, nil)
	block3 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 30, 40, nil)
	createDeletionMark(t, filepath.Join(storageDir, "user-1"), block2, now.Add(-deletionDelay).Add(time.Hour))  // Hasn't reached the deletion threshold yet.
	createDeletionMark(t, filepath.Join(storageDir, "user-1"), block3, now.Add(-deletionDelay).Add(-time.Hour)) // Reached the deletion threshold.

	// Create a bucket client on the local storage.
	bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
	require.NoError(t, err)

	cfg := BlocksCleanerConfig{
		DataDir:             dataDir,
		MetaSyncConcurrency: 10,
		DeletionDelay:       deletionDelay,
		CleanupInterval:     time.Minute,
	}

	ctx := context.Background()
	logger := log.NewNopLogger()
	scanner := NewUsersScanner(bucketClient, func(_ string) (bool, error) { return true, nil }, logger)

	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, logger, nil)
	require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
	defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

	// Check the storage to ensure only the block which has reached the deletion threshold
	// has been effectively deleted.
	exists, err := bucketClient.Exists(ctx, path.Join("user-1", block1.String(), metadata.MetaFilename))
	require.NoError(t, err)
	assert.True(t, exists)

	exists, err = bucketClient.Exists(ctx, path.Join("user-1", block2.String(), metadata.MetaFilename))
	require.NoError(t, err)
	assert.True(t, exists)

	exists, err = bucketClient.Exists(ctx, path.Join("user-1", block3.String(), metadata.MetaFilename))
	require.NoError(t, err)
	assert.False(t, exists)

	assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted))
	assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted))
	assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed))
	assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.blocksCleanedTotal))
	assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal))
}
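
`createTSDBBlock` and `createDeletionMark` are shared test helpers defined elsewhere in the package and are not shown in this diff. As a rough sketch of their shape (an assumption, not the actual helpers), `createDeletionMark` would write a Thanos-style `deletion-mark.json` next to the block, along these lines:

```go
package compactor

import (
	"encoding/json"
	"io/ioutil"
	"path/filepath"
	"testing"
	"time"

	"github.com/oklog/ulid"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// createDeletionMarkSketch is a hypothetical stand-in for the createDeletionMark helper
// used in the test above. It assumes the mark format is Thanos' metadata.DeletionMark,
// which is what the IgnoreDeletionMarkFilter and the Thanos blocks cleaner consume.
func createDeletionMarkSketch(t *testing.T, userDir string, blockID ulid.ULID, deletionTime time.Time) {
	t.Helper()

	mark := metadata.DeletionMark{
		ID:           blockID,
		DeletionTime: deletionTime.Unix(),
		Version:      metadata.DeletionMarkVersion1,
	}

	content, err := json.Marshal(mark)
	require.NoError(t, err)

	// The mark sits inside the block directory, alongside meta.json:
	// <storageDir>/<user>/<block ULID>/deletion-mark.json
	markPath := filepath.Join(userDir, blockID.String(), metadata.DeletionMarkFilename)
	require.NoError(t, ioutil.WriteFile(markPath, content, 0644))
}
```

With a helper like this, the test's two marks differ only in their `DeletionTime`, which is what makes one block eligible for hard deletion and the other not.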