
Commit

cmd: compact: clean partial / marked blocks concurrently (thanos-io#3115)

* cmd: compact: clean partial / marked blocks concurrently

Clean partially uploaded blocks and blocks marked for deletion concurrently
with the whole compaction/downsampling process. One iteration can
potentially take a few days, so it is useful to periodically clean
unneeded blocks in the background. Without this, there are huge spikes
in block storage usage; the spike's size depends on how long it takes to
complete one iteration.

The implementation is simple: the deletion logic is factored out
into a separate function. It is called at the end of each iteration and,
if `--wait` has been specified, concurrently in the background. A mutex
protects against concurrent runs. Blocks are removed from the deletion
mark map so that we do not try to delete the same blocks more than once.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
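
A minimal Go sketch of that pattern, assuming hypothetical names (`cleaner`, `cleanFn`, `runPeriodic`) rather than the actual Thanos code: a cleanup function guarded by a mutex so the end-of-iteration call and the periodic background call never overlap, and a background loop that waits one full interval first because cleanup has already run on boot.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cleaner mirrors the idea of the patch; cleanFn stands in for the real
// partial-upload and marked-block deletion logic (hypothetical helper).
type cleaner struct {
	mtx     sync.Mutex
	cleanFn func(context.Context) error
}

// cleanPartialMarked can be called from both the compaction iteration and the
// background loop; the mutex prevents the two from running concurrently.
func (c *cleaner) cleanPartialMarked(ctx context.Context) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	return c.cleanFn(ctx)
}

// runPeriodic waits one full interval first (cleanup already ran on boot),
// then repeats the cleanup until the context is cancelled.
func (c *cleaner) runPeriodic(ctx context.Context, interval time.Duration) error {
	select {
	case <-time.After(interval):
	case <-ctx.Done():
		return ctx.Err()
	}
	for {
		if err := c.cleanPartialMarked(ctx); err != nil {
			return err
		}
		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	c := &cleaner{cleanFn: func(context.Context) error {
		fmt.Println("cleaning partial / marked blocks")
		return nil
	}}

	// Once on boot, then periodically in the background.
	_ = c.cleanPartialMarked(ctx)
	_ = c.runPeriodic(ctx, time.Second)
}
```

In the actual command the background loop is registered with the run.Group and driven by `runutil.Repeat`, as the compact.go diff below shows.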

* *: update changelog, e2e tests

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* cmd: compact: fix according to comments

Remove "error" from the `error` and just directly call the function.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* CHANGELOG: cleanups

Forgot to remove this part while resolving conflicts.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* CHANGELOG: update

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* CHANGELOG: clean whitespace

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
Signed-off-by: Chans321 <tsschand@gmail.com>
GiedriusS authored and Chans321 committed Oct 23, 2020
1 parent 3a7f7d1 commit ec2187a
Showing 4 changed files with 71 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -10,11 +10,15 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased

### Added

- [#3259](https://github.com/thanos-io/thanos/pull/3259) Thanos BlockViewer: Added a button in the blockviewer that allows users to download the metadata of a block.
- [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.
- [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for label names, label values and series requests.
- [#3315](https://github.com/thanos-io/thanos/pull/3315) Query Frontend: Support results caching for label names, label values and series requests.
- [#3346](https://github.com/thanos-io/thanos/pull/3346) Ruler UI: Fix a bug preventing the /rules endpoint from loading.
- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded blocks and blocks with deletion marks concurrently. It does that at the beginning and then every `--compact.cleanup-interval` time period. By default it is 5 minutes.

### Fixed
- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
55 changes: 49 additions & 6 deletions cmd/thanos/compact.go
@@ -10,6 +10,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
@@ -301,6 +302,35 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
cleanMtx.Lock()
defer cleanMtx.Unlock()

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
}

if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "failed to sync metas", "err", err)
}
return nil
}

// Do it once at the beginning to ensure that it runs at least once before
// the main loop.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}
if err := cleanPartialMarked(); err != nil {
cancel()
return errors.Wrap(err, "cleaning partial and marked blocks")
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
@@ -345,12 +375,7 @@
return errors.Wrap(err, "retention failed")
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
return nil
return cleanPartialMarked()
}

g.Add(func() error {
@@ -421,6 +446,21 @@

srv.Handle("/", r)

// Periodically remove partial blocks and blocks marked for deletion
// since one iteration potentially could take a long time.
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
// Wait the whole period at the beginning because we've executed this on boot.
select {
case <-time.After(conf.cleanupBlocksInterval):
case <-ctx.Done():
}
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), cleanPartialMarked)
}, func(error) {
cancel()
})
}

g.Add(func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
_, _, _ = f.Fetch(iterCtx)
@@ -463,6 +503,7 @@ type compactConfig struct {
disableDownsampling bool
blockSyncConcurrency int
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
deleteDelay model.Duration
dedupReplicaLabels []string
@@ -512,6 +553,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
Default("5m").DurationVar(&cc.cleanupBlocksInterval)

cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
6 changes: 6 additions & 0 deletions docs/components/compact.md
@@ -147,6 +147,12 @@ Flags:
Repeat interval for syncing the blocks between
local and remote view for /global Block Viewer
UI.
--compact.cleanup-interval=5m
How often we should clean up partially uploaded
blocks and blocks with deletion mark in the
background when --wait has been enabled. Setting
it to "0s" disables it - the cleaning will only
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--delete-delay=48h Time before a block marked for deletion is
18 changes: 12 additions & 6 deletions test/e2e/compact_test.go
@@ -480,7 +480,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
}

t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
// No replica label with overlaps should halt compactor. This test is sequential
// because we do not want two Thanos Compact instances deleting the same partially
// uploaded blocks and blocks with deletion marks.
{
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
@@ -499,22 +502,25 @@

// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// However, the blocks have been cleaned because that happens concurrently.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

testutil.Ok(t, s.Stop(c))
})
}

t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
@@ -524,10 +530,10 @@
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
