Skip to content

Commit

Permalink
Merge pull request prometheus-junkyard#341 from codesome/delete-new-b…
Browse files Browse the repository at this point in the history
…locks-on-error

Cleanup new blocks on 'CleanTombstones' failure
  • Loading branch information
fabxc committed Jun 5, 2018
2 parents 9a96bc4 + 0c93850 commit c848349
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 18 deletions.
15 changes: 8 additions & 7 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,9 @@ Outer:
return writeMetaFile(pb.dir, &pb.meta)
}

// CleanTombstones will rewrite the block if there any tombstones to remove them
// and returns if there was a re-write.
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
// If there was a rewrite, then it returns the ULID of the new block written, else nil.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
numStones := 0

pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
Expand All @@ -480,14 +480,15 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
})

if numStones == 0 {
return false, nil
return nil, nil
}

if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
return false, err
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime)
if err != nil {
return nil, err
}

return true, nil
return &uid, nil
}

// Snapshot creates snapshot of the block into dir.
Expand Down
34 changes: 23 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,34 +835,46 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
}

// CleanTombstones re-writes any blocks with tombstones.
func (db *DB) CleanTombstones() error {
func (db *DB) CleanTombstones() (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()

start := time.Now()
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())

newUIDs := []ulid.ULID{}
defer func() {
// If any error is caused, we need to delete all the new directory created.
if err != nil {
for _, uid := range newUIDs {
dir := filepath.Join(db.Dir(), uid.String())
if err := os.RemoveAll(dir); err != nil {
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
}
}
}
}()

db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()

deleted := []string{}
deletable := []string{}
for _, b := range blocks {
ok, err := b.CleanTombstones(db.Dir(), db.compactor)
if err != nil {
return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
}

if ok {
deleted = append(deleted, b.Dir())
if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
return err
} else if uid != nil { // New block was created.
deletable = append(deletable, b.Dir())
newUIDs = append(newUIDs, *uid)
}
}

if len(deleted) == 0 {
if len(deletable) == 0 {
return nil
}

return errors.Wrap(db.reload(deleted...), "reload blocks")
return errors.Wrap(db.reload(deletable...), "reload blocks")
}

func intervalOverlap(amin, amax, bmin, bmax int64) bool {
Expand Down
95 changes: 95 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package tsdb

import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"

"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -781,6 +783,99 @@ func TestTombstoneClean(t *testing.T) {
}
}

// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
// if TombstoneClean leaves any blocks behind these will overlap.
func TestTombstoneCleanFail(t *testing.T) {
	db, closeFn := openTestDB(t, nil)
	defer closeFn()

	// Seed the DB with empty on-disk blocks, each carrying a fake tombstone so
	// CleanTombstones attempts a rewrite for every one of them. At least two
	// blocks are needed so the mock compactor can succeed on the first write
	// and fail on the second.
	const totalBlocks = 2
	var expectedBlockDirs []string
	for i := 0; i < totalBlocks; i++ {
		rng := rand.New(rand.NewSource(time.Now().UnixNano()))
		uid := ulid.MustNew(ulid.Now(), rng)
		dir := filepath.Join(db.Dir(), uid.String())
		block := createEmptyBlock(t, dir, &BlockMeta{
			Version: 2,
			ULID:    uid,
		})

		// A fake tombstone is enough to make CleanTombstones rewrite the block.
		block.tombstones = memTombstones{0: Intervals{{0, 1}}}

		db.blocks = append(db.blocks, block)
		expectedBlockDirs = append(expectedBlockDirs, dir)
	}

	// Give the failing compactor room for a single successful write; the next
	// one errors, exercising the cleanup of the partially created block.
	db.compactor = &mockCompactorFailing{
		t:      t,
		blocks: db.blocks,
		max:    totalBlocks + 1,
	}

	// The compactor should trigger a failure here.
	testutil.NotOk(t, db.CleanTombstones())

	// After the failure only the original blocks may remain on disk.
	actualBlockDirs, err := blockDirs(db.dir)
	testutil.Ok(t, err)
	testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
}

// mockCompactorFailing creates a new empty block on every write and fails
// once it has reached the maximum allowed total of blocks.
type mockCompactorFailing struct {
	t      *testing.T // test handle used for assertions inside the mock
	blocks []*Block   // blocks written so far (pre-seeded by the test with the DB's blocks)
	max    int        // once len(blocks) reaches max, Write returns an error
}

// Plan is a no-op for the failing mock: it never proposes any compactions.
func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
	var dirs []string
	return dirs, nil
}
// Write fabricates a new empty block under dest and records it, returning an
// error once the configured maximum number of blocks has been reached. Before
// returning successfully it asserts that every recorded block is actually
// present on disk.
func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
	if len(c.blocks) >= c.max {
		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	meta := &BlockMeta{
		Version: 2,
		ULID:    ulid.MustNew(ulid.Now(), rng),
	}

	newBlock := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta)
	c.blocks = append(c.blocks, newBlock)

	// Now check that all expected blocks are actually persisted on disk.
	// This way we make sure that we have some blocks that are supposed to be removed.
	expected := make([]string, 0, len(c.blocks))
	for _, blk := range c.blocks {
		expected = append(expected, filepath.Join(dest, blk.Meta().ULID.String()))
	}
	actual, err := blockDirs(dest)
	testutil.Ok(c.t, err)
	testutil.Equals(c.t, expected, actual)

	return newBlock.Meta().ULID, nil
}

// Compact is not exercised by these tests; it reports success without doing
// any work.
func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) {
	var id ulid.ULID
	return id, nil
}

func TestDB_Retention(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
Expand Down

0 comments on commit c848349

Please sign in to comment.