compact: Make sure sources are removed after compaction. (#258)
* compact: Make sure sources are removed after compaction.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka committed Mar 27, 2018 · 1 parent ca9614e · commit 94e26c6
Showing 3 changed files with 86 additions and 48 deletions.
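
For orientation before the diffs: the core of this change is that once the compacted block has been uploaded, every source block in the plan is removed from the local planning directory and then deleted from the bucket under a detached timeout context, so a shutdown part-way through cannot leave a half-deleted block behind. A minimal sketch of that pattern, condensed from the pkg/compact/compact.go diff below (error wrapping, logging, and the new metric are in the real code):

package sketch

import (
	"context"
	"os"
	"path/filepath"
	"time"

	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
)

// deleteCompactedSources mirrors the new cleanup loop: remove the local copy
// of each source block first, then delete it from the object store.
func deleteCompactedSources(bkt objstore.Bucket, plan []string) error {
	for _, b := range plan {
		id, err := ulid.Parse(filepath.Base(b))
		if err != nil {
			return errors.Wrapf(err, "plan dir %s", b)
		}
		if err := os.RemoveAll(b); err != nil {
			return errors.Wrapf(err, "remove old block dir %s", id)
		}
		// Fresh context so the bucket deletion always runs to completion,
		// even if the surrounding compaction context is being cancelled.
		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		err = block.Delete(delCtx, bkt, id)
		cancel()
		if err != nil {
			return errors.Wrapf(err, "delete old block %s from bucket", id)
		}
	}
	return nil
}
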
4 changes: 2 additions & 2 deletions cmd/thanos/compact.go
@@ -30,7 +30,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
cmd := app.Command(name, "continously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "halt the process if a critical compaction error is detected").
Hidden().Bool()
Hidden().Default("true").Bool()

httpAddr := cmd.Flag("http-address", "listen host:port for HTTP endpoints").
Default(defaultHTTPAddr).String()
@@ -44,7 +44,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
s3config := s3.RegisterS3Params(cmd)

syncDelay := cmd.Flag("sync-delay", "Minimum age of blocks before they are being processed.").
Default("2h").Duration()
Default("30m").Duration()

wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()
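
Note that --debug.halt-on-error now defaults to true. The flag only has an effect for errors that pkg/compact classifies as halt errors (see IsHaltError in the next file). Below is a sketch of how a compactor run loop might consume it; the loop itself is not part of this commit, so the function and variable names here are assumptions:

package sketch

import (
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/improbable-eng/thanos/pkg/compact"
)

// handleCompactionError is a hypothetical helper: on a critical (halt) error it
// parks the process instead of crash-looping, so the operator can inspect the
// bucket state before any further compactions run.
func handleCompactionError(logger log.Logger, haltOnError bool, err error) error {
	if err == nil {
		return nil
	}
	if compact.IsHaltError(err) && haltOnError {
		level.Error(logger).Log("msg", "critical error detected; halting", "err", err)
		select {} // block forever; the process stays up but does no more work
	}
	return err
}
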
99 changes: 59 additions & 40 deletions pkg/compact/compact.go
@@ -222,6 +222,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
m.Thanos.Downsample.Resolution,
c.metrics.compactions.WithLabelValues(GroupKey(*m)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
c.metrics.garbageCollectedBlocks,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
@@ -356,14 +357,15 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
// Group captures a set of blocks that have the same origin labels and downsampling resolution.
// Those blocks generally contain the same series and can thus efficiently be compacted.
type Group struct {
logger log.Logger
bkt objstore.Bucket
labels labels.Labels
resolution int64
mtx sync.Mutex
blocks map[ulid.ULID]*block.Meta
compactions prometheus.Counter
compactionFailures prometheus.Counter
logger log.Logger
bkt objstore.Bucket
labels labels.Labels
resolution int64
mtx sync.Mutex
blocks map[ulid.ULID]*block.Meta
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
}

// newGroup returns a new compaction group.
@@ -374,18 +376,20 @@ func newGroup(
resolution int64,
compactions prometheus.Counter,
compactionFailures prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
) (*Group, error) {
if logger == nil {
logger = log.NewNopLogger()
}
g := &Group{
logger: logger,
bkt: bkt,
labels: lset,
resolution: resolution,
blocks: map[ulid.ULID]*block.Meta{},
compactions: compactions,
compactionFailures: compactionFailures,
logger: logger,
bkt: bkt,
labels: lset,
resolution: resolution,
blocks: map[ulid.ULID]*block.Meta{},
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
}
return g, nil
}
@@ -469,7 +473,7 @@ func IsHaltError(err error) bool {
return ok1 || ok2
}

func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (id ulid.ULID, err error) {
func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (compID ulid.ULID, err error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()

@@ -478,21 +482,21 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
for _, meta := range cg.blocks {
bdir := filepath.Join(dir, meta.ULID.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return id, errors.Wrap(err, "create planning block dir")
return compID, errors.Wrap(err, "create planning block dir")
}
if err := block.WriteMetaFile(bdir, meta); err != nil {
return id, errors.Wrap(err, "write planning meta file")
return compID, errors.Wrap(err, "write planning meta file")
}
}

// Plan against the written meta.json files.
plan, err := comp.Plan(dir)
if err != nil {
return id, errors.Wrap(err, "plan compaction")
return compID, errors.Wrap(err, "plan compaction")
}
if len(plan) == 0 {
// Nothing to do.
return id, nil
return compID, nil
}

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
@@ -502,11 +506,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
for _, pdir := range plan {
meta, err := block.ReadMetaFile(pdir)
if err != nil {
return id, errors.Wrapf(err, "read meta from %s", pdir)
return compID, errors.Wrapf(err, "read meta from %s", pdir)
}
for _, s := range meta.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return id, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
}
uniqueSources[s] = struct{}{}
}
@@ -519,63 +523,78 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
idStr := filepath.Base(b)
id, err := ulid.Parse(idStr)
if err != nil {
return id, errors.Wrapf(err, "plan dir %s", b)
return compID, errors.Wrapf(err, "plan dir %s", b)
}

if err := block.Download(ctx, cg.bkt, id, b); err != nil {
return id, errors.Wrapf(err, "download block %s", idStr)
return compID, errors.Wrapf(err, "download block %s", idStr)
}

// Ensure all input blocks are valid.
if err := block.VerifyIndex(filepath.Join(b, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid plan block %s", b)
return compID, errors.Wrapf(halt(err), "invalid plan block %s", b)
}
}
level.Debug(cg.logger).Log("msg", "downloaded blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

begin = time.Now()

id, err = comp.Compact(dir, plan...)
compID, err = comp.Compact(dir, plan...)
if err != nil {
return id, errors.Wrapf(err, "compact blocks %v", plan)
return compID, errors.Wrapf(err, "compact blocks %v", plan)
}
level.Debug(cg.logger).Log("msg", "compacted blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

bdir := filepath.Join(dir, id.String())
bdir := filepath.Join(dir, compID.String())

os.Remove(filepath.Join(bdir, "tombstones"))

newMeta, err := block.ReadMetaFile(bdir)
if err != nil {
return id, errors.Wrap(err, "read new meta")
return compID, errors.Wrap(err, "read new meta")
}
newMeta.Thanos.Labels = cg.labels.Map()

if err := block.WriteMetaFile(bdir, newMeta); err != nil {
return id, errors.Wrap(err, "write new meta")
return compID, errors.Wrap(err, "write new meta")
}

// Ensure the output block is valid.
if err := block.VerifyIndex(filepath.Join(bdir, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid result block %s", bdir)
return compID, errors.Wrapf(halt(err), "invalid result block %s", bdir)
}

begin = time.Now()

if err := objstore.UploadDir(ctx, cg.bkt, bdir, id.String()); err != nil {
return id, errors.Wrap(err, "upload block")
if err := objstore.UploadDir(ctx, cg.bkt, bdir, compID.String()); err != nil {
return compID, errors.Wrap(err, "upload block")
}
level.Debug(cg.logger).Log("msg", "uploaded block", "block", id, "duration", time.Since(begin))
level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))

// Delete the blocks we just compacted from the group so they do not get included
// Delete the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again.
for _, p := range plan {
if err := os.RemoveAll(p); err != nil {
level.Error(cg.logger).Log("msg", "remove compacted block dir", "err", err)
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return compID, errors.Wrapf(err, "plan dir %s", b)
}

if err := os.RemoveAll(b); err != nil {
return compID, errors.Wrapf(err, "remove old block dir %s", id)
}

// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
err = block.Delete(delCtx, cg.bkt, id)
cancel()
if err != nil {
return compID, errors.Wrapf(err, "delete old block %s from bucket ", id)
}
cg.groupGarbageCollectedBlocks.Inc()
}
return id, nil
return compID, nil
}
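
The counter wired through newGroup above is defined in newSyncerMetrics, which is outside this diff. For reference, here is a sketch of how such a counter could be created and registered with Prometheus; the metric name and help text are assumptions, not the actual values used by the syncer:

package sketch

import (
	"github.com/prometheus/client_golang/prometheus"
)

// newGarbageCollectedBlocksCounter is a hypothetical stand-in for the counter the
// syncer passes into each group; it is incremented once per source block deleted
// from the bucket after a successful compaction.
func newGarbageCollectedBlocksCounter(reg prometheus.Registerer) prometheus.Counter {
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "thanos_compact_garbage_collected_blocks_total", // assumed name
		Help: "Total number of blocks deleted after compaction.",
	})
	if reg != nil {
		reg.MustRegister(c)
	}
	return c
}
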
31 changes: 25 additions & 6 deletions pkg/compact/compact_test.go
@@ -13,15 +13,14 @@ import (
"testing"
"time"

"github.com/improbable-eng/thanos/pkg/objstore"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"

"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
)

// TODO(bplotka): Add leaktest when this is done: https://github.com/improbable-eng/thanos/issues/234
@@ -249,7 +248,15 @@ func TestGroup_Compact(t *testing.T) {
defer os.RemoveAll(dir)

metrics := newSyncerMetrics(nil)
g, err := newGroup(nil, bkt, nil, 0, metrics.compactions.WithLabelValues(""), metrics.compactionFailures.WithLabelValues(""))
g, err := newGroup(
nil,
bkt,
nil,
0,
metrics.compactions.WithLabelValues(""),
metrics.compactionFailures.WithLabelValues(""),
metrics.garbageCollectedBlocks,
)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
@@ -280,4 +287,16 @@ func TestGroup_Compact(t *testing.T) {
testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty.
testutil.Equals(t, 2, meta.Compaction.Level)
testutil.Equals(t, []ulid.ULID{b1, b3, b2}, meta.Compaction.Sources)

// Check object storage. All blocks that were included in new compacted one should be removed.
err = bkt.Iter(ctx, "", func(n string) error {
id := ulid.MustParse(n[:len(n)-1])
for _, source := range meta.Compaction.Sources {
if id.Compare(source) == 0 {
return errors.Errorf("Unexpectedly found %s block in bucket", source.String())
}
}
return nil
})
testutil.Ok(t, err)
}
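
The new test assertion iterates the bucket and fails if any of the compaction sources is still present. Bucket iteration yields directory-style names with a trailing "/" delimiter, which is why the test strips the last character before parsing the ULID. A small illustrative helper (not part of this commit) built on the same pattern:

package sketch

import (
	"context"

	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/oklog/ulid"
)

// listBlockIDs returns the ULIDs of all top-level block directories in a bucket.
func listBlockIDs(ctx context.Context, bkt objstore.Bucket) ([]ulid.ULID, error) {
	var ids []ulid.ULID
	err := bkt.Iter(ctx, "", func(name string) error {
		// Strip the trailing "/" before parsing the block ULID.
		id, err := ulid.Parse(name[:len(name)-1])
		if err != nil {
			return err
		}
		ids = append(ids, id)
		return nil
	})
	return ids, err
}
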
